当前位置: 首页 > news >正文

kafka-go操作kafka

package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go"
)var (topic  = "user_click"reader *kafka.Reader
)// 生产消息
func writeKafka(ctx context.Context) {writer := &kafka.Writer{Addr:                   kafka.TCP("localhost:9092"), //不定长参数,支持传入多个broker的ip:portTopic:                  topic,                       //为所有message指定统一的topic。如果这里不指定统一的Topic,则创建kafka.Message{}时需要分别指定TopicBalancer:               &kafka.Hash{},               //把message的key进行hash,确定partitionWriteTimeout:           1 * time.Second,             //设定写超时RequiredAcks:           kafka.RequireNone,           //RequireNone不需要等待ack返回,效率最高,安全性最低;RequireOne只需要确保Leader写入成功就可以发送下一条消息;RequiredAcks需要确保Leader和所有Follower都写入成功才可以发送下一条消息。AllowAutoTopicCreation: true,                        //Topic不存在时自动创建。生产环境中一般设为false,由运维管理员创建Topic并配置partition数目}defer writer.Close() //记得关闭连接for i := 0; i < 3; i++ { //允许重试3次if err := writer.WriteMessages(ctx, //批量写入消息,原子操作,要么全写成功,要么全写失败kafka.Message{Key: []byte("1"), Value: []byte("A")},kafka.Message{Key: []byte("2"), Value: []byte("B")},kafka.Message{Key: []byte("3"), Value: []byte("C")},kafka.Message{Key: []byte("1"), Value: []byte("D")}, //key相同时肯定写入同一个partitionkafka.Message{Key: []byte("2"), Value: []byte("E")},); err != nil {if err == kafka.LeaderNotAvailable { //首次写一个新的Topic时,会发生LeaderNotAvailable错误,重试一次就好了time.Sleep(500 * time.Millisecond)continue} else {fmt.Printf("batch write message failed: %v", err)}} else {break //只要成功一次就不再尝试下一次了}}
}// 消费消息
func readKafka(ctx context.Context) {reader = kafka.NewReader(kafka.ReaderConfig{Brokers:        []string{"localhost:9092"}, //支持传入多个broker的ip:portTopic:          topic,CommitInterval: 1 * time.Second,   //每隔多长时间自动commit一次offset。即一边读一边向kafka上报读到了哪个位置。GroupID:        "recommend_biz",   //一个Group内消费到的消息不会重复StartOffset:    kafka.FirstOffset, //当一个特定的partition没有commited offset时(比如第一次读一个partition,之前没有commit过),通过StartOffset指定从第一个还是最后一个位置开始消费。StartOffset的取值要么是FirstOffset要么是LastOffset,LastOffset表示Consumer启动之前生成的老数据不管了。仅当指定了GroupID时,StartOffset才生效。})// defer reader.Close() //由于下面是死循环,正常情况下readKafka()函数永远不会结束,defer不会执行。所以需要监听信息2和15,当收到信号时关闭reader。需要把reader设为全局变量for { //消息队列里随时可能有新消息进来,所以这里是死循环,类似于读Channelif message, err := reader.ReadMessage(ctx); err != nil {fmt.Printf("read message from kafka failed: %v", err)break} else {offset := message.Offsetfmt.Printf("topic=%s, partition=%d, offset=%d, key=%s, message content=%s\n", message.Topic, message.Partition, offset, string(message.Key), string(message.Value))}}
}// 需要监听信息2和15,当收到信号时关闭reader
func listenSignal() {c := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) //注册信号2和15sig := <-c                                        //阻塞,直到信号的到来fmt.Printf("receive signal %s\n", sig.String())if reader != nil {reader.Close()}os.Exit(0) //进程退出
}func main() {ctx := context.Background()// writeKafka(ctx)go listenSignal()readKafka(ctx)
} 
http://www.lryc.cn/news/223768.html

相关文章:

  • 如何判断被DDoS攻击
  • web —— html
  • 【C/PTA】数组练习(编程)
  • 力扣:155. 最小栈(Python3)
  • uniapp实现在线PDF文件预览
  • Python tkinter实现复刻Windows记事本UI和菜单的文本编辑器(一)
  • 【系统架构设计】架构核心知识: 3.3 DSSA和ABSD
  • Git的安装和常用命令Git与SVN的区别Gitee远程仓库团队开发代码共享演示
  • 五、计算机网络
  • 使用Grafana与MySQL监控监控网络延迟
  • 互联网常见职称
  • UI设计软件有哪些好用和免费的吗?
  • Linux开发工具之编译器gcc/g++
  • 【Kurbernetes资源管理】陈述式资源管理方式
  • flink测试map转换函数和process函数
  • 【跟小嘉学习JavaWeb开发】第一章 开发环境搭建
  • CSS语法、选择器、属性
  • 深度学习读取txt训练数据绘制参数曲线图的方法
  • VB.NET—DataGridView控件教程详解
  • MCU测试科普|如何进行MCU芯片测试,具体流程是什么?
  • 单向循环代码实现cpp
  • 【原创】java+jsp+servlet简单图书管理系统设计与实现
  • JVM之jinfo虚拟机配置信息工具
  • 软件测试|PO设计模式在 UI 自动化中的实践
  • 如何上传自己的Jar到Maven中央仓库
  • 智能井盖传感器功能,万宾科技产品介绍
  • 洛谷P4185 离线+并查集
  • 遇到java.security.AccessControlException:access denied怎么办?
  • c++对接CAT1400
  • Linux基础【Linux知识贩卖机】