当前位置: 首页 > news >正文

商品中心—15.库存分桶扣减的技术文档

大纲

1.库存分桶扣减和扩容时涉及的数据表

2.下单时商品库存扣减

3.库存分桶扣减后异步更新DB

4.取消订单时回退商品库存

5.查询商品库存

6.库存扣减分桶轮询以及随机备用分桶

7.基于Tair中分桶数据实现库存扣减

8.分桶库存扣减完毕后扣减明细异步落库

9.分桶扣减完库存后异步触发回源扩容

10.库存分桶回源扩容的Double Check

11.库存分桶扩容量计算算法实现

12.库存分桶扩容完成以及分桶下线触发

13.库存分桶下线以及剩余存量归还中心桶

14.库存下线触发剩余库存总量预警机制

1.库存分桶扣减和扩容时涉及的数据表

(1)库存分桶配置表

(2)库存扣减明细表

(1)库存分桶配置表

CREATE TABLE `inventory_bucket_config` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',`bucket_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '分桶数量',`max_depth_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '最⼤库存深度,即分桶的最大库存容量',`min_depth_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '最⼩库存深度,即分桶的最小库存容量',`threshold_value` int(10)  NOT NULL DEFAULT '0' COMMENT  '分桶下线阈值,当某个分桶的库存数小于阈值时就需要将该分桶下线了',`back_source_proportion` int(10)  NOT NULL DEFAULT '0' COMMENT  '回源⽐例,从1-100设定⽐例',`back_source_step` int(10)  NOT NULL DEFAULT '0' COMMENT  '回源步⻓,桶扩容的时候默认每次分配的库存⼤⼩',`template_name` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '模板名称',`is_default` tinyint(1)  NOT NULL DEFAULT '0' COMMENT '是否默认模板,只允许⼀个,1为默认模板',`version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',`del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',`create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',`update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存分桶配置模板表';

(2)库存扣减明细表

CREATE TABLE `inventory_deduction_detail` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',`order_id` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '订单id',`refund_no` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '退款编号',`inventory_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '扣减库存数量',`sku_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '商品skuId',`seller_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家ID',`bucket_no` int(10)  NOT NULL COMMENT  '扣减分桶编号',`deduction_type` int(2)  NOT NULL COMMENT  '库存操作类型(10库存扣减,20库存退货)',`del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',`create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',`update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存扣减明细表';

(3)流程图

2.下单时商品库存扣减

(1)使用入口

(2)场景一之扣减库存

(3)场景二之扩容库存

(4)场景三之分桶下线

(1)使用入口

⽤户在前端下单时,会对商品库存发起扣减请求。

@RestController
@RequestMapping("/product/inventory")
public class InventoryController {@Autowiredprivate InventoryService inventoryService;...//扣减库存@RequestMapping("/deduct")public JsonResult deductProductStock(@RequestBody InventoryRequest request) {//这里模拟指定本次的库存业务单号,实际接口需要外部传入request.setOrderId(SnowflakeIdWorker.getCode());JsonResult result = inventoryService.deductProductStock(request);return result;}...
}@Service
public class InventoryServiceImpl implements InventoryService {...//扣减商品库存@Overridepublic JsonResult deductProductStock(InventoryRequest request) {//1.验证入参是否合法checkDeductProductStock(request);//2.构建扣减库存的上下文对象BucketContext bucketContext = buildDeductProductStock(request);try {//3.获取是否已经有一条扣减明细记录,检查该笔订单号是否已经在缓存中存在String repeatDeductInfo = getRepeatDeductInfo(bucketContext);if (!StringUtils.isEmpty(repeatDeductInfo)) {return JsonResult.buildSuccess();}//4.执行库存扣减deductInventory(bucketContext);//5.写入明细,如果已重复写入,则写入失败,并回退库存writeInventoryDetail(bucketContext);} catch (Exception e) {e.printStackTrace();return JsonResult.buildError(e.getMessage());} finally {//6.检测当前返回的库存数量是否触发扩容的阈值(回源比例),触发则发送通知进行异步扩容checkInventoryBackSource(bucketContext);}return JsonResult.buildSuccess();}...
}

(2)场景一之扣减库存

步骤一:库存扣减请求的⼊参校验
步骤二:构建扣减库存的上下文对象
步骤三:检查该笔订单号是否已经在缓存中存在
步骤四:执⾏库存扣减时会⽤到备⽤的分桶进行尝试
步骤五:扣减成功后写入⼀条执⾏明细到缓存
步骤六:⽆论整个扣减流程是否成功,都要执⾏⼀次是否扩容的校验判断

步骤一:库存扣减请求的⼊参校验

步骤二:构建扣减库存的上下文对象。接下来进⾏校验和扣减都会以这个上下文对象的信息为准。构建该上下文对象时,首先会根据⼀个⾃增的访问次数key来定位本次扣减应该路由到哪个分桶。也就是使用Round Robin轮询算法,根据扣减次数定位具体要扣减哪个分桶。为避免扣减失败,会同时随机⽣成2个备⽤分桶,用于扣减失败时的重试。如果其他分桶都作为备用分桶,那么就是库存合并扣减的功能了。

⼀般出现连续多个分桶库存都不⾜且分桶还未被进行下线处理的概率较少,此种场景的分桶应该⼤部分都已下线,且只保留了唯⼀⼀个可⽤分桶。获取分桶元数据时,会先从本地缓存获取,然后再从Tair缓存中获取。

@Service
public class InventoryServiceImpl implements InventoryService {...//构建接下来用于处理具体扣减库存时所需要的上下文对象private BucketContext buildDeductProductStock(InventoryRequest request) {//1.填充扣减库存相关请求信息的明细InventoryDetail inventoryDetail = inventoryConverter.converterRequest(request);//2.填充扣减库存的分桶相关信息BucketContext bucketContext = buildDeductBucketList(request);bucketContext.setInventoryDetail(inventoryDetail);return bucketContext;}//填充扣减库存的分桶相关信息private BucketContext buildDeductBucketList(InventoryRequest request) {BucketContext context = new BucketContext();//获取本地缓存的分桶元数据BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());//获取本地缓存的分桶列表List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();//获取本次扣减请求对应的扣减次数,用来定位具体使用的分桶库存Integer incrementCount = getIncrementCount(request);//通过取模运算得到本次访问所需要定位的分桶,使用Round Robin轮询算法int index = incrementCount % availableList.size();//获取本次准备处理的分桶信息BucketCacheBO bucketCacheBO = availableList.get(index);context.getAvailableList().add(bucketCacheBO);context.getBucketNoList().add(bucketCacheBO.getBucketNo());//获取分桶配置信息context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig());//为了避免扣减失败(分桶已下线或者库存不足),可以多备份几个分桶//如果其他分桶都作为备用分桶,那么就可以实现库存合并扣减的功能了for (int i = 0; i < 2; i++) {//任意填充2个作为备份Random random = new Random();int num = random.nextInt(availableList.size());BucketCacheBO bucketCache = availableList.get(num);//避免拿到重复的分桶,这里处理一下if (context.getBucketNoList().contains(bucketCache.getBucketNo())) {i--;continue;}context.getAvailableList().add(bucketCache);context.getBucketNoList().add(bucketCache.getBucketNo());}return context;}//获取对应售卖商品的扣减访问次数private Integer getIncrementCount(InventoryRequest request) {String incrementKey = TairInventoryConstant.SELLER_SKU_STOCK_COUNT_PREFIX + request.getSellerId() + request.getSkuId();Integer incrementCount = tairCache.incr(incrementKey);return incrementCount;}...
}@Component
@Data
public class InventoryBucketCache {@Autowiredprivate Cache cache;@Autowiredprivate TairCache tairCache;...//获取本地缓存的分桶元数据信息public BucketLocalCache getBucketLocalCache(String bucketKey) {BucketLocalCache bucketLocalCache = (BucketLocalCache) cache.getIfPresent(bucketKey);log.info("local cache get key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));if (Objects.isNull(bucketLocalCache)) {synchronized (bucketKey.intern()) {String bucketCache = tairCache.get(TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketKey);if (!StringUtils.isEmpty(bucketCache)) {bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);cache.put(bucketKey, bucketLocalCache);}}}return bucketLocalCache;}...
}//分桶扣减库存上下文对象
@Data
public class BucketContext {//存储可用的分桶编号private List<String> bucketNoList = new ArrayList<>();//存储可用的分桶的具体信息private List<BucketCacheBO> availableList  = new ArrayList<>();//扣减明细信息private InventoryDetail inventoryDetail;//用来处理扩容所用private Map<String, Integer> capacityMap = new HashMap<>();//当前分桶的配置信息private InventoryBucketConfigDO inventoryBucketConfig;
}//库存扣减的明细对象
@Data
public class InventoryDetail {//商品SKUprivate Long skuId;//扣减库存数量private Integer inventoryNum;//卖家IDprivate String sellerId;//订单IDprivate String orderId;//扣减使用的分桶private String bucketNo;//退款编号private String refundNo;
}

