当前位置: 首页 > news >正文

Logstash同步MySQL数据到ElasticSearch

当MySQL数据到一定的数量级,而且索引不能实现时,查询就会变得非常缓慢,所以使用ElasticSearch来查询数据。本篇博客介绍使用Logstash同步MySQL数据到ElasticSearch,再进行查询。

测试环境

  • Windows系统
  • MySQL 5.7
  • Logstash 7.0.1
  • ElasticSearch 7.0.1
  • Kibana 7.0.1

ELK工具下载可访问:https://www.elastic.co/cn/downloads/

ELK同步环境搭建

ElasticSearch、Kibana启动

将下载的ElasticSearch、Kibana解压,并依次启动,Windows目录下,ElasticSearch启动可点击bin/elasticsearch.bat,Kibana启动可点击kibana.bat

Logstash配置启动

核心是Logstash的配置。

1、解压Logstash

2、将MySQL的JDBC的连接包放入lib包下

3、在bin目录下新建配置文件-logstash_sync_mysql.conf,需要注意该配置文件需要UTF-8 无BOM格式,不然会报错。

4、编写配置文件

input {jdbc {# 索引类型type => "product"# 驱动包位置jdbc_driver_library => "D:\ELk_SYNC_MYSQL\logstash-7.0.1\lib\mysql\mysql-connector-java-5.1.43.jar"# 驱动jdbc_driver_class => "com.mysql.jdbc.Driver"# 数据库名称jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/clothingsale?useUnicode=true&characterEncoding=UTF-8&useSSL=true"# 用户名jdbc_user => "root"# 密码jdbc_password => "root"# SQL文件# statement_filepath => "filename.sql"# SQL语言 SELECT * FROM product WHERE update_time > :last_sql_valuestatement => "SELECT * from product"# 设置时区jdbc_default_timezone => "Asia/Shanghai"# 是否分页jdbc_paging_enabled => "true"# 分页数量jdbc_page_size => "500"# 追踪字段tracking_column => "update_time"# 这里如果是用时间追踪比如:数据的更新时间或创建时间等和时间有关的这里一定不能是trueuse_column_value => false# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新schedule => "* * * * *"}jdbc {# 索引类型type => "message"# 驱动包位置jdbc_driver_library => "D:\ELk_SYNC_MYSQL\logstash-7.0.1\lib\mysql\mysql-connector-java-5.1.43.jar"# 驱动jdbc_driver_class => "com.mysql.jdbc.Driver"# 数据库名称jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/clothingsale?useUnicode=true&characterEncoding=UTF-8&useSSL=true"# 用户名jdbc_user => "root"# 密码jdbc_password => "root"# SQL文件# statement_filepath => "filename.sql"# SQL语言 SELECT * FROM product WHERE update_time > :last_sql_valuestatement => "SELECT * from message"# 设置时区jdbc_default_timezone => "Asia/Shanghai"# 是否分页jdbc_paging_enabled => "true"# 分页数量jdbc_page_size => "500"# 追踪字段tracking_column => "update_time"# 这里如果是用时间追踪比如:数据的更新时间或创建时间等和时间有关的这里一定不能是trueuse_column_value => false# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新schedule => "* * * * *"}
}# 修改@timestamp默认时间
filter {ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" }ruby {code => "event.set('@timestamp',event.get('timestamp'))"}mutate {remove_field => ["timestamp"]}
}output {# 目前使用的elasticsearch7.x,所以一个index只能存储一种type,所以以下的index需要不一样if [type]=="product" {elasticsearch {hosts => "127.0.0.1:9200"# 索引名称 相当于数据库名称index => "cloproduct"# 类型名称 相当于数据库中的数据表document_type => "product"document_id => "%{id}"}}if [type]=="message" {elasticsearch {hosts => "127.0.0.1:9200"# 索引名称 相当于数据库名称index => "clomessage"# 类型名称 相当于数据库中的数据表document_type => "message"document_id => "%{id}"}}
}

上述是多表同步,每行都有注释,意思比较明了,就是input中多表使用jdbc隔开,然后output中用type区分。

5、启动

logstash -f logstash_sync_mysql.conf >> C:\Users\Panlf\Desktop\log.txt

这样还能看到实时日志产生,方便查看错误和进程。

注意

上述即可实现MySQL的数据同步,但是存在问题 - 时区问题,MySQL是时间比ElasticSearch晚8个小时,我试了各种方式还是不能解决把时间调整过来。目前可以在取数据的时候,进行时间调整,应该问题不是很大。

http://www.lryc.cn/news/156695.html

相关文章:

  • 【C++】运算符重载的示例实现和应用
  • Kubernetes禁止调度
  • CocosCreator3.8研究笔记(七)CocosCreator 节点和组件的介绍
  • Ceph入门到精通-C++入门知识点
  • Ansible之playbook详解和应用实例
  • 经验萃取方法
  • 手写apply方法
  • Jenkins实现基础CD操作
  • 开源软件合集(Docker)
  • Ceph入门到精通-生产日志级别设置
  • 16-MyCat
  • RKNPU2通用API和零拷贝API
  • LeetCode 1123. 最深叶节点的最近公共祖先:DFS
  • 多线程应用——线程池
  • OPENCV+QT环境配置
  • Kafka3.0.0版本——文件清理策略
  • SRT参数说明
  • vue响应式原理
  • elk安装篇之 Kibana安装
  • MySQL 用户授权管理及白名单
  • pc-签字画板vue-esign的使用
  • javaScript:节点操作
  • git 忽略已经提交的文件或文件夹 (修改.gitignore文件无效)
  • 学习左耳听风栏目90天——第十二天 12/90(学习左耳朵耗子的工匠精神,对技术的热爱)【时间管理:同扭曲时间的事儿抗争】
  • 前端如何将后台数组进行等分切割
  • 如何有效防止服务器被攻击?
  • layui表格高度
  • 一文1800字从0到1使用Python Flask实战构建Web应用
  • 【LeetCode-中等题】210. 课程表 II
  • vue修饰符的用法