etcd是一个非常可靠的kv存储系统,常在分布式系统中存储着关键的数据。它是由coreos团队开发并开源的分布式键值存储系统,具备以下特点:

  • 简单:提供定义明确且面向用户的API
  • 安全:支持SSL证书验证
  • 性能:基准压测支持1w+/sec写入
  • 可靠:采用Raft协议保证分布式系统数据的可用性和一致性。

etcd的这些特性,使得它常常出现在分布式设计场景下的工具集中。

1. 下载

1.1 windows

打开GitHub的上的etcd项目,直接下载压缩包就可以了
github地址
在这里插入图片描述
这里我下载的是3.3.13版本的,直接解压,然后运行etcd.exe就可以了
在这里插入图片描述
在这里插入图片描述
etcd 目前默认使用 2379提供 HTTP API 服务, 2380 端口和peer 通信(这两个端口已经被 IANA 官方预留给 etcd )

2. 连接

注意:
默认的etccdctrl使用的是v2版本的命令。我们需要设置环境变量来使用v3版本的API

2.1 修改环境变量(window)

控制台输入命令

set ETCDCTL_API=3

在这里插入图片描述

3. 简单使用

3.1 etcdctl

etcdctl 是一个命令行客户端,它能提供一些简洁的命令,供用户直接跟etcd服务打交道,而无需基于 HTTP API 方式。可以方便我们在对服务进行测试或者手动修改数据库内容。我们刚开始可以通过 etdctl 来熟悉相关操作。这些操作跟 HTTP API 基本上是对应的。etcdctl 在两个不同的 etcd 版本下的行为方式也完全不同。

put

使用etccdctl 来put一个键值对进去
在这里插入图片描述
返回OK
在这里插入图片描述

get

根据key来获取value
在这里插入图片描述

del

根据key删除键值对
在这里插入图片描述

3.2 Go语言

安装 Golang 的 Etcd 包

我们使用 v3 版本的 etcd client , 首先通过 go get 下载并编译安装 etcd clinet v3。

go get -v github.com/coreos/etcd/clientv3

该命令会将包下载到 $GOPATH/src/github.com/coreos/etcd/clientv3 中,所有相关依赖包会自动下载编译,包括protobuf、grpc等。

连接ETCD

用程序访问 etcd 首先要创建 client ,它需要传入一个 Config 配置,这里传了 2 个参数:

  • Endpoints : etcd 的多个节点服务地址;
  • DialTimeout :创建 client 的首次连接超时时间,这里传了 5 秒,如果 5 秒都没有连接成功就会返回 err ,一旦 client 创建成功,我们就不用再关心后续底层连接的状态了, client 内部会重连;
cli, err := clientv3.New(clientv3.Config{
   Endpoints:   []string{"localhost:2379"},
   // Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}
   DialTimeout: 5 * time.Second,
})

返回的client,类型中的成员是 etcd 客户端核心功能模块具体实现,它的类型具体如下:

type Client struct {
    Cluster  //向集群里增加 etcd 服务端节点之类,属于管理员操作。
    KV 		 //我们主要使用的功能,即 K-V 键值库的操作。
    Lease    //租约相关操作,比如申请一个 TTL=10 秒的租约(应用给 key 可以实现键值的自动过期)。
    Watcher  //观察订阅,从而监听最新的数据变化。
    Auth     //管理 etcd 的用户和权限,属于管理员操作。
    Maintenance   //维护 etcd ,比如主动迁移 etcd 的 leader 节点,属于管理员操作。
    
    // Username is a user name for authentication.
    Username string
    // Password is a password for authentication.
    Password string
    // contains filtered or unexported fields
}

我们需要使用什么功能,就去 client 里获取对应的成员即可。
Client.KV 是一个 interface ,提供了关于 K-V 操作的所有方法:

type KV interface {

    Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

    Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

    // Delete deletes a key, or optionally using WithRange(end), [key, end).
    Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)

    // Compact compacts etcd KV history before the given rev.
    Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)

    Do(ctx context.Context, op Op) (OpResponse, error)

    // Txn creates a transaction.
    Txn(ctx context.Context) Txn
}

