博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
golang基于etcd实现分布式锁(转)
阅读量:5300 次
发布时间:2019-06-14

本文共 2901 字,大约阅读时间需要 9 分钟。

下面描述使用 Etcd 实现分布式锁的业务流程,假设对某个共享资源设置的锁名为:/lock/mylock

步骤 1: 准备

客户端连接 Etcd,以 /lock/mylock 为前缀创建全局唯一的 key,假设第一个客户端对应的 key="/lock/mylock/UUID1",第二个为 key="/lock/mylock/UUID2";客户端分别为自己的 key 创建租约 - Lease,租约的长度根据业务耗时确定,假设为 15s;

步骤 2: 创建定时任务作为租约的“心跳”

当一个客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务作为“心跳”进行续约。此外,如果持有锁期间客户端崩溃,心跳停止,key 将因租约到期而被删除,从而锁释放,避免死锁。

步骤 3: 客户端将自己全局唯一的 key 写入 Etcd

进行 put 操作,将步骤 1 中创建的 key 绑定租约写入 Etcd,根据 Etcd 的 Revision 机制,假设两个客户端 put 操作返回的 Revision 分别为 1、2,客户端需记录 Revision 用以接下来判断自己是否获得锁。

步骤 4: 客户端判断是否获得锁

客户端以前缀 /lock/mylock 读取 keyValue 列表(keyValue 中带有 key 对应的 Revision),判断自己 key 的 Revision 是否为当前列表中最小的,如果是则认为获得锁;否则监听列表中前一个 Revision 比自己小的 key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁。

步骤 5: 执行业务

获得锁后,操作共享资源,执行业务代码。

步骤 6: 释放锁

完成业务流程后,删除对应的key释放锁。

 

 

我们希望同一时间只有一个线程能够访问到资源,但是分布式资源点之间的协调会非常麻烦,这个时候我们就需要一个分布式锁。

etcd分布式锁实现原理:

1.利用租约在etcd集群中创建一个key,这个key有两种形态,存在和不存在,而这两种形态就是互斥量。

2.如果这个key不存在,那么线程创建key,成功则获取到锁,该key就为存在状态。
3.如果该key已经存在,那么线程就不能创建key,则获取锁失败。

锁结构体:

在使用该锁时,需要传入Ttl,Conf,Key字段来初始化锁

type EtcdMutex struct {    Ttl int64  //租约时间    Conf clientv3.Config  //etcd集群配置 Key string //etcd的key cancel context.CancelFunc //关闭续租的func lease clientv3.Lease leaseID clientv3.LeaseID txn clientv3.Txn }

初始化锁:

func(em *EtcdMutex)init()error{    var err error    var ctx context.Context    client,err := clientv3.New(em.Conf)    if err != nil{ return err } em.txn = clientv3.NewKV(client).Txn(context.TODO()) em.lease = clientv3.NewLease(client) leaseResp,err := em.lease.Grant(context.TODO(),em.Ttl) if err != nil{ return err } ctx,em.cancel = context.WithCancel(context.TODO()) em.leaseID = leaseResp.ID _,err = em.lease.KeepAlive(ctx,em.leaseID) return err }

获取锁:

func(em *EtcdMutex)Lock()error{    err := em.init()    if err != nil{        return err    }    //LOCK:        em.txn.If(clientv3.Compare(clientv3.CreateRevision(em.Key),"=",0)). Then(clientv3.OpPut(em.Key,"",clientv3.WithLease(em.leaseID))). Else() txnResp,err := em.txn.Commit() if err != nil{ return err } if !txnResp.Succeeded{ //判断txn.if条件是否成立 return fmt.Errof("抢锁失败") } return nil }

释放锁:

func(em *EtcdMutex)UnLock(){    em.cancel()    em.lease.Revoke(context.TODO(),em.leaseID)    fmt.Println("释放了锁")}

调用锁:

func main(){    var conf = clientv3.Config{        Endpoints:   []string{
"172.16.196.129:2380", "192.168.50.250:2380"}, DialTimeout: 5 * time.Second, } eMutex1 := &EtcdMutex{ Conf:conf, Ttl:10, Key:"lock", } eMutex2 := &EtcdMutex{ Conf:conf, Ttl:10, Key:"lock", } //groutine1 go func() { err := eMutex1.Lock() if err != nil{ fmt.Println("groutine1抢锁失败") fmt.Println(err) return } fmt.Println("groutine1抢锁成功") time.Sleep(10*time.Second) defer eMutex.UnLock() }() //groutine2 go func() { err := eMutex2.Lock() if err != nil{ fmt.Println("groutine2抢锁失败") fmt.Println(err) return } fmt.Println("groutine2抢锁成功") defer eMutex.UnLock() }() time.Sleep(30*time.Second) }

转载于:https://www.cnblogs.com/wangbin/p/10702171.html

你可能感兴趣的文章
【BZOJ1004】【HNOI20008】cards
查看>>
【BZOJ3672】【UOJ#6】【NOI2014】随机数生成器
查看>>
Oracle常用转换函数总结
查看>>
博客员的第一次写博客
查看>>
ogg:Extract 进程遇长事务执行 Forcestop 引发的惨案
查看>>
windows自带的线程池
查看>>
基于Qt的信号分析简单应用软件的设计
查看>>
常见的几种“分析”概念
查看>>
SAS如何看待大数据
查看>>
[NOIP2016 DAY1 T2]天天爱跑步-[差分+线段树合并][解题报告]
查看>>
Linked List Cycle 判断一个链表是否存在回路(循环)
查看>>
《人机猜拳》
查看>>
【科技】线性异或方程组的相关
查看>>
大数据导出数据库的设计
查看>>
列表和元组
查看>>
两两组合算法-递归实现方法
查看>>
python中time模块
查看>>
JavaWeb学习笔记(3)
查看>>
StreamAPI 小练习
查看>>
进程关系之作业控制
查看>>