当前位置: 首页 > news >正文

Lostash同步Mysql数据到Elasticsearch(三)Elasticsearch模板与索引设置

logstash数据同步ES相关

同步数据时,Elasticsearch配合脚本的相关处理设置

1.模板创建更新

在kibana中执行,或者直接给ES发送请求,你懂得,不懂得百度下ES创建template
PUT /_template/test-xxx
{"template": "idx_znyw_data_gkb_logstash","order": 1,"settings": {"number_of_shards": 1},"mappings": {"dynamic": "true","dynamic_date_formats": ["yyyy-MM-dd'T'HH:mm:ss.SSS'Z'","yyyy-MM-dd'T'HH:mm:ss.SSS+0800","yyyy-MM-dd'T'HH:mm:ss'Z'","yyyy-MM-dd'T'HH:mm:ss+0800","yyyy-MM-dd'T'HH:mm:ss","yyyy-MM-dd HH:mm:ss","yyyy-MM-dd"],"properties": {"@timestamp": {"type": "date"},"@version": {"type": "integer"},"_class": {"type": "text","fields": {"keyword": {"type": "keyword","ignore_above": 256}}},"id": {"type": "long"},"location": {"properties": {"lat": {"type": "double"},"lon": {"type": "double"}}},"latitude": {"type": "double"},"longitude": {"type": "double"},"ipAddr": {"type": "ip","index": true,"store": false},"setupTime": {"type": "date","index": true,"store": false,"format": "date","ignore_malformed": false},"updateDate": {"type": "date","index": true,"store": false,"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd'T'HH:mm:ss||yyyy-MM-dd'T'HH:mm:ss+0800||yyyy-MM-dd'T'HH:mm:ss'Z'||yyyy-MM-dd'T'HH:mm:ss.SSS+0800||yyyy-MM-dd'T'HH:mm:ss.SSS'Z'||yyyy-MM-dd||epoch_millis","ignore_malformed": false},"delFlag": {"type": "integer"}}}
}

2.特殊版本类型logstash脚本处理

上面红色部分的需要特殊处理,下面给出特殊操作的logstash脚本处理方法,在filter中特殊处理,这里是直接上干货,其他的概念性的问题,百度一下。

filter {#坐标 mutate {add_field => ["[location][lon]", "%{[longitude]}"]add_field => ["[location][lat]", "%{[latitude]}"]}#时间
ruby {code => "date_person = event.get('setupTime').time.localtime + 0*60*60event.set('setupTime', date_person.strftime('%Y-%m-%d'))"} 
ruby {code => "
event.set('createDate', event.get('createDate').time.localtime + 0*60*60)"}}

3.索引更新设置

#在kibana中执行,执行前要关闭索引,但是注意,这里不能修改分片数,分片数只能在索引创建时指定,那数据量超量时,分片数不够用怎么整呢,看标题4
//先把索引关掉
POST /idx_xxx_sysinfo/_close
POST /idx_xxx_sysinfo/_open
//更新索引
PUT /idx_xxx_sysinfo/_settings
{"index":{"number_of_replicas": 2,"max_result_window": 65536,"max_inner_result_window": 10000,"translog.durability": "request","translog.sync_interval": "3s","auto_expand_replicas": false,"analysis.analyzer.default.type": "ik_max_word","analysis.search_analyzer.default.type": "ik_smart","shard.check_on_startup": false,"codec": "default","store.type": "niofs"}
}

4.分片数量不够时(ES内部内部数据迁移策略)

ES内部内部数据迁移策略

使用场景:

1、当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时需要扩大分片的数量,此时可以尝试使用Reindex。

2、当数据的mapping需要修改,但是大量的数据已经导入到索引中了,重新导入数据到新的索引太耗时;但是在ES中,一个字段的mapping在定义并且导入数据之后是不能再修改的,

所以这种情况下也可以考虑尝试使用Reindex。

POST _reindex?slices=5&refresh
{"source": {"index": "idx_znyg_datanbqseries_new","size": 10000  //批量执行数},"dest": {"index": "idx_znyg_datanbqseries",//version_type"version_type": "internal"或者不设置,则Elasticsearch强制性的将文档转储到目标中,覆盖具有相同类型和ID的任何内容:"version_type": "internal"}
}

slices大小设置注意事项:
1)slices大小的设置可以手动指定,或者设置slices设置为auto,auto的含义是:针对单索引,slices大小=分片数;针对多索引,slices=分片的最小值。
2)当slices的数量等于索引中的分片数量时,查询性能最高效。slices大小大于分片数,非但不会提升效率,反而会增加开销。
3)如果这个slices数字很大(例如500),建议选择一个较低的数字,因为过大的slices 会影响性能。