步骤三:检查该笔订单号是否已经在缓存中存在。如果已经存在则认为这笔订单已被扣减过库存了,可以直接返回。

@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate TairCache tairCache;...//验证当前得请求扣减是否已经存在了private String getRepeatDeductInfo(BucketContext bucketContext) {//获取当前扣减库存对应的订单明细缓存String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId();return tairCache.exhget(key, String.valueOf(bucketContext.getInventoryDetail().getOrderId()));}...
}@Component
public class TairCache {private JedisPool jedisPool;public TairCache(JedisPool jedisPool) {this.jedisPool = jedisPool;}public Jedis getJedis() {return jedisPool.getResource();}private TairHash createTairHash(Jedis jedis) {return new TairHash(jedis);}//获取hash对象public String exhget(String key, String field) {try (Jedis jedis = getJedis()) {String exhget = createTairHash(jedis).exhget(key, field);log.info("exhget key:{}, field:{}, value:{}", key, field, exhget);return exhget;}}
}

步骤四:执⾏库存扣减时会⽤到备⽤的分桶进行尝试。当第⼀次分桶扣减库存失败,默认会重试其它⼏个分桶。如果都失败则直接提示库存不⾜抛出异常。如果备用的分桶全部是可用的分桶,那么第一个分桶不够扣,就会继续扣第二个分桶,从而实现库存合并扣减。

@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate TairCache tairCache;...//扣减库存//@param bucketContext 库存扣减上下文对象private void deductInventory(BucketContext bucketContext) {//获取可以使用的分桶编号,即对应缓存中的keyList<String> bucketNoList = bucketContext.getBucketNoList();//获取扣减明细信息InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();//获取扣减库存数量Integer inventoryNum = inventoryDetail.getInventoryNum();//获取用于处理扩容的MapMap<String, Integer> capacityMap = bucketContext.getCapacityMap();Boolean isDeduct = false;//对分桶进行库存扣减(每次)//如果bucketNoList是全部可用分桶,那么第一个分桶不够扣减,就会继续扣减第二个分桶,从而实现库存合并扣减for (String bucketNo : bucketNoList) {//自减,默认扣减后不能小于0,否则返回-1Integer residueNum = tairCache.decr(bucketNo, inventoryNum);//capacityMap可用于判断分桶是否是扩容的分桶,以及标记当前分桶剩余的库存数capacityMap.put(bucketNo, residueNum);//库存扣减成功if (residueNum >= 0) {//标记一下具体扣减的分桶属于哪个inventoryDetail.setBucketNo(bucketNo);isDeduct = true;break;}}//分桶扣减都没有成功,此时抛出异常提示库存不足if (!isDeduct) {throw new BaseBizException(InventoryExceptionCode.INVENTORY_INSUFFICIENT_ERROR);}}...
}

步骤五:扣减成功后写入⼀条执⾏明细到缓存。注意并发场景下可能获取时没有明细,但写⼊时⼜有明细了。⼀旦发⽣这种场景,说明并发了,需要进⾏库存的回退。

@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate TairCache tairCache;@Resourceprivate InventoryDetailProducer inventoryDetailProducer;...//将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存)private void writeInventoryDetail(BucketContext bucketContext) {//获取扣减明细信息InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId();//尝试写入明细记录,如果没有写入成功则说明库存需要回退Integer count = tairCache.exhset(key, String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));if (count < 0) {//说明明细已经存在了,写入失败,此时需要将库存回退到对应的分桶上tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());} else {//发送消息,异步写入库存扣减的明细到DBinventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());}}...
}@Component
public class TairCache {private JedisPool jedisPool;public TairCache(JedisPool jedisPool) {this.jedisPool = jedisPool;}public Jedis getJedis() {return jedisPool.getResource();}private TairHash createTairHash(Jedis jedis) {return new TairHash(jedis);}//存储hash对象public Integer exhset(String key,String field, String value){try (Jedis jedis = getJedis()) {return createTairHash(jedis).exhset(key, field, value, ExhsetParams.ExhsetParams().nx()).intValue();}}
}

步骤六:⽆论整个扣减流程是否成功,都要执⾏⼀次是否扩容的校验判断。执⾏依据是分桶扣减完库存后返回的扣减后剩余库存值以及对应的分桶。例如第⼀个分桶失败,第⼆个分桶成功,必然会触发第⼀个分桶的扩容。然后校验第⼆个分桶的库存返回值是否触发回源⽐例后决定是否触发扩容。

@Service
public class InventoryServiceImpl implements InventoryService {...//扣减商品库存@Overridepublic JsonResult deductProductStock(InventoryRequest request) {//1.验证入参是否合法checkDeductProductStock(request);//2.构建扣减库存的上下文对象BucketContext bucketContext = buildDeductProductStock(request);try {//3.获取是否已经有一条扣减明细记录,检查该笔订单号是否已经在缓存中存在String repeatDeductInfo = getRepeatDeductInfo(bucketContext);if (!StringUtils.isEmpty(repeatDeductInfo)) {return JsonResult.buildSuccess();}//4.执行库存扣减deductInventory(bucketContext);//5.写入明细,如果已重复写入,则写入失败,并回退库存writeInventoryDetail(bucketContext);} catch (Exception e) {e.printStackTrace();return JsonResult.buildError(e.getMessage());} finally {//6.检测当前返回的库存数量是否触发扩容的阈值(回源比例),触发则异步通知扩容checkInventoryBackSource(bucketContext);}return JsonResult.buildSuccess();}...//检测扣减成功后的库存是否触发回源//例如商品分桶库存1000,回源比例40%,那么实际剩余库存小于400就会触发回源库存的操作private void checkInventoryBackSource(BucketContext bucketContext) {//获取扣减明细信息InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();//存储对应的需要校验扩容的分桶Map<String, Integer> bucketMap = bucketContext.getCapacityMap();//获取库存分桶配置InventoryBucketConfigDO inventoryBucketConfig = bucketContext.getInventoryBucketConfig();//判断分桶当初分配的最大库存容量,计算是否触发回源比例List<BucketCacheBO> availableList = bucketContext.getAvailableList();for (BucketCacheBO bucketCacheBO : availableList) {//具体使用的是哪个分桶扣减库存if (bucketMap.containsKey(bucketCacheBO.getBucketNo())) {Integer residueNum = bucketMap.get(bucketCacheBO.getBucketNo());//当前分桶的分配总库存Integer bucketNum = bucketCacheBO.getBucketNum();//触发回源比例的百分比Integer backSourceProportion = inventoryBucketConfig.getBackSourceProportion();//这里如果要更准确,需要用小数得到回源数,剩余数量小于回源数,那么就要回源//这里省略了小数,所以可能会有一个数的误差,影响不大int backSourceNum = bucketNum * backSourceProportion / 100;//回源比例的库存 大于剩余的库存,触发异步扩容,或者没有返回剩余库存也说明扣减失败if (backSourceNum > residueNum) {//标记出回源的具体分桶inventoryDetail.setBucketNo(bucketCacheBO.getBucketNo());//发送通知到消息队列进行异步库存扩容sendAsynchronous(inventoryDetail);}}}}...
}

(3)场景二之扩容库存

步骤一:库存分桶缓存的扣减过程会触发库存扩容
步骤二:消费者消费分桶库存扩容的消息
步骤三:检测是否需要扩容,如果⽆需扩容则直接结束
步骤四:获取分布式锁来进行扩容处理或分桶下线处理
步骤五:进行具体的库存分桶缓存的扩容处理

步骤一:库存分桶缓存的扣减过程会触发库存扩容。执⾏库存分桶缓存的扣减时,如果发现分桶的库存剩余值小于回源配置数,此时就需要发送⼀个异步消息,通知该库存分桶进⾏扩容。

步骤二:消费者消费分桶库存扩容的消息。此时会调用分桶扩容接口InventoryBucketService的bucketCapacity()来进行扩容。