我们通过方法clientv3.NewKV()来获得 KV 接口的实现(实现中内置了错误重试机制):

kv := clientv3.NewKV(cli)

接下来,我们就可以通过kv来操作etcd中的数据。

PUT
putResp, err := kv.Put(context.TODO(),"/test/key1", "Hello etcd!")

第一个参数是 goroutine 的上下文 Context 。后面两个参数分别是 keyvalue ,对于 etcd 来说, key=/test/key1 只是一个字符串而已,但是对我们而言却可以模拟出目录层级关系。

Put的声明如下:

// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

除了上面例子中的三个的参数,还支持一个变长参数,可以传递一些控制项来影响 Put 的行为,例如可以携带一个 lease ID 来支持 key 过期。

Put 操作返回的是 PutResponse ,不同的 KV 操作对应不同的 response 结构,所有 KV 操作返回的 response 结构如下:

type (
   CompactResponse pb.CompactionResponse
   PutResponse     pb.PutResponse
   GetResponse     pb.RangeResponse
   DeleteResponse  pb.DeleteRangeResponse
   TxnResponse     pb.TxnResponse
)

程序代码里导入 clientv3 后在 VSCode 中可以很快定位到 PutResponse 的定义文件中, PutResponse 只是 pb.PutResponse 的类型别名,通过VSCode跳转过去后可以看到 PutResponse 的详细定义:

type PutResponse struct {
   Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
   // if prev_kv is set in the request, the previous key-value pair will be returned.
   PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}

Header 里保存的主要是本次更新的 revision 信息,而 PrevKv 可以返回 Put 覆盖之前的 value 是什么(目前是 nil ,后面会说原因),我们需要判断 err 来确定操作是否成功。
我们Put 其他 3 个 key ,用于后续演示:

putResp, err := kv.Put(context.TODO(),"/test/key1", "Hello etcd!")
if err != nil{
	fmt.print("put failed!")
}
kv.Put(context.TODO(),"/test/key2", "Hello World!")
// 再写一个同前缀的干扰项
kv.Put(context.TODO(), "/testspam", "spam")

现在 /test 目录下有两个键: key1key2 , 而 /testspam 并不归属于 /test 目录。

代码:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	config := clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // 集群列表
		DialTimeout: 5 * time.Second,
	}

	// 建立一个客户端
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}

	// 用于读写etcd的键值对
	kv := clientv3.NewKV(client)
	
	// clientv3.WithPrevKV() 是一个可选控制项,用于获取在设置当前键值对之前的该键的键值对
	// 有了该控制项后,putResp 才有 PrevKv 的属性,即获取之前的键值对。
	// context.TODO() 表示当前还不知道用哪个 context 控制该操作,先用该字段占位
	putResp, err := kv.Put(context.TODO(), "/demo/A/B", "hello", clientv3.WithPrevKV())

	if err != nil {
		fmt.Println(err)
	}

	fmt.Println("putResp is ", putResp)
	fmt.Println("Revision:", putResp.Header.Revision)
	if putResp.PrevKv != nil {
		fmt.Println("PrevValue:", string(putResp.PrevKv.Value))
	}
	putResp, err = kv.Put(context.TODO(),"/test/key1", "Hello etcd!")
	if err != nil{
		fmt.print("put failed!")
	}
	kv.Put(context.TODO(),"/test/key2", "Hello World!")
	// 再写一个同前缀的干扰项
	kv.Put(context.TODO(), "/testspam", "spam")
}

GET

使用 KV 的 Get 方法来读取给定键的值:

getResp, err := kv.Get(context.TODO(), "/test/key1")

函数声明如下:

// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

Put 类似,函数注释里提示我们可以传递一些控制参数来影响 Get 的行为,比如: WithFromKey 表示读取从参数 key 开始递增的所有 key ,而不是读取单个 key

