当前位置: 首页 > news >正文

使用maxwell实时同步mysql数据到kafka

一、软件环境:

操作系统:CentOS release 6.5 (Final)
java版本: jdk1.8
zookeeper版本: zookeeper-3.4.11
kafka 版本: kafka_2.11-1.1.0.tgz
maxwell版本:maxwell-1.16.0.tar.gz
注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口

二、环境部署

1、安装jdk

export JAVA_HOME=/usr/java/jdk1.8.0_181
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$CLASSPATH

2、安装maven

参考:https://www.cnblogs.com/wcwen1990/p/7227278.html

3、安装zookeeper

1)下载软件:

wget http://101.96.8.157/archive.apache.org/dist/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz 
tar zxvf zookeeper-3.4.11.tar.gz  
mv zookeeper-3.4.11 /usr/local/zookeeper

2)修改环境变量

编辑 /etc/profile 文件, 在文件末尾添加以下环境变量配置:

## ZooKeeper Envexport ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

运行以下命令使环境变量生效: source /etc/profile

3)重命名配置文件

初次使用 ZooKeeper 时,需要将$ZOOKEEPER_HOME/conf 目录下的 zoo_sample.cfg 重命名为 zoo.cfg

mv  $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg

4)单机模式–修改配置文件
创建目录/usr/local/zookeeper/data 和/usr/local/zookeeper/logs 修改配置文件

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181

5)启动zookeeper

bin/zkServer.sh startZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

6)验证zukeeper服务

telnet chavin.king 2181Trying 192.168.72.130...
Connected to chavin.king.
Escape character is '^]'.
stat
Zookeeper version: 3.4.11-37e277162d567b55a07d1755f0b31c32e93c01a0, built on 11/01/2017 18:06 GMT
Clients:/192.168.72.130:44054[0](queued=0,recved=1,sent=0)Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x1a4
Mode: standalone
Node count: 147
Connection closed by foreign host.

4、安装zkui

git clone https://github.com/DeemOpen/zkui.git
cd zkui 
mvn clean install

修改配置文件默认值

#vim config.cfgserverPort=9090     #指定端口zkServer=192.168.1.110:2181sessionTimeout=300000

启动程序至后台
2.0-SNAPSHOT 会随软件的更新版本不同而不同,执行时请查看target 目录中真正生成的版本

nohup java -jar target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar & 

用浏览器访问:

http://chavin.king:9090/

5、安装kafka

wget http://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -zxvf kafka_2.11-1.1.0.tgz -C /usr/local/kafkamkdir -p /usr/local/kafka/data-logs

修改配置文件

vim server.propertieslog.dirs=/usr/local/kafka/data-logs
zookeeper.connect=chavin.king:2181

启动kafka

bin/kafka-server-start.sh -daemon config/server.properties &

创建topic

bin/kafka-topics.sh --create --zookeeper chavin.king:2181 --replication-factor 1 --partitions 1 --topic maxwell 

查看所有topic

bin/kafka-topics.sh --list --zookeeper chavin.king:2181

启动producer

bin/kafka-console-producer.sh --broker-list chavin.king:9092 --topic maxwell

启动consumer

bin/kafka-console-consumer.sh --zookeeper chavin.king:2181 --topic maxwell --from-beginning

或者

bin/kafka-console-consumer.sh --bootstrap-server chavin.king:9092  --from-beginning --topic maxwell

6、开启mysql binlog

 more /etc/my.cnf
[client]
default_character_set = utf8[mysqld]
basedir = /usr/local/mysql-5.6.24
datadir = /usr/local/mysql-5.6.24/data
port = 3306
#skip-grant-tables
character_set_server = utf8
log_error = /usr/local/mysql-5.6.24/data/mysql.errbinlog_format = row
log-bin = /usr/local/mysql-5.6.24/logs/mysql-bin
sync_binlog = 2
max_binlog_size = 16M
expire_logs_days = 10server_id = 1
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES

7、安装maxwell

wget https://github.com/zendesk/maxwell/releases/download/v1.16.0/maxwell-1.16.0.tar.gz
tar -zxvf maxwell-1.16.0.tar.gz -C /usr/local/maxwell 

启动maxwell

