Logstash migration: copying index metadata (settings and mappings)

https://help.aliyun.com/zh/es/use-cases/use-logstash-to-migrate-full-or-incremental-data-from-self-managed-elasticsearch-to-alibaba-cloud-elasticsearch

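During data migration, Logstash can create indexes for you automatically. However, an automatically created index takes its mappings from dynamic mapping (and any matching index templates) rather than from the source index. It may therefore differ from the index you are migrating, and the same data can end up with different field types before and after the migration. We recommend creating the target indexes in Alibaba Cloud Elasticsearch manually before migrating, so that the index structure is identical on both sides.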
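The script in this article copies only the primary shard count from the source settings. If a source mapping references a custom analyzer defined under index.analysis, creating the target index with shards and replicas alone will fail, because that analyzer does not exist on the target. The sketch below shows one way to carry over the remaining index-level settings. It assumes parsed GET /<index>/_settings and GET /<index>/_mapping responses; build_create_body is a hypothetical helper, and the list of settings to drop is an assumption that may need extending for your Elasticsearch version.

```python
import json

# Settings Elasticsearch generates for every index. As far as we know they
# cannot be passed to a create-index request; this list is an assumption.
INTERNAL_SETTINGS = {"uuid", "creation_date", "version", "provided_name"}
# Allocation rules usually refer to nodes of the source cluster
# (assumption: they should not be copied to the target cluster).
CLUSTER_SPECIFIC_SETTINGS = {"routing"}


def build_create_body(index, settings_resp, mapping_resp, replicas=0):
    """Hypothetical helper: build a create-index body from parsed
    GET /<index>/_settings and GET /<index>/_mapping responses."""
    source = settings_resp[index]["settings"]["index"]
    skip = INTERNAL_SETTINGS | CLUSTER_SPECIFIC_SETTINGS
    # Copy everything else (shard count, analysis, ...) into a new dict.
    settings = {k: v for k, v in source.items() if k not in skip}
    # Override the replica count, as the original script does.
    settings["number_of_replicas"] = replicas
    return {"settings": settings, "mappings": mapping_resp[index]["mappings"]}


if __name__ == "__main__":
    settings_resp = {"logs": {"settings": {"index": {
        "number_of_shards": "3", "number_of_replicas": "1",
        "uuid": "abc", "creation_date": "1700000000000",
        "version": {"created": "7100099"}, "provided_name": "logs",
        "analysis": {"analyzer": {"lower_kw": {
            "type": "custom", "tokenizer": "keyword", "filter": ["lowercase"]}}},
    }}}}
    mapping_resp = {"logs": {"mappings": {"properties": {
        "code": {"type": "text", "analyzer": "lower_kw"}}}}}
    # The printed settings keep "analysis", so "lower_kw" exists on the target.
    print(json.dumps(build_create_body("logs", settings_resp, mapping_resp), indent=2))
```

The resulting dict can be sent the same way the script does it, for example httpPut(newClusterHost, "/" + index, json.dumps(body), newClusterUser, newClusterPassword).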

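You can create the target indexes with a Python script. The script below lists all open indexes on the source cluster, reads the settings and mappings of each one, and creates an index with the same name on the target cluster, using the same number of primary shards, the same mappings, and 0 replicas. Indexes whose names start with "." are treated as possible system indexes and skipped. The steps are as follows: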

Adapted for Python 3.10.9.

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# File name: indiceCreate.py
import base64
import http.client
import json

## Source cluster host.
oldClusterHost = "localhost:9200"
## Source cluster username; may be empty.
oldClusterUserName = "elastic"
## Source cluster password; may be empty.
oldClusterPassword = "xxxxxx"
## Target cluster host, shown on the Basic Information page of the Alibaba Cloud Elasticsearch instance.
newClusterHost = "jiankunking****.elasticsearch.aliyuncs.com:9200"
## Target cluster username.
newClusterUser = "elastic"
## Target cluster password.
newClusterPassword = "xxxxxx"
## Replica count for the new indexes.
DEFAULT_REPLICAS = 0


def httpRequest(method, host, endpoint, params="", username="", password=""):
    """Send an HTTP request and return the raw response body (bytes)."""
    conn = http.client.HTTPConnection(host)
    headers = {}
    if username != "":
        # HTTP Basic authentication: base64("username:password").
        up = "{username}:{password}".format(username=username, password=password)
        base64string = base64.b64encode(up.encode()).decode()
        headers["Authorization"] = "Basic %s" % base64string
    try:
        if method == "GET":
            conn.request(method=method, url=endpoint, headers=headers)
        else:
            headers["Content-Type"] = "application/json"
            conn.request(method=method, url=endpoint, body=params, headers=headers)
        return conn.getresponse().read()
    finally:
        conn.close()


def httpGet(host, endpoint, username="", password=""):
    return httpRequest("GET", host, endpoint, "", username, password)


def httpPost(host, endpoint, params, username="", password=""):
    return httpRequest("POST", host, endpoint, params, username, password)


def httpPut(host, endpoint, params, username="", password=""):
    return httpRequest("PUT", host, endpoint, params, username, password)


def getIndices(host, username="", password=""):
    """Return the names of all open indexes on the given cluster."""
    # h=status,index reduces each _cat line to "<status> <index>".
    endpoint = "/_cat/indices?h=status,index"
    indicesResult = httpGet(host, endpoint, username, password)
    indexList = []
    for line in indicesResult.decode().splitlines():
        fields = line.split()
        if len(fields) == 2 and fields[0] == "open":
            indexList.append(fields[1])
    return indexList


def getSettings(index, host, username="", password=""):
    """Return the settings for the new index as a dict."""
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print(index + " original settings:\n" + indexSettings.decode())
    settingsDict = json.loads(indexSettings)
    ## The shard count matches the source index.
    number_of_shards = int(settingsDict[index]["settings"]["index"]["number_of_shards"])
    ## The replica count defaults to DEFAULT_REPLICAS (0).
    number_of_replicas = DEFAULT_REPLICAS
    return {"number_of_shards": number_of_shards, "number_of_replicas": number_of_replicas}


def getMapping(index, host, username="", password=""):
    """Return the source index's mappings as a dict."""
    endpoint = "/" + index + "/_mapping"
    indexMapping = httpGet(host, endpoint, username, password)
    print(index + " original mapping:\n" + indexMapping.decode())
    mappingDict = json.loads(indexMapping)
    return mappingDict[index]["mappings"]


def createIndexStatement(oldIndexName):
    """Build the JSON body of the create-index request."""
    settings = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    mappings = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    # json.dumps guarantees valid JSON instead of hand-concatenated strings.
    return json.dumps({"settings": settings, "mappings": mappings}, indent=2)


def createIndex(oldIndexName, newIndexName=""):
    if newIndexName == "":
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print("Settings and mapping of new index " + newIndexName + ":\n" + createstatement)
    endpoint = "/" + newIndexName
    createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
    print("Result of creating new index " + newIndexName + ": " + createResult.decode())


## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
    if index.startswith("."):
        systemIndex.append(index)
    else:
        createIndex(index, index)
if len(systemIndex) > 0:
    for index in systemIndex:
        print(index + " may be a system index and will not be recreated; handle it separately if needed.")
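Because the script only prints the result of each PUT, it can be worth checking afterwards that every target index really has the same mappings as its source. A minimal sketch, assuming both _mapping responses have already been parsed with json.loads; diff_mappings is a hypothetical helper, not part of Elasticsearch or of the original script.

```python
def diff_mappings(src, dst, path=""):
    """Recursively compare two mapping dicts and describe every difference."""
    diffs = []
    for key in sorted(set(src) | set(dst)):
        here = path + "." + key if path else key
        if key not in dst:
            diffs.append("missing in target: " + here)
        elif key not in src:
            diffs.append("extra in target: " + here)
        elif isinstance(src[key], dict) and isinstance(dst[key], dict):
            # Both sides are objects: descend and compare their fields.
            diffs.extend(diff_mappings(src[key], dst[key], here))
        elif src[key] != dst[key]:
            diffs.append("changed: %s: %r -> %r" % (here, src[key], dst[key]))
    return diffs


if __name__ == "__main__":
    source = {"properties": {"id": {"type": "keyword"}, "ts": {"type": "date"}}}
    target = {"properties": {"id": {"type": "text"}, "ts": {"type": "date"}}}
    for line in diff_mappings(source, target):
        print(line)  # changed: properties.id.type: 'keyword' -> 'text'
```

For a real check, pass json.loads(httpGet(oldClusterHost, "/" + index + "/_mapping", oldClusterUserName, oldClusterPassword))[index]["mappings"] as src and the same call against newClusterHost with the target credentials as dst; an empty list means the mappings match.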
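The script creates every target index with DEFAULT_REPLICAS = 0 replicas. If you want replicas on the target once the data has been migrated, they can be raised afterwards through the update index settings API (PUT /<index>/_settings). A minimal sketch; the helper name replica_update_request and the replica count of 1 in the example are assumptions.

```python
import json


def replica_update_request(index, replicas):
    """Hypothetical helper: return (method, endpoint, body) for setting an
    index's replica count via the update index settings API."""
    body = json.dumps({"index": {"number_of_replicas": replicas}})
    return "PUT", "/" + index + "/_settings", body


if __name__ == "__main__":
    method, endpoint, body = replica_update_request("logs", 1)
    print(method, endpoint, body)  # PUT /logs/_settings {"index": {"number_of_replicas": 1}}
```

With the script's own helper, each migrated index can then be updated with httpPut(newClusterHost, endpoint, body, newClusterUser, newClusterPassword).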
