Dawn's Blogs

分享技术 记录成长

0%

GO语言杂谈 (7) 操作etcd

etcd

etcd是使用Go语言开发的一个开源的、高可用的分布式key-value存储系统,可以用于配置共享服务的注册和发现

etcd具有以下特点

  • 完全复制:集群中的每个节点都可以使用完整的存档
  • 高可用性:Etcd可用于避免硬件的单点故障或网络问题
  • 一致性:每次读取都会返回跨多主机的最新写入
  • 简单:包括一个定义良好、面向用户的API(gRPC)
  • 安全:实现了带有可选的客户端证书身份验证的自动化TLS
  • 快速:每秒10000次写入的基准速度
  • 可靠:使用Raft算法实现了强一致、高可用的服务存储目录

应用场景

服务发现

在一个分布式集群中,如何找到提供某项服务的主机并与之建立连接,这就是服务发现所完成的功能:

  1. 服务提供者首先在注册中心注册服务
  2. 服务请求者根据所需要的服务在注册中心请求服务提供者的地址信息
  3. 服务请求者与服务提供者建立连接,完成服务

服务发现

配置中心

将一些配置信息放到 etcd 上进行集中管理。

这类场景的使用方式通常是这样:应用在启动的时候主动从 etcd 获取一次配置信息,同时,在 etcd 节点上注册一个 Watcher 并等待,以后每次配置有更新的时候,etcd 都会实时通知订阅者,以此达到获取最新配置信息的目的。

分布式锁

因为 etcd 使用 Raft 算法保持了数据的强一致性,某次操作存储到集群中的值必然是全局一致的,所以很容易实现分布式锁。锁服务有两种使用方式,一是保持独占,二是控制时序。

  • 保持独占即所有获取锁的用户最终只有一个可以得到。etcd 为此提供了一套实现分布式锁原子操作 CAS(CompareAndSwap)的 API。通过设置prevExist值,可以保证在多个节点同时去创建某个目录时,只有一个成功。而创建成功的用户就可以认为是获得了锁。
  • 控制时序,即所有想要获得锁的用户都会被安排执行,但是获得锁的顺序也是全局唯一的,同时决定了执行顺序。etcd 为此也提供了一套 API(自动创建有序键),对一个目录建值时指定为POST动作,这样 etcd 会自动在目录下生成一个当前最大的值为键,存储这个新的值(客户端编号)。同时还可以使用 API 按顺序列出所有当前目录下的键值。此时这些键的值就是客户端的时序,而这些键中存储的值可以是代表客户端的编号。

etcd架构

etcd主要分为四个部分:

  • HTTP Server:用于处理用户发送的API请求以及其他etcd节点的同步与心跳信息请求。
  • Store:用于处理etcd支持的各类功能的事务,包括数据索引、节点状态变更、监控与反馈、事件的处理与执行等等,是etcd对用户提供的大多数API的具体实现。
  • Raft:强一致性算法
  • WAL:Write Ahead Log(预写式日志),etcd通过WAL对数据进行持久化存储。WAL中,所有的数据提交前都会实现记录日志。Snapshot(快照)是为了方式数据过多而进行的状态快照;Entry表示存储的日志内容。

etcd架构

etcd集群

etcd一般部署集群推荐奇数个节点,推荐的数量为 3、5 或者 7 个节点构成一个集群。

GO语言操作etcd

put和get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"time"
)

func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")

defer cli.Close()
// put
// ctx用于控制超时
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "dawn", "zh")
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}

// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "dawn")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
// 遍历取到结果
for _, ev := range resp.Kvs {
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}
}

watch

用于监控etcd中的key的变化(创建/更改/删除)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")

// watch,返回一个通道
wChan := cli.Watch(context.Background(), "key")

// 遍历通道,一旦有事件发生,watch会向通道中发送数据
for wresp := range wChan {
for _, evt := range wresp.Events {
fmt.Printf("type:%s key:%s value:%s\n", evt.Type, evt.Kv.Key, evt.Kv.Value)
}
}
}

lease租约

keepAlive

基于etcd实现分布式锁