当前位置: 首页 > news >正文

腾讯mini项目-【指标监控服务重构】2023-08-16

今日已办

v1

验证 StageHandler 在处理消息时是否为单例,【错误尝试】

type StageHandler struct {
}func (s StageHandler) Middleware1(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 1")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Middleware2(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 2")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Middleware3(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 3")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Handler1(msg *message.Message) error {log.Logger.Info("StageHandler Handler 1")fmt.Printf("%p\n", &s)return nil
}

image-20230816161140984

v2

  • 定义不同 Handler
type CrashHandler struct {Topic string
}func (s CrashHandler) Handler1(msg *message.Message) error {log.Logger.Info(s.Topic + ": CrashHandler Handler 1 start")fmt.Printf("%p\n", &s)time.Sleep(1 * time.Second)log.Logger.Info(s.Topic + ": CrashHandler Handler 1 end")return nil
}type LagHandler struct {Topic string
}func (s LagHandler) Handler1(msg *message.Message) error {log.Logger.Info(s.Topic + ": LagHandler Handler 1 start")fmt.Printf("%p\n", &s)time.Sleep(1 * time.Second)log.Logger.Info(s.Topic + ": LagHandler Handler 1 end")return nil
}
  • 添加到router中
	for _, topic := range topics {var category stringvar handlerFunc message.NoPublishHandlerFuncif strings.Contains(topic, performance.CategoryCrash) {category = performance.CategoryCrashhandlerFunc = CrashHandler{Topic: category}.Handler1} else if strings.Contains(topic, performance.CategoryLag) {category = performance.CategoryLaghandlerFunc = LagHandler{Topic: category}.Handler1} else {continue}handler := router.AddNoPublisherHandler(topic+"test-handler", topic, subscriber, handlerFunc)}

image-20230816171659632

  • 结论
    • handler 实例会不断创建
    • 不同的 handler 可以并行处理不同主题的消息
    • 相同的 handler 在处理该主题的消息时是顺序的

官方文档: Message Router (watermill.io)

订阅者可以一次消费一条消息,也可以并行消费多条消息

  • Single stream of messages 是最简单的方法,这意味着在调用 msg.Ack() 之前,订阅者将不会收到任何新消息
  • Multiple message streams 仅部分订阅者支持。通过一次订阅多个主题分区,可以并行消费多条消息,甚至是之前未确认的消息(例如,Kafka 订阅者就是这样工作的) Router 通过运行并发 HandlerFuncs(每个分区一个)来处理此模型

v3

存在并发安全问题

  1. 公用一个上下文
  2. 频繁的修改上下文中的字段值
  3. 不同Handler和MiddleWare存在并发

解决思路

  • 将一次消息处理会使用到的数据集合定义为一个结构体
type ContextData struct {Status intEvent  schema.EventAppID         string // API 上报FetchScenario string // API 上报
}
  • 使用message的Context来传递这个数据

image-20230816222038683

  • 移除掉 ProfileCtx 的相关设计
  • 使用watermillzap.Logger来替换本身的 LoggerAdapter,更加直观且与原项目适配image-20230816225840032

完整代码

profile/internal/watermill/consumer/consumer_context.go

// Package consumer
// @Author xzx 2023/8/11 18:53:00
package consumerimport ("context"kc "github.com/Kevinello/kafka-client""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin""github.com/garsue/watermillzap""github.com/qiulin/watermill-kafkago/pkg/kafkago""go.uber.org/zap""profile/internal/config""profile/internal/connector""profile/internal/log""profile/internal/schema/performance""strings""time"
)// Consume
// @Description
// @Author xzx 2023-08-16 22:52:52
func Consume() {logger := watermillzap.NewLogger(log.Logger)publisher, subscriber := newPubSub(logger)router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {log.Logger.Fatal("creates a new Router with given configuration error", zap.Error(err))}router.AddPlugin(plugin.SignalsHandler)router.AddMiddleware(middleware.Retry{MaxRetries:      3,InitialInterval: time.Millisecond * 100,Logger:          logger,}.Middleware,middleware.Recoverer,)getTopics := kc.GetTopicReMatch(strings.Split(config.Profile.GetString("kafka.topicRE"), ","))topics, err := getTopics(config.Profile.GetString("kafka.bootstrap"))if err != nil {log.Logger.Fatal("get topics failed", zap.Error(err))return}for _, topic := range topics {var category stringvar handlerFunc message.HandlerFuncif strings.Contains(topic, performance.CategoryCrash) {category = performance.CategoryCrashhandlerFunc = CrashWriteKafka} else if strings.Contains(topic, performance.CategoryLag) {category = performance.CategoryLaghandlerFunc = LagWriteKafka} else {continue}router.AddHandler(category, topic, subscriber, connector.GetTopic(category), publisher, handlerFunc).AddMiddleware(UnpackKafkaMessage,InitPerformanceEvent,AnalyzeEvent)}if err = router.Run(context.Background()); err != nil {log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))}
}// newPubSub
// @Description
// @Author xzx 2023-08-16 22:52:45
// @Param logger
// @Return message.Publisher
// @Return message.Subscriber
func newPubSub(logger watermill.LoggerAdapter) (message.Publisher, message.Subscriber) {marshaler := kafkago.DefaultMarshaler{}publisher := kafkago.NewPublisher(kafkago.PublisherConfig{Brokers:     []string{config.Profile.GetString("kafka.bootstrap")},Async:       false,Marshaler:   marshaler,OTELEnabled: false,Ipv4Only:    true,Timeout:     100 * time.Second,}, logger)subscriber, err := kafkago.NewSubscriber(kafkago.SubscriberConfig{Brokers:       []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler:   marshaler,ConsumerGroup: config.Profile.GetString("kafka.group"),OTELEnabled:   false,}, logger)if err != nil {log.Logger.Fatal("Unable to create subscriber", zap.Error(err))}return publisher, subscriber
}

