Dawn's Blogs

分享技术 记录成长

0%

阅读dubbo-kubernetes的启发 (8) 控制平面之安全性

以下是 CA 安全性的概述,分为两个部分:

  • 控制面:实现了一个 CA 负责为网格中的各个服务签发证书,并将证书颁发给各个服务,具体的做法有两个:

    • 通过(webhook)动态准入控制器的方式,将 CA 的证书挂载到 Pod 里面去,同时添加了环境变量,只需要读取环境变量就可以获取路径了。
    • Dubbo 应用通过与 CA 建立双向 TLS 连接(CA 证书已经被挂载到了 Pod 中,所以可以验证 CA 证书),发送 CSR 请求申请这个 dubbo 应用的证书,dubbo-cp 会验证客户端的 CSR 请求并签发证书返回给客户端。
  • 数据面:在网格中服务相互之前发起通信的时候,dubbo sdk 会拦截服务请求,采用证书进行双向 TLS 认证并建立一个 TLS 连接,使用该 TLS 连接来在网格中传输数据。

1703656322

在 dubbo-cp 中,要实现一个 CA 要做两方面的事情:

  1. 维护 CA:维护 CA 的证书状态,存储 CA 证书。
  2. 签发证书:接受 CSR 请求,为 Dubbo 应用签发证书。

维护 CA

dubbo-cp 中的 CA 中心结构包含两个关键字段:

  • authorityCert:相当于 CA 认证证书,CA 服务器证书、客户端的证书都是这个证书签发的。
  • serverCerts:CA 服务器的证书,在客户端与控制面的连接中,dubbo-cp 使用的证书就是这个证书,用于响应 CSR 请求(以及其他如 snp,dds 等接口),客户端可能使用ip地址、localhost、域名等方式访问控制面,不同名称的访问返回不同的服务器证书。

以下是 dubbo-cp 初始化 gRPC 服务器的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func NewGrpcServer(s *provider.CertStorage, config *dubbo_cp.Config) GrpcServer {
srv := GrpcServer{
PlainServerPort: config.GrpcServer.PlainServerPort,
SecureServerPort: config.GrpcServer.SecureServerPort,
}
pool := x509.NewCertPool()
tlsConfig := &tls.Config{
GetCertificate: func(info *tls.ClientHelloInfo) (*tls.Certificate, error) {
for _, cert := range s.GetTrustedCerts() {
pool.AddCert(cert.Cert)
}
return s.GetServerCert(info.ServerName), nil
},
ClientCAs: pool,
ClientAuth: tls.VerifyClientCertIfGiven,
}

srv.PlainServer = grpc.NewServer()
reflection.Register(srv.PlainServer)

srv.SecureServer = grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)))
reflection.Register(srv.SecureServer)
return srv
}

刷新 CA 服务器证书

dubbo-cp 会定期的检查 CA 服务器证书是否失效,并刷新 CA 服务器证书:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (s *CertStorage) RefreshServerCert(stop <-chan struct{}) {
interval := math.Max(math.Min(math.Floor(float64(s.config.Security.CertValidity)/100), 10_000), 1)
ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-stop:
return
case <-ticker.C:
func() {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.authorityCert == nil || !s.authorityCert.IsValid() {
// ignore if authority cert is invalid
return
}
if s.serverCerts == nil || !s.serverCerts.IsValid() {
logger.Sugar().Infof("[Authority] Server cert is invalid, refresh it.")
s.serverCerts = SignServerCert(s.authorityCert, s.serverNames, s.config.Security.CertValidity)
}
}()
}
}
}

刷新 CA 认证证书

在 CA 启动时,除了刷新 CA 服务器证书,还需要刷新 CA 认证证书。

