Dawn's Blogs

分享技术 记录成长

0%

从零实现分布式缓存 (5) 分布式节点

本节实现一致性哈希选择节点,并且实现了 HTTP 客户端。最终代码结构如下:

1
2
3
4
5
6
7
8
9
10
11
lru/
|--lru.go
|--lru_test.go
byteview.go
cache.go
consistenthash.go
dawncache.go
dawncache_test.go
go.mod
http.go
peers.go

PeerPicker

分布式缓存获取数据的流程如下:

1
2
3
4
5
6

接收 key --> 检查是否被缓存 -----> 返回缓存值 ⑴
| 否 是
|-----> 是否应当从远程节点获取 -----> 与远程节点交互 --> 返回缓存值 ⑵
| 否
|-----> 调用`回调函数`,获取值并添加到缓存 --> 返回缓存值 ⑶

之前已经实现了 (1) 和 (3) 现在需要实现流程 (2) 从远程获取数据:

1
2
3
4
使用一致性哈希选择节点        是                                    是
|-----> 是否是远程节点 -----> HTTP 客户端访问远程节点 --> 成功?-----> 服务端返回返回值
| 否 ↓ 否
|----------------------------> 回退到本地节点处理。

抽象 PeerPicker

peers.go

  • 现在需要定义一个 PeerPicker 接口,用于选择与哪一个节点进行通信。

  • PeerGetter 接口用于具体的远程获取数据的操作。

1
2
3
4
5
6
7
8
9
10
11
// PeerPicker 选取节点的接口
type PeerPicker interface {
// PickPeer 根据 key 选择相应的 PeerGetter 获取数据
PickPeer(key string) (peer PeerGetter, ok bool)
}

// PeerGetter 远程获取数据的接口
type PeerGetter interface {
// Get 根据 groupName 和 key 获取源数据
Get(groupName string, key string) ([]byte, error)
}

节点选择与 HTTP 客户端

http.go

实现 PeerGetter

首先定义一个 HTTPGetter 结构体,用于实现 PeerGetter 接口:

1
2
3
4
// HTTPGetter 通过 HTTP 远程获取数据
type HTTPGetter struct {
basePath string
}

实现 Get 方法,用于远程获取数据,它通过发送 HTTP GET 请求来从远程获取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Get 实现了 PeerGetter 接口,用于远程获取源数据
func (h *HTTPGetter) Get(groupName string, key string) ([]byte, error) {
url := fmt.Sprintf("%s%s/%s", h.basePath, groupName, key)

res, err := http.Get(url)
if err != nil {
// 发送请求失败
return nil, err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
// 状态码不是 200
return nil, fmt.Errorf("server status code: %v", res.StatusCode)
}

data, err := ioutil.ReadAll(res.Body)
if err != nil {
// 读取数据失败
return nil, errors.New("read response body failed")
}

return data, nil
}

var _ PeerGetter = (*HTTPGetter)(nil) // 检查实现 PeerGetter 接口

实现 PeerPicker

修改 HTTPPool 结构体,使之记录一致性哈希的结构体以及 HTTPGetter

1
2
3
4
5
6
7
type HTTPPool struct {
self string // 如 http://127.0.0.1:8080
basePath string // 节点间通讯地址的前缀,如 http:// 127.0.1:8080/basePath/groupName/key 用于请求数据
mu sync.Mutex
peers *Map // 一致性哈希,根据 key 来选择节点
httpGetters map[string]*HTTPGetter // 根据 baseURL 选择 HTTPGetter
}

Set 方法用于添加节点,即在一致性哈希的哈希环中添加节点:

1
2
3
4
5
6
7
8
9
10
11
12
// Set 添加节点
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = New(defaultReplicas, nil)
p.peers.Add(peers...)
p.httpGetters = make(map[string]*HTTPGetter)

for _, peer := range peers {
p.httpGetters[peer] = &HTTPGetter{basePath: peer + p.basePath}
}
}

PickPeer 方法用于根据 key 选择节点,实现 PeerPicker 接口:

1
2
3
4
5
6
7
8
9
10
// PickPeer 实现 PeerPicker 接口,用于根据 key 选择节点
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if peer := p.peers.Get(key); peer != "" && peer != p.self {
p.Log("Pick peer %s", peer)
return p.httpGetters[peer], true
}
return nil, false
}

实现主流程

dawncache.go

修改 Group 结构体,添加 PeerPicker

1
2
3
4
5
6
type Group struct {
name string // 一个组的命名空间,用于区分不同的缓存,如学生姓名、成绩可以放到不同的缓存中去
getter Getter // 当查找数据未命中时,调用该函数获取值
mainCache cache // 底层缓存
peers PeerPicker
}

实现一个方法用于注册 PeerPicker

1
2
3
4
5
6
7
8
// RegisterPeers 注册 PeerPicker
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers != nil {
// RegisterPeers 不允许调用超过1次
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}

修改 load 方法,使之:

  • 既能够从远程获取数据。
  • 又能够在本地通过回调函数获取数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// load 从别处加载数据
func (g *Group) load(key string) (ByteView, error) {
if g.peers != nil {
// peers 不为空,可以从远程获取数据
if peer, ok := g.peers.PickPeer(key); ok {
// 从远程获取数据
view, err := g.getFromPeer(peer, key)
if err != nil {
log.Println("[GeeCache] Failed to get from peer", err)
return ByteView{}, err
}
return view, nil
}
}
// 本地通过回调函数获取数据
return g.getLocally(key)
}

// getFromPeer 从 peer 处获取数据
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
data, err := peer.Get(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{b: data}, nil
}