
Distributed Transactions and Distributed Locks

A Technical Guide to Distributed Transactions and Distributed Locks in Java

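I. Distributed Transactions

1. Basic Concepts

1.1 Definition and Properties

A distributed transaction is one in which the participants, the servers that support the transaction, the resource servers, and the transaction manager reside on different nodes of a distributed system. A distributed transaction is expected to preserve the ACID properties:

  • Atomicity: the operations in a transaction form a single logical unit; either all of them succeed or all of them fail
  • Consistency: only valid data may be written to the database; otherwise the transaction must roll back to its initial state
  • Isolation: multiple users may access the same data concurrently without compromising its correctness and integrity
  • Durability: once a transaction completes, its results must be persisted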
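1.2 Challenges of Distributed Transactions

Unreliable network

  • Network partitions cause message loss
  • Network latency causes timeouts
  • Node failures interrupt in-flight transactions

Complex concurrency control

  • Lock management across nodes
  • Deadlock detection and handling
  • Balancing performance against consistency

Difficult failure recovery

  • Recovery from partial node failures
  • Synchronizing transaction state
  • Repairing data consistency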
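1.3 CAP Theorem and BASE Theory

CAP theorem

  • Consistency: all nodes see the same data at the same time
  • Availability: every request receives a response indicating success or failure
  • Partition tolerance: the system keeps operating when a node fails or the network is partitioned

A distributed system cannot satisfy all three CAP properties at once. Because network partitions cannot be ruled out in practice, the real choice during a partition is between consistency and availability.

BASE theory

  • Basically Available: when an unforeseen failure occurs, the system is allowed to lose part of its availability
  • Soft state: data may pass through intermediate states, and those states are assumed not to affect overall availability
  • Eventually consistent: after a period of synchronization, all replicas converge to a consistent state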

2. Mainstream Solutions

2.1 Two-Phase Commit (2PC)
2.1.1 Protocol Flow

Phase one (voting)

// Coordinator
public class TwoPhaseCommitCoordinator {

    public boolean prepare(List<Participant> participants, Transaction tx) {
        for (Participant participant : participants) {
            try {
                if (!participant.prepare(tx)) {
                    // A single "no" vote is enough to abort
                    return false;
                }
            } catch (Exception e) {
                // An unresponsive participant counts as a "no" vote
                return false;
            }
        }
        return true;
    }
}

// Participant
public class TwoPhaseCommitParticipant implements Participant {

    @Override
    public boolean prepare(Transaction tx) {
        try {
            // Execute the transaction's operations without committing
            executeTransaction(tx);
            // Write the transaction state to the log
            writeToLog("PREPARED", tx);
            return true; // vote yes
        } catch (Exception e) {
            // Undo the work
            rollback(tx);
            return false; // vote no
        }
    }
}

Phase two (commit)

public void commit(List<Participant> participants, Transaction tx) {
    for (Participant participant : participants) {
        try {
            participant.commit(tx);
        } catch (Exception e) {
            // Log the error; manual intervention may be required
            logger.error("Participant commit failed", e);
        }
    }
}

public void abort(List<Participant> participants, Transaction tx) {
    for (Participant participant : participants) {
        try {
            participant.rollback(tx);
        } catch (Exception e) {
            logger.error("Participant rollback failed", e);
        }
    }
}
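To see the two phases work together, the following is a minimal in-memory sketch. It is not the coordinator shown above: the `Participant` interface is reduced to string transaction IDs, and `InMemoryParticipant` and `execute` are names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory two-phase commit sketch; all names here are illustrative. */
class TwoPhaseCommitDemo {

    interface Participant {
        boolean prepare(String txId);
        void commit(String txId);
        void rollback(String txId);
    }

    /** Participant that votes as configured and records its final state. */
    static class InMemoryParticipant implements Participant {
        private final boolean voteYes;
        String state = "INIT";

        InMemoryParticipant(boolean voteYes) { this.voteYes = voteYes; }

        public boolean prepare(String txId) {
            state = voteYes ? "PREPARED" : "ABORTED";
            return voteYes;
        }
        public void commit(String txId) { state = "COMMITTED"; }
        public void rollback(String txId) { state = "ABORTED"; }
    }

    /** Runs both phases: commit only if every participant voted yes. */
    static boolean execute(List<? extends Participant> participants, String txId) {
        List<Participant> prepared = new ArrayList<>();
        boolean allYes = true;
        // Phase one: collect votes, stopping at the first "no"
        for (Participant p : participants) {
            if (p.prepare(txId)) {
                prepared.add(p);
            } else {
                allYes = false;
                break;
            }
        }
        // Phase two: deliver the decision to everyone who prepared
        for (Participant p : prepared) {
            if (allYes) p.commit(txId); else p.rollback(txId);
        }
        return allYes;
    }

    public static void main(String[] args) {
        InMemoryParticipant a = new InMemoryParticipant(true);
        InMemoryParticipant b = new InMemoryParticipant(false);
        System.out.println(execute(List.of(a, b), "tx-1")); // false
        System.out.println(a.state + " " + b.state);        // ABORTED ABORTED
    }
}
```

The sketch leaves out logging, timeouts, and coordinator recovery, which are where a real 2PC implementation spends most of its effort.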
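2.1.2 Advantages and Disadvantages

Advantages:

  • Simple to implement and easy to understand
  • Guarantees strong consistency
  • Good data integrity

Disadvantages:

  • Synchronous blocking: participants block while waiting for the coordinator's decision
  • Single point of failure: if the coordinator fails, prepared participants stay blocked and the whole system can stall
  • Data inconsistency: if a network partition occurs during phase two, some participants may commit while others never receive the decision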
2.1.3 An Improvement: Three-Phase Commit (3PC)

Three-phase commit adds a timeout mechanism and a pre-commit phase:

public class ThreePhaseCommitCoordinator {

    // Phase one: CanCommit
    public boolean canCommit(List<Participant> participants, Transaction tx) {
        for (Participant participant : participants) {
            if (!participant.canCommit(tx)) {
                return false;
            }
        }
        return true;
    }

    // Phase two: PreCommit
    public boolean preCommit(List<Participant> participants, Transaction tx) {
        for (Participant participant : participants) {
            if (!participant.preCommit(tx)) {
                return false;
            }
        }
        return true;
    }

    // Phase three: DoCommit
    public void doCommit(List<Participant> participants, Transaction tx) {
        for (Participant participant : participants) {
            participant.doCommit(tx);
        }
    }
}
2.2 TCC Compensating Transactions
2.2.1 The Three Phases: Try, Confirm, Cancel

Try phase: attempt the business operation, complete all business checks, and reserve the resources it needs
Confirm phase: carry out the business operation using the resources reserved in the Try phase
Cancel phase: abandon the business operation and release the resources reserved in the Try phase

@Service
public class AccountTccService {

    @Autowired
    private AccountMapper accountMapper;

    /**
     * Try phase: freeze the amount on the account.
     */
    @Compensable(confirmMethod = "confirmDeduct", cancelMethod = "cancelDeduct")
    public boolean tryDeduct(String accountId, BigDecimal amount) {
        Account account = accountMapper.selectById(accountId);
        // Funds already frozen by other in-flight transactions are not available
        BigDecimal available = account.getBalance().subtract(account.getFrozenAmount());
        if (available.compareTo(amount) < 0) {
            throw new RuntimeException("Insufficient balance");
        }
        // Freeze the amount
        account.setFrozenAmount(account.getFrozenAmount().add(amount));
        accountMapper.updateById(account);
        return true;
    }

    /**
     * Confirm phase: perform the deduction.
     */
    public boolean confirmDeduct(String accountId, BigDecimal amount) {
        Account account = accountMapper.selectById(accountId);
        // Deduct from the balance
        account.setBalance(account.getBalance().subtract(amount));
        // Release the frozen amount
        account.setFrozenAmount(account.getFrozenAmount().subtract(amount));
        accountMapper.updateById(account);
        return true;
    }

    /**
     * Cancel phase: abandon the deduction.
     */
    public boolean cancelDeduct(String accountId, BigDecimal amount) {
        Account account = accountMapper.selectById(accountId);
        // Release the frozen amount
        account.setFrozenAmount(account.getFrozenAmount().subtract(amount));
        accountMapper.updateById(account);
        return true;
    }
}
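The effect of the three phases on an account can be checked without a database. The following sketch is a simplified in-memory stand-in for the service above; `TccAccountDemo` and its method names are assumptions made for this illustration, and the balance check subtracts funds that are already frozen.

```java
import java.math.BigDecimal;

/** In-memory TCC account sketch: Try freezes, Confirm deducts, Cancel unfreezes. */
class TccAccountDemo {
    private BigDecimal balance;
    private BigDecimal frozen = BigDecimal.ZERO;

    TccAccountDemo(String initialBalance) {
        this.balance = new BigDecimal(initialBalance);
    }

    synchronized BigDecimal balance() { return balance; }

    /** Balance minus everything reserved by in-flight transactions. */
    synchronized BigDecimal available() { return balance.subtract(frozen); }

    /** Try: reserve funds; fails if the available balance is too small. */
    synchronized boolean tryDeduct(BigDecimal amount) {
        if (available().compareTo(amount) < 0) {
            return false;
        }
        frozen = frozen.add(amount);
        return true;
    }

    /** Confirm: consume the reservation made by Try. */
    synchronized void confirmDeduct(BigDecimal amount) {
        balance = balance.subtract(amount);
        frozen = frozen.subtract(amount);
    }

    /** Cancel: release the reservation made by Try. */
    synchronized void cancelDeduct(BigDecimal amount) {
        frozen = frozen.subtract(amount);
    }

    public static void main(String[] args) {
        TccAccountDemo account = new TccAccountDemo("100");
        BigDecimal sixty = new BigDecimal("60");
        System.out.println(account.tryDeduct(sixty)); // true
        System.out.println(account.tryDeduct(sixty)); // false, only 40 is still available
        account.confirmDeduct(sixty);
        System.out.println(account.balance());        // 40
    }
}
```

Because the second Try sees only the unfrozen 40, two concurrent transfers cannot both reserve the same funds.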
2.2.2 Implementation Notes

Guaranteeing idempotency

@Service
public class IdempotentTccService {

    @Autowired
    private TccTransactionRepository transactionRepository;

    // confirmTransfer and cancelTransfer are omitted from this example
    @Compensable(confirmMethod = "confirmTransfer", cancelMethod = "cancelTransfer")
    public boolean tryTransfer(String transactionId, TransferRequest request) {
        // If the transaction is already recorded, do not run the Try logic again
        TccTransaction existingTx = transactionRepository.findById(transactionId);
        if (existingTx != null) {
            return existingTx.getStatus() == TccStatus.TRIED;
        }
        // Create the transaction record
        TccTransaction transaction = new TccTransaction();
        transaction.setTransactionId(transactionId);
        transaction.setStatus(TccStatus.TRYING);
        transactionRepository.save(transaction);
        try {
            // Run the business logic
            performTryLogic(request);
            transaction.setStatus(TccStatus.TRIED);
            transactionRepository.save(transaction);
            return true;
        } catch (Exception e) {
            transaction.setStatus(TccStatus.FAILED);
            transactionRepository.save(transaction);
            throw e;
        }
    }
}
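The same idea applies to the Confirm phase, which a framework may invoke more than once after a timeout or retry. A minimal sketch, assuming an in-memory status map in place of the transaction table (`IdempotentConfirmDemo` and its members are hypothetical names):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of an idempotent Confirm: the side effect runs once per transaction ID. */
class IdempotentConfirmDemo {
    enum Status { TRIED, CONFIRMED }

    private final Map<String, Status> records = new ConcurrentHashMap<>();
    /** Counts how many times the business side effect actually ran. */
    final AtomicInteger sideEffects = new AtomicInteger();

    void tryPhase(String txId) {
        records.putIfAbsent(txId, Status.TRIED);
    }

    /** Returns true if the transaction is confirmed after this call. */
    boolean confirm(String txId) {
        // replace(key, expected, new) is atomic, so only one caller moves TRIED to CONFIRMED
        if (records.replace(txId, Status.TRIED, Status.CONFIRMED)) {
            sideEffects.incrementAndGet(); // stands in for the real deduction
            return true;
        }
        // A repeated call finds CONFIRMED and reports success without redoing the work
        return records.get(txId) == Status.CONFIRMED;
    }

    public static void main(String[] args) {
        IdempotentConfirmDemo demo = new IdempotentConfirmDemo();
        demo.tryPhase("tx-1");
        demo.confirm("tx-1");
        demo.confirm("tx-1"); // retry delivered by the framework
        System.out.println(demo.sideEffects.get()); // 1
    }
}
```

In a database-backed version the atomic `replace` would typically become a conditional update such as `UPDATE ... SET status = 'CONFIRMED' WHERE id = ? AND status = 'TRIED'`.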
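2.2.3 Applicable Scenarios
  • Core business flows with high consistency requirements
  • Relatively simple business logic for which compensation is easy to implement
  • Distributed transactions with few participants
  • Scenarios with high performance requirements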
2.3 Saga Pattern
2.3.1 Orchestration-Based Implementation

A central coordinator drives the entire transaction flow:

@Component
public class OrderSagaOrchestrator {

    @Autowired
    private OrderService orderService;
    @Autowired
    private PaymentService paymentService;
    @Autowired
    private InventoryService inventoryService;

    public void processOrder(OrderRequest request) {
        SagaTransaction saga = new SagaTransaction();
        try {
            // Step 1: create the order
            OrderCreatedEvent orderEvent = orderService.createOrder(request);
            saga.addCompensation(() -> orderService.cancelOrder(orderEvent.getOrderId()));

            // Step 2: reserve inventory
            InventoryReservedEvent inventoryEvent =
                    inventoryService.reserveInventory(orderEvent.getOrderId(), request.getItems());
            saga.addCompensation(() -> inventoryService.releaseInventory(inventoryEvent.getReservationId()));

            // Step 3: process the payment
            PaymentProcessedEvent paymentEvent =
                    paymentService.processPayment(orderEvent.getOrderId(), request.getPaymentInfo());
            saga.addCompensation(() -> paymentService.refund(paymentEvent.getPaymentId()));

            // All steps succeeded: commit the saga
            saga.commit();
        } catch (Exception e) {
            // Run the compensations
            saga.compensate();
            throw new SagaExecutionException("Order processing failed", e);
        }
    }
}

public class SagaTransaction {

    private final List<Runnable> compensations = new ArrayList<>();

    public void addCompensation(Runnable compensation) {
        compensations.add(0, compensation); // newest first, so compensation runs in LIFO order
    }

    public void compensate() {
        for (Runnable compensation : compensations) {
            try {
                compensation.run();
            } catch (Exception e) {
                logger.error("Compensation failed", e);
            }
        }
    }

    public void commit() {
        // Discard the compensations
        compensations.clear();
    }
}
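The compensation order is easy to get wrong, so here is a small self-contained sketch that records which actions run when the payment step fails. `SagaDemo`, `Step`, and `runOrderSaga` are illustrative names, and the business steps are replaced by log entries.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Saga sketch: each completed step pushes a compensation; failure pops them in reverse. */
class SagaDemo {
    interface Step { void run() throws Exception; }

    private final Deque<Runnable> compensations = new ArrayDeque<>();

    /** Runs a step and registers its compensation only if the step succeeded. */
    void step(Step action, Runnable compensation) throws Exception {
        action.run();
        compensations.push(compensation);
    }

    /** Undoes completed steps, most recent first. */
    void compensate() {
        while (!compensations.isEmpty()) {
            try {
                compensations.pop().run();
            } catch (RuntimeException e) {
                System.err.println("compensation failed: " + e);
            }
        }
    }

    static List<String> runOrderSaga(boolean paymentFails) {
        List<String> log = new ArrayList<>();
        SagaDemo saga = new SagaDemo();
        try {
            saga.step(() -> log.add("createOrder"), () -> log.add("cancelOrder"));
            saga.step(() -> log.add("reserveInventory"), () -> log.add("releaseInventory"));
            saga.step(() -> {
                if (paymentFails) throw new IllegalStateException("payment declined");
                log.add("processPayment");
            }, () -> log.add("refund"));
        } catch (Exception e) {
            saga.compensate();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runOrderSaga(true));
        // [createOrder, reserveInventory, releaseInventory, cancelOrder]
    }
}
```

Note that no refund is issued: the payment step never completed, so its compensation was never registered.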
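2.3.2 Choreography-Based Implementation

An event-driven implementation:

@Component
public class OrderEventHandler {

    @Autowired
    private OrderService orderService;
    @Autowired
    private InventoryService inventoryService;
    @Autowired
    private PaymentService paymentService;
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @EventListener
    public void handleOrderCreated(OrderCreatedEvent event) {
        try {
            inventoryService.reserveInventory(event.getOrderId(), event.getItems());
        } catch (Exception e) {
            // Inventory could not be reserved: publish the failure so the order is cancelled
            eventPublisher.publishEvent(new InventoryReservationFailedEvent(event.getOrderId()));
        }
    }

    @EventListener
    public void handleInventoryReserved(InventoryReservedEvent event) {
        try {
            paymentService.processPayment(event.getOrderId(), event.getPaymentInfo());
        } catch (Exception e) {
            // Payment failed after inventory was reserved.
            // PaymentFailedEvent is an illustrative event type introduced for this example.
            eventPublisher.publishEvent(new PaymentFailedEvent(event.getOrderId(), event.getReservationId()));
        }
    }

    @EventListener
    public void handleInventoryReservationFailed(InventoryReservationFailedEvent event) {
        // Compensation: cancel the order
        orderService.cancelOrder(event.getOrderId());
    }

    @EventListener
    public void handlePaymentFailed(PaymentFailedEvent event) {
        // Compensation: release the reserved inventory, then cancel the order
        inventoryService.releaseInventory(event.getReservationId());
        orderService.cancelOrder(event.getOrderId());
    }
}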
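2.3.3 Comparison of Advantages and Disadvantages

Advantages of orchestration:

  • Centralized control and clear logic
  • Easy to debug and monitor
  • Explicit compensation logic

Disadvantages of orchestration:

  • Risk of a single point of failure
  • High coordinator complexity
  • Tight coupling

Advantages of choreography:

  • Loose coupling
  • Good scalability
  • No single point of failure

Disadvantages of choreography:

  • Logic is scattered and hard to follow
  • Difficult to debug
  • Risk of event storms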
2.4 Local Message Table
2.4.1 How It Works
@Service
public class LocalMessageService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private LocalMessageMapper messageMapper;
    @Autowired
    private MessageProducer messageProducer;
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Transactional
    public void createOrderWithMessage(OrderRequest request) {
        // 1. Create the order inside the local transaction
        Order order = new Order();
        order.setOrderId(UUID.randomUUID().toString());
        order.setUserId(request.getUserId());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.PENDING);
        orderMapper.insert(order);

        // 2. Create the local message record in the same transaction
        LocalMessage message = new LocalMessage();
        message.setMessageId(UUID.randomUUID().toString());
        message.setBusinessId(order.getOrderId());
        message.setMessageContent(JSON.toJSONString(order));
        message.setStatus(MessageStatus.PENDING);
        message.setCreatedTime(new Date());
        messageMapper.insert(message);

        // 3. Publish an event; the listener below fires only after the local transaction commits
        eventPublisher.publishEvent(new OrderCreatedEvent(order.getOrderId()));
    }

    // @TransactionalEventListener already acts as an event listener,
    // so a separate @EventListener would make the method fire twice.
    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void handleOrderCreated(OrderCreatedEvent event) {
        // Send pending messages to the MQ
        List<LocalMessage> pendingMessages = messageMapper.findPendingMessages();
        for (LocalMessage message : pendingMessages) {
            try {
                messageProducer.sendMessage(message.getMessageContent());
                // Mark the message as sent
                message.setStatus(MessageStatus.SENT);
                message.setSentTime(new Date());
                messageMapper.updateById(message);
            } catch (Exception e) {
                logger.error("Failed to send message", e);
                // The message stays PENDING and is picked up by the scheduled compensation task
            }
        }
    }
}
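2.4.2 Guaranteeing Message Reliability

Scheduled compensation

@Component
public class MessageCompensationTask {

    private static final int MAX_RETRY_COUNT = 5; // assumed value; the original does not define it

    @Autowired
    private LocalMessageMapper messageMapper;
    @Autowired
    private MessageProducer messageProducer;

    @Scheduled(fixedDelay = 30000) // runs 30 seconds after the previous run finishes
    public void compensateFailedMessages() {
        // Find messages that were not sent within the expected time
        List<LocalMessage> timeoutMessages = messageMapper.findTimeoutMessages();
        for (LocalMessage message : timeoutMessages) {
            if (message.getRetryCount() < MAX_RETRY_COUNT) {
                try {
                    messageProducer.sendMessage(message.getMessageContent());
                    message.setStatus(MessageStatus.SENT);
                    message.setSentTime(new Date());
                    messageMapper.updateById(message);
                } catch (Exception e) {
                    message.setRetryCount(message.getRetryCount() + 1);
                    message.setLastRetryTime(new Date());
                    messageMapper.updateById(message);
                }
            } else {
                // Retry limit exceeded: mark the message as failed
                message.setStatus(MessageStatus.FAILED);
                messageMapper.updateById(message);
            }
        }
    }
}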
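The retry bookkeeping of the compensation task can be exercised in isolation. This sketch replaces the message table with a list and the MQ producer with a `Predicate` that reports whether a send succeeded; `OutboxRetryDemo`, `sweep`, and the retry limit of 3 are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** In-memory sketch of the scheduled compensation sweep over a local message table. */
class OutboxRetryDemo {
    enum Status { PENDING, SENT, FAILED }

    static class Message {
        final String content;
        Status status = Status.PENDING;
        int retryCount = 0;
        Message(String content) { this.content = content; }
    }

    static final int MAX_RETRY_COUNT = 3; // illustrative limit

    /** One run of the task: try every PENDING message once. */
    static void sweep(List<Message> table, Predicate<String> sender) {
        for (Message m : table) {
            if (m.status != Status.PENDING) {
                continue;
            }
            if (m.retryCount >= MAX_RETRY_COUNT) {
                m.status = Status.FAILED; // give up and leave it for manual handling
            } else if (sender.test(m.content)) {
                m.status = Status.SENT;
            } else {
                m.retryCount++;
            }
        }
    }

    public static void main(String[] args) {
        List<Message> table = new ArrayList<>();
        table.add(new Message("order-1"));
        int[] calls = {0};
        // The broker is "down" for the first attempt and recovers for the second
        Predicate<String> flakySender = content -> ++calls[0] >= 2;
        sweep(table, flakySender);
        sweep(table, flakySender);
        System.out.println(table.get(0).status + " after " + calls[0] + " attempts"); // SENT after 2 attempts
    }
}
```

Since a message may be sent and then fail to be marked as SENT, consumers still have to handle duplicates idempotently.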
2.5 Best-Effort Notification
2.5.1 Mechanism
@Service
public class BestEffortNotificationService {

    private static final int[] RETRY_INTERVALS = {1, 5, 10, 30, 60, 120}; // minutes

    @Autowired
    private NotificationMapper notificationMapper;
    @Autowired
    private HttpClient httpClient; // application-level HTTP wrapper

    public void sendNotification(NotificationRequest request) {
        Notification notification = new Notification();
        notification.setId(UUID.randomUUID().toString());
        notification.setUrl(request.getCallbackUrl());
        notification.setContent(request.getContent());
        notification.setStatus(NotificationStatus.PENDING);
        notification.setRetryCount(0);
        notification.setCreatedTime(new Date());
        notificationMapper.insert(notification);
        // Try to send immediately
        tryToSendNotification(notification);
    }

    private void tryToSendNotification(Notification notification) {
        try {
            HttpResponse response = httpClient.post(notification.getUrl(), notification.getContent());
            if (response.getStatusCode() == 200) {
                // Delivered
                notification.setStatus(NotificationStatus.SUCCESS);
                notification.setSuccessTime(new Date());
                notificationMapper.updateById(notification);
            } else {
                // Delivery failed: schedule a retry
                scheduleRetry(notification);
            }
        } catch (Exception e) {
            logger.error("Failed to send notification", e);
            scheduleRetry(notification);
        }
    }

    private void scheduleRetry(Notification notification) {
        int retryCount = notification.getRetryCount();
        if (retryCount < RETRY_INTERVALS.length) {
            // Compute the next retry time
            long delayMinutes = RETRY_INTERVALS[retryCount];
            Date nextRetryTime = new Date(System.currentTimeMillis() + delayMinutes * 60 * 1000);
            notification.setRetryCount(retryCount + 1);
            notification.setNextRetryTime(nextRetryTime);
            notification.setStatus(NotificationStatus.RETRYING);
            notificationMapper.updateById(notification);
        } else {
            // Retry limit exceeded
            notification.setStatus(NotificationStatus.FAILED);
            notificationMapper.updateById(notification);
        }
    }

    @Scheduled(fixedDelay = 60000) // checks once a minute
    public void processRetryNotifications() {
        List<Notification> retryNotifications = notificationMapper.findRetryNotifications();
        for (Notification notification : retryNotifications) {
            tryToSendNotification(notification);
        }
    }
}
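The retry schedule itself is plain arithmetic and can be checked on its own. The sketch below reuses the interval table from the service above; `RetryScheduleDemo`, `nextDelay`, and `totalWindow` are names chosen for this illustration.

```java
import java.time.Duration;
import java.util.Optional;

/** Sketch of the stepped retry schedule used by best-effort notification. */
class RetryScheduleDemo {
    private static final int[] RETRY_INTERVALS_MINUTES = {1, 5, 10, 30, 60, 120};

    /** Delay before the next attempt, or empty once the schedule is exhausted. */
    static Optional<Duration> nextDelay(int retryCount) {
        if (retryCount < 0 || retryCount >= RETRY_INTERVALS_MINUTES.length) {
            return Optional.empty();
        }
        return Optional.of(Duration.ofMinutes(RETRY_INTERVALS_MINUTES[retryCount]));
    }

    /** Total time from the first failure to the last scheduled retry. */
    static Duration totalWindow() {
        long minutes = 0;
        for (int interval : RETRY_INTERVALS_MINUTES) {
            minutes += interval;
        }
        return Duration.ofMinutes(minutes);
    }

    public static void main(String[] args) {
        System.out.println(nextDelay(0).get().toMinutes()); // 1
        System.out.println(nextDelay(6).isPresent());       // false
        System.out.println(totalWindow().toMinutes());      // 226
    }
}
```

With these intervals the receiver has a little under four hours to come back before the notification is marked as failed, not counting the up-to-one-minute polling lag of the scheduled task.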
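2.5.2 Applicable Scenarios
  • Notifications without strict real-time requirements
  • Integration with third-party systems
  • Payment result notifications
  • Order status synchronization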

3. Frameworks

3.1 Seata

Seata is a distributed transaction solution open-sourced by Alibaba. It supports four modes: AT, TCC, SAGA, and XA.

AT mode configuration

# application.yml
seata:
  enabled: true
  application-id: order-service
  tx-service-group: my_tx_group
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: seata
      group: SEATA_GROUP
  registry:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: seata
      group: SEATA_GROUP

Business code

@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private AccountService accountService;
    @Autowired
    private InventoryService inventoryService;

    @GlobalTransactional(rollbackFor = Exception.class)
    public void createOrder(OrderRequest request) {
        // 1. Create the order
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setAmount(request.getAmount());
        orderMapper.insert(order);
        // 2. Deduct from the account balance
        accountService.deduct(request.getUserId(), request.getAmount());
        // 3. Deduct inventory
        inventoryService.deduct(request.getProductId(), request.getQuantity());
    }
}
3.2 Hmily

Hmily is a distributed transaction framework based on the TCC compensation pattern.

@Service
public class PaymentServiceImpl implements PaymentService {

    @HmilyTCC(confirmMethod = "confirmPayment", cancelMethod = "cancelPayment")
    @Override
    public boolean makePayment(PaymentRequest request) {
        // Try phase: freeze the amount
        return freezeAmount(request.getUserId(), request.getAmount());
    }

    public boolean confirmPayment(PaymentRequest request) {
        // Confirm phase: perform the deduction
        return deductAmount(request.getUserId(), request.getAmount());
    }

    public boolean cancelPayment(PaymentRequest request) {
        // Cancel phase: unfreeze the amount
        return unfreezeAmount(request.getUserId(), request.getAmount());
    }
}
3.3 TCC-Transaction

TCC-Transaction is an open-source TCC distributed transaction framework.

@Service // @Compensable belongs on the Try method, not on the class
public class AccountServiceImpl implements AccountService {

    @Autowired
    private AccountRepository accountRepository;

    @Compensable(confirmMethod = "confirmTransferFrom", cancelMethod = "cancelTransferFrom")
    public void transferFrom(String accountId, BigDecimal amount) {
        // Try phase
        Account account = accountRepository.findById(accountId);
        account.freezeAmount(amount);
        accountRepository.save(account);
    }

    public void confirmTransferFrom(String accountId, BigDecimal amount) {
        // Confirm phase
        Account account = accountRepository.findById(accountId);
        account.deductBalance(amount);
        account.unfreezeAmount(amount);
        accountRepository.save(account);
    }

    public void cancelTransferFrom(String accountId, BigDecimal amount) {
        // Cancel phase
        Account account = accountRepository.findById(accountId);
        account.unfreezeAmount(amount);
        accountRepository.save(account);
    }
}
3.4 Message Queue Integration

RocketMQ transactional messages

@Service
public class TransactionalMessageService {

    // The producer must be configured with OrderTransactionListener via setTransactionListener
    @Autowired
    private TransactionMQProducer producer;

    public void sendTransactionalMessage(OrderRequest request) throws MQClientException {
        Message message = new Message("OrderTopic",
                JSON.toJSONString(request).getBytes(StandardCharsets.UTF_8));
        // checkLocalTransaction looks the order up by this property, so it has to be set here.
        // getOrderId() on the request is an assumption; the original does not show where the ID comes from.
        message.putUserProperty("orderId", request.getOrderId());
        producer.sendMessageInTransaction(message, request);
    }

    @Component
    public static class OrderTransactionListener implements TransactionListener {

        @Autowired
        private OrderService orderService;

        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            try {
                OrderRequest request = (OrderRequest) arg;
                orderService.createOrder(request);
                return LocalTransactionState.COMMIT_MESSAGE;
            } catch (Exception e) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            // Check the state of the local transaction
            String orderId = msg.getUserProperty("orderId");
            Order order = orderService.getOrder(orderId);
            if (order != null) {
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
    }
}

4. Practice and Optimization

4.1 Transaction Splitting Strategy

Split along business boundaries

// Poor design: one large transaction
@GlobalTransactional
public void processOrder(OrderRequest request) {
    createOrder(request);
    updateInventory(request);
    processPayment(request);
    sendNotification(request);
    updateUserPoints(request);
    recordAnalytics(request);
}

// Better design: a core transaction plus asynchronous processing
@GlobalTransactional
public void processCoreOrder(OrderRequest request) {
    createOrder(request);
    updateInventory(request);
    processPayment(request);
}

@Async
public void processOrderAsync(OrderCreatedEvent event) {
    sendNotification(event);
    updateUserPoints(event);
    recordAnalytics(event);
}
4.2 Performance Optimization

Batch processing

@Service
public class BatchTransactionService {

    @GlobalTransactional
    public void batchProcessOrders(List<OrderRequest> requests) {
        // Create the orders in one batch
        List<Order> orders = requests.stream()
                .map(this::createOrder)
                .collect(Collectors.toList());
        orderMapper.batchInsert(orders);
        // Update inventory in one batch, summing quantities per product
        Map<String, Integer> inventoryUpdates = requests.stream()
                .collect(Collectors.groupingBy(
                        OrderRequest::getProductId,
                        Collectors.summingInt(OrderRequest::getQuantity)));
        inventoryService.batchUpdate(inventoryUpdates);
    }
}

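Moving work to asynchronous processing

@Service
public class AsyncTransactionService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GlobalTransactional
    public void createOrderAsync(OrderRequest request) {
        // Core business logic runs synchronously
        Order order = createOrder(request);
        // Non-core work is handed off asynchronously.
        // Note: the message is sent before the global transaction commits,
        // so a consumer may see it even if the transaction later rolls back.
        OrderCreatedEvent event = new OrderCreatedEvent(order.getId());
        rabbitTemplate.convertAndSend("order.created", event);
    }

    @RabbitListener(queues = "order.created")
    public void handleOrderCreated(OrderCreatedEvent event) {
        // Handle user points, notifications, and similar work asynchronously
        updateUserPoints(event);
        sendNotification(event);
    }
}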
4.3 Common Problems and Solutions

Timeouts

@Service
public class TimeoutHandlingService {

    @GlobalTransactional(timeoutMills = 60000) // transaction timeout in milliseconds
    public void processLongRunningTransaction() {
        // Consider splitting long-running transactions
        processInBatches();
    }

    private void processInBatches() {
        List<Item> items = getAllItems();
        int batchSize = 100;
        for (int i = 0; i < items.size(); i += batchSize) {
            List<Item> batch = items.subList(i, Math.min(i + batchSize, items.size()));
            processBatch(batch);
        }
    }
}

Idempotency

@Service
public class IdempotentTransactionService {

    @Autowired
    private TransactionRecordMapper recordMapper;

    @GlobalTransactional
    public void processIdempotentTransaction(String transactionId, OrderRequest request) {
        // Check whether the transaction has already been processed
        TransactionRecord record = recordMapper.findByTransactionId(transactionId);
        if (record != null && record.getStatus() == TransactionStatus.SUCCESS) {
            return; // already processed
        }
        try {
            // Run the business logic
            processOrder(request);
            // Record success
            if (record == null) {
                record = new TransactionRecord();
                record.setTransactionId(transactionId);
            }
            record.setStatus(TransactionStatus.SUCCESS);
            recordMapper.save(record);
        } catch (Exception e) {
            // Record failure. If this write takes part in the same global transaction,
            // it is rolled back together with the business data.
            if (record == null) {
                record = new TransactionRecord();
                record.setTransactionId(transactionId);
            }
            record.setStatus(TransactionStatus.FAILED);
            recordMapper.save(record);
            throw e;
        }
    }
}

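II. Distributed Locks

1. Basic Concepts

1.1 Definition and Purpose

A distributed lock is a mechanism for mutually exclusive access to a shared resource in a distributed system. When several processes or threads need the same shared resource, the lock ensures that only one of them can access it at any given moment, which keeps data consistent and avoids race conditions.

Main purposes:

  • Guarantee data consistency
  • Avoid duplicate execution
  • Mutually exclusive access to resources
  • Control task scheduling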
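1.2 Core Properties

Mutual exclusion

  • At any moment, only one client can hold the lock

Deadlock prevention

  • The lock must have a timeout so that a crashed holder cannot block others forever

Fault tolerance

  • The lock service must be highly available

Reentrancy

  • The same client can acquire the same lock repeatedly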
1.3 Use Cases

Scheduled tasks

@Component
public class ScheduledTaskWithLock {

    @Autowired
    private DistributedLock distributedLock;

    @Scheduled(fixedRate = 60000)
    public void executeTask() {
        String lockKey = "scheduled-task-lock";
        if (distributedLock.tryLock(lockKey, 30, TimeUnit.SECONDS)) {
            try {
                // Run the scheduled task
                doScheduledWork();
            } finally {
                distributedLock.unlock(lockKey);
            }
        }
    }
}

Order number generation

@Service
public class OrderNumberGenerator {

    @Autowired
    private DistributedLock distributedLock;
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public String generateOrderNumber() {
        String lockKey = "order-number-generator";
        if (distributedLock.tryLock(lockKey, 5, TimeUnit.SECONDS)) {
            try {
                String today = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
                String counterKey = "order:counter:" + today;
                Long counter = redisTemplate.opsForValue().increment(counterKey);
                if (counter == 1) {
                    // First order of the day: let the counter key expire after one day
                    redisTemplate.expire(counterKey, Duration.ofDays(1));
                }
                return today + String.format("%06d", counter);
            } finally {
                distributedLock.unlock(lockKey);
            }
        } else {
            throw new RuntimeException("Failed to acquire lock");
        }
    }
}

2. Implementations

2.1 Redis-Based
2.1.1 Implementation with SET NX EX

Basic implementation

@Component
public class RedisDistributedLock implements DistributedLock {

    @Autowired
    private StringRedisTemplate redisTemplate;

    // Token of the lock held by the current thread; verified again on release
    private final ThreadLocal<String> lockValueHolder = new ThreadLocal<>();

    // Delete the key only if it still holds this client's token
    private static final String UNLOCK_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "    return redis.call('del', KEYS[1]) " +
            "else " +
            "    return 0 " +
            "end";

    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit) {
        String value = UUID.randomUUID().toString();
        // SET key value NX with an expiry; the timeout argument is the lock's time-to-live
        Boolean result = redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
        if (Boolean.TRUE.equals(result)) {
            // Remember the token so unlock() can prove ownership
            lockValueHolder.set(value);
            return true;
        }
        return false;
    }

    @Override
    public void unlock(String key) {
        String value = lockValueHolder.get();
        if (value != null) {
            DefaultRedisScript<Long> script = new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class);
            redisTemplate.execute(script, Collections.singletonList(key), value);
            lockValueHolder.remove();
        }
    }
}
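Why the release step compares the token can be shown without a Redis server. The sketch below imitates the two operations with a map and a hand-advanced clock; `TokenLockDemo` and its fields are stand-ins invented for this illustration, not part of any Redis client.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory imitation of SET NX with expiry plus token-checked release. */
class TokenLockDemo {
    private static final class Entry {
        final String token;
        final long expiresAt;
        Entry(String token, long expiresAt) {
            this.token = token;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();
    /** Simulated clock in milliseconds, advanced by the caller. */
    long now = 0;

    /** Succeeds only if the key is absent or its previous holder has expired. */
    synchronized boolean tryLock(String key, String token, long ttlMillis) {
        Entry current = store.get(key);
        if (current != null && current.expiresAt > now) {
            return false;
        }
        store.put(key, new Entry(token, now + ttlMillis));
        return true;
    }

    /** Mirrors the unlock script: delete only if the stored token matches. */
    synchronized boolean unlock(String key, String token) {
        Entry current = store.get(key);
        if (current == null || current.expiresAt <= now || !current.token.equals(token)) {
            return false;
        }
        store.remove(key);
        return true;
    }

    public static void main(String[] args) {
        TokenLockDemo redis = new TokenLockDemo();
        redis.tryLock("job", "client-A", 1000);
        redis.now = 1500;                                      // A's lock has expired
        System.out.println(redis.tryLock("job", "client-B", 1000)); // true
        System.out.println(redis.unlock("job", "client-A"));        // false, B's lock survives
    }
}
```

Without the token comparison, client A's late `unlock` would delete the lock that client B now holds.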

Reentrant lock implementation

@Component
public class ReentrantRedisLock implements DistributedLock {

    @Autowired
    private StringRedisTemplate redisTemplate;

    // Acquire: create the hash if absent, or increment the count if we already own it
    private static final String LOCK_SCRIPT =
            "local key = KEYS[1] " +
            "local value = ARGV[1] " +
            "local ttl = tonumber(ARGV[2]) " +
            "local current = redis.call('HGET', key, 'value') " +
            "if current == false then " +
            "    redis.call('HSET', key, 'value', value) " +
            "    redis.call('HSET', key, 'count', 1) " +
            "    redis.call('EXPIRE', key, ttl) " +
            "    return 1 " +
            "elseif current == value then " +
            "    redis.call('HINCRBY', key, 'count', 1) " +
            "    redis.call('EXPIRE', key, ttl) " +
            "    return 1 " +
            "else " +
            "    return 0 " +
            "end";

    // Release: decrement the count and delete the key when it reaches zero
    private static final String UNLOCK_SCRIPT =
            "local key = KEYS[1] " +
            "local value = ARGV[1] " +
            "local current = redis.call('HGET', key, 'value') " +
            "if current == false then " +
            "    return 0 " +
            "elseif current == value then " +
            "    local count = redis.call('HINCRBY', key, 'count', -1) " +
            "    if count > 0 then " +
            "        return 1 " +
            "    else " +
            "        redis.call('DEL', key) " +
            "        return 1 " +
            "    end " +
            "else " +
            "    return 0 " +
            "end";

    private final ThreadLocal<String> lockValues = new ThreadLocal<>();
    private final ThreadLocal<Integer> lockCounts = new ThreadLocal<>();

    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit) {
        String value = lockValues.get();
        if (value == null) {
            value = UUID.randomUUID().toString();
            lockValues.set(value);
        }
        DefaultRedisScript<Long> script = new DefaultRedisScript<>(LOCK_SCRIPT, Long.class);
        Long result = redisTemplate.execute(script, Collections.singletonList(key),
                value, String.valueOf(unit.toSeconds(timeout)));
        if (result != null && result == 1) {
            Integer count = lockCounts.get();
            lockCounts.set(count == null ? 1 : count + 1);
            return true;
        }
        if (lockCounts.get() == null) {
            // Nothing is held by this thread, so drop the unused token
            lockValues.remove();
        }
        return false;
    }

    @Override
    public void unlock(String key) {
        String value = lockValues.get();
        Integer count = lockCounts.get();
        if (value != null && count != null && count > 0) {
            DefaultRedisScript<Long> script = new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class);
            redisTemplate.execute(script, Collections.singletonList(key), value);
            count--;
            if (count == 0) {
                lockValues.remove();
                lockCounts.remove();
            } else {
                lockCounts.set(count);
            }
        }
    }
}
2.1.2 The Redisson Framework

Configuration

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://127.0.0.1:6379")
                .setDatabase(0)
                .setConnectionPoolSize(10)
                .setConnectionMinimumIdleSize(2);
        return Redisson.create(config);
    }
}

Usage examples

@Service
public class RedissonLockService {

    @Autowired
    private RedissonClient redissonClient;

    public void executeWithLock(String lockKey, Runnable task) {
        RLock lock = redissonClient.getLock(lockKey);
        try {
            // Wait up to 10 seconds for the lock; the lease expires after 30 seconds
            if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
                try {
                    task.run();
                } finally {
                    lock.unlock();
                }
            } else {
                throw new RuntimeException("Failed to acquire lock");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while acquiring lock", e);
        }
    }

    public void executeWithFairLock(String lockKey, Runnable task) {
        RLock fairLock = redissonClient.getFairLock(lockKey);
        try {
            if (fairLock.tryLock(10, 30, TimeUnit.SECONDS)) {
                try {
                    task.run();
                } finally {
                    fairLock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void executeWithReadWriteLock(String lockKey, Runnable readTask, Runnable writeTask) {
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockKey);
        // Read operation
        if (readTask != null) {
            RLock readLock = readWriteLock.readLock();
            try {
                if (readLock.tryLock(10, 30, TimeUnit.SECONDS)) {
                    try {
                        readTask.run();
                    } finally {
                        readLock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // Write operation
        if (writeTask != null) {
            RLock writeLock = readWriteLock.writeLock();
            try {
                if (writeLock.tryLock(10, 30, TimeUnit.SECONDS)) {
                    try {
                        writeTask.run();
                    } finally {
                        writeLock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
2.1.3 Handling Clustered Environments

RedLock algorithm implementation

@Component
public class RedLockDistributedLock {

    private final List<RedissonClient> redissonClients;
    private final int quorum; // majority of the instances: N / 2 + 1

    public RedLockDistributedLock(List<RedissonClient> clients) {
        this.redissonClients = clients;
        this.quorum = clients.size() / 2 + 1;
    }

    public boolean tryLock(String key, long timeout, TimeUnit unit) {
        long startTime = System.currentTimeMillis();
        long ttl = unit.toMillis(timeout);
        List<RLock> locks = new ArrayList<>();
        List<Boolean> results = new ArrayList<>();
        // Try to acquire the lock on every Redis instance
        for (RedissonClient client : redissonClients) {
            RLock lock = client.getLock(key);
            locks.add(lock);
            try {
                boolean acquired = lock.tryLock(0, timeout, unit);
                results.add(acquired);
            } catch (Exception e) {
                results.add(false);
            }
        }
        // Check whether a majority was acquired before the lock's lifetime ran out
        long successCount = results.stream().filter(Boolean::booleanValue).count();
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (successCount >= quorum && elapsedTime < ttl) {
            return true;
        } else {
            // Release whatever was acquired
            releaseLocks(locks, results);
            return false;
        }
    }

    private void releaseLocks(List<RLock> locks, List<Boolean> results) {
        for (int i = 0; i < locks.size(); i++) {
            if (results.get(i)) {
                try {
                    locks.get(i).unlock();
                } catch (Exception e) {
                    // Ignore errors while releasing
                }
            }
        }
    }
}
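The acceptance rule of the algorithm reduces to two checks: a majority of instances granted the lock, and some of the lock's lifetime is still left after the time spent acquiring it. The sketch below works through that arithmetic. The clock-drift allowance of 1% of the TTL plus 2 ms is an assumption borrowed from common reference implementations, and `RedLockMathDemo` is a name made up for this example.

```java
/** Sketch of the RedLock acceptance check: majority reached and validity time remaining. */
class RedLockMathDemo {

    /** Smallest number of instances that forms a majority. */
    static int quorum(int instances) {
        return instances / 2 + 1;
    }

    /** Time the lock can still be considered held after acquisition overhead and drift. */
    static long validityMillis(long ttlMillis, long elapsedMillis) {
        long drift = (long) (ttlMillis * 0.01) + 2; // assumed clock-drift allowance
        return ttlMillis - elapsedMillis - drift;
    }

    static boolean acquired(int instances, int successes, long ttlMillis, long elapsedMillis) {
        return successes >= quorum(instances) && validityMillis(ttlMillis, elapsedMillis) > 0;
    }

    public static void main(String[] args) {
        System.out.println(quorum(5));                    // 3
        System.out.println(validityMillis(10_000, 100));  // 9798
        System.out.println(acquired(5, 3, 10_000, 100));  // true
        System.out.println(acquired(5, 5, 1_000, 995));   // false, no validity time left
    }
}
```

The critical section should finish within the remaining validity time rather than within the full TTL.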
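2.2 ZooKeeper-Based
2.2.1 Ephemeral Sequential Nodes

Implementation with the native ZooKeeper client

@Component
public class ZooKeeperDistributedLock {

    private ZooKeeper zooKeeper;
    private static final String LOCK_ROOT = "/distributed-locks";

    @PostConstruct
    public void init() throws Exception {
        zooKeeper = new ZooKeeper("localhost:2181", 5000, null);
        // Make sure the root node exists
        ensurePersistentNode(LOCK_ROOT);
    }

    private void ensurePersistentNode(String path) throws Exception {
        if (zooKeeper.exists(path, false) == null) {
            try {
                zooKeeper.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException ignored) {
                // Another client created it concurrently
            }
        }
    }

    public boolean tryLock(String lockName, long timeout, TimeUnit unit) {
        String lockPath = LOCK_ROOT + "/" + lockName;
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            // The parent of the sequential nodes must exist before they can be created
            ensurePersistentNode(lockPath);
            // Create an ephemeral sequential node
            String currentPath = zooKeeper.create(lockPath + "/lock-", new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            String currentNode = currentPath.substring(lockPath.length() + 1);
            while (true) {
                // Fetch and sort all children
                List<String> children = zooKeeper.getChildren(lockPath, false);
                Collections.sort(children);
                int index = children.indexOf(currentNode);
                if (index == 0) {
                    // Our node is the smallest: the lock is ours
                    return true;
                }
                // Watch the immediate predecessor
                String prevNode = children.get(index - 1);
                CountDownLatch latch = new CountDownLatch(1);
                Watcher watcher = event -> {
                    if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                        latch.countDown();
                    }
                };
                if (zooKeeper.exists(lockPath + "/" + prevNode, watcher) == null) {
                    // The predecessor is already gone; re-check our position
                    continue;
                }
                // Wait for the predecessor to be deleted, then re-check our position
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0 || !latch.await(remaining, TimeUnit.NANOSECONDS)) {
                    // Timed out: remove our node so it does not block later waiters
                    zooKeeper.delete(currentPath, -1);
                    return false;
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to acquire lock", e);
        }
    }
}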
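The core decision in this recipe, whether a client holds the lock and otherwise which node it should watch, depends only on the sorted list of children. A minimal sketch of that decision without a ZooKeeper connection (`SequentialNodeDemo` and `nodeToWatch` are hypothetical names):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Sketch of the ordering rule behind ephemeral sequential lock nodes. */
class SequentialNodeDemo {

    /**
     * Returns empty if ownNode is the smallest child (it holds the lock),
     * otherwise the immediate predecessor that should be watched.
     */
    static Optional<String> nodeToWatch(List<String> children, String ownNode) {
        List<String> sorted = new ArrayList<>(children);
        // Sequence suffixes are zero-padded to a fixed width, so string order matches numeric order
        Collections.sort(sorted);
        int index = sorted.indexOf(ownNode);
        if (index < 0) {
            throw new IllegalArgumentException("own node not among children: " + ownNode);
        }
        return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
    }

    public static void main(String[] args) {
        List<String> children = List.of("lock-0000000003", "lock-0000000001", "lock-0000000002");
        System.out.println(nodeToWatch(children, "lock-0000000001")); // Optional.empty
        System.out.println(nodeToWatch(children, "lock-0000000003")); // Optional[lock-0000000002]
    }
}
```

Watching only the predecessor, instead of the whole parent node, means that a release wakes up a single waiter rather than all of them.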
2.2.2 The Curator Framework

Configuration

@Configuration
public class CuratorConfig {

    @Bean
    public CuratorFramework curatorFramework() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(3000)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
        try {
            client.blockUntilConnected(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return client;
    }
}

Distributed lock implementation

@Service
public class CuratorDistributedLock {

    @Autowired
    private CuratorFramework curatorFramework;

    public void executeWithLock(String lockPath, Runnable task) {
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);
        try {
            if (lock.acquire(10, TimeUnit.SECONDS)) {
                try {
                    task.run();
                } finally {
                    lock.release();
                }
            } else {
                throw new RuntimeException("Timed out acquiring lock");
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to run task under lock", e);
        }
    }

    public void executeWithReadWriteLock(String lockPath, Runnable readTask, Runnable writeTask) {
        InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(curatorFramework, lockPath);
        // Read lock
        if (readTask != null) {
            InterProcessMutex readLock = readWriteLock.readLock();
            try {
                if (readLock.acquire(10, TimeUnit.SECONDS)) {
                    try {
                        readTask.run();
                    } finally {
                        readLock.release();
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("Read-lock execution failed", e);
            }
        }
        // Write lock
        if (writeTask != null) {
            InterProcessMutex writeLock = readWriteLock.writeLock();
            try {
                if (writeLock.acquire(10, TimeUnit.SECONDS)) {
                    try {
                        writeTask.run();
                    } finally {
                        writeLock.release();
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("Write-lock execution failed", e);
            }
        }
    }

    public void executeWithSemaphore(String semaphorePath, int permits, Runnable task) {
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(curatorFramework, semaphorePath, permits);
        Lease lease = null;
        try {
            lease = semaphore.acquire(10, TimeUnit.SECONDS);
            if (lease != null) {
                task.run();
            } else {
                throw new RuntimeException("Timed out acquiring semaphore");
            }
        } catch (Exception e) {
            throw new RuntimeException("Semaphore execution failed", e);
        } finally {
            if (lease != null) {
                try {
                    semaphore.returnLease(lease);
                } catch (Exception e) {
                    // Ignore errors while releasing
                }
            }
        }
    }
}
2.2.3 Performance Optimization

Connection configuration tuning

@Configuration
public class ZooKeeperOptimizedConfig {

    @Bean
    public CuratorFramework optimizedCuratorFramework() {
        RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(1000, 8000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("zk1:2181,zk2:2181,zk3:2181")
                .sessionTimeoutMs(30000)
                .connectionTimeoutMs(15000)
                .retryPolicy(retryPolicy)
                .namespace("distributed-locks")
                .build();
        client.start();
        return client;
    }

    @Bean
    public InterProcessMutex sharedLock(CuratorFramework curatorFramework) {
        // Shared lock instance, to avoid creating a new one for every use
        return new InterProcessMutex(curatorFramework, "/shared-lock");
    }
}
2.3 Database-Based
2.3.1 Pessimistic Lock Implementation

Based on a unique index

@Entity
@Table(name = "distributed_lock")
public class DistributedLockEntity {

    @Id
    @Column(name = "lock_name", unique = true)
    private String lockName;

    @Column(name = "lock_value")
    private String lockValue;

    @Column(name = "expire_time")
    private LocalDateTime expireTime;

    @Column(name = "created_time")
    private LocalDateTime createdTime;

    // getters and setters
}

@Repository
public interface DistributedLockRepository extends JpaRepository<DistributedLockEntity, String> {

    // A plain INSERT fails on a duplicate lock_name. save() is not suitable here:
    // for an entity with an assigned ID it merges into the existing row instead of failing.
    @Modifying
    @Transactional
    @Query(value = "INSERT INTO distributed_lock (lock_name, lock_value, expire_time, created_time) "
            + "VALUES (:lockName, :lockValue, :expireTime, :createdTime)", nativeQuery = true)
    int insertLock(@Param("lockName") String lockName, @Param("lockValue") String lockValue,
                   @Param("expireTime") LocalDateTime expireTime, @Param("createdTime") LocalDateTime createdTime);

    @Modifying
    @Transactional
    @Query("DELETE FROM DistributedLockEntity d WHERE d.expireTime < :now")
    int deleteExpiredLocks(@Param("now") LocalDateTime now);

    @Modifying
    @Transactional
    @Query("DELETE FROM DistributedLockEntity d WHERE d.lockName = :lockName AND d.lockValue = :lockValue")
    int deleteLock(@Param("lockName") String lockName, @Param("lockValue") String lockValue);
}

@Service
public class DatabaseDistributedLock {

    @Autowired
    private DistributedLockRepository lockRepository;

    private final ThreadLocal<String> lockValues = new ThreadLocal<>();

    public boolean tryLock(String lockName, long timeout, TimeUnit unit) {
        String lockValue = UUID.randomUUID().toString();
        LocalDateTime now = LocalDateTime.now();
        // Convert through milliseconds so that every TimeUnit is handled correctly
        LocalDateTime expireTime = now.plus(unit.toMillis(timeout), ChronoUnit.MILLIS);
        try {
            lockRepository.insertLock(lockName, lockValue, expireTime, now);
            lockValues.set(lockValue);
            return true;
        } catch (DataIntegrityViolationException e) {
            // Unique index conflict: the lock is already held
            return false;
        }
    }

    public void unlock(String lockName) {
        String lockValue = lockValues.get();
        if (lockValue != null) {
            lockRepository.deleteLock(lockName, lockValue);
            lockValues.remove();
        }
    }

    @Scheduled(fixedRate = 60000) // clean up expired locks once a minute
    public void cleanExpiredLocks() {
        lockRepository.deleteExpiredLocks(LocalDateTime.now());
    }
}
2.3.2 Optimistic Lock Implementation

Based on a version number

@Entity
public class OptimisticLockEntity {

    @Id
    private String resourceId;

    @Version
    private Long version;

    private String lockHolder;

    private LocalDateTime lockTime;

    // getters and setters
}

@Service
public class OptimisticDistributedLock {

    @Autowired
    private OptimisticLockRepository repository;

    @Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 3)
    public boolean tryLock(String resourceId, String lockHolder) {
        try {
            OptimisticLockEntity entity = repository.findById(resourceId).orElse(new OptimisticLockEntity());
            if (entity.getLockHolder() != null
                    && entity.getLockTime().isAfter(LocalDateTime.now().minusMinutes(5))) {
                // The lock is held and has not expired
                return false;
            }
            entity.setResourceId(resourceId);
            entity.setLockHolder(lockHolder);
            entity.setLockTime(LocalDateTime.now());
            repository.save(entity);
            return true;
        } catch (OptimisticLockingFailureException e) {
            throw e; // let @Retryable handle the retry
        }
    }

    public void unlock(String resourceId, String lockHolder) {
        OptimisticLockEntity entity = repository.findById(resourceId).orElse(null);
        if (entity != null && lockHolder.equals(entity.getLockHolder())) {
            entity.setLockHolder(null);
            entity.setLockTime(null);
            repository.save(entity);
        }
    }
}
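2.3.3 Pros and Cons

Pessimistic lock:

Pros:

  • Simple to implement
  • Strong consistency guarantee
  • No dependency on external systems

Cons:

  • Relatively poor performance
  • Prone to deadlocks
  • Single point of failure

Optimistic lock:

Pros:

  • Better performance
  • Cannot deadlock
  • Well suited to read-heavy, write-light workloads

Cons:

  • Starvation is possible
  • Requires a retry mechanism
  • Unsuitable for highly concurrent write workloads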
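
The retry requirement and the starvation risk listed above can be seen in a small in-memory model. The following is only a sketch under stated assumptions: `Row`, `Versioned` and `updateWithRetry` are hypothetical names, and an `AtomicReference` stands in for a database row with a version column. A write succeeds only if the version is unchanged since it was read, so a writer that keeps losing the race must retry and may exhaust its attempts.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

final class OptimisticRetryDemo {

    /** Immutable snapshot of a row: a payload plus a version counter. */
    static final class Versioned {
        final long version;
        final int value;

        Versioned(long version, int value) {
            this.version = version;
            this.value = value;
        }
    }

    /** In-memory stand-in for a table row guarded by a version column. */
    static final class Row {
        private final AtomicReference<Versioned> state =
                new AtomicReference<>(new Versioned(0, 0));

        Versioned read() {
            return state.get();
        }

        /** Succeeds only if nobody has written since 'expected' was read. */
        boolean writeIfUnchanged(Versioned expected, int newValue) {
            return state.compareAndSet(expected, new Versioned(expected.version + 1, newValue));
        }
    }

    /** Tries the update at most maxAttempts times; false means every attempt hit a conflict. */
    static boolean updateWithRetry(Row row, UnaryOperator<Integer> update, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Versioned snapshot = row.read();
            if (row.writeIfUnchanged(snapshot, update.apply(snapshot.value))) {
                return true;
            }
            Thread.yield(); // give the competing writer a chance before re-reading
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        Row row = new Row();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    // Caller-level retry: keep going until this increment is applied
                    while (!updateWithRetry(row, v -> v + 1, 3)) { }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        // Every successful write bumps both fields by one: value=4000, version=4000
        System.out.println("value=" + row.read().value + ", version=" + row.read().version);
    }
}
```

Under heavy write contention most attempts fail and are repeated, which is why this style suits read-heavy workloads better.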
2.4 Other Implementations
2.4.1 Etcd Implementation
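@Service
public class EtcdDistributedLock {

    private static final long LEASE_TTL_SECONDS = 30;

    private Client etcdClient;
    private Lock lockClient;
    private Lease leaseClient;

    @PostConstruct
    public void init() {
        etcdClient = Client.builder().endpoints("http://localhost:2379").build();
        lockClient = etcdClient.getLockClient();
        leaseClient = etcdClient.getLeaseClient();
    }

    public void executeWithLock(String lockName, Runnable task) {
        try {
            // lock() expects a lease ID, not a TTL, so grant a 30-second lease first.
            // If the task may run longer than the lease, the lease must be kept alive.
            long leaseId = leaseClient.grant(LEASE_TTL_SECONDS).get().getID();
            try {
                LockResponse lockResponse = lockClient
                        .lock(ByteSequence.from(lockName, StandardCharsets.UTF_8), leaseId)
                        .get();
                try {
                    task.run();
                } finally {
                    lockClient.unlock(lockResponse.getKey()).get();
                }
            } finally {
                // Revoke the lease so it does not linger until its TTL runs out
                leaseClient.revoke(leaseId).get();
            }
        } catch (Exception e) {
            throw new RuntimeException("Etcd lock execution failed", e);
        }
    }
}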
2.4.2 Consul Implementation
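@Service
public class ConsulDistributedLock {

    @Autowired
    private ConsulClient consulClient;

    /**
     * Tries to acquire the lock. Returns the ID of the session that holds it,
     * or null if the lock is already taken. Pass the returned ID to unlock().
     */
    public String tryLock(String lockKey, long ttlSeconds) {
        try {
            // Create a session; Consul expects the TTL as a duration string (10s to 86400s)
            NewSession newSession = new NewSession();
            newSession.setTtl(ttlSeconds + "s");
            String sessionId = consulClient.sessionCreate(newSession, QueryParams.DEFAULT).getValue();

            // Try to acquire the lock with that session
            PutParams putParams = new PutParams();
            putParams.setAcquireSession(sessionId);
            boolean acquired = Boolean.TRUE.equals(
                    consulClient.setKVValue(lockKey, "locked", putParams).getValue());
            if (!acquired) {
                // Destroy the unused session
                consulClient.sessionDestroy(sessionId, QueryParams.DEFAULT);
                return null;
            }
            return sessionId;
        } catch (Exception e) {
            throw new RuntimeException("Failed to acquire Consul lock", e);
        }
    }

    public void unlock(String lockKey, String sessionId) {
        try {
            // Destroying the session releases the locks it holds
            consulClient.sessionDestroy(sessionId, QueryParams.DEFAULT);
        } catch (Exception e) {
            // Ignore unlock failures; the session TTL releases the lock eventually
        }
    }
}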

3. Key Issues

3.1 Deadlock Handling

Timeout mechanism

@Component
public class DeadlockPreventionLock {

    private static final Logger logger = LoggerFactory.getLogger(DeadlockPreventionLock.class);
    private static final int DEFAULT_TIMEOUT = 30; // default timeout: 30 seconds

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public boolean tryLockWithTimeout(String lockKey, int timeoutSeconds) {
        // Unique per acquisition, and still names the holding thread in the logs
        String lockValue = Thread.currentThread().getName() + "-" + UUID.randomUUID();
        // Set the lock with an expiry so that a crashed holder cannot block others forever
        Boolean result = redisTemplate.opsForValue()
                .setIfAbsent(lockKey, lockValue, timeoutSeconds, TimeUnit.SECONDS);
        if (Boolean.TRUE.equals(result)) {
            // Start monitoring the lock
            startLockMonitor(lockKey, lockValue, timeoutSeconds);
            return true;
        }
        return false;
    }

    private void startLockMonitor(String lockKey, String lockValue, int timeoutSeconds) {
        CompletableFuture.runAsync(() -> {
            try {
                // Check 5 seconds before expiry; never sleep for a negative duration
                Thread.sleep(Math.max(0L, (timeoutSeconds - 5) * 1000L));
                String currentValue = redisTemplate.opsForValue().get(lockKey);
                if (lockValue.equals(currentValue)) {
                    // Still held shortly before expiry: possible deadlock, log it
                    logger.warn("Possible deadlock, lock key: {}, holder: {}", lockKey, lockValue);
                    // Optionally force-release the lock or extend its expiry
                    // forceReleaseLock(lockKey, lockValue);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}
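
To make the role of the expiry time and of the lock value concrete, here is a minimal in-memory sketch. It is not the Redis implementation: `ExpiringLockTable` is a hypothetical class, a `ConcurrentHashMap` stands in for Redis, and the clock is injected so that expiry can be simulated. It shows that a lock whose holder has disappeared becomes available again after the TTL, and that a release must check the owner token so a late `unlock` cannot remove someone else's lock.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

final class ExpiringLockDemo {

    /** One lock entry: who owns it and when it expires. */
    static final class Entry {
        final String owner;
        final long expiresAtMillis;

        Entry(String owner, long expiresAtMillis) {
            this.owner = owner;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    /** Hypothetical in-memory lock table with per-lock TTL and owner tokens. */
    static final class ExpiringLockTable {
        private final Map<String, Entry> locks = new ConcurrentHashMap<>();
        private final LongSupplier clock; // injected so tests can move time forward

        ExpiringLockTable(LongSupplier clock) {
            this.clock = clock;
        }

        /** Returns the owner token on success, or null if the lock is held and not expired. */
        String tryLock(String key, long ttlMillis) {
            long now = clock.getAsLong();
            String token = UUID.randomUUID().toString();
            Entry fresh = new Entry(token, now + ttlMillis);
            // compute() is atomic per key: take the lock if it is free or expired
            Entry winner = locks.compute(key, (k, current) ->
                    (current == null || current.expiresAtMillis <= now) ? fresh : current);
            return winner == fresh ? token : null;
        }

        /** Releases the lock only if the caller's token still owns it. */
        boolean unlock(String key, String token) {
            boolean[] released = {false};
            locks.computeIfPresent(key, (k, current) -> {
                if (current.owner.equals(token)) {
                    released[0] = true;
                    return null; // returning null removes the entry
                }
                return current;
            });
            return released[0];
        }
    }

    public static void main(String[] args) {
        long[] now = {0};
        ExpiringLockTable table = new ExpiringLockTable(() -> now[0]);

        String a = table.tryLock("order:1", 30_000);
        System.out.println("A acquired: " + (a != null));                                    // true
        System.out.println("B acquired while held: " + (table.tryLock("order:1", 30_000) != null)); // false

        now[0] = 30_001; // A never released; its TTL has passed
        String b = table.tryLock("order:1", 30_000);
        System.out.println("B acquired after expiry: " + (b != null));                       // true
        System.out.println("A's late unlock removed B's lock: " + table.unlock("order:1", a)); // false
    }
}
```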
3.2 Lock Timeout Mechanism

Watchdog mechanism

@Component
public class WatchdogLock {

    private static final Logger logger = LoggerFactory.getLogger(WatchdogLock.class);

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final Map<String, ScheduledFuture<?>> renewalTasks = new ConcurrentHashMap<>();
    // Lock values of the locks held by this instance, needed to verify ownership on unlock
    private final Map<String, String> lockValues = new ConcurrentHashMap<>();

    public boolean tryLockWithWatchdog(String lockKey, int initialTimeout) {
        String lockValue = UUID.randomUUID().toString();
        Boolean result = redisTemplate.opsForValue()
                .setIfAbsent(lockKey, lockValue, initialTimeout, TimeUnit.SECONDS);
        if (Boolean.TRUE.equals(result)) {
            lockValues.put(lockKey, lockValue);
            startWatchdog(lockKey, lockValue, initialTimeout);
            return true;
        }
        return false;
    }

    private void startWatchdog(String lockKey, String lockValue, int timeout) {
        // Renew at one third of the timeout; the period must be at least 1 second
        long period = Math.max(1, timeout / 3);
        ScheduledFuture<?> renewalTask = scheduler.scheduleAtFixedRate(() -> {
            try {
                String currentValue = redisTemplate.opsForValue().get(lockKey);
                if (lockValue.equals(currentValue)) {
                    // Renew the lock
                    redisTemplate.expire(lockKey, timeout, TimeUnit.SECONDS);
                    logger.debug("Lock renewed: {}", lockKey);
                } else {
                    // The lock was released or acquired by another client
                    stopWatchdog(lockKey);
                }
            } catch (Exception e) {
                logger.error("Lock renewal failed: " + lockKey, e);
                stopWatchdog(lockKey);
            }
        }, period, period, TimeUnit.SECONDS);
        renewalTasks.put(lockKey, renewalTask);
    }

    public void unlock(String lockKey) {
        stopWatchdog(lockKey);
        String lockValue = lockValues.remove(lockKey);
        if (lockValue == null) {
            return; // this instance does not hold the lock
        }
        // Lua script: delete the key only if it still carries our lock value
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then "
                + "return redis.call('del', KEYS[1]) else return 0 end";
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
        redisTemplate.execute(redisScript, Collections.singletonList(lockKey), lockValue);
    }

    private void stopWatchdog(String lockKey) {
        ScheduledFuture<?> task = renewalTasks.remove(lockKey);
        if (task != null && !task.isCancelled()) {
            task.cancel(false);
        }
    }
}
3.3 Reentrancy

Reentrant lock based on ThreadLocal

@Component
public class ReentrantDistributedLock {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    // Locks held by the current thread: lock key -> lock value and hold count
    private final ThreadLocal<Map<String, LockInfo>> lockHolder = new ThreadLocal<>();

    private static class LockInfo {
        private final String lockValue;
        private int count;

        public LockInfo(String lockValue) {
            this.lockValue = lockValue;
            this.count = 1;
        }

        public void increment() {
            count++;
        }

        // Returns true when the hold count drops to zero
        public boolean decrement() {
            return --count == 0;
        }
    }

    public boolean tryLock(String lockKey, long timeout, TimeUnit unit) {
        Map<String, LockInfo> locks = lockHolder.get();
        if (locks == null) {
            locks = new HashMap<>();
            lockHolder.set(locks);
        }
        LockInfo lockInfo = locks.get(lockKey);
        if (lockInfo != null) {
            // Reentrant acquisition: only the local hold count changes
            lockInfo.increment();
            return true;
        }
        // Try to acquire a new lock
        String lockValue = UUID.randomUUID().toString();
        Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, timeout, unit);
        if (Boolean.TRUE.equals(result)) {
            locks.put(lockKey, new LockInfo(lockValue));
            return true;
        }
        return false;
    }

    public void unlock(String lockKey) {
        Map<String, LockInfo> locks = lockHolder.get();
        if (locks == null) {
            return;
        }
        LockInfo lockInfo = locks.get(lockKey);
        if (lockInfo == null) {
            return;
        }
        if (lockInfo.decrement()) {
            // Fully released: drop the local record
            locks.remove(lockKey);
            if (locks.isEmpty()) {
                lockHolder.remove();
            }
            // Delete the key in Redis only if it still carries our lock value
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then "
                    + "return redis.call('del', KEYS[1]) else return 0 end";
            DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
            redisTemplate.execute(redisScript, Collections.singletonList(lockKey), lockInfo.lockValue);
        }
    }
}
3.4 Fairness

Queue-based fair lock

@Component
public class FairDistributedLock {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    // Uses LPOS, which requires Redis 6.0.6 or later
    private static final String FAIR_LOCK_SCRIPT =
            "local key = KEYS[1] " +
            "local queue_key = key .. ':queue' " +
            "local current_key = key .. ':current' " +
            "local client_id = ARGV[1] " +
            "local ttl = tonumber(ARGV[2]) " +
            // Check whether the caller already holds the lock
            "local current_holder = redis.call('get', current_key) " +
            "if current_holder == client_id then " +
            "    redis.call('expire', current_key, ttl) " +
            "    return 1 " +
            "end " +
            // Check the caller's position in the queue, enqueueing it if absent
            "local queue_position = redis.call('lpos', queue_key, client_id) " +
            "if queue_position == false then " +
            "    redis.call('rpush', queue_key, client_id) " +
            "    queue_position = redis.call('llen', queue_key) - 1 " +
            "end " +
            // Grant the lock if there is no holder and the caller is first in the queue
            "if current_holder == false and queue_position == 0 then " +
            "    redis.call('set', current_key, client_id, 'EX', ttl) " +
            "    redis.call('lpop', queue_key) " +
            "    return 1 " +
            "end " +
            "return 0";

    public boolean tryLock(String lockKey, long timeout, TimeUnit unit) {
        // A UUID avoids collisions between identically named threads on different JVMs
        String clientId = Thread.currentThread().getName() + "-" + UUID.randomUUID();
        // EX must be a positive number of seconds
        long ttlSeconds = Math.max(1, unit.toSeconds(timeout));
        DefaultRedisScript<Long> script = new DefaultRedisScript<>(FAIR_LOCK_SCRIPT, Long.class);
        Long result = redisTemplate.execute(script, Collections.singletonList(lockKey),
                clientId, String.valueOf(ttlSeconds));
        if (result != null && result == 1) {
            return true;
        }
        // Wait for our turn
        return waitForTurn(lockKey, clientId, timeout, unit);
    }

    private boolean waitForTurn(String lockKey, String clientId, long timeout, TimeUnit unit) {
        long endTime = System.currentTimeMillis() + unit.toMillis(timeout);
        String queueKey = lockKey + ":queue";
        while (System.currentTimeMillis() < endTime) {
            Long position = redisTemplate.opsForList().indexOf(queueKey, clientId);
            if (position != null && position == 0) {
                // It is our turn: try to take the lock
                if (tryAcquireLock(lockKey, clientId, timeout, unit)) {
                    redisTemplate.opsForList().leftPop(queueKey);
                    return true;
                }
            }
            try {
                Thread.sleep(100); // wait 100 ms before polling again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Leave the queue, otherwise this entry would block everyone behind it
                redisTemplate.opsForList().remove(queueKey, 1, clientId);
                return false;
            }
        }
        // Timed out: remove ourselves from the queue
        redisTemplate.opsForList().remove(queueKey, 1, clientId);
        return false;
    }

    private boolean tryAcquireLock(String lockKey, String clientId, long timeout, TimeUnit unit) {
        String currentKey = lockKey + ":current";
        Boolean result = redisTemplate.opsForValue().setIfAbsent(currentKey, clientId, timeout, unit);
        return Boolean.TRUE.equals(result);
    }
}
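
The decision logic of the Lua script is easier to follow in plain Java. The model below is a sketch only: `FairLockModel` is a hypothetical class, a `Deque` plays the role of the `:queue` list and a field plays the role of the `:current` key, and TTLs are left out. It mirrors the three branches of the script: an existing holder succeeds again, a new client is enqueued once, and the lock is granted only to the head of the queue while no one holds it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class FairQueueLockDemo {

    /** In-memory model of the fair-lock script: one FIFO queue plus the current holder. */
    static final class FairLockModel {
        private final Deque<String> queue = new ArrayDeque<>();
        private String holder; // null means the lock is free

        /** Enqueue once; grant only to the queue head while the lock is free. */
        synchronized boolean tryAcquire(String clientId) {
            if (clientId.equals(holder)) {
                return true; // already holding the lock
            }
            if (!queue.contains(clientId)) {
                queue.addLast(clientId); // join the end of the line
            }
            if (holder == null && clientId.equals(queue.peekFirst())) {
                queue.pollFirst();
                holder = clientId;
                return true;
            }
            return false;
        }

        /** Only the current holder can release the lock. */
        synchronized boolean release(String clientId) {
            if (!clientId.equals(holder)) {
                return false;
            }
            holder = null;
            return true;
        }
    }

    public static void main(String[] args) {
        FairLockModel lock = new FairLockModel();
        System.out.println("A: " + lock.tryAcquire("A")); // true, the lock was free
        System.out.println("B: " + lock.tryAcquire("B")); // false, queued first
        System.out.println("C: " + lock.tryAcquire("C")); // false, queued second
        lock.release("A");
        System.out.println("C: " + lock.tryAcquire("C")); // false, B is ahead of C
        System.out.println("B: " + lock.tryAcquire("B")); // true, B is the queue head
    }
}
```

Even though C polls first after the release, B gets the lock because it arrived earlier. In the Redis version a client that crashes while waiting stays in the queue, so a production implementation would also need to expire stale queue entries.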
3.5 High Availability Design

Primary/standby failover

@Component
public class HighAvailabilityDistributedLock {

    private static final Logger logger = LoggerFactory.getLogger(HighAvailabilityDistributedLock.class);

    private final List<RedisTemplate<String, String>> redisTemplates;
    private final AtomicInteger currentIndex = new AtomicInteger(0);

    public HighAvailabilityDistributedLock(List<RedisTemplate<String, String>> templates) {
        this.redisTemplates = templates;
    }

    // Caveat: the nodes are independent. A lock held on a failed node is not visible on the
    // next one, so right after a failover two clients may hold the same lock.
    public boolean tryLock(String lockKey, long timeout, TimeUnit unit) {
        String lockValue = UUID.randomUUID().toString();
        int attempts = redisTemplates.size();
        for (int i = 0; i < attempts; i++) {
            RedisTemplate<String, String> template = getCurrentRedisTemplate();
            try {
                Boolean result = template.opsForValue().setIfAbsent(lockKey, lockValue, timeout, unit);
                // The node answered, so its result is final; only node failures trigger a switch
                return Boolean.TRUE.equals(result);
            } catch (Exception e) {
                logger.warn("Redis node failure, switching to the next node", e);
                switchToNextRedis();
            }
        }
        return false;
    }

    private RedisTemplate<String, String> getCurrentRedisTemplate() {
        // floorMod keeps the index valid even after the counter overflows
        int index = Math.floorMod(currentIndex.get(), redisTemplates.size());
        return redisTemplates.get(index);
    }

    private void switchToNextRedis() {
        currentIndex.incrementAndGet();
    }

    @Scheduled(fixedRate = 30000) // Check node health every 30 seconds
    public void healthCheck() {
        for (int i = 0; i < redisTemplates.size(); i++) {
            RedisTemplate<String, String> template = redisTemplates.get(i);
            try {
                template.opsForValue().get("health-check");
                logger.debug("Redis node {} is healthy", i);
            } catch (Exception e) {
                logger.warn("Redis node {} is unhealthy", i, e);
            }
        }
    }
}

4. Performance Comparison

4.1 Performance Metrics of Each Approach

Benchmark code

@Component
public class DistributedLockBenchmark {

    @Autowired
    private RedisDistributedLock redisLock;

    @Autowired
    private ZooKeeperDistributedLock zkLock;

    @Autowired
    private DatabaseDistributedLock dbLock;

    public void benchmarkLocks() {
        int threadCount = 100;
        int operationsPerThread = 1000;

        // Redis lock benchmark
        long redisTime = benchmarkLock("Redis", () -> {
            // Unlock only if the lock was actually acquired
            if (redisLock.tryLock("test-key", 10, TimeUnit.SECONDS)) {
                try {
                    simulateWork();
                } finally {
                    redisLock.unlock("test-key");
                }
            }
        }, threadCount, operationsPerThread);

        // ZooKeeper lock benchmark
        long zkTime = benchmarkLock("ZooKeeper", () -> {
            if (zkLock.tryLock("test-key", 10, TimeUnit.SECONDS)) {
                try {
                    simulateWork();
                } finally {
                    zkLock.unlock("test-key");
                }
            }
        }, threadCount, operationsPerThread);

        // Database lock benchmark
        long dbTime = benchmarkLock("Database", () -> {
            if (dbLock.tryLock("test-key", 10, TimeUnit.SECONDS)) {
                try {
                    simulateWork();
                } finally {
                    dbLock.unlock("test-key");
                }
            }
        }, threadCount, operationsPerThread);

        System.out.println("Performance comparison:");
        System.out.println("Redis: " + redisTime + "ms");
        System.out.println("ZooKeeper: " + zkTime + "ms");
        System.out.println("Database: " + dbTime + "ms");
    }

    // Simulates a 1 ms business operation
    private void simulateWork() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs lockOperation on threadCount threads and returns the elapsed wall-clock time in ms
    private long benchmarkLock(String name, Runnable lockOperation, int threadCount, int operationsPerThread) {
        CountDownLatch latch = new CountDownLatch(threadCount);
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < threadCount; i++) {
            executor.submit(() -> {
                try {
                    for (int j = 0; j < operationsPerThread; j++) {
                        lockOperation.run();
                    }
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long endTime = System.currentTimeMillis();
        executor.shutdown();
        return endTime - startTime;
    }
}
4.2 Choosing by Scenario

Selection decision matrix

@Component
public class LockSelectionStrategy {

    public enum LockType {
        REDIS, ZOOKEEPER, DATABASE, ETCD
    }

    public enum Scenario {
        HIGH_PERFORMANCE,    // high performance required
        HIGH_CONSISTENCY,    // strong consistency required
        SIMPLE_DEPLOYMENT,   // simple deployment
        HIGH_AVAILABILITY    // high availability required
    }

    // The TPS and node-count thresholds below are rules of thumb, not measured limits
    public LockType selectLock(Scenario scenario, int expectedTps, int nodeCount) {
        switch (scenario) {
            case HIGH_PERFORMANCE:
                if (expectedTps > 10000) {
                    return LockType.REDIS; // Redis offers the highest throughput of the four
                } else if (expectedTps > 1000) {
                    return nodeCount > 3 ? LockType.ETCD : LockType.REDIS;
                } else {
                    return LockType.ZOOKEEPER;
                }
            case HIGH_CONSISTENCY:
                return LockType.ZOOKEEPER; // ZooKeeper gives the strongest consistency
            case SIMPLE_DEPLOYMENT:
                return LockType.DATABASE; // the database is the simplest to deploy
            case HIGH_AVAILABILITY:
                if (nodeCount >= 3) {
                    return LockType.ZOOKEEPER; // ZooKeeper is the most stable in cluster mode
                } else {
                    return LockType.REDIS; // with fewer than 3 nodes, fall back to Redis
                }
            default:
                return LockType.REDIS; // Redis by default
        }
    }
}
4.3 Best Practices

Distributed lock usage template

@Component
public class DistributedLockTemplate {

    @Autowired
    private List<DistributedLock> distributedLocks;

    public <T> T executeWithLock(String lockKey, int timeout, TimeUnit unit, Supplier<T> supplier) {
        DistributedLock lock = selectBestLock(lockKey);
        if (lock.tryLock(lockKey, timeout, unit)) {
            try {
                return supplier.get();
            } finally {
                // Always release, even if the business logic throws
                lock.unlock(lockKey);
            }
        } else {
            throw new LockAcquisitionException("Failed to acquire distributed lock: " + lockKey);
        }
    }

    public void executeWithLock(String lockKey, int timeout, TimeUnit unit, Runnable runnable) {
        executeWithLock(lockKey, timeout, unit, () -> {
            runnable.run();
            return null;
        });
    }

    private DistributedLock selectBestLock(String lockKey) {
        // Pick the lock implementation according to the key prefix
        if (lockKey.startsWith("high-freq-")) {
            return findLockByType(RedisDistributedLock.class);
        } else if (lockKey.startsWith("consistency-")) {
            return findLockByType(ZooKeeperDistributedLock.class);
        } else {
            return distributedLocks.get(0); // default: the first registered implementation
        }
    }

    private DistributedLock findLockByType(Class<?> type) {
        return distributedLocks.stream()
                .filter(lock -> type.isAssignableFrom(lock.getClass()))
                .findFirst()
                .orElse(distributedLocks.get(0));
    }
}

// Usage example
@Service
public class BusinessService {

    @Autowired
    private DistributedLockTemplate lockTemplate;

    public void processOrder(String orderId) {
        lockTemplate.executeWithLock("order:" + orderId, 30, TimeUnit.SECONDS, () -> {
            // Order processing business logic
            doProcessOrder(orderId);
        });
    }

    public String generateUniqueId() {
        return lockTemplate.executeWithLock("id-generator", 10, TimeUnit.SECONDS, () -> {
            // Business logic that generates a unique ID
            return doGenerateId();
        });
    }
}
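
The template relies on a `DistributedLock` abstraction. The sketch below shows one shape that abstraction could take, together with the acquire / try / finally pattern, in a form that runs without Spring or Redis. The interface signature is inferred from how the template calls it; `InMemoryLock` is a hypothetical in-process stand-in with no waiting and no expiry, and `IllegalStateException` replaces `LockAcquisitionException` to keep the example self-contained.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class LockTemplateDemo {

    /** Assumed shape of the DistributedLock abstraction used by the template. */
    interface DistributedLock {
        boolean tryLock(String lockKey, long timeout, TimeUnit unit);
        void unlock(String lockKey);
    }

    /** Hypothetical in-process stand-in; a real implementation would call Redis, ZooKeeper, etc. */
    static final class InMemoryLock implements DistributedLock {
        private final Set<String> held = ConcurrentHashMap.newKeySet();

        @Override
        public boolean tryLock(String lockKey, long timeout, TimeUnit unit) {
            return held.add(lockKey); // no waiting or expiry in this sketch
        }

        @Override
        public void unlock(String lockKey) {
            held.remove(lockKey);
        }

        boolean isHeld(String lockKey) {
            return held.contains(lockKey);
        }
    }

    /** Runs the action under the lock and always releases a lock that was acquired. */
    static <T> T executeWithLock(DistributedLock lock, String key, long timeout,
                                 TimeUnit unit, Supplier<T> action) {
        if (!lock.tryLock(key, timeout, unit)) {
            // Nothing was acquired, so nothing must be released
            throw new IllegalStateException("Failed to acquire lock: " + key);
        }
        try {
            return action.get();
        } finally {
            lock.unlock(key);
        }
    }

    public static void main(String[] args) {
        InMemoryLock lock = new InMemoryLock();

        String id = executeWithLock(lock, "id-generator", 10, TimeUnit.SECONDS, () -> "ID-1");
        System.out.println(id + ", still held: " + lock.isHeld("id-generator")); // ID-1, still held: false

        try {
            executeWithLock(lock, "order:42", 30, TimeUnit.SECONDS, () -> {
                throw new RuntimeException("business failure");
            });
        } catch (RuntimeException e) {
            System.out.println("released after failure: " + !lock.isHeld("order:42")); // true
        }
    }
}
```

Placing `tryLock` outside the `try` block matters: if acquisition fails, the `finally` clause never runs, so the code cannot release a lock that belongs to another caller.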

Monitoring and alerting

@Component
public class LockMonitoring {

    private static final Logger logger = LoggerFactory.getLogger(LockMonitoring.class);

    @Autowired
    private MeterRegistry meterRegistry;

    // Assumed application-specific alerting component; the original does not define it
    @Autowired
    private AlertService alertService;

    private Timer lockHoldTimer;
    private final AtomicLong activeLockCount = new AtomicLong(0);

    @PostConstruct
    public void init() {
        lockHoldTimer = Timer.builder("distributed.lock.hold.time")
                .description("Distributed lock hold time")
                .register(meterRegistry);
        // The gauge reads the current value of activeLockCount on every scrape
        Gauge.builder("distributed.lock.active", activeLockCount, AtomicLong::get)
                .description("Number of currently active locks")
                .register(meterRegistry);
    }

    public void recordLockAcquisition(String lockKey, boolean success) {
        // Tags belong to the meter, not to increment(); register() returns the existing
        // counter for a known tag combination. Raw lock keys as a tag can create very
        // many time series, so consider tagging a key category instead.
        Counter.builder("distributed.lock.acquisition")
                .description("Number of distributed lock acquisitions")
                .tag("lock_key", lockKey)
                .tag("success", String.valueOf(success))
                .register(meterRegistry)
                .increment();
        if (success) {
            activeLockCount.incrementAndGet();
        }
    }

    public void recordLockRelease(String lockKey, long holdTimeMs) {
        lockHoldTimer.record(holdTimeMs, TimeUnit.MILLISECONDS);
        activeLockCount.decrementAndGet();
    }

    @EventListener
    public void handleLockTimeout(LockTimeoutEvent event) {
        logger.warn("Lock timed out: key={}, holdTime={}ms", event.getLockKey(), event.getHoldTime());
        // Send an alert
        alertService.sendAlert("Distributed lock timeout", event.toString());
    }
}

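Summary

Distributed transactions and distributed locks are core techniques in distributed systems. Choosing the right approach depends on the specific business scenario, the performance requirements, and the consistency requirements.

Recommendations for distributed transactions:

  • Strong consistency required: choose 2PC or TCC
  • High performance required: choose Saga or message-based transactions
  • Complex business logic: choose the Saga orchestration pattern
  • Simple scenarios: choose a local message table or best-effort notification

Recommendations for distributed locks:

  • High-performance scenarios: choose a Redis lock
  • Strong consistency required: choose a ZooKeeper lock
  • Simple deployment: choose a database lock
  • High availability required: choose a clustered solution

In practice, combine these with monitoring, alerting, and degradation strategies to keep the system stable and reliable.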
