在 pilot Server 初始化过程中,在初始化 ConfigController 和 ServiceController 之后,利用 initRegistryEventHandlers 方法注册了配置和服务变化事件的处理 handler 。这其中头两个重要步骤:
监听 到配置/服务的变化(在 Kube 中为 Informer)。
针对这些变化做出响应 (注册 Handler)。
监听配置变化 在使用 Kubernetes 作为 ConfigController 时,使用 crdclient.Client 作为 ConfigController 的实现,在 crdclient.Client 中为每一个 istio 定义的 CRD 配置开启了一个 Informer 用于监听 CRD 配置信息的变化 。
Kubernetes Informer 机制本质上就是一个 Kubernetes 资源缓存 ,只依赖于 K8S 的 List 和 Watch API,只有初始化时使用 List,之后利用 Watch 更新缓存资源。
当 Informer 监听到配置信息发生变化 (新增,更新,删除)后,通过 EventHandler (Client.onEvent 方法)响应配置信息变化事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 kc.AddEventHandler(controllers.EventHandler[controllers.Object]{ AddFunc: func (obj controllers.Object) { incrementEvent(kind, "add" ) cl.queue.Push(func () error { cl.onEvent(resourceGVK, nil , obj, model.EventAdd) return nil }) }, UpdateFunc: func (old, cur controllers.Object) { incrementEvent(kind, "update" ) cl.queue.Push(func () error { cl.onEvent(resourceGVK, old, cur, model.EventUpdate) return nil }) }, DeleteFunc: func (obj controllers.Object) { incrementEvent(kind, "delete" ) cl.queue.Push(func () error { cl.onEvent(resourceGVK, nil , obj, model.EventDelete) return nil }) }, })
onEvent 方法 在 onEvent 方法中,会调用 GVK 对应的所有 handlers ,依次对配置信息的更新事件进行响应。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (cl *Client) onEvent (resourceGVK config.GroupVersionKind, old controllers.Object, curr controllers.Object, event model.Event) { currItem := controllers.ExtractObject(curr) if currItem == nil { return } currConfig := TranslateObject(currItem, resourceGVK, cl.domainSuffix) var oldConfig config.Config if old != nil { oldConfig = TranslateObject(old, resourceGVK, cl.domainSuffix) } for _, f := range cl.handlers[resourceGVK] { f(oldConfig, currConfig, event) } }
注册 EventHandler 在初始化过程中,依次对每一种 Istio CRD 注册 EventHandler (除了 ServiceEntry 和 WorkloadEntry ,这两个 CRD 是通过 ServiceController 响应更新的)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 configHandler := func (prev config.Config, curr config.Config, event model.Event) { defer func () { if event != model.EventDelete { s.statusReporter.AddInProgressResource(curr) } else { s.statusReporter.DeleteInProgressResource(curr) } }() log.Debugf("Handle event %s for configuration %s" , event, curr.Key()) if event == model.EventUpdate && !needsPush(prev, curr) { log.Debugf("skipping push for %s as spec has not changed" , prev.Key()) return } pushReq := &model.PushRequest{ Full: true , ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.MustFromGVK(curr.GroupVersionKind), Name: curr.Name, Namespace: curr.Namespace}), Reason: model.NewReasonStats(model.ConfigUpdate), } s.XDSServer.ConfigUpdate(pushReq) } schemas := collections.Pilot.All() if features.EnableGatewayAPI { schemas = collections.PilotGatewayAPI().All() } for _, schema := range schemas { if schema.GroupVersionKind() == gvk.ServiceEntry { continue } if schema.GroupVersionKind() == gvk.WorkloadEntry { continue } if schema.GroupVersionKind() == gvk.WorkloadGroup { continue } s.configController.RegisterEventHandler(schema.GroupVersionKind(), configHandler) }
crdclient.Client.RegisterEventHandler 方法:
1 2 3 func (cl *Client) RegisterEventHandler (kind config.GroupVersionKind, handler model.EventHandler) { cl.handlers[kind] = append (cl.handlers[kind], handler) }
监听服务变化 添加注册中心 Registry 在 pilot-discovery Server 初始化时,添加了服务注册中心,istio 主要添加了两种注册中心:
ServiceEntry Registry: 用于管理集群外 服务。
Kube Registry: 用于管理集群内 服务。
aggregate.Controller(聚合注册中心,istio 可以实现多种注册中心的组合)在添加注册中心 时,使用 AddRegistry 方法 。在添加时:添加了 istio 服务注册变化通知的 handler(NotifyServiceHandlers )。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (c *Controller) AddRegistry (registry serviceregistry.Instance) { c.storeLock.Lock() defer c.storeLock.Unlock() c.addRegistry(registry, nil ) } func (c *Controller) addRegistry (registry serviceregistry.Instance, stop <-chan struct {}) { c.registries = append (c.registries, ®istryEntry{Instance: registry, stop: stop}) registry.AppendNetworkGatewayHandler(c.NotifyGatewayHandlers) registry.AppendServiceHandler(c.handlers.NotifyServiceHandlers) registry.AppendServiceHandler(func (prev, curr *model.Service, event model.Event) { for _, handlers := range c.getClusterHandlers() { handlers.NotifyServiceHandlers(prev, curr, event) } }) }
KubeRegistry 监听资源变化 在 KubeRegistry 中,维护了 Endpoint、Service、Node、Pod 的 Informer ,用于监听这些Kubernetes资源的变化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 if c.opts.SystemNamespace != "" { registerHandlers[*v1.Namespace]( c, c.namespaces, "Namespaces" , func (old *v1.Namespace, cur *v1.Namespace, event model.Event) error { if cur.Name == c.opts.SystemNamespace { return c.onSystemNamespaceEvent(old, cur, event) } return nil }, nil , ) } if !c.opts.ConfigCluster || c.opts.DiscoveryNamespacesFilter == nil { c.opts.DiscoveryNamespacesFilter = namespace.NewDiscoveryNamespacesFilter(c.namespaces, options.MeshWatcher.Mesh().DiscoverySelectors) } c.initDiscoveryHandlers(options.MeshWatcher, c.opts.DiscoveryNamespacesFilter) c.services = kclient.NewFiltered[*v1.Service](kubeClient, kclient.Filter{ObjectFilter: c.opts.DiscoveryNamespacesFilter.Filter}) registerHandlers[*v1.Service](c, c.services, "Services" , c.onServiceEvent, nil ) c.endpoints = newEndpointSliceController(c) c.nodes = kclient.NewFiltered[*v1.Node](kubeClient, kclient.Filter{ObjectTransform: kubelib.StripNodeUnusedFields}) registerHandlers[*v1.Node](c, c.nodes, "Nodes" , c.onNodeEvent, nil ) c.podsClient = kclient.NewFiltered[*v1.Pod](kubeClient, kclient.Filter{ ObjectFilter: c.opts.DiscoveryNamespacesFilter.Filter, ObjectTransform: kubelib.StripPodUnusedFields, }) c.pods = newPodCache(c, c.podsClient, func (key types.NamespacedName) { c.queue.Push(func () error { return c.endpoints.sync(key.Name, key.Namespace, model.EventAdd, true ) }) }) registerHandlers[*v1.Pod](c, c.podsClient, "Pods" , c.pods.onEvent, c.pods.labelFilter)
注册 Handler KubeRegistry 直接使用 Informer 来监听 Service、Pod 等 K8S Resource 的变化,Service 资源通过 ControllerHandlers 来响应。
而 ServiceEntry 通过将 Event Handler 注册到 ConfigController,来实现对 ServiceEntry 和 Workload CRD 资源更新的响应。
注册到 ControllerHandlers ControllerHandlers 管理 Istio Service 和 Workload handler,NotifyXxxHandlers 定义了当遇到 Xxx 更新事件时,执行所有定义的 Handelrs 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 type ControllerHandlers struct { mutex sync.RWMutex serviceHandlers []ServiceHandler workloadHandlers []func (*WorkloadInstance, Event) } func (c *ControllerHandlers) NotifyServiceHandlers (prev, curr *Service, event Event) { for _, f := range c.GetServiceHandlers() { f(prev, curr, event) } } func (c *ControllerHandlers) NotifyWorkloadHandlers (w *WorkloadInstance, event Event) { for _, f := range c.GetWorkloadHandlers() { f(w, event) } }
ServiceController 使用 AppendServiceHandler 方法注册服务变化事件的 handler (该 handler 会注册到 ControllerHandlers 中),当检测到服务发生变化时,向 XDSServer 的 pushChannel 中推送一条 PushRequest。
1 2 3 4 5 6 7 8 9 serviceHandler := func (prev, curr *model.Service, event model.Event) { pushReq := &model.PushRequest{ Full: true , ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: string (curr.Hostname), Namespace: curr.Attributes.Namespace}), Reason: model.NewReasonStats(model.ServiceUpdate), } s.XDSServer.ConfigUpdate(pushReq) } s.ServiceController().AppendServiceHandler(serviceHandler)
注册到 ConfigController 所有 ServiceEntry 配置数据在 ConfigController 中 ,在初始化 ServiceEntryRegistry 时:将 ServiceEntry 和 WorkloadEntry CRD 配置变化 handler 注册到 ConfigController (通过 Informer 的 EventHandler)。
serviceEntryHandler 检测 ServiceEntry 变化。
workloadEntryHandler 检测 WorkloadEntry 变化。
1 2 3 4 5 6 7 8 9 10 11 func NewController (configController model.ConfigStoreController, xdsUpdater model.XDSUpdater, options ...Option, ) *Controller { s := newController(configController, xdsUpdater, options...) if configController != nil { configController.RegisterEventHandler(gvk.ServiceEntry, s.serviceEntryHandler) configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler) } return s }