Dawn's Blogs

分享技术 记录成长

0%

Kuma学习笔记 (8) KDS Watchdog

Watchdog

什么是 Watchdog

在 Kuma KDS 中,当 Zone CP 和 Global CP 建立 streaming 连接后,需要推送资源的一方在每一条连接开启一个 Watchdog。

Watchdog 用于收集变化的资源,并定期更新 xds-cache(xds-cache 的更新会引发 xds 推送),完成从 Global CP 与 Zone CP 之间的被更新资源的主动推送过程

image-20240308110246222

Watchdog 数据结构

Watchdog 的数据结构如下:

  • Node:表示一个 envoy 节点,这里是一个 Zone CP 节点。
  • EventBus:事件总线,Watchdog 通过事件总线订阅资源更新的事件。
  • Reconciler:调和器,用于重新计算配置资源,并更新 xds-cache。
  • ProvidedTypes:需要同步的所有资源类型。
1
2
3
4
5
6
7
8
9
10
type EventBasedWatchdog struct {
Ctx context.Context
Node *envoy_core.Node
EventBus events.EventBus
Reconciler reconcile.Reconciler
ProvidedTypes map[model.ResourceType]struct{}
Log logr.Logger
NewFlushTicker func() *time.Ticker
NewFullResyncTicker func() *time.Ticker
}

流程

在 Start 方法中定义了 Watchdog 的工作流程:

  1. 首先,从事件总线中订阅 ResourceChangedEvent 资源更新事件
1
2
3
4
5
6
7
8
9
10
listener := e.EventBus.Subscribe(func(event events.Event) bool {
resChange, ok := event.(events.ResourceChangedEvent)
if !ok {
return false
}
if _, ok := e.ProvidedTypes[resChange.Type]; !ok {
return false
}
return true
})
  1. changedTypes 记录已经修改的资源,第一次同步的是全量的资源
1
2
3
4
5
// for the first reconcile assign all types
changedTypes := maps.Clone(e.ProvidedTypes)
reasons := map[string]struct{}{
ReasonResync: {},
}
  1. 开启两个定时器 flushTicker 和 fullResyncTicker。flushTicker 用于定期向 xds-cache 中同步被修改的资源;fullResyncTicker 用于定期向 xds-cache 进行全量的资源同步。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
for {
select {
case <-stop:
if err := e.Reconciler.Clear(e.Ctx, e.Node); err != nil {
e.Log.Error(err, "reconcile clear failed")
}
listener.Close()
return
case <-flushTicker.C:
if len(changedTypes) == 0 {
continue
}
reason := strings.Join(util_maps.SortedKeys(reasons), "_and_")
e.Log.V(1).Info("reconcile", "changedTypes", changedTypes, "reason", reason)
err, _ := e.Reconciler.Reconcile(e.Ctx, e.Node, changedTypes, e.Log)
if err != nil && errors.Is(err, context.Canceled) {
e.Log.Error(err, "reconcile failed", "changedTypes", changedTypes, "reason", reason)
} else {
changedTypes = map[model.ResourceType]struct{}{}
reasons = map[string]struct{}{}
}
case <-fullResyncTicker.C:
e.Log.V(1).Info("schedule full resync")
changedTypes = maps.Clone(e.ProvidedTypes)
reasons[ReasonResync] = struct{}{}
case event := <-listener.Recv():
resChange := event.(events.ResourceChangedEvent)
e.Log.V(1).Info("schedule sync for type", "typ", resChange.Type)
changedTypes[resChange.Type] = struct{}{}
reasons[ReasonEvent] = struct{}{}
}
}

Reconciler 调和器

Reconciler 用于重新计算给定 node 的配置资源信息,并同步到 xds-cache 中,接口定义如下:

  • Reconcile 方法:调和已经修改的资源类型,从老的 Snapshot 中取未改变的资源,再重新查询已经改变的资源,并以此创建新的 Snapshot 并更新给 xds-cache。
  • Clear 方法:在 xds-cache 中清理 Snapshot,在 streaming 结束时调用。
1
2
3
4
5
6
7
// Reconciler re-computes configuration for a given node.
type Reconciler interface {
// Reconcile reconciles state of node given changed resource types.
// Returns error and bool which is true if any resource was changed.
Reconcile(context.Context, *envoy_core.Node, map[model.ResourceType]struct{}, logr.Logger) (error, bool)
Clear(context.Context, *envoy_core.Node) error
}

Reconcile 方法

Reconcile 方法调和已经修改的资源类型,从老的 Snapshot 中取未改变的资源,再重新查询已经改变的资源,并以此创建新的 Snapshot 并更新给 xds-cache。具体流程如下:

  1. 首先从 xds-cache 中查询老的 Snapshot。
1
2
id := r.hasher.ID(node)
old, _ := r.cache.GetSnapshot(id)
  1. 对于未变更的资源,直接从老的 Snapshot 中获取。接着调用 r.generator.GenerateSnapshot 方法,重新查询已经修改的资源,并根据已变更和未变更的资源创建新的 Snapshot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// construct builder with unchanged types from the old snapshot
builder := cache.NewSnapshotBuilder()
if old != nil {
for _, typ := range util_dds.GetSupportedTypes() {
resType := core_model.ResourceType(typ)
if _, ok := changedTypes[resType]; ok {
continue
}

oldRes := old.GetResources(typ)
if len(oldRes) > 0 {
builder = builder.With(resType, maps.Values(oldRes))
}
}
}

new, err := r.generator.GenerateSnapshot(ctx, node, builder, changedTypes)
if err != nil {
return err, false
}
if new == nil {
return errors.New("nil snapshot"), false
}
  1. 若新的 Snapshot 相比于老的发生了修改,则同步到 xds-cache 中。
1
2
3
4
5
6
7
new, changed := r.Version(new, old)
if changed {
r.logChanges(logger, new, old, node)
r.meterConfigReadyForDelivery(new, old, node.Id)
return r.cache.SetSnapshot(ctx, id, new), true
}
return nil, false