@Service
public class InventoryServiceImpl implements InventoryService {...//发送通知到消息队列进行异步库存扩容//@param inventoryDetail 库存扣减明细对象private void sendAsynchronous(InventoryDetail inventoryDetail) {//1.构建发送的消息对象BucketCapacity bucketCapacity = inventoryConverter.converter(inventoryDetail);//2.发送消息,异步处理扩容bucketCapacityProducer.sendBucketCapacity(bucketCapacity);}...
}//分桶扩容对象
@Data
public class BucketCapacity {//分桶编号private String bucketNo;//商品skuIDprivate String skuId;//卖家IDprivate String sellerId;
}//分桶扩容的消息队列
@Component
public class BucketCapacityProducer {@Autowiredprivate DefaultProducer defaultProducer;//分桶扩容的消息public void sendBucketCapacity(BucketCapacity bucketCapacity) {//发送分桶扩容消息defaultProducer.sendMessage(RocketMqConstant.BUCKET_CAPACITY_TOPIC, JSONObject.toJSONString(bucketCapacity), "分桶扩容");}
}//消费分桶库存扩容的消息
@Component
public class BucketCapacityListener implements MessageListenerConcurrently {@Autowiredprivate InventoryBucketService inventoryBucketService;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt messageExt : list) {String msg = new String(messageExt.getBody());log.info("执行分桶库存扩容,消息内容:{}", msg);BucketCapacity bucketCapacity = JsonUtil.json2Object(msg, BucketCapacity.class);//调用分桶扩容接口inventoryBucketService.bucketCapacity(bucketCapacity);}} catch (Exception e) {log.error("consume error, 分桶库存扩容失败", e);//本次消费失败,下次重新消费return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//分桶扩容接口@Overridepublic void bucketCapacity(BucketCapacity bucketCapacity) {//先锁住中心桶库存,避免此时库存发生变化String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + bucketCapacity.getSellerId() + bucketCapacity.getSkuId();String value = SnowflakeIdWorker.getCode();//1.校验是否已经无需扩容了,如果是则快速结束BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity);if (!bucketCapacityContext.getIsCapacity()) {return;}//获取分布式锁来进行扩容处理boolean lock = tairLock.tryLock(key, value);if (lock) {try {//再次校验是否需要扩容,此处不允许并发bucketCapacityContext = checkBucketCapacity(bucketCapacity);if (bucketCapacityContext.getIsCapacity()) {//2.获取中心桶缓存的剩余库存Integer residueNum = getCenterStock(bucketCapacity);//3.可以扩容,计算出可回源的库存进行处理if (residueNum > 0) {backSourceInventory(residueNum, bucketCapacityContext);} else {//4.中心桶无库存,检查是否触发下线checkBucketOffline(bucketCapacity);}}} catch (Exception e) {e.printStackTrace();} finally {tairLock.unlock(key, value);}} else {throw new BaseBizException("请求繁忙,稍后重试!");}}...
}

步骤三:检测是否需要扩容,如果⽆需扩容则结束

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//校验本次请求是否还需要执行扩容处理private BucketCapacityContext checkBucketCapacity(BucketCapacity bucketCapacity) {String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();//1.获取远程的分桶缓存信息Integer residueNum = getBucketInventoryNum(bucketCapacity.getBucketNo());//2.获取缓存元数据信息BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);//3.校验是否还需要执行扩容List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig();for (BucketCacheBO bucketCacheBO:availableList) {//具体使用的是哪个分桶进行扣减库存if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) {//当前分桶的分配总库存Integer bucketNum = bucketCacheBO.getBucketNum();//触发回源比例的百分比Integer backSourceProportion = inventoryBucketConfig.getBackSourceProportion();int backSourceNum = bucketNum * backSourceProportion / 100;//回源比例的库存大于剩余的库存,触发异步扩容return new BucketCapacityContext(residueNum, backSourceNum > residueNum, bucketCapacity);}}//如果不在可用列表里面,则意味已下线,快速结束掉return new BucketCapacityContext(residueNum, false, bucketCapacity);}//获取得到当前分桶对应的实际剩余库存private Integer getBucketInventoryNum( String bucketNo) {String bucketNum = tairCache.get(bucketNo);if (StringUtils.isEmpty(bucketNum)){return 0;}return Integer.valueOf(bucketNum);}...
}

步骤四:获取分布式锁来进行扩容处理或分桶下线处理。这⾥会查看中⼼桶缓存的剩余库存,判断是否还有剩余库存可以扩容。如果有则进⾏扩容,如果没有则判断分桶是否触发下线阈值。并且处理请求前,需要再次判断是否⽆需扩容。避免有竞争锁的请求跳过前面的校验进⼊锁,需要过滤掉这种⽆效请求。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//分桶扩容接口@Overridepublic void bucketCapacity(BucketCapacity bucketCapacity) {//先锁住中心桶库存,避免此时库存发生变化String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + bucketCapacity.getSellerId() + bucketCapacity.getSkuId();String value = SnowflakeIdWorker.getCode();//1.校验是否已经无需扩容了,如果是则快速结束BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity);if (!bucketCapacityContext.getIsCapacity()) {return;}//获取分布式锁来进行扩容处理boolean lock = tairLock.tryLock(key, value);if (lock) {try {//再次校验是否需要扩容,此处不允许并发bucketCapacityContext = checkBucketCapacity(bucketCapacity);if (bucketCapacityContext.getIsCapacity()) {//2.获取中心桶缓存的剩余库存Integer residueNum = getCenterStock(bucketCapacity);//3.可以扩容,计算出可回源的库存进行处理if (residueNum > 0) {backSourceInventory(residueNum, bucketCapacityContext);} else {//4.中心桶无库存,检查是否触发下线checkBucketOffline(bucketCapacity);}}} catch (Exception e) {e.printStackTrace();} finally {tairLock.unlock(key, value);}} else {throw new BaseBizException("请求繁忙,稍后重试!");}}...
}

步骤五:进行具体的库存分桶缓存的扩容处理。分桶需要扩容多少库存,需要尽量保证每个分桶的库存尽可能均匀。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//回源库存到分桶上//@param residueNum            中心桶库存//@param bucketCapacityContext 扩容上下文对象private void backSourceInventory(Integer residueNum, BucketCapacityContext bucketCapacityContext) {//首先需要当前分桶的库存,其次还需要获取目前分桶的可发库存深度(第一次初始化的时候分配的库存)//根据当初分配的库存深度以及最大库存深度以及中心桶库存,得出均匀到目前支持可用的分桶均匀分配库存大概数量//同时根据本次同步的库存数量刷新分桶的实际库存深度BucketCapacity bucketCapacity = bucketCapacityContext.getBucketCapacity();//先获取本地的分桶元数据信息,获取当前分桶的总发放上限String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig();List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();Integer inventoryNum = 0;//获取实际配置的最大可用库存深度Integer maxBucketNum = availableList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();BucketCacheBO bucketCache = null;for (BucketCacheBO bucketCacheBO : availableList) {if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) {bucketCache = bucketCacheBO;break;}}//这里没有匹配到分桶,则该分桶已被下线,不处理后续流程if (Objects.isNull(bucketCache)) {return;}//中心桶库存超过最大深度库存(全部分桶总计),直接以配置的回源步长增长库存if (residueNum > maxBucketNum) {inventoryNum = inventoryBucketConfig.getBackSourceStep();} else {inventoryNum = calcEvenInventoryNum(maxBucketNum, inventoryBucketConfig, residueNum, bucketCache);}//填充变更元数据关于库存的深度数据Integer maxDepthNum = getMaxDepthNum(inventoryNum, inventoryBucketConfig, bucketCache, bucketCapacityContext);//更新分桶元数据相关信息,注意需要判断当前分桶的库存深度是否真实发生变化,如无变化则不需要更新refreshBucketCache(maxDepthNum, bucketLocalCache, bucketCapacity.getBucketNo(), inventoryNum);log.info("本次分桶:{},回源库存:{}", bucketCapacity.getBucketNo(), inventoryNum);//回源分桶的库存Integer incr = tairCache.incr(bucketCapacity.getBucketNo(), inventoryNum);//扣减中心桶库存Integer decr = tairCache.decr(TairInventoryConstant.SELLER_INVENTORY_PREFIX + key, inventoryNum);log.info("本次分桶:{},回源库存:{}, 回源后分桶库存:{}, 中心桶剩余库存:{}", bucketCapacity.getBucketNo(), inventoryNum, incr, decr);}...
}

(4)场景三之分桶下线

说明一:当中⼼桶库存缓存⽆剩余库存,分桶库存也处于下线的阈值时。为了避免碎⽚化的问题出现,需要将⼀些⼩于阈值的库存进⾏分桶回收。分桶回收也就是,将库存回源到中⼼桶,提供给其它分桶扩容。如此反复,当库存越来越少,最终只留下⼀个分桶扣减库存。

