Dawn's Blogs

分享技术 记录成长

0%

Groupcache源码阅读 (4) HTTP服务

groupcache 的其中一个特定就是,它既是一个客户端,也是一个服务器。它连接到自己的 peer,形成分布式缓存。

所以,groupcache 中的 HTTP 服务即包括客户端也包括服务端。

HTTP 服务

客户端

groupcache 中,节点之间的通信是通过 protobuf 完成的,HTTP 客户端需要实现 ProtoGetter 接口。

1
2
3
4
// ProtoGetter is the interface that must be implemented by a peer.
type ProtoGetter interface {
Get(ctx context.Context, in *pb.GetRequest, out *pb.GetResponse) error
}

httpGetter 结构体

httpGetter 结构体维护与某一个节点的 protobuf 通信,其中 baseURL 就记录了节点的远程 HTTP 地址。httpGetter 节点就实现了 ProtoGetter 接口。

1
2
3
4
type httpGetter struct {
transport func(context.Context) http.RoundTripper
baseURL string
}

服务端

HTTPPool 结构体

HTTPPool 结构体实现了 PeerPicker 接口和 http.Handler 接口。实现 PeerPicker 用于选择获取数据的节点;实现 http.Handler 接口用于实现 HTTP 服务端。

其中重要的字段如下:

  • self:记录自己的本地 URL。
  • peers:一致性哈希环,用于决定从哪一个节点中获取数据。
  • httpGetters:保存所有的 httpGetter,存储远程节点的 baseURL 与其对应的 httpGetter 的映射。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct {
// Context optionally specifies a context for the server to use when it
// receives a request.
// If nil, the server uses the request's context
Context func(*http.Request) context.Context

// Transport optionally specifies an http.RoundTripper for the client
// to use when it makes a request.
// If nil, the client uses http.DefaultTransport.
Transport func(context.Context) http.RoundTripper

// this peer's base URL, e.g. "https://example.net:8000"
self string

// opts specifies the options.
opts HTTPPoolOptions

mu sync.Mutex // guards peers and httpGetters
peers *consistenthash.Map
httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"
}

实现 PeerPicker 接口

HTTPPool 实现了 PeerPicker 接口,在一致性哈希环中根据 key 查找节点

1
2
3
4
5
6
7
8
9
10
11
func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if p.peers.IsEmpty() {
return nil, false
}
if peer := p.peers.Get(key); peer != p.self {
return p.httpGetters[peer], true
}
return nil, false
}

同时它还定义了 Set 方法

  • 在一致性哈希环 peers 中添加节点
  • 同时完成节点与 httpGetter 的映射关系。
1
2
3
4
5
6
7
8
9
10
11
12
13
// Set updates the pool's list of peers.
// Each peer value should be a valid base URL,
// for example "http://example.net:8000".
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
p.peers.Add(peers...)
p.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
}
}

实现 http.Handler 接口

HTTPPool 通过定义 ServeHTTP 方法,实现了 http.Handler 接口。ServeHTTP 方法流程如下:

  • 首先解析请求,从请求中解析出 group name 和 key
1
2
3
4
5
6
7
8
9
10
11
// Parse request.
if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
panic("HTTPPool serving unexpected path: " + r.URL.Path)
}
parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
groupName := parts[0]
key := parts[1]
  • 通过 group name 得到相应的 group。
1
2
3
4
5
6
// Fetch the value for this group/key.
group := GetGroup(groupName)
if group == nil {
http.Error(w, "no such group: "+groupName, http.StatusNotFound)
return
}
  • 调用 group 中的 Get 方法获取 key 相应的数据。Get 方法会从本地或者远程节点中获取数据,并将缓存数据返回。
1
2
3
4
5
6
7
group.Stats.ServerRequests.Add(1)
var value []byte
err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
  • 将获取到的数据,以 protobuf 消息的方式,写入的 HTTP 响应体中
1
2
3
4
5
6
7
8
// Write the value to the response body as a proto message.
body, err := proto.Marshal(&pb.GetResponse{Value: value})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/x-protobuf")
w.Write(body)