当前位置: 首页 > news >正文

mysql实时同步到es

测试了多个方案同步,最终选择oceanu产品,底层基于Flink cdc
1、实时性能够保证,binlog量很大时也不产生延迟
2、配置SQL即可完成,操作上简单

下面示例mysql的100张分表实时同步到es,优化备注等文本字段的like查询

创建SQL作业

CREATE TABLE from_mysql (id int,cid int NOT NULL,gid bigint NOT NULL,content varchar,create_time TIMESTAMP(3)  ,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = 'mysql-ip','port' = '3306','username' = 'mysqluser','password' = 'mysqlpwd','database-name' = 'mysqldb','debezium.snapshot.locking.mode' = 'none','table-name' = 'tb_test[0-9]?[0-9]','server-id' = '100-110','server-time-zone' = 'Asia/Shanghai','debezium.skipped.operations' = 'd','debezium.snapshot.mode' = 'schema_only','debezium.min.row.count.to.stream.results' = '50000'
);CREATE TABLE to_es (id string,tableid int,tablename string,cid int NOT NULL,gid string NOT NULL,content string,create_time string,PRIMARY KEY (id,companyId) NOT ENFORCED
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://ip:9200','connector.index' = 'myindex','connector.document-type' = '_doc','connector.username' = 'elastic','connector.password' = 'password123','update-mode' = 'upsert','connector.key-delimiter' = '$','connector.key-null-literal' = 'n/a','connector.failure-handler' = 'retry-rejected','connector.flush-on-checkpoint' = 'true','connector.bulk-flush.max-actions' = '10000','connector.bulk-flush.max-size' = '2 mb','connector.bulk-flush.interval' = '2000','connector.connection-max-retry-timeout' = '300','format.type' = 'json'
);INSERT INTO to_es
SELECT
concat(CAST(id as string),'-',CAST(mod(cid,100) AS VARCHAR)) as id, 
id tableid,
tablename,
cid,
gid,
content,
DATE_FORMAT(create_time, 'yyyy-MM-dd HH:mm:ss') as create_time
from from_mysql

这里主要注意字段类型
scan.startup.mode:“initial”(默认,同步历史数据),“latest-offset” 同步增量数据
最后insert可以加where,只同步需要的行数据

es配置

配置好mapping、setting和自己的分词器

使用自字义分词是因为字段中所有涉及的标点符号、空格等都可以来检索

PUT myindex-20230314/
{ "mappings": {"properties": {"id":{"type": "text"},"tableid":{"type": "long"},"cid":{"type": "long"},"gid":{"type": "text","analyzer": "my_analyzer"},"content":{"type": "text","analyzer": "my_analyzer"},"create_time" : {"type" : "keyword"}}},"settings": {"index":{"number_of_shards": "10","number_of_replicas": "1","refresh_interval" : "1s","translog": {"sync_interval": "30s","durability": "async"},"codec": "best_compression",   "analysis": {"analyzer": {"my_analyzer": {"tokenizer": "my_tokenizer","filter": ["lowercase"]}},"tokenizer": {"my_tokenizer": {"type": "ngram","min_gram": 1,"max_gram": 2,"token_chars": ["letter","digit","whitespace","punctuation","symbol"]}}}}}
}

使用别名,方便后续的维护

 POST /_aliases
{"actions": [{ "add":    { "index": "myindex-20230314", "alias": "myindex" }}]
}

之前测试的

  • canal单进程延迟越来越大,单独配置历史数据同步
  • go-mysql-elasticsearch经常报错重新同步
  • logstash同步100张分表不知道怎么配置

oceanus是收费的对于运维人员不足的情况,可以参考,有精力的可以考虑flink。

http://www.lryc.cn/news/540287.html

相关文章:

  • DeepSeek动画视频全攻略:从架构到本地部署
  • 第3章 3.3日志 .NET Core日志 NLog使用教程
  • R语言NIMBLE、Stan和INLA贝叶斯平滑及条件空间模型死亡率数据分析:提升疾病风险估计准确性...
  • Java 反射 (Reflection) 详解
  • 在 C++ 中,`QMessageBox_s::question_s2` 和 `app.question_s2` 的区别(由DS-V3生成)
  • vxe-grid 通过配置式给单元格字段格式化树结构数据,转换树结构节点
  • 大厂算法面试常见问题总结:高频考点与备战指南
  • 制造行业CRM选哪家?中大型企业CRM选型方案
  • PHP集成软件用哪个比较好?
  • 当pcie设备变化时centos是否会修改网络设备的名称(AI回答)
  • Mac arm架构使用 Yarn 全局安装 Vue CLI
  • 【Python游戏】双人简单对战游戏
  • Windows11切换回Windows10风格右键菜单
  • 怎么学习调试ISP的参数
  • “三次握手”与“四次挥手”:TCP传输控制协议连接过程
  • OpenCV形态学操作
  • 深入理解WebSocket接口:如何使用C++实现行情接口
  • 汇能感知的光谱相机/模块产品有哪些?
  • 抓包工具是什么?
  • Kubernetes的Ingress 资源是什么?
  • 【操作幂等和数据一致性】保障业务在MySQL和COS对象存储的一致
  • DevOps自动化部署详解:从理念到实践
  • LeetCodehot 力扣热题100
  • 解锁 AIoT 无限可能,乐鑫邀您共赴 Embedded World 2025
  • C# 背景 透明 抗锯齿 (效果完美)
  • Debezium:实时数据捕获与同步的利器
  • Word中接入大模型教程
  • Centos修改ip
  • uni-app小程序开发 基础知识2
  • 第4章 4.1 Entity Framework Core概述