当前位置: 首页 > news >正文

腾讯mini项目-【指标监控服务重构】2023-08-23

今日已办

进度和问题汇总

  1. 请求合并
    1. feature/venus trace
    2. feature/venus metric
    3. feature/profile-otel-baserunner-style
    4. bugfix/profile-logger-Sync
    5. feature/profile_otelclient_enable_config
  2. 完成otel 开关
    1. trace-采样
    2. metrice-reader
  3. 已经都在各自服务器运行,并接入了云clickhouse集群,开始准备测试【详细需求】
    1. 测试的用例,并发的数目-【用例拓展-kafka的消息积压】
    2. clickhouse的哪些指标,cpu、内存,耗时等
    3. 以什么形式来输出这个性能对比?(表格or图形)
    4. 指标采集的性能消耗,复杂指标查询的消耗
    5. 对比对象-Jaeger
      1. 存储后端-elasticsearch 【手动部署或者购买】
      2. 收集存储,查询
      3. golang pprof 抓取文件 CPU 占用和耗时,内存-火焰图
      4. 不同方案做对比
    6. ck 的指标
      1. **数据库的延时,(五分钟)入库成功率 **【压测】
      2. 通过指标或者链路耗时,定位哪个环节卡住
      3. 压测 jaeger 数据收集出现问题-【qps】,降低配置,突出优势
      4. 内存和cpu占有,profile 手动收集指标
  4. profile服务器3301的端口
  5. watermill和baserunner的benmark,做得差不多了,修改了publisher用了kafka-client的异步生产者,耗时快了很多
  6. 需要启动其他监控工具(zipkin,jaeger【已经接入,正在尝试连入ck】,Prometheus等来进行对比吗)
  7. 一个优化代码中接入otel-sdk,如何减少显式声明,提高代码的可扩展性
    1. profile 已经将otel逻辑嵌入到baserunner的handler中
    2. venus 待办
    3. profile-watermill 待办

分工

  1. 测试用例 - 1
  2. jaeger - 2
  3. pprof - 1
  4. 测试对比两种方案的 clickhouse 指标
  5. docker-compose拉低配置

watermill-benchmark

代码实现

  1. 先初始化 producer
  2. watermill 初始化并启动 router / baserunner 初始化 consumer
  3. 在 for 循环中同步生产完固定数量的消息【开始计时】
  4. 阻塞等待固定数量的消息被消费,解析,处理,异步推回 kafka 完成【结束计时】
  5. 本机和服务器测试单个topic的100条消息的结果见下列表格
    1. watermill 的性能和资源利用率均好于 baserunner
    2. 核心数多的情况下,优势会更加明显
