当前位置: 首页 > news >正文

商品中心—库存分桶高并发的优化文档

1.库存扣减时获取分桶元数据的优化

(1)优化原因

库存扣减过程中,⼤量的请求会加载本地缓存中的分桶元数据信息。在填充可⽤分桶到扣减上下⽂中,会调用缓存的分桶元数据信息的读方法。比如会调用本地缓存的分桶元数据对象bucketLocalCache的getAvailableList()方法。而当增加库存、分桶上下线时,会修改本地缓存的分桶元数据对象bucketLocalCache。

所以如果出现大量扣减请求时,也发生对本地缓存的分桶元数据对象修改,那么就会出现并发的读写问题,从而导致偶尔出现读方法的延迟问题。

优化前的部分日志:

...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 179毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 161毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 71毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 620毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 74毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 9毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 28毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 89毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 134毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:373]- 获取备用分桶耗时: 0毫秒

优化前的代码:

//库存扣减业务实现类
@Service
public class InventoryServiceImpl implements InventoryService {...//构建接下来用于具体扣减库存所需要的模型对象private BucketContext buildDeductProductStock(InventoryRequest request) {//1.填充扣减库存相关信息明细InventoryDetail inventoryDetail = inventoryConverter.converterRequest(request);//2.填充扣减库存的分桶配置信息BucketContext bucketContext = buildDeductBucketList(request);bucketContext.setInventoryDetail(inventoryDetail);return bucketContext;}//填充扣减库存的分桶相关信息private BucketContext buildDeductBucketList(InventoryRequest request) {BucketContext context = new BucketContext();//获取缓存中的分桶元数据信息BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());//获取本地缓存的分桶列表List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();//获取本次扣减请求的次数,用来定位具体使用的分桶库存Integer incrementCount = getIncrementCount(request);//通过取模运算得到本次扣减需要定位到的分桶列表下标int index = incrementCount % availableList.size();log.info("本次可用分桶列表数量:{},扣减下标:{}", availableList.size(), index);//获取本次扣减准备处理的分桶信息,避免扣减失败(分桶已下线或者库存不足),多备份几个BucketCacheBO bucketCacheBO = availableList.get(index);context.getAvailableList().add(bucketCacheBO);context.getBucketNoList().add(bucketCacheBO.getBucketNo());context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig());//如果其他分桶都作为备用分桶,那么就可以实现库存合并扣减的功能了for (int i = 0; i < 2; i++) {//任意填充2个作为备份Random random = new Random();int num = random.nextInt(availableList.size());BucketCacheBO bucketCache = availableList.get(num);//避免拿到重复的分桶,这里处理一下if (context.getBucketNoList().contains(bucketCache.getBucketNo())) {i--;continue;}context.getAvailableList().add(bucketCache);context.getBucketNoList().add(bucketCache.getBucketNo());}return context;}...
}@Component
@Data
public class InventoryBucketCache {@Autowiredprivate Cache cache;@Autowiredprivate TairCache tairCache;//本地存储分桶元数据信息,增加库存、分桶扩容、分桶上下线时就会触发调用这个方法修改本地缓存对象public void setBucketLocalCache(String bucketKey, BucketLocalCache bucketLocalCache) {log.info("local cache set key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));cache.put(bucketKey, bucketLocalCache);}//获取本地的分桶元数据信息public BucketLocalCache getBucketLocalCache(String bucketKey) {//先查本地缓存BucketLocalCache bucketLocalCache = (BucketLocalCache) cache.getIfPresent(bucketKey);log.info("local cache get key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));if (Objects.isNull(bucketLocalCache)) {//再查远程缓存synchronized (bucketKey.intern()) {String bucketCache = tairCache.get(TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketKey);if (!StringUtils.isEmpty(bucketCache)) {bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);cache.put(bucketKey, bucketLocalCache);}}}return bucketLocalCache;}
}

(2)解决⽅案

由于库存分桶元数据的对象变量是库存扣减请求和库存调配请求共⽤的,所以可以就将该变量交给ThreadLocal来管理其线程副本。

注意:只需要对分桶元数据的对象进行读取时使用ThreadLocal线程副本即可,对分桶元数据的对象进行修改时没必要使用ThreadLocal线程副本。

