当前位置: 首页 > news >正文

Python 消费Kafka手动提交 批量存入Elasticsearch

一、第三方包选择

pip install kafka,对比了kafka和pykafka,还是选择kafka,消费速度更快
pip install elasticsearch==7.12.0(ES版本)

二、创建es连接对象

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulkclass Create_ES(object):_instance = Nonedef __new__(cls, *args, **kwargs):if cls._instance is None:cls._instance = super().__new__(cls)return cls._instancedef __init__(self, hosts):try:self.es = Elasticsearch([{'host':host, 'port':9200}])except Exception as e:print('Connect ES Fail db:{} error:{}'.format(hosts, str(e)))def get_conn(self):return self.esdef set_multi_data(self, datas):'''批量插入数据'''success = bulk(self.es, datas, raise_on_error=True)return success

三、消费kafka数据

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from . import Create_ESclass AppKfkConsumer(object):def __init__(self):self.server = 'localhost:9092'self.topic = KAFKA_TOPICself.consumer = Noneself.tp = Noneself.consumer_timeout_ms = 5000  # 设置消费超时时间,self.type = 'members'self.group_id = 'test1'  # 设置消费group_id,避免重复消费self.es_index = 'index'  # es的indexdef get_connect(self):self.consumer = KafkaConsumer(group_id=self.group_id,auto_offset_reset='earliest',  # 从最早的数据开始消费bootstrap_servers=self.server,enable_auto_commit=False,  # 关闭自动提交consumer_timeout_ms=self.consumer_timeout_ms)self.tp = TopicPartition(topic=self.topic, partition=0)  # 设置我们要消费的分区self.consumer.assign([self.tp])  # 由consumer对象分配分区def beginConsumer(self):now_offset = 0  # 当前偏移量es_conn = Create_ES()Actions = []while True:for message in self.consumer:now_offset = message.offset  # 获取当前偏移量data = eval(message.value.decode())  # 解析数据action = {"_index": self.es_index,"_type": self.type,"_source": data}Actions.append(action)if len(Actions) >= 50000:result = es_conn.set_multi_data(Actions)  # 批量插入数据Actions = []# 提交偏移量,now_offset+1的原因是因为我发现如果不加1,下次消费会从上次消费最后一条数据开始,重复消费self.consumer.commit(offsets={tp:(OffsetAndMetadata(now_offset+1, None))})if len(Actions) > 0:result = es_conn.set_multi_data(Actions)Actions = []self.consumer.commit(offsets={tp:(OffsetAndMetadata(now_offset+1, None))})def delconnect(self):self.consumer.close()# 执行任务
ks = AppKfkConsumer()
ks.get_connect()
ks.beginConsumer()

http://www.lryc.cn/news/409117.html

相关文章:

  • oracle 基础知识表的主键
  • opencascade AIS_MouseGesture AIS_MultipleConnectedInteractive源码学习
  • Unity Apple Vision Pro 开发:如何把 PolySpatial 和 Play To Device 的版本从 1.2.3 升级为 1.3.1
  • 大数据时代,区块链是如何助力数据开放共享的?
  • 睿抗2024省赛----RC-u4 章鱼图的判断
  • py2exe,一个神奇的 Python 库
  • 博途PLC网络连接不上
  • 哪个邮箱最安全最好用啊
  • 企业微信开发智能升级:AIGC技术赋能,打造高效沟通平台
  • Apache Doris + Paimon 快速搭建指南|Lakehouse 使用手册(二)
  • Inno setup pascal编码下如何美化安装界面支持带边框,圆角,透明阴影窗口
  • SQL语句(以MySQL为例)——单表、多表查询
  • C++第二十八弹---进一步理解模板:特化和分离编译
  • 正则表达式的独占模式,懒惰模式等有那些区别
  • 【INTEL(ALTERA)】Quartus® Prime Pro Edition 软件 v24.2 中,哪些 Agilex™ 5 IP 功能的硬件验证有限?
  • Lua编程
  • 2019数字经济公测大赛-VMware逃逸
  • 如何改桥接模式
  • 江科大/江协科技 STM32学习笔记P13
  • loadrunner录制解决提示安全问题
  • 为什么要读写分离?如何实现业务系统读写分离?
  • C#基础——类、构造函数和静态成员
  • hadoop学习(二)
  • WXZ196微机消谐装置的运行方式了解一下
  • 单链表的建立
  • Shell脚本编程学习
  • 从宏基因组量化细菌生长动态
  • Linux---git工具
  • 【JavaScript】函数的动态传参
  • 从0到1,AI我来了- (4)AI图片识别的理论知识-II