当前位置: 首页 > news >正文

rocketMQ5.0顺序消息golang接入

本人理解,顺序消息如果不分消息组,那么会影响并行处理速度,所以尽量消息组分的散一些
首先上要求,官方文档如下:
在这里插入图片描述
总结:
1.必须同一个消息组,消息组和消费组不是一个概念,不要混
2.必须单一生产者,也就是说线上生产只能开一个 pod,感觉局限有点高,无法多 pod 接入
3.必须串行发送,这个点也不太好,限制过高
以上三点
在我的案例中(企业微信回调消息)
那么只能开一个接口服务Pod 来接收微信回调,如果挂了,那完蛋,开多个 pod的话,那只有把请求放队列,通过队列pop进行消费再生产消息到 mq,这样同时也解决了第三点的串行发送。

生产的时候如何确定是顺序消息,只需要生产消息的时候给设定一个消息组

msg := &rmq_client.Message{
Topic: Topic,Body:  []byte("this is a message : " + strconv.Itoa(i)),
}
// set keys and tag
msg.SetKeys("a", "b")
msg.SetTag("ab")
// 这里设置消息组
msg.SetMessageGroup("fifo")

分组分的越细越好

提高消费速度

当拿到消息后,根据消息分组来进行并发处理,每个分组内进行串行处理,关键代码如下


func (m *RocketMq) ConsumerOrderly(funcMap map[string]func([]byte) error) {var err errorm.consumerOnce.Do(func() {Log().Info("##############pro consume orderly start#############", m.MqConfig)errTmp := m.proSimConsumer.Start()if errTmp != nil {Log().Panic("MQ启动失败", errTmp)return}})defer m.proSimConsumer.GracefulStop()// 总体保证有N个在运行var ch = make(chan int, m.MaxGoroutine)for {fmt.Println("start receive message")mvs, errReceive := m.proSimConsumer.Receive(context.TODO(), 8, 20*time.Second)if errReceive != nil {if strings.Contains(errReceive.Error(), "no new message") {Log().Info(errReceive)} else {Log().Error("顺序消息,拉取MQ消息失败:", errReceive)}time.Sleep(time.Second * 2)continue}var msgGroupMap = make(map[string][]*rmq_client.MessageView, 0)for _, v := range mvs {if _, ok := funcMap[*v.GetTag()]; !ok {Log().Error(v.GetTag(), ": action do not exist")continue} else {// 根据msgGroup汇总msgGroup := v.GetMessageGroup()msgGroupMap[*msgGroup] = append(msgGroupMap[*msgGroup], v)}}// 最大程度多线程消费for _, item := range msgGroupMap {ch <- 1go func(item []*rmq_client.MessageView) {for _, v := range item {fmt.Println(*v.GetTag(), *v.GetMessageGroup(), string(v.GetBody()))action, _ := funcMap[*v.GetTag()]if errTmp := action(v.GetBody()); errTmp != nil {Log().Error("mq顺序消费失败", errTmp)break} else {m.proSimConsumer.Ack(context.TODO(), v)}}<-ch}(item)}}
}

最后需要注意的点:

同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失
请保证订阅一致性
在这里插入图片描述

http://www.lryc.cn/news/241438.html

相关文章:

  • HuggingFace-利用BERT预训练模型实现中文情感分类(下游任务)
  • PSP - 从头搭建 抗原类别 (GPCR) 的 蛋白质结构预测 项目流程
  • 城市NOA加速落地,景联文科技高质量数据标注助力感知系统升级
  • 控制反转(IoC)是什么?
  • Redisson分布式锁源码解析、集群环境存在的问题
  • 2016年10月4日 Go生态洞察:HTTP追踪介绍
  • 分布式篇---第四篇
  • 从零开始的C++(十九)
  • opencv-使用 Haar 分类器进行面部检测
  • C++纯虚函数和抽象类 制作饮品案例(涉及知识点:继承,多态,实例化继承抽象类的子类,多文件实现项目)
  • 什么是网关和链路追踪,以及怎么使用?
  • git 文件被莫名其妙的或略且无论如何都查不到哪个.gitignore文件忽略的
  • nova组件简介
  • 【Vue】响应式与数据劫持
  • Modbus RTU转Profinet网关连接PLC与变频器通讯在机床上应用案例
  • Autoware 整体架构
  • 【maven】手动指定jar推送
  • 算法---定长子串中元音的最大数目
  • 美国汽车零部件巨头 AutoZone 遭遇网络攻击
  • WPF面试题入门篇
  • opencv-ORB检测
  • please upgrade numpy version to >=1.20
  • 关于进制的转化
  • JVM 之 字节码指令
  • 阿里云跨账号建立局域网
  • 【OpenSTL】方便好用的时空预测开源库
  • 【Unity】IBeginDragHandler、IDragHandler 和 IEndDragHandler 介绍
  • 杰发科技AC7801——Flash模拟EEP内存分布情况
  • 【前端知识】Node——http模块url模块的常用操作
  • 平衡二叉树 (简单易懂)