搜索

远程写入prometheus存储

发表于 2025-11-04 20:02:00 来源:全栈开发
远程写入prometheus存储
复制package prome  import (      "bufio"     "bytes"     "context"     "io"     "io/ioutil"     "net/http"     "net/url"     "regexp"     "time"     "github.com/gogo/protobuf/proto"     "github.com/golang/snappy"     "github.com/opentracing-contrib/go-stdlib/nethttp"     opentracing "github.com/opentracing/opentracing-go"     "github.com/pkg/errors"     "github.com/prometheus/common/model"     "github.com/prometheus/prometheus/pkg/labels"     "github.com/prometheus/prometheus/prompb" )  type RecoverableError struct {      error }  type HttpClient struct {      url     *url.URL      Client  *http.Client      timeout time.Duration }  var MetricNameRE = regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`)  type MetricPoint struct {      Metric  string            `json:"metric"` // 指标名称      TagsMap map[string]string `json:"tags"`   // 数据标签      Time    int64             `json:"time"`   // 时间戳,远程单位是云南idc服务商写入秒      Value   float64           `json:"value"`  // 内部字段,源码库最终转换之后的亿华云存储float64数值 }  func (c *HttpClient) remoteWritePost(req []byte) error {      httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req))      if err != nil {          return err      }      httpReq.Header.Add("Content-Encoding", "snappy")      httpReq.Header.Set("Content-Type", "application/x-protobuf")      httpReq.Header.Set("User-Agent", "opcai")      httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")      ctx, cancel := context.WithTimeout(context.Background(), c.timeout)      defer cancel()      httpReq = httpReq.WithContext(ctx)      if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {          var ht *nethttp.Tracer          httpReq, ht = nethttp.TraceRequest(              parentSpan.Tracer(),              httpReq,              nethttp.OperationName("Remote Store"),              nethttp.ClientTrace(false),          )          defer ht.Finish()      }      httpResp, err := c.Client.Do(httpReq)      if err != nil {          // Errors from Client.Do are from (for example) network errors, so are          // recoverable.          return RecoverableError{err}      }      defer func() {          io.Copy(ioutil.Discard, httpResp.Body)          httpResp.Body.Close()      }()      if httpResp.StatusCode/100 != 2 {          scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 512))          line := ""         if scanner.Scan() {              line = scanner.Text()          }          err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)      }      if httpResp.StatusCode/100 == 5 {          return RecoverableError{err}      }      return err  }  func buildWriteRequest(samples []*prompb.TimeSeries) ([]byte, error) {      req := &prompb.WriteRequest{          Timeseries: samples,      }      data, err := proto.Marshal(req)      if err != nil {          return nil, err      }      compressed := snappy.Encode(nil, data)      return compressed, nil  }  type sample struct {      labels labels.Labels      t      int64      v      float64 }  const (      LABEL_NAME = "__name__" )  func convertOne(item *MetricPoint) (*prompb.TimeSeries, error) {      pt := prompb.TimeSeries{}      pt.Samples = []prompb.Sample{{}}      s := sample{}      s.t = item.Time     s.v = item.Value      // name     if !MetricNameRE.MatchString(item.Metric) {          return &pt, errors.New("invalid metrics name")      }      nameLs := labels.Label{          Name:  LABEL_NAME,          Value: item.Metric,      }      s.labels = append(s.labels, nameLs)      for k, v := range item.TagsMap {          if model.LabelNameRE.MatchString(k) {              ls := labels.Label{                  Name:  k,                  Value: v,              }              s.labels = append(s.labels, ls)          }      }      pt.Labels = labelsToLabelsProto(s.labels, pt.Labels)      // 时间赋值问题,使用毫秒时间戳      tsMs := time.Unix(s.t, 0).UnixNano() / 1e6      pt.Samples[0].Timestamp = tsMs      pt.Samples[0].Value = s.v      return &pt, nil  }  func labelsToLabelsProto(labels labels.Labels, buf []*prompb.Label) []*prompb.Label {      result := buf[:0]      if cap(buf) < len(labels) {          result = make([]*prompb.Label, 0, len(labels))      }      for _, l := range labels {          result = append(result, &prompb.Label{              Name:  l.Name,              Value: l.Value,          })      }      return result  }  func (c *HttpClient) RemoteWrite(items []MetricPoint) (err error) {      if len(items) == 0 {          return     }      ts := make([]*prompb.TimeSeries, len(items))      for i := range items {          ts[i], err = convertOne(&items[i])          if err != nil {              return         }      }      data, err := buildWriteRequest(ts)      if err != nil {          return     }      err = c.remoteWritePost(data)      return }  func NewClient(ur string, timeout time.Duration) (c *HttpClient, err error) {      u, err := url.Parse(ur)      if err != nil {          return     }      c = &HttpClient{          url:     u,          Client:  &http.Client{},          timeout: timeout,      }      return }  1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.92.93.94.95.96.97.98.99.100.101.102.103.104.105.106.107.108.109.110.111.112.113.114.115.116.117.118.119.120.121.122.123.124.125.126.127.128.129.130.131.132.133.134.135.136.137.138.139.140.141.142.143.144.145.146.147.148.149.150.151.152.153.154.155.156.157.158.159.160.161.162.163.164.165.166.167.168.169.170.171.172.173.174.175.176.177.178.179.180.181.182.183.184.185.186.187.188.189.190.191.192.193.
随机为您推荐
版权声明:本站资源均来自互联网,如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

Copyright © 2016 Powered by 远程写入prometheus存储,全栈开发  滇ICP备2023006006号-32sitemap

回顶部