在上面的例子中,我没有传递 opOption ,所以就是获取 key=/test/key1 的最新版本数据。这里 err 并不能反馈出 key 是否存在(只能反馈出本次操作因为各种原因异常了),我们需要通过 GetResponse (实际上是 pb.RangeResponse )判断 key 是否存在
让我们看一下返回值的类型:

type RangeResponse struct {
    Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
    // kvs is the list of key-value pairs matched by the range request.
    // kvs is empty when count is requested.
    Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`
    // more indicates if there are more keys to return in the requested range.
    More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`
    // count is set to the number of keys within the range when requested.
    Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
}

Kvs 字段,保存了本次 Get 查询到的所有 k-v 对,因为上述例子只 Get 了一个单 key ,所以只需要判断一下 len(Kvs) 是否等于 1 即可知道 key 是否存在。

RangeResponse.MoreCount,当我们使用withLimit()等选项进行 Get 时会发挥作用,相当于翻页查询。
接下来,我们通过给 Get 查询增加 WithPrefix 选项,获取 /test 目录下的所有子元素:

rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix())

WithPrefix()是指查找以/test/为前缀的所有 key ,因此可以模拟出查找子目录的效果。

etcd是一个有序的 k-v 存储,因此 /test/ 为前缀的 key 总是顺序排列在一起。

withPrefix()实际上会转化为范围查询,它根据前缀/test/生成了一个前闭后开的key range:[“/test/”, “/test0”),为什么呢?因为比/(47)大的字符是0(48),所以以/test0作为范围的末尾,就可以扫描到所有以/test/为前缀的 key 了。

在之前,我们 Put 了一个/testspam键值,因为不符合/test/前缀(注意末尾的 / ),所以就不会被这次Get获取到。但是,如果查询的前缀是/test,那么/testspam就会被返回,使用时一定要特别注意。

打印 rangeResp.Kvs 可以看到获得了两个键值:

[key:"/test/key1" create_revision:2 mod_revision:13 version:6 value:"Hello etcd!"  
key:"/test/key2" create_revision:5 mod_revision:14 version:4 value:"Hello World!" ]

代码:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	config := clientv3.Config{
		Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表
		DialTimeout: 5 * time.Second,
	}

	// 建立一个客户端
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}

	// 用于读写etcd的键值对
	kv := clientv3.NewKV(client)

	kv.Put(context.TODO(), "/demo/A/B", "BBB", clientv3.WithPrevKV())
	kv.Put(context.TODO(), "/demo/A/C", "CCC", clientv3.WithPrevKV())
	// 	读取/demo/A/为前缀的所有key
	// clientv3.WithPrefix() , clientv3.WithCountOnly() 可以有多个并以 逗号分隔即可
	getResp, err := kv.Get(context.TODO(), "/demo/A/", clientv3.WithPrefix() /*,clientv3.WithCountOnly()*/)
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println(getResp.Kvs, getResp.Count)
	for _, resp := range getResp.Kvs {
		fmt.Printf("key: %s, value:%s\n", string(resp.Key), string(resp.Value))
	}
}

结果:

[key:"/demo/A/B" create_revision:6 mod_revision:22 version:6 value:"BBB"  
key:"/demo/A/C" create_revision:7 mod_revision:23 version:12 value:"CCC" ] 2
key: /demo/A/B, value:BBB
key: /demo/A/C, value:CCC

Delete

代码:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	config := clientv3.Config{
		Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表
		DialTimeout: 5 * time.Second,
	}

	// 建立一个客户端
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}

	// 用于读写etcd的键值对
	kv := clientv3.NewKV(client)

	kv.Put(context.TODO(), "/demo/A/B1", "BBB", clientv3.WithPrevKV())
	kv.Put(context.TODO(), "/demo/A/B2", "CCC", clientv3.WithPrevKV())
	kv.Put(context.TODO(), "/demo/A/B3", "DDD", clientv3.WithPrevKV())
	/*
		clientv3.WithFromKey() 表示针对的key操作是大于等于当前给定的key
		clientv3.WithPrevKV() 表示返回的 response 中含有之前删除的值,否则
		下面的 delResp.PrevKvs 为空
	*/
	delResp, err := kv.Delete(context.TODO(), "/demo/A/B",
		clientv3.WithFromKey(), clientv3.WithPrevKV())
	if err != nil {
		fmt.Println(err)
	}
	// 查看被删除的 key 和 value 是什么
	if delResp.PrevKvs != nil {
		// if len(delResp.PrevKvs) != 0 {
		for _, kvpair := range delResp.PrevKvs {
			fmt.Println("已删除:", string(kvpair.Key), string(kvpair.Value))
		}
	}
}