profile/internal/watermill/consumer/consumer_stage.go

// Package consumer
// @Author xzx 2023/8/12 10:01:00
package consumerimport ("context""encoding/json""github.com/ThreeDotsLabs/watermill/message""go.uber.org/zap""profile/internal/connector""profile/internal/log""profile/internal/schema""profile/internal/schema/performance""profile/internal/state"
)type ContextData struct {Status intEvent  schema.EventAppID         string // API 上报FetchScenario string // API 上报
}// UnpackKafkaMessage
// @Description
// @Author xzx 2023-08-12 12:27:30
// @Param h
// @Return message.HandlerFunc
func UnpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {var data ContextData// 反序列化,存入通用结构体if contextErr := json.Unmarshal(msg.Payload, &data.Event); contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}log.Logger.Info("[1-UnpackKafkaItem] unpack kafka item success", zap.Any("event", data.Event))msg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// InitPerformanceEvent
// @Description
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func InitPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)event, contextErr := performance.EventFactory(data.Event.Category, data.Event.Dimensions, data.Event.Values)if contextErr != nil {data.Status = state.StatusEventFactoryErrorreturn nil, contextErr}log.Logger.Info("[2-InitPerformanceEvent] Consume performance event success", zap.Any("event", data.Event))data.Event.ProfileData = eventmsg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// AnalyzeEvent
// @Description
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func AnalyzeEvent(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)contextErr := data.Event.ProfileData.Analyze()if contextErr != nil {data.Status = state.StatusAnalyzeErrorreturn nil, contextErr}log.Logger.Info("[3-AnalyzeEvent] analyze event success", zap.Any("event", data.Event))// clear dimensions and valuesdata.Event.Dimensions = nildata.Event.Values = nilmsg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// CrashWriteKafka
// @Description
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func CrashWriteKafka(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)toWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}msg = message.NewMessage(data.Event.ID, toWriteBytes)log.Logger.Info("[4-CrashWriteKafka] write kafka success", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))return message.Messages{msg}, nil
}func LagWriteKafka(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)toWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}msg = message.NewMessage(data.Event.ID, toWriteBytes)log.Logger.Info("[4-LagWriteKafka] write kafka success", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))return message.Messages{msg}, nil
}

测试

上报PERF_LAG Event可以并发处理 2 条消息,不必等待上一条消息处理完

image-20230816222712201

image-20230816222820505

多次测试发现是由于两条消息走了不同的 Handler

image-20230816230158777

暂未修复,明明是同一主题的两条消息却都走了两条不同的链路,而且 publisher 最后写回的主题也是写到了不同的主题上,并且上报另一个类型的事件,即另一个主题的消息却无法触发消费者消费!

暂定先写死两个主题名称测试是否正常

明日待办

  1. 开会讨论项目规划和任务分工
  2. 继续完成需求
http://www.lryc.cn/news/168587.html

相关文章:

  • PTA:7-3 两个递增链表的差集
  • 智能合约漏洞案例,DEI 漏洞复现
  • Attention is all you need 论文笔记
  • Hdoop伪分布式集群搭建
  • java临时文件
  • C++中的<string>头文件 和 <cstring>头文件简介
  • 安装MySQL
  • 输入学生成绩,函数返回最大元素的数组下标,求最高分学生成绩(输入负数表示输入结束)
  • 常用音频接口:TDM,PDM,I2S,PCM
  • git clone报错Failed to connect to github.com port 443 after 21055 ms:
  • 【操作系统】深入浅出死锁问题
  • springboot实现webSocket服务端和客户端demo
  • 代码走读: FFMPEG-ffplayer02
  • 【数据结构】——排序算法的相关习题
  • C高级day5(Makefile)
  • Android 系统中适配OAID获取
  • 差分数组leetcode 2770 数组的最大美丽值
  • 请求响应状态码
  • 安卓机型系统美化 Color.xml文件必备常识 自定义颜色资源
  • YOLO物体检测-系列教程1:YOLOV1整体解读(预选框/置信度/分类任/回归任务/损失函数/公式解析/置信度/非极大值抑制)
  • 2023/9/12 -- C++/QT
  • 【Purple Pi OH RK3566鸿蒙开发板】OpenHarmony音频播放应用,真实体验感爆棚!
  • Android rom开发:9.0系统上实现4G wifi 以太网共存
  • 高速自动驾驶HMI人机交互
  • 【自然语言处理】关系抽取 —— SOLS 讲解
  • 周易算卦流程c++实现
  • 软件架构设计(十三) 构件与中间件技术
  • PyTorch深度学习实战——基于ResNet模型实现猫狗分类
  • 机器学习第六课--朴素贝叶斯
  • 基于Java+SpringBoot+Vue的图书借还小程序的设计与实现(亮点:多角色、点赞评论、借书还书、在线支付)