下面描述使用 Etcd 实现分布式锁的业务流程,假设对某个共享资源设置的锁名为:/lock/mylock
步骤 1: 准备
客户端连接 Etcd,以 /lock/mylock
为前缀创建全局唯一的 key,假设第一个客户端对应的 key="/lock/mylock/UUID1"
,第二个为 key="/lock/mylock/UUID2"
;客户端分别为自己的 key 创建租约 - Lease,租约的长度根据业务耗时确定,假设为 15s;
步骤 2: 创建定时任务作为租约的“心跳”
当一个客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务作为“心跳”进行续约。此外,如果持有锁期间客户端崩溃,心跳停止,key 将因租约到期而被删除,从而锁释放,避免死锁。
步骤 3: 客户端将自己全局唯一的 key 写入 Etcd
进行 put 操作,将步骤 1 中创建的 key 绑定租约写入 Etcd,根据 Etcd 的 Revision 机制,假设两个客户端 put 操作返回的 Revision 分别为 1、2,客户端需记录 Revision 用以接下来判断自己是否获得锁。
步骤 4: 客户端判断是否获得锁
客户端以前缀 /lock/mylock
读取 keyValue 列表(keyValue 中带有 key 对应的 Revision),判断自己 key 的 Revision 是否为当前列表中最小的,如果是则认为获得锁;否则监听列表中前一个 Revision 比自己小的 key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁。
步骤 5: 执行业务
获得锁后,操作共享资源,执行业务代码。
步骤 6: 释放锁
完成业务流程后,删除对应的key释放锁。
我们希望同一时间只有一个线程能够访问到资源,但是分布式资源点之间的协调会非常麻烦,这个时候我们就需要一个分布式锁。
etcd分布式锁实现原理:
1.利用租约在etcd集群中创建一个key,这个key有两种形态,存在和不存在,而这两种形态就是互斥量。
2.如果这个key不存在,那么线程创建key,成功则获取到锁,该key就为存在状态。 3.如果该key已经存在,那么线程就不能创建key,则获取锁失败。锁结构体:
在使用该锁时,需要传入Ttl,Conf,Key字段来初始化锁
type EtcdMutex struct { Ttl int64 //租约时间 Conf clientv3.Config //etcd集群配置 Key string //etcd的key cancel context.CancelFunc //关闭续租的func lease clientv3.Lease leaseID clientv3.LeaseID txn clientv3.Txn }
初始化锁:
func(em *EtcdMutex)init()error{ var err error var ctx context.Context client,err := clientv3.New(em.Conf) if err != nil{ return err } em.txn = clientv3.NewKV(client).Txn(context.TODO()) em.lease = clientv3.NewLease(client) leaseResp,err := em.lease.Grant(context.TODO(),em.Ttl) if err != nil{ return err } ctx,em.cancel = context.WithCancel(context.TODO()) em.leaseID = leaseResp.ID _,err = em.lease.KeepAlive(ctx,em.leaseID) return err }
获取锁:
func(em *EtcdMutex)Lock()error{ err := em.init() if err != nil{ return err } //LOCK: em.txn.If(clientv3.Compare(clientv3.CreateRevision(em.Key),"=",0)). Then(clientv3.OpPut(em.Key,"",clientv3.WithLease(em.leaseID))). Else() txnResp,err := em.txn.Commit() if err != nil{ return err } if !txnResp.Succeeded{ //判断txn.if条件是否成立 return fmt.Errof("抢锁失败") } return nil }
释放锁:
func(em *EtcdMutex)UnLock(){ em.cancel() em.lease.Revoke(context.TODO(),em.leaseID) fmt.Println("释放了锁")}
调用锁:
func main(){ var conf = clientv3.Config{ Endpoints: []string{ "172.16.196.129:2380", "192.168.50.250:2380"}, DialTimeout: 5 * time.Second, } eMutex1 := &EtcdMutex{ Conf:conf, Ttl:10, Key:"lock", } eMutex2 := &EtcdMutex{ Conf:conf, Ttl:10, Key:"lock", } //groutine1 go func() { err := eMutex1.Lock() if err != nil{ fmt.Println("groutine1抢锁失败") fmt.Println(err) return } fmt.Println("groutine1抢锁成功") time.Sleep(10*time.Second) defer eMutex.UnLock() }() //groutine2 go func() { err := eMutex2.Lock() if err != nil{ fmt.Println("groutine2抢锁失败") fmt.Println(err) return } fmt.Println("groutine2抢锁成功") defer eMutex.UnLock() }() time.Sleep(30*time.Second) }