当前位置: 首页 > news >正文

golang kafka sarama 源码解析

  • 消费者组重平衡

github.com/!shopify/sarama@v1.27.2/consumer_group.go

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {// 获取broker组协调器coordinator, err := c.client.Coordinator(c.groupID)if err != nil {if retries <= 0 {return nil, err}return c.retryNewSession(ctx, topics, handler, retries, true)}// 申请加入组// Join consumer groupjoin, err := c.joinGroupRequest(coordinator, topics)if err != nil {_ = coordinator.Close()return nil, err}switch join.Err {case ErrNoError:c.memberID = join.MemberIdcase ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, true)// 已经在重平衡期间case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, join.Err}// 消费者组中的一个消费者作为leader,进行分区方案分配// Prepare distribution plan if we joined as the leadervar plan BalanceStrategyPlanif join.LeaderId == join.MemberId {members, err := join.GetMembers()if err != nil {return nil, err}// 分配分区plan, err = c.balance(members)if err != nil {return nil, err}}// 同步给kafka,只有 leader会带上分区方案// Sync consumer groupgroupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)if err != nil {_ = coordinator.Close()return nil, err}switch groupRequest.Err {case ErrNoError:case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, true)case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, groupRequest.Err}// Retrieve and sort claimsvar claims map[string][]int32 // topic->partions// 如果有可消费的分区if len(groupRequest.MemberAssignment) > 0 {members, err := groupRequest.GetMemberAssignment()if err != nil {return nil, err}claims = members.Topicsc.userData = members.UserDatafor _, partitions := range claims {sort.Sort(int32Slice(partitions))}}return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
  • 消费者拉取消息
func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {bc := &brokerConsumer{consumer:         c,broker:           broker,input:            make(chan *partitionConsumer),newSubscriptions: make(chan []*partitionConsumer),wait:             make(chan none),subscriptions:    make(map[*partitionConsumer]none),refs:             0,}go withRecover(bc.subscriptionManager)go withRecover(bc.subscriptionConsumer)return bc
}
http://www.lryc.cn/news/323531.html

相关文章:

  • Flutter知识点整理
  • 现代游戏引擎架构
  • 深度学习:复杂工业场景下的复杂缺陷检测方法
  • CSDN个人简介优化 html font属性
  • 从哈希桶角度看 unordered_map 与 unordered_set 的实现
  • 飞天使-k8s知识点27-kubernetes温故知新2-deployment
  • 手机网页关键词视频爬虫采集软件可导出视频分享链接|视频无水印批量下载工具
  • 基于OpenCV的图像处理案例之图像矫正(Python)
  • 创建linux虚拟机系统:(安装Ubuntu镜像文件,包含语言设置、中文输入法、时间设置)
  • 3.0 V-22V 宽输入电压,高效率异步升压芯片-ZCC5429
  • Sphinx + Readthedocs 避坑速通指南
  • IPP-7010 表面贴装 90 度混合耦合器
  • 25.2 微服务Dubbo
  • CI/CD环境搭建
  • API调试管理工具Postman下载及操作介绍
  • vue集成百度地图,实现关键字搜索并自定义覆盖物,保存成静态图片
  • Java中的Stream流
  • 前端UI怎么防止用户反复提交?
  • OpenHarmony游戏应用程序-实现的一个手柄游戏
  • Redis+Lua脚本+SpringAOP实现接口限流
  • 【wpf应用8】如何让WPF Grid控件根据屏幕尺寸自动调整
  • 掌握ChatGPT:如何用AI撰写高质量论文
  • 平衡隐私与效率,Partisia Blockchain 解锁数字安全新时代
  • 【JavaScript】NPM常用指令指南
  • k8s-多容器Pod、容器保护策略、宽限期、最大生命周期、嵌入式脚本、多容器Pod、资源监控工具
  • 机器学习——线性回归(头歌实训)
  • Echarts 利用多X轴实现未来15天天气预报
  • [数据结构初阶]二叉树
  • matlab和stm32的安装环境。能要求与时俱进吗,en.stm32cubeprg-win64_v2-6-0.zip下载太慢了
  • Opencv面试题