当前位置: 首页 > news >正文

33.5 remote实战项目之设计prometheus数据源的结构

本节重点介绍 :

  • 项目要求
    • 通过remote read读取prometheus中的数据
    • 通过remote write向prometheus中写入数据
  • 准备工作
    • 新建项目 prome_remote_read_write
    • 设计prometheus 数据源的结构
    • 初始化

项目要求

  • 通过remote read读取prometheus中的数据
  • 通过remote write向prometheus中写入数据

准备工作

新建项目 prome_remote_read_write

go mod init prome_remote_read_write

准备配置文件 prome_remote_read_write.yml

  • remoteWrite代表 支持remote_write的多个后端
  • remoteRead代表 支持remote_read的多个后端
remoteWrite:
  # m3db的配置
  #- name: m3db01
  #  url: http://localhost:7201/api/v1/prom/remote/write
  #  remoteTimeoutSecond: 5
  # prometheus的配置
  - name: prome01
    url: http://172.20.70.205:9090/api/v1/write
    remoteTimeoutSecond: 5
remoteRead:
  - name: prome01
    url: http://172.20.70.205:9090/api/v1/read
    remoteTimeoutSecond: 5

配置文件解析

  • config/config.go
package configimport ("github.com/toolkits/pkg/logger""gopkg.in/yaml.v2""io/ioutil"
)type RemoteConfig struct {Name                string `yaml:"name"`Url                 string `yaml:"url"`RemoteTimeoutSecond int    `yaml:"remoteTimeoutSecond"`
}type PromeSection struct {RemoteWrite []RemoteConfig `yaml:"remoteWrite"`RemoteRead  []RemoteConfig `yaml:"remoteRead"`
}func Load(s string) (*PromeSection, error) {cfg := &PromeSection{}err := yaml.Unmarshal([]byte(s), cfg)if err != nil {return nil, err}return cfg, nil
}func LoadFile(filename string) (*PromeSection, error) {content, err := ioutil.ReadFile(filename)if err != nil {return nil, err}cfg, err := Load(string(content))if err != nil {logger.Errorf("[parsing YAML file errr...][error:%v]", err)return nil, err}return cfg, nil
}

main.go中解析配置

package mainimport ("flag""github.com/toolkits/pkg/logger""math/rand""prome_remote_read_write/config""prome_remote_read_write/datasource""time"
)func main() {rand.Seed(time.Now().UnixNano())configFile := flag.String("config", "prome_remote_read_write.yml","Address on which to expose metrics and web interface.")flag.Parse()sConfig, err := config.LoadFile(*configFile)if err != nil {logger.Infof("config.LoadFile Error,Exiting ...error:%v", err)return}
}

设计prometheus 数据源的结构

  • 位置 datasource/prome.go
package datasourceimport ("github.com/go-kit/kit/log""github.com/prometheus/client_golang/prometheus"config_util "github.com/prometheus/common/config""github.com/prometheus/common/model""github.com/prometheus/common/promlog"pc "github.com/prometheus/prometheus/config""github.com/prometheus/prometheus/prompb""github.com/prometheus/prometheus/promql""github.com/prometheus/prometheus/storage""github.com/prometheus/prometheus/storage/remote""github.com/toolkits/pkg/logger""go.uber.org/atomic""io/ioutil""net/http""net/url""prome_remote_read_write/config""time"
)type PromeDataSource struct {Section      *config.PromeSection            //配置PushQueue    chan []prompb.TimeSeries        // 数据推送的chanLocalTmpDir  string                          // 本地临时目录,存放queries.active文件Queryable    storage.SampleAndChunkQueryable // 除了promql的查询,需要后端存储,如查询seriesQueryEngine  *promql.Engine                  // promql相关查询WriteTargets []*HttpClient                   // remote_write写入的后端地址
}type HttpClient struct {remoteName string // Used to differentiate clients in metrics.url        *url.URLClient     *http.Clienttimeout    time.Duration
}

new函数

  • 根据传入的配置new
func NewPromeDataSource(cg *config.PromeSection) *PromeDataSource {pd := &PromeDataSource{Section:   cg,PushQueue: make(chan []prompb.TimeSeries, 10000),}return pd
}

Init初始化函数

  • 完整代码如下

// safePromQLNoStepSubqueryInterval stores an interval, in milliseconds, in an
// atomic value so it can be read and updated from different goroutines.
type safePromQLNoStepSubqueryInterval struct {
	value atomic.Int64
}

