Dawn's Blogs

分享技术 记录成长

0%

阅读dubbo-kubernetes的启发 (7) 控制平面之dds

类似于 istio 中的 xDS,在 dubbo-kubernetes 中 dds 组件做了一些对于 dubbo 规则的监听与分发。它的作用是接受各种 dubbo 规则(留了管控、认证、授权),将规则下发至各个 dubbo 应用,完成具有 dubbo 特色的服务治理。即作为控制面,将规则下发到数据面进行应用。

image-20231225215623017

对于 dds 而言,当一个客户端连接到 dubbo-cp dds,dubbo 规则的推送主要来自两个方面:

  1. Informer 机制监听到 CRD 资源发生变化,主动推送消息给客户端。
  2. 客户端发生 Request 请求,dds 服务器进行响应

dds 接口定义如下,因为推送的机制,所以定义为双向 streaming:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
message ObserveRequest {
string nonce = 1;
string type = 2;
}

message ObserveResponse {
string nonce = 1;
string type = 2;
int64 revision = 3;
repeated google.protobuf.Any data = 4;
}

service RuleService {
rpc Observe(stream ObserveRequest)
returns (stream ObserveResponse) {
}
}

规则监听

对于规则的监听,采用 Kubernetes 的 Informer 机制。Kubernetes Informer 机制本质上就是一个 Kubernetes 资源缓存,只依赖于 K8S 的 List 和 Watch API,只有初始化时使用 List,之后利用 Watch 更新缓存资源。

当 Informer 监听到配置信息发生变化(新增,更新,删除)后,通过 Event 响应配置信息变化事件。

对于每一种 CRD,都注册了 EventHandler,在资源更新时对应资源的。

1
2
3
4
5
6
schemas := collections.Rule.All()
for _, schema := range schemas {
cache.RegisterEventHandler(schema.Resource().GroupVersionKind(), crdclient.EventHandler{
Resource: crdclient.NewHandler(ddsServer.Storage, rt.Config().KubeConfig.Namespace, cache),
})
}

cacheHandler

在 dds 中,针对于 Informer 资源的变化,Informer 会回调 cacheHandler 而不是 EventHandler(一种 CRD 对应于一个 cacheHandler,一个 cacheHandler 维护了一组 EventHandler,当收到资源发生变化时,会依次调用所有的 EventHandler 来响应资源的变化。)。

1
2
3
4
5
6
7
8
9
// cacheHandler abstracts the logic of an informer with a set of handlers. Handlers can be added at runtime
// and will be invoked on each informer event.
type cacheHandler struct {
client *Client
informer cache.SharedIndexInformer
schema collection.Schema
handlers []EventHandler
lister func(namespace string) cache.GenericNamespaceLister
}

注册 Informer 回调

为对应的 Informer 注册回调如下,当 Informer 通知变化时,会将回调函数推送至一个队列中等待执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func createCacheHandler(cl *Client, schema collection.Schema, i informers.GenericInformer) *cacheHandler {
h := &cacheHandler{
client: cl,
informer: i.Informer(),
schema: schema,
}
h.lister = func(namespace string) cache.GenericNamespaceLister {
if schema.Resource().IsClusterScoped() {
return i.Lister()
}
return i.Lister().ByNamespace(namespace)
}
_, err := i.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cl.queue.Push(func() error {
return h.onEvent(obj)
})
},
UpdateFunc: func(oldObj, newObj interface{}) {
if reflect.DeepEqual(oldObj, newObj) {
return
}
cl.queue.Push(func() error {
return h.onEvent(newObj)
})
},
DeleteFunc: func(obj interface{}) {
cl.queue.Push(func() error {
return h.onEvent(obj)
})
},
})
if err != nil {
return nil
}
return h
}

Informer 回调

而回调函数如下,它会依次调用 cacheHandler 维护的所有 EventHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
func (h *cacheHandler) onEvent(curr interface{}) error {
if err := h.client.checkReadyForEvents(curr); err != nil {
return err
}

for _, f := range h.handlers {
err := f.Resource.NotifyWithIndex(h.schema)
if err != nil {
return err
}
}
return nil
}

EventHandler

