Dawn's Blogs

分享技术 记录成长

0%

从零实现分布式缓存 (7) 使用Protobuf通信

本节实现 protobuf 进行节点间的通信。最终代码结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
dawncachepb/
|--dawncachepb.pb.go
|--dawncachepb.proto
lru/
|--lru.go
|--lru_test.go
singleflight/
|--singleflight.go
byteview.go
cache.go
consistenthash.go
dawncache.go
dawncache_test.go
go.mod
http.go
peers.go

使用 protobuf 通信

编写 proto 文件

dawncachepb/dawncachepb.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
syntax = "proto3";

package dawncachepb;

option go_package = "./;dawncachepb";

message Request {
string group = 1;
string key = 2;
}

message Response {
bytes value = 1;
}

service GroupCache {
rpc Get(Request) returns (Response);
}

生成 pb.go 代码

生成 dawncachepb.pb.go

1
protoc --go_out=. *.proto

dawncachepb/dawncachepb.pb.go 有如下数据类型:

1
2
3
4
5
6
7
8
9
10
type Request struct {
// ...
Group string `protobuf:"bytes,1,opt,name=group,proto3" json:"group,omitempty"`
Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
}

type Response struct {
// ...
Value []byte `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
}

修改 PeerGetter 接口

修改 peers.go 中的 PeerGetter 接口,参数使用 geecachepb.pb.go 中的数据类型。

1
2
3
4
5
// PeerGetter 远程获取数据的接口
type PeerGetter interface {
// Get 根据 groupName 和 key 获取源数据
Get(in *pb.Request, out *pb.Response) error
}

修改使用 PeerGetter 接口的代码

dawncache.go

1
2
3
4
5
6
7
8
9
10
11
12
13
// getFromPeer 从 peer 处获取数据
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
req := &pb.Request{
Group: g.name,
Key: key,
}
res := &pb.Response{}
err := peer.Get(req, res)
if err != nil {
return ByteView{}, err
}
return ByteView{b: res.Value}, nil
}

http.go

  • ServeHTTP() 中使用 proto.Marshal() 编码 HTTP 响应。
  • Get() 中使用 proto.Unmarshal() 解码 HTTP 响应。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// ServeHTTP 处理查询缓存的请求,实现了 http.Handler 接口
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 判断是否有 basePath
if !strings.HasPrefix(r.URL.Path, p.basePath) {
http.Error(w, "HTTPPool serving unexpected path: "+r.URL.Path, http.StatusBadRequest)
return
}

// 检查是否有 groupName 和 key
parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}

groupName := parts[0]
key := parts[1]

// 通过 groupName 获取 group
group := GetGroup(groupName)
if group == nil {
http.Error(w, "no such group:"+groupName, http.StatusBadRequest)
return
}

// 从缓存中获取数据
view, err := group.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// 响应客户端
body, err := proto.Marshal(&pb.Response{Value: view.ByteSlice()}) // 编码
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(body)
}

// Get 实现了 PeerGetter 接口,用于远程获取源数据
func (h *HTTPGetter) Get(in *pb.Request, out *pb.Response) error {
url := fmt.Sprintf("%s%s/%s", h.basePath, url.QueryEscape(in.GetGroup()), url.QueryEscape(in.GetKey()))

res, err := http.Get(url)
if err != nil {
// 发送请求失败
return err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
// 状态码不是 200
return fmt.Errorf("server status code: %v", res.StatusCode)
}

data, err := ioutil.ReadAll(res.Body)
if err != nil {
// 读取数据失败
return errors.New("read response body failed")
}

if err = proto.Unmarshal(data, out); err != nil {
return fmt.Errorf("decoding response body: %v", err)
}

return nil
}