说明二:分桶下线采取异步⽅式执⾏,因为分桶下线请求的路由需要时间,所以这⾥对于分桶下线需要先把下线分桶从可⽤列表移除。再通过⼀个延迟消息将对应分桶⾥的库存回源到中⼼桶,避免库存超发。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//校验当前分桶是否触发下线的阈值private void checkBucketOffline(BucketCapacity bucketCapacity) {//1.获取当前分桶的配置信息String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);//2.检测分桶的库存是否触发下线阈值,先获取当前分桶的具体库存以及下线配置阈值Integer thresholdValue = bucketLocalCache.getInventoryBucketConfig().getThresholdValue();Integer inventoryNum = getBucketInventoryNum(bucketCapacity.getBucketNo());//3.如果触发下线,发送消息调用分桶下线if (thresholdValue > inventoryNum) {log.info("触发下线{},阈值{},当前库存值{}", thresholdValue > inventoryNum, thresholdValue, inventoryNum);sendAsynchronous(bucketCapacity);}}//对分桶进行异步下线private void sendAsynchronous(BucketCapacity bucketCapacity) {//1.构建分桶下线接口模型InventorOfflineRequest offlineRequest = buildOfflineBucketInfo(bucketCapacity);//2.发送消息,通知处理分桶下线bucketOfflineProducer.sendBucketOffline(offlineRequest);}...
}

3.库存分桶扣减后异步更新DB

(1)使用入口

(2)具体实现

(1)使用入口

@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate TairCache tairCache;//扣减商品库存@Overridepublic JsonResult deductProductStock(InventoryRequest request) {//1.验证入参是否合法checkDeductProductStock(request);//2.构建扣减库存的上下文对象BucketContext bucketContext = buildDeductProductStock(request);try {//3.获取是否已经有一条扣减明细记录,检查该笔订单号是否已经在缓存中存在String repeatDeductInfo = getRepeatDeductInfo(bucketContext);if (!StringUtils.isEmpty(repeatDeductInfo)) {return JsonResult.buildSuccess();}//4.执行库存扣减deductInventory(bucketContext);//5.写入明细,如果已重复写入,则写入失败,并回退库存writeInventoryDetail(bucketContext);} catch (Exception e) {e.printStackTrace();return JsonResult.buildError(e.getMessage());} finally {//6.检测当前返回的库存数量是否触发扩容的阈值(回源比例),触发则异步通知扩容checkInventoryBackSource(bucketContext);}return JsonResult.buildSuccess();}//将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存)private void writeInventoryDetail(BucketContext bucketContext) {//获取扣减明细信息InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId();//尝试写入明细记录,如果没有写入成功则说明库存需要回退Integer count = tairCache.exhset(key, String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));if (count < 0) {//说明明细已经存在了,写入失败,此时需要将库存回退到对应的分桶上tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());} else {//发送消息,异步写入库存扣减的明细到DBinventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());}}...
}//库存扣减明细
@Component
public class InventoryDetailProducer {@Autowiredprivate DefaultProducer defaultProducer;//库存扣减明细 MQ生产public void sendInventoryDetail(InventoryDetail inventoryDetail) {//发送库存扣减 明细保存消息defaultProducer.sendAsyncMessage(RocketMqConstant.INVENTORY_DETAIL_TOPIC, JSONObject.toJSONString(inventoryDetail), "库存扣减明细");}
}

(2)具体实现

说明一:当有任意⼀个库存发⽣扣减或者撤销时,会将此次发⽣变化的库存具体明细进⾏消息发送。

说明二:扣减需要验证是否已有该订单号的库存明细记录,同样的记录只允许⽣成⼀次,重复则抛出唯⼀索引异常。捕获异常返回成功消息,不写⼊数据库。

说明三:写入数据库时,通过批量插入库存明细代替单条数据插入。

//处理库存扣减明细的记录消息
@Component
public class InventoryDetailListener implements MessageListenerConcurrently {@Resourceprivate InventoryRepository inventoryRepository;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {log.info("执行库存扣减明细保存,消息数量:{}", list.size());List<InventoryDetail> inventoryDetailList = new ArrayList<>(list.size());for (MessageExt messageExt : list) {String msg = new String(messageExt.getBody());InventoryDetail inventoryDetail = JsonUtil.json2Object(msg, InventoryDetail.class);inventoryDetailList.add(inventoryDetail);}inventoryRepository.saveInventoryDetailList(inventoryDetailList, InventoryDeductionTypeEnum.INVENTORY_DETAIL_DEDUCTIONS_TYPE.getCode());} catch (DuplicateKeyException ex) {log.error("consume repeat, 库存扣减明细重复记录,不再重复执行", ex);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {log.error("consume error, 库存扣减明细保存失败", e);//本次消费失败,下次重新消费return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}@Repository
public class InventoryRepository {...//批量保存库存扣减的明细public void saveInventoryDetailList(List<InventoryDetail> inventoryDetailList, Integer deductionType) throws DuplicateKeyException {List<InventoryDeductionDetailDO> inventoryDeductionDetailDOList = inventoryConverter.converterDOList(inventoryDetailList);//对象转换赋值for (InventoryDeductionDetailDO inventoryDeductionDetailDO : inventoryDeductionDetailDOList) {inventoryDeductionDetailDO.setDeductionType(deductionType);inventoryDeductionDetailDO.initCommon();}int count = inventoryDetailMapper.insertBatch(inventoryDeductionDetailDOList);if (count <= 0) {throw new BaseBizException(InventoryExceptionCode.INVENTORY_SQL);}}...
}

4.取消订单时回退商品库存

(1)使用入口

(2)具体实现

(1)使用入口

@DubboService(version = "1.0.0", interfaceClass = InventoryServiceApi.class, retries = 0)
public class InventoryServiceApiImpl implements InventoryServiceApi {@Resourceprivate InventoryService inventoryService;...//取消订单时,回退商品库存@Overridepublic JsonResult cancelProductStock(InventoryRequest request) {try {return inventoryService.cancelProductStock(request);} catch (ProductBizException e) {log.error("biz error: request={}", JSON.toJSONString(request), e);return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());} catch (Exception e) {log.error("system error: request={}", JSON.toJSONString(request), e);return JsonResult.buildError(e.getMessage());}}...
}@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate TairLock tairLock;...//取消订单 回退商品库存@Transactional(rollbackFor = Exception.class)@Overridepublic JsonResult cancelProductStock(InventoryRequest request) {//1.验证入参checkDeductProductStock(request);String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + request.getOrderId();String value = SnowflakeIdWorker.getCode();boolean lock = tairLock.tryLock(key, value);if (lock) {try {//2.记录取消订单的库存申请记录InventoryDetail inventoryDetail = saveRefundInventoryDetail(request);//3.释放对应库存回到对应中心桶releaseInventory(inventoryDetail);return JsonResult.buildSuccess();} catch (Exception e) {e.printStackTrace();return JsonResult.buildError(e.getMessage());} finally {tairLock.unlock(key, value);}}return JsonResult.buildError("请求频繁,请稍后再试");}...
}

(2)具体实现

步骤一:库存取消订单参数⼊参校验

步骤二:订单加锁避免重复执行退还库存请求

步骤三:保存退还库存记录

先获取该订单已退的库存记录,和该订单当前申请退款的库存进⾏相加。计算是否超过该订单下单时的库存数量,如未超过则允许退还库存。

@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate InventoryRepository inventoryRepository;...//保存退还库存记录private InventoryDetail saveRefundInventoryDetail(InventoryRequest request) {InventoryDetail inventoryDetail = inventoryRepository.getInventoryDetail(request.getOrderId());if (Objects.isNull(inventoryDetail)) {throw new BaseBizException(InventoryExceptionCode.INVENTORY_DETAIL_NULL_ERROR, InventoryExceptionCode.INVENTORY_DETAIL_NULL_ERROR.getErrorCode());}//校验对应订单的退款库存申请记录List<InventoryDetail> inventoryDetails = inventoryRepository.queryRefundInventoryDetailList(request.getOrderId());Integer refundNum = 0;//校验已退库存 + 本次申请退货库存 是否超过下单库存if (!CollectionUtils.isEmpty(inventoryDetails)) {for (InventoryDetail inventoryDetail1 : inventoryDetails) {refundNum = refundNum + inventoryDetail1.getInventoryNum();}}//如果扣减的库存大于已退库存和本次退的库存,则允许退货if (inventoryDetail.getInventoryNum() > (refundNum + request.getInventoryNum())) {InventoryDetail inventoryRefundDetail = inventoryConverter.converterRequest(request);inventoryRepository.saveInventoryDetail(inventoryRefundDetail, InventoryDeductionTypeEnum.INVENTORY_DETAIL_REFUND_TYPE.getCode());return inventoryRefundDetail;}return null;}...
}@Repository
public class InventoryRepository {...//根据订单号查询 库存扣减明细public InventoryDetail getInventoryDetail(String orderId) {LambdaQueryWrapper<InventoryDeductionDetailDO> queryWrapper = Wrappers.lambdaQuery();queryWrapper.eq(InventoryDeductionDetailDO::getOrderId, orderId);queryWrapper.eq(InventoryDeductionDetailDO::getDeductionType, InventoryDeductionTypeEnum.INVENTORY_DETAIL_DEDUCTIONS_TYPE.getCode());InventoryDeductionDetailDO inventoryDeductionDetailDO = inventoryDetailMapper.selectOne(queryWrapper);return inventoryConverter.converter(inventoryDeductionDetailDO);}//获取扣减库存对应的已退明细public List<InventoryDetail> queryRefundInventoryDetailList(String orderId) {LambdaQueryWrapper<InventoryDeductionDetailDO> queryWrapper = Wrappers.lambdaQuery();queryWrapper.eq(InventoryDeductionDetailDO::getOrderId, orderId);queryWrapper.eq(InventoryDeductionDetailDO::getDeductionType, InventoryDeductionTypeEnum.INVENTORY_DETAIL_REFUND_TYPE.getCode());List<InventoryDeductionDetailDO> inventoryDeductionDetailDOS = inventoryDetailMapper.selectList(queryWrapper);return inventoryConverter.converterList(inventoryDeductionDetailDOS);}//保存库存扣减的明细public void saveInventoryDetail(InventoryDetail inventoryDetail, Integer deductionType) throws DuplicateKeyException {InventoryDeductionDetailDO inventoryDeductionDetailDO = inventoryConverter.converterDO(inventoryDetail);inventoryDeductionDetailDO.setDeductionType(deductionType);inventoryDeductionDetailDO.initCommon();int count = inventoryDetailMapper.insert(inventoryDeductionDetailDO);if (count <= 0) {throw new BaseBizException(InventoryExceptionCode.INVENTORY_SQL);}}...
}

步骤四:库存退还成功后,将对应库存写回中⼼桶

待后续分桶扩容时使⽤。

@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate TairCache tairCache;...//库存回退到中心桶上private void releaseInventory(InventoryDetail inventoryDetail) {//对象为null,说明没有执行退货明细写入if (!Objects.isNull(inventoryDetail)) {String key = TairInventoryConstant.SELLER_INVENTORY_PREFIX + inventoryDetail.getSellerId() + inventoryDetail.getSkuId();//将本次申请退回的库存,返回到中心桶上tairCache.incr(key, inventoryDetail.getInventoryNum());}}...
}

