前置条件:
- mysql开启binlog日志
- 部署canal服务
Springboot代码:
接口工具类:
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class HttpMethodUtil {@Asyncpublic int httpPost(String url,String Content) {// 添加请求头信息Map<String, String> heads = new HashMap<>();// 使用json发送请求,下面的是必须的heads.put("Content-Type", "application/json;charset=UTF-8");HttpResponse response = HttpRequest.post(url).headerMap(heads, false).body(Content).timeout(5 * 1000).execute();JSONObject jsonObject = JSON.parseObject(response.body());int code = (int) jsonObject.get("code");if (code!=0){response = HttpRequest.post(url).headerMap(heads, false).body(Content).timeout(5 * 1000).execute();}jsonObject = JSON.parseObject(response.body());code = (int) jsonObject.get("code");return code;}
canal工具类:
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class CanalUtil {@Value("${canal-monitor-mysql.hostname}")String canalMonitorHost;@Value("${canal-monitor-mysql.port}")Integer canalMonitorPort;@Value("${canal-monitor-mysql.database}")String canalMonitorDatabaseName;private final static int BATCH_SIZE = 100;@ResourceHttpMethodUtil httpMethodUtil;/*** 启动服务*/@PostConstructpublic void startMonitorSQL() {while (true) {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "", "");try {//打开连接connector.connect();log.info("数据库检测连接成功!" + canalMonitorDatabaseName);//订阅数据库表,全部表q// connector.subscribe(canalMonitorTableName + "\\..*");connector.subscribe(canalMonitorDatabaseName+ "\\.jyz_jyzqgdwa");//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(BATCH_SIZE);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {} else {handleDATAChange(message.getEntries());}// 提交确认connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();log.error("成功断开监测连接!尝试重连");} finally {connector.disconnect();//防止频繁访问数据库链接: 线程睡眠 10秒try {Thread.sleep(10 * 1000);} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 打印canal server解析binlog获得的实体类信息*/private void handleDATAChange(List<Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}//RowChange对象,包含了一行数据变化的所有特征CanalEntry.RowChange rowChage;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChage.getEventType();//打印Header信息log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType);//判断是否是DDL语句if (rowChage.getIsDdl()) {log.info("================》;isDdl: true,sql:{}", rowChage.getSql());}//获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {//如果是删除语句if (eventType == CanalEntry.EventType.DELETE) {log.info(">>>>>>>>>> 删除 >>>>>>>>>>");log.info("DELETE:{}", rowData.getBeforeColumnsList());//如果是新增语句} else if (eventType == CanalEntry.EventType.INSERT) {log.info(">>>>>>>>>> 新增 >>>>>>>>>>");JSONObject json = new JSONObject();for (CanalEntry.Column column : rowData.getAfterColumnsList()) {json.put(column.getName(), column.getValue());}log.info(json.toJSONString());String url = "http://192.168.21.11:8000/api/wirte";int code = httpMethodUtil.httpPost(url,json.toJSONString());if(code==0){log.info(json.toJSONString());}else {log.error(json.toJSONString());}//如果是更新的语句} else {log.info(">>>>>>>>>> 更新 >>>>>>>>>>");//变更前的数据log.info("------->; before");JSONObject json = new JSONObject();for (CanalEntry.Column column : rowData.getAfterColumnsList()) {json.put(column.getName(), column.getValue());}log.info(json.toJSONString());String url = "http://192.168.21.11:8000/api/wirte";int code = httpMethodUtil.httpPost(url,json.toJSONString());if(code==0){log.info(json.toJSONString());}else {log.error(json.toJSONString());}}}}}}
application.properties配置文件:
canal-monitor-mysql.hostname: 192.168.21.11
canal-monitor-mysql.port: 11111
canal-monitor-mysql.database: datbases_namelogging.file.name=./log/rizhi.log
logging.pattern.dateformat=yyyy-MM-dd # 设置日期格式化
logging.logback.rollingpolicy.max-history=30