Kuma-cp 的 Intercp(Control Plane Intercommunication)组件用于 kuma 控制面之间的通信,旨在实现 kuma-cp 的水平扩展。
kuma 的水平扩展包含三个方面:
Leader 选举机制: 在 kuma 中,leader 选举机制非常简单,就是选择最新加入的 kuma-cp 节点作为 Leader 。作为 Leader,会接收成员的心跳,并且定期监测超时的 kuma-cp 成员。
心跳机制: 作为成员,会定期向 Leader 发送心跳,表明自己还存活。
EnvoyAdminServer: 对于 Global Cp 的水平扩展多实例部署,会将 EnvoyAdmin 请求转发 给 Leader Global Kuma-cp 执行。对于 Zone Cp 的水平扩展多实例部署,会在本实例执行对 envoy 发起 Admin 请求获取响应。
下文中的实例,表示 kuma-cp 水平扩展中的某一实例。
可以发现,kuma-cp 在水平扩展时,如果 leader 下线了没有自动故障切换机制 ,除非原来的 leader 再次上线或者有新的 kuma-cp 实例上线,leader 才会正常工作。
Leader 选举 流程 Kuma-cp 水平扩展的领导选举机制十分简单,选择最新 加入的 kuma-cp 作为 Leader。
在开始时,首先调用 ReplaceLeader,将当前 kuma-cp 实例替换为 Leader 。
1 2 3 if err := r.catalog.ReplaceLeader(ctx, r.instance); err != nil { writerLog.Error(err, "could not replace leader" ) }
所有的从节点会给 leader 发送心跳信息,表明自己存活。所以启动一个定时器 ,每一次读取存活的从节点 (heartbeats.ResetAndCollect),用于领导节点更新 保存 kuma-cp 水平扩展实例的 catalog。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 ticker := time.NewTicker(r.interval) for { select { case <-ticker.C: start := core.Now() instances := r.heartbeats.ResetAndCollect() instances = append (instances, r.instance) updated, err := r.catalog.Replace(ctx, instances) if err != nil { writerLog.Error(err, "could not update catalog" ) continue } if updated { writerLog.Info("instances catalog updated" , "instances" , instances) } else { writerLog.V(1 ).Info("no need to update instances, because the catalog is the same" , "instances" , instances) } r.metric.Observe(float64 (core.Now().Sub(start).Milliseconds())) case <-stop: return nil } }
Catalog Catalog 是一个接口,用于定义保存 kuma-cp 水平扩展实例信息 的目录(在 Instance 中会记录实例的地址,InterCp EnvoyAdminServer 的开放端口,是否为 Leader)。此目录**由 leader 更新 **,其他从节点只能读取。定义了三个方法:
Instances 方法: 返回当前所有水平扩展的实例。
Replace 方法: 以入参 Instance 切片替换 Catalog 中保存的实例,返回是否有更新。
ReplaceLeader 方法: 以入参 Instance 替换为当前的 Leader。
1 2 3 4 5 6 7 8 9 type Reader interface { Instances(context.Context) ([]Instance, error) } type Catalog interface { Reader Replace(context.Context, []Instance) (bool , error) ReplaceLeader(context.Context, Instance) error }
Catalog 的实现维护了一个 ResourceManager ,对 Catalog 的所有更新(Replace 和 ReplaceLeader)都会反映在 ResourceManager 中(ResourceManager 用于更新 Kubernetes 资源或者更新 PostgreSQL)。
只有 leader 节点可以更新 catalog ,从节点只能读 catalog。
心跳机制 作为成员,会定期向 Leader 发送心跳,表明自己还存活。
连接 leader 节点 (从 catalog 中查询 leader 节点的地址,并连接),如果自己就是 leader 则直接返回,leader 节点不需要给自己发送心跳。
1 2 3 4 5 6 7 8 9 10 if h.leader == nil { if err := h.connectToLeader(ctx); err != nil { heartbeatLog.Error(err, "could not connect to leader" ) return false } } if h.leader.Id == h.request.InstanceId { heartbeatLog.V(1 ).Info("this instance is a leader. No need to send a heartbeat." ) return true }
向 leader 发送 ping,表明自己还存活 。如果响应中表明目标节点不是 leader,则等待下一次时间片到达再次连接新的 leader。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 client, err := h.getClientFn(h.leader.InterCpURL()) if err != nil { heartbeatLog.Error(err, "could not get or create a client to a leader" ) h.leader = nil return false } resp, err := client.Ping(ctx, h.request) if err != nil { heartbeatLog.Error(err, "could not send a heartbeat to a leader" ) h.leader = nil return false } if !resp.Leader { heartbeatLog.V(1 ).Info("instance responded that it is no longer a leader" ) h.leader = nil }
EnvoyAdminServer 对于 Global Cp 的水平扩展多实例部署,会将 EnvoyAdmin 请求转发 给 Leader Global Kuma-cp 执行。对于 Zone Cp 的水平扩展多实例部署,会在本实例执行对 envoy 发起 Admin 请求获取响应。
Kuma-cp 提供的 Envoy 接口如下:
1 2 3 4 5 service InterCPEnvoyAdminForwardService { rpc XDSConfig(XDSConfigRequest) returns (XDSConfigResponse) ; rpc Stats(StatsRequest) returns (StatsResponse) ; rpc Clusters(ClustersRequest) returns (ClustersResponse) ; }
以 XDSConfig 为例,说明 kuma-cp 提供的 EnvoyAdmin 接口的通用流程:
首先根据资源类型(仅 Ingress 和 Dataplane )、资源名、资源所在的 mesh(多租户下,几个集群可以存在多个 mesh),查询资源所在地址(即目标 envoy 的地址)。
adminClient 向资源所在地址发起 ConfigDump 请求 (global 和 zone cp 的行为不同在此不同),得到 xDS 配置信息,返回响应。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (s *server) XDSConfig (ctx context.Context, req *mesh_proto.XDSConfigRequest) (*mesh_proto.XDSConfigResponse, error) { ctx = extractTenantMetadata(ctx) serverLog.V(1 ).Info("received forwarded request" , "operation" , "XDSConfig" , "request" , req) resWithAddr, err := s.resWithAddress(ctx, req.ResourceType, req.ResourceName, req.ResourceMesh) if err != nil { return nil , err } configDump, err := s.adminClient.ConfigDump(ctx, resWithAddr) if err != nil { return nil , err } return &mesh_proto.XDSConfigResponse{ Result: &mesh_proto.XDSConfigResponse_Config{ Config: configDump, }, }, nil }
在启动时:
若是 Global kuma-cp,会实例化一个可以转发请求的 forwardingClient(向 zone kuma-cp 转发请求获取响应,或者转发给 leader global kuma-cp)。
若是 Zone kuma-cp,则会实例化一个普通的 client,用于向 envoy 发起 admin 请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 if cfg.Mode == config_core.Global { kdsEnvoyAdminClient := admin.NewKDSEnvoyAdminClient( builder.KDSContext().EnvoyAdminRPCs, cfg.Store.Type == store.KubernetesStore, ) forwardingClient := envoyadmin.NewForwardingEnvoyAdminClient( builder.ReadOnlyResourceManager(), catalog.NewConfigCatalog(resourceManager), builder.GetInstanceId(), intercp.PooledEnvoyAdminClientFn(builder.InterCPClientPool()), kdsEnvoyAdminClient, ) builder.WithEnvoyAdminClient(forwardingClient) } else { builder.WithEnvoyAdminClient(admin.NewEnvoyAdminClient( resourceManager, builder.CaManagers(), builder.Config().GetEnvoyAdminPort(), )) }