// durationToInt64Millis converts d to a whole number of milliseconds,
// truncating any sub-millisecond remainder toward zero.
func durationToInt64Millis(d time.Duration) int64 {
	return d.Milliseconds()
}
// Set stores ev, converted to whole milliseconds, as the current interval.
func (i *safePromQLNoStepSubqueryInterval) Set(ev model.Duration) {
	millis := durationToInt64Millis(time.Duration(ev))
	i.value.Store(millis)
}
func (i *safePromQLNoStepSubqueryInterval) Get(int64) int64 {return i.value.Load()
}func NewPromeDataSource(cg *config.PromeSection) *PromeDataSource {pd := &PromeDataSource{Section:   cg,PushQueue: make(chan []prompb.TimeSeries, 10000),}return pd
}func (pd *PromeDataSource) Init() {// 模拟创建本地存储目录dbDir, err := ioutil.TempDir("", "tsdb-api-ready")if err != nil {logger.Errorf("[error_create_local_tsdb_dir][err: %v]", err)return}pd.LocalTmpDir = dbDirpromlogConfig := promlog.Config{}// 使用本地目录创建remote-storageremoteS := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) {return 0, nil}, dbDir, 1*time.Minute, nil)// ApplyConfig 加载queryablesremoteReadC := make([]*pc.RemoteReadConfig, 0)for _, u := range pd.Section.RemoteRead {ur, err := url.Parse(u.Url)if err != nil {logger.Errorf("[prome_ds_init_error][parse_url_error][url:%+v][err:%+v]", u.Url, err)continue}remoteReadC = append(remoteReadC,&pc.RemoteReadConfig{URL:           &config_util.URL{URL: ur},RemoteTimeout: model.Duration(time.Duration(u.RemoteTimeoutSecond) * time.Second),ReadRecent:    true,},)}if len(remoteReadC) == 0 {logger.Errorf("[prome_ds_error_got_zero_remote_read_storage]")return}err = remoteS.ApplyConfig(&pc.Config{RemoteReadConfigs: remoteReadC})if err != nil {logger.Errorf("[error_load_remote_read_config][err: %v]", err)return}pLogger := log.NewNopLogger()noStepSubqueryInterval := &safePromQLNoStepSubqueryInterval{}queryQueueDir, err := ioutil.TempDir(dbDir, "prom_query_concurrency")opts := promql.EngineOpts{Logger:                   log.With(pLogger, "component", "query engine"),Reg:                      prometheus.DefaultRegisterer,MaxSamples:               50000000,Timeout:                  30 * time.Second,ActiveQueryTracker:       promql.NewActiveQueryTracker(queryQueueDir, 20, log.With(pLogger, "component", "activeQueryTracker")),LookbackDelta:            5 * time.Minute,NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,EnableAtModifier:         true,}queryEngine := promql.NewEngine(opts)pd.QueryEngine = queryEnginepd.Queryable = remoteS// 初始化writeClientsif len(pd.Section.RemoteWrite) == 0 
{logger.Warningf("[prome_ds_init_with_zero_RemoteWrite_target]")logger.Infof("[successfully_init_prometheus_datasource][remote_read_num:%+v][remote_write_num:%+v]",len(pd.Section.RemoteRead),len(pd.Section.RemoteWrite),)return}writeTs := make([]*HttpClient, 0)for _, u := range pd.Section.RemoteWrite {ur, err := url.Parse(u.Url)if err != nil {logger.Errorf("[prome_ds_init_error][parse_url_error][url:%+v][err:%+v]", u.Url, err)continue}writeTs = append(writeTs,&HttpClient{remoteName: u.Name,url:        ur,Client:     &http.Client{},timeout:    time.Duration(u.RemoteTimeoutSecond) * time.Second,})}pd.WriteTargets = writeTs// 开启prometheus 队列消费协程go pd.remoteWrite()logger.Infof("[successfully_init_prometheus_datasource][remote_read_num:%+v][remote_write_num:%+v]",len(remoteReadC),len(writeTs),)
}

创建本地存储目录和remote-storage

  • 模拟创建本地存储目录
	// 模拟创建本地存储目录dbDir, err := ioutil.TempDir("", "tsdb-api-ready")if err != nil {logger.Errorf("[error_create_local_tsdb_dir][err: %v]", err)return}pd.LocalTmpDir = dbDir
  • 使用本地目录创建remote-storage
	// 使用本地目录创建remote-storageremoteS := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) {return 0, nil}, dbDir, 1*time.Minute, nil)

创建remote_read对象

  • 遍历配置中的remote_read,构造RemoteReadConfig
  • 调用 remote storage 对象(remoteS)的 ApplyConfig 方法,使 RemoteReadConfig 配置生效
	// ApplyConfig 加载queryablesremoteReadC := make([]*pc.RemoteReadConfig, 0)for _, u := range pd.Section.RemoteRead {ur, err := url.Parse(u.Url)if err != nil {logger.Errorf("[prome_ds_init_error][parse_url_error][url:%+v][err:%+v]", u.Url, err)continue}remoteReadC = append(remoteReadC,&pc.RemoteReadConfig{URL:           &config_util.URL{URL: ur},RemoteTimeout: model.Duration(time.Duration(u.RemoteTimeoutSecond) * time.Second),ReadRecent:    true,},)}if len(remoteReadC) == 0 {logger.Errorf("[prome_ds_error_got_zero_remote_read_storage]")return}err = remoteS.ApplyConfig(&pc.Config{RemoteReadConfigs: remoteReadC})if err != nil {logger.Errorf("[error_load_remote_read_config][err: %v]", err)return}