当使⽤ThreadLocal维护缓存的分桶元数据变量时,ThreadLocal为会每个使⽤该变量的线程提供独⽴的变量副本。从而每个线程都可以独⽴改变⾃⼰的副本,⽽不会影响其它线程的副本。

优化后,获取本地缓存的分桶元数据对象的可⽤分桶列表的耗时都为0毫秒了,没有再出现⼏⼗上百毫秒的情况。

@Component
@Data
public class InventoryBucketCache {//每次获取本地缓存存储的分桶元数据信息时,需要使⽤ThreadLocal来存储,避免线程之间的竞争private ThreadLocal<BucketLocalCache> bucketLocalCacheThreadLocal = new ThreadLocal<>();...//获取本地缓存的分桶元数据信息public BucketLocalCache getBucketLocalCache(String bucketKey) {bucketKey = TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketKey;//先查本地缓存BucketLocalCache bucketLocalCache = (BucketLocalCache) cache.getIfPresent(bucketKey);log.info("local cache get key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));if (Objects.isNull(bucketLocalCache)) {//再查远程缓存Long startTime = System.currentTimeMillis();synchronized (bucketKey.intern()) {String bucketCache = getBucketCache(bucketKey);if (!StringUtils.isEmpty(bucketCache)) {bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);cache.put(bucketKey, bucketLocalCache);}log.error("本地加载缓存模型未命中缓存,远程重新加载耗时{}毫秒", System.currentTimeMillis() - startTime);}}bucketLocalCacheThreadLocal.set(bucketLocalCache);return bucketLocalCacheThreadLocal.get();}public void threadLocalRemove() {bucketLocalCacheThreadLocal.remove();}...
}@Service
public class InventoryServiceImpl implements InventoryService {...//填充扣减库存的分桶相关信息private BucketContext buildDeductBucketList(InventoryRequest request) {BucketContext context = new BucketContext();//获取本地缓存的分桶元数据BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());//获取本地缓存的分桶列表List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();if (!CollectionUtils.isEmpty(availableList)) {//获取本次扣减请求对应的扣减次数,用来定位具体使用的分桶库存Integer incrementCount = getIncrementCount(request);//通过运算得到本次访问所需要定位的分桶int index = incrementCount % availableList.size();//获取本次准备处理的分桶信息BucketCacheBO bucketCacheBO = availableList.get(index);context.getAvailableList().add(bucketCacheBO);//为了避免扣减失败(分桶已下线或者库存不足),可以多备份几个分桶//全部分桶都作为备份,就是合并扣减的实现了for (int i = 0; i < 2; i++) {//填充2个作为备份,如果超过下标则从0开始继续取int num = index + i;if (num >= availableList.size()) {num = 0;}BucketCacheBO bucketCache = availableList.get(num);context.getAvailableList().add(bucketCache);}} else {//并发下,可能全部的分桶都下线了,这个时候使用中心桶进行库存扣减(因为其它分桶的下线库存回源会加到中心桶上)BucketCacheBO bucketCacheBO = new BucketCacheBO();bucketCacheBO.setBucketNo(buildSellerInventoryKey(request.getSellerId(), request.getSkuId()));//中心桶无需扩容,但是出现这种场景属于高并发下,分桶全部被下线了,此时需要保证分桶本地元数据和远程保持一致,为了性能,分桶下线未上粒度较大得锁//所以需要当遇到使用中心桶的时候,再次触发一次远程缓存和本地缓存同步的操作,并且需要保证远程缓存最少有一个可用分桶存在bucketCacheBO.setBucketNum(0);context.getAvailableList().add(bucketCacheBO);//异步消息发送同步本地缓存的消息bucketRefreshProducer.sendBucketOffline(request);}Long index = InventorBucketUtil.createDetailBucketKey(request.getOrderId(), bucketLocalCache.getInventoryBucketConfig().getBucketNum());String inventoryDetailKey = bucketLocalCache.getBucketDetailKeyList().get(Integer.valueOf(index + ""));context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig());context.setInventoryDetailKey(inventoryDetailKey);inventoryBucketCache.threadLocalRemove();return context;}...
}@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//刷新分桶元数据缓存//@param maxDepthNum      分桶最大库存深度//@param bucketLocalCache 分桶元数据信息//@param bucketNo         分桶编号private void refreshBucketCache(Integer maxDepthNum, BucketLocalCache bucketLocalCache, String bucketNo, Integer inventoryNum) {List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();for (BucketCacheBO bucketCacheBO : availableList) {if (bucketCacheBO.getBucketNo().equals(bucketNo)) {//每次库存具体深度变化都要更细,否则很容易触发回源的比例bucketCacheBO.setBucketNum(maxDepthNum);bucketCacheBO.setAllotNum(inventoryNum + (Objects.isNull(bucketCacheBO.getAllotNum()) ? 0 : bucketCacheBO.getAllotNum()));break;}}String key = buildBucketCacheKey(bucketLocalCache.getSellerId(), bucketLocalCache.getSkuId());//刷新本地缓存inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);}...
}

