当前位置: 首页 > news >正文

Python访问ElasticSearch

ElasticSearch是广受欢迎的NoSQL数据库,其分布式架构提供了极佳的数据空间的水平扩展能力,同时保障了数据的可靠性;反向索引技术使得数据检索和查询速度非常快。更多功能参见官网介绍

https://www.elastic.co/cn/elasticsearch/

下面简单罗列了通过Python访问ES的方法。

注:本文不是Elasticsearch的入门介绍,需要有ES基本知识。

Python - ElasticSearch 接口

Elastic提供的Python ElasticSearch原生接口,源代码托管在Github上。项目链接和文档链接如下:

https://github.com/elastic/elasticsearch-py

https://www.elastic.co/guide/en/elasticsearch/client/python-api/7.17/examples.html#examples

下面是常见操作示例:

建立ES连接

    
from elasticsearch import Elasticsearch, helpers
from elasticsearch.exceptions import ConnectionError, ConnectionTimeout, TransportError
...try :#es = Elasticsearch(es_server, retry_on_timeout=True)es = Elasticsearch(es_server, http_auth=(es_user, es_pass), timeout=30, max_retries=10, retry_on_timeout=True) print("Connection failed, exit ...")sys.exit(1)

创建ES数据

doc = {'author': 'author_name','text': 'Interesting content...','timestamp': datetime.now(),
}
res = es.index(index="test-index", id=1, body=doc)

获取ES数据

res = es.get(index="test-index", id=1)

通过查询获取ES数据

    query={"match_all":{}}try :result = es.search(index=index, query=query, size=10000)except([ConnectionError, ConnectionTimeout, TransportError]):print("Connection failed, exit ...")sys.exit(1)data=[]for item in result['hits']['hits'] :data.push(item['_source'])

更新ES数据

doc = {'author': 'author_name','text': 'Interesting modified content...','timestamp': datetime.now(),
}
res = es.update(index="test-index", id=1, body=doc)

删除ES数据

es.delete(index="test-index", id=1)

ElasticSearch-DSL python接口

原生ES python接口在查询时需要编写复杂的DSL查询语句,Elastic提供的ElasticSearch-DSL库极大地简化了查询语法,方便编写查询语句。相关项目和文档的URL:

https://github.com/elastic/elasticsearch-dsl-py

https://elasticsearch-dsl.readthedocs.io/en/latest/

示例代码如下:

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Searchclient = Elasticsearch()s = Search(using=client, index="my-index") \.filter("term", category="search") \.query("match", title="python")   \.exclude("match", description="beta")s.aggs.bucket('per_tag', 'terms', field='tags') \.metric('max_lines', 'max', field='lines')response = s.execute()for hit in response:print(hit.meta.score, hit.title)for tag in response.aggregations.per_tag.buckets:print(tag.key, tag.max_lines.value)

ElasticSearch - Pandas 接口

Pandas是流行的大数据处理Python库,Elastic提供了Pandas DataFrame的接口 ,可以直接将索引(数据表)中的数据放到 pandas 的 dataframe 中,非常方便。相关项目和文档URL如下:

https://github.com/elastic/eland

https://eland.readthedocs.io/en/latest/reference/dataframe.html

注意:返回的并不是原生Pandas DataFrame,而是Elastic自己的实现,但并没有实现所有DataFrame的功能。

示例代码如下:

import eland as ed# Connecting to an Elasticsearch instance running on 'localhost:9200'
df = ed.DataFrame("localhost:9200", es_index_pattern="flights")

也可以先建立 ES 连接

# Connecting to an Elastic Cloud instance
from elasticsearch import Elasticsearches = Elasticsearch("localhost:9200",http_auth=("elastic", "<password>")
)
df = ed.DataFrame(es, es_index_pattern="flights")

 

第三方 ElasticSearch - Pandas 接口

eland虽然可以方便将 Elastic 中的数据转换为 dataframe,但没有提供将 dataframe 保存到 Elastic的接口。这时我们需要使用第三方的接口。es_pandas是开源的 ES Pandas接口,可以直接将ES查询得到的数据以Pandas DataFrame的方式返回,也可将 dataframe 保存到 Elastic 中。

