当前位置: 首页 > news >正文

Redis分布式锁核心原理源码

文章目录

  • 概述
  • 一、Redis实现分布式锁
    • 1.1、第一版
    • 1.2、第二版
    • 1.3、第三版
    • 1.3、第四版
  • 二、Redisson实现分布式锁核心源码分析
    • 2.1、加锁核心源码
    • 2.2、锁续期核心源码
    • 2.3、重试机制核心源码
    • 2.4、解锁核心源码
  • 总结


概述

  传统的单机锁(Synchronized,ReentrantLock)都是进程级别的锁,无法应对服务多实例部署的场景(每个服务实例都有自己的进程),如果需要跨进程加锁,则需要引入第三方工具对于进程统一管理。
  使用Redis可以实现简易的分布式锁,而最常见的成熟的分布式锁方案是Redisson

一、Redis实现分布式锁

  案例工程:减库存,没有加锁控制,在高并发的场景下必然会出现超卖的问题。如果是在单点部署的情况下,可以通过本地锁解决,但是目前服务多点部署,本地锁的方案无法进行控制。

@Service
public class DistributedLockDemo {@Resourceprivate StringRedisTemplate stringRedisTemplate;private final String STOCK_PREFIX = "stock:";private final String STOCK_LOCK = "stock:lock:";public void deduceStock(int orderNum,int orderId){//业务代码int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0){stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));}
}

1.1、第一版

  以stock:lock:前缀加上orderId作为key,使用setIfAbsent进行加锁,setIfAbsent命令是Redis原生的setNx命令在客户端的体现,setNx命令是仅仅当设置的key不存在时,才可以成功,保证互斥性。
  这样做存在的问题是,如果在执行业务代码的过程中,出现了异常,那么解锁的代码则永远无法执行,造成死锁

@Service
public class DistributedLockDemo {@Resourceprivate StringRedisTemplate stringRedisTemplate;private final String STOCK_PREFIX = "stock:";private final String STOCK_LOCK = "stock:lock:";public void deduceStock(int orderNum,int orderId){Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock");if (lock){//业务代码int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0){stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));stringRedisTemplate.delete(STOCK_LOCK + orderId);}}
}

1.2、第二版

  针对第一版的问题进行改造,将解锁的代码放到finally代码块中。这种方案依旧会存在问题,因为finally代码块只能保证程序出错时最终执行,无法保证服务器宕机造成的死锁,所以最好在加锁时设置一个超时时间,到期自动释放。