创建QueryEngine并赋值

	noStepSubqueryInterval := &safePromQLNoStepSubqueryInterval{}queryQueueDir, err := ioutil.TempDir(dbDir, "prom_query_concurrency")opts := promql.EngineOpts{Logger:                   log.With(pLogger, "component", "query engine"),Reg:                      prometheus.DefaultRegisterer,MaxSamples:               50000000,Timeout:                  30 * time.Second,ActiveQueryTracker:       promql.NewActiveQueryTracker(queryQueueDir, 20, log.With(pLogger, "component", "activeQueryTracker")),LookbackDelta:            5 * time.Minute,NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,EnableAtModifier:         true,}queryEngine := promql.NewEngine(opts)pd.QueryEngine = queryEnginepd.Queryable = remoteS

初始化writeClients创建RemoteWrite对象

  • 遍历RemoteWrite配置创建
  • 开启prometheus 队列消费协程
	// 初始化writeClientsif len(pd.Section.RemoteWrite) == 0 {logger.Warningf("[prome_ds_init_with_zero_RemoteWrite_target]")logger.Infof("[successfully_init_prometheus_datasource][remote_read_num:%+v][remote_write_num:%+v]",len(pd.Section.RemoteRead),len(pd.Section.RemoteWrite),)return}writeTs := make([]*HttpClient, 0)for _, u := range pd.Section.RemoteWrite {ur, err := url.Parse(u.Url)if err != nil {logger.Errorf("[prome_ds_init_error][parse_url_error][url:%+v][err:%+v]", u.Url, err)continue}writeTs = append(writeTs,&HttpClient{remoteName: u.Name,url:        ur,Client:     &http.Client{},timeout:    time.Duration(u.RemoteTimeoutSecond) * time.Second,})}pd.WriteTargets = writeTs// 开启prometheus 队列消费协程go pd.remoteWrite()logger.Infof("[successfully_init_prometheus_datasource][remote_read_num:%+v][remote_write_num:%+v]",len(remoteReadC),len(writeTs),)

本节重点总结 :

  • 项目要求
    • 通过remote read读取prometheus中的数据
    • 通过remote write向prometheus中写入数据
  • 准备工作
    • 新建项目 prome_remote_read_write
    • 设计prometheus 数据源的结构
    • 初始化
http://www.lryc.cn/news/497534.html

相关文章:

  • 微服务springboot详细解析(一)
  • 深入探讨Go语言中的双向链表
  • Fastapi + vue3 自动化测试平台---移动端App自动化篇
  • ElasticSearch easy-es 聚合函数 group by 混合写法求Top N 词云 分词
  • 在 ASP.NET C# Web API 中实现 Serilog 以增强请求和响应的日志记录
  • 2024年顶级小型语言模型前15名
  • 精通 Python 网络安全(一)
  • 【python自动化二】pytest集成allure生成测试报告
  • 网络版本的通讯录青春版(protobuf)
  • 开源模型应用落地-安全合规篇-用户输入价值观判断(三)
  • 神经网络入门实战:(十四)pytorch 官网内置的 CIFAR10 数据集,及其网络模型
  • 【Rust在WASM中实现pdf文件的生成】
  • 在MySQL中执行sum case when报错:SUM does not exist
  • 【openssl】相关指令
  • 实例分割详解
  • D87【python 接口自动化学习】- pytest基础用法
  • 浅谈MySQL路由
  • matlab中disp,fprintf,sprintf,display,dlmwrite输出函数之间的区别
  • 30.100ASK_T113-PRO 用QT编写视频播放器(一)
  • Linux-GPIO应用编程
  • opencvocr识别手机摄像头拍摄的指定区域文字,文字符合规则就语音报警
  • 微服务即时通讯系统(5)用户管理子服务,网关子服务
  • postgreSQL安装后启动有The application server could not be contacted问题
  • 架构05-架构安全性
  • 虚幻引擎---材质篇
  • NPM镜像详解
  • 从智能合约到去中心化AI:Web3的技术蓝图
  • STM32进阶 定时器3 通用定时器 案例1:LED呼吸灯——PWM脉冲
  • 开源即时通讯与闭源即时通讯该怎么选择,其优势是什么?
  • 930[water]