当前位置: 首页 > news >正文

将 OneLake 数据索引到 Elasticsearch - 第 1 部分

作者:来自 Elastic Gustavo Llermaly

学习配置 OneLake,使用 Python 消费数据并在 Elasticsearch 中索引文档,然后运行语义搜索。

OneLake 是一款工具,可让你连接到不同的 Microsoft 数据源,例如 Power BI、Data Activator 和 Data factory 等。它支持将数据集中在 DataLakes 中,DataLakes 是支持全面数据存储、分析和处理的大容量存储库。

在本文中,我们将学习如何配置 OneLake、使用 Python 消费数据以及在 Elasticsearch 中索引文档,然后运行语义搜索。

有时,你可能希望在非结构化数据和来自不同来源和软件提供商的结构化数据中运行搜索,并使用 Kibana 创建可视化。对于这种任务,在 Elasticsearch 中索引文档作为中央存储库会变得非常有用。

在这个例子中,我们将使用一家名为 Shoestic 的虚拟公司,这是一家在线鞋店。我们在结构化文件 (CSV) 中列出了产品列表,而一些产品的数据表则采用非结构化格式 (DOCX)。这些文件存储在 OneLake 中。

你可以在此处找到包含完整示例(包括测试文档)的笔记本。

步骤

  • OneLake 初始配置
  • 使用 Python 连接到 OneLake
  • 索引文档
  • 查询

OneLake 初始配置

OneLake 架构可以总结如下:

要使用 OneLake 和 Microsoft Fabric,我们需要一个 Office 365 帐户。如果你没有,可以在此处创建一个试用帐户。

使用你的帐户登录 Microsoft Fabric。然后,创建一个名为 “ShoesticWorkspace” 的工作区。进入新创建的工作区后,创建一个 Lakehouse 并将其命名为“ShoesticDatalake”。最后一步是在 “Files” 中创建一个新文件夹。单击 “new subfolder” 并将其命名为 “ProductsData”。

完成了!我们准备开始提取数据了。

使用 Python 连接到 OneLake

配置完 OneLake 后,我们现在可以准备 Python 脚本。Azure 有处理凭据并与 OneLake 通信的库。

pip install azure-identity elasticsearch==8.14 azure-storage-file-datalake azure-cli python-docx

“azure-identity azure-storage-file-datalake” 库让我们可以与 OneLake 交互,同时 “azure-cli” 可以访问凭据并授予权限。为了读取文件内容以便稍后将其索引到 Elasticsearch,我们使用 python-docx。

在我们的本地环境中保存 Microsoft 凭据

我们将使用 “az login” 进入我们的 Microsoft 帐户并运行:

 az login --allow-no-subscriptions

标志 “ --allow-no-subscriptions”允许我们在没有有效订阅的情况下向 Microsoft Azure 进行身份验证。

此命令将打开一个浏览器窗口,你必须在其中访问你的帐户,然后选择你帐户的订阅号。

现在我们可以开始编写代码了!

创建一个名为 onelake.py 的文件并添加以下内容:

_onelake.py_

# Importing dependencies 
import chardet 
from azure.identity import DefaultAzureCredential 
from docx import Document 
from azure.storage.filedatalake import DataLakeServiceClient # Initializing the OneLake client 
ONELAKE_ACCOUNT_NAME = "onelake" 
ONELAKE_WORKSPACE_NAME = "ShoesticWorkspace" 
# Path in format <DataLake>.Lakehouse/files/<Folder path> 
ONELAKE_DATA_PATH = "shoesticDatalake.Lakehouse/Files/ProductsData" # Microsoft token 
token_credential = DefaultAzureCredential() # OneLake services 
service_client = DataLakeServiceClient( account_url=f"https://{ONELAKE_ACCOUNT_NAME}.dfs.fabric.microsoft.com", credential=token_credential, 
) 
file_system_client = service_client.get_file_system_client(ONELAKE_WORKSPACE_NAME) 
directory_client = file_system_client.get_directory_client(ONELAKE_DATA_PATH) # OneLake functions   # Upload a file to a LakeHouse directory 
def upload_file_to_directory(directory_client, local_path, file_name): file_client = directory_client.get_file_client(file_name) with open(local_path, mode="rb") as data: file_client.upload_data(data, overwrite=True) print(f"File: {file_name} uploaded to the data lake.") # Get directory contents from your lake folder 
def list_directory_contents(file_system_client, directory_name): paths = file_system_client.get_paths(path=directory_name) for path in paths: print(path.name + "\n") # Get a file by name from your lake folder 
def get_file_by_name(file_name, directory_client): return directory_client.get_file_client(file_name) # Decode docx 
def get_docx_content(file_client): download = file_client.download_file() file_content = download.readall() temp_file_path = "temp.docx" with open(temp_file_path, "wb") as temp_file: temp_file.write(file_content) doc = Document(temp_file_path) text = [] for paragraph in doc.paragraphs: text.append(paragraph.text) return "\n".join(text) # Decode csv 
def get_csv_content(file_client): download = file_client.download_file() file_content = download.readall() result = chardet.detect(file_content) encoding = result["encoding"] return file_content.decode(encoding) 

