如何开发一个支持海量分布式锁的应用库
分布式锁是一种用于控制分布式系统中资源访问的同步机制,确保在任意时刻只有一个客户端能够获取到锁,并对共享资源进行操作。
作用
1.保证数据一致性:在多个节点并发执行的情况下,分布式锁可以防止同时修改同一份数据,从而避免数据不一致的问题。
2.协调任务执行:确保特定的任务不会被重复执行,特别是在需要幂等性(idempotent)保证的时候。
应用场景例如
•库存扣减:在电商系统中,当用户下单时需要扣减库存,为了避免超卖现象,必须确保每次扣减操作都是原子性的。
•定时任务调度:在分布式环境中,确保同一个定时任务只在一个节点上运行,防止重复执行。
•缓存更新:当多个服务实例试图更新同一个缓存项时,使用分布式锁可以确保更新过程的线程安全。
•秒杀活动:对于高并发的抢购活动,如秒杀,使用分布式锁来控制对有限商品资源的访问是至关重要的。
•文件上传:在分布式文件系统中,确保同一文件不会被多次上传或覆盖。
常见实现方式
•基于数据库:可以使用数据库的唯一索引来实现简单的分布式锁,也可以通过for update等机制来实现分布式锁。例如,在尝试获取锁时插入一条记录,如果插入成功则表示获取到锁;如果违反了唯一索引约束,则说明锁已经被其他客户端持有。这种方法简单直接,但性能可能不如其他专门设计的解决方案,并且需要处理死锁和锁的自动释放等问题。
•基于Redis:Redis是一个内存中的键值存储系统,它提供了原子性的SETNX(Set if Not Exists)命令来设置一个键,只有当该键不存在时才会成功。结合EXPIRE或PEXPIRE命令,可以为锁设置一个过期时间,防止死锁的发生。
•基于Zookeeper:Zookeeper支持临时顺序节点,这使得它可以实现复杂的分布式锁逻辑,如公平锁、重入锁以及读写锁。客户端创建一个临时顺序节点作为锁对象,然后检查自己创建的节点是否是最小编号的节点,以此判断是否获得锁。
•基于Etcd:Etcd是一个高可用的分布式键值存储系统,它也能够提供分布式锁功能。与Zookeeper类似,etcd使用临时键和租约机制来实现锁。
•基于Consul:同样可以用来实现分布式锁。Consul利用KV存储和会话机制,可以方便地构建出分布式锁的应用。
本文将利用raftx,用简单的方法,编写一个分布式锁的应用库,它的特点是:
•使用方式简单并且可用性强
•支持海量创建分布式锁,可以同时创建几十万甚至上百万个分布式锁
•占用极少量的系统资源
•无自旋阻塞策略,不占用CPU资源
•抢占式获取锁
•支持TTL(time to live), 防止集群节点宕机造成死锁
raftx的分布式易失性数据扩展模块实现分布式锁 有比常见分布式锁的实现较为明显的特点
1.高效,它基于内存。获取与释放分布式锁过程更快
2.可以创建海量分布式锁。如果系统需要创建海量分布式锁,比如售票系统,电商秒杀活动等, 对于Zookeeper,Etcd,redis等,在创建海量分布式锁时,可能面临大量日志与大量触发机制,导致系统负载过大的问题。而raftx则不会有这个问题。可以通过以下的Lockx的实现过程,详细了解。
什么是Raftx
raftx 是一种对经典 Raft 协议的扩展,结合了 Multi-Paxos、ZAB(Zookeeper Atomic Broadcast)和 Raft 协议的优势。RaftX 具备快速选举、并发提案、数据同步、数据回滚以及易失性数据同步等特性,适用于高并发和大规模分布式系统场景。
raftx wiki
Lockx 分布式锁应用库,支持创建海量分布式锁
Lockx是依赖raftx实现的一个分布式锁应用库,实现方式简单,代码量少,100行左右代码,但是它的功能却十分强大,主要表现在:
•高效性与及时性
•资源占用极少
•支持海量创建分布式锁
•API使用简单方便
Lockx 支持一次性创建成千上万,甚至数十万或数百万个分布式锁,它的实现机制保证了它不会大量占用CPU资源和内存资源;它的锁动作变更触发机制针对的是锁资源,而非分布式对象锁本身,也就是说,即使节点中有100万个锁竞争一个锁资源,每次也只会触发一次锁的释放与竞争的指令;比如锁资源"lockmux",那么在分布式系统中,当资源 “lockmux”被释放时,它将触发节点中的 “lockmux”绑定事件一次,并让等待的资源随机发送一条竞争锁的指令竞争该资源锁,而不是触发100万个等待中的锁对象竞争事件。
Lockx 实现方式
lockx主要依赖raftx的易失性数据API实现,它的特点是高效,强一致性,并且可以绑定键值的增删改的触发事件;利用这些特性,可以轻松实现分布式锁的逻辑。
m.raft.MemWatch([]byte(lockstr), func(key, value []byte, watchType raft.WatchType) {//获取锁成功与否if watchType == raft.ADD {if mb, ok := m.mp.Get(util.BytesToInt64(value)); ok {m.del(string(key), util.BytesToInt64(value))close(mb.ctx)}}//锁释放,阻塞代码再次重新获取分布式锁if watchType == raft.DELETE {m.mux.Lock()defer m.mux.Unlock()if ids, b := m.rmap[string(key)]; b {for k := range ids {m.raft.MemCommand(key, util.Int64ToBytes(k), timeout, raft.MEM_PUT)break}}}//TryLock获取锁失败触发if watchType == raft.UPDATE {if mb, ok := m.mp.Get(util.BytesToInt64(value)); ok {if mb.isTry {m.del(string(key), util.BytesToInt64(value))mb.ctx <- trueclose(mb.ctx)}}}}, false, raft.ADD, raft.DELETE, raft.UPDATE)
这是lockx实现的核心代码,主要通过监听raftx易失性数据主键的增删改事件来实现资源锁的锁定与释放
•raft.ADD 这是资源锁新增的触发事件,通过它判断哪个对象获取到分布式锁,同时关闭相应阻塞的通道,让获取锁的程序继续执行。
•raft.DELETE 这是资源锁删除的触发事件,同时它将再次发送获取资源锁的指令,抢占资源锁
•raft.UPDATE 这是资源锁更新的触发事件,它表示资源锁获取失败,用于TryLock,同时关闭相应阻塞的通道并返回false
Lockx 使用方式
Lockx 的使用非常简单,并且它可以支持大量创建分布式锁,它一共有3个方法
•Lock(string,int)
获取指定资源的分布式锁并设置过期时间,阻塞
•TryLock(string,int)bool
获取指定资源的分布式锁并设置过期时间,若获取不到返回false,不阻塞
•UnLock(string)
释放指定资源的分布式锁
以下模拟3个集群节点
//节点1,创建分布式锁管理器 mutex1
mutex1 = NewMutex(":20001", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})//节点2,创建分布式锁管理器 mutex2
mutex2 = NewMutex(":20002", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})//节点3,创建分布式锁管理器 mutex3
mutex3 = NewMutex(":20003", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
•第一个参数 raftx服务地址
•第二个参数是所有集群节点都相同的,为所有节点的访问地址 []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"}
这样就完成了分布式锁管理器的创建,并可以直接获取各个自定义资源的分布式锁,这里的资源指的是字符串,比如 “test”
示例
//节点1
func lock1(i int) {logger.Debugf("mutex1 lock%d lock.....", i)mutex1.Lock("test", 10)logger.Debugf("mutex1 lock%d get lock successful", i)time.Sleep(2 * time.Second)mutex1.Unlock("test")logger.Debugf("mutex1 lock%d unlock", i)
}//节点2
func lock2(i int) {logger.Debugf("mutex2 lock%d lock.....", i)mutex2.Lock("test", 10)logger.Debugf("mutex2 lock%d get lock successful", i)
}//节点3
func lock3(i int) {logger.Debugf("mutex3 lock%d lock.....", i)mutex3.Lock("test", 10)logger.Debugf("mutex3 lock%d get lock successful", i)time.Sleep(2 * time.Second)mutex3.Unlock("test")logger.Debugf("mutex3 lock%d unlock", i)
}
测试调用
func Test_lock(t *testing.T) {go lock1(1)go lock2(2)go lock3(3)select {}
}
执行结果:
可以看到:
•2024/12/31 22:34:35
三个节点的同时抢占分布式锁
•节点mutex2获取到了锁,由于mutex2没有主动释放锁,mutex2.Lock("test", 10) 这里表示10秒后 ,集群自动释放锁
•2024/12/31 22:34:45
mutex2持有的分布式锁被服务自动释放,同时mutex1节点获取到分布式锁
•2024/12/31 22:34:47
mutex1在2秒后显式调用UnLock释放锁,同时mutex3节点获取到分布式锁
•2024/12/31 22:34:49
mutex3在2秒后显式调用UnLock释放锁
Lockx 可以海量创建分布式锁,如:
func Test_multi_lock(t *testing.T) {for i := 1; i < 1<<15; i++ { //mutex1节点创建32768个并发任务go lock1(i)}for i := 1; i < 1<<15; i++ { //mutex2节点创建32768个并发任务go lock2(i)}for i := 1; i < 1<<15; i++ { //mutex3节点创建32768个并发任务go lock3(i)}select {}
}
•每个节点同时并发创建32768个分布式锁对象
执行结果:
可以看到,每2秒有一个对象获取到分布式锁,按顺序依次执行获取分布式锁与解锁。
(注意:mutex2增加了2秒后释放锁,否则mutex2节点获取锁后,将等待10秒后有raftx集群释放锁)
Lockx 的源码地址
可以直接将其当成第三方分布式锁库在工程中使用
程序中调用示例
import "github.com/donnie4w/lockx"//创建分布式锁管理器 mutex1
var mutex1 = lockx.NewMutex(":20001", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
结论
Lockx
利用了 raftx
的高效特性和易失性数据存储能力,提供了一种简洁而强大的分布式锁解决方案。它不仅适合常规的分布式锁需求,还能够在高并发环境下保持性能优势,确保系统的稳定性和可靠性。
如果你考虑在项目中引入这样的分布式锁库,可以参考上述信息进行评估和集成。此外,也可以根据自己的具体需求调整 Lockx
的实现,例如实现更复杂的锁行为(如公平锁等)。