在 Informer 监测到发生变化时,cacheHandler 会将回调函数推送进入回调函数队列,等待调用 EventHandler.Resource.NotifyWithIndex。

1
2
3
4
5
6
7
type Handler interface {
NotifyWithIndex(schema collection.Schema) error
}

type EventHandler struct {
Resource Handler
}

NotifyWithIndex

在 NotifyWithIndex 方法中,进行的工作就是:

  1. 生成推送的消息,对于每一次推送,对于某一种 CRD 都进行全量的推送。
  2. 增加 revision,用于标记推送消息的进度。
  3. 记录最近的推送,当客户端主动请求推送时,会将这个推送立即推送给客户端。否则,一起等待下一次的推送。
  4. 对每一个连接的客户端,都进行一次推送(送入客户端连接的 RawRuleQueue 中,等待客户端连接的发送)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type PushContext struct {
rootNamespace string
mutex *sync.Mutex
revision map[string]int64
storage *storage.Storage
cache ConfigStoreCache
}

func (p *PushContext) NotifyWithIndex(schema collection.Schema) error {
gvk := schema.Resource().GroupVersionKind()
configs, err := p.cache.List(gvk, NamespaceAll)
data := make([]model.Config, 0)

// 。。。。省略

p.mutex.Lock()
p.revision[gvk.String()]++ // revision增加
p.mutex.Unlock()

// 生成推送消息
origin := &storage.OriginImpl{
Gvk: gvk.String(),
Rev: p.revision[gvk.String()],
Data: data,
}

p.storage.Mutex.Lock()
defer p.storage.Mutex.Unlock()
// 记录最近的推送,当客户端主动请求推送时,会将这个推送立即推送给客户端。
// 否则,一起等待下一次的推送
p.storage.LatestRules[gvk.String()] = origin
// 发送
for _, c := range p.storage.Connection {
c.RawRuleQueue.Add(origin)
}
return nil
}

规则下发

在客户端连接到 dds 后,会专门开启一个连接,专门用于推送规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (s *DdsServer) Observe(stream dds.RuleService_ObserveServer) error {
c := &GrpcEndpointConnection{
stream: stream,
stopChan: make(chan struct{}),
sendTimeout: s.Config.Dds.SendTimeout,
}

p, ok := peer.FromContext(stream.Context())
if !ok {
logger.Sugar().Errorf("[DDS] failed to get peer from context")

return fmt.Errorf("failed to get peer from context")
}

endpoints, err := endpoint2.ExactEndpoint(stream.Context(), s.CertStorage, s.Config, s.CertClient)
if err != nil {
logger.Sugar().Errorf("[DDS] failed to get endpoint from context: %v. RemoteAddr: %s", err, p.Addr)

return err
}
c.endpoint = endpoints
logger.Sugar().Infof("[DDS] New observe storage from %s", endpoints)
s.Storage.Connected(endpoints, c)

<-c.stopChan
return nil
}

推送规则的触发来自于两个方面:

  1. Informer 机制监听到 CRD 资源发生变化,主动推送消息给客户端。
  2. 客户端发生 Request 请求,dds 服务器进行响应

所以连接开启了两个协程,专门用于处理上述两种情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (s *Storage) Connected(endpoint *endpoint.Endpoint, connection EndpointConnection) {
s.Mutex.Lock()
defer s.Mutex.Unlock()
c := &Connection{
mutex: &sync.RWMutex{},
status: Connected,
EndpointConnection: connection,
Endpoint: endpoint,
TypeListened: map[string]bool{},
RawRuleQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "raw-dds"),
ExpectedRules: map[string]*VersionedRule{},
ClientRules: map[string]*ClientStatus{},
blockedPushedMutex: &sync.RWMutex{},
Generator: s.Generators,
}

s.Connection = append(s.Connection, c)

go s.listenConnection(c)
go c.listenRule()
}

listenRule

