当前位置: 首页 > news >正文

缓存和分布式锁笔记

缓存

开发中,凡是放入缓存中的数据都应该指定过期时间,使其可以在系统即使没有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃导致的数据永久不一致 问题。
redis作为缓存使用redisTemplate操作redis

分布式锁的原理和使用

分布式加锁:本地锁,只能锁住当前进程,所以我们需要分布式锁

分布式锁演进

基本原理:多个操作用户操作,抢占锁,获取到锁的用户执行业务,释放锁。

分布式锁演进阶段1:

redis获取锁:setnx(“lock”,1111) -->获取到锁->执行业务->删除锁->结束,未获取到锁的等待重试

代码:

public Map<String, List<Catelog2Vo>>  getCatalogJsonFromDbWithRedisLock() {//1. 分布式锁 去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");if(lock){//加锁成功Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();redisTemplate.delete("lock");return dataFromDb;}else {//加锁失败  重试  synchronizereturn getCatalogJsonFromDbWithRedisLock();//自旋的方式}
}

问题:

setnx占好了位,业务代码异常或者程序在页面过程中宕机,没有执行删除锁逻辑,造成死锁

解决:

设置锁的自动过期,即使没有删除,会自动删除

分布式锁演进阶段2:

redis获取锁:setnx(“lock”,1111) -->获取到锁->设置过期时间->执行业务->删除锁->结束,未获取到锁的等待重试

代码:

public Map<String, List<Catelog2Vo>>  getCatalogJsonFromDbWithRedisLock() {//1. 分布式锁 去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");if(lock){//加锁成功//2. 设置过期时间redisTemplate.expire("lock",30,TimeUnit.SECONDS);Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();redisTemplate.delete("lock");return dataFromDb;}else {//加锁失败  重试  synchronizereturn getCatalogJsonFromDbWithRedisLock();//自旋的方式}
}

问题:

setnx设置好,正要去设置过期时间,宕机,死锁。

解决:

设置过期时间和占位必须是原子的,redis支持使用setnx ex命令

分布式锁演进阶段3:

redis获取锁:setnxex(“lock”,1111,10s) -->获取到锁->执行业务->删除锁->结束,未获取到锁的等待重试

代码:

public Map<String, List<Catelog2Vo>>  getCatalogJsonFromDbWithRedisLock() {//1. 分布式锁 去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",300,TimeUnit.SECONDS);if(lock){//加锁成功//2. 设置过期时间//            redisTemplate.expire("lock",30,TimeUnit.SECONDS);Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();redisTemplate.delete("lock");return dataFromDb;}else {//加锁失败  重试  synchronizereturn getCatalogJsonFromDbWithRedisLock();//自旋的方式}}

问题:

删除锁直接删除?由于业务时间很长,锁自己过期了,直接删除,有可能把别人正在持有的锁删除了。

解决:

占锁的时候,值指定为uuid,每个人匹配的是自己的锁才删除

分布式锁演进阶段4:

redis获取锁:setnxex(“lock”,uuid,10s) -->获取到锁->执行业务->如果当前锁的值是之前的uuid的锁–>删除锁->结束,未获取到锁的等待重试

代码:

public Map<String, List<Catelog2Vo>>  getCatalogJsonFromDbWithRedisLock() {String uuid = UuidUtils.generateUuid().toString();//1. 分布式锁 去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);if(lock){//加锁成功//2. 设置过期时间//            redisTemplate.expire("lock",30,TimeUnit.SECONDS);Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();String lockValue = redisTemplate.opsForValue().get("lock");if(lockValue.equals(uuid)) {//删除自己的锁redisTemplate.delete("lock");}return dataFromDb;}else {//加锁失败  重试  synchronizereturn getCatalogJsonFromDbWithRedisLock();//自旋的方式}}

问题:

如果正好判断当前值,正要删除锁的时候,锁已经过期别人已经设置到了新的值,删除的还是别人的锁

解决:

删除锁必须保证原子性,使用redis+lua脚本

分布式锁演进阶段5:

redis获取锁:setnxex(“lock”,uuid,10s) -->获取到锁->执行业务->脚本解锁保证原子性->结束,未获取到锁的等待重试

代码:

    public Map<String, List<Catelog2Vo>>  getCatalogJsonFromDbWithRedisLock() {String uuid = UuidUtils.generateUuid().toString();//1. 分布式锁 去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);if(lock){//加锁成功Map<String, List<Catelog2Vo>> dataFromDb = null;try {dataFromDb = getDataFromDb();}finally {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";//删除锁Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);}//            String lockValue = redisTemplate.opsForValue().get("lock");//            if(lockValue.equals(uuid)) {//                //删除自己的锁//                redisTemplate.delete("lock");//            }return dataFromDb;}else {//加锁失败  重试  synchronizereturn getCatalogJsonFromDbWithRedisLock();//自旋的方式}}String script = "if redis.call"('get',KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

保证加锁【占位+过期时间】和删除锁【判断+删除】的原子性。更难的事情,锁的自动续期

可重入锁(Reentrant Lock)

某个线程已经获得某个锁,可以再次获取锁而不会出现死锁,再次获取锁的时候会判断当前线程是否是已经加锁的线程,如果是对锁的次数+1,释放锁的时候加了几次锁,就需要释放几次锁。

基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。

@ResponseBody@GetMapping("/hello")public String hello(){//1.获取一把锁,只要锁的名字一样,就是同一把锁RLock lock = redisson.getLock("my-lock");//2.加锁lock.lock();//阻塞式等待 默认加的锁是30s//1. 锁的自动续期 如果业务超长,运行期间自动给锁续上新的30s,不用担心业务时间长,锁自动过期会删除//2.加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s后自动删除try {System.out.println("加锁成功,执行业务"+Thread.currentThread().getId());Thread.sleep(30000);}catch (Exception e){e.printStackTrace();}finally {//3.解锁System.out.println("释放锁"+Thread.currentThread().getId());lock.unlock();}return "hello";}

问题:负责存储分布式锁的Redission节点宕机后,这个锁正好处于锁住的状态时,这个锁会出现锁死的状态

解决:reddison内部提供了一个监控锁的看门狗,作用是在redission实例被关闭前,不断的延长锁的有效期,默认情况下,看门狗的检查锁的超时时间是30秒钟,可以通过Config.lockWatchdogTimeout。还通过加锁的方法提供了leaseTime的参数来指定加锁的时间,超过时间后锁便自动解开了。

读写锁

基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。

@GetMapping("/write")@ResponseBodypublic String writeValue(){String s="";RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");RLock lock = readWriteLock.writeLock();try {//改数据加写锁 读数据加读锁lock.lock();s = UUID.randomUUID().toString();Thread.sleep(30000);redisTemplate.opsForValue().set("writeValue",s);}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}return s;}@GetMapping("/read")@ResponseBodypublic String readValue(){String s="";RReadWriteLock writeLock = redisson.getReadWriteLock("rw-lock");//加读锁Lock rLock = writeLock.readLock();try {rLock.lock();s = redisTemplate.opsForValue().get("writeValue").toString();Thread.sleep(30000);}catch (Exception e){e.printStackTrace();}finally {rLock.unlock();}return s;}

结论:

  • 保证一定可以读到最新的数据,修改期间,写锁是一个排他锁(互斥锁,独享锁).读锁是一个共享锁
  • 写锁没有释放 读就必须等待
  • 读 + 读:相当于无锁并发读,只会的redis中记录好,所有当前的读锁,他们都会同时加锁成功
  • 写 + 读:等待写锁释放
  • 写 + 写:阻塞方式
  • 读 + 写:有读锁也需要等待
  • 只要有写锁的存在,都必须等待

信号量

基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
这里以停车位为例,当停车时,获取一个信号量,获取到信号量之后进行停车,车开走之后可以再释放一个信号量

/*** 车库停车* @return* @throws InterruptedException* 信号量 可以用作分布式限流*/
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {RSemaphore park = redisson.getSemaphore("park");park.acquire();//获取一个信号量,获取一个信号量占一个车位return "ok";
}@GetMapping("/go")
@ResponseBody
public String go(){RSemaphore park = redisson.getSemaphore("park");park.release();//释放一个车位return "ok";
}

闭锁

原理:闭锁相当于一扇门,在闭锁到达结束状态之前,这扇门一直是关闭着的,没有任何线程可以通过,当到达结束状态时,这扇门才会打开并容许所有线程通过。它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,初始化为一个正式,正数表示需要等待的事件数量。countDown方法递减计数器,表示一个事件已经发生,而await方法等待计数器到达0,表示等待的事件已经发生。CountDownLatch强调的是一个线程(或多个)需要等待另外的n个线程干完某件事情之后才能继续执行。

应用场景

10个运动员准备赛跑,他们等待裁判一声令下就开始同时跑,当最后一个人通过终点的时候,比赛结束。10个运动相当于10个线程,这里关键是控制10个线程同时跑起来,还有怎么判断最后一个线程到达终点。可以用2个闭锁,第一个闭锁用来控制10个线程等待裁判的命令,第二个闭锁控制比赛结束。

示例

5个班放学,当5个班的同学都走完之后,锁门

@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {RCountDownLatch door = redisson.getCountDownLatch("door");door.trySetCount(5);door.await();return "放假了";
}@GetMapping("/gogogo/{id}")
@ResponseBody
public String gogogo(@PathVariable("id") Long id){RCountDownLatch door = redisson.getCountDownLatch("door");door.countDown();//计算减一return id+"班的人走完了";
}

数据一致性问题

  • 双写模式
  • 失效模式
  • 解决方案
    • 无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新会出事。
    • 如果是用户维度数据(订单数据、用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加 上过期时间,每隔一段时间触发读的主动更新即可
    • 如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式。
    • 缓存数据+过期时间也足够解决大部分业务对于缓存的要求。
    • 通过加锁保证并发读写,写写的时候按顺序排好队。读读无所谓。所以适合使用读写锁。(业务不关心 脏数据,允许临时脏数据可忽略);
    • 总结:
      1. 放入缓存的数据本不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保证每天拿到当前最新数据即可。
      2. 不应该过度设计,增加系统的复杂性 • 遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。

Spring Cache

  • Cache 接口为缓存的组件规范定义,包含缓存的各种操作集合; Cache 接 口 下 Spring 提 供 了 各 种 xxxCache 的 实 现 ; 如 RedisCache , EhCacheCache , ConcurrentMapCache 等
  • 每次调用需要缓存功能的方法时,Spring 会检查检查指定参数的指定的目标方法是否已 经被调用过;如果有就直接从缓存中获取方法调用后的结果,如果没有就调用方法并缓 存结果后返回给用户。下次调用直接从缓存中获取。
  • 使用 Spring 缓存抽象时我们需要关注以下两点;
    1. 确定方法需要被缓存以及他们的缓存策略
    2. 从缓存中读取之前缓存存储的数据

参考:缓存和分布式锁

http://www.lryc.cn/news/151194.html

相关文章:

  • React笔记(七)Antd
  • 无涯教程-Android - RadioButton函数
  • kafka如何避免消费组重平衡
  • 浅谈一下企业信息化管理
  • 北京APP外包开发团队人员构成
  • Node基础and包管理工具
  • 【python使用 Pillow 库】缩小|放大图片
  • 解决Ubuntu 或Debian apt-get IPv6问题:如何设置仅使用IPv4
  • Xubuntu16.04系统中解决无法识别exFAT格式的U盘
  • Pygame中Trivia游戏解析6-1
  • idea中创建springboot项目显示Spring Initializr Error
  • VScode 国内下载源 以及 nvm版本控制器下载与使用
  • GO|经典错误之回车与\n
  • 【MATLAB第71期】基于MATLAB的Abcboost自适应决策树多输入单输出回归预测及多分类预测模型(更新中)
  • ARM编程模型-内存空间和数据
  • leetcode原题: 最大数
  • docker 是什么
  • 基于Gin框架的HTTP接口限速实践
  • WSL中为Ubuntu和Debian设置固定IP的终极指南
  • axios+vite配置反向代理踩坑记录
  • Spring IOC的理解
  • 2023年京东箱包行业数据分析(京东数据运营)
  • 对称加密 非对称加密 AC认证 https原理
  • 如何在PyQt应用程序中使用Qt Designer和Pyuic工具?
  • 【云计算•云原生】5.云原生之初识DevOps
  • 20230830工作心得:巧用标记位和For循环遍历
  • AUTOSAR规范与ECU软件开发(实践篇)7.9 MCAL模块配置方法及常用接口函数介绍之Can的配置
  • SpringBoot整合websockt实现消息对话
  • MIME类型(Multipurpose Internet Mail Extensions,多用途互联网邮件扩展类型)MultipartFile 多媒体文件上传
  • 数据库(MySQL)的存储过程