Dawn's Blogs

分享技术 记录成长

0%

Kuma学习笔记 (6) Inter CP

Kuma-cp 的 Intercp(Control Plane Intercommunication)组件用于 kuma 控制面之间的通信,旨在实现 kuma-cp 的水平扩展。

kuma 的水平扩展包含三个方面:

  • Leader 选举机制:在 kuma 中,leader 选举机制非常简单,就是选择最新加入的 kuma-cp 节点作为 Leader。作为 Leader,会接收成员的心跳,并且定期监测超时的 kuma-cp 成员。
  • 心跳机制:作为成员,会定期向 Leader 发送心跳,表明自己还存活。
  • EnvoyAdminServer:对于 Global Cp 的水平扩展多实例部署,会将 EnvoyAdmin 请求转发给 Leader Global Kuma-cp 执行。对于 Zone Cp 的水平扩展多实例部署,会在本实例执行对 envoy 发起 Admin 请求获取响应。

下文中的实例,表示 kuma-cp 水平扩展中的某一实例。

可以发现,kuma-cp 在水平扩展时,如果 leader 下线了没有自动故障切换机制,除非原来的 leader 再次上线或者有新的 kuma-cp 实例上线,leader 才会正常工作。

Leader 选举

流程

Kuma-cp 水平扩展的领导选举机制十分简单,选择最新加入的 kuma-cp 作为 Leader。

  1. 在开始时,首先调用 ReplaceLeader,将当前 kuma-cp 实例替换为 Leader
1
2
3
if err := r.catalog.ReplaceLeader(ctx, r.instance); err != nil {
writerLog.Error(err, "could not replace leader") // continue, it will be replaced in ticker anyways
}
  1. 所有的从节点会给 leader 发送心跳信息,表明自己存活。所以启动一个定时器,每一次读取存活的从节点(heartbeats.ResetAndCollect),用于领导节点更新保存 kuma-cp 水平扩展实例的 catalog。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ticker := time.NewTicker(r.interval)
for {
select {
case <-ticker.C:
start := core.Now()
instances := r.heartbeats.ResetAndCollect()
instances = append(instances, r.instance)
updated, err := r.catalog.Replace(ctx, instances)
if err != nil {
writerLog.Error(err, "could not update catalog")
continue
}
if updated {
writerLog.Info("instances catalog updated", "instances", instances)
} else {
writerLog.V(1).Info("no need to update instances, because the catalog is the same", "instances", instances)
}
r.metric.Observe(float64(core.Now().Sub(start).Milliseconds()))
case <-stop:
return nil
}
}

Catalog

Catalog 是一个接口,用于定义保存 kuma-cp 水平扩展实例信息的目录(在 Instance 中会记录实例的地址,InterCp EnvoyAdminServer 的开放端口,是否为 Leader)。此目录**由 leader 更新 **,其他从节点只能读取。定义了三个方法:

  1. Instances 方法:返回当前所有水平扩展的实例。
  2. Replace 方法:以入参 Instance 切片替换 Catalog 中保存的实例,返回是否有更新。
  3. ReplaceLeader 方法:以入参 Instance 替换为当前的 Leader。
1
2
3
4
5
6
7
8
9
type Reader interface {
Instances(context.Context) ([]Instance, error)
}

type Catalog interface {
Reader
Replace(context.Context, []Instance) (bool, error)
ReplaceLeader(context.Context, Instance) error
}

Catalog 的实现维护了一个 ResourceManager,对 Catalog 的所有更新(Replace 和 ReplaceLeader)都会反映在 ResourceManager 中(ResourceManager 用于更新 Kubernetes 资源或者更新 PostgreSQL)。

只有 leader 节点可以更新 catalog,从节点只能读 catalog。

心跳机制

作为成员,会定期向 Leader 发送心跳,表明自己还存活。

  1. 连接 leader 节点(从 catalog 中查询 leader 节点的地址,并连接),如果自己就是 leader 则直接返回,leader 节点不需要给自己发送心跳。
