Register the Discovery Server as an event handler on the Config Controller and the Service Controller, so that it is notified of configuration and service changes.
Initialize the DiscoveryService and register the gRPC server.
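The handler registration described in the steps above can be sketched as follows. This is a minimal illustration of the pattern only, not Istio's code: the `controller`, `discoveryServer`, `wire`, and `notify` names are hypothetical stand-ins, and the real controllers carry far richer event payloads.

```go
package main

import "fmt"

// Event describes what happened to a watched resource.
type Event int

const (
	EventAdd Event = iota
	EventUpdate
	EventDelete
)

// Handler is called by a controller whenever a resource changes.
type Handler func(name string, ev Event)

// controller is a hypothetical stand-in for a config or service controller.
type controller struct {
	handlers []Handler
}

// RegisterEventHandler adds a handler that is notified on every change.
func (c *controller) RegisterEventHandler(h Handler) {
	c.handlers = append(c.handlers, h)
}

// notify fans a change out to all registered handlers.
func (c *controller) notify(name string, ev Event) {
	for _, h := range c.handlers {
		h(name, ev)
	}
}

// discoveryServer is a hypothetical stand-in that only records push reasons.
type discoveryServer struct {
	pending []string
}

// ConfigUpdate records that a push is needed for the given reason.
func (d *discoveryServer) ConfigUpdate(reason string) {
	d.pending = append(d.pending, reason)
}

// wire registers the discovery server as a handler on both controllers.
func wire(d *discoveryServer, configCtl, serviceCtl *controller) {
	configCtl.RegisterEventHandler(func(name string, ev Event) {
		d.ConfigUpdate("config:" + name)
	})
	serviceCtl.RegisterEventHandler(func(name string, ev Event) {
		d.ConfigUpdate("service:" + name)
	})
}

func main() {
	d := &discoveryServer{}
	configCtl, serviceCtl := &controller{}, &controller{}
	wire(d, configCtl, serviceCtl)

	// Simulate one config change and one service change.
	configCtl.notify("reviews-route", EventUpdate)
	serviceCtl.notify("reviews", EventAdd)
	fmt.Println(d.pending)
}
```

The point of the design is that the controllers know nothing about xDS: they only call back into whatever handlers were registered, and the discovery server decides what to push.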
Key data structures
Server
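Initializing pilot-discovery comes down to the bootstrap.NewServer method creating a pilot Server. Server is the core of the pilot-discovery component; its key fields are listed below (those related to security and observability are left out for now):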
    // Server contains the runtime configuration for the Pilot discovery service.
    type Server struct {
        XDSServer   *xds.DiscoveryServer // xDS server
        environment *model.Environment   // aggregate of the APIs the Pilot environment needs
        kubeClient  kubelib.Client       // Kubernetes client connection
        // ...
    }
    // Environment provides an aggregate environmental API for Pilot
    type Environment struct {
        // Discovery interface for listing services and instances.
        ServiceDiscovery // service discovery interface

        // Config interface for listing routing rules
        ConfigStore // configuration (routing rules, etc.) interface

        // Watcher is the watcher for the mesh config (to be merged into the config store)
        mesh.Watcher // watches the mesh config

        // NetworksWatcher (loaded from a config map) provides information about the
        // set of networks inside a mesh and how to route to endpoints in each
        // network. Each network provides information about the endpoints in a
        // routable L3 network. A single routable L3 network can have one or more
        // service registries.
        NetworksWatcher mesh.NetworksWatcher // watches the mesh networks config

        // pushContext holds information during push generation. It is reset on config change, at the beginning
        // of the pushAll. It will hold all errors and stats and possibly caches needed during the entire cache computation.
        pushContext *PushContext // context that holds information before an xDS response is pushed

        // DomainSuffix provides a default domain for the Istio server.
        DomainSuffix string

        // EndpointShards for a service. This is a global (per-server) list, built from
        // incremental updates. This is keyed by service and namespace
        EndpointIndex *EndpointIndex

        // Cache for XDS resources.
        Cache XdsCache
    }
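In this struct:
PushContext is the context that holds the relevant information while Pilot generates configuration, before it pushes xDS. It is reset when configuration changes and at the start of a full push. It stores all errors and statistics, and caches some computed configuration results.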
    // DiscoveryServer is Pilot's gRPC implementation for Envoy's xds APIs
    type DiscoveryServer struct {
        // Env is the model environment.
        Env *model.Environment

        // ConfigGenerator is responsible for generating data plane configuration using Istio networking
        // APIs and service registry info
        ConfigGenerator core.ConfigGenerator // generator for control-plane Istio configuration such as VirtualService

        // Generators allow customizing the generated config, based on the client metadata.
        // Key is the generator type - will match the Generator metadata to set the per-connection
        // default generator, or the combination of Generator metadata and TypeUrl to select a
        // different generator for a type.
        // Normal istio clients use the default generator - will not be impacted by this.
        Generators map[string]model.XdsResourceGenerator // custom generators for different config types

        // concurrentPushLimit is a semaphore that limits the amount of concurrent XDS pushes.
        concurrentPushLimit chan struct{}

        // RequestRateLimit limits the number of new XDS requests allowed. This helps prevent thundering herd of incoming requests.
        RequestRateLimit *rate.Limiter

        // pushChannel is the buffer used for debouncing.
        // after debouncing the pushRequest will be sent to pushQueue
        pushChannel chan *model.PushRequest // channel that receives push requests

        // pushQueue is the buffer that used after debounce and before the real xds push.
        pushQueue *PushQueue // buffer queue used after debouncing and before the actual xDS push

        // adsClients reflect active gRPC channels, for both ADS and EDS.
        adsClients map[string]*Connection // holds the xDS connections

        StatusReporter DistributionStatusCache // notified of connection close and ACK

        // StatusGen is notified of connect/disconnect/nack on all connections
        StatusGen *StatusGen

        WorkloadEntryController *autoregistration.Controller

        // serverReady indicates caches have been synced up and server is ready to process requests.
        serverReady atomic.Bool // caches are synced and the server can accept requests

        // Cache for XDS resources
        Cache model.XdsCache // cache of xDS resources
    }
    if len(meshConfig.ConfigSources) > 0 {
        // Using MCP for config.
        if err := s.initConfigSources(args); err != nil {
            return err
        }
    } else if args.RegistryOptions.FileDir != "" {
        // Local files - should be added even if other options are specified
        store := memory.Make(collections.Pilot)
        configController := memory.NewController(store)
        // ...
    }
    // Create the server for the discovery service.
    discoveryServer, err := bootstrap.NewServer(serverArgs)
    if err != nil {
        return fmt.Errorf("failed to create discovery service: %v", err)
    }

    // Start the server
    if err := discoveryServer.Start(stop); err != nil {
        return fmt.Errorf("failed to start discovery service: %v", err)
    }
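The create-then-start sequence above relies on a stop channel to shut the server down. A minimal sketch of that pattern follows, assuming a server that simply runs a set of components until the channel is closed. The `server`, `component`, `newServer`, and `Wait` names are hypothetical and are not Istio's API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// component is a function that runs until stop is closed.
type component func(stop <-chan struct{})

// server is a hypothetical stand-in for the object a constructor returns.
type server struct {
	components []component
	wg         sync.WaitGroup
	started    bool
}

// newServer validates its inputs and returns a server that is not yet running.
func newServer(cs ...component) (*server, error) {
	if len(cs) == 0 {
		return nil, errors.New("no components configured")
	}
	return &server{components: cs}, nil
}

// Start launches every component in its own goroutine and returns immediately.
// The components keep running until stop is closed.
func (s *server) Start(stop <-chan struct{}) error {
	if s.started {
		return errors.New("server already started")
	}
	s.started = true
	for _, c := range s.components {
		s.wg.Add(1)
		go func(c component) {
			defer s.wg.Done()
			c(stop)
		}(c)
	}
	return nil
}

// Wait blocks until all components have returned.
func (s *server) Wait() { s.wg.Wait() }

func main() {
	stopped := make(chan string, 2)
	worker := func(name string) component {
		return func(stop <-chan struct{}) {
			<-stop // block until shutdown is requested
			stopped <- name
		}
	}

	s, err := newServer(worker("xds"), worker("controllers"))
	if err != nil {
		fmt.Println("failed to create server:", err)
		return
	}
	stop := make(chan struct{})
	if err := s.Start(stop); err != nil {
		fmt.Println("failed to start server:", err)
		return
	}

	close(stop) // closing the channel signals every component at once
	s.Wait()
	fmt.Println("components stopped:", len(stopped))
}
```

Closing a channel, rather than sending on it, is what lets a single signal reach any number of components without the sender knowing how many there are.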