5.查询商品库存

说明一:默认查询的skuId集合最⼤不超过100,⼀般主要都是单个SKU查询。

说明二:首先获取到这个商品SKU的本地缓存的分桶元数据信息,然后把元数据中的分桶编号 + 中⼼桶缓存key加⼊到查询key集合中,接着使⽤mget⽅法批量获取缓存库存。最后将返回的中⼼桶剩余库存和分桶库存进⾏合并,得到商品实际库存。

@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate InventoryBucketCache inventoryBucketCache;@Resourceprivate TairCache tairCache;...//查询返回卖家的商品实际库存信息private List<InventoryResponseDTO> queryInventoryCacheList(InventoryQueryRequest queryRequest) {//1.获取组装查询的缓存keyList<String> skuIdList = queryRequest.getSkuIdList();List<InventoryResponseDTO> inventoryResponseDTOList = new ArrayList<>();for (String skuId : skuIdList) {//商品库存的分桶元数据的缓存keyString cacheKey = queryRequest.getSellerId() + skuId;//获取分桶元数据的缓存,先查本地缓存,再查远程缓存BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(cacheKey);if (Objects.isNull(bucketLocalCache)) {continue;}List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();availableList.addAll(bucketLocalCache.getUndistributedList());//获取存有库存值的缓存keyList<String> bucketList = availableList.stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList());//将缓存中心桶剩余库存的缓存key添加到bucketListString key = TairInventoryConstant.SELLER_INVENTORY_PREFIX + cacheKey;bucketList.add(key);//获取每个缓存key对应的库存值List<String> bucketNumList = tairCache.mget(bucketList);Integer inventoryNum = 0;for (String bucketNum : bucketNumList) {if (!Objects.isNull(bucketNum)) {inventoryNum = inventoryNum + Integer.valueOf(bucketNum);}}//构建商品库存模型InventoryResponseDTO inventoryResponseDTO = new InventoryResponseDTO();inventoryResponseDTO.setSellerId(queryRequest.getSellerId());inventoryResponseDTO.setSellerId(skuId);inventoryResponseDTO.setInventoryNum(inventoryNum);inventoryResponseDTOList.add(inventoryResponseDTO);}return inventoryResponseDTOList;}...
}

6.库存扣减分桶轮询以及随机备用分桶

会使用Round Robin轮询算法,根据扣减次数来定位具体要扣减哪个分桶。为避免扣减失败,会同时随机⽣成2个备⽤分桶,用于扣减失败时的重试。

@Service
public class InventoryServiceImpl implements InventoryService {...//构建接下来用于处理具体扣减库存时所需要的上下文对象private BucketContext buildDeductProductStock(InventoryRequest request) {//1.填充扣减库存相关请求信息的明细InventoryDetail inventoryDetail = inventoryConverter.converterRequest(request);//2.填充扣减库存的分桶相关信息BucketContext bucketContext = buildDeductBucketList(request);bucketContext.setInventoryDetail(inventoryDetail);return bucketContext;}//填充扣减库存的分桶相关信息private BucketContext buildDeductBucketList(InventoryRequest request) {BucketContext context = new BucketContext();//获取本地缓存的分桶元数据BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());//获取本地缓存的分桶列表List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();//获取本次扣减请求对应的扣减次数,用来定位具体使用的分桶库存Integer incrementCount = getIncrementCount(request);//通过运算得到本次访问所需要定位的分桶,使用Round Robin轮询算法int index = incrementCount % availableList.size();//获取本次准备处理的分桶信息BucketCacheBO bucketCacheBO = availableList.get(index);context.getAvailableList().add(bucketCacheBO);context.getBucketNoList().add(bucketCacheBO.getBucketNo());//获取分桶配置信息context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig());//为了避免扣减失败(分桶已下线或者库存不足),可以多备份几个分桶for (int i = 0; i < 2; i++) {//任意填充2个作为备份Random random = new Random();int num = random.nextInt(availableList.size());BucketCacheBO bucketCache = availableList.get(num);//避免拿到重复的分桶,这里处理一下if (context.getBucketNoList().contains(bucketCache.getBucketNo())) {i--;continue;}context.getAvailableList().add(bucketCache);context.getBucketNoList().add(bucketCache.getBucketNo());}return context;}//获取对应售卖商品的扣减访问次数private Integer getIncrementCount(InventoryRequest request) {String incrementKey = TairInventoryConstant.SELLER_SKU_STOCK_COUNT_PREFIX + request.getSellerId() + request.getSkuId();Integer incrementCount = tairCache.incr(incrementKey);return incrementCount;}...
}

7.基于Tair中分桶数据实现库存扣减

@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate TairCache tairCache;...//扣减库存//@param bucketContext 库存扣减上下文对象private void deductInventory(BucketContext bucketContext) {//获取可以使用的分桶编号,即对应缓存中的keyList<String> bucketNoList = bucketContext.getBucketNoList();//获取扣减明细信息InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();//获取扣减库存数量Integer inventoryNum = inventoryDetail.getInventoryNum();//获取用于处理扩容的MapMap<String, Integer> capacityMap = bucketContext.getCapacityMap();Boolean isDeduct = false;//对分桶进行库存扣减(每次)for (String bucketNo : bucketNoList) {//自减,默认扣减后不能小于0,否则返回-1Integer residueNum = tairCache.decr(bucketNo, inventoryNum);//capacityMap可用于判断分桶是否是扩容的分桶,以及标记当前分桶剩余的库存数capacityMap.put(bucketNo, residueNum);//库存扣减成功if (residueNum >= 0) {//标记一下具体扣减的分桶属于哪个inventoryDetail.setBucketNo(bucketNo);isDeduct = true;break;}}//分桶扣减都没有成功,此时抛出异常提示库存不足if (!isDeduct) {throw new BaseBizException(InventoryExceptionCode.INVENTORY_INSUFFICIENT_ERROR);}}...
}