2.库存扣减的分桶路由⾃增序号优化

(1)优化原因

每次库存扣减,对应的路由分桶原本是通过缓存的⾃增序号来获取的。但是由于是同⼀个key,⾼并发压⼒下,这个key的访问压⼒很⼤。进⽽部分请求出现阻塞,获取序列号的性能下降。

优化前的部分日志:

...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 0毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 2毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 3毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 6毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 4毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 2毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 2毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 238毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 258毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 3毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 2毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 1毫秒
...r.e.i.s.impl.InventoryServiceImpl[buildDeductBucketList:356]- 获取扣减分桶耗时: 0毫秒

优化前的代码:

@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate TairCache tairCache;...//获取对应售卖商品的扣减访问次数private Integer getIncrementCount(InventoryRequest request) {String incrementKey = TairInventoryConstant.SELLER_SKU_STOCK_COUNT_PREFIX + request.getSellerId() + request.getSkuId();Integer incrementCount = tairCache.incr(incrementKey);return incrementCount;}...
}

(2)解决⽅案

使⽤号段的方案,每次⾃增获取⼀万个序列号。消费序列号过程中如序列号使⽤过快,则⾃动增⻓序列号的⻓度。并在使⽤过程中,提前⽣成⼀批新的序列号等待使⽤。

使⽤该⽅案负责⾃增序号的⽣成后,获取扣减分桶耗时稳定在0毫秒内。除了第⼀次不存在序号时初始化耗时会⾼⼀些,后续请求性能稳定。

@Service
public class InventoryServiceImpl implements InventoryService {@Autowiredprivate SegmentNoGen segmentNoGen;...//获取对应售卖商品的扣减访问次数//这里考虑并发的时候自增导致性能过低,所以采取了批量获取一批序号,当这批序号被使用完以后才会再次获取一次private Integer getIncrementCount(InventoryRequest request) {String incrementKey = TairInventoryConstant.SELLER_SKU_STOCK_COUNT_PREFIX + request.getSellerId() + request.getSkuId();Long incrementCount = segmentNoGen.genNewNo(incrementKey);if (incrementCount > 0) {return incrementCount.intValue();}//避免获取缓存的时候出现异常,当为负数的时候默认取第一个,分桶最少存在1个return 0;}...
}//号段ID生成器组件
@Service
public class SegmentIDGenImpl implements SegmentIDGen {//下一次异步更新比率因子public static final double NEXT_INIT_FACTOR = 0.9;//最大步长不超过100,0000private static final int MAX_STEP = 1000000;//默认一个Segment会维持的时间为15分钟//如果在15分钟内Segment就消耗完了,则步长要扩容一倍,但不能超过MAX_STEP//如果在超过15*2=30分钟才将Segment消耗完,则步长要缩容一倍,但不能低于MIN_STEP,MIN_STEP的值为数据库中初始的step字段值private static final long SEGMENT_DURATION = 15 * 60 * 1000L;//更新因子//更新因子=2时,表示成倍扩容或者折半缩容private static final int EXPAND_FACTOR = 2;private final ExecutorService threadPoolExecutor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new UpdateThreadFactory());@Autowiredprivate LeafAllocNoRepository leafAllocNoRepository;@Resourceprivate SegmentIDCache cache;//生成新的ID@Overridepublic Long genNewNo(String bizTag) {if (!cache.isInitOk()) {throw new RuntimeException("not init");}//如果没有,此时需要初始化一个if (!cache.containsKey(bizTag)) {leafAllocNoRepository.insertLeadAlloc(bizTag);cache.updateCacheFromDb(bizTag);}SegmentBuffer buffer = cache.getValue(bizTag);if (!buffer.isInitOk()) {synchronized (buffer) {if (!buffer.isInitOk()) {try {updateSegmentFromDb(bizTag, buffer.getCurrent());log.info("Init buffer. Update leafkey {} {} from db", bizTag, buffer.getCurrent());buffer.setInitOk(true);} catch (Exception e) {log.warn("Init buffer {} exception", buffer.getCurrent(), e);throw new RuntimeException("init error:" + bizTag);}}}}return getIdFromSegmentBuffer(buffer);}...
}

