本节实现一致性哈希选择节点,并且实现了 HTTP 客户端。最终代码结构如下:
1 2 3 4 5 6 7 8 9 10 11
| lru/ |--lru.go |--lru_test.go byteview.go cache.go consistenthash.go dawncache.go dawncache_test.go go.mod http.go peers.go
|
PeerPicker
分布式缓存获取数据的流程如下:
1 2 3 4 5 6
| 是 接收 key --> 检查是否被缓存 -----> 返回缓存值 ⑴ | 否 是 |-----> 是否应当从远程节点获取 -----> 与远程节点交互 --> 返回缓存值 ⑵ | 否 |-----> 调用`回调函数`,获取值并添加到缓存 --> 返回缓存值 ⑶
|
之前已经实现了 (1) 和 (3) 现在需要实现流程 (2) 从远程获取数据:
1 2 3 4
| 使用一致性哈希选择节点 是 是 |-----> 是否是远程节点 -----> HTTP 客户端访问远程节点 --> 成功?-----> 服务端返回返回值 | 否 ↓ 否 |----------------------------> 回退到本地节点处理。
|
抽象 PeerPicker
peers.go
1 2 3 4 5 6 7 8 9 10 11
| type PeerPicker interface { PickPeer(key string) (peer PeerGetter, ok bool) }
type PeerGetter interface { Get(groupName string, key string) ([]byte, error) }
|
节点选择与 HTTP 客户端
http.go
实现 PeerGetter
首先定义一个 HTTPGetter 结构体,用于实现 PeerGetter 接口:
1 2 3 4
| type HTTPGetter struct { basePath string }
|
实现 Get 方法,用于远程获取数据,它通过发送 HTTP GET 请求来从远程获取数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| func (h *HTTPGetter) Get(groupName string, key string) ([]byte, error) { url := fmt.Sprintf("%s%s/%s", h.basePath, groupName, key)
res, err := http.Get(url) if err != nil { return nil, err } defer res.Body.Close()
if res.StatusCode != http.StatusOK { return nil, fmt.Errorf("server status code: %v", res.StatusCode) }
data, err := ioutil.ReadAll(res.Body) if err != nil { return nil, errors.New("read response body failed") }
return data, nil }
var _ PeerGetter = (*HTTPGetter)(nil)
|
实现 PeerPicker
修改 HTTPPool 结构体,使之记录一致性哈希的结构体以及 HTTPGetter:
1 2 3 4 5 6 7
| type HTTPPool struct { self string basePath string mu sync.Mutex peers *Map httpGetters map[string]*HTTPGetter }
|
Set 方法用于添加节点,即在一致性哈希的哈希环中添加节点:
1 2 3 4 5 6 7 8 9 10 11 12
| func (p *HTTPPool) Set(peers ...string) { p.mu.Lock() defer p.mu.Unlock() p.peers = New(defaultReplicas, nil) p.peers.Add(peers...) p.httpGetters = make(map[string]*HTTPGetter)
for _, peer := range peers { p.httpGetters[peer] = &HTTPGetter{basePath: peer + p.basePath} } }
|
PickPeer 方法用于根据 key 选择节点,实现 PeerPicker 接口:
1 2 3 4 5 6 7 8 9 10
| func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) { p.mu.Lock() defer p.mu.Unlock() if peer := p.peers.Get(key); peer != "" && peer != p.self { p.Log("Pick peer %s", peer) return p.httpGetters[peer], true } return nil, false }
|
实现主流程
dawncache.go
修改 Group 结构体,添加 PeerPicker:
1 2 3 4 5 6
| type Group struct { name string getter Getter mainCache cache peers PeerPicker }
|
实现一个方法用于注册 PeerPicker:
1 2 3 4 5 6 7 8
| func (g *Group) RegisterPeers(peers PeerPicker) { if g.peers != nil { panic("RegisterPeerPicker called more than once") } g.peers = peers }
|
修改 load 方法,使之:
- 既能够从远程获取数据。
- 又能够在本地通过回调函数获取数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| func (g *Group) load(key string) (ByteView, error) { if g.peers != nil { if peer, ok := g.peers.PickPeer(key); ok { view, err := g.getFromPeer(peer, key) if err != nil { log.Println("[GeeCache] Failed to get from peer", err) return ByteView{}, err } return view, nil } } return g.getLocally(key) }
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) { data, err := peer.Get(g.name, key) if err != nil { return ByteView{}, err } return ByteView{b: data}, nil }
|