Watchdog 什么是 Watchdog 在 Kuma KDS 中,当 Zone CP 和 Global CP 建立 streaming 连接后 ,需要推送资源的一方在每一条连接开启一个 Watchdog。
Watchdog 用于收集变化的资源,并定期更新 xds-cache(xds-cache 的更新会引发 xds 推送),完成从 Global CP 与 Zone CP 之间的被更新资源的主动推送过程 。
Watchdog 数据结构 Watchdog 的数据结构如下:
Node:表示一个 envoy 节点,这里是一个 Zone CP 节点。
EventBus:事件总线,Watchdog 通过事件总线订阅资源更新的事件。
Reconciler:调和器,用于重新计算配置资源,并更新 xds-cache。
ProvidedTypes:需要同步的所有资源类型。
1 2 3 4 5 6 7 8 9 10 type EventBasedWatchdog struct { Ctx context.Context Node *envoy_core.Node EventBus events.EventBus Reconciler reconcile.Reconciler ProvidedTypes map [model.ResourceType]struct {} Log logr.Logger NewFlushTicker func () *time .Ticker NewFullResyncTicker func () *time .Ticker }
流程 在 Start 方法中定义了 Watchdog 的工作流程:
首先,从事件总线中订阅 ResourceChangedEvent 资源更新事件 。
1 2 3 4 5 6 7 8 9 10 listener := e.EventBus.Subscribe(func (event events.Event) bool { resChange, ok := event.(events.ResourceChangedEvent) if !ok { return false } if _, ok := e.ProvidedTypes[resChange.Type]; !ok { return false } return true })
changedTypes 记录已经修改的资源,第一次同步的是全量的资源 。
1 2 3 4 5 changedTypes := maps.Clone(e.ProvidedTypes) reasons := map [string ]struct {}{ ReasonResync: {}, }
开启两个定时器 flushTicker 和 fullResyncTicker 。flushTicker 用于定期向 xds-cache 中同步被修改的资源 ;fullResyncTicker 用于定期向 xds-cache 进行全量 的资源同步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 for { select { case <-stop: if err := e.Reconciler.Clear(e.Ctx, e.Node); err != nil { e.Log.Error(err, "reconcile clear failed" ) } listener.Close() return case <-flushTicker.C: if len (changedTypes) == 0 { continue } reason := strings.Join(util_maps.SortedKeys(reasons), "_and_" ) e.Log.V(1 ).Info("reconcile" , "changedTypes" , changedTypes, "reason" , reason) err, _ := e.Reconciler.Reconcile(e.Ctx, e.Node, changedTypes, e.Log) if err != nil && errors.Is(err, context.Canceled) { e.Log.Error(err, "reconcile failed" , "changedTypes" , changedTypes, "reason" , reason) } else { changedTypes = map [model.ResourceType]struct {}{} reasons = map [string ]struct {}{} } case <-fullResyncTicker.C: e.Log.V(1 ).Info("schedule full resync" ) changedTypes = maps.Clone(e.ProvidedTypes) reasons[ReasonResync] = struct {}{} case event := <-listener.Recv(): resChange := event.(events.ResourceChangedEvent) e.Log.V(1 ).Info("schedule sync for type" , "typ" , resChange.Type) changedTypes[resChange.Type] = struct {}{} reasons[ReasonEvent] = struct {}{} } }
Reconciler 调和器 Reconciler 用于重新计算给定 node 的配置资源信息,并同步到 xds-cache 中,接口定义如下:
Reconcile 方法: 调和已经修改的资源类型,从老的 Snapshot 中取未改变的资源,再重新查询已经改变的资源,并以此创建新的 Snapshot 并更新给 xds-cache。
Clear 方法:在 xds-cache 中清理 Snapshot,在 streaming 结束时调用。
1 2 3 4 5 6 7 type Reconciler interface { Reconcile(context.Context, *envoy_core.Node, map [model.ResourceType]struct {}, logr.Logger) (error, bool ) Clear(context.Context, *envoy_core.Node) error }
Reconcile 方法 Reconcile 方法调和已经修改的资源类型,从老的 Snapshot 中取未改变的资源,再重新查询已经改变的资源,并以此创建新的 Snapshot 并更新给 xds-cache 。具体流程如下:
首先从 xds-cache 中查询老的 Snapshot。
1 2 id := r.hasher.ID(node) old, _ := r.cache.GetSnapshot(id)
对于未变更的资源 ,直接从老的 Snapshot 中获取 。接着调用 r.generator.GenerateSnapshot
方法,重新查询已经修改的资源 ,并根据已变更和未变更的资源创建新的 Snapshot 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 builder := cache.NewSnapshotBuilder() if old != nil { for _, typ := range util_dds.GetSupportedTypes() { resType := core_model.ResourceType(typ) if _, ok := changedTypes[resType]; ok { continue } oldRes := old.GetResources(typ) if len (oldRes) > 0 { builder = builder.With(resType, maps.Values(oldRes)) } } } new , err := r.generator.GenerateSnapshot(ctx, node, builder, changedTypes)if err != nil { return err, false } if new == nil { return errors.New("nil snapshot" ), false }
若新的 Snapshot 相比于老的发生了修改 ,则同步 到 xds-cache 中。
1 2 3 4 5 6 7 new , changed := r.Version(new , old)if changed { r.logChanges(logger, new , old, node) r.meterConfigReadyForDelivery(new , old, node.Id) return r.cache.SetSnapshot(ctx, id, new ), true } return nil , false