Dawn's Blogs

分享技术 记录成长

0%

阅读dubbo-kubernetes的启发 (6) 控制平面之webhook和snp

本节说明 webhook 和 snp 组件,在 dubbo-cp 中:

  • webhook 用于实现动态准入控制器的注入功能。
  • snp(ServiceNameMapping)用于记录接口名 InterfaceName 到应用名 AppName 的映射并更新到 SNP CRD 中,通过 DDS 的通知各个 Dubbo 应用进行流量控制等数据平面的操作。

webhook

webhook 组件结构体定义如下,主要包括以下几个部分:

  • WebhookServer:动态准入控制的 Webhook 服务器。
  • JavaInjector:用于在创建 Pod 时注入信息。
1
2
3
4
5
6
7
8
type WebhookServer struct {
Options *dubbo_cp.Config
WebhookClient webhookclient.Client
CertStorage *cert.CertStorage

WebhookServer *webhook.Webhook
JavaInjector *patch.JavaSdk
}

Webhook 服务器

Webhook 服务器用于监听 Kubernetes 的动态准入控制请求,进而修改创建 Pod 的 Spec 信息实现信息注入。HTTP Path 为 mutating-services,Handler 为 Mutate。

其中 Webhook.Paches 为注入函数,getCertificate 为 dubbo-cp 服务器证书的获取方式,服务器证书通过证书管理模块管理。

1
2
3
4
5
webhookServer.WebhookServer = webhook.NewWebhook(
func(info *tls.ClientHelloInfo) (*tls.Certificate, error) {
return rt.CertStorage().GetServerCert(info.ServerName), nil
})
webhookServer.WebhookServer.Init(rt.Config())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type (
PodPatch func(*v1.Pod) (*v1.Pod, error)
GetCertificate func(*tls.ClientHelloInfo) (*tls.Certificate, error)
)

type Webhook struct {
Patches []PodPatch
AllowOnErr bool
getCertificate GetCertificate
Server *http.Server
}

func NewWebhook(certificate GetCertificate) *Webhook {
return &Webhook{
getCertificate: certificate,
AllowOnErr: true,
}
}

func (wh *Webhook) NewServer(port int32) *http.Server {
mux := http.NewServeMux()
mux.HandleFunc("/health", wh.ServeHealth)
mux.HandleFunc("/mutating-services", wh.Mutate)
return &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
TLSConfig: &tls.Config{
GetCertificate: wh.getCertificate,
},
}
}

Mutate

Webhook 的 Mutate 方法用于动态准入控制,修改 Pod,注入信息。在 Mutate 执行的过程中,会依次执行 Patches 内所定义的函数,完成信息的注入。

更新 K8s webhook 配置

webhook 在启动时会更新 Kubernetes 动态准入 webhook 配置,更新的是 Kubernetes MutatingWebhookConfiguration 资源,用于指定:

  • 创建 Pod 时进行动态准入配置。
  • 配置在进行动态准入控制 webhook 的地址为 dubbo-cp Kubernetes Service
  • 配置在 webhook 过程中所信任的 webhook 服务器 CA 证书

在后续过程中,dubbo-cp 的证书管理模块用于更新动态准入 webhook 的所使用的证书。

注入信息

dubbo-cp 在创建 Pod 时,注入的信息目前包括两类:

  • 为 Dubbo 应用 Pod 注入 Registry 信息。通过 dubboctl 安装的组件中,zk 和 nacos 会被分别打上 dubbo.apache.org/zookeeper: true, dubbo.apache.org/nacos: true 的标签。

    • 当用户在 Pod 里面打上相应 labels 表示需要自动注入注册中心地址时,会将注册中心的地址(通过查找带有相应 label 的 Service 得到注册中心地址)注入到 DUBBO_REGISTRY_ADDRESS 环境变量中去,以供 Dubbo SDK 使用。
    • 默认的注册中心优先级为 zookeeper > nacos > k8s
    • 当 Pod 的 DUBBO_REGISTRY_ADDRESS 环境变量已经有值时,不会进行注入。
  • 注入 CA 信息。

    • 当用户在 Pod 里面打上相应 labels 表示需要自动注入 CA 信息时,会自动注入 volumes 和环境变量。

    • 当 Pod 中已经存在相关信息时,不会信息注入。

