当前位置: 首页 > news >正文

Python高效操作Kafka实战指南

Python操作Kafka的高效

以下是使用Python操作Kafka的高效消息发送实例,涵盖基础发送、批量处理、异步回调等场景。示例基于confluent-kafka库(推荐)和kafka-python库,代码均经过实测。

流程图

基础消息发送(同步)

from confluent_kafka import Producerproducer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('test_topic', key='key1', value='Hello Kafka')
producer.flush()  # 确保消息发送完成

基础消息发送(异步)

from confluent_kafka import Producerdef delivery_report(err, msg):if err:print(f'Message delivery failed: {err}')else:print(f'Message delivered to {msg.topic()} [{msg.partition()}]')producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('test_topic', value='Async message', callback=delivery_report)
producer.poll(0)  # 触发回调
producer.flush()

批量消息发送

from confluent_kafka import Producerproducer = Producer({'bootstrap.servers': 'localhost:9092'})
for i in range(100):producer.produce('batch_topic', value=f'Message {i}')
producer.flush()


带Key的消息发送

from confluent_kafka import Producerproducer = Producer({'bootstrap.servers': 'localhost:9092'})
for user_id in ['user1', 'user2', 'user3']:producer.produce('user_events', key=user_id, value=f'Event for {user_id}')
producer.flush()


高性能配置

from confluent_kafka import Producerconf = {'bootstrap.servers': 'localhost:9092','queue.buffering.max.messages': 100000,'queue.buffering.max.ms': 500,'batch.num.messages': 1000
}
producer = Producer(conf)


消息头(Headers)支持

from confluent_kafka import Producerproducer = Producer({'bootstrap.servers': 'localhost:9092'})
headers = [('trace-id', '12345'), ('source', 'python-app')]
producer.produce('with_headers', value='Message', headers=headers)
producer.flush()


消息时间戳

from confluent_kafka import Producer
import timeproducer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('timed_topic', value='Timestamped', timestamp=int(time.time()*1000))
producer.flush()


自定义分区路由

from confluent_kafka import Producerdef partitioner(key, partitions, opaque):return hash(key) % len(partitions)producer = Producer({'bootstrap.servers': 'localhost:9092','partitioner': partitioner
})
producer.produce('custom_partition', key='user123', value='Data')
producer.flush()


压缩消息

from confluent_kafka import Producerproducer = Producer({'bootstrap.servers': 'localhost:9092','compression.type': 'gzip'
})
producer.produce('compressed_topic', value='Compressed message')
producer.flush()


同步发送超时控制

from confluent_kafka import Producer, KafkaExceptionproducer = Producer({'bootstrap.servers': 'localhost:9092'})
try:producer.produce('timeout_topic', value='Test')producer.flush(timeout=5)  # 5秒超时
except KafkaException as e:print(f"Send failed: {e}")


使用kafka-python基础发送

from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('python_topic', value=b'Message from kafka-python')
producer.flush()


kafka-python带Key发送

from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('keyed_topic', key=b'user1', value=b'User event')
producer.flush()


kafka-python批量发送

from kafka i
http://www.lryc.cn/news/599378.html

相关文章:

  • 专为小靶面工业相机的抗振微距镜头
  • C++ string:准 STL Container
  • Java线程基础面试复习笔记
  • 相机ROI 参数
  • 力扣-32.最长有效括号
  • Python(32)Python内置函数全解析:30个核心函数的语法、案例与最佳实践
  • 188.买卖股票的最佳时机IV 309.最佳买卖股票时机含冷冻期 714.买卖股票的最佳时机含手续费
  • 《C++初阶之STL》【vector容器:详解 + 实现】
  • Python应用append()方法向列表末尾添加元素
  • 深入解析HBase如何保证强一致性:WAL日志与MVCC机制
  • selenium 元素定位
  • 【unitrix】 6.15 “非零非负一“的整数类型(NonZeroNonMinusOne)特质(non_zero_non_minus_one.rs)
  • XCTF-crypto-幂数加密
  • Docker 实战大纲
  • Windows Installer安全深度剖析
  • SQL基础⑭ | 变量、流程控制与游标篇
  • 解放生产力:Amazon API Gateway 与 Amazon Lambda 的优雅组合
  • adb 下载并安装
  • 使用Python绘制金融数据可视化工具
  • SR9900低功耗USB 2.0转百兆以太网控制器芯片,SR9900规格书,SR9900原理图
  • 【第四章:大模型(LLM)】01.神经网络中的 NLP-(1)RNN、LSTM 和 GRU 的基本原理和应用
  • Linux网络框架分析
  • 使用vllm创建相同模型的多个实例,使用nginx进行负载均衡,提高模型吞吐量
  • RabbitMQ—HAProxy负载均衡
  • 数仓主题域划分
  • [linux]Haproxy七层代理
  • Agent领域,近年来的前沿研究方向:多智能体协作、认知启发架构、伦理安全、边缘计算集成
  • 多租户系统中的安全隔离机制设计
  • 【数学建模|Matlab】数学建模「常用作图」示例
  • classgraph:Java轻量级类和包扫描器