当前位置: 首页 > news >正文

maxwell 基于zookeeper的高可用方案

Maxwell版本1.39.2

一: 添加zk的pox文件

<!-- customize HA -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.4.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.4.0</version>
</dependency>

二: 创建zk工具类

在 com.zendesk.maxwell.util 包下创建 CuratorUtil 类,后面会使用此类实现高可用

package com.zendesk.maxwell.util;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;public class CuratorUtil {private final String zookeeperServers;private final int sessionTimeoutMs;private final int connectionTimeoutMs;private final int baseSleepTimeMs;private final int maxRetries;private CuratorFramework client;public CuratorUtil(String zookeeperServers, int sessionTimeoutMs, int connectionTimeoutMs, int baseSleepTimeMs,int maxRetries) {this.zookeeperServers = zookeeperServers;this.sessionTimeoutMs = sessionTimeoutMs;this.connectionTimeoutMs = connectionTimeoutMs;this.baseSleepTimeMs = baseSleepTimeMs;this.maxRetries = maxRetries;}/** 构造 zookeeper 客户端,并连接 zookeeper 集群*/public void start() {ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(this.baseSleepTimeMs, this.maxRetries);client = CuratorFrameworkFactory.newClient(this.zookeeperServers,this.sessionTimeoutMs,this.connectionTimeoutMs,retryPolicy);client.start();}/** 实现分布式锁*/public void highAvailable() {// 1.连接 Zookeeper 客户端this.start();// 2.向 zookeeper 注册自己String lockPath = "/maxwell/ha/lock";InterProcessMutex lock = new InterProcessMutex(client, lockPath);try {// 3.获取锁lock.acquire();// 4.将自己信息注册到 leader 路径String leaderPath = "/maxwell/ha/leader";client.create().withMode(CreateMode.EPHEMERAL).forPath(leaderPath);} catch (Exception e) {e.printStackTrace();}}
}

三: 修改 com.zendesk.maxwell 包下的MaxwellConfig类

3.1 添加属性

// 类新增属性
public String zookeeperServers;
public int sessionTimeoutMs;
public int connectionTimeoutMs;
public int baseSleepTimeMs;
public int maxRetries;

3.2 buildOptionParser 方法添加代码

parser.accepts( "zookeeper", "zookeeper servers support maxwell high available" ).withRequiredArg();
parser.accepts( "session_timeout_ms", "session timeout ms with zookeeper" ).withRequiredArg();
parser.accepts( "connection_timeout_ms", "connection timeout ms with zookeeper" ).withRequiredArg();
parser.accepts( "base_sleep_time_ms", "base sleep time ms if retry" ).withRequiredArg();
parser.accepts( "max_retries", "max retry times" ).withRequiredArg();

3.3 setup 方法添加代码

this.haMode = fetchBooleanOption("ha", options, properties, false);
this.zookeeperServers = fetchStringOption("zookeeper",options, properties, null);
this.sessionTimeoutMs = fetchIntegerOption("session_timeout_ms",options, properties, 5000);
this.connectionTimeoutMs = fetchIntegerOption("connection_timeout_ms",options, properties, 5000);
this.baseSleepTimeMs = fetchIntegerOption("base_sleep_time_ms",options, properties, 5000);
this.maxRetries = fetchIntegerOption("max_retries",options, properties, 3);
if (haMode && zookeeperServers == null){LOGGER.error("you must specify --zookeeper because you want to use maxwell in ha mode");
}

四:修改 com.zendesk.maxwell.Maxwell 的main函数

将代码段

if ( config.haMode ) {new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
} else {maxwell.start();
}

全部注释掉,修改为

if ( config.haMode ) {CuratorUtil curatorUtil = new CuratorUtil(config.zookeeperServers, config.sessionTimeoutMs, config.connectionTimeoutMs, config.baseSleepTimeMs, config.maxRetries);curatorUtil.highAvailable();
}
maxwell.start();

然后重新打包就能得到基于zk的高可用版本了,打包时可以将test包删除,防止出现错误

源码下载地址

五: 启动脚本

5.1 创建配置文件 config.properties

log_level=info

# mysql login info
host=localhost
port=3306
user=root
password=root123
schema_database=maxwell
# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello

producer=kafka

#       *** kafka ***
producer=kafka
#kafka.bootstrap.servers=hosta:9092,hostb:9092
kafka.bootstrap.servers=localhost:9092
kafka.max.request.size = 104857600

kafka_topic=mysql.%{database}.%{table}
kafka_version=2.7.0

# alternative kafka topic to write DDL (alter/create/drop) to.  Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl

# hash function to use.  "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]

# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]

5.2 启动脚本编写 startup.sh

#!/bin/bash

single(){
  bin/maxwell --filter 'exclude: *.*, include: cp.*' --kafka_version=2.7.0 --config=config.properties --daemon
  echo -e "\033[32m单机版启动成功\n\033[0m"
}

ha(){
  ## zookeeper 多个用,分割
  bin/maxwell --filter 'exclude: *.*, include: cp.*' --kafka_version=2.7.0 --config=config.properties --ha --zookeeper=127.0.0.1:2181 --daemon
  echo -e "\033[32m高可用版启动成功\n\033[0m"
}

case "$1" in
  'ha')
     ha
     ;;
  *)
     single
     ;;
esac

5.2.1 高可用版本启动命令

./startup.sh ha

5.2.2 单机版启动命令

./startup.sh


 

http://www.lryc.cn/news/118357.html

相关文章:

  • 【JavaScript】match用法 | 正则匹配
  • 前端css + js +vue +element-ui 实现响应式布局,根据浏览器窗体大小自动响应
  • 小程序生成App:轻量低门槛的开发方式
  • Linux命名管道进程通信
  • 如何将苹果彻底删除视频找回?试试这3种方法
  • 【音视频、chatGpt】h5页面最小化后,再激活后视频停住问题的解决
  • [CSS] 图片九宫格
  • MChat-Gpt V1.0.0 (将ChatGpt机器人接入内网供全体使用)
  • 日常开发中Git命令指北
  • API 测试 | 了解 API 接口概念|电商平台 API 接口测试指南
  • 【计算机组成原理】24王道考研笔记——第三章 存储系统
  • 学习C语言的好处:
  • 基于k8s的devOps自动化运维平台架构设计(中英文版本)
  • P450进阶款无人机室内定位功能研测
  • 深度学习,计算机视觉任务
  • 使用 Docker 部署 canal 服务实现MySQL和ES实时同步
  • const易错详解
  • 网络安全—黑客技术【自学】
  • 作为数据产品经理的一天
  • Rust 编程小技巧摘选(7)
  • 爬虫程序中使用爬虫ip的优势
  • json-server的入门
  • uniapp调查问卷评价功能
  • Centos Linux带进度条复制(同步)文件和文件夹
  • 从数据仓库到数据结构:数据架构的演变之路
  • kafka-2.12使用记录
  • C++笔记之将定时器加入向量并设置定时器的ID为i
  • 将一组元素四舍五入到指定精度(小数位数)numpy.around()
  • tinyint这个值在MySQL中的值有哪些
  • JVM 内存结构