1
2
3
4
5
6
7
8
9
10
if h.leader == nil {
if err := h.connectToLeader(ctx); err != nil {
heartbeatLog.Error(err, "could not connect to leader")
return false
}
}
if h.leader.Id == h.request.InstanceId {
heartbeatLog.V(1).Info("this instance is a leader. No need to send a heartbeat.")
return true
}
  1. 向 leader 发送 ping,表明自己还存活。如果响应中表明目标节点不是 leader,则等待下一次时间片到达再次连接新的 leader。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
client, err := h.getClientFn(h.leader.InterCpURL())
if err != nil {
heartbeatLog.Error(err, "could not get or create a client to a leader")
h.leader = nil
return false
}
resp, err := client.Ping(ctx, h.request)
if err != nil {
heartbeatLog.Error(err, "could not send a heartbeat to a leader")
h.leader = nil
return false
}
if !resp.Leader {
heartbeatLog.V(1).Info("instance responded that it is no longer a leader")
h.leader = nil
}

EnvoyAdminServer

对于 Global Cp 的水平扩展多实例部署,会将 EnvoyAdmin 请求转发给 Leader Global Kuma-cp 执行。对于 Zone Cp 的水平扩展多实例部署,会在本实例执行对 envoy 发起 Admin 请求获取响应。

Kuma-cp 提供的 Envoy 接口如下:

1
2
3
4
5
service InterCPEnvoyAdminForwardService {
rpc XDSConfig(XDSConfigRequest) returns (XDSConfigResponse);
rpc Stats(StatsRequest) returns (StatsResponse);
rpc Clusters(ClustersRequest) returns (ClustersResponse);
}

以 XDSConfig 为例,说明 kuma-cp 提供的 EnvoyAdmin 接口的通用流程:

  1. 首先根据资源类型(仅 Ingress 和 Dataplane)、资源名、资源所在的 mesh(多租户下,几个集群可以存在多个 mesh),查询资源所在地址(即目标 envoy 的地址)。
  2. adminClient 向资源所在地址发起 ConfigDump 请求(global 和 zone cp 的行为不同在此不同),得到 xDS 配置信息,返回响应。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (s *server) XDSConfig(ctx context.Context, req *mesh_proto.XDSConfigRequest) (*mesh_proto.XDSConfigResponse, error) {
ctx = extractTenantMetadata(ctx)
serverLog.V(1).Info("received forwarded request", "operation", "XDSConfig", "request", req)
resWithAddr, err := s.resWithAddress(ctx, req.ResourceType, req.ResourceName, req.ResourceMesh)
if err != nil {
return nil, err
}
configDump, err := s.adminClient.ConfigDump(ctx, resWithAddr)
if err != nil {
return nil, err
}
return &mesh_proto.XDSConfigResponse{
Result: &mesh_proto.XDSConfigResponse_Config{
Config: configDump,
},
}, nil
}

在启动时:

  • 若是 Global kuma-cp,会实例化一个可以转发请求的 forwardingClient(向 zone kuma-cp 转发请求获取响应,或者转发给 leader global kuma-cp)。
  • 若是 Zone kuma-cp,则会实例化一个普通的 client,用于向 envoy 发起 admin 请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if cfg.Mode == config_core.Global {
kdsEnvoyAdminClient := admin.NewKDSEnvoyAdminClient(
builder.KDSContext().EnvoyAdminRPCs,
cfg.Store.Type == store.KubernetesStore,
)
forwardingClient := envoyadmin.NewForwardingEnvoyAdminClient(
builder.ReadOnlyResourceManager(),
catalog.NewConfigCatalog(resourceManager),
builder.GetInstanceId(),
intercp.PooledEnvoyAdminClientFn(builder.InterCPClientPool()),
kdsEnvoyAdminClient,
)
builder.WithEnvoyAdminClient(forwardingClient)
} else {
builder.WithEnvoyAdminClient(admin.NewEnvoyAdminClient(
resourceManager,
builder.CaManagers(),
builder.Config().GetEnvoyAdminPort(),
))
}