Dawn's Blogs

Istio Study Notes (7): How Pilot Registers Handlers for Config and Service Changes

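During pilot Server initialization, once the ConfigController and ServiceController have been set up, the initRegistryEventHandlers method registers the handlers that process config and service change events. Two steps matter here:

  • Detect config/service changes (in Kubernetes this is done by Informers).
  • Respond to those changes (by registering handlers).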

Watching Config Changes

When Kubernetes serves as the ConfigController, crdclient.Client is the implementation. crdclient.Client starts one Informer for every Istio-defined CRD to watch for changes to that CRD's config.

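The Informer Mechanism

A Kubernetes Informer is essentially a local cache of Kubernetes resources that relies only on the Kubernetes List and Watch APIs: List populates the cache at initialization, and Watch keeps it up to date afterwards.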
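
The list-then-watch idea can be sketched without any Kubernetes dependency. The snippet below is only an illustration: watchEvent, store, and syncCache are hypothetical names invented here, not client-go types, and a real Informer also handles reconnects and resyncs that are left out.

```go
package main

import "fmt"

// watchEvent is a hypothetical stand-in for a Kubernetes watch event.
type watchEvent struct {
	Type  string // "ADDED", "MODIFIED", or "DELETED"
	Key   string // namespace/name of the object
	Value string // simplified object payload
}

// store is a toy resource cache keyed by namespace/name.
type store map[string]string

// newStore seeds the cache from a single List result.
func newStore(listed map[string]string) store {
	s := make(store, len(listed))
	for k, v := range listed {
		s[k] = v
	}
	return s
}

// apply folds one watch event into the cache.
func (s store) apply(ev watchEvent) {
	switch ev.Type {
	case "ADDED", "MODIFIED":
		s[ev.Key] = ev.Value
	case "DELETED":
		delete(s, ev.Key)
	}
}

// syncCache performs the List step once and then replays Watch events.
func syncCache(listed map[string]string, events []watchEvent) store {
	s := newStore(listed)
	for _, ev := range events {
		s.apply(ev)
	}
	return s
}

func main() {
	s := syncCache(
		map[string]string{"default/a": "v1", "default/b": "v1"},
		[]watchEvent{
			{Type: "MODIFIED", Key: "default/a", Value: "v2"},
			{Type: "DELETED", Key: "default/b"},
			{Type: "ADDED", Key: "default/c", Value: "v1"},
		},
	)
	fmt.Println(len(s), s["default/a"], s["default/c"])
}
```

After the initial List, every later change reaches the cache only through events, which is why readers of the cache never need to query the API server directly.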

When an Informer observes a config change (add, update, or delete), it responds through its EventHandler, which ends up calling the Client.onEvent method.

kc.AddEventHandler(controllers.EventHandler[controllers.Object]{
    AddFunc: func(obj controllers.Object) {
        incrementEvent(kind, "add")
        cl.queue.Push(func() error {
            cl.onEvent(resourceGVK, nil, obj, model.EventAdd)
            return nil
        })
    },
    UpdateFunc: func(old, cur controllers.Object) {
        incrementEvent(kind, "update")
        cl.queue.Push(func() error {
            cl.onEvent(resourceGVK, old, cur, model.EventUpdate)
            return nil
        })
    },
    DeleteFunc: func(obj controllers.Object) {
        incrementEvent(kind, "delete")
        cl.queue.Push(func() error {
            cl.onEvent(resourceGVK, nil, obj, model.EventDelete)
            return nil
        })
    },
})
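
Each callback above does not process the event inline; it pushes a closure onto cl.queue. A minimal sketch of that pattern follows, assuming a simple channel-backed FIFO. taskQueue and its methods are hypothetical and are not the queue type Istio actually uses.

```go
package main

import "fmt"

// task mirrors the func() error closures pushed onto the queue.
type task func() error

// taskQueue is a hypothetical, simplified FIFO work queue.
type taskQueue struct {
	tasks chan task
}

func newTaskQueue(size int) *taskQueue {
	return &taskQueue{tasks: make(chan task, size)}
}

// Push enqueues a task; it blocks while the buffer is full.
func (q *taskQueue) Push(t task) { q.tasks <- t }

// Close signals that no more tasks will be pushed.
func (q *taskQueue) Close() { close(q.tasks) }

// Run executes tasks one at a time in arrival order
// and returns how many of them reported an error.
func (q *taskQueue) Run() int {
	failed := 0
	for t := range q.tasks {
		if err := t(); err != nil {
			failed++
		}
	}
	return failed
}

func main() {
	q := newTaskQueue(8)
	var seen []string
	for _, ev := range []string{"add", "update", "delete"} {
		ev := ev // capture the current value for the closure
		q.Push(func() error {
			seen = append(seen, ev)
			return nil
		})
	}
	q.Push(func() error { return fmt.Errorf("boom") })
	q.Close()
	failed := q.Run()
	fmt.Println(seen, failed)
}
```

Running the closures from a single consumer keeps event processing ordered and keeps the Informer callbacks themselves short.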

The onEvent Method

onEvent invokes every handler registered for the given GVK, in registration order, so that each one can react to the config change event.

func (cl *Client) onEvent(resourceGVK config.GroupVersionKind, old controllers.Object, curr controllers.Object, event model.Event) {
    currItem := controllers.ExtractObject(curr)
    if currItem == nil {
        return
    }

    currConfig := TranslateObject(currItem, resourceGVK, cl.domainSuffix)

    var oldConfig config.Config
    if old != nil {
        oldConfig = TranslateObject(old, resourceGVK, cl.domainSuffix)
    }

    for _, f := range cl.handlers[resourceGVK] {
        f(oldConfig, currConfig, event)
    }
}

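Registering EventHandlers

During initialization, an EventHandler is registered for each Istio CRD in turn (except ServiceEntry and WorkloadEntry, whose updates are handled through the ServiceController; the code below also skips WorkloadGroup).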

configHandler := func(prev config.Config, curr config.Config, event model.Event) {
    defer func() {
        if event != model.EventDelete {
            s.statusReporter.AddInProgressResource(curr)
        } else {
            s.statusReporter.DeleteInProgressResource(curr)
        }
    }()
    log.Debugf("Handle event %s for configuration %s", event, curr.Key())
    // For update events, trigger push only if spec has changed.
    if event == model.EventUpdate && !needsPush(prev, curr) {
        log.Debugf("skipping push for %s as spec has not changed", prev.Key())
        return
    }
    pushReq := &model.PushRequest{
        Full:           true,
        ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.MustFromGVK(curr.GroupVersionKind), Name: curr.Name, Namespace: curr.Namespace}),
        Reason:         model.NewReasonStats(model.ConfigUpdate),
    }
    s.XDSServer.ConfigUpdate(pushReq)
}
schemas := collections.Pilot.All()
if features.EnableGatewayAPI {
    schemas = collections.PilotGatewayAPI().All()
}
for _, schema := range schemas {
    // This resource type was handled in external/servicediscovery.go, no need to rehandle here.
    if schema.GroupVersionKind() == gvk.ServiceEntry {
        continue
    }
    if schema.GroupVersionKind() == gvk.WorkloadEntry {
        continue
    }
    if schema.GroupVersionKind() == gvk.WorkloadGroup {
        continue
    }

    s.configController.RegisterEventHandler(schema.GroupVersionKind(), configHandler)
}
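
The guard on update events is worth a small illustration. The body of needsPush is not shown in this post, so the sketch below only assumes one plausible rule, namely that an update whose spec is unchanged needs no push. Config, specChanged, and shouldPush are hypothetical names, and the real comparison may differ.

```go
package main

import (
	"fmt"
	"reflect"
)

// Event is a simplified stand-in for model.Event.
type Event int

const (
	EventAdd Event = iota
	EventUpdate
	EventDelete
)

// Config is a hypothetical, trimmed-down config object.
type Config struct {
	Name            string
	ResourceVersion string            // changes on every write to the object
	Spec            map[string]string // the part assumed to affect proxy config
}

// specChanged reports whether the two specs differ (an assumed rule).
func specChanged(prev, curr Config) bool {
	return !reflect.DeepEqual(prev.Spec, curr.Spec)
}

// shouldPush mirrors the shape of the guard in configHandler:
// only update events can be skipped.
func shouldPush(prev, curr Config, ev Event) bool {
	if ev == EventUpdate && !specChanged(prev, curr) {
		return false
	}
	return true
}

func main() {
	prev := Config{Name: "reviews", ResourceVersion: "1", Spec: map[string]string{"host": "reviews"}}
	sameSpec := Config{Name: "reviews", ResourceVersion: "2", Spec: map[string]string{"host": "reviews"}}
	edited := Config{Name: "reviews", ResourceVersion: "3", Spec: map[string]string{"host": "reviews-v2"}}

	fmt.Println(shouldPush(prev, sameSpec, EventUpdate)) // false
	fmt.Println(shouldPush(prev, edited, EventUpdate))   // true
	fmt.Println(shouldPush(Config{}, prev, EventAdd))    // true
}
```

Skipping no-op updates matters because every accepted event in configHandler results in a full push request.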

The crdclient.Client.RegisterEventHandler method:

func (cl *Client) RegisterEventHandler(kind config.GroupVersionKind, handler model.EventHandler) {
    cl.handlers[kind] = append(cl.handlers[kind], handler)
}
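
Taken together, RegisterEventHandler and onEvent form a per-kind handler registry. The following is a minimal sketch of that pairing using simplified stand-in types (GVK, Handler, and client are invented for this example and carry strings instead of config.Config values).

```go
package main

import "fmt"

// GVK is a simplified stand-in for config.GroupVersionKind.
type GVK struct {
	Group, Version, Kind string
}

// Handler is a simplified event handler operating on plain strings.
type Handler func(old, curr, event string)

// client keeps one ordered handler list per resource kind.
type client struct {
	handlers map[GVK][]Handler
}

func newClient() *client {
	return &client{handlers: make(map[GVK][]Handler)}
}

// RegisterEventHandler appends a handler to the list for the given kind.
func (c *client) RegisterEventHandler(kind GVK, h Handler) {
	c.handlers[kind] = append(c.handlers[kind], h)
}

// onEvent calls the handlers of one kind in registration order.
func (c *client) onEvent(kind GVK, old, curr, event string) {
	for _, h := range c.handlers[kind] {
		h(old, curr, event)
	}
}

func main() {
	vs := GVK{Group: "networking.istio.io", Version: "v1alpha3", Kind: "VirtualService"}
	dr := GVK{Group: "networking.istio.io", Version: "v1alpha3", Kind: "DestinationRule"}

	c := newClient()
	var calls []string
	c.RegisterEventHandler(vs, func(_, curr, ev string) { calls = append(calls, "first:"+ev+":"+curr) })
	c.RegisterEventHandler(vs, func(_, curr, ev string) { calls = append(calls, "second:"+ev+":"+curr) })
	c.RegisterEventHandler(dr, func(_, curr, ev string) { calls = append(calls, "dr:"+ev+":"+curr) })

	c.onEvent(vs, "", "reviews", "add")
	fmt.Println(calls) // [first:add:reviews second:add:reviews]
}
```

An event for a kind with no registered handlers simply ranges over a nil slice and does nothing.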

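Watching Service Changes

Adding Registries

When the pilot-discovery Server initializes, it adds service registries. Istio mainly adds two kinds:

  • ServiceEntry Registry: manages services declared through ServiceEntry resources (typically services outside the cluster).
  • Kube Registry: manages services inside the Kubernetes cluster.

aggregate.Controller (the aggregate registry, which lets Istio combine several registries) adds a registry through its AddRegistry method. While doing so, it also attaches the handler that propagates service change notifications (NotifyServiceHandlers) to that registry.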

func (c *Controller) AddRegistry(registry serviceregistry.Instance) {
    c.storeLock.Lock()
    defer c.storeLock.Unlock()
    c.addRegistry(registry, nil)
}

func (c *Controller) addRegistry(registry serviceregistry.Instance, stop <-chan struct{}) {
    c.registries = append(c.registries, &registryEntry{Instance: registry, stop: stop})

    // Observe the registry for events.
    registry.AppendNetworkGatewayHandler(c.NotifyGatewayHandlers)
    registry.AppendServiceHandler(c.handlers.NotifyServiceHandlers)
    registry.AppendServiceHandler(func(prev, curr *model.Service, event model.Event) {
        for _, handlers := range c.getClusterHandlers() {
            handlers.NotifyServiceHandlers(prev, curr, event)
        }
    })
}
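
The fan-in performed by addRegistry can be sketched as follows: each registry gets the aggregate's notify function as its own service handler, so events from any registry reach the handlers registered on the aggregate. The registry and aggregate types here are hypothetical simplifications with no locking and no per-cluster handlers.

```go
package main

import "fmt"

// Service is a simplified stand-in for model.Service.
type Service struct{ Hostname string }

// ServiceHandler reacts to a service change.
type ServiceHandler func(prev, curr *Service, event string)

// registry is a hypothetical single registry (Kubernetes, ServiceEntry, ...).
type registry struct {
	name     string
	handlers []ServiceHandler
}

func (r *registry) AppendServiceHandler(h ServiceHandler) {
	r.handlers = append(r.handlers, h)
}

// emit simulates the registry detecting a service change.
func (r *registry) emit(prev, curr *Service, event string) {
	for _, h := range r.handlers {
		h(prev, curr, event)
	}
}

// aggregate combines several registries behind one handler list.
type aggregate struct {
	registries []*registry
	handlers   []ServiceHandler
}

func (a *aggregate) AppendServiceHandler(h ServiceHandler) {
	a.handlers = append(a.handlers, h)
}

// notifyServiceHandlers forwards one event to every aggregate-level handler.
func (a *aggregate) notifyServiceHandlers(prev, curr *Service, event string) {
	for _, h := range a.handlers {
		h(prev, curr, event)
	}
}

// AddRegistry stores the registry and subscribes the aggregate to its events.
func (a *aggregate) AddRegistry(r *registry) {
	a.registries = append(a.registries, r)
	r.AppendServiceHandler(a.notifyServiceHandlers)
}

func main() {
	agg := &aggregate{}
	kube := &registry{name: "Kubernetes"}
	se := &registry{name: "ServiceEntry"}
	agg.AddRegistry(kube)
	agg.AddRegistry(se)

	var seen []string
	agg.AppendServiceHandler(func(_, curr *Service, event string) {
		seen = append(seen, event+":"+curr.Hostname)
	})

	kube.emit(nil, &Service{Hostname: "reviews.default.svc.cluster.local"}, "add")
	se.emit(nil, &Service{Hostname: "api.example.com"}, "add")
	fmt.Println(seen)
}
```

Because the aggregate's handler list is read at notification time, a handler appended after AddRegistry still receives events from registries that were added earlier.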

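KubeRegistry Watches Resource Changes

KubeRegistry maintains Informers for Endpoints, Services, Nodes, and Pods so that it can watch changes to these Kubernetes resources. In the code below, endpoints are handled by the controller created with newEndpointSliceController, and a Namespace handler is registered as well when a system namespace is configured.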

if c.opts.SystemNamespace != "" {
    registerHandlers[*v1.Namespace](
        c,
        c.namespaces,
        "Namespaces",
        func(old *v1.Namespace, cur *v1.Namespace, event model.Event) error {
            if cur.Name == c.opts.SystemNamespace {
                return c.onSystemNamespaceEvent(old, cur, event)
            }
            return nil
        },
        nil,
    )
}

// always init for each cluster, otherwise different ns labels in different cluster may not take effect,
// but we skip it for configCluster which has been initiated before
if !c.opts.ConfigCluster || c.opts.DiscoveryNamespacesFilter == nil {
    c.opts.DiscoveryNamespacesFilter = namespace.NewDiscoveryNamespacesFilter(c.namespaces, options.MeshWatcher.Mesh().DiscoverySelectors)
}
c.initDiscoveryHandlers(options.MeshWatcher, c.opts.DiscoveryNamespacesFilter)

c.services = kclient.NewFiltered[*v1.Service](kubeClient, kclient.Filter{ObjectFilter: c.opts.DiscoveryNamespacesFilter.Filter})

registerHandlers[*v1.Service](c, c.services, "Services", c.onServiceEvent, nil)

c.endpoints = newEndpointSliceController(c)

// This is for getting the node IPs of a selected set of nodes
c.nodes = kclient.NewFiltered[*v1.Node](kubeClient, kclient.Filter{ObjectTransform: kubelib.StripNodeUnusedFields})
registerHandlers[*v1.Node](c, c.nodes, "Nodes", c.onNodeEvent, nil)

c.podsClient = kclient.NewFiltered[*v1.Pod](kubeClient, kclient.Filter{
    ObjectFilter:    c.opts.DiscoveryNamespacesFilter.Filter,
    ObjectTransform: kubelib.StripPodUnusedFields,
})
c.pods = newPodCache(c, c.podsClient, func(key types.NamespacedName) {
    c.queue.Push(func() error {
        return c.endpoints.sync(key.Name, key.Namespace, model.EventAdd, true)
    })
})
registerHandlers[*v1.Pod](c, c.podsClient, "Pods", c.pods.onEvent, c.pods.labelFilter)
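
The calls to registerHandlers all share one shape: a typed handler of the form func(old, cur, event) plus an optional filter. The implementation of registerHandlers is not shown in this post, so the sketch below is only a guess at the general idea, written with Go generics. The names adapt, callbacks, and skipUpdate, and the assumption that the filter suppresses uninteresting update events, are all hypothetical.

```go
package main

import "fmt"

// Event is a simplified stand-in for model.Event.
type Event string

// callbacks is the add/update/delete triple an informer would invoke.
type callbacks[T any] struct {
	Add    func(obj T)
	Update func(old, cur T)
	Delete func(obj T)
}

// adapt turns a single typed handler into informer-style callbacks.
// skipUpdate (assumed semantics) drops update events it returns true for.
func adapt[T any](handler func(old, cur T, ev Event) error, skipUpdate func(old, cur T) bool) callbacks[T] {
	var zero T // passed as "old" when there is no previous object
	return callbacks[T]{
		Add: func(obj T) { _ = handler(zero, obj, "add") },
		Update: func(old, cur T) {
			if skipUpdate != nil && skipUpdate(old, cur) {
				return
			}
			_ = handler(old, cur, "update")
		},
		Delete: func(obj T) { _ = handler(zero, obj, "delete") },
	}
}

// pod is a toy resource type used only for this example.
type pod struct {
	Name string
	IP   string
}

func main() {
	var seen []string
	cb := adapt(
		func(old, cur *pod, ev Event) error {
			seen = append(seen, string(ev)+":"+cur.Name)
			return nil
		},
		func(old, cur *pod) bool { return old.IP == cur.IP },
	)

	cb.Add(&pod{Name: "a", IP: "10.0.0.1"})
	cb.Update(&pod{Name: "a", IP: "10.0.0.1"}, &pod{Name: "a", IP: "10.0.0.1"}) // skipped
	cb.Update(&pod{Name: "a", IP: "10.0.0.1"}, &pod{Name: "a", IP: "10.0.0.2"})
	cb.Delete(&pod{Name: "a", IP: "10.0.0.2"})
	fmt.Println(seen)
}
```

A generic adapter like this lets one controller register Namespace, Service, Node, and Pod handlers without repeating the add/update/delete boilerplate for each type.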

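Registering Handlers

KubeRegistry uses Informers directly to watch Kubernetes resources such as Service and Pod; Service changes are handled through ControllerHandlers.

The ServiceEntry registry, by contrast, registers its event handlers with the ConfigController in order to respond to updates of the ServiceEntry and WorkloadEntry CRDs.

Registering with ControllerHandlers

ControllerHandlers manages the Istio Service and Workload handlers. Each NotifyXxxHandlers method runs every registered handler when an Xxx update event occurs.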

// ControllerHandlers is a utility to help Controller implementations manage their lists of handlers.
type ControllerHandlers struct {
    mutex            sync.RWMutex
    serviceHandlers  []ServiceHandler
    workloadHandlers []func(*WorkloadInstance, Event)
}

// ...

func (c *ControllerHandlers) NotifyServiceHandlers(prev, curr *Service, event Event) {
    for _, f := range c.GetServiceHandlers() {
        f(prev, curr, event)
    }
}

func (c *ControllerHandlers) NotifyWorkloadHandlers(w *WorkloadInstance, event Event) {
    for _, f := range c.GetWorkloadHandlers() {
        f(w, event)
    }
}
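
The struct above guards its handler slices with a sync.RWMutex. The elided methods are not reproduced here; the following is only a generic sketch of the same pattern with invented names (handlerList, snapshot), showing one common way to append under a write lock and notify from a copy taken under a read lock.

```go
package main

import (
	"fmt"
	"sync"
)

// handler is a simplified callback type for this example.
type handler func(name string)

// handlerList is a hypothetical mutex-guarded list of handlers.
type handlerList struct {
	mu       sync.RWMutex
	handlers []handler
}

// Append adds a handler under the write lock.
func (l *handlerList) Append(h handler) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.handlers = append(l.handlers, h)
}

// snapshot copies the current handlers under the read lock.
func (l *handlerList) snapshot() []handler {
	l.mu.RLock()
	defer l.mu.RUnlock()
	out := make([]handler, len(l.handlers))
	copy(out, l.handlers)
	return out
}

// Notify runs every handler from the snapshot without holding the lock,
// so a handler may itself call Append without deadlocking.
func (l *handlerList) Notify(name string) {
	for _, h := range l.snapshot() {
		h(name)
	}
}

func main() {
	var l handlerList
	var mu sync.Mutex
	count := 0

	// Register handlers concurrently to exercise the write lock.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.Append(func(string) {
				mu.Lock()
				count++
				mu.Unlock()
			})
		}()
	}
	wg.Wait()

	l.Notify("reviews")
	fmt.Println(count) // 4
}
```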

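ServiceController registers a handler for service change events through the AppendServiceHandler method (the handler ends up in ControllerHandlers). When a service change is detected, the handler calls XDSServer.ConfigUpdate, which places a PushRequest on the XDSServer's pushChannel.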

serviceHandler := func(prev, curr *model.Service, event model.Event) {
    pushReq := &model.PushRequest{
        Full:           true,
        ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: string(curr.Hostname), Namespace: curr.Attributes.Namespace}),
        Reason:         model.NewReasonStats(model.ServiceUpdate),
    }
    s.XDSServer.ConfigUpdate(pushReq)
}
s.ServiceController().AppendServiceHandler(serviceHandler)
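
The hand-off from handler to push channel can be sketched in a few lines. Everything below is a simplified assumption: PushRequest is reduced to three plain fields, xdsServer holds only a buffered channel, and whatever the real XDS server does with requests after they are enqueued is left out.

```go
package main

import "fmt"

// PushRequest is a trimmed-down, hypothetical version of model.PushRequest.
type PushRequest struct {
	Full           bool
	ConfigsUpdated []string
	Reason         string
}

// xdsServer is a hypothetical stand-in that only owns the push channel.
type xdsServer struct {
	pushChannel chan *PushRequest
}

func newXDSServer(buffer int) *xdsServer {
	return &xdsServer{pushChannel: make(chan *PushRequest, buffer)}
}

// ConfigUpdate enqueues the request for later processing.
func (s *xdsServer) ConfigUpdate(req *PushRequest) {
	s.pushChannel <- req
}

// serviceHandler returns a callback that converts a service change
// into a full push request keyed by namespace/hostname.
func serviceHandler(s *xdsServer) func(hostname, namespace string) {
	return func(hostname, namespace string) {
		s.ConfigUpdate(&PushRequest{
			Full:           true,
			ConfigsUpdated: []string{namespace + "/" + hostname},
			Reason:         "service update",
		})
	}
}

func main() {
	s := newXDSServer(4)
	h := serviceHandler(s)
	h("reviews.default.svc.cluster.local", "default")
	h("ratings.default.svc.cluster.local", "default")
	close(s.pushChannel)

	for req := range s.pushChannel {
		fmt.Println(req.Full, req.ConfigsUpdated, req.Reason)
	}
}
```

The handler itself stays cheap: it only describes what changed, and the consumer of the channel decides when and how to push.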

Registering with the ConfigController

All ServiceEntry config lives in the ConfigController. When the ServiceEntry registry is initialized, the handlers for ServiceEntry and WorkloadEntry CRD changes are registered with the ConfigController (and are therefore driven by the Informer's EventHandler).

  • serviceEntryHandler detects ServiceEntry changes.
  • workloadEntryHandler detects WorkloadEntry changes.

// NewController creates a new ServiceEntry discovery service.
func NewController(configController model.ConfigStoreController, xdsUpdater model.XDSUpdater,
    options ...Option,
) *Controller {
    s := newController(configController, xdsUpdater, options...)
    if configController != nil {
        configController.RegisterEventHandler(gvk.ServiceEntry, s.serviceEntryHandler)
        configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler)
    }
    return s
}