// Package consumer
// @Author xzx 2023/8/19 14:13:00
package internalimport ("context""fmt"kc "github.com/Kevinello/kafka-client""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin""github.com/bytedance/sonic""github.com/garsue/watermillzap""github.com/google/uuid""github.com/segmentio/kafka-go""go.uber.org/zap""go.uber.org/zap/zapcore""profile/cmd""profile/internal/config"baseconsumer "profile/internal/context/consumer""profile/internal/log""profile/internal/schema""profile/internal/watermill/consumer""profile/internal/watermill/watermillkafka""testing""time"
)// BenchmarkWatermill-16           240           5631314 ns/op         3684370 B/op           37997 allocs/op
// BenchmarkWatermill-16           153           7084305 ns/op         3706966 B/op           38168 allocs/op
// BenchmarkWatermill-16           145           6917486 ns/op         3712511 B/op           38175 allocs/op
func BenchmarkWatermill(b *testing.B) {router := newRouter()go func() {if err := router.Run(context.Background()); err != nil {log.Logger.Error("router run error", zap.Error(err))}}()producer := newProducer()time.Sleep(10 * time.Millisecond)b.ResetTimer()for i := 0; i < b.N; i++ {b.StopTimer()watermillkafka.MessageCount = 0err := publishMessage(producer, 100)if err != nil {break}b.StartTimer()// 阻塞等待消费完成指定数量for {if watermillkafka.MessageCount >= 100 && router.IsRunning() {b.StopTimer()log.Logger.Error("PubSub Count End", zap.Any("count", watermillkafka.MessageCount))break}}}b.StopTimer()router.Close()
}// BenchmarkBaseRunner-16         12         100429542 ns/op         4959836 B/op      42119 allocs/op
// BenchmarkBaseRunner-16         10         100110220 ns/op         4946421 B/op      42132 allocs/op
// BenchmarkBaseRunner-16         10         106747810 ns/op         4942656 B/op      42107 allocs/op
func BenchmarkBaseRunner(b *testing.B) {producer := newProducer()myConsumer, err := kc.NewConsumer(context.Background(),kc.ConsumerConfig{Bootstrap: config.Profile.GetString("kafka.bootstrap"),GroupID:   config.Profile.GetString("kafka.group"),GetTopics: func(broker string) (topics []string, err error) {return []string{"to_analyzer__0.PERF_CRASH","to_analyzer__0.PERF_LAG",}, nil},MessageHandler: cmd.ConsumerDispatchHandler,LogLevel:       int(zapcore.InfoLevel),},)if err != nil {log.Logger.Fatal("create consumer error", zap.Error(err))return}go func() {select {case <-myConsumer.Closed():log.Logger.Info("consumer Closed")return}}()time.Sleep(10 * time.Millisecond)b.ResetTimer()for i := 0; i < b.N; i++ {b.StopTimer()baseconsumer.ConsumeCount = 0err := publishMessage(producer, 100)if err != nil {break}b.StartTimer()// 阻塞等待消费完成指定数量for {if baseconsumer.ConsumeCount >= 100 {log.Logger.Error("PubSub Count End", zap.Any("count", baseconsumer.ConsumeCount))break}}}b.StopTimer()myConsumer.Closed()
}func publishMessage(producer *kc.Producer, nums int) (err error) {var event = &schema.Event{Meta: schema.Meta{AppID:    "1024",Category: "PERF_CRASH",Model:    "xiaomi13",DeviceID: "1b201ff9-5002-4fae-8d22-507a1c1a10b6",Os:       "ios",OsVer:    "13.1",UserID:   "28865194-fd08-480f-957d-ee9f21b32c3c",Version:  "100.24.56.7.19",Arch:     "aarch64",SdkVer:   "5.12.6",Platform: "ios",},Data: schema.Data{Time:         1688491757512,IP:           "119.147.10.203",ID:           "a4b838db-4f34-4da8-a27b-e725477ed336",NetType:      "5G",NetOp:        "CT",BatteryLevel: 92,PageID:       "com.tencent.test.page1",Dimensions: map[string]string{"crashed_thread": "com.tencent.thread1","crash_type":     "native","lose_data":      "true","repeat_occur":   "false",},Values: map[string]int64{"memory_free":  600,"memory_max":   1200,"memory_total": 1600,"remain_disk":  4000,},},VenusData: schema.VenusData{UploadTime: time.Now().UnixMilli(),BackendID:  uuid.NewString(),Country:    "China",Region:     "Guangdong",City:       "Shenzhen",},}topic := fmt.Sprintf("to_analyzer__0.%s", event.Category)messages := make([]kafka.Message, 0, nums)for i := 0; i < nums; i++ {event.UploadTime = time.Now().UnixMilli()event.BackendID = uuid.NewString()bytes, err := sonic.Marshal(event)if err != nil {fmt.Printf("failed to marshal event: %v\n", err)}messages = append(messages, kafka.Message{Topic: topic,Value: bytes,})}if err = producer.WriteMessages(context.Background(), messages...); err != nil {fmt.Printf("failed to write messages: %v\n", err)}return
}func newProducer() *kc.Producer {eventKafkaConfig := &kc.ProducerConfig{Bootstrap:              "127.0.0.1:9092",Async:                  false,AllowAutoTopicCreation: true,Logger:                 &log.LogrLogger,}producer, err := kc.NewProducer(context.Background(), *eventKafkaConfig)if err != nil {panic("cannot connect to kafka with address 127.0.0.1:9092")}return producer
}func newRouter() *message.Router {logger := watermillzap.NewLogger(log.Logger)publisher, subscriber := consumer.NewPubSub(logger)router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {log.Logger.Fatal("create router error", zap.Error(err))}router.AddPlugin(plugin.SignalsHandler)router.AddMiddleware(middleware.InstantAck,middleware.Recoverer,)router.AddMiddleware(consumer.UnpackKafkaMessage, consumer.InitPerformanceEvent, consumer.AnalyzeEvent)router.AddHandler("crash", "to_analyzer__0.PERF_CRASH", subscriber, "solar-dev.PERF_CRASH", publisher, consumer.CrashHandler)router.AddHandler("lag", "to_analyzer__0.PERF_LAG", subscriber, "solar-dev.PERF_LAG", publisher, consumer.LagHandler)return router
}

本地测试

BenchmarkWatermill-16BenchmarkBaseRunner-16
240 563,1314 ns/op 368,4370 B/op 3,7997 allocs/op12 1,0042,9542 ns/op 495,9836 B/op 4,2119 allocs/op
153 708,4305 ns/op 370,6966 B/op 3,8168 allocs/op10 1,0011,0220 ns/op 494,6421 B/op 4,2132 allocs/op
145 691,7486 ns/op 371,2511 B/op 3,8175 allocs/op10 1,0674,7810 ns/op 1,0674,7810 B/op 4,2107 allocs/op

服务器上测试

image-20230823202739491

image-20230823204753051

单个topic的100条消息

BenchmarkWatermill-4BenchmarkBaseRunner-4
10 4339,8240 ns/op 363,0762 B/op 3,7820 allocs/op25 4616,7095 ns/op 315,8836 B/op 3,9902 allocs/op
78 4065,2822 ns/op 360,0755 B/op 3,7893 allocs/op26 4330,6776 ns/op 317,8770 B/op 3,9880 allocs/op
100 3549,3863 ns/op 360,5322 B/op 3,7899 allocs/op100 4489,2327 ns/op 316,3158 B/op 3,9775 allocs/op
386 1427,4034 ns/op 358,7454 B/op 3,7876 allocs/op10000 4949,4435 ns/op 319,7664 B/op 3,9874 allocs/op

本地测试单个topic的100条消息

testb.nns/opB/opallocs/op
BenchmarkWatermill-161537084305370696638168
BenchmarkWatermill-161456917486371251138175
BenchmarkBaseRunner-1610100110220494642142132
BenchmarkBaseRunner-161010674781010674781042107

服务器测试单个topic的100条消息

testb.nns/opB/opallocs/op
BenchmarkWatermill-47840652822360075537893
BenchmarkBaseRunner-42643306776317877039880

明日待办

  1. 协助部署 jaeger
http://www.lryc.cn/news/168854.html

相关文章:

  • C- ssize_t size_t
  • ubuntu20.04 Supervisor 开机自启动脚本一文配置
  • 【面试刷题】——函数指针和指针函数
  • 目标分类笔记(一): 利用包含多个网络多种训练策略的框架来完成多目标分类任务(从数据准备到训练测试部署的完整流程)
  • 【100天精通Python】Day61:Python 数据分析_Pandas可视化功能:绘制饼图,箱线图,散点图,散点图矩阵,热力图,面积图等(示例+代码)
  • 2023华为产品测评官-开发者之声 | 华为云CodeArts征文活动,多重好礼邀您发声!
  • Python 图形化界面基础篇:获取文本框中的用户输入
  • 【驱动开发】实现三盏灯的控制,编写应用程序测试
  • Vue3+ElementUI使用
  • MySQL 和 MariaDB 版本管理的历史背景及差异
  • linux驱动开发--day4(字符设备驱动注册内部流程、及实现备文件和设备的绑定下LED灯实验)
  • elasticsearch5-RestAPI操作
  • 数据结构与算法(一)
  • Matlab--微积分问题的计算机求解
  • GRU实现时间序列预测(PyTorch版)
  • 文本框粘贴时兼容Unix、Mac换行符的方法源码
  • 2023年华为杯研究生数学建模竞赛辅导
  • post更新,put相当于删除重新增一条
  • python责任链模式
  • 大数据技术准备
  • 【力扣周赛】第 362 场周赛(⭐差分匹配状态压缩DP矩阵快速幂优化DPKMP)
  • 四大函数式接口(重点,必须掌握)
  • 2023Web前端逻辑面试题
  • uniapp中git忽略node_modules,unpackage文件
  • Json-Jackson和FastJson
  • RK3588 点亮imx586摄像头
  • C++---继承
  • 使用新版Maven-mvnd快速构建项目
  • 【ICASSP 2023】ST-MVDNET++论文阅读分析与总结
  • MySQL 面试题——MySQL 基础