当前位置: 首页 > news >正文

Zookeeper之手写一个分布式锁

前言

我之前写了一篇快速上手ZK的文章:https://blog.csdn.net/qq_38974073/article/details/135293106

本篇最要是进一步加深学习ZK,算是一次简单的实践,巩固学习成果。

设计一个分布式锁

对锁的基本要求

  • 可重入:允许同一个应用内的同一个线程重复调用同一个方法;
  • 阻塞:没有拿到锁的线程将进入阻塞。
  • 公平的:先来先得。

实现原理

使用zk作为发号器,每个线程申请锁时会创建一个临时有序节点:

  • 节点编号最小的获得锁,完成业务操作之后删除临时节点;
  • 如果不是最小编号的节点,就监听前一个节点的删除事件,并进入阻塞状态,当触发回调的事件时,唤醒阻塞线程,并重新进行获取锁操作。

锁要求实现的描述:

  • 可重入:对同一个线程,不用重复获取锁,重入计数+1即可;
  • 阻塞:利用CountDownLatch实现,当触发回调时唤醒线程;
  • 公平的:利用zk临时有序节点的特点进行排队,先到先申请锁。

问:申请到锁之后,网络中断怎么办?

  • 临时节点随客户端关闭而被删除

问:如何避免羊群效应?

  • 每个线程只监听前一个节点

关键流程

在这里插入图片描述

关键代码实现

锁的关键方法:

  • 加锁:lock
  • 解锁:unLock
  • 尝试加锁:tryLock

public boolean lock() {if(Thread.currentThread().equals(thread)) {lockCount.incrementAndGet();return true;}while (true) {if (tryLock()) {thread = Thread.currentThread();lockCount.incrementAndGet();return true;}try {await();} catch (Exception e) {throw new RuntimeException(e);}}
}public synchronized boolean unlock() {if (!thread.equals(Thread.currentThread())) {return false;}int newLockCount = lockCount.decrementAndGet();if (newLockCount < 0) {throw new IllegalMonitorStateException("重入锁计数不可为负数" );}// 是否剩余重入次数if (newLockCount != 0) {return true;}// 到这一步,意味着lockCount已经为0,可以删除临时节点了try{if(client.isNodeExist(properties.getZkPath())) {client.deleteNode(lockedPathMap.get(thread));}} catch (Exception e) {return false;} finally {lockedPathMap.remove(thread);priorPathMap.remove(thread);}return true;
}protected boolean tryLock() {String lockedPath = lockedPathMap.get(Thread.currentThread());if (null == lockedPath || !client.isNodeExist(lockedPath)) {lockedPathMap.put(Thread.currentThread(), lockedPath = client.createEphemeralSeqNode(getLockPrefix()));}// 取得加锁的排队编号String lockedShortPath = getShorPath(lockedPath);List<String> waiters = getWaiters();// 如果自己是所有等待锁中的第一个,则获得锁if (checkLocked(waiters, lockedShortPath)) {return true;}// 当前线程节点是否在排队int index = Collections.binarySearch(waiters, lockedShortPath);if(index < 0) {throw new NullPointerException("可能网络抖动,连接断开,临时节点失效");}// waiters最后面的节点写入map,用来监听priorPathMap.put(Thread.currentThread(), getLockPrefix() + waiters.get(index - 1));return false;
}private boolean await() throws Exception {String priorPath = priorPathMap.get(Thread.currentThread());if (null == priorPath) {throw new NullPointerException("prior_path error");}final CountDownLatch latch = new CountDownLatch(1);// 删除事件Watcher w = watchedEvent -> {// 监测到前一个节点发生变化,接下来就可以唤起等待线程,重新尝试获取锁latch.countDown();};try{// 监听前一个节点的删除时间client.watcher(w, priorPath);} catch (KeeperException.NoNodeException e) {e.printStackTrace();return false;}return latch.await(properties.getTimeout(), TimeUnit.MILLISECONDS);
}

