Dawn's Blogs

分享技术 记录成长

0%

Groupcache源码阅读 (3) Getter、PeerPicker和Group

Getter 从其他数据源加载数据

Getter 是一个接口,用于从别处(比如数据库中)加载数据。它只定义了一个函数 Get,用于根据 key 来加载数据。

1
2
3
4
5
6
7
8
9
10
// A Getter loads data for a key.
type Getter interface {
// Get returns the value identified by key, populating dest.
//
// The returned data must be unversioned. That is, key must
// uniquely describe the loaded data, without an implicit
// current time, and without relying on cache expiration
// mechanisms.
Get(ctx context.Context, key string, dest Sink) error
}

Sink 是一个接口,用于接收从缓存中 Get 到的数据

GetterFunc

GetterFunc 是接口型函数,这是为了向接口参数里传函数,就让函数继承了接口。

1
2
3
4
5
6
// A GetterFunc implements Getter with a function.
type GetterFunc func(ctx context.Context, key string, dest Sink) error

func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error {
return f(ctx, key, dest)
}

PeerPicker 选择节点

PeerPicker 是一个接口,用于实现根据一个 key 定位节点(根据一致性哈希,确定从哪一个分布式节点中获取数据)。

1
2
3
4
5
6
7
8
// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
type PeerPicker interface {
// PickPeer returns the peer that owns the specific key
// and true to indicate that a remote peer was nominated.
// It returns nil, false if the key owner is the current peer.
PickPeer(key string) (peer ProtoGetter, ok bool)
}

ProtoGetter 是一个接口,用于实现节点之间的 protobuf 通信。

Group - 缓存数据库

一个 Group 可以看作是一个数据库

  • name:数据库名。
  • getter:从其他数据源获取数据。
  • peersOnce:用于保证 peers 属性只被初始化一次。
  • peers:用于获取从哪一个节点中获取数据。
  • cacheBytes:主缓存和热点数据缓存的最大容量。
  • mainCache:主缓存。
  • hotCache:热点数据缓存,若是从其他节点中获取到数据,有概率被存储到热点数据缓存中。
  • loadGroup:用于保证并发的每一个 key 只有一个 caller 去执行,用于 singleflight。
  • Stats:统计数据,用于统计命中数量、请求数量等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// A Group is a cache namespace and associated data loaded spread over
// a group of 1 or more machines.
type Group struct {
name string
getter Getter
peersOnce sync.Once
peers PeerPicker
cacheBytes int64 // limit for sum of mainCache and hotCache size

// mainCache is a cache of the keys for which this process
// (amongst its peers) is authoritative. That is, this cache
// contains keys which consistent hash on to this process's
// peer number.
mainCache cache

// hotCache contains keys/values for which this peer is not
// authoritative (otherwise they would be in mainCache), but
// are popular enough to warrant mirroring in this process to
// avoid going over the network to fetch from a peer. Having
// a hotCache avoids network hotspotting, where a peer's
// network card could become the bottleneck on a popular key.
// This cache is used sparingly to maximize the total number
// of key/value pairs that can be stored globally.
hotCache cache

// loadGroup ensures that each key is only fetched once
// (either locally or remotely), regardless of the number of
// concurrent callers.
loadGroup flightGroup

_ int32 // force Stats to be 8-byte aligned on 32-bit platforms

// Stats are statistics on the group.
Stats Stats
}

Get 查询缓存

Group 的核心方法就是 Get 方法,Get 方法根据 key 获取相应的缓存数据,并且将数据传给 dest

1
func (g *Group) Get(ctx context.Context, key string, dest Sink)

其流程为:

  • 首先保证 peers 被初始化,且仅被初始化一次
1
2
g.peersOnce.Do(g.initPeers)
g.Stats.Gets.Add(1)
  • 本地的主缓存和热点数据缓存中查询 key,若存在数据则缓存命中数加 1并且返回。