nohup bin/maxwell --user='canal' --password='canal' --host='chavin.king' --producer=kafka --kafka.bootstrap.servers=chavin.king:9092 > maxwell.log &

8、开发kafka消费程序

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;public class KafkaTest {public static void main(String[] args){String topicName = "maxwell";String groupID = "example-group";Properties props = new Properties();props.put("bootstrap.servers","192.168.72.130:9092");props.put("group.id",groupID);props.put("auto.offset.reset","earliest");props.put("serializer.encoding","utf-8");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String > consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topicName));try{while(true){ConsumerRecords<String,String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());}}finally{consumer.close();}}}

ideal启动以上消费程序

9、测试

offset = 3428, key = {"database":"chavin","table":"dept","_uuid":"0b195622-e7c7-4cf6-8203-5576752f9024"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2276,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3429, key = {"database":"chavin","table":"dept","_uuid":"333b98e3-a597-47fc-95ad-6e59ee0dadf6"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2277,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
offset = 3430, key = {"database":"chavin","table":"dept","_uuid":"cf9fa656-ed13-4cb0-b909-d1218e402e96"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2278,"data":{"deptno":30,"dname":"SALES","loc":"CHICAGO"}}
offset = 3431, key = {"database":"chavin","table":"dept","_uuid":"7f2f683a-39bc-498b-9a4e-920697b3da18"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2279,"data":{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}}
offset = 3432, key = {"database":"chavin","table":"dept","_uuid":"ef639cd1-9206-4145-8608-372bbaaaa14a"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2280,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3433, key = {"database":"chavin","table":"dept","_uuid":"ebdf15ad-7149-4ac4-b567-627dd910182c"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2281,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
offset = 3434, key = {"database":"chavin","table":"dept","_uuid":"1bc667f4-15f0-438c-8139-6f1cbe8b4db3"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2282,"data":{"deptno":30,"dname":"SALES","loc":"CHICAGO"}}
offset = 3435, key = {"database":"chavin","table":"dept","_uuid":"1613b695-284a-49e3-9793-74fb2cf8dc5b"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2283,"data":{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}}
offset = 3436, key = {"database":"chavin","table":"dept","_uuid":"f72a800c-92cc-4494-9438-bc61c58b5cb9"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2284,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3437, key = {"database":"chavin","table":"dept","_uuid":"9887d144-d75d-46f8-96ba-ad7c3adf45fd"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2285,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}

至此数据同步已经可以正常进行了,是不是很简单。

http://www.lryc.cn/news/327960.html

相关文章:

  • 知识图谱与大数据:区别、联系与应用
  • Nagios工具
  • 微信小程序全局数据共享
  • 算法训练营第24天|回溯算法理论基础 LeetCode 77.组合
  • pip永久修改镜像地址
  • RK3588平台开发系列讲解(硬件篇-功能外设2)
  • SpringBoot学习记录
  • 财富池指标--通达信顾比均线实战指标免费源码
  • AJAX(一):初识AJAX、http协议、配置环境、发送AJAX请求、请求时的问题
  • idea常用的快捷键总结:
  • LeetCode 热题 100 题解(一):哈希部分
  • C语言 | qsort()函数使用
  • 继承的特点 | java
  • 6、jenkins项目构建类型-项目类型介绍
  • 指针函数的应用——找出哪些学生有不及格的科目
  • 【微服务】Gateway
  • 王道C语言督学营OJ课后习题(课时14)
  • Filter、Listener、AJAX
  • FastAPI+React全栈开发04 FastAPI概述
  • 基于单片机的二维码LCD显示控制设计
  • Ubuntu20.04下PCL安装,查看,卸载等操作
  • Android TargetSdkVersion 30 安装失败 resources.arsc 需要对齐且不压缩。
  • c++20中的jthread再谈
  • Fastgpt 无法启动或启动后无法正常使用的讨论(启动失败、用户未注册等问题这里)
  • Rust 实战练习 - 7. FFI, 库, ABI, libc
  • vue实现把Ox格式颜色值转换成rgb渐变颜色值(开箱即用)
  • Unity 窗口化设置
  • Android14之深入理解sp模板类(二百零二)
  • .NET core 5.0 及以上的Windows Service开发
  • Nginx配置文件解释