  复制package prome             import (                 "bufio"                "bytes"                "context"                "io"                "io/ioutil"                "net/http"                "net/url"                "regexp"                "time"                "github.com/gogo/protobuf/proto"                "github.com/golang/snappy"                "github.com/opentracing-contrib/go-stdlib/nethttp"                opentracing "github.com/opentracing/opentracing-go"                "github.com/pkg/errors"                "github.com/prometheus/common/model"                "github.com/prometheus/prometheus/pkg/labels"                "github.com/prometheus/prometheus/prompb"            )             type RecoverableError struct {                 error }             type HttpClient struct {                 url     *url.URL                 Client  *http.Client                 timeout time.Duration }             var MetricNameRE = regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`)             type MetricPoint struct {                 Metric  string            `json:"metric"` // 指标名称                 TagsMap map[string]string `json:"tags"`   // 数据标签                 Time    int64             `json:"time"`   // 时间戳,远程单位是云南idc服务商写入秒                 Value   float64           `json:"value"`  // 内部字段,源码库最终转换之后的亿华云存储float64数值 }             func (c *HttpClient) remoteWritePost(req []byte) error {                 httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req))                 if err != nil {                     return err                 }                 httpReq.Header.Add("Content-Encoding", "snappy")                 httpReq.Header.Set("Content-Type", "application/x-protobuf")                 httpReq.Header.Set("User-Agent", "opcai")                 httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")                 ctx, cancel := context.WithTimeout(context.Background(), c.timeout)                 defer cancel()                 httpReq = httpReq.WithContext(ctx)                 if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {                     var ht *nethttp.Tracer                     httpReq, ht = nethttp.TraceRequest(                         parentSpan.Tracer(),                         httpReq,                         nethttp.OperationName("Remote Store"),                         nethttp.ClientTrace(false),                     )                     defer ht.Finish()                 }                 httpResp, err := c.Client.Do(httpReq)                 if err != nil {                     // Errors from Client.Do are from (for example) network errors, so are                     // recoverable.                     return RecoverableError{err}                 }                 defer func() {                     io.Copy(ioutil.Discard, httpResp.Body)                     httpResp.Body.Close()                 }()                 if httpResp.StatusCode/100 != 2 {                     scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 512))                     line := ""                    if scanner.Scan() {                         line = scanner.Text()                     }                     err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)                 }                 if httpResp.StatusCode/100 == 5 {                     return RecoverableError{err}                 }                 return err             }             func buildWriteRequest(samples []*prompb.TimeSeries) ([]byte, error) {                 req := &prompb.WriteRequest{                     Timeseries: samples,                 }                 data, err := proto.Marshal(req)                 if err != nil {                     return nil, err                 }                 compressed := snappy.Encode(nil, data)                 return compressed, nil             }             type sample struct {                 labels labels.Labels                 t      int64                 v      float64 }             const (                 LABEL_NAME = "__name__"            )             func convertOne(item *MetricPoint) (*prompb.TimeSeries, error) {                 pt := prompb.TimeSeries{}                 pt.Samples = []prompb.Sample{{}}                 s := sample{}                 s.t = item.Time                s.v = item.Value                 // name                if !MetricNameRE.MatchString(item.Metric) {                     return &pt, errors.New("invalid metrics name")                 }                 nameLs := labels.Label{                     Name:  LABEL_NAME,                     Value: item.Metric,                 }                 s.labels = append(s.labels, nameLs)                 for k, v := range item.TagsMap {                     if model.LabelNameRE.MatchString(k) {                         ls := labels.Label{                             Name:  k,                             Value: v,                         }                         s.labels = append(s.labels, ls)                     }                 }                 pt.Labels = labelsToLabelsProto(s.labels, pt.Labels)                 // 时间赋值问题,使用毫秒时间戳                 tsMs := time.Unix(s.t, 0).UnixNano() / 1e6                 pt.Samples[0].Timestamp = tsMs                 pt.Samples[0].Value = s.v                 return &pt, nil             }             func labelsToLabelsProto(labels labels.Labels, buf []*prompb.Label) []*prompb.Label {                 result := buf[:0]                 if cap(buf) < len(labels) {                     result = make([]*prompb.Label, 0, len(labels))                 }                 for _, l := range labels {                     result = append(result, &prompb.Label{                         Name:  l.Name,                         Value: l.Value,                     })                 }                 return result             }             func (c *HttpClient) RemoteWrite(items []MetricPoint) (err error) {                 if len(items) == 0 {                     return                }                 ts := make([]*prompb.TimeSeries, len(items))                 for i := range items {                     ts[i], err = convertOne(&items[i])                     if err != nil {                         return                    }                 }                 data, err := buildWriteRequest(ts)                 if err != nil {                     return                }                 err = c.remoteWritePost(data)                 return            }             func NewClient(ur string, timeout time.Duration) (c *HttpClient, err error) {                 u, err := url.Parse(ur)                 if err != nil {                     return                }                 c = &HttpClient{                     url:     u,                     Client:  &http.Client{},                     timeout: timeout,                 }                 return            }             1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.92.93.94.95.96.97.98.99.100.101.102.103.104.105.106.107.108.109.110.111.112.113.114.115.116.117.118.119.120.121.122.123.124.125.126.127.128.129.130.131.132.133.134.135.136.137.138.139.140.141.142.143.144.145.146.147.148.149.150.151.152.153.154.155.156.157.158.159.160.161.162.163.164.165.166.167.168.169.170.171.172.173.174.175.176.177.178.179.180.181.182.183.184.185.186.187.188.189.190.191.192.193. |