Dawn's Blogs

Sharing technology, recording growth

Istio Study Notes (6): A Walkthrough of Pilot's Configuration and Service Discovery Startup Flow

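The main startup steps of pilot-discovery are:

  • Create and initialize the ConfigController.
  • Create and initialize the ServiceController.
  • Register the Discovery Server as an event handler of both the ConfigController and the ServiceController, so that it is notified of configuration and service changes.
  • Initialize the DiscoveryService and register it on the gRPC server.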
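The wiring described in the list above can be sketched in a few lines of Go. This is only an illustration of the pattern (two controllers, one discovery server subscribed to both), not Istio code: the types `Controller`, `DiscoveryServer` and `Event`, and the function `bootstrap`, are hypothetical names invented for this sketch, and the gRPC registration step is left out.

```go
package main

import "fmt"

// Event is a hypothetical change notification.
type Event struct {
	Source string // "config" or "service"
	Name   string
}

// Controller is a hypothetical stand-in for ConfigController / ServiceController.
type Controller struct {
	source   string
	handlers []func(Event)
}

func NewController(source string) *Controller { return &Controller{source: source} }

// RegisterEventHandler subscribes h to every change seen by the controller.
func (c *Controller) RegisterEventHandler(h func(Event)) {
	c.handlers = append(c.handlers, h)
}

// Notify simulates a change being observed and fans it out to the handlers.
func (c *Controller) Notify(name string) {
	for _, h := range c.handlers {
		h(Event{Source: c.source, Name: name})
	}
}

// DiscoveryServer is a hypothetical stand-in that only records what it was told.
type DiscoveryServer struct {
	Pending []Event
}

func (d *DiscoveryServer) ConfigUpdate(e Event) { d.Pending = append(d.Pending, e) }

// bootstrap follows the order of the list: controllers first, then the handlers.
func bootstrap() (*Controller, *Controller, *DiscoveryServer) {
	configController := NewController("config")
	serviceController := NewController("service")
	ds := &DiscoveryServer{}
	configController.RegisterEventHandler(ds.ConfigUpdate)
	serviceController.RegisterEventHandler(ds.ConfigUpdate)
	return configController, serviceController, ds
}

func main() {
	cc, sc, ds := bootstrap()
	cc.Notify("reviews-route")
	sc.Notify("reviews.default.svc.cluster.local")
	for _, e := range ds.Pending {
		fmt.Printf("%s changed: %s\n", e.Source, e.Name)
	}
}
```

The point of the sketch is the direction of the dependency: the controllers know nothing about xDS, they only call whatever handlers were registered on them.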

[Figure: pilot-discovery-init]

Key data structures

Server

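Initializing pilot-discovery comes down to the bootstrap.NewServer method building a pilot Server. Server is the core of the pilot-discovery component; its key fields are shown below (fields related to security and observability are left out for now):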

// Server contains the runtime configuration for the Pilot discovery service.
type Server struct {
	XDSServer *xds.DiscoveryServer // the xDS server
	environment *model.Environment // aggregate of the APIs pilot needs at runtime
	kubeClient kubelib.Client // Kubernetes client connection

	configController model.ConfigStoreController // controller that handles all config data (VirtualService, etc.) uniformly
	ConfigStores []model.ConfigStoreController // caches for the individual config sources, exposing Get, List, Create, etc.
	serviceEntryController *serviceentry.Controller // controller dedicated to ServiceEntry
	fileWatcher filewatcher.FileWatcher // file watcher, mainly for the meshconfig and networks config files
}

Environment

Environment gives pilot one aggregated set of the APIs it needs at runtime. Its main fields are:

// Environment provides an aggregate environmental API for Pilot
type Environment struct {
	// Discovery interface for listing services and instances.
	ServiceDiscovery // service discovery interface
	// Config interface for listing routing rules
	ConfigStore // config (routing rules, etc.) interface
	// Watcher is the watcher for the mesh config (to be merged into the config store)
	mesh.Watcher // mesh config watcher
	// NetworksWatcher (loaded from a config map) provides information about the
	// set of networks inside a mesh and how to route to endpoints in each
	// network. Each network provides information about the endpoints in a
	// routable L3 network. A single routable L3 network can have one or more
	// service registries.
	NetworksWatcher mesh.NetworksWatcher // mesh networks config watcher
	// pushContext holds information during push generation. It is reset on config change, at the beginning
	// of the pushAll. It will hold all errors and stats and possibly caches needed during the entire cache computation.
	pushContext *PushContext // context that holds state before an xDS response is pushed
	// DomainSuffix provides a default domain for the Istio server.
	DomainSuffix string
	// EndpointShards for a service. This is a global (per-server) list, built from
	// incremental updates. This is keyed by service and namespace
	EndpointIndex *EndpointIndex
	// Cache for XDS resources.
	Cache XdsCache
}

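In particular:

  • PushContext is where pilot keeps context while it generates configuration ahead of an xDS push. It is reset when the configuration changes and at the start of a full push. It holds all errors and stats, and caches some of the intermediate results of config computation.
  • ServiceDiscovery provides the methods for enumerating Istio services and instances.
  • mesh.Watcher and mesh.NetworksWatcher watch the ConfigMaps created when istiod starts (the mesh config and the mesh networks config), which are mounted into the Pod's file system. When a watcher sees the config file change, it runs the handlers that were registered beforehand.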
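The "run the pre-registered handlers when the file changes" behavior of the watchers can be pictured with the following minimal sketch. It is not the `mesh.Watcher` implementation: `MeshConfig`, `Watcher`, `AddMeshHandler` and `Update` are hypothetical names for this example, the config is reduced to a single field, and the file-system notification is replaced by a direct call to `Update`.

```go
package main

import "fmt"

// MeshConfig is a hypothetical, heavily reduced mesh configuration.
type MeshConfig struct {
	TrustDomain string
}

// Watcher is a hypothetical stand-in for a mesh config watcher.
type Watcher struct {
	current  MeshConfig
	handlers []func(MeshConfig)
}

// Mesh returns the most recently loaded configuration.
func (w *Watcher) Mesh() MeshConfig { return w.current }

// AddMeshHandler registers a callback to run after the config changes.
func (w *Watcher) AddMeshHandler(h func(MeshConfig)) {
	w.handlers = append(w.handlers, h)
}

// Update simulates the file watcher reporting new file content.
// Handlers run only when the content actually differs; it reports whether they ran.
func (w *Watcher) Update(next MeshConfig) bool {
	if next == w.current {
		return false
	}
	w.current = next
	for _, h := range w.handlers {
		h(next)
	}
	return true
}

func main() {
	w := &Watcher{current: MeshConfig{TrustDomain: "cluster.local"}}
	w.AddMeshHandler(func(m MeshConfig) {
		fmt.Println("mesh config changed, trust domain is now", m.TrustDomain)
	})
	w.Update(MeshConfig{TrustDomain: "cluster.local"}) // unchanged: no handler call
	w.Update(MeshConfig{TrustDomain: "example.org"})   // changed: handler runs
}
```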

DiscoveryServer

DiscoveryServer is the gRPC implementation of Envoy's xDS APIs: the server that listens for xDS requests.

// DiscoveryServer is Pilot's gRPC implementation for Envoy's xds APIs
type DiscoveryServer struct {
	// Env is the model environment.
	Env *model.Environment
	// ConfigGenerator is responsible for generating data plane configuration using Istio networking
	// APIs and service registry info
	ConfigGenerator core.ConfigGenerator // generator driven by control-plane Istio config such as VirtualService
	// Generators allow customizing the generated config, based on the client metadata.
	// Key is the generator type - will match the Generator metadata to set the per-connection
	// default generator, or the combination of Generator metadata and TypeUrl to select a
	// different generator for a type.
	// Normal istio clients use the default generator - will not be impacted by this.
	Generators map[string]model.XdsResourceGenerator // customized generators for the different config types
	// concurrentPushLimit is a semaphore that limits the amount of concurrent XDS pushes.
	concurrentPushLimit chan struct{}
	// RequestRateLimit limits the number of new XDS requests allowed. This helps prevent thundering herd of incoming requests.
	RequestRateLimit *rate.Limiter
	// pushChannel is the buffer used for debouncing.
	// after debouncing the pushRequest will be sent to pushQueue
	pushChannel chan *model.PushRequest // channel that receives push requests
	// pushQueue is the buffer that used after debounce and before the real xds push.
	pushQueue *PushQueue // buffer queue used after debouncing and before the actual xDS push
	// adsClients reflect active gRPC channels, for both ADS and EDS.
	adsClients map[string]*Connection // holds the xDS connections
	StatusReporter DistributionStatusCache // tracks connection disconnects and ACKs
	// StatusGen is notified of connect/disconnect/nack on all connections
	StatusGen *StatusGen
	WorkloadEntryController *autoregistration.Controller
	// serverReady indicates caches have been synced up and server is ready to process requests.
	serverReady atomic.Bool // caches are synced and the server can accept requests
	// Cache for XDS resources
	Cache model.XdsCache // cache of xDS resources
}
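The pushChannel and pushQueue fields describe a two-stage pipeline: requests are collected and debounced first, and only the merged result is queued for a real push. The sketch below illustrates just the merging half of that idea, under loud assumptions: `PushRequest` here is a reduced, hypothetical type (string keys instead of config keys), `merge` and `drain` are names made up for the example, and the debounce timers are deliberately left out so that the behavior is deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// PushRequest is a hypothetical, reduced push request.
type PushRequest struct {
	Full           bool
	ConfigsUpdated map[string]struct{}
}

// merge folds two requests into one: Full is OR-ed, ConfigsUpdated is the union.
func merge(a, b *PushRequest) *PushRequest {
	if a == nil {
		return b
	}
	if b == nil {
		return a
	}
	out := &PushRequest{
		Full:           a.Full || b.Full,
		ConfigsUpdated: make(map[string]struct{}, len(a.ConfigsUpdated)+len(b.ConfigsUpdated)),
	}
	for k := range a.ConfigsUpdated {
		out.ConfigsUpdated[k] = struct{}{}
	}
	for k := range b.ConfigsUpdated {
		out.ConfigsUpdated[k] = struct{}{}
	}
	return out
}

// drain merges every request currently buffered in ch into a single request.
// It returns nil when nothing is buffered.
func drain(ch <-chan *PushRequest) *PushRequest {
	var merged *PushRequest
	for {
		select {
		case r, ok := <-ch:
			if !ok {
				return merged
			}
			merged = merge(merged, r)
		default:
			return merged
		}
	}
}

// newRequest is a small helper for building requests in the example.
func newRequest(full bool, keys ...string) *PushRequest {
	r := &PushRequest{Full: full, ConfigsUpdated: map[string]struct{}{}}
	for _, k := range keys {
		r.ConfigsUpdated[k] = struct{}{}
	}
	return r
}

// sortedKeys returns the updated config keys in a stable order for printing.
func sortedKeys(r *PushRequest) []string {
	keys := make([]string, 0, len(r.ConfigsUpdated))
	for k := range r.ConfigsUpdated {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	pushChannel := make(chan *PushRequest, 10)
	pushChannel <- newRequest(false, "VirtualService/default/reviews")
	pushChannel <- newRequest(true, "DestinationRule/default/reviews")
	pushChannel <- newRequest(false, "VirtualService/default/reviews")

	merged := drain(pushChannel) // three events collapse into one push
	fmt.Println(merged.Full, sortedKeys(merged))
}
```

Collapsing bursts this way is what keeps a flurry of config edits from turning into one xDS push per edit.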

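Initialization flow

The initialization flow is as follows:

  • Initialize the Environment
  • Initialize the Server
  • Initialize the HTTP and gRPC servers
  • Initialize the KubeClient, mesh config, mesh networks config and mesh handlers
  • Initialize the ConfigController and ServiceController
  • Initialize the RegistryEventHandlers

Initializing the Environment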

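While the Environment is being initialized, a Controller that aggregates all service registries is created as well and becomes the Environment's ServiceDiscovery. The MeshHolder argument passed in here is how the aggregate controller gets at the mesh config: it reads it through the Environment's mesh.Watcher.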

e := model.NewEnvironment()
e.DomainSuffix = args.RegistryOptions.KubeOptions.DomainSuffix
e.SetLedger(buildLedger(args.RegistryOptions))

ac := aggregate.NewController(aggregate.Options{
	MeshHolder: e,
})
e.ServiceDiscovery = ac
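To make "a Controller that aggregates all registries" concrete, here is a minimal sketch of the aggregation pattern. It does not reproduce the `aggregate` package: the `Registry` interface, `staticRegistry`, `Aggregate`, and the choice to deduplicate hostnames by first occurrence are all assumptions made for this illustration.

```go
package main

import "fmt"

// Registry is a hypothetical, minimal service registry interface.
type Registry interface {
	Provider() string
	Services() []string
}

// staticRegistry is a fixed list of hostnames, used only for the example.
type staticRegistry struct {
	provider string
	hosts    []string
}

func (r staticRegistry) Provider() string   { return r.provider }
func (r staticRegistry) Services() []string { return r.hosts }

// Aggregate presents several registries as if they were one.
type Aggregate struct {
	registries []Registry
}

// AddRegistry appends a registry; later lookups include its services.
func (a *Aggregate) AddRegistry(r Registry) {
	a.registries = append(a.registries, r)
}

// Services walks the registries in order and returns each hostname once.
func (a *Aggregate) Services() []string {
	seen := map[string]bool{}
	var out []string
	for _, r := range a.registries {
		for _, h := range r.Services() {
			if !seen[h] {
				seen[h] = true
				out = append(out, h)
			}
		}
	}
	return out
}

func main() {
	ac := &Aggregate{}
	ac.AddRegistry(staticRegistry{"Kubernetes", []string{"reviews.default.svc.cluster.local"}})
	ac.AddRegistry(staticRegistry{"External", []string{"api.example.com", "reviews.default.svc.cluster.local"}})
	fmt.Println(ac.Services())
}
```

Because callers only see the aggregate, registries can be added later in the startup sequence without the rest of the server noticing.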

Initializing the Server

s := &Server{
	clusterID: getClusterID(args),
	environment: e,
	fileWatcher: filewatcher.NewWatcher(),
	httpMux: http.NewServeMux(),
	monitoringMux: http.NewServeMux(),
	readinessProbes: make(map[string]readinessProbe),
	readinessFlags: &readinessFlags{},
	workloadTrustBundle: tb.NewTrustBundle(nil),
	server: server.New(),
	shutdownDuration: args.ShutdownDuration,
	internalStop: make(chan struct{}),
	istiodCertBundleWatcher: keycertbundle.NewWatcher(),
	webhookInfo: &webhookInfo{},
}

// Apply custom initialization functions.
for _, fn := range initFuncs {
	fn(s)
}
// Initialize workload Trust Bundle before XDS Server
e.TrustBundle = s.workloadTrustBundle
s.XDSServer = xds.NewDiscoveryServer(e, args.RegistryOptions.KubeOptions.ClusterAliases)

Initializing the HTTP and gRPC servers

initServer initializes the HTTP and gRPC servers. Within it, the initGrpcServer method registers the xDS service on the gRPC server:

func (s *Server) initGrpcServer(options *istiokeepalive.Options) {
	interceptors := []grpc.UnaryServerInterceptor{
		// setup server prometheus monitoring (as final interceptor in chain)
		grpcprom.UnaryServerInterceptor,
	}
	grpcOptions := istiogrpc.ServerOptions(options, interceptors...)
	s.grpcServer = grpc.NewServer(grpcOptions...)
	s.XDSServer.Register(s.grpcServer)
	reflection.Register(s.grpcServer)
}

On the HTTP side only the ready route is registered; it is used to probe whether istio pilot-discovery is ready.

// Readiness Handler.
s.httpMux.HandleFunc("/ready", s.istiodReadyHandler)
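What a readiness route of this kind typically does can be sketched with the standard library alone. The body of `istiodReadyHandler` is not shown in this post, so everything below is an assumption: the `readinessProbe` signature, the `readyHandler` and `probeStatus` helpers, and the choice of 503 for "not ready" are illustrative, not taken from Istio.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// readinessProbe is assumed here to be a simple "am I ready?" callback.
type readinessProbe func() bool

// readyHandler answers 200 only when every registered probe reports ready,
// and 503 (naming the failing probe) otherwise.
func readyHandler(probes map[string]readinessProbe) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		for name, probe := range probes {
			if !probe() {
				http.Error(w, name+" not ready", http.StatusServiceUnavailable)
				return
			}
		}
		w.WriteHeader(http.StatusOK)
	}
}

// probeStatus issues an in-memory GET /ready and returns the status code.
func probeStatus(h http.Handler) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/ready", nil))
	return rec.Code
}

func main() {
	synced := false
	mux := http.NewServeMux()
	mux.HandleFunc("/ready", readyHandler(map[string]readinessProbe{
		"discovery": func() bool { return synced },
	}))

	fmt.Println("before sync:", probeStatus(mux))
	synced = true // e.g. caches have finished syncing
	fmt.Println("after sync:", probeStatus(mux))
}
```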

Initializing the KubeClient, mesh config, mesh networks config and mesh handlers

// Apply the arguments to the configuration.
if err := s.initKubeClient(args); err != nil {
	return nil, fmt.Errorf("error initializing kube client: %v", err)
}

s.initMeshConfiguration(args, s.fileWatcher)
spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())

s.initMeshNetworks(args, s.fileWatcher)
s.initMeshHandlers()
s.environment.Init()
if err := s.environment.InitNetworksManager(s.XDSServer); err != nil {
	return nil, err
}

Initializing the ConfigController and ServiceController

if err := s.initControllers(args); err != nil {
	return nil, err
}

Inside the initControllers method:

  • initConfigController and initServiceControllers initialize the ConfigController and the ServiceController.

// initControllers initializes the controllers.
func (s *Server) initControllers(args *PilotArgs) error {
	log.Info("initializing controllers")
	s.initMulticluster(args)

	s.initSDSServer()

	if features.EnableEnhancedResourceScoping {
		// setup namespace filter
		args.RegistryOptions.KubeOptions.DiscoveryNamespacesFilter = s.multiclusterController.DiscoveryNamespacesFilter
	}
	if err := s.initConfigController(args); err != nil {
		return fmt.Errorf("error initializing config controller: %v", err)
	}
	if err := s.initServiceControllers(args); err != nil {
		return fmt.Errorf("error initializing service controllers: %v", err)
	}
	return nil
}

initConfigController

Istio can use MCP, an in-memory store (fed from local files), or Kubernetes as its ConfigController:

if len(meshConfig.ConfigSources) > 0 {
	// Using MCP for config.
	if err := s.initConfigSources(args); err != nil {
		return err
	}
} else if args.RegistryOptions.FileDir != "" {
	// Local files - should be added even if other options are specified
	store := memory.Make(collections.Pilot)
	configController := memory.NewController(store)

	err := s.makeFileMonitor(args.RegistryOptions.FileDir, args.RegistryOptions.KubeOptions.DomainSuffix, configController)
	if err != nil {
		return err
	}
	s.ConfigStores = append(s.ConfigStores, configController)
} else {
	err := s.initK8SConfigStore(args)
	if err != nil {
		return err
	}
}

Finally, the stores are wrapped once more into a single aggregate ConfigController.

// Wrap the config controller with a cache.
aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores)
if err != nil {
	return err
}
s.configController = aggregateConfigController

// Create the config store.
s.environment.ConfigStore = aggregateConfigController
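The idea of wrapping several config stores behind one aggregate can be sketched as follows. This is not how `configaggregate.MakeCache` is implemented; it is a simplified illustration in which `Config`, `ConfigStore`, `memoryStore`, `aggregateStore` and `makeAggregate` are all hypothetical, and where the aggregate simply indexes the stores by the kinds they serve.

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a hypothetical, reduced config object.
type Config struct{ Kind, Namespace, Name string }

// ConfigStore is a hypothetical read-only store interface.
type ConfigStore interface {
	Kinds() []string
	List(kind string) []Config
}

// memoryStore keeps its configs in a slice; used only for the example.
type memoryStore struct {
	kinds   []string
	configs []Config
}

func (m *memoryStore) Kinds() []string { return m.kinds }

func (m *memoryStore) List(kind string) []Config {
	var out []Config
	for _, c := range m.configs {
		if c.Kind == kind {
			out = append(out, c)
		}
	}
	return out
}

// aggregateStore routes a List call to every store that serves the kind.
type aggregateStore struct {
	byKind map[string][]ConfigStore
}

// makeAggregate indexes the given stores by kind.
func makeAggregate(stores []ConfigStore) (*aggregateStore, error) {
	if len(stores) == 0 {
		return nil, errors.New("no config stores to aggregate")
	}
	a := &aggregateStore{byKind: map[string][]ConfigStore{}}
	for _, s := range stores {
		for _, k := range s.Kinds() {
			a.byKind[k] = append(a.byKind[k], s)
		}
	}
	return a, nil
}

// List concatenates the results of all stores that serve the kind.
func (a *aggregateStore) List(kind string) []Config {
	var out []Config
	for _, s := range a.byKind[kind] {
		out = append(out, s.List(kind)...)
	}
	return out
}

func main() {
	files := &memoryStore{
		kinds:   []string{"VirtualService"},
		configs: []Config{{"VirtualService", "default", "reviews"}},
	}
	cluster := &memoryStore{
		kinds:   []string{"VirtualService", "Gateway"},
		configs: []Config{{"VirtualService", "prod", "ratings"}, {"Gateway", "prod", "ingress"}},
	}
	agg, err := makeAggregate([]ConfigStore{files, cluster})
	if err != nil {
		panic(err)
	}
	fmt.Println(agg.List("VirtualService"))
	fmt.Println(agg.List("Gateway"))
}
```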

K8S ConfigController

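In particular, when the Kubernetes controller is used, crdclient.Client serves as the ConfigController. For every CRD defined by Istio, crdclient starts an informer that watches for configuration changes. The CRD resources defined by Istio are: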

// Pilot contains only collections used by Pilot.
Pilot = collection.NewSchemasBuilder().
	MustAdd(AuthorizationPolicy).
	MustAdd(DestinationRule).
	MustAdd(EnvoyFilter).
	MustAdd(Gateway).
	MustAdd(PeerAuthentication).
	MustAdd(ProxyConfig).
	MustAdd(RequestAuthentication).
	MustAdd(ServiceEntry).
	MustAdd(Sidecar).
	MustAdd(Telemetry).
	MustAdd(VirtualService).
	MustAdd(WasmPlugin).
	MustAdd(WorkloadEntry).
	MustAdd(WorkloadGroup).
	Build()

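initServiceControllers

The registries that have already been added are fetched from the environment.ServiceDiscovery initialized earlier. The registry also has to cover services outside the Kubernetes cluster, and those are mostly registered with the control plane through ServiceEntry. At this point all ServiceEntry config still lives in the configController initialized earlier, so the ServiceEntry data is pulled out to build a dedicated ServiceEntry registry, which is then added to serviceControllers.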

serviceControllers := s.ServiceController()

s.serviceEntryController = serviceentry.NewController(
	s.configController, s.XDSServer,
	serviceentry.WithClusterID(s.clusterID),
)
serviceControllers.AddRegistry(s.serviceEntryController)

KubeRegistry is supported by default (as of version 1.19, third-party registries are no longer built in), and it is initialized by the initKubeRegistry method:

registered := sets.New[provider.ID]()
for _, r := range args.RegistryOptions.Registries {
	serviceRegistry := provider.ID(r)
	if registered.Contains(serviceRegistry) {
		log.Warnf("%s registry specified multiple times.", r)
		continue
	}
	registered.Insert(serviceRegistry)
	log.Infof("Adding %s registry adapter", serviceRegistry)
	switch serviceRegistry {
	case provider.Kubernetes:
		if err := s.initKubeRegistry(args); err != nil {
			return err
		}
	default:
		return fmt.Errorf("service registry %s is not supported", r)
	}
}

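Initializing the RegistryEventHandlers

This step gives the ConfigController and the ServiceController the event handlers that run when configuration or services are updated.

ServiceController Event Handler

When a service is updated, a full push request is generated.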

// Flush cached discovery responses whenever services configuration change.
serviceHandler := func(prev, curr *model.Service, event model.Event) {
	pushReq := &model.PushRequest{
		Full: true,
		ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: string(curr.Hostname), Namespace: curr.Attributes.Namespace}),
		Reason: model.NewReasonStats(model.ServiceUpdate),
	}
	s.XDSServer.ConfigUpdate(pushReq)
}
s.ServiceController().AppendServiceHandler(serviceHandler)

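ConfigController Event Handler

Event handlers are registered for the CRDs defined by Istio. When a config is updated, a full push request is generated (update events whose spec has not changed are skipped).

ServiceEntry, WorkloadEntry and WorkloadGroup are skipped, however. Pushing these resources is managed by the ServiceEntryStore: the ServiceEntry registry added while initializing the ServiceController takes care of them.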

configHandler := func(prev config.Config, curr config.Config, event model.Event) {
	defer func() {
		if event != model.EventDelete {
			s.statusReporter.AddInProgressResource(curr)
		} else {
			s.statusReporter.DeleteInProgressResource(curr)
		}
	}()
	log.Debugf("Handle event %s for configuration %s", event, curr.Key())
	// For update events, trigger push only if spec has changed.
	if event == model.EventUpdate && !needsPush(prev, curr) {
		log.Debugf("skipping push for %s as spec has not changed", prev.Key())
		return
	}
	pushReq := &model.PushRequest{
		Full: true,
		ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.MustFromGVK(curr.GroupVersionKind), Name: curr.Name, Namespace: curr.Namespace}),
		Reason: model.NewReasonStats(model.ConfigUpdate),
	}
	s.XDSServer.ConfigUpdate(pushReq)
}
schemas := collections.Pilot.All()
if features.EnableGatewayAPI {
	schemas = collections.PilotGatewayAPI().All()
}
for _, schema := range schemas {
	// This resource type was handled in external/servicediscovery.go, no need to rehandle here.
	if schema.GroupVersionKind() == gvk.ServiceEntry {
		continue
	}
	if schema.GroupVersionKind() == gvk.WorkloadEntry {
		continue
	}
	if schema.GroupVersionKind() == gvk.WorkloadGroup {
		continue
	}

	s.configController.RegisterEventHandler(schema.GroupVersionKind(), configHandler)
}
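The two filtering decisions in the handler above (skip certain kinds at registration time, skip no-op updates at event time) can be isolated in a small runnable sketch. All names here are hypothetical stand-ins: `Config` carries its spec as a plain string, and `needsPush` is reduced to a spec comparison, which is an assumption for illustration and not the logic of Istio's own `needsPush`.

```go
package main

import "fmt"

// Event is a hypothetical event type.
type Event int

const (
	EventAdd Event = iota
	EventUpdate
	EventDelete
)

// Config is a hypothetical config object whose spec is a plain string.
type Config struct{ Kind, Namespace, Name, Spec string }

// Handler is the callback signature used in this sketch.
type Handler func(prev, curr Config, ev Event)

// needsPush is an assumed, simplified check: push only if the spec differs.
func needsPush(prev, curr Config) bool { return prev.Spec != curr.Spec }

// Pusher records the push requests that would be sent to the xDS server.
type Pusher struct{ Requests []string }

// Handle triggers a push unless the event is an update that left the spec untouched.
func (p *Pusher) Handle(prev, curr Config, ev Event) {
	if ev == EventUpdate && !needsPush(prev, curr) {
		return
	}
	p.Requests = append(p.Requests, curr.Kind+"/"+curr.Namespace+"/"+curr.Name)
}

// skipped lists the kinds that another component is responsible for.
var skipped = map[string]bool{"ServiceEntry": true, "WorkloadEntry": true, "WorkloadGroup": true}

// register attaches h to every kind that is not skipped.
func register(kinds []string, h Handler) map[string]Handler {
	out := map[string]Handler{}
	for _, k := range kinds {
		if skipped[k] {
			continue
		}
		out[k] = h
	}
	return out
}

func main() {
	p := &Pusher{}
	handlers := register([]string{"VirtualService", "ServiceEntry", "Gateway"}, p.Handle)
	fmt.Println("registered kinds:", len(handlers))

	v1 := Config{"VirtualService", "default", "reviews", "weight: 90"}
	v2 := Config{"VirtualService", "default", "reviews", "weight: 50"}
	handlers["VirtualService"](Config{}, v1, EventAdd) // pushed
	handlers["VirtualService"](v1, v1, EventUpdate)    // spec unchanged: skipped
	handlers["VirtualService"](v1, v2, EventUpdate)    // spec changed: pushed
	fmt.Println("push requests:", p.Requests)
}
```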

Startup flow

Once the Server has been initialized, it is started.

// Create the server for the discovery service.
discoveryServer, err := bootstrap.NewServer(serverArgs)
if err != nil {
	return fmt.Errorf("failed to create discovery service: %v", err)
}

// Start the server
if err := discoveryServer.Start(stop); err != nil {
	return fmt.Errorf("failed to start discovery service: %v", err)
}
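The split between "build everything in NewServer" and "run everything in Start(stop)" can be illustrated with a small sketch. What Start actually does is not shown in this post, so the following is an assumed shape only: `Component`, `Server.RunComponent` and `failingComponent` are hypothetical, and each component is modeled as a function that receives the shared stop channel.

```go
package main

import (
	"errors"
	"fmt"
)

// Component is a hypothetical unit of work started with the shared stop channel.
type Component func(stop <-chan struct{}) error

// Server collects components during initialization and runs them on Start.
type Server struct {
	components []Component
}

// RunComponent queues a component; nothing runs until Start is called.
func (s *Server) RunComponent(c Component) {
	s.components = append(s.components, c)
}

// Start launches the components in registration order and aborts on the first error.
func (s *Server) Start(stop <-chan struct{}) error {
	for i, c := range s.components {
		if err := c(stop); err != nil {
			return fmt.Errorf("component %d failed to start: %w", i, err)
		}
	}
	return nil
}

// failingComponent always refuses to start; used to show error propagation.
func failingComponent(_ <-chan struct{}) error {
	return errors.New("listener unavailable")
}

func main() {
	stop := make(chan struct{})
	s := &Server{}
	s.RunComponent(func(<-chan struct{}) error { fmt.Println("controllers started"); return nil })
	s.RunComponent(func(<-chan struct{}) error { fmt.Println("gRPC server started"); return nil })

	if err := s.Start(stop); err != nil {
		fmt.Println("failed to start discovery service:", err)
		return
	}
	close(stop) // signals every component to shut down
}
```

Deferring all side effects to Start means a failure during initialization never leaves half-started listeners behind.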