当前位置: 首页 > news >正文

elasticsearch 批量写入(Python版).md

1. 插入数据

现在我们如果有大量的文档(例如10000000万条文档)需要写入es 的某条索引中,该怎么办呢?

1.1 顺序插入

import time
from elasticsearch import Elasticsearches = Elasticsearch()def timer(func):def wrapper(*args, **kwargs):start = time.time()res = func(*args, **kwargs)print('共耗时约 {:.2f} 秒'.format(time.time() - start))return resreturn wrapper@timer
def create_data():""" 写入数据 """for line in range(100):es.index(index='s2', doc_type='doc', body={'title': line})if __name__ == '__main__':create_data()   # 执行结果大约耗时 7.79 秒

1.2 批量插入

import time
from elasticsearch import Elasticsearch
from elasticsearch import helperses = Elasticsearch()def timer(func):def wrapper(*args, **kwargs):start = time.time()res = func(*args, **kwargs)print('共耗时约 {:.2f} 秒'.format(time.time() - start))return resreturn wrapper@timer
def create_data():""" 写入数据 """for line in range(100):es.index(index='s2', doc_type='doc', body={'title': line})@timer
def batch_data():""" 批量写入数据 """action = [{"_index": "s2","_type": "doc","_source": {"title": i}} for i in range(10000000)]helpers.bulk(es, action)if __name__ == '__main__':# create_data()batch_data()  # MemoryError

我们通过elasticsearch模块导入helper,通过helper.bulk来批量处理大量的数据。首先我们将所有的数据定义成字典形式,各字段含义如下:

  • _index对应索引名称,并且该索引必须存在。
  • _type对应类型名称。
  • _source对应的字典内,每一篇文档的字段和值,可有有多个字段。

首先将每一篇文档(组成的字典)都整理成一个大的列表,然后,通过helper.bulk(es, action)将这个列表写入到es对象中。
然后,这个程序要执行的话——你就要考虑,这个一千万个元素的列表,是否会把你的内存撑爆(MemoryError)!很可能还没到没到写入es那一步,却因为列表过大导致内存错误而使写入程序崩溃!很不幸,我的程序报错了。下图是我在生成列表的时候,观察任务管理器的进程信息,可以发现此时Python消耗了大量的系统资源,而运行es实例的Java虚拟机却没什么变动。

解决办法是什么呢?我们可以分批写入,比如我们一次生成长度为一万的列表,再循环着去把一千万的任务完成。这样, Python和Java虚拟机达到负载均衡。

下面的示例测试10万条数据分批写入的速度

import time
from elasticsearch import Elasticsearch
from elasticsearch import helperses = Elasticsearch()def timer(func):def wrapper(*args, **kwargs):start = time.time()res = func(*args, **kwargs)print('共耗时约 {:.2f} 秒'.format(time.time() - start))return resreturn wrapper
@timer
def batch_data():""" 批量写入数据 """# 分批写# for i in range(1, 10000001, 10000):#     action = [{#         "_index": "s2",#         "_type": "doc",#         "_source": {#             "title": k#         }#     } for k in range(i, i + 10000)]#     helpers.bulk(es, action)# 使用生成器for i in range(1, 100001, 1000):action = ({"_index": "s2","_type": "doc","_source": {"title": k}} for k in range(i, i + 1000))helpers.bulk(es, action)if __name__ == '__main__':# create_data()batch_data()	# 耗时 93.53 s

1.3 批量插入优化

采用 Python 生成器

import time
from elasticsearch import Elasticsearch
from elasticsearch import helperses = Elasticsearch()def timer(func):def wrapper(*args, **kwargs):start = time.time()res = func(*args, **kwargs)print('共耗时约 {:.2f} 秒'.format(time.time() - start))return resreturn wrapper
@timer
def gen():""" 使用生成器批量写入数据 """action = ({"_index": "s2","_type": "doc","_source": {"title": i}} for i in range(100000))helpers.bulk(es, action)if __name__ == '__main__':# create_data()# batch_data()gen()		# 约90s

参考文章:https://www.cnblogs.com/Neeo/articles/10788573.html

http://www.lryc.cn/news/735.html

相关文章:

  • 【排序算法】快速排序(Quick Sort)
  • SpringIOC之创建Bean的核心方法doGetBean
  • docker快速部署xxjob2.3.0-SpringBoot快速集成示例
  • 项目管理的前路,前辈能给一些意见吗?
  • 省钱的年轻人,钱包被折扣店钻了空子
  • 【华为OD机试真题 js、python】优选核酸检测点、寻找核酸检测点【2022 Q4 100分】
  • 【MySQL】MySQL 8.0 新特性之 - 公用表表达式(CTE)
  • 基础面试题:C++中如何理解const修饰符
  • 在RT-Thread STM32F407平台下配置SPI flash为U盘
  • 数据存储技术复习(二)未完
  • 使用 QuTrunk+Amazon Deep Learning AMI(TensorFlow2)构建量子神经网络
  • python selenium浏览器复用技术
  • 第二章:创建虚拟机
  • 码上【call,apply,bind】的手写
  • 代谢组学Nature子刊!抑郁症居然“男女有别”,脑膜淋巴管起关键作用!
  • nacos配置中心搭建
  • uni-app低成本封装一个取色器组件
  • APP 怎么免费接入 MobPush
  • XGBoost
  • 你是什么时候从轻视到高看软件测试的?
  • 基于ssm的航空售票系统
  • 滑动窗口最大值
  • 接口文档参考示例
  • 2010-2019年290个城市经济发展与环境污染数据
  • web开发
  • 【数据结构】优先级队列----堆
  • Python深度学习实战PyQt5信号与槽的连接
  • Window 10 OpenCV 打开罗技(Logitech)摄像头速度慢问题解决
  • 基于yolo的小球位置实时检测
  • 【微服务】Elasticsearch数据聚合自动补全数据同步(四)