3.库存扣减明细消息异步发送到MQ优化

(1)优化原因

每次库存扣减,都需要发送消息来进行异步记录⼀条库存扣减明细。由于原来发送消息时是等待消息发送成功后才返回,这会导致⾼并发下消息的吞吐量上不去,从⽽影响整体库存扣减的性能。

优化前的代码:

@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate InventoryDetailProducer inventoryDetailProducer;...//扣减商品库存@Overridepublic JsonResult deductProductStock(InventoryRequest request) {//1.验证入参是否合法checkDeductProductStock(request);//2.构建扣减库存的上下文对象BucketContext bucketContext = buildDeductProductStock(request);try {//3.获取是否已经有一条扣减明细记录String repeatDeductInfo = getRepeatDeductInfo(bucketContext);if (!StringUtils.isEmpty(repeatDeductInfo)){return JsonResult.buildSuccess();}//4.执行库存扣减deductInventory(bucketContext);//5.写入明细,如果已重复写入,则写入失败并回退库存writeInventoryDetail(bucketContext);} catch (Exception e){e.printStackTrace();return JsonResult.buildError(e.getMessage());} finally {//6.检测当前返回的库存数量是否触发扩容的阈值(回源比例),触发则发送通知进行异步扩容checkInventoryBackSource(bucketContext);}return JsonResult.buildSuccess();}//将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存)private void writeInventoryDetail(BucketContext bucketContext) {//获取库存扣减的明细详情InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId();//尝试写入明细记录,如果没有写入成功则说明库存需要回退Integer count = tairCache.exhset(key, String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));if (count < 0){//说明明细已经存在了,写入失败,需要将库存回退到对应的分桶上tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());} else {//发送消息,异步写入库存扣减的明细到DBinventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());}}...
}@Component
public class InventoryDetailProducer {@Autowiredprivate DefaultProducer defaultProducer;//库存扣减明细 MQ生产public void sendInventoryDetail(InventoryDetail inventoryDetail) {//发送库存扣减明细保存消息defaultProducer.sendMessage(RocketMqConstant.INVENTORY_DETAIL_TOPIC,JSONObject.toJSONString(inventoryDetail), "库存扣减");}
}@Component
public class DefaultProducer {private DefaultMQProducer producer;@Autowiredpublic DefaultProducer(RocketMQProperties rocketMQProperties) {producer = new DefaultMQProducer(RocketMqConstant.ORDER_DEFAULT_PRODUCER_GROUP);producer.setNamesrvAddr(rocketMQProperties.getNameServer());start();}//对象在使用之前必须要调用一次,只能初始化一次public void start() {try {this.producer.start();} catch (MQClientException e) {log.error("producer start error", e);}}...//发送消息public void sendMessage(String topic, String message, String type) {sendMessage(topic, message, -1, type);}//发送消息,同步等待消息发送请求返回成功public void sendMessage(String topic, String message, Integer delayTimeLevel, String type) {Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));try {if (delayTimeLevel > 0) {msg.setDelayTimeLevel(delayTimeLevel);}SendResult send = producer.send(msg);if (SendStatus.SEND_OK == send.getSendStatus()) {log.info("发送MQ消息成功, type:{}, message:{}", type, message);} else {throw new ProductBizException(send.getSendStatus().toString());}} catch (Exception e) {log.error("发送MQ消息失败:", e);throw new ProductBizException(CommonErrorCodeEnum.SEND_MQ_FAILED);}}...
}

(2)解决⽅案

可以使⽤消息的异步发送,这样可以不用等待Broker返回结果。但是库存扣减明细消息是不允许丢失的,异步发送消息就可能发送失败。所以对于发送消息时返回发送失败的,可以进⾏重试处理。

