当前位置: 首页 > news >正文

Python连接Kafka收发数据等操作

目录

一、Kafka

二、发送端(生产者)

三、接收端(消费者)

四、其他操作


一、Kafka

Apache Kafka 是一个开源流处理平台,由 LinkedIn 开发,并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构建实时的数据流和流式处理应用程序,它以高吞吐量、可扩展性和容错性著称。

kafka-python 是一个用 Python 编写的 Apache Kafka 客户端库。

安装命令如下:

pip install kafka-python

二、发送端(生产者)

自动创建test主题,并每隔一秒发送一条数据,示例代码如下:

from kafka import KafkaProducer
import json
import time# Kafka服务器地址
bootstrap_servers = ['localhost:9092']# 创建KafkaProducer实例
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)# 发送消息的函数
def send_message(topic, message):# 将消息转换为字节producer.send(topic, json.dumps(message).encode('utf-8'))producer.flush()if __name__ == '__main__':# 创建'test'主题topic = 'test'# 发送消息i = 1while True:message = {'num': i, 'msg': f'Hello Kafka {i}'}send_message(topic, message)i += 1time.sleep(1)

三、接收端(消费者)

代码如下:

from kafka import KafkaConsumer
import json# Kafka服务器地址
bootstrap_servers = ['localhost:9092']# 创建KafkaConsumer实例
consumer = KafkaConsumer('test',bootstrap_servers=bootstrap_servers,auto_offset_reset='latest',  # 从最新的消息开始消费# auto_offset_reset='earliest',  # 从最早的offset开始消费enable_auto_commit=True,  # 自动提交offsetgroup_id='my-group'  # 消费者组ID
)# 消费消息
for message in consumer:# 将接收到的消息解码并转换为字典message = json.loads(message.value.decode('utf-8'))print(f"Received message: {message}")

消费者参数如下:

1、auto_offset_reset
该参数指定了当Kafka中没有初始偏移量或当前偏移量在服务器上不再存在时(例如数据被删除了),消费者应从何处开始读取数据。
可选值:
earliest:从最早的记录开始消费,即从分区日志的开始处开始。
latest:从最新的记录开始消费,即从分区日志的末尾开始。(默认)
none:如果没有为消费者指定初始偏移量,就抛出一个异常。

2、enable_auto_commit

该参数指定了消费者是否周期性地提交它所消费的偏移量。自动提交偏移量可以简化消费者的使用,但可能有重复消费或数据丢失的风险。禁用自动提交可以更精确地控制偏移量的提交时机,通常在确保消息处理成功后才提交偏移量。
可选值:
true:自动提交偏移量。(默认)
false:不自动提交偏移量,需要手动调用commitSync()或commitAsync()来提交偏移量。

3、group_id

该参数用于指定消费者所属的消费组。同一个消费组的消费者将共同消费一个主题的不同分区,而不同消费组的消费者可以独立地消费消息,互不影响。这对于实现负载均衡和故障转移很有用。
类型:字符串(必须指定)

四、其他操作

list_topics():获取主题元数据。

create_topics():创建新主题。

delete_topics():删除主题。

from kafka.admin import KafkaAdminClient, NewTopic# 获取主题元数据
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092', client_id='test')
topics = admin_client.list_topics()
print(topics)# 创建主题
new_topic = NewTopic(name="test-topic", num_partitions=3, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic], validate_only=False)# 删除主题
admin_client.delete_topics(topics=['test-topic'])

http://www.lryc.cn/news/448091.html

相关文章:

  • MySql在更新操作时引入“两阶段提交”的必要性
  • 充气模块方案——无刷充气泵pcba方案
  • [sql-03] 求阅读至少两章的人数
  • Linux如何通过链接下载文件
  • seL4 IPC(五)
  • 【Java】多线程基础操作
  • 基于Hive和Hadoop的病例分析系统
  • 数据结构编程实践20讲(Python版)—03栈
  • 【注册/登录安全分析报告:孔夫子旧书网】
  • PMP--二模--解题--141-150
  • 我的领域-关怀三次元成长的二次元虚拟陪伴 | OPENAIGC开发者大赛高校组AI创作力奖
  • 个人账号(学校+个人)申请专利过程中遇见的问题
  • 在ubuntu系统中,如何让其按下物理关机键时,系统不处理,但qt程序能检测到关机键按下的事件,并处理信号
  • 先进制造aps专题二十六 基于强化学习的人工智能ai生产排程aps模型简介
  • 各领域/行业硬件一览表
  • 机器学习-SVM
  • 翻译器在线翻译:开启多语言交流新时代
  • 网络编程(10)——json序列化
  • 基于FreeRTOS的STM32多功能手表设计
  • 18.Linux-配置DNF仓库
  • GeoPB:高效处理地理空间数据的Protobuf解决方案
  • 华为仓颉语言入门(6):if条件表达式
  • openlayers中一些问题的解决方案
  • java通过redis完成幂等性操作
  • 48 旋转图像
  • TDengine 签约青山钢铁,实现冶金全流程质量管控智能化
  • __pycache__文件夹
  • 利用 Local Data 导入文件到 OceanBase 的方法
  • 改变安全策略的五大实践
  • 在MacOS上安装MongoDB数据库