8.分桶库存扣减完毕后扣减明细异步落库

@Service
public class InventoryServiceImpl implements InventoryService {@Resourceprivate TairCache tairCache;@Resourceprivate InventoryDetailProducer inventoryDetailProducer;...//将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存)private void writeInventoryDetail(BucketContext bucketContext) {//获取扣减明细信息InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId();//尝试写入明细记录,如果没有写入成功则说明库存需要回退Integer count = tairCache.exhset(key, String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));if (count < 0) {//说明明细已经存在了,写入失败,此时需要将库存回退到对应的分桶上tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());} else {//发送消息,异步写入库存扣减的明细到DBinventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());}}...
}@Component
public class TairCache {private JedisPool jedisPool;public TairCache(JedisPool jedisPool) {this.jedisPool = jedisPool;}public Jedis getJedis() {return jedisPool.getResource();}private TairHash createTairHash(Jedis jedis) {return new TairHash(jedis);}//存储hash对象public Integer exhset(String key,String field, String value){try (Jedis jedis = getJedis()) {return createTairHash(jedis).exhset(key, field, value, ExhsetParams.ExhsetParams().nx()).intValue();}}
}//处理库存扣减明细的记录消息
@Component
public class InventoryDetailListener implements MessageListenerConcurrently {@Resourceprivate InventoryRepository inventoryRepository;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {log.info("执行库存扣减明细保存,消息数量:{}", list.size());List<InventoryDetail> inventoryDetailList = new ArrayList<>(list.size());for (MessageExt messageExt : list) {String msg = new String(messageExt.getBody());InventoryDetail inventoryDetail = JsonUtil.json2Object(msg, InventoryDetail.class);inventoryDetailList.add(inventoryDetail);}inventoryRepository.saveInventoryDetailList(inventoryDetailList, InventoryDeductionTypeEnum.INVENTORY_DETAIL_DEDUCTIONS_TYPE.getCode());} catch (DuplicateKeyException ex) {log.error("consume repeat, 库存扣减明细重复记录,不再重复执行", ex);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {log.error("consume error, 库存扣减明细保存失败", e);//本次消费失败,下次重新消费return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}

9.分桶扣减完库存后异步触发回源扩容

@Service
public class InventoryServiceImpl implements InventoryService {...//检测扣减成功后的库存是否触发回源//例如商品分桶库存1000,回源比例40%,那么实际剩余库存小于400就会触发回源库存的操作private void checkInventoryBackSource(BucketContext bucketContext) {//获取扣减明细信息InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();//存储对应的需要校验扩容的分桶Map<String, Integer> bucketMap = bucketContext.getCapacityMap();//获取当前的库存分桶配置InventoryBucketConfigDO inventoryBucketConfig = bucketContext.getInventoryBucketConfig();//判断分桶当初分配的最大库存容量,计算是否触发回源比例List<BucketCacheBO> availableList = bucketContext.getAvailableList();for (BucketCacheBO bucketCacheBO : availableList) {//具体使用的是哪个分桶扣减库存if (bucketMap.containsKey(bucketCacheBO.getBucketNo())) {Integer residueNum = bucketMap.get(bucketCacheBO.getBucketNo());//当前分桶的分配总库存Integer bucketNum = bucketCacheBO.getBucketNum();//触发回源比例的百分比Integer backSourceProportion = inventoryBucketConfig.getBackSourceProportion();//这里如果要更准确,需要用小数得到回源数,剩余数量小于回源数,那么就要回源//这里省略了小数,所以可能会有一个数的误差,影响不大int backSourceNum = bucketNum * backSourceProportion / 100;//回源比例的库存 大于剩余的库存,触发异步扩容,或者没有返回剩余库存也说明扣减失败if (backSourceNum > residueNum) {//标记出回源的具体分桶inventoryDetail.setBucketNo(bucketCacheBO.getBucketNo());//发送通知到消息队列进行异步库存扩容sendAsynchronous(inventoryDetail);}}}}//发送通知到消息队列进行异步库存扩容//@param inventoryDetail 库存扣减明细对象private void sendAsynchronous(InventoryDetail inventoryDetail) {//1.构建发送的消息对象BucketCapacity bucketCapacity = inventoryConverter.converter(inventoryDetail);//2.发送消息,异步处理扩容bucketCapacityProducer.sendBucketCapacity(bucketCapacity);}...
}

10.库存分桶回源扩容的Double Check

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//分桶扩容接口@Overridepublic void bucketCapacity(BucketCapacity bucketCapacity) {//先锁住中心桶库存,避免此时库存发生变化String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + bucketCapacity.getSellerId() + bucketCapacity.getSkuId();String value = SnowflakeIdWorker.getCode();//1.校验是否已经无需扩容了,如果是则快速结束BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity);if (!bucketCapacityContext.getIsCapacity()) {return;}//获取分布式锁来进行扩容处理boolean lock = tairLock.tryLock(key, value);if (lock) {try {//再次校验是否需要扩容,此处不允许并发bucketCapacityContext = checkBucketCapacity(bucketCapacity);if (bucketCapacityContext.getIsCapacity()) {//2.获取中心桶缓存的剩余库存Integer residueNum = getCenterStock(bucketCapacity);//3.可以扩容,计算出可回源的库存进行处理if (residueNum > 0) {backSourceInventory(residueNum, bucketCapacityContext);} else {//4.中心桶无库存,检查是否触发下线checkBucketOffline(bucketCapacity);}}} catch (Exception e) {e.printStackTrace();} finally {tairLock.unlock(key, value);}} else {throw new BaseBizException("请求繁忙,稍后重试!");}}...
}

11.库存分桶扩容量计算算法实现

分桶需要扩容多少库存,需要注意尽量保证每个分桶的库存尽可能均匀。

如果中心桶库存超过最大深度库存,则直接以配置的回源步长增长库存,否则汇总当前分桶的实际库存深度。也就是根据当前的可⽤分桶列表、中⼼桶库存、总的可⽤库存深度,计算出平均的⼀个可分配库存数量。从而避免每个分桶扩容的库存不均匀(最⼩值必须超过最⼩库存深度)。

如果扩容的库存深度超过当时分配的库存深度,且未超过最⼤库存深度,则以当前分配的实际库存更新当前分桶库存深度。

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//回源库存到分桶上//@param residueNum            中心桶库存//@param bucketCapacityContext 扩容上下文对象private void backSourceInventory(Integer residueNum, BucketCapacityContext bucketCapacityContext) {//首先需要当前分桶的库存,其次还需要获取目前分桶的可发库存深度(第一次初始化的时候分配的库存)//根据当初分配的库存深度以及最大库存深度以及中心桶库存,得出均匀到目前支持可用的分桶均匀分配库存大概数量//同时根据本次同步的库存数量刷新分桶的实际库存深度BucketCapacity bucketCapacity = bucketCapacityContext.getBucketCapacity();//先获取本地的分桶元数据信息,获取当前分桶的总发放上限String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig();List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();Integer inventoryNum = 0;//获取实际配置的最大可用库存深度Integer maxBucketNum = availableList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();BucketCacheBO bucketCache = null;for (BucketCacheBO bucketCacheBO : availableList) {if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) {bucketCache = bucketCacheBO;break;}}//这里没有匹配到分桶,则该分桶已被下线,不处理后续流程if (Objects.isNull(bucketCache)) {return;}//3.中心桶库存超过最大深度库存(全部分桶总计),直接以配置的回源步长增长库存if (residueNum > maxBucketNum) {inventoryNum = inventoryBucketConfig.getBackSourceStep();} else {inventoryNum = calcEvenInventoryNum(maxBucketNum, inventoryBucketConfig, residueNum, bucketCache);}//4.获取扩容后的预估库存深度Integer maxDepthNum = getMaxDepthNum(inventoryNum, inventoryBucketConfig, bucketCache, bucketCapacityContext);//5.更新分桶元数据相关信息,注意需要判断当前分桶的库存深度是否真实发生变化,如无变化则不需要更新refreshBucketCache(maxDepthNum, bucketLocalCache, bucketCapacity.getBucketNo(), inventoryNum);log.info("本次分桶:{},回源库存:{}", bucketCapacity.getBucketNo(), inventoryNum);//6.回源分桶的库存Integer incr = tairCache.incr(bucketCapacity.getBucketNo(), inventoryNum);//6.扣减中心桶库存Integer decr = tairCache.decr(TairInventoryConstant.SELLER_INVENTORY_PREFIX + key, inventoryNum);log.info("本次分桶:{},回源库存:{}, 回源后分桶库存:{}, 中心桶剩余库存:{}", bucketCapacity.getBucketNo(), inventoryNum, incr, decr);}...//计算出均匀后的每个分桶实际分配的库存值//@param maxBucketNum          最大的库存深度//@param inventoryBucketConfig 分桶配置//@param residueNum            中心桶剩余库存//@param bucketCache           扩容分桶private Integer calcEvenInventoryNum(Integer maxBucketNum, InventoryBucketConfigDO inventoryBucketConfig, Integer residueNum, BucketCacheBO bucketCache) {//获取当前扩容的分桶深度Integer bucketDepthNum = bucketCache.getBucketNum();//得到扩容的分桶深度 和当前全部可用分桶的库存深度,计算占比//根据占比计算出回源的步长,注意最小深度,如果计算后的步长小于最小库存深度,则默认取最小库存深度BigDecimal proportion = new BigDecimal(bucketDepthNum).divide(new BigDecimal(maxBucketNum), 6, BigDecimal.ROUND_DOWN);//根据比例计算出可分配的库存BigDecimal allotNum = new BigDecimal(residueNum).multiply(proportion).setScale(0, BigDecimal.ROUND_DOWN);if (allotNum.compareTo(new BigDecimal(inventoryBucketConfig.getMinDepthNum())) < 0) {allotNum = new BigDecimal(inventoryBucketConfig.getMinDepthNum());}//当最小深度都已无法满足剩余库存,则以实际剩余库存扩容if (new BigDecimal(residueNum).compareTo(allotNum) < 0) {return residueNum;}//得到扩容的库存值return allotNum.intValue();}//返回目前扩容后的库存深度,库存深度只允许增长不允许减少//@param inventoryNum          步长扩容库存//@param inventoryBucketConfig 分桶配置信息//@param bucketCache           分桶信息private Integer getMaxDepthNum(Integer inventoryNum, InventoryBucketConfigDO inventoryBucketConfig, BucketCacheBO bucketCache, BucketCapacityContext bucketCapacityContext) {//获取当前分桶的实际库存,实际库存和真实库存会有差异,但是这里只是计算一个大概库存深度,无需精确Integer residueNum = bucketCapacityContext.getResidueNum();//预估出实际库存深度,当前分桶库存 + 步长增长库存Integer maxBucketNum = residueNum + inventoryNum;if (bucketCache.getBucketNum() > maxBucketNum) {return bucketCache.getBucketNum();}log.info("前分桶的实际库存:{},预估的实际库存深度:{}", residueNum, maxBucketNum);//实际库存深度,不能超过配置的最大库存深度,同理,最小深度也不能小于最小的库存深度if (inventoryBucketConfig.getMaxDepthNum() < maxBucketNum) {return inventoryBucketConfig.getMaxDepthNum();}return maxBucketNum;}//刷新分桶元数据缓存//@param maxDepthNum      分桶最大库存深度//@param bucketLocalCache 分桶元数据信息//@param bucketNo         分桶编号private void refreshBucketCache(Integer maxDepthNum, BucketLocalCache bucketLocalCache, String bucketNo, Integer inventoryNum) {List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();for (BucketCacheBO bucketCacheBO : availableList) {if (bucketCacheBO.getBucketNo().equals(bucketNo)) {//每次库存具体深度变化都要更细,否则很容易触发 回源的比例bucketCacheBO.setBucketNum(maxDepthNum);bucketCacheBO.setAllotNum(inventoryNum + (Objects.isNull(bucketCacheBO.getAllotNum()) ? 0 : bucketCacheBO.getAllotNum()));break;}}String key = bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();//1.刷新本地缓存inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);//2.刷新远程缓存tairCache.set(TairInventoryConstant.SELLER_BUCKET_PREFIX + key, JSONObject.toJSONString(bucketLocalCache), 0);}...
}

12.库存分桶扩容完成以及分桶下线触发

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//分桶扩容接口@Overridepublic void bucketCapacity(BucketCapacity bucketCapacity) {//先锁住中心桶库存,避免此时库存发生变化String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + bucketCapacity.getSellerId() + bucketCapacity.getSkuId();String value = SnowflakeIdWorker.getCode();//1.校验是否已经无需扩容了,如果是则快速结束BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity);if (!bucketCapacityContext.getIsCapacity()) {return;}//获取分布式锁来进行扩容处理boolean lock = tairLock.tryLock(key, value);if (lock) {try {//再次校验是否需要扩容,此处不允许并发bucketCapacityContext = checkBucketCapacity(bucketCapacity);if (bucketCapacityContext.getIsCapacity()) {//2.获取中心桶缓存的剩余库存Integer residueNum = getCenterStock(bucketCapacity);//3.可以扩容,计算出可回源的库存进行处理if (residueNum > 0) {backSourceInventory(residueNum, bucketCapacityContext);} else {//4.中心桶无库存,检查是否触发下线checkBucketOffline(bucketCapacity);}}} catch (Exception e) {e.printStackTrace();} finally {tairLock.unlock(key, value);}} else {throw new BaseBizException("请求繁忙,稍后重试!");}}//校验当前分桶是否触发下线的阈值private void checkBucketOffline(BucketCapacity bucketCapacity) {//1.获取当前分桶的配置信息String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);//2.检测分桶的库存是否触发下线阈值,先获取当前分桶的具体库存以及下线配置阈值Integer thresholdValue = bucketLocalCache.getInventoryBucketConfig().getThresholdValue();Integer inventoryNum = getBucketInventoryNum(bucketCapacity.getBucketNo());//3.如果触发下线,发送消息调用分桶下线if (thresholdValue > inventoryNum) {log.info("触发下线{},阈值{},当前库存值{}", thresholdValue > inventoryNum, thresholdValue, inventoryNum);sendAsynchronous(bucketCapacity);}}//对分桶进行异步下线private void sendAsynchronous(BucketCapacity bucketCapacity) {//1.构建分桶下线接口模型InventorOfflineRequest offlineRequest = buildOfflineBucketInfo(bucketCapacity);//2.发送消息,通知处理分桶下线bucketOfflineProducer.sendBucketOffline(offlineRequest);}...
}