@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate InventoryDetailProducer inventoryDetailProducer;...//扣减商品库存@Overridepublic JsonResult deductProductStock(InventoryRequest request) {//1.验证入参是否合法checkDeductProductStock(request);//2.构建扣减库存的上下文对象BucketContext bucketContext = buildDeductProductStock(request);try {//3.获取是否已经有一条扣减明细记录,检查该笔订单号是否已经在缓存中存在String repeatDeductInfo = getRepeatDeductInfo(bucketContext);if (!StringUtils.isEmpty(repeatDeductInfo)) {return JsonResult.buildSuccess();}//4.执行库存扣减deductInventory(bucketContext);//5.写入明细,如果已重复写入失败,则回退库存writeInventoryDetail(bucketContext);} catch (Exception e) {log.error("库存扣减失败", e);return JsonResult.buildError(e.getMessage());} finally {//6.检测当前返回的库存数量是否触发扩容的阈值(回源比例),触发则发送通知进行异步扩容checkInventoryBackSource(bucketContext);}return JsonResult.buildSuccess();}//将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存)private void writeInventoryDetail(BucketContext bucketContext) {//获取扣减明细信息InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();//尝试写入明细记录,如果没有写入成功则说明库存需要回退Integer count = tairCache.exhsetNx(bucketContext.getInventoryDetailKey(), String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));if (count < 0) {//说明明细已经存在了,写入失败,需要将库存回退到对应的分桶上tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());} else {//发送消息,异步写入库存扣减的明细到DBinventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());}}...
}@Component
public class InventoryDetailProducer {@Autowiredprivate DefaultProducer defaultProducer;//库存扣减明细 MQ生产public void sendInventoryDetail(InventoryDetail inventoryDetail) {//发送库存扣减 明细保存消息defaultProducer.sendAsyncMessage(RocketMqConstant.INVENTORY_DETAIL_TOPIC,JSONObject.toJSONString(inventoryDetail), "库存扣减明细");}
}@Component
public class DefaultProducer {private DefaultMQProducer producer;@Autowiredpublic DefaultProducer(RocketMQProperties rocketMQProperties) {producer = new DefaultMQProducer(RocketMqConstant.ORDER_DEFAULT_PRODUCER_GROUP);producer.setNamesrvAddr(rocketMQProperties.getNameServer());start();}public DefaultMQProducer getProducer() {return this.producer;}//对象在使用之前必须要调用一次,只能初始化一次public void start() {try {this.producer.start();} catch (MQClientException e) {log.error("producer start error", e);}}...//异步发送消息public void sendAsyncMessage(String topic, String message, String type) {Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));try {//2.异步发送producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {}@Overridepublic void onException(Throwable e) {//对于失败的消息,要做重试处理log.error("发送MQ消息失败, type:{}, message:{}", type, message, e);}});} catch (Exception e) {log.error("发送MQ消息失败, type:{}, message:{}", type, message, e);throw new ProductBizException(CommonErrorCodeEnum.SEND_MQ_FAILED);}}...
}

4.库存扣减明细key热点缓存打散优化

(1)优化原因

库存进⾏分桶后,库存扣减的并发请求会均匀打散到多个缓存分⽚上。但库存扣减明细的key并没有进行缓存分片,⾼并发下会导致库存扣减明细的热key都集中在同⼀个分⽚上,从⽽影响写⼊性能。而其它⼏个缓存分⽚的性能还没有压到极限,所以要提升库存的性能,还需处理库存扣减明细的的热点key问题。

优化前的代码:

@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate InventoryDetailProducer inventoryDetailProducer;...//将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存)private void writeInventoryDetail(BucketContext bucketContext) {//获取库存扣减的明细详情InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId();//尝试写入明细记录,如果没有写入成功则说明库存需要回退Integer count = tairCache.exhset(key, String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));if (count < 0){//说明明细已经存在了,写入失败,需要将库存回退到对应的分桶上tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());} else {//发送消息,异步写入库存扣减的明细到DBinventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());}}...
}

(2)解决⽅案

⽣成库存分桶的同时,还需要⽣成⼀份用于库存扣减明细的key。这样当发生库存扣减时,就可以对订单号ID进⾏Hash,然后与分桶数量进⾏取模。从而计算出要使⽤的库存明细的缓存key,实现对库存明细缓存的写入按缓存key均匀打散到不同分⽚上。

