Watchdog 什么是 Watchdog 在 Kuma KDS 中,当 Zone CP 和 Global CP 建立 streaming 连接后 ,需要推送资源的一方在每一条连接开启一个 Watchdog。
Watchdog 用于收集变化的资源,并定期更新 xds-cache(xds-cache 的更新会引发 xds 推送),完成从 Global CP 与 Zone CP 之间的被更新资源的主动推送过程 。
Watchdog 数据结构 Watchdog 的数据结构如下:
Node:表示一个 envoy 节点,这里是一个 Zone CP 节点。 
EventBus:事件总线,Watchdog 通过事件总线订阅资源更新的事件。 
Reconciler:调和器,用于重新计算配置资源,并更新 xds-cache。 
ProvidedTypes:需要同步的所有资源类型。 
 
1 2 3 4 5 6 7 8 9 10 type  EventBasedWatchdog struct  {   Ctx                 context.Context    Node                *envoy_core.Node    EventBus            events.EventBus    Reconciler          reconcile.Reconciler    ProvidedTypes       map [model.ResourceType]struct {}    Log                 logr.Logger    NewFlushTicker      func ()  *time .Ticker     NewFullResyncTicker func ()  *time .Ticker  } 
流程 在 Start 方法中定义了 Watchdog 的工作流程:
首先,从事件总线中订阅 ResourceChangedEvent 资源更新事件 。 
 
1 2 3 4 5 6 7 8 9 10 listener := e.EventBus.Subscribe(func (event events.Event)  bool     resChange, ok := event.(events.ResourceChangedEvent)    if  !ok {       return  false     }    if  _, ok := e.ProvidedTypes[resChange.Type]; !ok {       return  false     }    return  true  }) 
changedTypes 记录已经修改的资源,第一次同步的是全量的资源 。 
 
1 2 3 4 5 changedTypes := maps.Clone(e.ProvidedTypes) reasons := map [string ]struct {}{    ReasonResync: {}, } 
开启两个定时器 flushTicker 和 fullResyncTicker 。flushTicker 用于定期向 xds-cache 中同步被修改的资源 ;fullResyncTicker 用于定期向 xds-cache 进行全量 的资源同步。 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 for  {   select  {    case  <-stop:       if  err := e.Reconciler.Clear(e.Ctx, e.Node); err != nil  {          e.Log.Error(err, "reconcile clear failed" )       }       listener.Close()       return     case  <-flushTicker.C:       if  len (changedTypes) == 0  {          continue        }       reason := strings.Join(util_maps.SortedKeys(reasons), "_and_" )       e.Log.V(1 ).Info("reconcile" , "changedTypes" , changedTypes, "reason" , reason)       err, _ := e.Reconciler.Reconcile(e.Ctx, e.Node, changedTypes, e.Log)       if  err != nil  && errors.Is(err, context.Canceled) {          e.Log.Error(err, "reconcile failed" , "changedTypes" , changedTypes, "reason" , reason)       } else  {          changedTypes = map [model.ResourceType]struct {}{}          reasons = map [string ]struct {}{}       }    case  <-fullResyncTicker.C:       e.Log.V(1 ).Info("schedule full resync" )       changedTypes = maps.Clone(e.ProvidedTypes)       reasons[ReasonResync] = struct {}{}    case  event := <-listener.Recv():       resChange := event.(events.ResourceChangedEvent)       e.Log.V(1 ).Info("schedule sync for type" , "typ" , resChange.Type)       changedTypes[resChange.Type] = struct {}{}       reasons[ReasonEvent] = struct {}{}    } } 
Reconciler 调和器 Reconciler 用于重新计算给定 node 的配置资源信息,并同步到 xds-cache 中,接口定义如下:
Reconcile 方法: 调和已经修改的资源类型,从老的 Snapshot 中取未改变的资源,再重新查询已经改变的资源,并以此创建新的 Snapshot 并更新给 xds-cache。Clear 方法:在 xds-cache 中清理 Snapshot,在 streaming 结束时调用。 
 
1 2 3 4 5 6 7 type  Reconciler interface  {           Reconcile(context.Context, *envoy_core.Node, map [model.ResourceType]struct {}, logr.Logger) (error, bool )    Clear(context.Context, *envoy_core.Node) error } 
Reconcile 方法 Reconcile 方法调和已经修改的资源类型,从老的 Snapshot 中取未改变的资源,再重新查询已经改变的资源,并以此创建新的 Snapshot 并更新给 xds-cache 。具体流程如下:
首先从 xds-cache 中查询老的 Snapshot。 
 
1 2 id := r.hasher.ID(node) old, _ := r.cache.GetSnapshot(id) 
对于未变更的资源 ,直接从老的 Snapshot 中获取 。接着调用 r.generator.GenerateSnapshot 方法,重新查询已经修改的资源 ,并根据已变更和未变更的资源创建新的 Snapshot 。 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 builder := cache.NewSnapshotBuilder() if  old != nil  {   for  _, typ := range  util_dds.GetSupportedTypes() {       resType := core_model.ResourceType(typ)       if  _, ok := changedTypes[resType]; ok {          continue        }       oldRes := old.GetResources(typ)       if  len (oldRes) > 0  {          builder = builder.With(resType, maps.Values(oldRes))       }    } } new , err := r.generator.GenerateSnapshot(ctx, node, builder, changedTypes)if  err != nil  {   return  err, false  } if  new  == nil  {   return  errors.New("nil snapshot" ), false  } 
若新的 Snapshot 相比于老的发生了修改 ,则同步 到 xds-cache 中。 
 
1 2 3 4 5 6 7 new , changed := r.Version(new , old)if  changed {   r.logChanges(logger, new , old, node)    r.meterConfigReadyForDelivery(new , old, node.Id)    return  r.cache.SetSnapshot(ctx, id, new ), true  } return  nil , false