当前位置: 首页 > news >正文

Redis分布式锁

一、背景

  • 与分布式锁相对应的是「单机锁」,我们在写多线程程序时,避免同时操作一个共享变量产生数据问题,通常会使用一把锁来「互斥」,以保证共享变量的正确性,其使用范围是在「同一个进程」中。
  • 单机环境下,我们常用 synchronized 或者 Lock 锁解决多线程并发访问产生的数据安全问题,但是如果是在集群环境,本地锁就会失效。
  • 为解决分布式场景下的并发问题, 就需要用到分布式锁。下面介绍下Redis分布式锁。

二、实现思路

1. 如何实现互斥?

为达到互斥,可以使用SETNX 命令,这个命令表示Set If Not Exists,即如果 key 不存在,才会设置它的值,否则什么也不做。
如:

	SETNX lock 1    //加锁DEL lock //释放锁

但是这样有个问题,当客户端 1 拿到锁后,如果发生下面的场景,就会造成「死锁」:
①程序处理业务逻辑异常,未释放锁
②拿到锁后进程挂了,没机会释放锁
这时,这个客户端就会一直占用这个锁,而其它客户端就「永远」拿不到这把锁了。

2. 如何避免死锁?

锁无法释放,产生「死锁」。那么我们给这个锁加个「租期」,让它在一定时间内如果一直没释放就过期,问题不就解决了。Redis支持这种语法,示例:

SETNX lock 1    // 加锁
EXPIRE lock 10  // 10s后自动过期

但这样真的没问题了吗?
No!

现在的操作,加锁、设置过期是 2 条命令,有没有可能只执行了第一条,第二条却「来不及」执行的情况发生呢?例如:
①SETNX 执行成功,执行 EXPIRE 时由于网络问题,执行失败
②SETNX 执行成功,Redis 异常宕机,EXPIRE 没有机会执行
③SETNX 执行成功,客户端异常崩溃,EXPIRE 也没有机会执行
总之,这两条命令不能保证是原子操作(一起成功),就有潜在的风险导致过期时间设置失败,依旧发生「死锁」问题。

如何解决?
Redis 2.6.12 之后,Redis 扩展了 SET 命令的参数,用这一条命令就可以执行上述两步操作:

	// 一条命令保证原子性执行SET lock 1 EX 10 NX

我们再来看分析下,它还有什么问题?
试想这样一种场景:
①客户端 1 加锁成功,开始操作共享资源
②客户端 1 操作共享资源的时间,「超过」了锁的过期时间,锁被「自动释放」
③客户端 2 加锁成功,开始操作共享资源
④客户端 1 操作共享资源完成,释放锁(但释放的是客户端 2 的锁)

这里存在两个严重的问题:
锁过期:客户端 1 操作共享资源耗时太久,导致锁被自动释放,之后被客户端 2 持有
释放别人的锁:客户端 1 操作共享资源完成后,却又释放了客户端 2 的锁

3. 锁被别人释放怎么办?

解决办法是:客户端在加锁时,设置一个只有自己知道的「唯一标识」进去。释放时,先判断这把锁是否是自己所有,是的话再进行释放。
这样涉及到两步操作:
①判断这把锁是否是自己所有;
②释放锁。
非原子性,也会出现并发问题,如何解决呢?Lua脚本
我们可以写好Lua脚本后交给Redis执行,脚本如下:

if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end

4. 锁过期怎么办?

方案:

  • 可能是我们评估操作共享资源的时间不准确导致的。可以「冗余」适量过期时间,降低锁提前过期的概率。
  • 加锁时,先设置一个过期时间,然后我们开启一个「守护线程」,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行「续期」,重新设置过期时间。

备注:个人根据项目的业务情况,考虑「锁过期」对业务的影响,看是否需要续期。

5. Redisson

Redisson 是一个 Java 语言实现的 Redis SDK 客户端,在使用分布式锁时,它就采用了「自动续期」的方案来避免锁过期,这个守护线程我们一般也把它叫做「看门狗」线程。
Ression原理
除此之外,这个 SDK 还封装了很多易用的功能:

  • 可重入锁
  • 乐观锁
  • 公平锁
  • 读写锁
  • Redlock

6. Redlock

之前分析的场景都是,锁在「单个」Redis 实例中可能产生的问题,并没有涉及到 Redis 的部署架构细节。
而我们在使用 Redis 时,一般会采用主从集群 + 哨兵的模式部署,这样做的好处在于,当主库异常宕机时,哨兵可以实现「故障自动切换」,把从库提升为主库,继续提供服务,以此保证可用性。
那当「主从发生切换」时,这个分布锁会依旧安全吗?

