当前位置: 首页 > news >正文

zk111111111111111111

Zookeeper
1 zookeeper(作为 dubbo 的注册中心):
概述: zookeper 是 一个分布式的、开源的分布式应用程序的协调服务,管理分布式应
作用: 配置管理,分布式锁,集群管理
2 zookeeper 的安装 (dubbo 的资料中已经整理)
3 zookeeper 的数据模型
zookeeper 是一个树形的服务目录,具有明确的层次化结构,树的每一个节点叫
znode,每个节点都记录自己的信息和数据,节点上存储的数据大小为 1m,在我们的
window 目录结构中目录不存数据,(注意对比)
节点分类:
PERSISTENT 持久化节点
EPHEMERAL 临时节点 :-e
PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s
EPHEMERAL_SEQUENTIAL 临时顺序节点 :-es
图示

 

4 zookeeper 的服务端命令
启动 ZooKeeper 服务: ./zkServer.sh start
查看 ZooKeeper 服务状态: ./zkServer.sh status
停止 ZooKeeper 服务: ./zkServer.sh stop
重启 ZooKeeper 服务: ./zkServer.sh restart
5 zookeeper 的客户端命令:
连接 zookeeper 的服务端: ./zkCli.sh –server ip:port
断开连接: quit
设置节点值: set /节点 path value
删除单个节点: delete /节点 path
获取帮助: help
显示指定目录下节点: ls 目录
删除带有子节点的节点: deleteall /节点 path
创建子节点: create /path value
获取节点值: get /节点 path
创建临时节点: create -e /节点 path value
创建顺序节点: create -s /节点 path value
查询节点详细信息: ls –s /节点 path
6 zookeeper 通过 javaapi 进行操作
Curator 一套操作 zookeeper 的 Java api:
pom.xml 文件(注意在版本上有限制,直接使用交高的版本)
<!--curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
(1) 创建连接 zookeeper 的对象(可以抽取为工具类):
代码:

package utils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* @ClassName: CuratorUtils
* @Author: nanfeng
* @Date: 2021/1/20 11:26
* @Description: curator 发音: [ˈkjʊrˌeɪdər]
*/
public class CuratorUtils {
private static CuratorFramework client;
static {
/*
* @param connectString 连接字符串。zk server 地址和端口
"192.168.149.135:2181,
* 192.168.149.136:2181" 多个连接地址
* @param sessionTimeoutMs 会话超时时间 单位 ms
* @param connectionTimeoutMs 连接超时时间 单位 ms
* @param retryPolicy 重试策略
*/
// 重试策略:参数为休眠时间和最大重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
/*
//1.第一种方式
CuratorFramework client =
CuratorFrameworkFactory.newClient("192.168.149.135:2181",
60 * 1000, 15 * 1000, retryPolicy);
*/
//2.第二种方式:链式创建
//CuratorFrameworkFactory.builder();
client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") //zookeeper 的连接地
址和端口号
.sessionTimeoutMs(60 * 1000) //会话时间
.connectionTimeoutMs(15 * 1000) //连接超时时间
.retryPolicy(retryPolicy) //重试策略
.namespace("nanfeng") //名称空间,可有可无,
根据情况进行添加
.build();
//开启连接
client.start();
}
/**
* 获取连接对象
*
* @param client
* @return
*/
public static CuratorFramework getClient(CuratorFramework client) {
return client;
}
/**
* 释放连接
*
* @param client
*/
public static void clossConnection(CuratorFramework client) {
if (client != null) {
client.close(); }}}
(2) 进行基本的增删改查:
package test;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
import utils.CuratorUtils;
import java.util.Arrays;
import java.util.List;
/**
* @ClassName: curatorTest
* @Author: nanfeng
* @Date: 2021/1/20 14:40
* @Description: Curator 的 api 操作 zookeeper
*/
public class CuratorTest {
private static CuratorFramework client;
/*获取公共的连接对象*/
static {
client = CuratorUtils.getClient();
}
/* 创建节点
*/
/**
* 创建节点:create 持久 临时 顺序 数据
* 1. 基本创建 :create().forPath("")
* 2. 创建节点 带有数据:create().forPath("",data)
* 3. 设置节点的类型:create().withMode().forPath("",data)
* 4. 创建多级节点 /app1/p1 :
create().creatingParentsIfNeeded().forPath("",data)
*/
@Test
/*增加*/
public void addZnode1() throws Exception {
//创建节点
String nanfeng1 = client.create().forPath("/nanfeng1");
System.out.println(nanfeng1);
//释放连接
CuratorUtils.clossConnection(client);
}
@Test
public void addZnode2() throws Exception {
//创建节点带有数据
String nanfeng2 = client.create().forPath("/nanfeng2",
"nanfeng2".getBytes());
System.out.println(nanfeng2);
CuratorUtils.clossConnection(client);
}
@Test
public void addZnode3() throws Exception {
//创建节点设置节点类型
//节点默认类型为持久化的
String nanfeng3w =
client.create().withMode(CreateMode.EPHEMERAL).forPath("/nanfeng3");
System.out.println(nanfeng3w);
CuratorUtils.clossConnection(client);
}
@Test
public void addZnode4() throws Exception {
//创建多级节点
String s =
client.create().creatingParentContainersIfNeeded().forPath("/nanfeng4
/nanzi");
System.out.println(s);
CuratorUtils.clossConnection(client);
}
/* 删除节点 */
/**
* 删除节点: delete deleteall
* 1. 删除单个节点:delete().forPath("/app1");
* 2. 删除带有子节点的节
点:delete().deletingChildrenIfNeeded().forPath("/app1");
* 3. 必须成功的删除:为了防止网络抖动。本质就是重试。
client.delete().guaranteed().forPath("/app2");
* 4. 回调:inBackground
*
* @throws Exception
*/
/* 关于回调函数: 在执行某项操作时,自动开启自定义的一系列业务,即
监听执行 */
@Test
/*删除*/
public void deleteZnode1() throws Exception {
//删除单个节点
client.delete().forPath("/nanfeng1");
CuratorUtils.clossConnection(client);
}
@Test
public void deleteZnode2() throws Exception {
//删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/nanfeng4");
CuratorUtils.clossConnection(client);
}
@Test
public void deleteZnode3() throws Exception {
//强制删除(必须删除成功)
client.delete().guaranteed().forPath("/nanfeng2");
CuratorUtils.clossConnection(client);
}
@Test
public void deleteZnode4() throws Exception {
//删除回调(可以完成一些特殊的功能)
client.delete().guaranteed().inBackground(new
BackgroundCallback() {
@Override
public void processResult(CuratorFramework
curatorFramework, CuratorEvent curatorEvent) throws Exception {
//回调
//此处执行特定功能,
System.out.println("回调函数触发了");
}
}).forPath("/nanfeng3");
CuratorUtils.clossConnection(client);
}
/* 修改 */
/**
* 修改数据
* 1. 基本修改数据:setData().forPath()
* 2. 根据版本修改: setData().withVersion().forPath()
* * version 是通过查询出来的。目的就是为了让其他客户端或者线程不
干扰我。
*/
@Test
public void setZnode1() throws Exception {
//修改主要是修改节点存放的数据
Stat stat = client.setData().forPath("/nanfeng2",
"nanzi".getBytes());
//将修改的状态打印查看
System.out.println(stat);
CuratorUtils.clossConnection(client);
}
/* 获取
*/
/**
* 查询节点:
* 1. 查询数据:get: getData().forPath()
* 2. 查询子节点: ls: getChildren().forPath()
* 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对
象).forPath()
*/
@Test
public void getZnode1() throws Exception {
//查询节点数据
byte[] bytes = client.getData().forPath("/nanfeng2");
System.out.println(Arrays.toString(bytes));
CuratorUtils.clossConnection(client);
}
@Test
public void getZnode2() throws Exception {
//查询子节点
List<String> list = client.getChildren().forPath("/");
System.out.println(list);
CuratorUtils.clossConnection(client);
}
@Test
public void getZnode3() throws Exception {
//查询节点的状态信息
Stat stat = new Stat();
System.out.println(stat);
client.getData().storingStatIn(stat).forPath("/nanfeng1");
//打印刷新后的状态信息
System.out.println(stat);
CuratorUtils.clossConnection(client);
}
}
(3) Watch 事件监听
package test;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.junit.Test;
import utils.CuratorUtils;
import java.util.Arrays;
/**
* @ClassName: WatchTesyt
* @Author: nanfeng
* @Date: 2021/1/20 21:10
* @Description: TODO
*/
public class WatchTest {
CuratorFramework client;
public WatchTest() {
client = CuratorUtils.getClient();
}
@Test
/*1:NodeCache 监听某一个特定的节点*/
public void testNodeCache() throws Exception {
//创建 NodeCache 对象
final NodeCache nodeCache = new NodeCache(client, "/nanfeng1");
//注册监听
nodeCache.getListenable().addListener(new NodeCacheListener()
{
@Override
public void nodeChanged() throws Exception {
//处理节点变化后的业务;
//类似触发器业务
System.out.println("发生改变了");
//获取修改后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(Arrays.toString(data));
}
});
//开启监听,同时加载缓冲数据
nodeCache.start(true);
//设置死循环为了不断续监听
for (; ; ) {
}
}
/*2:PathChildreCache 监听一个 ZNode 的子节点,不包含本节点*/
public void testPathChildrenCache() throws Exception {
//创建监听器对象
PathChildrenCache pathChildrenCache = new
PathChildrenCache(client,"/nanfeng"/*监听对象*/,true/*缓存状态信息*/);
//绑定监听器
pathChildrenCache.getListenable().addListener(new
PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework,
PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
//发生变化后需要处理的业务代码
//触发某个信息
System.out.println("发生变化了");
//查看这个对象的状态
System.out.println(pathChildrenCacheEvent);
PathChildrenCacheEvent.Type type =
pathChildrenCacheEvent.getType();
//如果 type 是升级,就获取最新的数据
if
(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
//确认是升级了
System.out.println("升级了");
//获取最新的数据
byte[] data =
pathChildrenCacheEvent.getData().getData();
System.out.println(Arrays.toString(data));
}
}
});
//开启监听
pathChildrenCache.start();
//死循环为了查看,当真正部署到服务器不用使用这种
while (true){}
}
/*3:TreeCach 监听整个树的节点 */
public void testTreeCache() throws Exception {
//1. 创建监听器
TreeCache treeCache = new TreeCache(client,"/nanfeng");
//2. 注册监听
treeCache.getListenable().addListener(new TreeCacheListener()
{
@Override
public void childEvent(CuratorFramework client,
TreeCacheEvent event) throws Exception {
System.out.println("节点变化了");
System.out.println(event);
}
});
//3. 开启监听
treeCache.start();
while (true){
}}}
7 分布式锁
7 分布式锁
作用: 解决跨机器的进程之间的数据同步问题
实现分布式锁的三种方式:
(1) 基于 redis 的
图示:
实现原理:
前提说明 (每次执行的 key 访问 redis 消费者约定相同,因为在当前的服务执行完毕当前的
业务后会删除(即释放锁/超时),再次执行相同的键 设置值,会根据结果返回锁的获取结果)
1> 当第一个 client 来访问 redis 获取生产者信息数据时,redis 执行 setnx key value 同
时限制有效时间,(在限制了有效时间后即使当前的 client 宕机了,当有效时间过了,锁也会
自动被释放,控制不会产生死锁),
If(result=1){
执行业务
释放锁
}
//超时,自动释放
2 > 当第二个 client 再来访问 redis 时,执行 setnx key value
If(result=1){
执行业务
释放锁
} else{ 排队
}
3 >.......
(2) 基于数据库的
思想:让服务端去访问数据库,在数据库中设置一张表,每一有消费者访问数据库,请求
锁时,在特定的表中为其设置数据,消费者拿到锁后,去进行资源的访问,执行完毕后,释
放锁(中间会要设置消费者的数据的有效时间,过时自动清除消费者的请求信息),当后
面的消费者来请求锁时,先查询这张约存放锁信息的表,若是表中含有数据,排队,若是
表中没有数据则执行后续操作.......
(3) zookeeper 实现分布式锁的原理:
思想:客户端想要获取去访问资源, 先注册节点,执行完毕后,释放锁(即删除节点)
解析图:

 

 