1
2
3
4
5
6
value, cacheHit := g.lookupCache(key)

if cacheHit {
g.Stats.CacheHits.Add(1)
return setSinkView(dest, value)
}
  • 若不存在,则 调用 load 加载数据。load 可能从本地或者远程节点中加载数据。加载数据后,返回结果。
1
2
3
4
5
6
7
8
9
destPopulated := false
value, destPopulated, err := g.load(ctx, key, dest)
if err != nil {
return err
}
if destPopulated {
return nil
}
return setSinkView(dest, value)

load 加载数据

load 函数用于加载数据,其流程为:

  • 调用 loadGroup 中的 Do 方法,保证 singleflight。Do 方法中的流程如下:
    • 本地主缓存和热点数据缓存中查询,查询到了就直接返回。
    • 利用 peers 的 PickPeer 方法查询应该从哪个节点中获取数据。若从远程节点中获取数据,则调用 getFromPeer 方法从远程获取
    • 否则就从本地获取数据,并且将本地得到的数据加载到主缓存中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
g.Stats.Loads.Add(1)
viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
// Check the cache again because singleflight can only dedup calls
// that overlap concurrently. It's possible for 2 concurrent
// requests to miss the cache, resulting in 2 load() calls. An
// unfortunate goroutine scheduling would result in this callback
// being run twice, serially. If we don't check the cache again,
// cache.nbytes would be incremented below even though there will
// be only one entry for this key.
//
// Consider the following serialized event ordering for two
// goroutines in which this callback gets called twice for the
// same key:
// 1: Get("key")
// 2: Get("key")
// 1: lookupCache("key")
// 2: lookupCache("key")
// 1: load("key")
// 2: load("key")
// 1: loadGroup.Do("key", fn)
// 1: fn()
// 2: loadGroup.Do("key", fn)
// 2: fn()
if value, cacheHit := g.lookupCache(key); cacheHit {
g.Stats.CacheHits.Add(1)
return value, nil
}
g.Stats.LoadsDeduped.Add(1)
var value ByteView
var err error
if peer, ok := g.peers.PickPeer(key); ok {
value, err = g.getFromPeer(ctx, peer, key)
if err == nil {
g.Stats.PeerLoads.Add(1)
return value, nil
}
g.Stats.PeerErrors.Add(1)
// TODO(bradfitz): log the peer's error? keep
// log of the past few for /groupcachez? It's
// probably boring (normal task movement), so not
// worth logging I imagine.
}
value, err = g.getLocally(ctx, key, dest)
if err != nil {
g.Stats.LocalLoadErrs.Add(1)
return nil, err
}
g.Stats.LocalLoads.Add(1)
destPopulated = true // only one caller of load gets this return value
g.populateCache(key, value, &g.mainCache)
return value, nil
})
  • 等待 Do 方法返回的数据,得到数据后,结束加载。

getLocally 从本地加载数据

getLocally 从本地加载数据,它通过调用 getter.Get 方法,在本地从别的数据源(如数据库)中加载数据

1
2
3
4
5
6
7
func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) {
err := g.getter.Get(ctx, key, dest)
if err != nil {
return ByteView{}, err
}
return dest.view()
}

getFromPeer 从远程节点加载数据

getFromPeer 从远程节点中加载数据。

1
func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error)

其流程为:

  • 构造 protobuf 请求结构体,包括 Group name 和 key,接着使用 protobuf 向远程节点发送请求
1
2
3
4
5
6
7
8
9
10
req := &pb.GetRequest{
Group: &g.name,
Key: &key,
}
res := &pb.GetResponse{}
err := peer.Get(ctx, req, res)
if err != nil {
return ByteView{}, err
}
value := ByteView{b: res.Value}
  • 若从远程节点中获取到了结果,则有十分之一的概率被加载到热点数据缓存中。
1
2
3
if rand.Intn(10) == 0 {
g.populateCache(key, value, &g.hotCache)
}