13.库存分桶下线以及剩余存量归还中心桶

//处理分桶下线的消息
@Component
public class BucketOfflineListener implements MessageListenerConcurrently {@Autowiredprivate InventoryBucketService inventoryBucketService;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt messageExt : list) {String msg = new String(messageExt.getBody());log.info("执行分桶下线,消息内容:{}", msg);InventorOfflineRequest inventorOfflineRequest = JsonUtil.json2Object(msg, InventorOfflineRequest.class);inventoryBucketService.bucketOffline(inventorOfflineRequest);}} catch (Exception e) {log.error("consume error, 分桶下线失败", e);//本次消费失败,下次重新消费return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//分桶下线接口@Overridepublic void bucketOffline(InventorOfflineRequest request) {long start = System.currentTimeMillis();//1.验证入参必填checkInventorOfflineParams(request);//过滤只有一个分桶的无效请求Boolean isOffline = checkBucketOffline(request);if (isOffline) {//2.注意这里需要锁定中心桶库存String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + request.getSellerId() + request.getSkuId();String value = SnowflakeIdWorker.getCode();boolean lock = tairLock.tryLock(key, value);if (lock) {try {//3.先将准备下线的分桶库存从本地和远程列表中移除至不可用列表,避免新的请求进来updateBucket(request);} catch (Exception e) {e.printStackTrace();} finally {tairLock.unlock(key, value);}} else {throw new BaseBizException("请求繁忙,稍后重试!");}log.error("分桶下线处理时间,request:{}, lock:{}, time:{}", JSON.toJSONString(request), lock, System.currentTimeMillis() - start);}}//将准备下线的分桶列表,先从本地缓存以及远程缓存列表里面移除//待改进点://先更新分桶库存缓存,再更新本地分桶元数据缓存及远程元数据缓存,避免不同机器的本地分桶元数据缓存不一致//比如更新了本地缓存的机器不会路由到该分桶,而没更新本地缓存的机器依然路由到了该分桶private void updateBucket(InventorOfflineRequest request) {//1.获取本地和远程的分桶列表BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());if (!Objects.isNull(bucketLocalCache)) {//过滤返回下线的分桶确实存在于存活的分桶列表上Map<String, BucketCacheBO> bucketCacheMap = bucketLocalCache.getAvailableList().stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));//过滤已不存在远程缓存的列表List<String> bucketCacheList = request.getBucketNoList().stream().filter(bucketCacheMap::containsKey).collect(Collectors.toList());//过滤后,有可下线的分桶缓存if (!CollectionUtils.isEmpty(bucketCacheList)) {//分桶最少也需要保留一个if (bucketLocalCache.getAvailableList().size() > 1) {//2.先移除缓存的分桶列表,避免新的请求访问影响真实库存updateBucketCache(bucketCacheList, bucketLocalCache);}}}}//移除本地分桶的对应分桶列表以及远程的分桶列表//@param bucketCacheList 下线的分桶列表//@param bucketCache     远程缓存元数据信息private void updateBucketCache(List<String> bucketCacheList, BucketLocalCache bucketCache) {String key = bucketCache.getSellerId() + bucketCache.getSkuId();//1.获取到本地的缓存列表BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);//2.填充下线的分桶到不可用列表中for (String bucketNo : bucketCacheList) {bucketLocalCache.getUndistributedList().add(new BucketCacheBO(bucketNo));}//过滤返还上线的分桶列表List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList().stream().filter(bucketCacheBO -> !bucketCacheList.contains(bucketCacheBO.getBucketNo())).collect(Collectors.toList());bucketLocalCache.setAvailableList(availableList);//3.从本地缓存里面更新inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);//4.覆盖远程的分桶元数据信息tairCache.set(TairInventoryConstant.SELLER_BUCKET_PREFIX + key, JSONObject.toJSONString(bucketLocalCache), 0);log.info("下线分桶,分桶元数据信息:{}", JSONObject.toJSONString(bucketLocalCache));//发送清空下线分桶库存的消息bucketClearProducer.sendBucketClear(new BucketClearRequest(bucketCache.getSkuId(), bucketCache.getSellerId(), bucketCacheList, 0));}...
}//处理清空分桶库存的消息
@Component
public class BucketClearListener implements MessageListenerConcurrently {@Autowiredprivate InventoryBucketService inventoryBucketService;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt messageExt : list) {String msg = new String(messageExt.getBody());log.info("执行分桶下线清空库存,消息内容:{}", msg);BucketClearRequest bucketClearRequest = JsonUtil.json2Object(msg, BucketClearRequest.class);inventoryBucketService.bucketClear(bucketClearRequest);}} catch (Exception e) {log.error("consume error, 清空分桶库存失败", e);//本次消费失败,下次重新消费return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//清空分桶库存,分桶库存放回中央库存@Overridepublic void bucketClear(BucketClearRequest request) {long start = System.currentTimeMillis();String key = TairInventoryConstant.SELLER_BUCKET_PREFIX + request.getSellerId() + request.getSkuId();String bucketCache = tairCache.get(key);if (!StringUtils.isEmpty(bucketCache)) {BucketLocalCache bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);updateBucketInventory(request.getBucketNoList(), bucketLocalCache);}log.error("清空下线分桶库存,request:{},时间:{}", JSON.toJSONString(request), System.currentTimeMillis() - start);//商品库存值预警warningProductInventory(bucketCache);}//将分桶的缓存库存返回给中心桶库存上private void updateBucketInventory(List<String> bucketCacheList, BucketLocalCache bucketLocalCache) {//中心桶的库存keyString key = TairInventoryConstant.SELLER_INVENTORY_PREFIX + bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();Integer inventoryNum = 0;//下线的分桶列表List<String> undistributedList = bucketLocalCache.getUndistributedList().stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList());//只处理已经下线的分桶bucketCacheList = bucketCacheList.stream().filter(undistributedList::contains).collect(Collectors.toList());if (CollectionUtils.isEmpty(bucketCacheList)) {return;}for (String bucketNo : bucketCacheList) {//先获取下线的分桶实际剩余库存String bucketNum = tairCache.get(bucketNo);//当分桶的库存大于0的时候才处理if (!StringUtils.isEmpty(bucketNum) && Integer.valueOf(bucketNum) > 0) {//清理下线的分桶库存,设置为0Integer result = tairCache.decr(bucketNo, Integer.parseInt(bucketNum));if (result >= 0) {log.info("下线分桶,bucketNo:{},desc:{}", bucketNo, bucketNum);inventoryNum = inventoryNum + Integer.parseInt(bucketNum);} else {log.info("分桶已下线,bucketNo:{}", bucketNo);}}}if (inventoryNum > 0) {//将下线的剩余库存加至 中心桶库存上Integer incr = tairCache.incr(key, inventoryNum);log.info("回源中心桶,inventoryNum:{}, after value :{}", inventoryNum, incr);}}...
}