好了,如果你对这个感兴趣,不妨拉一下完整源码: https://gitee.com/liangshij/zk-lock-demo

源码简要说明

模块说明

  • lsj-zk-lock:核心实现。
  • lsj-zk-lock-spring-boot-starter:整合springboot
  • lsj-zk-lock-test:使用demo

安装

经典三步走:导包、配置、使用

  1. 拉取代码,将lsj-zk-lock、lsj-zk-lock-spring-boot-starter通过 mvn install 命令安装到本地仓库。
  2. 引入依赖:
<dependency><groupId>cn.lsj</groupId><artifactId>lsj-zk-lock-spring-boot-starter</artifactId><version>2.4.2</version>
</dependency>

配置

  • 配置locks和dataSource:
spring:zk:dataSource:url: "localhost"port: 2181locks:- zkPath: "/test/lock"lockName: "countLock"# 获取锁失败时,进入等待的时间,等待结束将重新尝试获取锁timeout: 5000- zkPath: "/test2/lock"lockName: "lock"timeout: 5000

使用

  • 使用方式1:通过@GlobalLock注解,指定要使用那个lock
@GetMapping("test2")
@GlobalLock("countLock")
public String test2() {// 业务代码return "";
}
  • 使用方式2:通过@Qualifier注解,指定要使用那个lock
@RestController
public class TestController {int count = 0;@Resource@Qualifier("lock")private ReentrantLock lock;@Resource@Qualifier("countLock")private ReentrantLock countLock;@GetMapping("test")public String test() {countLock.lock();try{for (int i = 0; i < 10000; i++) {count++;}} finally {countLock.unlock();}return String.valueOf(count);}
}
http://www.lryc.cn/news/269680.html

相关文章:

  • 【音视频 ffmpeg 学习】 RTMP推流 mp4文件
  • 跨进程通信 macOS XPC 创建实例
  • Python圣诞树代码
  • flask之文件管理系统-项目 JRP上线啦!!! ---修订版,兼容Windows和Linux系统
  • 希尔排序:排序算法中的调优大师
  • LeetCode 1185. 一周中的第几天
  • 大数据学习(30)-Spark Shuffle
  • Linux部署ELK
  • Python 实现 PDF 到 Word 文档的高效转换(DOC、DOCX)
  • 【MYSQL】MYSQL 的学习教程(七)之 慢 SQL 优化思路
  • unity学习笔记----游戏练习0
  • ai概念:强人工智能介绍、迁移学习
  • go语言设计模式-单例模式
  • 超维空间S2无人机使用说明书——51、基础版——使用yolov8进行目标跟踪
  • Transformer(seq2seq、self-attention)学习笔记
  • 2023-12-29 服务器开发-centos部署ftp
  • 螺旋数字阵(100%用例)C卷 (JavaPythonNode.jsC语言C++)
  • AUTOSAR从入门到精通-网络通信(UDPNm)(二)
  • 显示器与按键(LCD 1602 + button)
  • 2020年认证杯SPSSPRO杯数学建模B题(第一阶段)分布式无线广播全过程文档及程序
  • 【CISSP学习笔记】7. 安全评估与测试
  • Gateway集成方法以及拦截器和过滤器的使用
  • 第G2周:人脸图像生成(DCGAN)
  • 【Web】Ctfshow Thinkphp5 非强制路由RCE漏洞
  • python3遇到Can‘t connect to HTTPS URL because the SSL module is not available.
  • QSPI Flash xip取指同时program过程中概率性出现usb播歌时断音
  • MySQL聚簇索引和非聚簇索引的区别
  • 【C#】蜗牛爬井问题C#控制台实现
  • IP地址的四大类型:动态IP、固定IP、实体IP、虚拟IP的区别与应用
  • Linux Debian12安装和使用ImageMagick图像处理工具 常见图片png、jpg格式转webp格式