当前位置: 首页 > news >正文

腾讯mini项目-【指标监控服务重构】2023-08-19

今日已办

benchmark

How can we create a configuration for gobench with -benchmem – IDEs Support (IntelliJ Platform) | JetBrains

本机进行watermill-benchmark

  1. 使用 apifox 自动化测试上报固定数量的消息

  2. 启动watermill-pub/sub的 benchmark 函数

    func BenchmarkPubSub(b *testing.B) {for i := 0; i < 1; i++ {b.StopTimer()logger := watermillzap.NewLogger(log.Logger)publisher, subscriber := consumer.NewPubSub(logger)router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {log.Logger.Fatal("create router error", zap.Error(err))}router.AddPlugin(plugin.SignalsHandler)router.AddMiddleware(middleware.InstantAck,middleware.Recoverer,)router.AddMiddleware(consumer.UnpackKafkaMessage, consumer.InitPerformanceEvent, consumer.AnalyzeEvent)router.AddHandler("crash", "to_analyzer__0.PERF_CRASH", subscriber, "solar-dev.PERF_CRASH", publisher, consumer.CrashHandler)router.AddHandler("lag", "to_analyzer__0.PERF_LAG", subscriber, "solar-dev.PERF_LAG", publisher, consumer.LagHandler)go func() {for {if kafka.PublishCount >= 10000 && router.IsRunning() {err = router.Close()fmt.Printf("router close err:%v\n", err)break}}}()b.StartTimer()if err = router.Run(context.Background()); err != nil {log.Logger.Error("router run error", zap.Error(err))}}
    }
    

单 topic

Event numbermsns/opB/opallocs/op
crash 106980.0130600
crash 1007010.0137300
crash 100011431164375900131129392815484
crash 100006174616030730013000890888179332

双 topic

Event numbermsns/opB/opallocs/op
crash & lag 106890.0148600
crash & lag 1007180.0143800
crash & lag 1000166116772692002699498881797754
crash & lag 100001169711573685900268443070417945041

add publisher send message trace & log logic

  1. 在 handler 了中将 data 用 message 的 Context 传递下来
  2. 获取 WriteKafkaSpan 和 RootSpan 手动 End
  3. 打日志

profile/internal/watermill/pubsub/consumer_stage.go

// crashHandler
// @Description
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func crashHandler(msg *message.Message) ([]*message.Message, error) {data := GetDataFromMsg(msg)writeKafkaCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "crashHandler",trace.WithSpanKind(trace.SpanKindProducer))setSpanAttributes(span, data)data.WriteKafkaSpan = spantoWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorhandlerErr(writeKafkaCtx, "marshal error", contextErr)data.WriteKafkaSpan.End()data.RootSpan.End()return nil, contextErr}msg = message.NewMessage(data.Event.BackendID, toWriteBytes)msg.Metadata.Set(watermillkafka.HeaderKey, data.Event.ID)log.Logger.Info("[4-crashHandler]", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))SetDataInMsg(msg, data)return message.Messages{msg}, nil
}

profile/internal/watermill/watermillkafka/publisher.go

// Publish publishes message to Kafka.
//
// Publish is blocking and wait for ack from Kafka.
// When one of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {if p.closed {return errors.New("publisher closed")}logFields := make(watermill.LogFields, 4)logFields["topic"] = topicfor _, msg := range msgs {logFields["message_uuid"] = msg.UUIDp.logger.Trace("Sending message to Kafka", logFields)kafkaMsg, err := p.config.Marshaler.Marshal(topic, msg)if err != nil {return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)}// todo: add otel-trace and log about sendMessagedata := pubsub.GetDataFromMsg(msg)partition, offset, err := p.producer.SendMessage(kafkaMsg)if err != nil {log.Logger.ErrorContext(msg.Context(), "send message to kafka error", zap.Error(err))data.WriteKafkaSpan.End()data.RootSpan.End()return errors.Wrapf(err, "cannot produce message %s", msg.UUID)}log.Logger.Info("[4-WriteKafka] write kafka success",zap.String("topic", connector.GetTopic(data.Event.Category)),zap.String("id", data.Event.ID), zap.Any("msg", data.Event),zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))data.WriteKafkaSpan.End()logFields["kafka_partition"] = partitionlogFields["kafka_partition_offset"] = offsetp.logger.Trace("Message sent to Kafka", logFields)data.RootSpan.End()}return nil
}

明日待办

  1. ppt 制作
http://www.lryc.cn/news/169402.html

相关文章:

  • go实现grpc-快速开始
  • linux上的init 0-6指令作用以及一些快捷键和系统指令
  • Mixin 混入
  • pycharm快捷键
  • 【面试刷题】——Linux基础命令
  • 第四步 Vue2 配置ESLint
  • [.NET学习笔记] - Thread.Sleep与Task.Delay在生产中应用的性能测试
  • 【单线图的系统级微电网仿真】基于 PQ 的可再生能源和柴油发电机组微电网仿真(Simulink)
  • 人脸识别技术应用安全管理规定(试行)|企业采用人脸打卡方式,这4条规定值得关注
  • leetcode 817. 链表组件(java)
  • 分布式事务基础理论
  • 《打造高可用PostgreSQL:策略与工具》
  • 【八大经典排序算法】快速排序
  • vue 父组件给子组件传递一个函数,子组件调用父组件中的方法
  • docker 获取Nvidia 镜像 | cuda |cudnn
  • uTool快捷指令
  • R reason ‘拒绝访问‘的解决方案
  • 许战海战略文库|品类缩量时代:制造型企业如何跨品类打造份额产品?
  • BIT-4-数组
  • L9945的H桥续流模式
  • Ubuntu20.04安装Nvidia显卡驱动、CUDA11.3、CUDNN、TensorRT、Anaconda、ROS/ROS2
  • linux下使用crontab定时器,并且设置定时不执行的情况,附:项目启动遇到的一些问题和命令
  • linux下二进制安装docker最新版docker-24.0.6
  • 计算机视觉 01(介绍)
  • Java下部笔记
  • 链表基本操作
  • Linux学习笔记-Ubuntu系统下配置用户ssh只能访问git仓库
  • 央媒发稿不能改?媒体发布新闻稿有哪些注意点
  • 计算机竞赛 深度学习 opencv python 公式识别(图像识别 机器视觉)
  • KPM算法