当前位置: 首页 > news >正文

springboot集成canal,将数据发送至接口

前置条件:

  1. mysql开启binlog日志
  2. 部署canal服务

Springboot代码:

接口工具类:

import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class HttpMethodUtil {@Asyncpublic int httpPost(String url,String Content) {// 添加请求头信息Map<String, String> heads = new HashMap<>();// 使用json发送请求,下面的是必须的heads.put("Content-Type", "application/json;charset=UTF-8");HttpResponse response = HttpRequest.post(url).headerMap(heads, false).body(Content).timeout(5 * 1000).execute();JSONObject jsonObject = JSON.parseObject(response.body());int code = (int) jsonObject.get("code");if (code!=0){response = HttpRequest.post(url).headerMap(heads, false).body(Content).timeout(5 * 1000).execute();}jsonObject = JSON.parseObject(response.body());code = (int) jsonObject.get("code");return code;}

canal工具类:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class CanalUtil {@Value("${canal-monitor-mysql.hostname}")String canalMonitorHost;@Value("${canal-monitor-mysql.port}")Integer canalMonitorPort;@Value("${canal-monitor-mysql.database}")String canalMonitorDatabaseName;private final static int BATCH_SIZE = 100;@ResourceHttpMethodUtil httpMethodUtil;/*** 启动服务*/@PostConstructpublic void startMonitorSQL() {while (true) {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "", "");try {//打开连接connector.connect();log.info("数据库检测连接成功!" + canalMonitorDatabaseName);//订阅数据库表,全部表q// connector.subscribe(canalMonitorTableName + "\\..*");connector.subscribe(canalMonitorDatabaseName+ "\\.jyz_jyzqgdwa");//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(BATCH_SIZE);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {} else {handleDATAChange(message.getEntries());}// 提交确认connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();log.error("成功断开监测连接!尝试重连");} finally {connector.disconnect();//防止频繁访问数据库链接: 线程睡眠 10秒try {Thread.sleep(10 * 1000);} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 打印canal server解析binlog获得的实体类信息*/private void handleDATAChange(List<Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}//RowChange对象,包含了一行数据变化的所有特征CanalEntry.RowChange rowChage;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChage.getEventType();//打印Header信息log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType);//判断是否是DDL语句if (rowChage.getIsDdl()) {log.info("================》;isDdl: true,sql:{}", rowChage.getSql());}//获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {//如果是删除语句if (eventType == CanalEntry.EventType.DELETE) {log.info(">>>>>>>>>> 删除 >>>>>>>>>>");log.info("DELETE:{}", rowData.getBeforeColumnsList());//如果是新增语句} else if (eventType == CanalEntry.EventType.INSERT) {log.info(">>>>>>>>>> 新增 >>>>>>>>>>");JSONObject json = new JSONObject();for (CanalEntry.Column column : rowData.getAfterColumnsList()) {json.put(column.getName(), column.getValue());}log.info(json.toJSONString());String url = "http://192.168.21.11:8000/api/wirte";int code = httpMethodUtil.httpPost(url,json.toJSONString());if(code==0){log.info(json.toJSONString());}else {log.error(json.toJSONString());}//如果是更新的语句} else {log.info(">>>>>>>>>> 更新 >>>>>>>>>>");//变更前的数据log.info("------->; before");JSONObject json = new JSONObject();for (CanalEntry.Column column : rowData.getAfterColumnsList()) {json.put(column.getName(), column.getValue());}log.info(json.toJSONString());String url = "http://192.168.21.11:8000/api/wirte";int code = httpMethodUtil.httpPost(url,json.toJSONString());if(code==0){log.info(json.toJSONString());}else {log.error(json.toJSONString());}}}}}}

application.properties配置文件:

canal-monitor-mysql.hostname: 192.168.21.11
canal-monitor-mysql.port: 11111
canal-monitor-mysql.database: datbases_namelogging.file.name=./log/rizhi.log         
logging.pattern.dateformat=yyyy-MM-dd # 设置日期格式化
logging.logback.rollingpolicy.max-history=30

http://www.lryc.cn/news/205572.html

相关文章:

  • Selenum八种常用定位(案例解析)
  • Web前端接入Microsoft Azure AI文本翻译
  • 容联七陌助力鱼跃医疗升级智能联络中心,让客户服务更“鱼跃”
  • 【Redis系列】在Centos7上安装Redis5.0保姆级教程!
  • 线性代数-Python-03:矩阵的变换 - 手写Matrix Transformation及numpy中的用法
  • 【单片机基础】按键状态机实现短按、长按、双击、三击和N击
  • Ubuntu虚拟机部署OpenStack
  • ES在企业项目中的实战总结,彻底掌握ES的使用
  • QT的Qporcess功能的使用
  • 【图灵诸葛】jvm笔记
  • 数据安全小课堂开讲啦!看这里!
  • 单片机矩阵键盘
  • 横坐标日期等间隔绘图 python示例代码
  • photoshop2024免费插件Portraiture3
  • NewStarCTF2023week4-More Fast(GC回收)
  • 和鲸赞助丨第16届中国R会议暨2023 X-AGI大会通知
  • Python第三方库 - Flask(python web框架)
  • c# sqlite 修改字段类型
  • [Pytorch] 保存模型与加载模型
  • AES解密报错,Input length must be multiple of 16 when decrypting with padded cipher
  • 电子学会C/C++编程等级考试2023年05月(三级)真题解析
  • 【2023_10_21_计算机热点知识分享】:机器学习中的神经网络
  • app开发者提升第四季度广告收入的方法
  • #电子电器架构 —— 车载网关初入门
  • 系统工程利用计算机作为工具
  • MathType7.4绿色和谐版数学公式编辑器
  • JAVA代码审计-纵向越权漏洞分析
  • 【PG】PostgreSQL逻辑备份(pg_dump)
  • JVM、JRE、JDK
  • Latex 插入矢量图