etcd的使用(GO)
文章目录1. 下载1.1 windows2. 连接2.1 修改环境变量(window)3. 简单使用3.1 etcdctlputgetdel3.2 Go语言安装 Golang 的 Etcd 包连接ETCDPUTGETDeleteLease续租Op 获取设置联合操作Txn事务操作Watch 监听操作etcd是一个非常可靠的kv存储系统,常在分布式系统中存储着关键的数据。它是由coreos团队开发并开
文章目录
etcd是一个非常可靠的kv存储系统,常在分布式系统中存储着关键的数据。它是由coreos团队开发并开源的分布式键值存储系统,具备以下特点:
- 简单:提供定义明确且面向用户的API
- 安全:支持SSL证书验证
- 性能:基准压测支持1w+/sec写入
- 可靠:采用Raft协议保证分布式系统数据的可用性和一致性。
etcd的这些特性,使得它常常出现在分布式设计场景下的工具集中。
1. 下载
1.1 windows
打开GitHub的上的etcd项目,直接下载压缩包就可以了
github地址
这里我下载的是3.3.13版本的,直接解压,然后运行etcd.exe就可以了
etcd 目前默认使用 2379
提供 HTTP API 服务, 2380
端口和peer 通信(这两个端口已经被 IANA 官方预留给 etcd )
2. 连接
注意:
默认的etccdctrl使用的是v2版本的命令。我们需要设置环境变量来使用v3版本的API
2.1 修改环境变量(window)
控制台输入命令
set ETCDCTL_API=3
3. 简单使用
3.1 etcdctl
etcdctl 是一个命令行客户端,它能提供一些简洁的命令,供用户直接跟etcd服务打交道,而无需基于 HTTP API 方式。可以方便我们在对服务进行测试或者手动修改数据库内容。我们刚开始可以通过 etdctl 来熟悉相关操作。这些操作跟 HTTP API 基本上是对应的。etcdctl 在两个不同的 etcd 版本下的行为方式也完全不同。
put
使用etccdctl 来put一个键值对进去
返回OK
get
根据key来获取value
del
根据key删除键值对
3.2 Go语言
安装 Golang 的 Etcd 包
我们使用 v3 版本的 etcd client , 首先通过 go get 下载并编译安装 etcd clinet v3。
go get -v github.com/coreos/etcd/clientv3
该命令会将包下载到 $GOPATH/src/github.com/coreos/etcd/clientv3 中,所有相关依赖包会自动下载编译,包括protobuf、grpc等。
连接ETCD
用程序访问 etcd
首先要创建 client
,它需要传入一个 Config
配置,这里传了 2 个参数:
- Endpoints : etcd 的多个节点服务地址;
- DialTimeout :创建 client 的首次连接超时时间,这里传了 5 秒,如果 5 秒都没有连接成功就会返回 err ,一旦 client 创建成功,我们就不用再关心后续底层连接的状态了, client 内部会重连;
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
// Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}
DialTimeout: 5 * time.Second,
})
返回的client,类型中的成员是 etcd 客户端核心功能模块的具体实现,它的类型具体如下:
type Client struct {
Cluster //向集群里增加 etcd 服务端节点之类,属于管理员操作。
KV //我们主要使用的功能,即 K-V 键值库的操作。
Lease //租约相关操作,比如申请一个 TTL=10 秒的租约(应用给 key 可以实现键值的自动过期)。
Watcher //观察订阅,从而监听最新的数据变化。
Auth //管理 etcd 的用户和权限,属于管理员操作。
Maintenance //维护 etcd ,比如主动迁移 etcd 的 leader 节点,属于管理员操作。
// Username is a user name for authentication.
Username string
// Password is a password for authentication.
Password string
// contains filtered or unexported fields
}
我们需要使用什么功能,就去 client 里获取对应的成员即可。
Client.KV 是一个 interface ,提供了关于 K-V 操作的所有方法:
type KV interface {
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
// Delete deletes a key, or optionally using WithRange(end), [key, end).
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
Do(ctx context.Context, op Op) (OpResponse, error)
// Txn creates a transaction.
Txn(ctx context.Context) Txn
}
我们通过方法clientv3.NewKV()来获得 KV
接口的实现(实现中内置了错误重试机制):
kv := clientv3.NewKV(cli)
接下来,我们就可以通过kv来操作etcd中的数据。
PUT
putResp, err := kv.Put(context.TODO(),"/test/key1", "Hello etcd!")
第一个参数是 goroutine
的上下文 Context
。后面两个参数分别是 key
和 value
,对于 etcd 来说, key=/test/key1
只是一个字符串而已,但是对我们而言却可以模拟出目录层级关系。
Put的声明如下:
// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
除了上面例子中的三个的参数,还支持一个变长参数,可以传递一些控制项来影响 Put
的行为,例如可以携带一个 lease ID
来支持 key
过期。
Put 操作返回的是 PutResponse
,不同的 KV 操作对应不同的 response
结构,所有 KV 操作返回的 response
结构如下:
type (
CompactResponse pb.CompactionResponse
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
)
程序代码里导入 clientv3
后在 VSCode 中可以很快定位到 PutResponse
的定义文件中, PutResponse
只是 pb.PutResponse
的类型别名,通过VSCode跳转过去后可以看到 PutResponse
的详细定义:
type PutResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// if prev_kv is set in the request, the previous key-value pair will be returned.
PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
Header
里保存的主要是本次更新的 revision
信息,而 PrevKv
可以返回 Put 覆盖之前的 value
是什么(目前是 nil ,后面会说原因),我们需要判断 err 来确定操作是否成功。
我们Put 其他 3 个 key ,用于后续演示:
putResp, err := kv.Put(context.TODO(),"/test/key1", "Hello etcd!")
if err != nil{
fmt.print("put failed!")
}
kv.Put(context.TODO(),"/test/key2", "Hello World!")
// 再写一个同前缀的干扰项
kv.Put(context.TODO(), "/testspam", "spam")
现在 /test 目录下有两个键: key1
和 key2
, 而 /testspam
并不归属于 /test
目录。
代码:
package main
import (
"context"
"fmt"
"time"
"github.com/coreos/etcd/clientv3"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"}, // 集群列表
DialTimeout: 5 * time.Second,
}
// 建立一个客户端
client, err := clientv3.New(config)
if err != nil {
fmt.Println(err)
return
}
// 用于读写etcd的键值对
kv := clientv3.NewKV(client)
// clientv3.WithPrevKV() 是一个可选控制项,用于获取在设置当前键值对之前的该键的键值对
// 有了该控制项后,putResp 才有 PrevKv 的属性,即获取之前的键值对。
// context.TODO() 表示当前还不知道用哪个 context 控制该操作,先用该字段占位
putResp, err := kv.Put(context.TODO(), "/demo/A/B", "hello", clientv3.WithPrevKV())
if err != nil {
fmt.Println(err)
}
fmt.Println("putResp is ", putResp)
fmt.Println("Revision:", putResp.Header.Revision)
if putResp.PrevKv != nil {
fmt.Println("PrevValue:", string(putResp.PrevKv.Value))
}
putResp, err = kv.Put(context.TODO(),"/test/key1", "Hello etcd!")
if err != nil{
fmt.print("put failed!")
}
kv.Put(context.TODO(),"/test/key2", "Hello World!")
// 再写一个同前缀的干扰项
kv.Put(context.TODO(), "/testspam", "spam")
}
GET
使用 KV 的 Get
方法来读取给定键的值:
getResp, err := kv.Get(context.TODO(), "/test/key1")
函数声明如下:
// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
和 Put
类似,函数注释里提示我们可以传递一些控制参数来影响 Get
的行为,比如: WithFromKey
表示读取从参数 key
开始递增的所有 key
,而不是读取单个 key
。
在上面的例子中,我没有传递 opOption
,所以就是获取 key=/test/key1
的最新版本数据。这里 err
并不能反馈出 key
是否存在(只能反馈出本次操作因为各种原因异常了),我们需要通过 GetResponse
(实际上是 pb.RangeResponse
)判断 key
是否存在
让我们看一下返回值的类型:
type RangeResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// kvs is the list of key-value pairs matched by the range request.
// kvs is empty when count is requested.
Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`
// more indicates if there are more keys to return in the requested range.
More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`
// count is set to the number of keys within the range when requested.
Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
}
Kvs
字段,保存了本次 Get 查询到的所有 k-v
对,因为上述例子只 Get 了一个单 key
,所以只需要判断一下 len(Kvs
) 是否等于 1 即可知道 key
是否存在。
RangeResponse.More
和Count
,当我们使用withLimit()
等选项进行 Get 时会发挥作用,相当于翻页查询。
接下来,我们通过给 Get 查询增加 WithPrefix
选项,获取 /test
目录下的所有子元素:
rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix())
WithPrefix()
是指查找以/test/
为前缀的所有 key
,因此可以模拟出查找子目录的效果。
etcd是一个有序的 k-v
存储,因此 /test/
为前缀的 key
总是顺序排列在一起。
withPrefix()
实际上会转化为范围查询,它根据前缀/test/
生成了一个前闭后开的key range:[“/test/”, “/test0”)
,为什么呢?因为比/(47)
大的字符是0(48)
,所以以/test0作为范围的末尾,就可以扫描到所有以/test/为前缀的 key 了。
在之前,我们 Put
了一个/testspam
键值,因为不符合/test/
前缀(注意末尾的 /
),所以就不会被这次Get获取到。但是,如果查询的前缀是/test,那么/testspam就会被返回,使用时一定要特别注意。
打印 rangeResp.Kvs 可以看到获得了两个键值:
[key:"/test/key1" create_revision:2 mod_revision:13 version:6 value:"Hello etcd!"
key:"/test/key2" create_revision:5 mod_revision:14 version:4 value:"Hello World!" ]
代码:
package main
import (
"context"
"fmt"
"time"
"github.com/coreos/etcd/clientv3"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"192.168.0.113:2379"}, // 集群列表
DialTimeout: 5 * time.Second,
}
// 建立一个客户端
client, err := clientv3.New(config)
if err != nil {
fmt.Println(err)
return
}
// 用于读写etcd的键值对
kv := clientv3.NewKV(client)
kv.Put(context.TODO(), "/demo/A/B", "BBB", clientv3.WithPrevKV())
kv.Put(context.TODO(), "/demo/A/C", "CCC", clientv3.WithPrevKV())
// 读取/demo/A/为前缀的所有key
// clientv3.WithPrefix() , clientv3.WithCountOnly() 可以有多个并以 逗号分隔即可
getResp, err := kv.Get(context.TODO(), "/demo/A/", clientv3.WithPrefix() /*,clientv3.WithCountOnly()*/)
if err != nil {
fmt.Println(err)
}
fmt.Println(getResp.Kvs, getResp.Count)
for _, resp := range getResp.Kvs {
fmt.Printf("key: %s, value:%s\n", string(resp.Key), string(resp.Value))
}
}
结果:
[key:"/demo/A/B" create_revision:6 mod_revision:22 version:6 value:"BBB"
key:"/demo/A/C" create_revision:7 mod_revision:23 version:12 value:"CCC" ] 2
key: /demo/A/B, value:BBB
key: /demo/A/C, value:CCC
Delete
代码:
package main
import (
"context"
"fmt"
"time"
"github.com/coreos/etcd/clientv3"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"192.168.0.113:2379"}, // 集群列表
DialTimeout: 5 * time.Second,
}
// 建立一个客户端
client, err := clientv3.New(config)
if err != nil {
fmt.Println(err)
return
}
// 用于读写etcd的键值对
kv := clientv3.NewKV(client)
kv.Put(context.TODO(), "/demo/A/B1", "BBB", clientv3.WithPrevKV())
kv.Put(context.TODO(), "/demo/A/B2", "CCC", clientv3.WithPrevKV())
kv.Put(context.TODO(), "/demo/A/B3", "DDD", clientv3.WithPrevKV())
/*
clientv3.WithFromKey() 表示针对的key操作是大于等于当前给定的key
clientv3.WithPrevKV() 表示返回的 response 中含有之前删除的值,否则
下面的 delResp.PrevKvs 为空
*/
delResp, err := kv.Delete(context.TODO(), "/demo/A/B",
clientv3.WithFromKey(), clientv3.WithPrevKV())
if err != nil {
fmt.Println(err)
}
// 查看被删除的 key 和 value 是什么
if delResp.PrevKvs != nil {
// if len(delResp.PrevKvs) != 0 {
for _, kvpair := range delResp.PrevKvs {
fmt.Println("已删除:", string(kvpair.Key), string(kvpair.Value))
}
}
}
结果:
已删除: /demo/A/B1 BBB
已删除: /demo/A/B2 CCC
已删除: /demo/A/B3 DDD
Lease续租
etcd
客户端的Lease
对象可以通过以下代码获取到
lease := clientv3.NewLease(cli)
其中,lease
对象是 Lease
接口的实现, Lease
接口的声明如下:
type Lease interface {
// Grant 创建一个新租约
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
// Revoke 销毁给定租约ID的租约
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
// TimeToLive retrieves the lease information of the given lease ID.
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
// Leases retrieves all leases.
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
// KeepAlive keeps the given lease alive forever.
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
// KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
// should be used instead of KeepAliveOnce.
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
// Close releases all resources Lease keeps for efficient communication
// with the etcd server.
Close() error
}
Lease 提供了以下功能:
- Grant :分配一个租约;
- Revoke :释放一个租约;
- TimeToLive :获取剩余TTL时间;
- Leases :列举所有etcd中的租约;
- KeepAlive :自动定时的续约某个租约;
- KeepAliveOnce :为某个租约续约一次;
- Close :释放当前客户端建立的所有租约;
要想实现key
自动过期,首先得创建一个租约,下面的代码创建一个TTL
为 10 秒的租约:
grantResp, err := lease.Grant(context.TODO(), 10)
返回的 grantResponse 的结构体声明如下:
// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
*pb.ResponseHeader
ID LeaseID
TTL int64
Error string
}
在应用程序代码中主要使用到的是租约 ID 。
接下来我们用这个 Lease 往 etcd 中存储一个 10 秒过期的 key :
kv.Put(context.TODO(), "/test/vanish", "vanish in 10s", clientv3.WithLease(grantResp.ID))
这里特别需要注意,有一种情况是在 Put 之前 Lease
已经过期了,那么这个 Put 操作会返回 error
,此时你需要重新分配 Lease
。
当我们实现服务注册时,需要主动给 Lease
进行续约,通常是以小于 TTL
的间隔循环调用 Lease
的 KeepAliveOnce()
方法对租约进行续期,一旦某个服务节点出错无法完成租约的续期,等 key
过期后客户端即无法在查询服务时获得对应节点的服务,这样就通过租约到期实现了服务的错误隔离。
keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)
或者使用KeepAlive()
方法,其会返回<-chan *LeaseKeepAliveResponse
只读通道,每次自动续租成功后会向通道中发送信号。
一般都用KeepAlive()
方法, KeepAlive
和 Put
一样,如果在执行之前 Lease
就已经过期了,那么需要重新分配 Lease
。 etcd
并没有提供 API 来实现原子的 Put with Lease
,需要我们自己判断 err
重新分配 Lease
。
package main
import (
"context"
"fmt"
"time"
"github.com/coreos/etcd/clientv3"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"192.168.0.113:2379"}, // 集群列表
DialTimeout: 5 * time.Second,
}
// 建立一个客户端
client, err := clientv3.New(config)
if err != nil {
fmt.Println(err)
return
}
// 创建一个lease(租约)对象
lease := clientv3.NewLease(client)
// 申请一个10秒的租约
leaseGrantResp, err := lease.Grant(context.TODO(), 10)
if err != nil {
fmt.Println(err)
return
}
// 拿到租约的ID
leaseId := leaseGrantResp.ID
// 自动永久续租
keepRespChan, err := lease.KeepAlive(context.TODO(), leaseId)
if err != nil {
fmt.Println(err)
return
}
// 处理续约应答的协程
go func() {
for {
select {
case keepResp := <-keepRespChan:
if keepResp == nil {
fmt.Println("租约已经失效了")
goto END
} else { // 每秒会续租一次, 所以就会受到一次应答
fmt.Println("收到自动续租应答:", keepResp.ID)
}
}
}
END:
}()
// 获得kv API子集
kv := clientv3.NewKV(client)
// Put一个KV, 让它与租约关联起来, 从而实现10秒后自动过期
putResp, err := kv.Put(context.TODO(), "/demo/A/B1", "hello", clientv3.WithLease(leaseId))
if err != nil {
fmt.Println(err)
return
}
fmt.Println("写入成功:", putResp.Header.Revision)
// 定时的看一下key过期了没有
for {
getResp, err := kv.Get(context.TODO(), "/demo/A/B1")
if err != nil {
fmt.Println(err)
return
}
if getResp.Count == 0 {
fmt.Println("kv过期了")
break
}
fmt.Println("还没过期:", getResp.Kvs)
time.Sleep(2 * time.Second)
}
}
结果:
收到自动续租应答: 8488292048996991588
写入成功: 80
还没过期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
还没过期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
收到自动续租应答: 8488292048996991588
还没过期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
还没过期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
收到自动续租应答: 8488292048996991588
Op 获取设置联合操作
Op
字面意思就是”操作”, Get
和 Put
都属于 Op
,只是为了简化用户开发而开放的特殊 API
。
KV
对象有一个 Do
方法接受一个 Op
:
// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)
其参数 Op
是一个抽象的操作,可以是 Put/Get/Delete…
;而 OpResponse
是一个抽象的结果,可以是 PutResponse/GetResponse…
可以通过 Client
中定义的一些方法来创建 Op
:
- func OpDelete(key string, opts …OpOption) Op
- func OpGet(key string, opts …OpOption) Op
- func OpPut(key, val string, opts …OpOption) Op
- func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op
其实和直接调用 KV.Put
, KV.GET
没什么区别。下面是一个例子:
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
ops := []clientv3.Op{
clientv3.OpPut("put-key", "123"),
clientv3.OpGet("put-key"),
clientv3.OpPut("put-key", "456")}
for _, op := range ops {
if _, err := cli.Do(context.TODO(), op); err != nil {
log.Fatal(err)
}
}
把 Op
交给 Do
方法执行,返回的 opResp
结构如下:
type OpResponse struct {
put *PutResponse
get *GetResponse
del *DeleteResponse
txn *TxnResponse
}
你的操作是什么类型,你就用哪个指针来访问对应的结果。
示例代码:
package main
import (
"context"
"fmt"
"time"
"github.com/coreos/etcd/clientv3"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"192.168.0.113:2379"}, // 集群列表
DialTimeout: 5 * time.Second,
}
// 建立一个客户端
client, err := clientv3.New(config)
if err != nil {
fmt.Println(err)
return
}
// 获得kv API子集
kv := clientv3.NewKV(client)
// 创建Op: operation
putOp := clientv3.OpPut("/demo/A/B1", "BBBBB")
// 执行OP // kv.Do(op)
opResp, err := kv.Do(context.TODO(), putOp)
if err != nil {
fmt.Println(err)
return
}
fmt.Println("写入Revision:", opResp.Put().Header.Revision)
// 创建Op
getOp := clientv3.OpGet("/demo/A/B1")
// 执行OP
opResp, err = kv.Do(context.TODO(), getOp)
if err != nil {
fmt.Println(err)
return
}
// 打印 create rev == mod rev
fmt.Println("数据Revision:", opResp.Get().Kvs[0].ModRevision)
fmt.Println("数据value:", string(opResp.Get().Kvs[0].Value))
}
结果:
写入Revision: 105
数据Revision: 105
数据value: BBBBB
Txn事务操作
etcd
中事务是原子执行的,只支持 if … then … else …
这种表达。首先来看一下 Txn
中定义的方法:
type Txn interface {
// If takes a list of comparison. If all comparisons passed in succeed,
// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
If(cs ...Cmp) Txn
// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
Then(ops ...Op) Txn
// Else takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() fail.
Else(ops ...Op) Txn
// Commit tries to commit the transaction.
Commit() (*TxnResponse, error)
}
Txn
必须是这样使用的:If
(满足条件) Then
(执行若干Op
) Else
(执行若干Op
)。
If
中支持传入多个 Cmp
比较条件,如果所有条件满足,则执行 Then
中的 Op
(上一节介绍过Op
),否则执行 Else
中 的 Op
。
首先,我们需要开启一个事务,这是通过 KV
对象的方法实现的:
txn := kv.Txn(context.TODO())
下面的测试程序,判断如果 k1
的值大于 v1
并且 k1
的版本号是 2,则 Put
键值 k2
和 k3
,否则 Put
键值 k4
和 k5
。
kv.Txn(context.TODO()).If(
clientv3.Compare(clientv3.Value(k1), ">", v1),
clientv3.Compare(clientv3.Version(k1), "=", 2)
).Then(
clientv3.OpPut(k2,v2), clentv3.OpPut(k3,v3)
).Else(
clientv3.OpPut(k4,v4), clientv3.OpPut(k5,v5)
).Commit()
类似于 clientv3.Value()
用于指定 key
属性的,有这么几个方法:
- func CreateRevision(key string) Cmp:key=xxx的创建版本必须满足…
- func LeaseValue(key string) Cmp:key=xxx的Lease ID必须满足…
- func ModRevision(key string) Cmp:key=xxx的最后修改版本必须满足…
- func Value(key string) Cmp:key=xxx的创建值必须满足…
- func Version(key string) Cmp:key=xxx的累计更新次数必须满足…
package main
import (
"context"
"fmt"
"time"
"github.com/coreos/etcd/clientv3"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"192.168.0.113:2379"}, // 集群列表
DialTimeout: 5 * time.Second,
}
// 建立一个客户端
client, err := clientv3.New(config)
if err != nil {
fmt.Println(err)
return
}
// lease实现锁自动过期:
// op操作
// txn事务: if else then
// 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key)
lease := clientv3.NewLease(client)
// 申请一个5秒的租约
leaseGrantResp, err := lease.Grant(context.TODO(), 5)
if err != nil {
fmt.Println(err)
return
}
// 拿到租约的ID
leaseId := leaseGrantResp.ID
// 准备一个用于取消自动续租的context
ctx, cancelFunc := context.WithCancel(context.TODO())
// 确保函数退出后, 自动续租会停止
defer cancelFunc()
defer lease.Revoke(context.TODO(), leaseId)
// 5秒后会取消自动续租
keepRespChan, err := lease.KeepAlive(ctx, leaseId)
if err != nil {
fmt.Println(err)
return
}
// 处理续约应答的协程
go func() {
for {
select {
case keepResp := <-keepRespChan:
if keepResp == nil {
fmt.Println("租约已经失效了")
goto END
} else { // 每秒会续租一次, 所以就会受到一次应答
fmt.Println("收到自动续租应答:", keepResp.ID)
}
}
}
END:
}()
// if 不存在key, then 设置它, else 抢锁失败
kv := clientv3.NewKV(client)
// 创建事务
txn := kv.Txn(context.TODO())
// 定义事务
// 如果key不存在
txn.If(clientv3.Compare(clientv3.CreateRevision("/demo/A/B1"), "=", 0)).
Then(clientv3.OpPut("/demo/A/B1", "xxx", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet("/demo/A/B1")) // 否则抢锁失败
// 提交事务
txnResp, err := txn.Commit()
if err != nil {
fmt.Println(err)
return // 没有问题
}
// 判断是否抢到了锁
if !txnResp.Succeeded {
fmt.Println("锁被占用:", string(
txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
// 2, 处理业务
fmt.Println("处理任务")
time.Sleep(5 * time.Second)
// 3, 释放锁(取消自动续租, 释放租约)
// defer 会把租约释放掉, 关联的KV就被删除了
}
结果:
收到自动续租应答: 8488292048996991680
锁被占用: BBBBB
Watch 监听操作
Watch
用于监听某个键的变化, Watch
调用后返回一个WatchChan
,它的类型声明如下:
type WatchChan <-chan WatchResponse
type WatchResponse struct {
Header pb.ResponseHeader
Events []*Event
CompactRevision int64
Canceled bool
Created bool
}
当监听的 key
有变化后会向WatchChan
发送WatchResponse
。
Watch
的典型应用场景是应用于系统配置的热加载,我们可以在系统读取到存储在 etcd key
中的配置后,用 Watch
监听 key
的变化。在单独的 goroutine
中接收 WatchChan
发送过来的数据,并将更新应用到系统设置的配置变量中,比如像下面这样在 goroutine
中更新变量 appConfig
,这样系统就实现了配置变量的热加载。
type AppConfig struct {
config1 string
config2 string
}
var appConfig Appconfig
func watchConfig(clt *clientv3.Client, key string, ss interface{}) {
watchCh := clt.Watch(context.TODO(), key)
go func() {
for res := range watchCh {
value := res.Events[0].Kv.Value
if err := json.Unmarshal(value, ss); err != nil {
fmt.Println("now", time.Now(), "watchConfig err", err)
continue
}
fmt.Println("now", time.Now(), "watchConfig", ss)
}
}()
}
watchConfig(client, "config_key", &appConfig)
代码:
package main
import (
"context"
"fmt"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"192.168.0.113:2379"}, // 集群列表
DialTimeout: 5 * time.Second,
}
// 建立一个客户端
client, err := clientv3.New(config)
if err != nil {
fmt.Println(err)
return
}
// 获得kv API子集
kv := clientv3.NewKV(client)
// 模拟etcd中KV的变化
go func() {
for {
kv.Put(context.TODO(), "/demo/A/B1", "i am B1")
kv.Delete(context.TODO(), "/demo/A/B1")
time.Sleep(1 * time.Second)
}
}()
// 先GET到当前的值,并监听后续变化
getResp, err := kv.Get(context.TODO(), "/demo/A/B1")
if err != nil {
fmt.Println(err)
return
}
// 现在key是存在的
if len(getResp.Kvs) != 0 {
fmt.Println("当前值:", string(getResp.Kvs[0].Value))
}
// 当前etcd集群事务ID, 单调递增的
watchStartRevision := getResp.Header.Revision + 1
// 创建一个watcher
watcher := clientv3.NewWatcher(client)
// 启动监听
fmt.Println("从该版本向后监听:", watchStartRevision)
// 创建一个 5s 后取消的上下文
ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(5*time.Second, func() {
cancelFunc()
})
// 该监听动作在 5s 后取消
watchRespChan := watcher.Watch(ctx, "/demo/A/B1", clientv3.WithRev(watchStartRevision))
// 处理kv变化事件
for watchResp := range watchRespChan {
for _, event := range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改为:", string(event.Kv.Value), "Revision:",
event.Kv.CreateRevision, event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
}
}
}
}
结果:
从该版本向后监听: 94
修改为: i am B1 Revision: 94 94
删除了 Revision: 95
修改为: i am B1 Revision: 96 96
删除了 Revision: 97
修改为: i am B1 Revision: 98 98
删除了 Revision: 99
修改为: i am B1 Revision: 100 100
删除了 Revision: 101
修改为: i am B1 Revision: 102 102
删除了 Revision: 103
参考资料:
https://blog.csdn.net/wohu1104/article/details/108552649
更多推荐
所有评论(0)