试想这样的场景:

  1. 客户端 1 在主库上执行 SET 命令,加锁成功
  2. 此时,主库异常宕机,SET 命令还未同步到从库上(主从复制是异步的)
  3. 从库被哨兵提升为新主库,这个锁在新的主库上,丢失了!
    Redis分布式锁集群问题
    为此,Redis 的作者提出一种解决方案,就是我们经常听到的 Redlock(红锁)。

Redlock 的方案基于 2 个前提:

不再需要部署从库和哨兵实例,只部署主库
但主库要部署多个,官方推荐至少 5 个实例
也就是说,想用使用 Redlock,你至少要部署 5 个 Redis 实例,而且都是主库,它们之间没有任何关系,都是一个个孤立的实例。
注意:不是部署 Redis Cluster,就是部署 5 个简单的 Redis 实例。

Redlock流程是这样的,一共分为 5 步:

  1. 客户端先获取「当前时间戳T1」
  2. 客户端依次向这 5 个 Redis 实例发起加锁请求(用前面讲到的 SET 命令),且每个请求会设置超时时间(毫秒级,要远小于锁的有效时间),如果某一个实例加锁失败(包括网络超时、锁被其它人持有等各种异常情况),就立即向下一个 Redis 实例申请加锁
  3. 如果客户端从 >=3 个(大多数)以上 Redis 实例加锁成功,则再次获取「当前时间戳T2」,如果 T2 - T1 < 锁的过期时间,此时,认为客户端加锁成功,否则认为加锁失败
  4. 加锁成功,去操作共享资源(例如修改 MySQL 某一行,或发起一个 API 请求)
  5. 加锁失败,向「全部节点」发起释放锁请求(前面讲到的 Lua 脚本释放锁)

上述过程有4个重点:

  1. 客户端在多个 Redis 实例上申请加锁
  2. 必须保证大多数节点加锁成功
  3. 大多数节点加锁的总耗时,要小于锁设置的过期时间
  4. 释放锁,要向全部节点发起释放锁请求

实际生产中,Redlock很少使用,所以就简单介绍到这里。

三、springboot项目实现Redis分布式锁

1. 依赖

	<!--Springboot依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.12.RELEASE</version></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test --><!--Springboot 测试依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>2.3.12.RELEASE</version><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis --><!--Springboot Redis依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.3.12.RELEASE</version><!--排除掉默认的 lettuce ,lettuce 在使用中存在偶尔连接超时问题--><exclusions><exclusion><artifactId>lettuce-core</artifactId><groupId>io.lettuce</groupId></exclusion></exclusions></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.2.0</version></dependency><!-- https://mvnrepository.com/artifact/org.springframework.data/spring-data-commons --><!-- Redis 需要引入这个依赖,否则报错 --><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-commons</artifactId><version>2.3.9.RELEASE</version></dependency>

