当前位置: 首页 > news >正文

python 自建kafka消息生成和消费小工具

要将 Kafka 的消息生产和消费转换为 API 接口,我们可以使用 Python 的 Web 框架。其中 Flask 是一个轻量级且易于使用的选择。下面是一个简单的例子,使用 Flask 创建 API 来生成和消费 Kafka 消息。

1. 安装所需的库

pip install kafka-python flask

2. 创建 Flask API

from flask import Flask, request, jsonify
from kafka import KafkaProducer, KafkaConsumerapp = Flask(__name__)# 配置 Kafka
KAFKA_BROKER_URL = 'localhost:9092'
TOPIC = 'test_topic'
producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER_URL)@app.route('/send', methods=['POST'])
def send_message():message = request.json.get('message')if message:producer.send(TOPIC, value=message.encode('utf-8'))return jsonify({"status": "success", "message": "Message sent!"}), 200else:return jsonify({"status": "error", "message": "Message cannot be empty!"}), 400@app.route('/receive', methods=['GET'])
def receive_message():consumer = KafkaConsumer(TOPIC, bootstrap_servers=KAFKA_BROKER_URL, auto_offset_reset='earliest')messages = []for message in consumer:messages.append(message.value.decode('utf-8'))if len(messages) > 5:  # 只收集最近的5条消息,可以根据需要调整breakreturn jsonify(messages)if __name__ == '__main__':app.run(debug=True, port=5000)

这个 Flask 应用程序定义了两个端点:

  • /send: 它接受 POST 请求并发送消息到 Kafka。
  • /receive: 它返回 Kafka 主题中的最近消息。

3. 使用 API

  • 发送消息:
curl -X POST http://localhost:5000/send -H "Content-Type: application/json" -d '{"message": "Hello, Kafka!"}'
  • 接收消息:
curl http://localhost:5000/receive

这只是一个简单的示例,您可能需要添加错误处理、日志记录、认证、消息序列化和反序列化等功能,以满足更复杂的需求。

http://www.lryc.cn/news/174523.html

相关文章:

  • Prim算法:经过图中所有节点的最短路径
  • Linux 信号捕捉函数 signal sigaction
  • StarRocks操作笔记
  • Linux的ls -ld命令产生的信息怎么看
  • Linux- 内存映射文件(Memory-Mapped File)
  • 李航老师《统计学习方法》第五章阅读笔记
  • iOS16新特性:实时活动-在锁屏界面实时更新APP消息 | 京东云技术团队
  • 使用 Elasticsearch、OpenAI 和 LangChain 进行语义搜索
  • NIFI集群_队列Queue中数据无法清空_清除队列数据报错_无法删除queue_解决_集群中机器交替重启删除---大数据之Nifi工作笔记0061
  • leetcode20. 有效的括号 [简单题]
  • ubuntu20.04下源码编译colmap
  • Jumpserver堡垒机
  • 第一百五十三回 如何实现滑动窗口
  • Oracle 12c自动化管理特性的新进展:自动备份、自动恢复和自动维护功能的优势|oracle 12c相对oralce 11g的新特性(3)
  • Redis——Jedis中hash类型使用
  • 肖sir__项目实战讲解__004
  • 数据库数据恢复-ORACLE常见故障有哪些?恢复数据的可能性高吗?
  • 合规性管理如何帮助产品团队按时交付?
  • 从平均数到排名算法
  • 如何使用ESP8266微控制器和Nextion显示器为Home Assistant展示温度传感器和互联网天气预报
  • 阻塞队列-生产者消费者模型
  • Vector Art - 矢量艺术
  • ruoyi-nbcio增加flowable流程待办消息的提醒,并提供右上角的红字数字提醒(一)
  • 数据结构:二叉树的基本概念
  • 利用Socks5代理IP加强跨界电商爬虫的网络安全
  • Spring学习笔记6 Bean的实例化方式
  • 大二毕设.3-网盘系统-用户模块讲解
  • (Vue2)智慧商城项目
  • Nginx实战
  • day-57 代码随想录算法训练营(19)动态规划 part 17