在注入 CA 信息时,包括将 ServiceAccount Token 注入到指定路径下,将 CA 证书注入到指定目录下

在注入环境变量时,会注入 CA 地址(也就是 dubbo-cp Kubernetes Service 地址)、CA 证书目录、认证 Token、Token 类型以供 Dubbo 应用使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
func (s *JavaSdk) injectContainers(c *v1.Container) {
c.Env = append(c.Env, v1.EnvVar{
Name: "DUBBO_CA_ADDRESS",
Value: s.options.KubeConfig.ServiceName + "." + s.options.KubeConfig.Namespace + ".svc:" + strconv.Itoa(s.options.GrpcServer.SecureServerPort),
})
c.Env = append(c.Env, v1.EnvVar{
Name: "DUBBO_CA_CERT_PATH",
Value: "/var/run/secrets/dubbo-ca-cert/ca.crt",
})
c.Env = append(c.Env, v1.EnvVar{
Name: "DUBBO_OIDC_TOKEN",
Value: "/var/run/secrets/dubbo-ca-token/token",
})
c.Env = append(c.Env, v1.EnvVar{
Name: "DUBBO_OIDC_TOKEN_TYPE",
Value: "dubbo-ca-token",
})

c.VolumeMounts = append(c.VolumeMounts, v1.VolumeMount{
Name: "dubbo-ca-token",
MountPath: "/var/run/secrets/dubbo-ca-token",
ReadOnly: true,
})
c.VolumeMounts = append(c.VolumeMounts, v1.VolumeMount{
Name: "dubbo-ca-cert",
MountPath: "/var/run/secrets/dubbo-ca-cert",
ReadOnly: true,
})
}

func (s *JavaSdk) injectVolumes(target *v1.Pod, expireSeconds int64) {
target.Spec.Volumes = append(target.Spec.Volumes, v1.Volume{
Name: "dubbo-ca-token",
VolumeSource: v1.VolumeSource{
Projected: &v1.ProjectedVolumeSource{
Sources: []v1.VolumeProjection{
{
ServiceAccountToken: &v1.ServiceAccountTokenProjection{
Audience: "dubbo-ca",
ExpirationSeconds: &expireSeconds,
Path: "token",
},
},
},
},
},
})
target.Spec.Volumes = append(target.Spec.Volumes, v1.Volume{
Name: "dubbo-ca-cert",
VolumeSource: v1.VolumeSource{
Projected: &v1.ProjectedVolumeSource{
Sources: []v1.VolumeProjection{
{
ConfigMap: &v1.ConfigMapProjection{
LocalObjectReference: v1.LocalObjectReference{
Name: "dubbo-ca-cert",
},
Items: []v1.KeyToPath{
{
Key: "ca.crt",
Path: "ca.crt",
},
},
},
},
},
},
},
})
}

snp

snp(ServiceNameMapping)用于记录接口名 InterfaceName 到应用名 AppName 的映射并更新到 SNP CRD 中,通过 DDS 的通知各个 Dubbo 应用进行流量控制等数据平面的操作。

接口定义

Dubbo 应用会向 SNP 接口上报 Service Name Mapping,即这个节点所在的 namespace、应用名,以及这个 dubbo 应用所拥有的接口名。

在 Dubbo 中,应用名对应于一个 Dubbo 应用,接口名对应于 Protobuf 文件中定义的 Service(一个应用中可能存在多个接口)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Provides an service for reporting the mapping relationship between interface => cluster
// the cluster name will be versioned FQDN. such as "demo.default.svc.cluster.local"
service ServiceNameMappingService{
rpc registerServiceAppMapping(ServiceMappingRequest) returns (ServiceMappingResponse);
}

// When dubbo provider start up, it reports its applicationName and its interfaceName,
// and Dubbo consumer will get the service name mapping info by dds.
message ServiceMappingRequest{
// This is namespace of dubbo server
string namespace = 1;

string applicationName = 2;

repeated string interfaceNames = 3;
}

message ServiceMappingResponse{
bool success = 1;
string message = 2;
}

接口实现

在 SNP 服务器中,核心是一个队列(chan)。在服务器收到 dubbo 应用上报的 snp 信息后,将映射关系推送到队列中。