效果
实践证明,比默认设置reindex速度能提升10倍+。

创建索引


PUT /idx_znyg_datanbqseries_new?pretty
{"settings": {"index.number_of_shards": 12,"index.number_of_replicas": 1,"index.max_result_window": 65536,"index.max_inner_result_window": 10000,"index.translog.durability": "request","index.translog.sync_interval": "3s","index.auto_expand_replicas": false,"index.analysis.analyzer.default.type": "ik_smart","index.analysis.search_analyzer.default.type": "ik_smart","index.shard.check_on_startup": false,"index.codec": "default","index.store.type": "niofs"}, "mappings": {}}

5.logstash大数据量脚本加速

脚本执行SQL分析,人为加速处理设置。

如下SQL为logstash数据同步执行脚本,分析设置后,做如下处理,OFFSET -500获取上次执行完数据,拿最小ID保存至ID文件内,关闭服务重启启动,此时速度飞起,当OFFSET 数量过500万时,每次查询30S+,所以速度就下降了。
SELECT * FROM (select del_flag as delFlag from znyw_data_nbq_series WHERE id>= 1347422522840252418) AS `t1` LIMIT 500 OFFSET 500

6.logstash特性(重要)

有时候发现数据库数量和索引数量不匹配,需要重新同步,请按照如下操作处理:
1.停止服务
2.删除索引更新数据文件
3.重启启动服务查看日志
这样操作是因为,增量id更新,没有读文件,这样操作最保险,还有就是数据不一致不要慌,查看任务执行日志,有些脚本过滤插件有问题的也不会更新。

http://www.lryc.cn/news/176667.html

相关文章:

  • python——ptp()函数
  • SpringBoot之异常处理
  • Flask-[实现websocket]-(2): flask-socketio文档学习
  • 网页中使用的图片格式
  • 【android】如何设置LD_LIBRARY_PATH?
  • 【hadoop3.x】一 搭建集群调优
  • linux使用操作[2]
  • 华南理工大学电子与信息学院23年预推免复试面试经验贴
  • Linux网络编程- ether_header iphdr tcphdr
  • wpf中的StaticResource和DynamicResource
  • 数据结构与算法基础-(3)
  • maven中relativepath标签的含义
  • Greenplum 对比 Hadoop
  • OJ练习第182题——字典树(前缀树)
  • 前端知识总结
  • 中国JP-10燃料行业市场研究与预测报告(2023版)
  • 护眼灯显色指数应达多少?眼科医生推荐灯光显色指数多少合适
  • AI 大模型
  • 一个案例熟悉使用pytorch
  • MySQL - limit 分页查询 (查询操作 五)
  • 代码随想录笔记--动态规划篇
  • vue之vuex
  • ISO 26262 系列学习笔记 ———— ASIL定义(Automotive Safety Integration Level)
  • 代码随想录 第8章 二叉树
  • 计算机网络工程师多选题系列——计算机网络
  • Zabbix5.0_介绍_组成架构_以及和prometheus的对比_大数据环境下的监控_网络_软件_设备监控_Zabbix工作笔记001
  • Spring | 事件监听器应用与最佳实践
  • 正点原子lwIP学习笔记——NETCONN接口简介
  • PHP自动识别采集何意网址文章正文内容
  • 区块链实验室(27) - 区块链+物联网应用案例