http://www.lryc.cn/news/61566.html

相关文章:

  • 018:Mapbox GL加载Google地图(影像瓦片图)
  • Web API 和 API 的区别编写api
  • IDEA 用上这款免费 GPT4 插件,生产力爆表了
  • 1187.使数组严格递增 学习记录
  • 权限控制_SpringSecurity
  • 2023年最系统的自动化测试,测试开发面试题,10k以下不建议看
  • 今年SMETA审核费用即将涨价
  • 基于贝叶斯优化CNN-LSTM混合神经网络预测(Matlab代码实现)
  • 基于深度学习和生理信号的疾病筛查:个体内和个体间研究的价值与应用
  • 现在有t1,t2,t3三个线程,实现t1,t2线程同步执行,然后再执行t3线程,使用Java实现该程序
  • 83.qt qml-初步学习2D粒子影响器(二)
  • 4.17-4.18学习总结
  • Spring事务
  • Linux新的设备或分区挂载到系统中mount使用方法
  • 移动硬盘损坏如何恢复数据
  • Material Design:为你的 Android 应用提供精美的 UI 体验
  • springboot+vue学生毕业离校系统(源码+说明文档)
  • 【Android入门到项目实战-- 6.2】—— 如何访问其他应用程序的数据?
  • 【100个 Unity实用技能】 | InputField输入框组件实现输入限制,只能输入中文或特殊字符等
  • 倍数+路径之谜
  • 【Unity渲染】URP透明物体自身渲染穿插异常问题
  • c/c++:指针,指针定义和使用,指针大小4字节,野指针,空指针*p=NULL
  • CAS实现原⼦操作的三⼤问题,该如何解决?
  • Linux Shell 实现一键部署二进制go+caddy+filebrowser
  • 无人机巡检智能一体化解决方案
  • 2023-2029全球粘结剂喷射技术行业调研及趋势分析报告
  • Python每日一练(20230420)
  • 【社区图书馆】读《悲惨世界》有感
  • 随机蛙跳算法 (SFLA)简单实现(Matlab代码实现)
  • 【手把手做ROS2机器人系统开发二】熟悉ROS2基本命令