结果:

已删除: /demo/A/B1 BBB
已删除: /demo/A/B2 CCC
已删除: /demo/A/B3 DDD
Lease续租

etcd客户端的Lease对象可以通过以下代码获取到

lease := clientv3.NewLease(cli)

其中,lease 对象是 Lease 接口的实现, Lease 接口的声明如下:

type Lease interface {
    // Grant 创建一个新租约
    Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
    // Revoke 销毁给定租约ID的租约
    Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
    // TimeToLive retrieves the lease information of the given lease ID.
    TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
    // Leases retrieves all leases.
    Leases(ctx context.Context) (*LeaseLeasesResponse, error)
    // KeepAlive keeps the given lease alive forever.
    KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
    // KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
    // should be used instead of KeepAliveOnce.
    KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
    // Close releases all resources Lease keeps for efficient communication
    // with the etcd server.
    Close() error
}

Lease 提供了以下功能:

  • Grant :分配一个租约;
  • Revoke :释放一个租约;
  • TimeToLive :获取剩余TTL时间;
  • Leases :列举所有etcd中的租约;
  • KeepAlive :自动定时的续约某个租约;
  • KeepAliveOnce :为某个租约续约一次;
  • Close :释放当前客户端建立的所有租约;
    要想实现 key 自动过期,首先得创建一个租约,下面的代码创建一个 TTL 为 10 秒的租约:
grantResp, err := lease.Grant(context.TODO(), 10)

返回的 grantResponse 的结构体声明如下:

// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
    *pb.ResponseHeader
    ID    LeaseID
    TTL   int64
    Error string
}

在应用程序代码中主要使用到的是租约 ID

接下来我们用这个 Lease 往 etcd 中存储一个 10 秒过期的 key :

kv.Put(context.TODO(), "/test/vanish", "vanish in 10s", clientv3.WithLease(grantResp.ID))

这里特别需要注意,有一种情况是在 Put 之前 Lease 已经过期了,那么这个 Put 操作会返回 error ,此时你需要重新分配 Lease

当我们实现服务注册时,需要主动给 Lease 进行续约,通常是以小于 TTL 的间隔循环调用 LeaseKeepAliveOnce() 方法对租约进行续期,一旦某个服务节点出错无法完成租约的续期,等 key 过期后客户端即无法在查询服务时获得对应节点的服务,这样就通过租约到期实现了服务的错误隔离

keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)

或者使用KeepAlive()方法,其会返回<-chan *LeaseKeepAliveResponse只读通道,每次自动续租成功后会向通道中发送信号。

一般都用KeepAlive()方法, KeepAlivePut 一样,如果在执行之前 Lease 就已经过期了,那么需要重新分配 Leaseetcd 并没有提供 API 来实现原子的 Put with Lease ,需要我们自己判断 err 重新分配 Lease

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	config := clientv3.Config{
		Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表
		DialTimeout: 5 * time.Second,
	}

	// 建立一个客户端
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}

	// 创建一个lease(租约)对象
	lease := clientv3.NewLease(client)
	// 申请一个10秒的租约
	leaseGrantResp, err := lease.Grant(context.TODO(), 10)
	if err != nil {
		fmt.Println(err)
		return
	}
	// 拿到租约的ID
	leaseId := leaseGrantResp.ID

	// 自动永久续租
	keepRespChan, err := lease.KeepAlive(context.TODO(), leaseId)
	if err != nil {
		fmt.Println(err)
		return
	}

	// 处理续约应答的协程
	go func() {
		for {
			select {
			case keepResp := <-keepRespChan:
				if keepResp == nil {
					fmt.Println("租约已经失效了")
					goto END
				} else { // 每秒会续租一次, 所以就会受到一次应答
					fmt.Println("收到自动续租应答:", keepResp.ID)
				}
			}
		}
	END:
	}()

	// 获得kv API子集
	kv := clientv3.NewKV(client)

	// Put一个KV, 让它与租约关联起来, 从而实现10秒后自动过期
	putResp, err := kv.Put(context.TODO(), "/demo/A/B1", "hello", clientv3.WithLease(leaseId))
	if err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println("写入成功:", putResp.Header.Revision)

	// 定时的看一下key过期了没有
	for {
		getResp, err := kv.Get(context.TODO(), "/demo/A/B1")
		if err != nil {
			fmt.Println(err)
			return
		}
		if getResp.Count == 0 {
			fmt.Println("kv过期了")
			break
		}
		fmt.Println("还没过期:", getResp.Kvs)
		time.Sleep(2 * time.Second)
	}
}

