Redisson 分布式锁原理解析
Redisson 分布式锁原理解析
分布锁有很多种形式,今天就来研究下Redisson 分布式锁的实现
Redisson 分布式锁使用
在讲实现前,先来说下Redisson分布式锁的使用。首先先引入Redisson 得依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.47.0</version>
</dependency>
如果是Spring项目则引入starter
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.47.0</version>
</dependency>
(这里使用spring项目作为基础使用)
public class RedissionClientTest {public void RedissonTest(){// 获取锁实例RLock lock = redissonClient.getLock(key);try {// 尝试加锁lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);} finally {// 释放锁if (lock.isLocked() && lock.isHeldByCurrentThread()) {lock.unlock();}}}
}
上面是对于常规分布式锁的使用,一般会使用tryLock进行加锁,以下是对锁参数的解释:
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);
waitTime:获取锁的等待时间,现场会尝试获取锁,超过这个时间,获取锁失败
leaseTime:锁的过期时间,超过时间,自动释放锁
Redisson源码实现
WatchDog 看门狗机制原理
如果说我们不设置过期时间,则redisson会有锁的续约机制,也就是常说的看门狗机制,watchDog。
代码的实现如下:
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}CompletionStage<Boolean> future = renewExpirationAsync(threadId);future.whenComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getRawName() + " expiration", e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) {// reschedule itselfrenewExpiration();} else {cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);}
internalLockLeaseTime 默认为 30 * 1000 也就是30秒,redisson开了个task每过1/3得时间也就是10秒会自动续约一次,直到现场手动释放锁。
Redisson 各个类型锁的实现
非公平锁
先来说下非公平锁的实现,我们在使用的时候如果没有指定,则默认走的都是非公平锁,他的加锁是用的redis的Hash表,通过lua脚本来实现。具体源码如下:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}
通过上面的lua脚本我们可以看出来,其实有两步:
- 先判断是否存在key了,如果不存在,也就是是否已经加过锁了,如果没有加过,则先hincrby计数+1然后设置过期时间。
- 如果存在,则说明已经获取过一次锁了,通过hincrby计数+1然后设置过期时间,此时就是实现了锁的重入功能,非常的巧妙。
公平锁实现
然后再来说下公平锁,公平锁的实现是在非公平锁的基础上加入了队列,而他队列的实现是通过ZSET结构来实现。按照时间作为score进行先后顺序排列。
lua脚本源码如下:
-- 清理等待队列中超时的线程
while true dolocal firstThreadId2 = redis.call('lindex', KEYS[2], 0);if firstThreadId2 == false thenbreak;end;local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));if timeout <= tonumber(ARGV[4]) thenredis.call('zrem', KEYS[3], firstThreadId2);redis.call('lpop', KEYS[2]);elsebreak;end;
end;-- 如果锁未被占用,并且(等待队列为空 或者 队列头是自己)
if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then-- 移除自己redis.call('lpop', KEYS[2]);redis.call('zrem', KEYS[3], ARGV[2]);-- 队列中的其他线程等待时间递减(防止饥饿)local keys = redis.call('zrange', KEYS[3], 0, -1);for i = 1, #keys, 1 doredis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);end;-- 设置锁(hash 结构保存线程 ID + 重入次数)redis.call('hset', KEYS[1], ARGV[2], 1);redis.call('pexpire', KEYS[1], ARGV[1]);return nil;
end;-- 如果锁已存在,并且当前线程是持有者:重入逻辑
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 thenredis.call('hincrby', KEYS[1], ARGV[2], 1);redis.call('pexpire', KEYS[1], ARGV[1]);return nil;
end;
参数说明:
KEYS 参数(由 Java 代码传入)
KEYS[i] | 描述 |
---|---|
KEYS[1] | 锁标识,如:lock:{key} ,类型:Hash(用于重入) |
KEYS[2] | 等待线程的 FIFO 队列,类型:List |
KEYS[3] | 等待线程的超时集合,类型:ZSet(score=过期时间戳) |
ARGV 参数(由 Java 代码传入)
ARGV[i] | 描述 |
---|---|
ARGV[1] | 锁 TTL(毫秒) |
ARGV[2] | 当前线程唯一标识(客户端 + UUID + threadId) |
ARGV[3] | 单线程最大等待时间(毫秒) |
ARGV[4] | 当前时间戳(毫秒) |
流程图:
线程请求锁
│
├─▶ 判断锁是否空闲 + 当前线程是否队头
│ └─是:加锁 + 设置 TTL
│ └─否:
│ ├─是否重入 → 是:重入 + TTL 延期
│ └─是否已在队列 → 是:返回预计等待时间
│ 否:加入队列,记录超时点
读锁
todo
写锁
todo