当前位置: 首页 > news >正文

go语言kafka入门

消息队列:一种基于异步通信的解耦机制,用于在应用程序或系统组件之间传递消息和数据

消息队列相关概念:

生产者(Producer):生成并发送消息到消息队列中的应用程序或系统组件。

消费者(Consumer):从消息队列中接收和处理消息的应用程序或系统组件。

主题(Topic):消息队列中用于分类和分组消息的逻辑概念,生产者将消息发送到指定的主题,而消费者可以订阅特定的主题以接收相应的消息。

队列(Queue):消息队列中存储消息的容器,遵循先进先出(FIFO)的原则。

发布-订阅模式(Publish-Subscribe Pattern):一种消息分发模式,生产者将消息发送到一个或多个主题,而消费者通过订阅感兴趣的主题来接收相应的消息。

点对点模式(Point-to-Point Pattern):一种消息传递模式,生产者将消息发送到特定的队列中,而消费者从队列中接收并处理消息。

消息序列化(Message Serialization):将消息从应用程序的数据结构转换为可以在消息队列中传输和存储的格式,通常使用如JSON、XML 或二进制等格式。

消息持久化(Message Persistence):将消息保存到持久化存储中,以确保即使在消息队列或应用程序重启之后也不会丢失

在这里插入图片描述

安装kafka-go

go get github.com/segmentio/kafka-go

简单示例

package mainimport ("context""fmt""log""sync""time""github.com/segmentio/kafka-go"
)// writeByConn 基于Conn发送消息
func writeByConn(wg *sync.WaitGroup) {defer wg.Done()topic := "my-topic"partition := 0// 连接至Kafka集群的Leader节点conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))// 发送消息_, err = conn.WriteMessages(kafka.Message{Value: []byte("one!")},kafka.Message{Value: []byte("two!")},kafka.Message{Value: []byte("three!")},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}// readByConn 连接至kafka后接收消息
func readByConn(wg *sync.WaitGroup) {defer wg.Done()// 指定要连接的topic和partitiontopic := "my-topic"partition := 0// 连接至Kafka的leader节点conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {n, err := batch.Read(b)if err != nil {break}fmt.Println(string(b[:n]))}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}func main() {var wg sync.WaitGroupwg.Add(2)go writeByConn(&wg)go readByConn(&wg)wg.Wait()
}

运行结果

one!
two!
three!
one!
two!
three!
one!
two!
three!
one!
two!
three!
one!
two!
three!
2023/08/25 10:14:10 failed to close batch:[7] Request Timed Out: the request exceeded the user-specified time limit in the request

可以看到,利用两个goroutine,成功在一个文件里实现了生产者写入消息,消费者消费消息并打印出来

http://www.lryc.cn/news/138523.html

相关文章:

  • 自定义拖拽功能,上下拖拽改变盒子高度
  • JavaScript Es6_4笔记
  • Python“牵手”易贝(Ebay)商品列表数据,关键词搜索ebayAPI接口数据,ebayAPI接口申请指南
  • C语言:选择+编程(每日一练Day8)
  • 使用 uniapp 适用于wx小程序 - 实现移动端头部的封装和调用
  • ARM Linux 系统稳定性分析入门及渐进 13 -- gdb 反汇编 disassemble 命令详细介绍及举例】
  • python连接Microsoft SQL Server 数据库
  • docker可视化工具
  • MySQL 用户管理操作
  • 【python办公自动化】PysimpleGUI中的popup弹窗中的按钮设置居中
  • postgresql 同步流复制两个相关参数synchronous_commit 和 synchronous_standby_names
  • 运算放大器发展史
  • LVS+Keepalived 实验
  • FreeSWITCH 1.10.10 简单图形化界面1 - docker/脚本/ISO镜像安装
  • 内网渗透神器CobaltStrike之权限提升(七)
  • 使用haproxy搭建web架构
  • Java基础之IO流File类创建及删除
  • 高速道路监控:工业路由器助力高速监控远程管理与维护
  • 【校招VIP】前端基础之post和get
  • 如何合理设计API接口?
  • Jsp 解决out.print()输出多出空行
  • SMC状态机 讲解2 从模型到SMC
  • MyBatis-Plus的使用
  • 板卡设计+硬件每日学习十个知识点(44)23.8.24 (检测单元设计,接口部分设计,板卡电源输入设计,电源检测电路)
  • jmeter HTTP信息头管理器
  • 各种中间件的默认端口
  • leetcode303. 区域和检索 - 数组不可变(java)
  • PHP 安装Composer,vue前端依赖包
  • OpenCV项目开发实战--基于Python/C++实现鼠标注释图像和轨迹栏来控制图像大小
  • ❤ Vue使用Eslint检测报错问题和解决