结果:

收到自动续租应答: 8488292048996991588
写入成功: 80
还没过期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
还没过期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
收到自动续租应答: 8488292048996991588
还没过期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
还没过期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
收到自动续租应答: 8488292048996991588
Op 获取设置联合操作

Op 字面意思就是”操作”, GetPut 都属于 Op ,只是为了简化用户开发而开放的特殊 API

KV 对象有一个 Do 方法接受一个 Op

// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)

其参数 Op 是一个抽象的操作,可以是 Put/Get/Delete… ;而 OpResponse 是一个抽象的结果,可以是 PutResponse/GetResponse…

可以通过 Client 中定义的一些方法来创建 Op

  • func OpDelete(key string, opts …OpOption) Op
  • func OpGet(key string, opts …OpOption) Op
  • func OpPut(key, val string, opts …OpOption) Op
  • func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op

其实和直接调用 KV.PutKV.GET 没什么区别。下面是一个例子:

cli, err := clientv3.New(clientv3.Config{
    Endpoints:   endpoints,
    DialTimeout: dialTimeout,
})
if err != nil {
    log.Fatal(err)
}
defer cli.Close()
ops := []clientv3.Op{
    clientv3.OpPut("put-key", "123"),
    clientv3.OpGet("put-key"),
    clientv3.OpPut("put-key", "456")}
for _, op := range ops {
    if _, err := cli.Do(context.TODO(), op); err != nil {
        log.Fatal(err)
    }
}

Op 交给 Do 方法执行,返回的 opResp 结构如下:

type OpResponse struct {
    put *PutResponse
    get *GetResponse
    del *DeleteResponse
    txn *TxnResponse
}

你的操作是什么类型,你就用哪个指针来访问对应的结果。

示例代码:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	config := clientv3.Config{
		Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表
		DialTimeout: 5 * time.Second,
	}

	// 建立一个客户端
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	// 获得kv API子集
	kv := clientv3.NewKV(client)

	// 创建Op: operation
	putOp := clientv3.OpPut("/demo/A/B1", "BBBBB")

	// 执行OP 	// kv.Do(op)
	opResp, err := kv.Do(context.TODO(), putOp)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("写入Revision:", opResp.Put().Header.Revision)

	// 创建Op
	getOp := clientv3.OpGet("/demo/A/B1")

	// 执行OP
	opResp, err = kv.Do(context.TODO(), getOp)
	if err != nil {
		fmt.Println(err)
		return
	}

	// 打印 create rev == mod rev
	fmt.Println("数据Revision:", opResp.Get().Kvs[0].ModRevision) 
	fmt.Println("数据value:", string(opResp.Get().Kvs[0].Value))
}

结果:

写入Revision: 105
数据Revision: 105
数据value: BBBBB
Txn事务操作

etcd 中事务是原子执行的,只支持 if … then … else … 这种表达。首先来看一下 Txn 中定义的方法:

