当前位置: 首页 > news >正文

使用 Maxwell 和 RabbitMQ 监控 Mysql Flowable 表变更

为什么需要监控数据库变化?

当 Flowable 表中的数据发生变化(例如插入新任务、更新状态或删除记录),我们可能需要触发其他操作,比如通知用户、更新仪表盘或启动新流程。Maxwell 可以读取 MySQL 的二进制日志(binlog),将变化事件以 JSON 格式发送到 RabbitMQ 消息队列,供其他系统消费。

准备工作

你需要以下组件:

  • MySQL:运行 Flowable 的数据库,需启用二进制日志(binlog)。

  • RabbitMQ:消息队列,用于接收 Maxwell 发送的事件。

  • Maxwell:读取 MySQL binlog 并发送事件到 RabbitMQ。

  • Flowable:已部署,数据库中有需要监控的表(例如 flowable.ACT_RU_TASK)。

步骤 1:配置 MySQL

确保 MySQL 启用了二进制日志。以下是一个简单的 Docker Compose 配置:

mysql:image: mysql:8.0container_name: mysqlports:- "3306:3306"volumes:- mysql_data:/var/lib/mysqlcommand:- "--log_bin=mysql-bin"- "--server_id=1"- "--binlog_format=ROW"environment:MYSQL_ROOT_PASSWORD: rootMYSQL_DATABASE: flowableMYSQL_USER: maxwellMYSQL_PASSWORD: maxwell_password
  • log_bin=mysql-bin:启用二进制日志。

  • server_id=1:设置唯一服务器 ID。

  • binlog_format=ROW:使用 ROW 格式,适合 Maxwell 解析。

为 Maxwell 创建用户并授予权限:

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'maxwell'@'%' IDENTIFIED BY 'maxwell_password';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';

步骤 2:启动 RabbitMQ

使用 Docker 运行 RabbitMQ:

rabbitmq:image: rabbitmq:3-managementcontainer_name: rabbitmqports:- "5672:5672"- "15672:15672"environment:RABBITMQ_DEFAULT_USER: guestRABBITMQ_DEFAULT_PASS: guest

访问 http://localhost:15672(默认用户/密码:guest/guest)检查 RabbitMQ 是否正常运行。

步骤 3:配置 Maxwell

Maxwell 读取 MySQL 的 binlog 并发送事件到 RabbitMQ。创建一个配置文件 maxwell.properties:

log_level=INFO
host=mysql
port=3306
user=maxwell
password=maxwell_password
producer=rabbitmq
rabbitmq_host=rabbitmq
rabbitmq_port=5672
rabbitmq_user=guest
rabbitmq_pass=guest
rabbitmq_exchange=maxwell
rabbitmq_exchange_type=topic
rabbitmq_routing_key_template=%database%.%table%
filter=exclude: *.*, include: flowable.*
replica_server_id=64
  • filter:只监控 flowable 数据库的表。

  • rabbitmq_exchange:事件发送到 maxwell 交换机。

运行 Maxwell:

docker run -d --name maxwell \-v $(pwd)/maxwell.properties:/config.properties \--network=host \zendesk/maxwell:latest \bin/maxwell --config=/config.properties

注意:确保 Maxwell、MySQL 和 RabbitMQ 在同一网络(例如使用 --network=host 或自定义 Docker 网络)。

步骤 4:测试监控

  1. 在 Flowable 数据库中插入一条任务记录:

    INSERT INTO flowable.ACT_RU_TASK (ID_, NAME_, PRIORITY_, CREATE_TIME_)
    VALUES ('task123', 'Test Task', 50, NOW());
  2. Maxwell 将捕获变更并发送 JSON 事件到 RabbitMQ,例如:

    {"database": "flowable","table": "ACT_RU_TASK","type": "insert","data": {"id_": "task123","name_": "Test Task","priority_": 50,"create_time_": "2025-08-06 17:03:00"}
    }
  3. 在 RabbitMQ 管理界面(http://localhost:15672):

    • 创建一个队列(例如 flowable_queue)。

    • 绑定到 maxwell 交换机,路由键为 flowable.*。

    • 检查队列中的消息。

步骤 5:消费事件

开发一个简单的消费者来处理 RabbitMQ 的事件。以下是一个 Python 示例:

import pika
import jsonconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='flowable_queue')
channel.queue_bind(exchange='maxwell', queue='flowable_queue', routing_key='flowable.*')def callback(ch, method, properties, body):event = json.loads(body)print(f"Received event: {event}")if event['table'] == 'ACT_RU_TASK' and event['type'] == 'insert':print(f"New task created: ID={event['data']['id_']}, Name={event['data']['name_']}")channel.basic_consume(queue='flowable_queue', on_message_callback=callback, auto_ack=True)
channel.start_consuming()

保存为 consumer.py 并运行:

pip install pika
python consumer.py

当插入新任务时,消费者会打印类似以下输出:

Received event: {'database': 'flowable', 'table': 'ACT_RU_TASK', 'type': 'insert', ...}
New task created: ID=task123, Name=Test Task

总结

通过 Maxwell 和 RabbitMQ,我们可以轻松监控 Flowable 表的变更,并将事件发送到消息队列供其他系统使用。这个方案简单高效,适合实时数据处理场景。下一步,你可以:

  • 优化 Maxwell 过滤规则,只监控特定表(如 ACT_RU_TASK)。

  • 将事件集成到 Flowable 流程,触发自动化任务。

  • 使用 Prometheus 监控 RabbitMQ 队列性能。

http://www.lryc.cn/news/611813.html

相关文章:

  • 医学影像PACS系统的设计与实现,PACS系统源码
  • LMS/NLMS最小均值算法:双麦克风降噪
  • python中的推导式
  • YOLOv5 上使用 **labelImg** 标注并训练自己的数据集
  • PyTorch生成式人工智能——Hugging Face环境配置与应用详解
  • 【32】C++实战篇—— m行n列的坐标点,求每行相邻点X差值dX,每列相邻点y差值dY,并以矩阵形式左端对齐
  • 远程连接----ubuntu ,rocky 等Linux系统,WindTerm_2.7.0
  • Spring选择哪种方式代理?
  • 阿里云DMS Data Copilot——高效智能的数据助手,助力企业实现数据驱动的未来
  • 深入理解 Maven POM 文件:核心配置详解
  • Jenkinsfile各指令详解
  • Java学习第一百零九部分——Jenkins(一)
  • 基于通用优化软件GAMS的数学建模和优化分析
  • AlphaEarth模型架构梳理及借鉴哪些深度学习领域方面的思想
  • React:受控组件和非受控组件
  • WebStorm转VSCode:高效迁移指南
  • 前端开发_怎么禁止用户复制内容
  • vue3 el-dialog自定义实现拖拽、限制视口范围增加了拖拽位置持久化的功能
  • 【前端开发】三. JS运算符
  • 2.6 sync
  • vue3 find 数组查找方法
  • JSON巴巴 - 专业JSON格式化工具:让任何JSON都能完美格式化
  • Excel将整列值转换为字符串
  • Git 乱码文件处理全流程指南
  • 通过最严时序标准,再登产业图谱榜首,TDengine 时序数据库在可信数据库大会荣获双荣誉
  • Apache Flink 的详细介绍
  • 时序数据库的发展现状与未来趋势
  • Excel单元格设置下拉框、选项背景
  • 【OSCP】- Monitoring 靶场学习(Proving Grounds Play)
  • SpringBoot 整合Langchain4j 对接主流大模型实战详解