将文件上传到 OneLake

在此示例中,我们将使用一个 CSV 文件和一些包含有关我们鞋店产品信息的 .docx 文件。虽然你可以使用 UI 上传它们,但我们将使用 Python 来完成。在此处下载文件。

我们将文件放在文件夹 /data 中,位于名为 upload_files.py 的新 Python 脚本旁边:

# upload_files.py # Importing dependencies 
from azure.identity import DefaultAzureCredential 
from azure.storage.filedatalake import DataLakeServiceClient from functions import list_directory_contents, upload_file_to_directory 
from onelake import ONELAKE_DATA_PATH, directory_client, file_system_client csv_file_name = "products.csv" 
csv_local_path = f"./data/{csv_file_name}" docx_files = ["beach-flip-flops.docx", "classic-loafers.docx", "sport-sneakers.docx"] 
docx_local_paths = [f"./data/{file_name}" for file_name in docx_files] # Upload files to Lakehouse 
upload_file_to_directory(directory_client, csv_local_path, csv_file_name) for docx_local_path in docx_local_paths: docx_file_name = docx_local_path.split("/")[-1] upload_file_to_directory(directory_client, docx_local_path, docx_file_name) # To check that the files have been uploaded, run "list_directory_contents" function to show the contents of the /ProductsData folder in our Datalake: 
print("Upload finished, Listing files: ") 
list_directory_contents(file_system_client, ONELAKE_DATA_PATH) 

运行上传脚本:

python upload_files.py

结果应该是:

Upload finished, Listing files: 
shoesticDatalake.Lakehouse/Files/ProductsData/beach-flip-flops.docx 
shoesticDatalake.Lakehouse/Files/ProductsData/classic-loafers.docx 
shoesticDatalake.Lakehouse/Files/ProductsData/products.csv 
shoesticDatalake.Lakehouse/Files/ProductsData/sport-sneakers.docx 

现在我们已经准备好文件了,让我们开始使用 Elasticsearch 分析和搜索我们的数据!

索引文档

我们将使用 ELSER 作为向量数据库的嵌入提供程序,以便我们可以运行语义查询。

我们选择 ELSER 是因为它针对 Elasticsearch 进行了优化,在域外检索方面胜过大多数竞争对手,这意味着按原样使用模型,而无需针对你自己的数据进行微调。

配置 ELSER

首先创建推理端点:

PUT _inference/sparse_embedding/onelake-inference-endpoint 
{ "service": "elser", "service_settings": { "num_allocations": 1, "num_threads": 1 } 

在后台加载模型时,如果你以前没有使用过 ELSER,则可能会收到 502 Bad Gateway 错误。在 Kibana 中,你可以在 “Machine Learning” > “Trained Models” 中检查模型状态。等到模型部署完成后再继续执行后续步骤。

索引数据

现在,由于我们同时拥有结构化数据和非结构化数据,因此我们将在 Kibana DevTools 控制台中使用具有不同映射的两个不同索引。

对于我们的结构化销售,让我们创建以下索引:

PUT shoestic-products 
{ "mappings": { "properties": { "product_id": { "type": "keyword" }, "product_name": { "type": "text" }, "amount": { "type": "float" }, "tags": { "type": "keyword" } } } 
} 

为了索引我们的非结构化数据(产品数据表),我们将使用:

PUT shoestic-products-descriptions 
{ "mappings": { "properties": { "title": { "type": "text", "analyzer": "english" }, "super_body": { "type": "semantic_text", "inference_id": "onelake-inference-endpoint" }, "body": { "type": "text", "copy_to": "super_body" } } } 
} 

注意:使用带有 copy_to 的字段很重要,这样还可以运行全文搜索,而不仅仅是在正文字段上运行语义搜索。

读取 OneLake 文件

在开始之前,我们需要使用这些命令(使用你自己的云 ID 和 API 密钥)初始化我们的 Elasticsearch 客户端。

创建一个名为 indexing.py 的 Python 脚本并添加以下几行:

# Importing dependencies 
import csv 
from io import StringIO from onelake import directory_client 
from elasticsearch import Elasticsearch, helpers from functions import get_csv_content, get_docx_content, get_file_by_name 
from upload_files_to_onelake import csv_file_client ELASTIC_CLUSTER_ID = "your-cloud-id" 
ELASTIC_API_KEY = "your-api-key" # Elasticsearch client 
es_client = Elasticsearch( cloud_id=ELASTIC_CLUSTER_ID, api_key=ELASTIC_API_KEY, 
) docx_files = ["beach-flip-flops.docx", "classic-loafers.docx", "sport-sneakers.docx"] 
docx_local_paths = [f"./data/{file_name}" for file_name in docx_files] csv_file_client = get_file_by_name("products.csv", directory_client) 
docx_files_clients = [] for docx_file_name in docx_files: docx_files_clients.append(get_file_by_name(docx_file_name, directory_client)) # We use these functions to extract data from the files: 
csv_content = get_csv_content(csv_file_client) 
reader = csv.DictReader(StringIO(csv_content)) 
docx_contents = [] for docx_file_client in docx_files_clients: docx_contents.append(get_docx_content(docx_file_client)) print("CSV FILE CONTENT: ", csv_content) 
print("DOCX FILE CONTENT: ", docx_contents) # The CSV tags are separated by commas (,). We'll turn these tags into an array: 
rows = csv_content.splitlines() 
reader = csv.DictReader(rows) 
modified_rows = [] for row in reader: row["tags"] = row["tags"].replace('"', "").split(",") modified_rows.append(row) print(row["tags"]) # We can now index the files into Elasticsearch 
reader = modified_rows 
csv_actions = [{"_index": "shoestic-products", "_source": row} for row in reader] docx_actions = [ { "_index": "shoestic-products-descriptions", "_source": {"title": docx_file_name, "body": docx}, } for docx_file_name, docx in zip(docx_files, docx_contents) 
] helpers.bulk(es_client, csv_actions) 
print("CSV data indexed successfully.") 
helpers.bulk(es_client, docx_actions) 
print("DOCX data indexed successfully.") 

现在运行脚本:

python indexing.py

查询

在 Elasticsearch 中对文档进行索引后,我们就可以测试语义查询了。在本例中,我们将在某些产品(tag)中搜索唯一术语。我们将针对结构化数据运行关键字搜索,针对非结构化数据运行语义搜索。

1. 关键字搜索

GET shoestic-products/_search 
{ "query": { "term": { "tags": "summer" } } 
} 

结果:

"_source": { "product_id": "P-118", "product_name": "Casual Sandals", "amount": "128.22", "tags": [ "casual", "summer" ] } 

2. 语义搜索:

GET shoestic-products-descriptions/_search 
{ "_source": { "excludes": [ "*embeddings", "*chunks" ] }, "query": { "semantic": { "field": "super_body", "query": "summer" } } 
} 

*我们排除了嵌入和块只是为了便于阅读。

结果:

"hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 4.3853106, "hits": [ { "_index": "shoestic-products-descriptions", "_id": "P2Hj6JIBF7lnCNFTDQEA", "_score": 4.3853106, "_source": { "super_body": { "inference": { "inference_id": "onelake-inference-endpoint", "model_settings": { "task_type": "sparse_embedding" } } }, "title": "beach-flip-flops.docx", "body": "Ideal for warm, sunny days by the water, these lightweight essentials are water-resistant and come in bright colors, bringing a laid-back vibe to any outing in the sun." } } ] } 

如你所见,当使用关键字搜索时,我们会得到与其中一个标签的完全匹配,相反,当我们使用语义搜索时,我们会得到与描述中的含义匹配的结果,而无需完全匹配。

结论

OneLake 使使用来自不同 Microsoft 来源的数据变得更容易,然后索引这些文档 Elasticsearch 允许我们使用高级搜索工具。在第一部分中,我们学习了如何连接到 OneLake 并在 Elasticsearch 中索引文档。在第二部分中,我们将使用 Elastic 连接器框架制作更强大的解决方案。敬请期待!

想要获得 Elastic 认证?了解下一次 Elasticsearch 工程师培训的时间!

Elasticsearch 包含许多新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在你的本地机器上试用 Elastic。

原文:Indexing OneLake data into Elasticsearch - Part 1 - Elasticsearch Labs

http://www.lryc.cn/news/528205.html

相关文章:

  • 【C++】STL介绍 + string类使用介绍 + 模拟实现string类
  • Hive:基本查询语法
  • 日志收集Day008
  • 【解决方案】VMware虚拟机adb连接宿主机夜神模拟器
  • 基于金融新闻的大型语言模型强化学习在投资组合管理中的应用
  • 脚本运行禁止:npm 无法加载文件,因为在此系统上禁止运行脚本
  • 借DeepSeek-R1东风,开启创业新机遇
  • C# lock使用详解
  • 简易CPU设计入门:控制总线的剩余信号(四)
  • 使用 lock4j-redis-template-spring-boot-starter 实现 Redis 分布式锁
  • 22_解析XML配置文件_List列表
  • 编译器gcc/g++ --【Linux基础开发工具】
  • 58.界面参数传递给Command C#例子 WPF例子
  • games101-(5/6)
  • 人工智能在计算机视觉中的应用与创新发展研究
  • 1-2 飞机大战游戏场景
  • Mac Electron 应用签名(signature)和公证(notarization)
  • Sklearn 中的逻辑回归
  • 【阅读笔记】New Edge Diected Interpolation,NEDI算法,待续
  • 编程题-最长的回文子串(中等)
  • Versal - 基础3(AXI NoC 专题+仿真+QoS)
  • 知识库建设对提升团队协作与创新能力的影响分析
  • Java 实现Excel转HTML、或HTML转Excel
  • stack 和 queue容器的介绍和使用
  • 云计算与虚拟化技术讲解视频分享
  • python flask 使用 redis写一个例子
  • 深入解析 Linux 内核内存管理核心:mm/memory.c
  • 跟我学C++中级篇——64位的处理
  • 指针的介绍2后
  • Linux 学习笔记__Day3