type Txn interface {
    // If takes a list of comparison. If all comparisons passed in succeed,
    // the operations passed into Then() will be executed. Or the operations
    // passed into Else() will be executed.
    If(cs ...Cmp) Txn
    // Then takes a list of operations. The Ops list will be executed, if the
    // comparisons passed in If() succeed.
    Then(ops ...Op) Txn
    // Else takes a list of operations. The Ops list will be executed, if the
    // comparisons passed in If() fail.
    Else(ops ...Op) Txn
    // Commit tries to commit the transaction.
    Commit() (*TxnResponse, error)
}

Txn 必须是这样使用的:If(满足条件) Then(执行若干Op) Else(执行若干Op)。

If 中支持传入多个 Cmp 比较条件,如果所有条件满足,则执行 Then 中的 Op (上一节介绍过Op),否则执行 Else中 的 Op

首先,我们需要开启一个事务,这是通过 KV 对象的方法实现的:

txn := kv.Txn(context.TODO())

下面的测试程序,判断如果 k1 的值大于 v1 并且 k1 的版本号是 2,则 Put 键值 k2k3 ,否则 Put 键值 k4k5

kv.Txn(context.TODO()).If(
 clientv3.Compare(clientv3.Value(k1), ">", v1),
 clientv3.Compare(clientv3.Version(k1), "=", 2)
).Then(
 clientv3.OpPut(k2,v2), clentv3.OpPut(k3,v3)
).Else(
 clientv3.OpPut(k4,v4), clientv3.OpPut(k5,v5)
).Commit()

类似于 clientv3.Value() 用于指定 key 属性的,有这么几个方法:

  • func CreateRevision(key string) Cmp:key=xxx的创建版本必须满足…
  • func LeaseValue(key string) Cmp:key=xxx的Lease ID必须满足…
  • func ModRevision(key string) Cmp:key=xxx的最后修改版本必须满足…
  • func Value(key string) Cmp:key=xxx的创建值必须满足…
  • func Version(key string) Cmp:key=xxx的累计更新次数必须满足…
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	config := clientv3.Config{
		Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表
		DialTimeout: 5 * time.Second,
	}

	// 建立一个客户端
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}

	// lease实现锁自动过期:
	// op操作
	// txn事务: if else then

	// 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key)
	lease := clientv3.NewLease(client)

	// 申请一个5秒的租约
	leaseGrantResp, err := lease.Grant(context.TODO(), 5)
	if err != nil {
		fmt.Println(err)
		return
	}

	// 拿到租约的ID
	leaseId := leaseGrantResp.ID

	// 准备一个用于取消自动续租的context
	ctx, cancelFunc := context.WithCancel(context.TODO())

	// 确保函数退出后, 自动续租会停止
	defer cancelFunc()
	defer lease.Revoke(context.TODO(), leaseId)

	// 5秒后会取消自动续租
	keepRespChan, err := lease.KeepAlive(ctx, leaseId)
	if err != nil {
		fmt.Println(err)
		return
	}

	// 处理续约应答的协程
	go func() {
		for {
			select {
			case keepResp := <-keepRespChan:
				if keepResp == nil {
					fmt.Println("租约已经失效了")
					goto END
				} else { // 每秒会续租一次, 所以就会受到一次应答
					fmt.Println("收到自动续租应答:", keepResp.ID)
				}
			}
		}
	END:
	}()

	//  if 不存在key, then 设置它, else 抢锁失败
	kv := clientv3.NewKV(client)

	// 创建事务
	txn := kv.Txn(context.TODO())

	// 定义事务

	// 如果key不存在
	txn.If(clientv3.Compare(clientv3.CreateRevision("/demo/A/B1"), "=", 0)).
		Then(clientv3.OpPut("/demo/A/B1", "xxx", clientv3.WithLease(leaseId))).
		Else(clientv3.OpGet("/demo/A/B1")) // 否则抢锁失败

	// 提交事务
	txnResp, err := txn.Commit()
	if err != nil {
		fmt.Println(err)
		return // 没有问题
	}

	// 判断是否抢到了锁
	if !txnResp.Succeeded {
		fmt.Println("锁被占用:", string(
			txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
		return
	}

	// 2, 处理业务

	fmt.Println("处理任务")
	time.Sleep(5 * time.Second)

	// 3, 释放锁(取消自动续租, 释放租约)
	// defer 会把租约释放掉, 关联的KV就被删除了
}

结果:

收到自动续租应答: 8488292048996991680
锁被占用: BBBBB
Watch 监听操作

Watch 用于监听某个键的变化, Watch调用后返回一个WatchChan,它的类型声明如下:

type WatchChan <-chan WatchResponse
type WatchResponse struct {
    Header pb.ResponseHeader
    Events []*Event
    CompactRevision int64
    Canceled bool
    Created bool
}

当监听的 key 有变化后会向WatchChan发送WatchResponse

Watch 的典型应用场景是应用于系统配置的热加载,我们可以在系统读取到存储在 etcd key 中的配置后,用 Watch 监听 key 的变化。在单独的 goroutine 中接收 WatchChan 发送过来的数据,并将更新应用到系统设置的配置变量中,比如像下面这样在 goroutine 中更新变量 appConfig ,这样系统就实现了配置变量的热加载

type AppConfig struct {
  config1 string
  config2 string
}

var appConfig Appconfig

func watchConfig(clt *clientv3.Client, key string, ss interface{}) {
    watchCh := clt.Watch(context.TODO(), key)
    go func() {
        for res := range watchCh {
            value := res.Events[0].Kv.Value
            if err := json.Unmarshal(value, ss); err != nil {
                fmt.Println("now", time.Now(), "watchConfig err", err)
                continue
            }
            fmt.Println("now", time.Now(), "watchConfig", ss)
        }
    }()
}
watchConfig(client, "config_key", &appConfig)

代码:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/mvcc/mvccpb"
)