2. Redis配置类

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.JedisPoolConfig;import java.util.HashSet;
import java.util.Set;@Configuration
public class RedisConfig {@Value("${spring.redis.cluster.nodes:IP:Port}")private String clusterNodes;@Value("${spring.redis.cluster.max-redirects:3}")private int maxRedirects;@Value("${spring.redis.password:***}")private String password;@Value("${spring.redis.timeout:3000}")private int timeout;/*** 最大空闲数*/@Value("${spring.redis.maxIdle:100}")private int maxIdle;/*** 控制一个pool可分配多少个jedis实例*/@Value("${spring.redis.maxTotal:100}")private int maxTotal;/*** 最大建立连接等待时间。如果超过此时间将接到异常。设为-1表示无限制*/@Value("${spring.redis.maxWaitMillis:5000}")private int maxWaitMillis;/*** 最小空闲数*/@Value("${spring.redis.minIdle:5}")private int minIdle;/*** 连接的最小空闲时间 默认1800000毫秒(30分钟)*/@Value("${spring.redis.minEvictableIdleTimeMillis:300000}")private int minEvictableIdleTimeMillis;/*** 每次释放连接的最大数目,默认3*/@Value("${spring.redis.numTestsPerEvictionRun:3}")private int numTestsPerEvictionRun;/*** 逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1*/@Value("${spring.redis.timeBetweenEvictionRunsMillis:30000}")private int timeBetweenEvictionRunsMillis;/*** 是否在从池中取出连接前进行检验,如果检验失败,则从池中去除连接并尝试取出另一个*/@Value("${spring.redis.testOnBorrow:true}")private boolean testOnBorrow;/*** 在空闲时检查有效性, 默认false*/@Value("${spring.redis.testWhileIdle:true}")private boolean testWhileIdle;@Beanpublic JedisPoolConfig getJedisPoolConfig() {JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();// 最大空闲数jedisPoolConfig.setMaxIdle(maxIdle);// 最小空闲数jedisPoolConfig.setMinIdle(minIdle);// 连接池的最大数据库连接数jedisPoolConfig.setMaxTotal(maxTotal);// 最大建立连接等待时间jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);// 逐出连接的最小空闲时间 默认1800000毫秒(30分钟)jedisPoolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);// 每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3jedisPoolConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);// 逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1jedisPoolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);// 是否在从池中取出连接前进行检验,如果检验失败,则从池中去除连接并尝试取出另一个jedisPoolConfig.setTestOnBorrow(testOnBorrow);// 在空闲时检查有效性, 默认falsejedisPoolConfig.setTestWhileIdle(testWhileIdle);return jedisPoolConfig;}/*** Redis集群的配置** @return RedisClusterConfiguration*/@Beanpublic RedisClusterConfiguration redisClusterConfiguration() {RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();// Set<RedisNode> clusterNodesString[] serverArray = clusterNodes.split(",");Set<RedisNode> nodes = new HashSet<>();for (String ipPort : serverArray) {String[] ipAndPort = ipPort.split(":");nodes.add(new RedisNode(ipAndPort[0].trim(), Integer.parseInt(ipAndPort[1])));}redisClusterConfiguration.setClusterNodes(nodes);redisClusterConfiguration.setMaxRedirects(maxRedirects);redisClusterConfiguration.setPassword(RedisPassword.of(password));return redisClusterConfiguration;}/*** redis连接工厂类** @return JedisConnectionFactory*/@Beanpublic JedisConnectionFactory jedisConnectionFactory() {// 集群模式return new JedisConnectionFactory(redisClusterConfiguration(), getJedisPoolConfig());}@Beanpublic RedisTemplate<String, String> poolRedisTemplate() {RedisTemplate<String, String> template = new RedisTemplate<>();template.setConnectionFactory(jedisConnectionFactory());// 如果不配置Serializer,那么存储的时候缺省使用String,如果用User类型存储,那么会提示错误User can't cast to String!StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();template.setKeySerializer(stringRedisSerializer);// hash的key也采用String的序列化方式template.setHashKeySerializer(stringRedisSerializer);Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);ObjectMapper mapper = new ObjectMapper();mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(mapper);template.setKeySerializer(stringRedisSerializer);template.setHashKeySerializer(stringRedisSerializer);template.setValueSerializer(jackson2JsonRedisSerializer);template.setHashValueSerializer(new StringRedisSerializer());template.setDefaultSerializer(jackson2JsonRedisSerializer);template.setEnableDefaultSerializer(true);template.afterPropertiesSet();return template;}
}

3. Redis 分布式锁

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;@Service
public class RedisService {@Autowired@Qualifier("poolRedisTemplate")private RedisTemplate<String, String> stringRedisTemplate;public String getStr(String key) {return stringRedisTemplate.opsForValue().get(key);}/*** key 不存在 则进行设置* <p>* 原子性操作** @param k       键* @param v       值* @param timeout 超时时间* @param unit    超时时间的单位* @return key存在返回false,设置失败*/public Boolean setIfAbsent(String k, String v, long timeout, TimeUnit unit) {return stringRedisTemplate.opsForValue().setIfAbsent(k, v, timeout, unit);}/*** 加分布式锁** @param key     锁* @param timeout 超时时间,单位:秒* @return 空串 表示加锁失败, uuid 表示加锁成功,后续uuid要作为解锁的标识*/public String tryLock(String key, long timeout) {//释放锁时要根据uuid判断是否是自己的锁,防止释放其他人的锁
//        String uuid = System.currentTimeMillis() + "";String uuid = UUID.randomUUID().toString();Boolean tryLock = setIfAbsent(key, uuid, timeout, TimeUnit.SECONDS);if (tryLock) {return uuid;}//加锁失败,返回 空串return "";}/*** “判断值与旧值是否相等,相等则删除键” 的 Lua 脚本,保证原子性操作*/private static final String SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";/*** 释放分布式锁** @param key      锁* @param oldValue 加锁时放入的标识* @return 返回释放锁成功失败*/public Boolean unLock(String key, String oldValue) {List<String> keys = new ArrayList<>();keys.add(key);List<String> args = new ArrayList<>();//这里需要加下引号,原因:stringRedisTemplate获取的值带引号(即redis.call('get', KEYS[1]) 的结果带引号),而ARGV[1]不带引号,比较时会出现不等的问题args.add("\"" + oldValue + "\"");Long result = stringRedisTemplate.execute((RedisCallback<Long>) connection -> {Object nativeConnection = connection.getNativeConnection();// 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行// 集群模式if (nativeConnection instanceof JedisCluster) {return (Long) ((JedisCluster) nativeConnection).eval(SCRIPT, keys, args);}// 单机模式else if (nativeConnection instanceof Jedis) {return (Long) ((Jedis) nativeConnection).eval(SCRIPT, keys, args);}return 0L;});return result == 1L;}}

4. 测试类

import com.example.demo.redis.RedisService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.StringUtils;@SpringBootTest
public class RedisTest {@Autowiredprivate RedisService redisService;@Testpublic void test() {String key = "test-lock-1235637";String uid = redisService.tryLock(key, 10000);if(StringUtils.isEmpty(uid)){return;}System.out.println(uid);String uid2 = redisService.tryLock("test-lock-123", 3000);System.out.println("重新获得锁是否成功:" + !StringUtils.isEmpty(uid2));try {//执行业务代码System.out.println("111111111");} finally {Boolean unlockSuccess = redisService.unLock(key, uid);System.out.println("解锁结果:" + unlockSuccess);System.out.println(redisService.getStr(key));}}
}

四、springboot项目中Redission的简单使用

1. 依赖

<!--redission相关依赖--><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.16.0</version></dependency>

2. 配置类

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RedissionConfig {@Beanpublic RedissonClient getRedisson() {
//        //单机
//        Config config = new Config();
//        //单机模式  依次设置redis地址和密码
//        config.useSingleServer().
//                setAddress("redis://" + host + ":" + port);
//        RedissonClient redisson = Redisson.create(config);
//
//        //主从
//        Config config = new Config();
//        config.useMasterSlaveServers()
//                .setMasterAddress("redis://127.0.0.1:6379")
//                .addSlaveAddress("redis://127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419")
//                .addSlaveAddress("redis://127.0.0.1:6399");
//        RedissonClient redisson = Redisson.create(config);
//
//
//        //哨兵
//        Config config = new Config();
//        config.useSentinelServers()
//                .setMasterName("mymaster")
//                .addSentinelAddress("redis://127.0.0.1:26389", "127.0.0.1:26379")
//                .addSentinelAddress("redis://127.0.0.1:26319");
//        RedissonClient redisson = Redisson.create(config);//集群  ,,,,,Config config = new Config();config.useClusterServers().setScanInterval(2000) // cluster state scan interval in milliseconds.addNodeAddress("redis://ip:port", "redis://ip:port").setPassword("***");RedissonClient redisson = Redisson.create(config);return redisson;}
}

3. 测试类

import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;@SpringBootTest
public class RedissionTest {@Resourceprivate RedissonClient redisson;@Testpublic void testRedission() {System.out.println("开始执行");String lockKey = "123456";RLock lock = redisson.getLock(lockKey);System.out.println("获取锁");try {//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位boolean b = lock.tryLock(1, 10, TimeUnit.SECONDS);System.out.println("是否获取锁:" + b);//执行业务逻辑System.out.println("执行业务逻辑....");Thread.sleep(3000);} catch (Exception e) {System.out.println("系统异常,稍后重试....");} finally {//删除锁lock.unlock();System.out.println("解锁成功");}}
}

五、参考文献

https://mp.weixin.qq.com/s/2et43aJT6qjBsJ8Z9pcZcQ

http://www.lryc.cn/news/6096.html

相关文章:

  • 京东前端经典面试题整理
  • django+mysql实现一个简单的web登录页面
  • python cartopy手动导入地图数据绘制底图/python地图上绘制散点图:Downloading:warnings/散点图添加图里标签
  • JavaScript中常用的数组方法
  • 磁疗为什么“没效果”?原来真相是这样!
  • 【直击招聘C++】5.1函数模板
  • 谈谈Java多线程离不开的AQS
  • 国际化语言,多语言三种方式
  • C++——哈希3|位图
  • 75 error
  • ESP-C3入门8. 连接WiFi并打印信息
  • 使用python将EXCEL表格中数据转存到数据库
  • 【C++】类和对象(三)
  • vTESTstudio - VT System CAPL Functions - General/Trigger Function
  • IDEA 快捷键
  • 2023新华为OD机试题 - 入栈出栈(JavaScript) | 刷完必过
  • 微信公众号扫码授权登录思路
  • 数据结构与算法基础-学习-10-线性表之顺序栈的清理、销毁、压栈、弹栈
  • Hazel游戏引擎(005)
  • 牛客网Python篇数据分析习题(四)
  • 盲盒如何创业?
  • 第1集丨Java中面向对象相关概念汇总
  • 高性能(二)
  • Allegro如何实现同一个屏幕界面分屏显示操作指导
  • 前后端一些下载与配置(第二篇 第10天过后)nuxt banner redis 短信服务
  • OSG三维渲染引擎编程学习之四十八:“第五章:OSG场景渲染” 之 “5.6 多重纹理映射”
  • 对Node.js 的理解?优缺点?应用场景?
  • Bean的生命周期
  • Python学习-----函数2.0(函数对象,名称空间,作用域-->全局变量与局部变量)
  • Java中Json字符串和Java对象的互转