映射关系为 (namespace, InterfaceName) —> []ApplicationName

SNP CRD 的定义也是如此,其 spec 定义如下。一个 CRD 对象记录包含 InterfaceName 的所有 dubbo 应用。

1
2
3
4
message ServiceNameMapping {
string interfaceName = 1;
repeated string applicationNames = 2;
}

SNP 服务器会定期的检查队列进行合并(防止网络抖动)后,更改 ServiceNameMapping CRD,DDS 通过 Informer 机制检查到 CRD 变化后,通知各个 Dubbo 应用(数据平面)进行流量控制等数据平面的操作。

debounce

debounce 就是定期去检查队列,经过合并后通过 pushFn 推送更新(在这里为更新 CRD),这里借鉴了 istio 的 debounce 机制。

在还没有 pushFn 推送更新时,如果有新的 snp 信息到达,则会调用 Merge 方法,对 snp 信息进行合并。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
func (s *Snp) debounce(stopCh <-chan struct{}, pushFn func(req *RegisterRequest)) {
ch := s.queue
var timeChan <-chan time.Time
var startDebounce time.Time
var lastConfigUpdateTime time.Time

pushCounter := 0
debouncedEvents := 0

var req *RegisterRequest

free := true
freeCh := make(chan struct{}, 1)

push := func(req *RegisterRequest) {
pushFn(req)
freeCh <- struct{}{}
}

pushWorker := func() {
eventDelay := time.Since(startDebounce)
quietTime := time.Since(lastConfigUpdateTime)
if eventDelay >= s.config.Dds.Debounce.Max || quietTime >= s.config.Dds.Debounce.After {
if req != nil {
pushCounter++

if req.ConfigsUpdated != nil {
logger.Infof("[ServiceMapping] Push debounce stable[%d] %d for config %s: %v since last change, %v since last push",
pushCounter, debouncedEvents, configsUpdated(req),
quietTime, eventDelay)
}
free = false
go push(req)
req = nil
debouncedEvents = 0
}
} else {
timeChan = time.After(s.config.Dds.Debounce.After - quietTime)
}
}

for {
select {
case <-freeCh:
free = true
pushWorker()
case r := <-ch:
if !s.config.Dds.Debounce.Enable {
go push(r)
req = nil
continue
}

lastConfigUpdateTime = time.Now()
if debouncedEvents == 0 {
timeChan = time.After(200 * time.Millisecond)
startDebounce = lastConfigUpdateTime
}
debouncedEvents++

req = req.Merge(r)
case <-timeChan:
if free {
pushWorker()
}
case <-stopCh:
return
}
}
}

pushFn

在 snp 中,推送更新实际上就是更新 ServiceNameMapping CRD。每一个 Interface 为一个对象,创建或者更新相映的对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func tryRegister(kubeClient versioned.Interface, namespace, interfaceName string, newApps []string) error {
logger.Debugf("[ServiceMapping] try register [%s] in namespace [%s] with [%v] apps", interfaceName, namespace, len(newApps))
snp, created, err := getOrCreateSnp(kubeClient, namespace, interfaceName, newApps)
if created {
logger.Debugf("[ServiceMapping] register success, revision:%s", snp.ResourceVersion)
return nil
}
if err != nil {
return err
}

previousLen := len(snp.Spec.ApplicationNames)
previousAppNames := make(map[string]struct{}, previousLen)
for _, name := range snp.Spec.ApplicationNames {
previousAppNames[name] = struct{}{}
}
for _, newApp := range newApps {
previousAppNames[newApp] = struct{}{}
}
if len(previousAppNames) == previousLen {
logger.Debugf("[ServiceMapping] [%s] has been registered: %v", interfaceName, newApps)
return nil
}

mergedApps := make([]string, 0, len(previousAppNames))
for name := range previousAppNames {
mergedApps = append(mergedApps, name)
}
snp.Spec.ApplicationNames = mergedApps
snpInterface := kubeClient.DubboV1alpha1().ServiceNameMappings(namespace)
snp, err = snpInterface.Update(context.Background(), snp, v1.UpdateOptions{})
if err != nil {
return errors.Wrap(err, " update failed")
}
logger.Debugf("[ServiceMapping] register update success, revision:%s", snp.ResourceVersion)
return nil
}