常见问题整理
DevOps 和 CICD
DevOps
全称Development & Operation
一种实现开发和运维一体化的协同模式,提供快速交付应用和服务的能力
用于协作:开发,部署,质量测试 整体生命周期工作内容,最终实现持续继承,持续部署,持续交付
DevOps 平台组成部分:
代码托管 git,svn
项目管理:jira 禅道 Teambition
运维平台:腾讯蓝鲸 /自主研发平台
持续交付:jenkins /gitlab
DevOps 是理念,不是具体某个工具
部分观点:Development & operations collaboration Means faster and smaller releases
CICD
CI-Continus integration
多名开发人员在开发不同代码过程中,可以频繁的将代码行合并到一起并相互不影响工作
CICD:
行为规范循环操作流:
Plan—>code—>build—>test—>release—>deploy—>operate—>monitor—>plan->….
Git 分布式持续集成工具
发布流程:
基于master 拉去分支development
基于开发分支开发,单元测试,sonar质量扫描
Code review
合并到master上,进行build,单元测试
Build docker image
部署到test env
Smoke test
进行QA系统测试(压力测试&功能性测试)
System test
预发布,QA测试,压测
灰度发布
生产100%发布
Linux
free -m
rz -y 把文件上传到Linux中,如果有相同文件名的文件,会将其覆盖。
Rz -e 把文件上传到Linux中,如果有相同文件名的文件,不会将其覆盖,而是会在所上传文件后面加上 .序号
Redis
Redis 击穿 穿透 雪崩
雪崩:Key短时间大量过期,redis服务不可用
击穿:redis中没有数据,直接访问到数据库
穿透:redis和数据库中都没有,例如恶意大量访问
开发规范:防止击穿,雪崩
Public void getValue(String key){
//先判断访问的key是否是黑名单,如果是拦截---这段逻辑,根据需求,可有可无
//判断访问的key是否存在于布隆过滤器白名单中,存在,才放行---这段逻辑,根据需求,可有可无// 用key从redis中获取//如果不存在,则进入到锁代码块中(获取锁){// 双重校验,再从redis中获取
// 如果不存在,到数据库中获取Object value = getValueFromDB();
If(value!=null){//将value保存到redis中,并设置随机过期时间(避免雪崩)
}
对于value不存在的,要要在redis中缓存空对象(但是要考虑不存在的key是否很多)
避免不存在的key太多,可以额外用布隆过滤器维护一套白名单,再请求以前,先过下过滤器
Return value;
}
}
防止服务器不可用造成的雪崩
集群模式,哨兵模式,保证高可用
针对穿透:
方案1:增加参数校验:判断key是否存在,存在才让程序往下走
方案2:Redis中缓存空对象:对于不存在的key,也可以在redis中保存,这样也能将这些不存在的key拦截到数据库以外;
方案3:创建布隆过滤器,可以将存在的key放到布隆过滤器中,类似于白名单功能
场景:当一个key以前很冷门,突然变成爆款
对于非常热门的key,可以设置永不过期(或者设计过期时间很长)
可以通过nginx 内置lua脚本或者其他方式计算哪些key访问次数,将超过阈值的key,自动调正为热门key
单独一个线程统计访问量,对于爆款的key,自动将这些key更新为永不过期;不热门的key,如果是永不过期的,自动更新为有有效期
实现方案要遵循:大道至简原则,能代码层实现的,不要上升到组件,解决方案也要遵循先代码层,再组件层,网络层递进解决
用redis记录上亿用户的签到问题
采用bitmap
存储的是二进制,0 /1
String类型=512M
1type=8个bit
512M=51210241024*8=4,294,967,296==42亿
也就是说可以存储42亿用户的bit格式的数据信息
例如统计30天的签到信息
可以以天数为维度创建30个bitmap
01day: 0 0 0 0 0 ………,每一个bit 对应一个用户,这样每一天都可以存储上亿的用户,登录就将相应的下标标注为1
02day 又是一个新的bitmap
每一个bitmap 的过期时间是30天
也可以以用户为维度,但是这种要求用户比较少的场景
如何统计上亿用户的共同好友
假设有两个集合 set1 和 set2
redis> SADD set1 "a"
redis> SADD set1 "b"
redis> SADD set1 "c"
redis> SADD set2 "b"
redis> SADD set2 "c"
redis> SADD set2 "d"# 计算交集
redis> SINTER set1 set2
# 输出:
# 1) "b"
# 2) "c"
但是如果上亿的用户好友,直接都存储到内存中,太耗费资源了
我们可以将好友的数据存到的数据库中;
再结合Neo4j(本身就是用于社交数据结构)计算共同好友
或者通过大数据套件(Hbase+Hadoop)来进行海量离线计算
如何避免重复下单
前端,点完下单,下单按钮不可用
后端:自定义注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RepeatSubmit {enum Type{PARAM,TOKEN}Type limitType() default Type.PARAM;long lockTime() default 5;
}
在需要保证幂等性的接口上,增加此注解
注解切面中:
将请求URL+用户TOKEN+关键词(例如sumitOrder),作为key保存到redis中,且按照locktime设置过期时间,且采用setNX的方式,一旦保存失败,则注解拦截返回;
Redis6为什么引入多线程
只是在网络IO层面上实现了多路复用,但是redis 实际的命令执行依然是单线程的
热点key问题
方案1:本地缓存—适合于提前预知哪些是热点key
方案2:请求分摊,将热key拆分成多个子key
方案3:限流
Import com.google.common.util.concurrent.RateLimiter;
Public class RateLimiterExmaple{Private RateLimiter rateLimiter = RateLimiter.create(10);Public String accessResource(String key){If(rateLimiter.tryAcquire()){Return RedisClient.get(key);}}
}
增加监测:
监控和报警:
通过设计监控来实时观测redis的使用情况,及时应对热key问题,一般对接热点探测系统
大key如何解决
Redis的大key:
指单个key所对应的数据过大,一般单个key超过10kb,就会被认为是大key
导致的问题:
1.网络延迟增加:传输数据需要更多时间
2. 阻塞Redis性能:大key的操作会阻塞Redis单线程的性能
3. 内存不足和导致OOM:大key可能会占用过多内存,影响其它部分的缓存使用
解决办法:
1. 分拆大key
Big list:list1,list2,list3,。。。listN
Big hash:可以将数据分段存储,比如一个大的key,假设存了一百万的用户数据,可以拆分成200个key,每个key下面存储5000个用户
2. 压缩数据
存储之前将数据进行压缩
public class DataCompressor {public byte[] compress(String data) throws IOException {ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length());GZIPOutputStream gzip = new GZIPOutputStream(bos);gzip.write(data.getBytes(StandardCharsets.UTF_8));gzip.close();return bos.toByteArray();}public void storeCompressKey(String key,String data){try{byte[] compressData = compress(data);RedisClient.set(key.getBytes(StandardCharsets.UTF_8),compressData);}catch (IOException e){}}}
3. 开启惰性删除的配置
当更新或删除大key的时候,使用惰性删除(lazyfree-lazy-expire yes)来避免阻塞整个redis
如果大key过期,redis会将过期的key同步做删除,如果不开启惰性删除,会阻塞前端发过来的命令的。
开启后,会对redis阻塞会有缓解
4. 使用SCAN 代替KEYS
在处理集合的时候,使用SCAN命令遍历key 而不是keys,避免一次性加载所有数据
public class RedisScanner {private Jedis jedis;public RedisScanner(Jedis jedis){this.jedis = jedis;}public void scanLargeKey(String largSetKey){String cursor = ScanParams.SCAN_POINTER_START;ScanParams scanParams = new ScanParams().count(100);//分批次取100个do{ScanResult<String> scanResult = jedis.sscan(largSetKey,cursor,scanParams);List<String> results = scanResult.getResult();//do somethingcursor = scanResult.getStringCursor();}while (!cursor.equals(ScanParams.SCAN_POINTER_START));}
}
数据库和redis数据不一致问题
在使用redis缓存作为数据库加速层时,可能会遇到缓存和数据库双写不一致的问题。这种不一致通常发生在更新操作时,就是写入数据库,但未及时更新缓存或者缓存更新后数据库未同步,导致读取到过时的数据。
解决办法:
- 缓存Aside模式(Cache Aside Pattern)–一般公司都是用这种
这个方案只是改善,并不能彻底解决
读操作:先存缓存读取数据,如果缓存未命中,再从数据库读取数据,然后将数据写入缓存
写操作:先更新数据库,再删除缓存。这种方式会导致缓存短暂失效,下一次读取会动数据库加载到缓存中。
因为这种不是实时一致的,所以就需要设置一个合理的过期时间
- 更新缓存策略
使用分布式锁,确保只有一个更新操作在同时进行
锁内:获取锁:更新数据库,更新缓存,释放锁
更新操作,串行化—一般不用
- 采用canal中间件,异步更新缓存
Alibaba:通过监听数据库更新的binlog日志,去异步更新缓存
Redis中保证实时一致性,代价太大
- 利用过期时间
利用合理的过期时间可以缓解不一致带来的影响,通过TTL确保数据在一定时间后更新
key 过期后如何删除的
Redis中,key的过期和删除不是实时的,redis采用了一种惰性删除 lazy deletion 和定期删除 periodic expiration 相结合的机制来处理过期键
1. 惰性删除 lazy deletion
机制:当客户访问一个键的时候,Redis会检查这个键是否已经过期,如果键过期了,那么它会在被访问时被惰性删除。
特点:这种机制表示一个键过期了,只要它没有被访问,Redis是不会主动删除它的。因此,惰性删除并不保证过期键会立即小时。
2. 定期删除
机制:Redis会周期性地随机抽取一部分设置了过期时间键进行检查,并删除已过期的键。
频率:定期删除由Redis的后台执行,大约每隔100毫秒进行一次扫描
key 和value规范
Key要短小,不要超过256字节
使用命名空间,用:分隔,例如:user:1001:profile
避免热key:确保key的分布均匀,避免单一key访问压力太大,可能需要进行分片处理
存储value的时候,避免值太大
- 分片存储: 对于需要存储大量数据的value,可以考虑拆分成多部分存储,可以降低单个操作的复杂度
- 合理设置blob:如果需要存储blob数据,考虑放在外部存储引擎中,只将引用或索引保存在redis中
- 将数据进行压缩
使用redis要点
-
优化数据结构,拒绝大key,避免热key
-
设计良好的命名空间,用:命名空间
-
合理的过期时间设置
-
持久化策略,混合持久化机制 RDB +AOF
定期备份数据到其他机器上 -
监控和报警
实时监控:使用Redis自带的INFO命令,慢查询日志或者外部监控工具如Prometheus,Grafana,实时监控Redis性能指标
设置警报:针对内存使用,链接数,请求延迟等关键指标,设置自动警报 -
集群模式,主从架构和哨兵,不要用单点模式
减少网络延迟:确保redis和应用程序在同一个数据中心,降低网络演延迟;
批量操作:使用批量操作减少网络往返次数 -
参数调优:
调整Redis配置:例如maxmemory,maxclients,timeout等参数需根据应用实际情况调整
慢查询日志:启动慢查询日志 -
客户端重试策略:
合理的重试机制:在网络抖动或者瞬间故障情况下,使用适当的重试机制和熔断策略 -
访问控制和安全:
为Redis 服务设置访问密码,增加安全性
使用ACL:从redis6.0开始,利用ACL进行访问控制管理,细化权限控制 -
代码实现时规避雪崩,穿透,击穿问题
Redis分布式锁
JVM 本地锁 synchronized,ReentrantLock
分布式锁:一般流程
a. 加锁 b.执行业务 c.删除锁
- 问题: 如果用户加锁后,因为特殊原因,没有释放锁,造成死锁—解决:锁过期时间
- 但是加锁setNX 和设置过期时间是 两个命令,并不是原子性的,还是会出现,加锁以后,宕机了,没有对key设置上过期时间
- –解决:采用setNX加强版命令,在设置的时候,直接设置上过期时间,让set和过期命令在一个原子里,例如set key1 value1 ex 10nx
- 进一步分析,还有问题,如果用户A加锁后,执行业务的时候,锁过期了,用户B请求过来,也加上锁了。此时,用户A 执行完,删除锁,把用户B加的锁给删除了
- 解决办法: 对锁加一个唯一标识,每个请求会生成唯一ID,加锁的时候,让ID作为锁的值; 当删除锁的时候,判断,锁的value是否和当前请求的ID一致,一致才可以删除。
- 还是有问题,场景:假设用户A,判断完当前锁是自己加的,正当准备删除锁的时候,CPU调度其他的了,而这个时候,正好锁也到期了,自己就没了;此时用户B过来,加上了用户B的锁。然后CPU又恢复调度到了用户A删除锁,这样就又把用户B的锁给删除了。
- 所以解决办法: 要保证 加锁—执行业务—判断锁ID—删除锁 都要在一个原子里才可以。
- 解决办法:Lua 脚本
public boolean test(String phoneId){String lockKey = "phone-"+phoneId;String id = String.valueOf(idGenerator.nextId());try{String result = jedis.set(lockKey,id,SetParams.setParams().nx().ex(5));if(!"OK".equals(result)){return false;}Integer stock = iphoneService.getStock(phoneId);if(stock<1){return false;}stock = stock - 1;iphoneService.updateStock(phoneId,stock);}finally{//删除锁String script = "local value = redis.call('get',KEYS[1])"+"if(value ==ARGV[1]) then"+"redis.call(‘del’,KEYS[1])"+"return 'OK'"+"else"+"return nil"+"end";List<String> KEYS = Stream.of(lockKey).toList();List<String> ARGV = Stream.of(id).toList();Jedis.eval(script,KEYS,ARGV);}return true;}
对分布式完善的框架Redisson,不仅可以解决以上提到的所有问题,还能提供
自动续期
读写锁
公平锁
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.16.2</version>
</dependency>
application.properties配置文件spring.redisson.address=redis://127.0.0.1:6379
spring.redisson.password=null
Lock
@Autowired
private RedissonClient redissonClient;
//获取锁
RLock lock = null;
try {lock = redisson.getLock("Lxlxxx_Lock");// 加锁lock.lock();// 或者使用lock.lock(10, TimeUnit.SECONDS);
} catch (Exception e) {e.getStackTrace();
} finally {// 解锁if (lock != null) {lock.unlock();}System.out.println("Finally,释放锁成功");
}
//获取锁
RLock lock = null;
try {// 等待2秒,上锁以后10秒自动解锁if (lock.tryLock(2, 10, TimeUnit.SECONDS)) {//执行业务逻辑} else {System.out.println("未获取到锁");}
} catch (Exception e) {e.getStackTrace();
} finally {// 解锁if (lock != null) {lock.unlock();}System.out.println("Finally,释放锁成功");
}
Lock
①. lock.lock()方法:会尝试获取锁,如果锁被其他客户端持有,则当前客户端会阻塞,直到获取到锁为止。
②. lock.lock(long leaseTime, TimeUnit unit) :跟无参数类似,多了锁的持有时间,单位由unit参数指定。在这个时间内,如果锁的持有者没有主动释放锁,Redisson会自动释放锁,以避免因为线程崩溃等原因导致的死锁。
TryLock
①. 无参数版本:lock.tryLock()。这个版本的tryLock()会立即返回,无论锁是否可用。
②. 等待时间参数版本:lock.tryLock(long waitTime, long leaseTime, TimeUnit unit)。这个版本的tryLock()会尝试获取锁,最多等待waitTime时间(由unit参数指定时间单位)。如果在这个时间内成功获取到锁,则锁的租约时间(lease time)将被设置为leaseTime。如果等待时间结束后仍未获取到锁,则方法将返回false。
lock方法:这是一种阻塞式的获取锁的方式。当线程调用lock方法时,如果锁已经被其他线程持有,则当前线程会被阻塞,直到获取到锁或者发生超时、中断等情况。这种方式可以确保线程对共享资源的访问是互斥的,适用于需要确保共享资源只能被一个线程访问的场景。
tryLock方法:这是一种非阻塞式的获取锁的方式。当线程调用tryLock方法时,如果锁已经被其他线程持有,则当前线程不会被阻塞,而是立即返回一个布尔值来表示是否成功获得了锁
原理:
加锁的时候,用的是什么结构
用的是hset命令 –key就是锁的key,value 用的是map结构 key线程id value重入次数
如何保证,加锁和解锁是同一个线程
如何给锁自动续期—看门狗机制,默认锁过期时间是30s,如果30s业务没有结束,这个看门狗每10s扫描一次,发现业务还没有结束,会设置过期时间30s
重入锁是如何实现的
看门狗机制,默认是关闭的
如果加锁失败,线程就会被semaphore(其实就是AQS)阻塞住,当锁被释放以后,当前阻塞的线程会被semaphore唤醒,被唤醒后,会再一次尝试加锁逻辑
Redis set 的时候,会将key的过期时间清楚
Redis实例中最多放多少key