14.库存下线触发剩余库存总量预警机制

@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {...//对商品的库存发生变化进行预警处理private void warningProductInventory(String bucketCache) {//1.批量获取一下可用的缓存分桶列表编号BucketLocalCache bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();//2.批量获取汇总商品剩余库存(分桶下线代表中心桶库存已经没有了,不校验中心桶库存)List<String> cacheKeyList = availableList.stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList());List<String> productInventoryList = tairCache.mget(cacheKeyList);//3.检测卖家单个商品的总库存 是否触发最小值预警或者百分比预警,是则异步消息通知供需服务Integer sumInventoryNum = 0;for (int i = 0; i < productInventoryList.size(); i++) {String inventoryNum = productInventoryList.get(i);if (StringUtils.isNotEmpty(inventoryNum)) {sumInventoryNum = sumInventoryNum + Integer.valueOf(inventoryNum);}}Boolean isWarning = false;//如果实际库存值,小于预警值500,或者总库存触发比例阈值,异步消息通知if (sumInventoryNum < 500) {isWarning = true;}//未触发最小库存预警,检测是否触发最小比例预警if (!isWarning) {//总的库存深度,不仅仅要看可用分桶的库存深度,还要看下线的库存深度,从而计算出一个当时实际分配的库存深度,计算出一个预警值int sumBucketNum = availableList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();List<BucketCacheBO> undistributedList = bucketLocalCache.getUndistributedList();if (!CollectionUtils.isEmpty(undistributedList)) {sumBucketNum = sumBucketNum + undistributedList.stream().mapToInt(cacheBO -> Objects.isNull(cacheBO.getBucketNum()) ? 0 : cacheBO.getBucketNum()).sum();}log.info("总的库存深度:{}", sumBucketNum);//预警比例BigDecimal warningProportion = new BigDecimal(proportion).divide(new BigDecimal(100), 3, BigDecimal.ROUND_DOWN);//库存占比BigDecimal inventoryProportion = new BigDecimal(sumInventoryNum).divide(new BigDecimal(sumBucketNum), 6, BigDecimal.ROUND_HALF_UP);//配置的预警比例,大于分配的实际库存深度和已剩的库存占比if (warningProportion.compareTo(inventoryProportion) > 0) {isWarning = true;}}//异步消息通知预警if (isWarning) {WarningInventoryDTO warningInventoryDTO = inventoryConverter.converterDTO(bucketLocalCache);warningInventoryProducer.sendWarningInventory(warningInventoryDTO);}}...
}

http://www.lryc.cn/news/575828.html

相关文章:

  • 一款被我拿来处理图片和视频的免费环保软件
  • Web基础关键_003_CSS(一)
  • 小程序学习笔记:加载效果、上拉加载与节流处理
  • Ubuntu安装Docker部署Python Flask Web应用
  • PHP语法基础篇(六):数组
  • 代码随想录|图论|09沉没孤岛
  • LSTM每个变量的shape分析
  • 从输入到路径:AI赋能的地图语义解析与可视化探索之旅
  • 通过ETL从MySQL同步到GaussDB
  • 喜讯 | Mediatom斩获2025第十三届TopDigital创新营销奖「年度程序化广告平台」殊荣
  • LINUX625 DNS反向解析
  • 基于 Spring Boot + Vue 3的现代化社区团购系统
  • 科技如何影响我们的生活?
  • 工业电子 | 什么是SerDes,为何工业和汽车应用需要它?
  • HarmonyOS NEXT仓颉开发语言实战案例:简约音乐播放页
  • 金蝶云星空客户端自定义控件插件-WPF实现自定义控件
  • 使用Docker部署mysql8
  • 社会工程--如何使用对方的语言
  • JDBC入门:Java连接数据库全指南
  • AI辅助编写前端VUE应用流程
  • 树状dp(dfs)(一道挺基础的)
  • Spring Boot 项目问题:while constructing a mapping found duplicate key api
  • 微信小程序封装loading 修改
  • 常见网络安全威胁和防御措施
  • 智能实验室革命:Deepoc大模型驱动全自动化科研新生态
  • HTML简介,初步了解HTML
  • SQl中多使用EXISTS导致多查出了一条不符合条件的数据
  • 教程 | 一键批量下载 Dify「Markdown 转 Docx」生成的 Word 文件(附源码)
  • 【Linux】基础开发工具(2)
  • 操作系统面试知识点(1):操作系统基础