注意如下代码中的:

BucketContext.setInventoryDetailKey() + getInventoryDetailKey();
BucketLocalCache.setBucketDetailKeyList() + getBucketDetailKeyList();

@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate InventoryDetailProducer inventoryDetailProducer;...//填充扣减库存的分桶相关信息private BucketContext buildDeductBucketList(InventoryRequest request) {BucketContext context = new BucketContext();//获取本地缓存的分桶元数据BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());//获取本地缓存的分桶列表List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();if (!CollectionUtils.isEmpty(availableList)) {//获取本次扣减请求对应的扣减次数,用来定位具体使用的分桶库存Integer incrementCount = getIncrementCount(request);//通过运算得到本次访问所需要定位的分桶int index = incrementCount % availableList.size();//获取本次准备处理的分桶信息BucketCacheBO bucketCacheBO = availableList.get(index);context.getAvailableList().add(bucketCacheBO);//为了避免扣减失败(分桶已下线或者库存不足),可以多备份几个分桶//全部分桶都作为备份,就是合并扣减的实现了for (int i = 0; i < 2; i++) {//填充2个作为备份,如果超过下标则从0开始继续取int num = index + i;if (num >= availableList.size()) {num = 0;}BucketCacheBO bucketCache = availableList.get(num);context.getAvailableList().add(bucketCache);}} else {//并发下,可能全部的分桶都下线了,这个时候使用中心桶进行库存扣减(因为其它分桶的下线库存回源会加到中心桶上)BucketCacheBO bucketCacheBO = new BucketCacheBO();bucketCacheBO.setBucketNo(buildSellerInventoryKey(request.getSellerId(), request.getSkuId()));//中心桶无需扩容,但是出现这种场景属于高并发下,分桶全部被下线了,此时需要保证分桶本地元数据和远程保持一致,为了性能,分桶下线未上粒度较大得锁//所以需要当遇到使用中心桶的时候,再次触发一次远程缓存和本地缓存同步的操作,并且需要保证远程缓存最少有一个可用分桶存在bucketCacheBO.setBucketNum(0);context.getAvailableList().add(bucketCacheBO);//异步消息发送同步本地缓存的消息bucketRefreshProducer.sendBucketOffline(request);}Long index = InventorBucketUtil.createDetailBucketKey(request.getOrderId(), bucketLocalCache.getInventoryBucketConfig().getBucketNum());String inventoryDetailKey = bucketLocalCache.getBucketDetailKeyList().get(Integer.valueOf(index + ""));context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig());context.setInventoryDetailKey(inventoryDetailKey);inventoryBucketCache.threadLocalRemove();return context;}//将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存)private void writeInventoryDetail(BucketContext bucketContext) {//获取扣减明细信息InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();//尝试写入明细记录,如果没有写入成功则说明库存需要回退Integer count = tairCache.exhsetNx(bucketContext.getInventoryDetailKey(), String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));if (count < 0) {//说明明细已经存在了,写入失败,需要将库存回退到对应的分桶上tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());} else {//发送消息,异步写入库存扣减的明细到DBinventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());}}...
}@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//构建缓存模型//@param key//@param bucketNum             分桶数量//@param inventoryNum          分桶分配的库存数量//@param residueNum            剩余的未分配均匀的库存//@param inventoryBucketConfig 分桶配置信息private BucketLocalCache buildBucketCache(String key, Integer bucketNum, Integer inventoryNum, Integer residueNum, InventoryBucketConfigDO inventoryBucketConfig) {BucketLocalCache bucketLocalCache = new BucketLocalCache();//先获取得到这个模板配置的对应可分槽位的均匀桶列表List<String> bucketNoList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum());List<BucketCacheBO> bucketCacheBOList = new ArrayList<>(bucketNum);List<BucketCacheBO> undistributedList = new ArrayList<>(bucketNum);//构建出多个分桶对象for (int i = 0; i < bucketNum; i++) {//生成对应的分桶编号,方便定义到具体的分桶上BucketCacheBO bucketCache = new BucketCacheBO();String bucketNo = bucketNoList.get(i);bucketCache.setBucketNo(bucketNo);//最后一个分桶,分配剩余未除尽的库存+平均库存if (i == bucketNum - 1) {bucketCache.setBucketNum(inventoryNum + residueNum);} else {bucketCache.setBucketNum(inventoryNum);}bucketCacheBOList.add(bucketCache);}//生成的分桶对象超过实际可分配的分桶对象,保留这批多余的分桶模型为不可用分桶,后续分桶上线可以选择使用if (bucketNoList.size() > bucketNum) {for (int i = bucketNum; i < bucketNoList.size(); i++) {BucketCacheBO bucketCache = new BucketCacheBO();String bucketNo = bucketNoList.get(i);bucketCache.setBucketNo(bucketNo);undistributedList.add(bucketCache);}}//生成缓存的明细keyList<String> bucketDetailKeyList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum(), "%07d");//设置分桶缓存明细的keybucketLocalCache.setBucketDetailKeyList(bucketDetailKeyList);//设置可用的分桶缓存列表bucketLocalCache.setAvailableList(bucketCacheBOList);//设置不可用或者已下线的分桶缓存列表bucketLocalCache.setUndistributedList(undistributedList);return bucketLocalCache;}...
}public class InventorBucketUtil {private static final int MAX_SIZE = 100000;//生成对应的槽位key,明细使用,多使用一位区分//@param key       卖家Id+商品skuId//@param bucketNum 分桶配置数量//@return 预先保留的槽位集合public static List<String> createBucketNoList(String key, Integer bucketNum, String format) {Map<Long, String> cacheKey = new HashMap<>(bucketNum);//bucketNoList用来存放每个桶对应的hashKeyList<String> bucketNoList = new ArrayList<>(bucketNum);//分配桶的编号for (int i = 1; i <= MAX_SIZE; i++) {String serialNum = String.format(format, i);//卖家ID + 商品SKU ID + 序号String hashKey = key + serialNum;//一致性哈希算法murmurlong hash = HashUtil.murMurHash(hashKey.getBytes());//对分桶数量进行取模运算long c = (hash %= bucketNum);//确保被选中的hashKey都能哈希到不同的分桶if (cacheKey.containsKey(c)) {continue;}cacheKey.put(c, hashKey);bucketNoList.add(hashKey);if (cacheKey.size() >= bucketNum) {break;}}return bucketNoList;}...
}