listenRule 方法用于处理 Informer 监听到资源变化的情况,它不断的从 RawRuleQueue 中读取信息,并调用 handleRule 方法进行实际的推送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (c *Connection) listenRule() {
for {
obj, shutdown := c.RawRuleQueue.Get()
if shutdown {
return
}

func(obj interface{}) {
defer c.RawRuleQueue.Done(obj)

var key Origin

var ok bool

if key, ok = obj.(Origin); !ok {
logger.Sugar().Errorf("[DDS] expected dds.Origin in workqueue but got %#v", obj)

return
}

if err := c.handleRule(key); err != nil {
logger.Sugar().Errorf("[DDS] error syncing '%s': %s", key, err.Error())

return
}

logger.Sugar().Infof("[DDS] Successfully synced '%s'", key)
}(obj)
}
}

handleRule

handleRule 进行实际的发送推送操作:

  1. 首先根据 generator,根据 CRD 生成推送信息
  2. 在每一种资源变更时,会产生一个 revision,dds 会拒绝推送当前客户端对应资源的 revision 大于等于资源 revision 的消息(表明客户端已经最新)。
  3. 生成 Nonce 以便下一次客户端发送请求,携带 revision,生成响应信息,并发送推送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (c *Connection) handleRule(rawRule Origin) error {
targetRule, err := rawRule.Exact(c.Generator, c.Endpoint)
if err != nil {
return err
}

if _, ok := c.TypeListened[targetRule.Type]; !ok {
return nil
}

cr := c.ClientRules[targetRule.Type]

// TODO how to improve this one
for cr.PushingStatus == Pushing {
cr.PushQueued = true
time.Sleep(1 * time.Second)
logger.Sugar().Infof("[DDS] Client %s %s rule is pushing, wait for 1 second", c.Endpoint.Ips, targetRule.Type)
}
cr.PushQueued = false

if cr.ClientVersion.Data != nil &&
(reflect.DeepEqual(cr.ClientVersion.Data, targetRule.Data) || cr.ClientVersion.Revision >= targetRule.Revision) {
logger.Sugar().Infof("[DDS] Client %s %s dds is up to date", c.Endpoint.Ips, targetRule.Type)
return nil
}
newVersion := atomic.AddInt64(&cr.NonceInc, 1)
r := &dds.ObserveResponse{
Nonce: strconv.FormatInt(newVersion, 10),
Type: targetRule.Type,
Revision: targetRule.Revision,
Data: targetRule.Data,
}

logger.Sugar().Infof("[DDS] Receive new version dds. Client %s %s dds is pushing.", c.Endpoint.Ips, targetRule.Type)

return c.EndpointConnection.Send(targetRule, cr, r)
}

listenConnection

listenConnection 方法用于处理客户端主动发起请求的情况,在收到客户端的请求后会读取针对某一 CRD 的最近一次推送,如果有则推入 RawRuleQueue 队列等待发送,否则等待下一次一起进行推送。

1
2
3
4
latestRule := s.LatestRules[req.Type]
if latestRule != nil {
c.RawRuleQueue.Add(latestRule)
}

状态维护

dds 需要维护资源更新的状态,包括每一种资源自身的状态和客户端的状态。

  • 每一个 CRD 资源都有一个 revision 版本号,当 CRD 发生变化时会自增一次版本号,这是资源自身的版本状态。
1
2
3
4
5
6
7
type PushContext struct {
rootNamespace string
mutex *sync.Mutex
revision map[string]int64
storage *storage.Storage
cache ConfigStoreCache
}
  • 对于每一个客户端连接,每一个 CRD 都会维护一个客户端状态,其中就存储了向客户端推送的版本号
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type Connection struct {
Generator map[string]DdsResourceGenerator
mutex *sync.RWMutex
status ConnectionStatus
EndpointConnection EndpointConnection
Endpoint *endpoint.Endpoint

TypeListened map[string]bool

RawRuleQueue workqueue.RateLimitingInterface
ExpectedRules map[string]*VersionedRule
ClientRules map[string]*ClientStatus // 客户端状态

blockedPushedMutex *sync.RWMutex
}

type ClientStatus struct {
sync.RWMutex
PushQueued bool
PushingStatus PushingStatus

NonceInc int64

ClientVersion *VersionedRule

LastPushedTime int64
LastPushedVersion *VersionedRule
LastPushNonce string
}

二者组合在一起,可以知晓客户端是否处于最新的状态,控制 dds 推送消息的发送。