    public void deduceStock(int orderNum,int orderId){try {Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock");if (lock){//业务代码int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0){stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));stringRedisTemplate.delete(STOCK_LOCK + orderId);}} catch (Exception e) {}finally {stringRedisTemplate.delete(STOCK_LOCK + orderId);}}

1.3、第三版

  设置超时时间,可以使用stringRedisTemplate.expire方法,但是这样写:

Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock");
stringRedisTemplate.expire(STOCK_LOCK + orderId, 10, TimeUnit.SECONDS);

  是不具有原子性的,需要分为两条命令执行,如果在执行两条命令之间出现问题,依旧会造成死锁的问题。在Redis的层面提供了一条命令 set NX EX,保证设置超时时间和加锁是原子性操作:

Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, "lock",10, TimeUnit.SECONDS);

  加锁时存在的问题看似是解决了,但是解锁的代码:

stringRedisTemplate.delete(STOCK_LOCK + orderId);

  会存在一种情况:

  • 线程一获取到了锁,然后在执行业务代码的时候陷入了阻塞。
  • 线程一的锁到期自动释放。
    • 线程二获取到了锁,执行业务代码
  • 线程一从阻塞状态恢复,执行完业务代码,要执行最终的解锁逻辑
  • 线程一将线程二的锁解锁。

1.3、第四版

  为了避免当前线程将其他线程的锁误解锁,需要在加锁时加入自己的线程唯一标识,并且在解锁时进行判断
  注意,不要用当前Thread.currentThread().getId()方法去获取线程ID,因为不同机器上的线程ID可能会重复。Redisson底层也不是直接用上述的API获取的线程ID,而是和UUID进行了拼接。

//加锁
String threadId = UUID.randomUUID().toString().replace("-","");
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(STOCK_LOCK + orderId, threadId,10, TimeUnit.SECONDS);//解锁
String threadIdFromRedis = stringRedisTemplate.opsForValue().get("STOCK_LOCK + orderId");
if (threadId.equals(threadIdFromRedis)){stringRedisTemplate.delete(STOCK_LOCK + orderId);
}

  但是这样写, 解锁和之前的加锁设置超时时间有同样的问题,都是操作分为了两步,不能保证原子性。 在解锁的判断上,Redis并没有提供原子性的命令,需要自己去通过lua脚本实现。


  经过四版改动,自己用Redis实现的分布式锁已经基本可用了,但是深究下来依旧存在一些问题或不足:

  1. 如果执行业务代码的时间,超过了设置的锁超时时间,当前逻辑是没有自动续期的。
  2. 当前的逻辑不支持锁重入。
  3. 当前的逻辑没有实现重试机制,获取不到锁的线程无法进行重试。

二、Redisson实现分布式锁核心源码分析

  相比较于自己通过set NX EX + lua脚本实现的分布式缓存锁,Redisson是更为成熟的方案,也推荐在生产环境使用。Redisson分布式锁在API层面是非常简单的:

public class RedissonLockDemo {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate Redisson redisson;private final String STOCK_PREFIX = "stock:";private final String STOCK_LOCK = "stock:lock:";public void deduceStock(int orderNum, int orderId) {//获取分布式锁RLock lock = redisson.getLock(STOCK_LOCK + orderId);try {//加分布式锁,可以指定超时时间,没有指定超时时间默认30s,底层会自动续期。lock.lock();//业务代码int stockNumber = Integer.parseInt(stringRedisTemplate.opsForValue().get(STOCK_PREFIX + orderId));if (stockNumber > 0) {stockNumber = stockNumber - orderNum;}stringRedisTemplate.opsForValue().set(STOCK_PREFIX + orderId, String.valueOf(stockNumber));} catch (Exception e) {} finally {lock.unlock();}}
}

  关键代码:

//获取分布式锁
RLock lock = redisson.getLock(STOCK_LOCK + orderId);
//加分布式锁,可以指定超时时间,没有指定超时时间默认30s,底层会自动续期。
lock.lock();
//解锁
lock.unlock();

2.1、加锁核心源码

  跟踪lock.lock();,进入lockInterruptibly
在这里插入图片描述
  首先第一次加锁,进入的是tryAcquire方法,最终的核心逻辑是:
在这里插入图片描述
  底层执行的是一段lua脚本,lua脚本和pipeline类似,也是可以将命令批量执行。虽然脚本中分了很多条命令,但是其他客户端要等到当前客户端的lua脚本全部执行完,才能执行脚本。

  • KEYS[1]:是作为当前分布式锁的Key,也就是用户在redisson.getLock时传入的。
  • ARGV[1]:是默认的超时时间,30s。
  • ARGV[2]:是当前线程的唯一标识,用线程ID拼接上UUID。
# 加锁的逻辑
# 当前分布式锁的key不存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
#	调用hset命令,key是分布式锁的key,value的key是当前线程的唯一标识,用线程ID拼接上UUID。 value是1(重入次数)
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
#  设置超时时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
# 返回null
"return nil; " +
"end; " +
# 重入的逻辑
# 当前分布式锁的key存在,并且是当前线程持有
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
# 重入次数 + 1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
# 设置超时时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
# 返回null 
"return nil; " +
"end; " +
# 查询指定键的剩余生存时间,并且返回
"return redis.call('pttl', KEYS[1]);"

  上面这个脚本的执行,包含了可重入锁初次加锁的逻辑,最终还会返回当前锁的剩余时间

2.2、锁续期核心源码

  Redisson锁的续期,也称为看门狗机制。如果要实现锁续期,常见的设计思想是在业务线程执行时,开启一个守护线程,对业务线程进行监控,如果锁到期,业务线程还没有执行完,就执行续期的逻辑
  在Redisson中的实现,调用完tryLockInnerAsync方法后,会回调operationComplete,通过future.getNow();获取到加锁的结果,上面的lua脚本,在加锁成功和重入成功后,都会返回null。
在这里插入图片描述
  进入scheduleExpirationRenewal方法,该方法就是实现续期的核心方法实现,类似于一个延迟任务的线程池,延迟30/3 = 10s执行,整个方法分为两部分
  首先依旧是执行一段lua脚本**(KEYS[1],ARGV[2],ARGV[1] 和第一段加锁时的lua脚本参数含义相同)**

# 当前分布式锁的key存在,并且是当前线程持有
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
# 进行续期30s
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
# 续期成功就返回1
"return 1; " +
"end; " +
# 否则返回 0 
"return 0;",

  第二部分则是拿到lua脚本执行的结果,递归调用scheduleExpirationRenewal方法,延迟10s执行,最终的结果是每隔10s进行一次续期,每次续期30s
在这里插入图片描述

2.3、重试机制核心源码

  加锁时的lua脚本,如果没有加锁或者重入成功,那么最终返回的是key的剩余生存时间
在这里插入图片描述

  返回到方法的最外层,在lockInterruptibly中执行自旋重试的逻辑。这里的自旋重试并非是在while循环中不断地循环,而是有一定的间隔时间。
  在进入while循环后首先会再次尝试获取锁,如果失败了,就通过Semaphore的API,在规定的TTL毫秒内尝试获取许可,如果有其他线程释放(即唤醒),当前线程就会继续执行。如果超时仍未获取到许可,则返回 false。
在这里插入图片描述
  如果业务代码执行的时间短于设置的锁超时时间,那么其他等待锁的线程并不会阻塞到超时时间后再去竞争锁,在执行while循环之前,会通过redis的发布订阅模型,将自身存入一个队列中。
在这里插入图片描述
  唤醒队列中元素的逻辑,在解锁中。

2.4、解锁核心源码

  解锁同样是通过lua脚本,将判断线程标识和解锁组成原子性的操作,解锁的lua脚本在unlockInnerAsync方法中:

  • KEYS[1]:当前分布式锁的key
  • KEYS[2]:当前分布式锁的key 拼接上 redisson_lock__channel
  • ARGV[1]:解锁消息标识,默认0L
  • ARGV[2]:锁超时释放时间
  • ARGV[3]:当前线程的唯一标识,用线程ID拼接上UUID。

  主线程在解锁的时候会往队列中发送给一条消息,唤醒等待线程:

  • 当前锁不存在,超时释放了
  • 存在并且解锁成功
# 当前key对应的分布式锁不存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
# 发布解锁消息标识到当前分布式锁的key 拼接上 redisson_lock__channel
"redis.call('publish', KEYS[2], ARGV[1]); " +
# 返回1
"return 1; " +
"end;" +
# 当前key对应的分布式锁非本线程持有
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
# 返回null
"return nil;" +
"end; " +
# 可重入锁的解锁,重入次数 - 1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
# 重入次数>0
"if (counter > 0) then " +
# 重新设置超时时间
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
# 返回
"return 0; " +
"else " +
# 删除当前key对应的分布式锁
"redis.call('del', KEYS[1]); " +
# 发布解锁消息标识到当前分布式锁的key 拼接上 redisson_lock__channel
"redis.call('publish', KEYS[2], ARGV[1]); " +
# 返回1
"return 1; "+
"end; " +
"return nil;",

  消费者 (正在阻塞等待的线程) 接受到了消息,会回调LockPubSubonmessage方法,被唤醒然后重新争抢锁。
在这里插入图片描述

总结

在这里插入图片描述

http://www.lryc.cn/news/576825.html

相关文章:

  • #华为鲲鹏#华为计算#鲲鹏开发者计划2025#
  • Transformer结构与代码实现详解
  • 【电路笔记 TMS320F28335DSP】TI SCI (增强型的UART) 点对点异步串行通信接口
  • 【OpenGL学习】(八)图形变换
  • Oauth2 自定义设置token过期时间
  • 状态机编程实战 | 如何更优雅地处理字符串
  • 全新大模型开源,腾讯(int4能打DeepSeek) Vs 谷歌(2GB运行多模态)
  • Gemini-CLI:谷歌开源的命令行AI工具,重新定义开发者工作流
  • Ubuntu22 安装 RTX 5070 Ti Nvidia Driver 驱动
  • 自学嵌入式 day27 进程
  • mac系统快捷键及命令安装
  • 状态模式 - Flutter中的状态变身术,让对象随“状态“自由切换行为!
  • 边界的艺术:支持向量机与统计学习时代的王者
  • 设计模式-外观模式、适配器模式
  • 【数据挖掘】聚类算法学习—K-Means
  • YOLOv12_ultralytics-8.3.145_2025_5_27部分代码阅读笔记-conv.py
  • 设备预测性维护和异常检测系统设计方案
  • 【HuggingFace】模型下载至本地访问
  • Git安装全攻略:避坑指南与最佳实践
  • C++ 格式化输入输出
  • 人工智能时代的职业替代风险与应对策略分析
  • MySQL技巧
  • 性能分析专栏 -- top命令
  • 【修电脑的小记录】连不上网
  • 打造地基: App拉起基础小程序容器
  • 疏通经脉: Bridge 联通逻辑层和渲染层
  • 深入理解 Dubbo 负载均衡:原理、源码与实践
  • RK3588集群服务器性能优化案例:电网巡检集群、云手机集群、工业质检集群
  • [Python 基础课程]PyCharm 的安装
  • 大数据Hadoop之——Flume安装与使用(详细)