文章转载自:东阳马生架构

原文链接:商品中心—19.库存分桶高并发的优化文档 - 东阳马生架构 - 博客园

体验地址:JNPF快速开发平台

http://www.lryc.cn/news/582102.html

相关文章:

  • 力扣 3258 统计满足 K 约束的子字符串数量 I 题解
  • Java工具类,对象List提取某个属性为List,对象List转为对象Map其中某个属性作为Key值
  • RAG实战指南 Day 8:PDF、Word和HTML文档解析实战
  • UI自动化常见面试题
  • day08-Elasticsearch
  • 云计算领域“XaaS”是什么?
  • Python编译器(Pycharm Jupyter)
  • 第4.2节 Android App生成追溯关系
  • 【Mac 从 0 到 1 保姆级配置教程 19】- 英语学习篇-我的英语工作流分享(AI 辅助学习)
  • JavaWeb笔记07
  • 比亚迪6月销量38.25万辆,同比增长11.9%
  • window显示驱动开发—BGRA 扫描输出支持
  • 特伦斯T1节拍器,突出综合优势与用户体验
  • Python 包管理工具 uv
  • 【C语言进阶】数据是如何存储的?
  • Web后端开发-请求响应
  • 国产CAD皇冠CAD(CrownCAD)建模教程:哈雷摩托车发动机零件
  • [论文阅读] 人工智能 | 读懂Meta-Fair:让LLM摆脱偏见的自动化测试新方法
  • 【mini-spring】【更新中】第一章 IOC与Bean源码及思路解析
  • IT 与动环一体化运维的技术融合实践
  • MySQL Galera Cluster企业级部署
  • 力扣_链表(前后指针)_python版本
  • verilog中timescale指令的使用
  • 零知开源——STM32F4结合BMP581气压传感器实现ST7789中文显示教程
  • centos stream 10设置本地网络
  • 沙箱逃逸漏洞
  • 音频信号的预加重:提升语音清晰度
  • OpenCV 人脸分析------面部关键点检测类cv::face::FacemarkLBF
  • 使用ansible的角色实现批量安装nginx服务
  • 图像处理基础:镜像、缩放与矫正