当前位置: 首页 > news >正文

Logstash 多表增量同步 MySQL 到 Elasticsearch:支持逻辑删除与热加载,Docker 快速部署实战

1. 项目结构

install-elk/
├── start-elastic.sh
├── es-data/                            # Elasticsearch 持久化目录(自动创建)
├── logstash/├── logstash.yml├── pipeline/│   ├── user.conf│   ├── articles.conf│   └── ...          #可以有更多的表├── config/│   └── pipelines.yml├── ext/│   └── mysql-connector-j-9.2.0.jar  # 通过 https://downloads.mysql.com/archives/c-j/ 获取对应的版本。

1.1 本案例项目结构

在这里插入图片描述

1.2 start-elastic.sh

#!/bin/bash# ================== 🎨 Logo 开场 ==================
RED='\033[1;31m'
CYAN='\033[1;36m'
YELLOW='\033[1;33m'
RESET='\033[0m'echo -e "${CYAN}"
cat << "EOF"_____                      _ _ _/ ____|                    | (_) || (___   ___  ___ _ __ _ __| |_| |_ ___ _ __\___ \ / _ \/ __| '__| '__| | | __/ _ \ '__|____) |  __/ (__| |  | |  | | | ||  __/ ||_____/ \___|\___|_|  |_|  |_|_|\__\___|_|EOF
echo -e "${YELLOW}             🚀 Welcome to the someliber Elastic Stack 🚀${RESET}"
echo# ==================  获取版本号 ==================
read -p "请输入要使用的版本号(默认: 8.17.3): " VERSION_INPUT
VERSION=${VERSION_INPUT:-8.17.3}ES_IMAGE="elasticsearch:$VERSION"
KIBANA_IMAGE="kibana:$VERSION"
LOGSTASH_IMAGE="logstash:$VERSION"# ==================  获取密码 ==================
read -s -p "请输入 Elasticsearch 登录密码(默认: 123456): " PASSWORD_INPUT
echo
ES_PASSWORD=${PASSWORD_INPUT:-123456}
KIBANA_USER=kibana_user
KIBANA_PASS=someliber# ==================  是否挂载 ES 数据目录 ==================
read -p "是否挂载 Elasticsearch 数据目录?(y/n 默认 y): " USE_VOLUME
USE_VOLUME=${USE_VOLUME:-y}# ==================  基本变量 ==================
ES_CONTAINER_NAME=es11
KIBANA_CONTAINER_NAME=kibana11
LOGSTASH_CONTAINER_NAME=logstash11
NETWORK_NAME=elastic11
LOGSTASH_DIR="$PWD/logstash"
# ==================  创建网络 ==================
docker network create $NETWORK_NAME >/dev/null 2>&1 || echo "🔗 网络已存在:$NETWORK_NAME"# ==================  启动 Elasticsearch ==================
echo " 启动 Elasticsearch..."
if [[ "$USE_VOLUME" == "y" || "$USE_VOLUME" == "Y" ]]; thenmkdir -p ./es-datadocker run -d --name $ES_CONTAINER_NAME \--network $NETWORK_NAME \-p 9200:9200 -p 9300:9300 \-e "discovery.type=single-node" \-e "xpack.security.enabled=true" \-e "ELASTIC_PASSWORD=$ES_PASSWORD" \-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \-v "$PWD/es-data:/usr/share/elasticsearch/data" \$ES_IMAGE
elsedocker run -d --name $ES_CONTAINER_NAME \--network $NETWORK_NAME \-p 9200:9200 -p 9300:9300 \-e "discovery.type=single-node" \-e "xpack.security.enabled=true" \-e "ELASTIC_PASSWORD=$ES_PASSWORD" \-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \$ES_IMAGE
fi# ==================  等待 ES 启动 ==================
echo " 正在等待 Elasticsearch 启动..."
for i in {1..60}; doSTATUS=$(curl -s -u elastic:$ES_PASSWORD http://localhost:9200/_cluster/health | grep -o '"status":"[^"]\+"' || true)if [[ $STATUS == *"green"* || $STATUS == *"yellow"* ]]; thenecho " Elasticsearch 启动成功:$STATUS"breakfisleep 2
done# ==================  创建 Kibana 用户 ==================
echo " 创建 Kibana 用户($KIBANA_USER)..."
docker exec $ES_CONTAINER_NAME bash -c "bin/elasticsearch-users useradd $KIBANA_USER -p $KIBANA_PASS -r kibana_system" || \echo " 用户可能已存在,忽略错误"# ==================  启动 Kibana ==================
echo " 启动 Kibana..."
docker run -d --name $KIBANA_CONTAINER_NAME \--network $NETWORK_NAME \-p 5601:5601 \-e "ELASTICSEARCH_HOSTS=http://$ES_CONTAINER_NAME:9200" \-e "ELASTICSEARCH_USERNAME=$KIBANA_USER" \-e "ELASTICSEARCH_PASSWORD=$KIBANA_PASS" \-e "XPACK_ENCRYPTEDSAVEDOBJECTS_ENCRYPTIONKEY=2kR9HmNaesytcVDwEAK3uTQ1obCrvP7B" \-e "XPACK_REPORTING_ENCRYPTIONKEY=aSTr3J7sLgt2BCKbIyw0DE6OjZGMY1kX" \-e "XPACK_SECURITY_ENCRYPTIONKEY=WO6Xetyubr45ZonlLd32DfNmRTkcAhvp" \-e "I18N_LOCALE=zh-CN" \$KIBANA_IMAGE# ==================  启动 Logstash(全目录挂载) ==================
if [[ -d "$LOGSTASH_DIR" ]]; thenecho " 启动 Logstash(全目录挂载)..."docker run -d --name $LOGSTASH_CONTAINER_NAME \--network $NETWORK_NAME \-p 5044:5044 -p 9600:9600 -p 5000:5000 \-e "xpack.monitoring.elasticsearch.hosts=http://$ES_CONTAINER_NAME:9200" \-e "xpack.monitoring.elasticsearch.username=elastic" \-e "xpack.monitoring.elasticsearch.password=$ES_PASSWORD" \-v "$PWD/logstash/pipeline:/usr/share/logstash/pipeline" \-v "$PWD/logstash/ext:/usr/share/logstash/ext" \-v "$PWD/logstash/config:/usr/share/logstash/config" \$LOGSTASH_IMAGEecho " Logstash 启动完成(使用 logstash/ 目录)"
elseecho " 未找到 logstash/ 目录,Logstash 未启动"
fi# ==================  提示 ==================
echo
echo -e "${CYAN} 所有容器启动完成!${RESET}"
echo -e "${YELLOW} Elasticsearch: http://localhost:9200${RESET}"
echo -e "${YELLOW} Kibana:        http://localhost:5601${RESET}"

1.3 logstash/pipeline/user.conf

# ====================== 输入阶段:从 MySQL 中读取数据 ======================
input {jdbc {# MySQL JDBC 连接串jdbc_connection_string => "jdbc:mysql://192.168.167.175:3306/test"# 数据库账号和密码jdbc_user => "root"jdbc_password => "123456"# MySQL JDBC 驱动的路径(需要确保已经挂载进容器)jdbc_driver_library => "/usr/share/logstash/ext/mysql-connector-j-9.2.0.jar"# JDBC 驱动类名称jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_paging_enabled => truejdbc_page_size => 10000# ========== 增量同步设置 ==========# 启用基于字段的增量同步use_column_value => true# 增量字段:用来判断是否有更新tracking_column => "updated_at"tracking_column_type => "timestamp"record_last_run => true# 记录上次同步时间的文件(容器中可写路径)last_run_metadata_path => "/usr/share/logstash/last_run_metadata/users_last_run.yml"# 执行的 SQL 语句:查所有新增、修改、逻辑删除的数据statement => "SELECTid,username,gender,email,age,created_at,updated_at,is_deletedFROM usersWHERE updated_at > :sql_last_valueORDER BY updated_at ASC, id ASC"# 每分钟执行一次同步任务(Cron 格式)schedule => "* * * * *"}
}# ====================== 过滤器阶段:处理数据逻辑 ======================
filter {# 如果 is_deleted = 1,则为“逻辑删除”,我们在元数据中标记 action 为 deleteif [is_deleted] and [is_deleted] in [1, "1", true, "true"] {mutate {add_field => { "[@metadata][action]" => "delete" }}} else {# 否则为“新增或更新”操作,标记 action 为 indexmutate {add_field => { "[@metadata][action]" => "index" }}}# 移除不希望写入 ES 的字段(如删除标记)mutate {remove_field => ["is_deleted"]}
}# ====================== 输出阶段:写入到 Elasticsearch ======================
output {elasticsearch {# Elasticsearch 地址hosts => ["http://es11:9200"]# 索引名称为 usersindex => "users"# ES 用户名和密码(需开启身份验证)user => "elastic"password => "123456"# 文档 ID 使用 MySQL 的主键 ID,避免重复写入document_id => "%{id}"# 根据前面 filter 设置的 action 决定是 index 还是 delete 操作action => "%{[@metadata][action]}"}# 控制台输出(调试用)stdout {codec => json_lines}
}

1.3 logstash/pipeline/articles.conf

# ====================== 输入阶段:从 MySQL 中读取文章数据 ======================
input {jdbc {# MySQL JDBC 连接串jdbc_connection_string => "jdbc:mysql://192.168.167.175:3306/test"# 数据库账号和密码jdbc_user => "root"jdbc_password => "123456"# MySQL JDBC 驱动的路径(需确保挂载到容器)jdbc_driver_library => "/usr/share/logstash/ext/mysql-connector-j-9.2.0.jar"# JDBC 驱动类名称jdbc_driver_class => "com.mysql.cj.jdbc.Driver"# 启用分页拉取数据,提高大数据量时性能jdbc_paging_enabled => truejdbc_page_size => 10000# ========== 增量同步设置 ==========use_column_value => true                 # 启用字段值来判断是否更新tracking_column => "updated_at"         # 使用 updated_at 字段做增量对比tracking_column_type => "timestamp"record_last_run => truelast_run_metadata_path => "/usr/share/logstash/last_run_metadata/articles_last_run.yml"# SQL 查询语句,排除敏感字段,按更新时间 & ID 顺序拉取statement => "SELECTid,title,content,author_id,category,created_at,updated_at,is_deletedFROM articlesWHERE updated_at > :sql_last_valueORDER BY updated_at ASC, id ASC"# 同步频率:每分钟执行一次schedule => "* * * * *"}
}# ====================== 过滤器阶段:逻辑删除与字段处理 ======================
filter {# 如果 is_deleted = 1/true,标记 action 为 delete(逻辑删除)if [is_deleted] and [is_deleted] in [1, "1", true, "true"] {mutate {add_field => { "[@metadata][action]" => "delete" }}} else {# 否则表示新增或更新mutate {add_field => { "[@metadata][action]" => "index" }}}# 移除不希望写入 ES 的字段(如删除标记)mutate {remove_field => ["is_deleted"]}
}# ====================== 输出阶段:写入 Elasticsearch ======================
output {elasticsearch {hosts => ["http://es11:9200"]index => "articles"user => "elastic"password => "123456"document_id => "%{id}"                   # 使用文章主键作为文档 IDaction => "%{[@metadata][action]}"       # 动态执行 index 或 delete}# 控制台调试输出(开发期使用)stdout {codec => json_lines}
}

1.4 logstash/config/pipelines.yml

# 定义第一个 pipeline(数据同步管道)
- pipeline.id: users                     # 唯一标识这个 pipeline 的 ID,日志中会看到 users 相关的信息path.config: "/usr/share/logstash/pipeline/user.conf"# 指定该 pipeline 使用的配置文件路径,里面写的是 input/filter/output# 定义第二个 pipeline
- pipeline.id: articles                    # 另一个独立的 pipeline ID,处理 articles 表的同步path.config: "/usr/share/logstash/pipeline/articles.conf"# 指定第二个 pipeline 的配置文件路径

1.5 logstash/config/logstash.yml

# ================================
#  Logstash HTTP API 接口配置
# ================================api.http.host: 0.0.0.0             # Logstash API 绑定的主机地址,0.0.0.0 表示所有网卡
api.http.port: 9600                # Logstash HTTP API 的端口号,默认是 9600# ================================
#  配置自动加载(热加载)
# ================================config.reload.automatic: true      # 开启配置文件自动热加载,无需重启即可更新 pipeline
config.reload.interval: 5s         # 配置检查的时间间隔(单位:秒)# ================================
#  Pipeline 性能调优参数
# ================================pipeline.batch.delay: 50           # 每个批次处理之间的最大等待时间(毫秒),默认 50ms
pipeline.batch.size: 125           # 每个批次处理的最大事件数量
pipeline.workers: 2                # 每个 pipeline 使用的 worker 线程数,建议和 CPU 核心数保持一致# ================================
#  X-Pack 监控配置
# ================================xpack.monitoring.elasticsearch.hosts: ${xpack.monitoring.elasticsearch.hosts}       # Elasticsearch 监控地址,从环境变量读取
xpack.monitoring.elasticsearch.username: ${xpack.monitoring.elasticsearch.username} # ES 认证用户名,从环境变量读取
xpack.monitoring.elasticsearch.password: ${xpack.monitoring.elasticsearch.password} # ES 认证密码,从环境变量读取

1.6 启动 start-elastic.sh

chmod +x start-elastic.sh && ./start-elastic.sh

1.7 数据库相关

1.7.1 用户信息表

CREATE TABLE users (id INT PRIMARY KEY AUTO_INCREMENT COMMENT '用户唯一标识',username VARCHAR(100) NOT NULL COMMENT '用户名',password VARCHAR(255) NOT NULL COMMENT '加密后的用户密码',gender ENUM('male', 'female', 'other') DEFAULT 'other' COMMENT '性别',email VARCHAR(255) COMMENT '用户邮箱',age INT COMMENT '年龄',created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '用户创建时间',updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间',is_deleted BOOLEAN DEFAULT FALSE COMMENT '逻辑删除标记,TRUE 表示已删除'
) COMMENT='用户信息表';

1.7.2 文章信息表

CREATE TABLE articles (id INT PRIMARY KEY AUTO_INCREMENT COMMENT '文章唯一标识',title VARCHAR(255) NOT NULL COMMENT '文章标题',content TEXT COMMENT '文章内容正文',author_id INT NOT NULL COMMENT '作者ID,对应 users 表 id',category VARCHAR(100) COMMENT '文章分类',published_at DATETIME COMMENT '发布时间',created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '文章创建时间',updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间',is_deleted BOOLEAN DEFAULT FALSE COMMENT '逻辑删除标记,TRUE 表示已删除'
) COMMENT='文章信息表';

1.7.3 测试数据

INSERT INTO users (username, password, gender, email, age, is_deleted)
VALUES 
('Alice',   'hashed_password_1', 'female', 'alice@example.com', 28, FALSE),
('Bob',     'hashed_password_2', 'male',   'bob@example.com',   35, FALSE),
('Charlie', 'hashed_password_3', 'male',   'charlie@example.com', 22, FALSE),
('Diana',   'hashed_password_4', 'female', 'diana@example.com', 40, TRUE),  
('Eve',     'hashed_password_5', 'other',  'eve@example.com',    30, FALSE);INSERT INTO articles (title, content, author_id, category, published_at, is_deleted)
VALUES 
('Elasticsearch 入门', '这是关于 Elasticsearch 的基础教程内容。', 1, '技术', NOW(), FALSE),
('MySQL 优化实践', '介绍常见的 MySQL 优化技巧。', 2, '数据库', NOW(), FALSE),
('Logstash 配置指南', '如何配置 Logstash 管道同步数据。', 1, '日志系统', NOW(), FALSE),
('前端与后端的区别', '讲解 Web 开发中的前后端职责。', 3, 'Web开发', NULL, FALSE),
('已删除的文章示例', '这篇文章已被逻辑删除。', 4, '历史', NOW(), TRUE);  

1.8 测试验证过程

1.8.1 启动start-elastic.sh

在这里插入图片描述

1.8.2 验证索引是否创建成功

在这里插入图片描述

1.8.3 进行删除测试

1.8.3.1 删除前

在这里插入图片描述

1.8.3.2 逻辑删除Alice用户

在这里插入图片描述

1.8.3.3 查看结果

说明:等待一分钟,可以看到users索引下的文档数量由4->3,即文档从ES中删除。

在这里插入图片描述

1.8.4 进行修改测试

1.8.4.1 修改前

在这里插入图片描述

1.8.4.2 修改Bob为Bob123

在这里插入图片描述

1.8.4.3 查看修改结果

说明:等待一分钟,数据更新同步到ES文档。

在这里插入图片描述

2 总结

通过 LogstashMySQL 中同步多个业务表(如 usersarticles),并实现以下能力:

  • 增量同步:基于 updated_at 字段,避免全量拉取。
  • 逻辑删除同步:使用 is_deleted 字段自动触发 ES 删除操作。
  • 定时同步:每分钟调度,适用于数据近实时场景。
  • 多表配置隔离:每个表对应一个独立的 pipeline,实现清晰可维护结构。
  • 全目录挂载:Logstash 配置、驱动、Pipeline 统一挂载,便于统一管理和部署。
http://www.lryc.cn/news/599696.html

相关文章:

  • Qt 状态机框架:复杂交互逻辑的处理
  • uniapp之微信小程序标题对其右上角按钮胶囊
  • Vue3中的标签 ref 与 defineExpose:模板引用与组件暴露
  • 【Linux网络编程】传输层协议 - TCP
  • 图论水题日记
  • Qt 网络编程进阶:WebSocket 通信
  • vue - 使用canvas绘制验证码
  • 容器化环境下的服务器性能瓶颈与优化策略
  • Linux 中 `chown`、`chgrp` 和 `chmod` 命令详解
  • MRDIMM对服务器总体拥有成本(TCO)影响的系统性分析
  • openmv识别数字
  • 8.异常处理:优雅地处理错误
  • java面试题(二)
  • Java面试题及详细答案120道之(001-020)
  • [Rust 基础课程]猜数字游戏-获取用户输入并打印
  • 不用电脑要不要关机?
  • 【Spring WebFlux】为什么 Spring 要拥抱响应式
  • OT82111_VC1:USB OTG音频解码器固件技术解析
  • c++注意点(12)----设计模式(生成器)
  • Promise的allSettled,all,race
  • 智能网关:物联网时代的核心枢纽
  • 在OpenMP中,#pragma omp的使用
  • 【Linux/Ubuntu】VIM指令大全
  • 如何搭建Linux环境下的flink本地集群
  • 为什么选择EasyGBS?
  • 《解锁前端数据持久化与高效查询:IndexedDB深度剖析》
  • vue3单页面连接多个websocket并实现断线重连功能
  • TDengine 转化函数 TO_TIMESTAMP 用户手册
  • 比特币技术简史 第八章:扩展性解决方案 - 闪电网络与隔离见证
  • 软件工程:软件需求