因为 dubbo-cp 可能启动多个实例,所以对于 CA 认证证书的刷新只能有一个实例进行(通过 Kubernetes 的选举功能实现只在一个实例上进行)。主要需要做的是:

  • 更新 CA 认证证书。
  • 更新 Webhook 动态准入控制器的证书。
  • 更新 CA 证书在 ConfigMap 上的公钥信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (s *CertStorage) Start(stop <-chan struct{}) error {
go s.RefreshServerCert(stop)
go func(stop <-chan struct{}) {
ticker := time.NewTicker(calculateInterval(s.config.Security.CaValidity))
defer ticker.Stop()

for {
select {
case <-stop:
return
case <-ticker.C:
if s.GetAuthorityCert().NeedRefresh() {
logger.Sugar().Infof("[Authority] Authority cert is invalid, refresh it.")
// TODO lock if multi cp-server
// TODO refresh signed cert

err := NewleaderElection().Election(s, s.config, s.certClient.GetKubClient())
if err != nil {
logger.Sugar().Error("[Authority] Leader Election failed")
}
if s.config.KubeConfig.IsKubernetesConnected {
s.certClient.UpdateAuthorityCert(s.GetAuthorityCert().CertPem, EncodePrivateKey(s.GetAuthorityCert().PrivateKey), s.config.KubeConfig.Namespace)
s.webhookClient.UpdateWebhookConfig(s.config, s.GetAuthorityCert().CertPem)
if s.certClient.UpdateAuthorityPublicKey(s.GetAuthorityCert().CertPem) {
logger.Sugar().Infof("[Authority] Write ca to config maps success.")
} else {
logger.Sugar().Warnf("[Authority] Write ca to config maps failed.")
}
}
}
}
}
}(stop)
return nil
}

签发证书

客户端发起 CSR 请求,CA 为客户端签发证书的接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
message IdentityRequest {
string csr = 1;
string type = 2;

google.protobuf.Struct metadata = 3;
}

message IdentityResponse {
bool success = 1;
string cert_pem = 2;
repeated string trust_certs = 3;
string token = 4;
repeated string trusted_token_public_keys = 5;
int64 refresh_time = 6;
int64 expire_time = 7;
string message = 8;
}

service AuthorityService {
rpc CreateIdentity(IdentityRequest)
returns (IdentityResponse) {
}
}

接口实现

接口实现如下,在验证之后,会返回为客户端签发的证书。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func (s *AuthorityService) CreateIdentity(
c context.Context,
req *ca.IdentityRequest,
) (*ca.IdentityResponse, error) {
if req.Csr == "" {
return &ca.IdentityResponse{
Success: false,
Message: "CSR is empty.",
}, nil
}

csr, err := cert.LoadCSR(req.Csr)
if csr == nil || err != nil {
return &ca.IdentityResponse{
Success: false,
Message: "Decode csr failed.",
}, nil
}

p, _ := peer.FromContext(c)
endpoint, err := endpoint.ExactEndpoint(c, s.CertStorage, s.Options, s.CertClient)
if err != nil {
logger.Sugar().Warnf("[Authority] Failed to exact endpoint from context: %v. RemoteAddr: %s", err, p.Addr.String())

return &ca.IdentityResponse{
Success: false,
Message: err.Error(),
}, nil
}

certPem, err := cert.SignFromCSR(csr, endpoint, s.CertStorage.GetAuthorityCert(), s.Options.Security.CertValidity)
if err != nil {
logger.Sugar().Warnf("[Authority] Failed to sign certificate from csr: %v. RemoteAddr: %s", err, p.Addr.String())

return &ca.IdentityResponse{
Success: false,
Message: err.Error(),
}, nil
}

logger.Sugar().Infof("[Authority] Success to sign certificate from csr. RemoteAddr: %s", p.Addr.String())

token, err := jwt.NewClaims(endpoint.SpiffeID, endpoint.ToString(), endpoint.ID, s.Options.Security.CertValidity).Sign(s.CertStorage.GetAuthorityCert().PrivateKey)
if err != nil {
logger.Sugar().Warnf("[Authority] Failed to sign jwt token: %v. RemoteAddr: %s", err, p.Addr.String())

return &ca.IdentityResponse{
Success: false,
Message: err.Error(),
}, nil
}

var trustedCerts []string
var trustedTokenPublicKeys []string
for _, c := range s.CertStorage.GetTrustedCerts() {
trustedCerts = append(trustedCerts, c.CertPem)
trustedTokenPublicKeys = append(trustedTokenPublicKeys, cert.EncodePublicKey(&c.PrivateKey.PublicKey))
}
return &ca.IdentityResponse{
Success: true,
Message: "OK",
CertPem: certPem,
TrustCerts: trustedCerts,
Token: token,
TrustedTokenPublicKeys: trustedTokenPublicKeys,
RefreshTime: time.Now().UnixMilli() + (s.Options.Security.CertValidity / 2),
ExpireTime: time.Now().UnixMilli() + s.Options.Security.CertValidity,
}, nil
}