zookeeper Curator(2):Curator的节点操作
文章目录
- zookeeper 的安装
- Curator 介绍
- Curator API 常用操作
- 本章必要的相关依赖和配置
- 建立连接
- 第一种方式
- 第二种方式
- 关闭连接
- 添加节点
- 创建节点
- 创建节点并设置值和类型
- 创建多级节点
- 查询节点
- 查询数据
- 查询所有子节点
- 查询节点信息
- 修改节点
- 修改节点(乐观锁修改,根据版本号)
- 删除节点
- 删除节点
- 删除带有子节点的节点
- 必须删除成功
- 删除后回调
本章代码已分享至Gitee: https://gitee.com/lengcz/curator01
zookeeper 的安装
关于zookeeper的安装请见: dubbo(2):zookeeper和dubbo-admin的安装
Curator 介绍
Curator 是Apache Zookeeper 的java 客户端库
常见的Zookeeper Java API
- 原生Java API
- ZkClient
- Curator
Curator 项目的目标是简化Zookeeper 客户端的使用。
Curator 最初是Netfix 研发的,后来捐献给了Apache基金会,目前属于 Apache顶级项目。
Curator 官网: https://curator.apache.org/
Curator API 常用操作
- 建立连接
- 添加节点
- 删除节点
- 修改节点
- 查询节点
- Watch事件监听
- 分布式锁实现
本章必要的相关依赖和配置
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.10</version><scope>test</scope></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.21</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.21</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.57</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
建立连接
第一种方式
//1 第一种方式/** String connectString, 连接字符串 zk server 地址和端口 ip1:port1,ip2:port2....* int sessionTimeoutMs, 会话超时时间 单位ms* int connectionTimeoutMs, 连接超时时间,单位ms* RetryPolicy retryPolicy 重拾策略*/RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);String connectString = "localhost:2181";CuratorFramework client =CuratorFrameworkFactory.newClient(connectString, 60_000, 15_000, retryPolicy);client.start();
第二种方式
//2 第二种方式,链式编程CuratorFramework client2 = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(60_000).connectionTimeoutMs(15_000).retryPolicy(retryPolicy).namespace("demo01").build();client2.start();
namespace表示根节点,表示这个客户端的操作都在这个节点之下,后续使用中不需要从根开始声明。
- 节点不需要手动创建,连接是会自动创建
关闭连接
client.close();
添加节点
创建节点
@Testpublic void testCreate() throws Exception {//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储String path = client.create().forPath("/app1");logger.info(path);}
创建节点并设置值和类型
@Testpublic void testCreate() throws Exception {//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
// String path = client.create().forPath("/app1");
// logger.info(path);// withMode 指定节点类型,是临时的,还是临时的,顺序的String path2 = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","hello world".getBytes()); //创建节点并设置值logger.info(path2);}
withMode :模式。EPHEMERAL 临时的,当会话关闭时,节点会被删除。
创建多级节点
// creatingParentContainersIfNeeded 如果节点不存在则创建String path3 = client.create().creatingParentContainersIfNeeded().forPath("/app4/p1"); //创建多级节点logger.info(path3);
- creatingParentContainersIfNeeded 如果节点不存在则创建
查询节点
查询数据
byte[] bytes = client.getData().forPath("/app1");logger.info(new String(bytes));
查询所有子节点
//获取子节点List<String> paths = client.getChildren().forPath("/app4");paths.forEach(c ->logger.info(c));List<String> paths2 = client.getChildren().forPath("/");//查询根节点
查询节点信息
@Testpublic void testGetNodeInfo() throws Exception {// 查询节点状态信息 ls -sStat status = new Stat();client.getData().storingStatIn(status).forPath("/app1");logger.info(JSONObject.toJSONString(status));}
修改节点
@Testpublic void testSet() throws Exception {// 修改数据client.setData().forPath("/app1","hello".getBytes());}
修改节点(乐观锁修改,根据版本号)
@Testpublic void testSetForVersion() throws Exception {Stat status = new Stat();client.getData().storingStatIn(status).forPath("/app1");int version = status.getVersion();//查询版本logger.info("版本号:"+version);// 修改数据,如果版本不一致,则不修改数据,乐观锁模式client.setData().withVersion(version).forPath("/app1","hello".getBytes());}
图片表示版本号
删除节点
删除节点
/*** 1 删除节点 delete* 2 删除节点带有子节点的节点 deleteall* 3 必须成功的删除* 4 回调* @throws Exception*/@Testpublic void testDelete() throws Exception {// 删除节点dclient.delete().forPath("/app1");}
删除带有子节点的节点
@Testpublic void testDelete2() throws Exception {// 删除节点带有子节点的节点client.delete().deletingChildrenIfNeeded().forPath("/app4");}
必须删除成功
@Testpublic void testDelete3() throws Exception {// 必须删除成功client.delete().guaranteed().forPath("/app4");}
必须成功就是重试,防止网络抖动
删除后回调
@Testpublic void testCallback() throws Exception{// 回调client.delete().guaranteed().inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {logger.info("删除后的消息回调");}}).forPath("/app4");}