func main() {
	config := clientv3.Config{
		Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表
		DialTimeout: 5 * time.Second,
	}

	// 建立一个客户端
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	// 获得kv API子集
	kv := clientv3.NewKV(client)

	// 模拟etcd中KV的变化
	go func() {
		for {
			kv.Put(context.TODO(), "/demo/A/B1", "i am B1")

			kv.Delete(context.TODO(), "/demo/A/B1")

			time.Sleep(1 * time.Second)
		}
	}()

	// 先GET到当前的值,并监听后续变化
	getResp, err := kv.Get(context.TODO(), "/demo/A/B1")
	if err != nil {
		fmt.Println(err)
		return
	}

	// 现在key是存在的
	if len(getResp.Kvs) != 0 {
		fmt.Println("当前值:", string(getResp.Kvs[0].Value))
	}

	// 当前etcd集群事务ID, 单调递增的
	watchStartRevision := getResp.Header.Revision + 1

	// 创建一个watcher
	watcher := clientv3.NewWatcher(client)

	// 启动监听
	fmt.Println("从该版本向后监听:", watchStartRevision)

	// 创建一个 5s 后取消的上下文
	ctx, cancelFunc := context.WithCancel(context.TODO())
	time.AfterFunc(5*time.Second, func() {
		cancelFunc()
	})

	// 该监听动作在 5s 后取消
	watchRespChan := watcher.Watch(ctx, "/demo/A/B1", clientv3.WithRev(watchStartRevision))

	// 处理kv变化事件
	for watchResp := range watchRespChan {
		for _, event := range watchResp.Events {
			switch event.Type {
			case mvccpb.PUT:
				fmt.Println("修改为:", string(event.Kv.Value), "Revision:", 
								event.Kv.CreateRevision, event.Kv.ModRevision)
			case mvccpb.DELETE:
				fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
			}
		}
	}

}

结果:

从该版本向后监听: 94
修改为: i am B1 Revision: 94 94
删除了 Revision: 95
修改为: i am B1 Revision: 96 96
删除了 Revision: 97
修改为: i am B1 Revision: 98 98
删除了 Revision: 99
修改为: i am B1 Revision: 100 100
删除了 Revision: 101
修改为: i am B1 Revision: 102 102
删除了 Revision: 103

参考资料:
https://blog.csdn.net/wohu1104/article/details/108552649

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