https://github.com/fuyb1992/es_pandas

初始化与ES的连接

import pandas as pd
from es_pandas import es_pandas...
epcon = None
try :epcon = es_pandas(esurl)
except Exception as e:logger.error("Initializa DB connection failed! Error[{}]".format(str(e)))

从ES表中获取数据,返回格式为Pandas DataFrame

# 从ES表中获取数据返回DataFrame 
try: if query is None:data = epcon.to_pandas(dbname, infer_dtype=True, show_progress=False)else:data = epcon.to_pandas(dbname, infer_dtype=True, show_progress=False, query_rule=query)
except exceptions.NotFoundError:logger.debug("Not found data. Params: dbname[{}] query[{}]".format(dbname, query))

将Pandas DataFrame中的数据写入ES表中

# 将DataFrame中的数据写入ES表中   
ret = True
try:epcon.to_es(df, dbname, use_index=True, _op_type='create', thread_count=2, chunk_size=10000, show_progress=False)
except ConnectionError:ret = Falselogger.error("Save data failed! Params: dbname[{}] data[{}],, connection error!".format(dbname, df))

将Pandas DataFrame中的数据更新到ES表中

# 将DataFrame中的更新到ES表中   
ret = True
try:epcon.to_es(df, dbname, use_index=True, _op_type='update', thread_count=2, chunk_size=10000, show_progress=False)
except ConnectionError:ret = Falselogger.error("Update data failed! Params: dbname[{}] data[{}],, connection error!".format(dbname, df))

将Pandas DataFrame中的数据从ES表中删除

# 将DataFrame中的数据从ES表中删除   
ret = True
try:epcon.to_es(df, dbname, use_index=True, _op_type='delete', thread_count=2, chunk_size=10000, show_progress=False)
except ConnectionError:ret = Falselogger.error("Delete data failed! Params: dbname[{}] data[{}],, connection error!".format(dbname, df))

http://www.lryc.cn/news/272007.html

相关文章:

  • Flutter 混合开发 - 动态下发 libflutter.so libapp.so
  • Peter算法小课堂—动态规划
  • 2022–2023学年2021级计算机科学与技术专业数据库原理 (A)卷
  • Clojure 实战(4):编写 Hadoop MapReduce 脚本
  • Django 分页(表单)
  • socket实现视频通话-WebRTC
  • simulink代码生成(九)—— 串口显示数据(纸飞机联合调试)
  • Mysql数据库(中)——增删改查的学习(全面,详细)
  • test dbtest-03-对比 Liquibase、flyway、dbDeploy、dbsetup
  • 力导向图与矩阵排序
  • word 常用功能记录
  • C#线程基础(线程启动和停止)
  • 如何利用ChatGPT来提高编程效率
  • java智慧工地源码,互联网+建筑工地,实现对工程项目内人员、车辆、安全、设备、材料等的智能化管理
  • 创建并使用自己的C++模块(Windows10+MSVC)
  • Spring Boot 2.7.11 集成 GraphQL
  • 软件工程期末总结
  • MidTool图文创作-GPT-4与DALL·E 3的结合
  • Python将两个或多个列表合并为一个列表,并根据每个输入列表中的元素的位置将其组合在一起
  • 数模混合SoC芯片中LEF2Milkyway的golden flow
  • Five tips to make your essay flow
  • linux驱动(二):led补
  • 性能测试-jmeter:安装 / 基础使用
  • 数据仓库-数仓优化小厂实践
  • uniapp中uview组件丰富的Code 验证码输入框的使用方法
  • md文件图片上传方案:Github+PicGo 搭建图床
  • 从零开始 - 在Python中构建和训练生成对抗网络(GAN)模型
  • OfficeWeb365 Indexs 任意文件读取漏洞复现
  • Crypto的简单应用-前后端加密传输
  • Vue3-32-路由-重定向路由