分布式锁(Redis的setnx、Redisson)
一、使用Redis的setnx实现分布式锁
1、使用Redis的setnx实现分布式锁出现的问题
(1) 宕机时的锁释放问题
在分布式系统中,如果一个节点获取了锁,但在执行任务过程中发生故障,没有释放锁,其他节点可能会一直等待锁被释放。
解决方案:设置锁的过期时间,确保即使持有锁的节点发生故障,锁也会在一定时间后被自动释放。
(2)锁误释放问题
如果使用固定的键来表示锁,并且客户端在完成任务后释放锁,可能会错误地释放其他客户端获取的锁。
解决方案:每个客户端在获取锁时使用一个唯一的值(如UUID),并在释放锁时检查存储在键中的值是否与自己的值匹配。
(3)原子性问题
SETNX
命令本身是原子的,但是如果你需要执行一系列操作(比如设置值和设置过期时间),它们不是原子的。这意味着如果在设置值之后和设置过期时间之前发生故障,可能会导致键没有过期时间。
解决方案:执行Lua脚本期间不会有其他脚本或命令被执行,从而保证了操作的原子性。
2、测试锁的使用代码实现
/*** 采用SpringDataRedis实现分布式锁* 原理:执行业务方法前先尝试获取锁(setnx存入key val),如果获取锁成功再执行业务代码,业务执行完毕后将锁释放(del key)*/
@Override
public void testLock() {//0.先尝试获取锁 setnx key val//问题:锁可能存在线程间相互释放//Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock", 10, TimeUnit.SECONDS);//解决:锁值设置为uuidString uuid = UUID.randomUUID().toString();Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS);if(flag){//获取锁成功,执行业务代码//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0String value = stringRedisTemplate.opsForValue().get("num");//2.如果值为空则非法直接返回即可if (StringUtils.isBlank(value)) {return;}//3.对num值进行自增加一int num = Integer.parseInt(value);stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));//4.将锁释放 判断uuid//问题:删除操作缺乏原子性。//if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))){ //线程一:判断是满足是当前线程锁的值// //条件满足,此时锁正好到期,redis锁自动释放了线程2获取锁成功,线程1将线程2的锁删除// stringRedisTemplate.delete("lock");//}//解决:redis执行lua脚本保证原子,lua脚本执行会作为一个整体执行//执行脚本参数 参数1:脚本对象封装lua脚本,参数二:lua脚本中需要key参数(KEYS[i]) 参数三:lua脚本中需要参数值 ARGV[i]//4.1 先创建脚本对象 DefaultRedisScript泛型脚本语言返回值类型 Long 0:失败 1:成功DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();//4.2设置脚本文本String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +"then\n" +" return redis.call(\"del\",KEYS[1])\n" +"else\n" +" return 0\n" +"end";redisScript.setScriptText(script);//4.3 设置响应类型redisScript.setResultType(Long.class);stringRedisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);}else{try {//睡眠Thread.sleep(100);//自旋重试this.testLock();} catch (InterruptedException e) {e.printStackTrace();}}
}
二、使用Redisson
1. 引入依赖:
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId>
</dependency>
2. Redission配置类:
/*** redisson配置信息*/
@Data
@Configuration
@ConfigurationProperties("spring.data.redis")
public class RedissonConfig {private String host;private String password;private String port;private int timeout = 3000;private static String ADDRESS_PREFIX = "redis://";/*** 自动装配**/@BeanRedissonClient redissonSingle() {Config config = new Config();if(!StringUtils.hasText(host)){throw new RuntimeException("host is empty");}SingleServerConfig serverConfig = config.useSingleServer().setAddress(ADDRESS_PREFIX + this.host + ":" + port).setTimeout(this.timeout);if(StringUtils.hasText(this.password)) {serverConfig.setPassword(this.password);}return Redisson.create(config);}
}
3. 测试Redisson锁的使用代码实现
@Autowired
private RedissonClient redissonClient;/*** 使用Redison实现分布式锁* 开发步骤:* 1.使用RedissonClient客户端对象 创建锁对象* 2.调用获取锁方法* 3.执行业务逻辑* 4.将锁释放**/
public void testLock() {//0.创建锁对象RLock lock = redissonClient.getLock("lock1");//0.1 尝试加锁//0.1.1 lock() 阻塞等待一直到获取锁,默认锁有效期30slock.lock();//1.先从redis中通过key num获取值 key提前手动设置 num 初始值:0String value = stringRedisTemplate.opsForValue().get("num");//2.如果值为空则非法直接返回即可if (StringUtils.isBlank(value)) {return;}//3.对num值进行自增加一int num = Integer.parseInt(value);stringRedisTemplate.opsForValue().set("num", String.valueOf(++num));//4.将锁释放lock.unlock();}
负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
看门狗原理:
只要线程一加锁成功,就会启动一个 watch dog 看门狗,它是一个后台线程,会每隔`10`秒检查一下,如果线程一还持有锁,那么就会不断的延长锁`key`的生存时间。因此,Redisson就是使用Redisson解决了锁过期释放,业务没执行完问题。
- 如果我们指定了锁的超时时间,就发送给Redis执行脚本,进行占锁,默认超时就是我们制定的时间,不会自动续期;
- 如果我们未指定锁的超时时间,就使用 lockWatchdogTimeout = 30 * 1000 【看门狗默认时间】
4. 实战使用:
@Autowired
private RedissonClient redissonClient;@Transactional(rollbackFor = Exception.class)
@Override
public Boolean robNewOrder(Long driverId, Long orderId) {//抢单成功或取消订单,都会删除该key,redis判断,减少数据库压力if(!redisTemplate.hasKey(RedisConstant.ORDER_ACCEPT_MARK)) {//抢单失败throw new GuiguException(ResultCodeEnum.COB_NEW_ORDER_FAIL);}// 初始化分布式锁,创建一个RLock实例RLock lock = redissonClient.getLock(RedisConstant.ROB_NEW_ORDER_LOCK + orderId);try {/*** TryLock是一种非阻塞式的分布式锁,实现原理:Redis的SETNX命令* 参数:* waitTime:等待获取锁的时间* leaseTime:加锁的时间*/boolean flag = lock.tryLock(RedisConstant.ROB_NEW_ORDER_LOCK_WAIT_TIME,RedisConstant.ROB_NEW_ORDER_LOCK_LEASE_TIME, TimeUnit.SECONDS);//获取到锁if (flag){//二次判断,防止重复抢单if(!redisTemplate.hasKey(RedisConstant.ORDER_ACCEPT_MARK)) {//抢单失败throw new GuiguException(ResultCodeEnum.COB_NEW_ORDER_FAIL);}//修改订单状态//update order_info set status = 2, driver_id = #{driverId} where id = #{id}//修改字段OrderInfo orderInfo = new OrderInfo();orderInfo.setId(orderId);orderInfo.setStatus(OrderStatus.ACCEPTED.getStatus());orderInfo.setAcceptTime(new Date());orderInfo.setDriverId(driverId);int rows = orderInfoMapper.updateById(orderInfo);if(rows != 1) {//抢单失败throw new GuiguException(ResultCodeEnum.COB_NEW_ORDER_FAIL);}//记录日志this.log(orderId, orderInfo.getStatus());//删除redis订单标识redisTemplate.delete(RedisConstant.ORDER_ACCEPT_MARK);}} catch (InterruptedException e) {//抢单失败throw new GuiguException(ResultCodeEnum.COB_NEW_ORDER_FAIL);} finally {if(lock.isLocked()) {lock.unlock();}}return true;
}
使用了 Redisson 客户端来获取一个分布式锁,并且通过 tryLock()
方法来尝试获取锁。这个方法接受三个参数:
waitTime
:尝试获取锁的超时时间。leaseTime
:锁的自动续期时间。unit
:时间单位,例如TimeUnit.SECONDS
。
设置了 waitTime
和 leaseTime
,这表明您正在使用 tryLock()
方法的自动续期功能。这意味着即使原始的 leaseTime
到期,锁也会在后台自动续期,直到手动释放锁或者发生异常导致锁被自动释放。