类似于 istio 中的 xDS,在 dubbo-kubernetes 中 dds 组件做了一些对于 dubbo 规则的监听与分发 。它的作用 是接受各种 dubbo 规则(留了管控、认证、授权),将规则下发至各个 dubbo 应用,完成具有 dubbo 特色的服务治理 。即作为控制面,将规则下发到数据面进行应用。
对于 dds 而言,当一个客户端连接到 dubbo-cp dds,dubbo 规则的推送主要来自两个方面:
Informer 机制监听到 CRD 资源发生变化,主动推送 消息给客户端。
客户端发生 Request 请求,dds 服务器进行响应 。
dds 接口定义如下,因为推送的机制,所以定义为双向 streaming:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 message ObserveRequest { string nonce = 1 ; string type = 2 ; } message ObserveResponse { string nonce = 1 ; string type = 2 ; int64 revision = 3 ; repeated google.protobuf.Any data = 4 ; } service RuleService { rpc Observe(stream ObserveRequest) returns (stream ObserveResponse) { } }
规则监听 对于规则的监听,采用 Kubernetes 的 Informer 机制。Kubernetes Informer 机制本质上就是一个 Kubernetes 资源缓存 ,只依赖于 K8S 的 List 和 Watch API,只有初始化时使用 List,之后利用 Watch 更新缓存资源。
当 Informer 监听到配置信息发生变化 (新增,更新,删除)后,通过 Event 响应配置信息变化事件。
对于每一种 CRD,都注册了 EventHandler,在资源更新时对应资源的。
1 2 3 4 5 6 schemas := collections.Rule.All() for _, schema := range schemas { cache.RegisterEventHandler(schema.Resource().GroupVersionKind(), crdclient.EventHandler{ Resource: crdclient.NewHandler(ddsServer.Storage, rt.Config().KubeConfig.Namespace, cache), }) }
cacheHandler 在 dds 中,针对于 Informer 资源的变化,Informer 会回调 cacheHandler 而不是 EventHandler(一种 CRD 对应于一个 cacheHandler,一个 cacheHandler 维护了一组 EventHandler ,当收到资源发生变化时,会依次调用所有的 EventHandler 来响应资源的变化。)。
1 2 3 4 5 6 7 8 9 type cacheHandler struct { client *Client informer cache.SharedIndexInformer schema collection.Schema handlers []EventHandler lister func (namespace string ) cache .GenericNamespaceLister }
为对应的 Informer 注册回调如下,当 Informer 通知变化时,会将回调函数推送至一个队列 中等待执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func createCacheHandler (cl *Client, schema collection.Schema, i informers.GenericInformer) *cacheHandler { h := &cacheHandler{ client: cl, informer: i.Informer(), schema: schema, } h.lister = func (namespace string ) cache .GenericNamespaceLister { if schema.Resource().IsClusterScoped() { return i.Lister() } return i.Lister().ByNamespace(namespace) } _, err := i.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func (obj interface {}) { cl.queue.Push(func () error { return h.onEvent(obj) }) }, UpdateFunc: func (oldObj, newObj interface {}) { if reflect.DeepEqual(oldObj, newObj) { return } cl.queue.Push(func () error { return h.onEvent(newObj) }) }, DeleteFunc: func (obj interface {}) { cl.queue.Push(func () error { return h.onEvent(obj) }) }, }) if err != nil { return nil } return h }
而回调函数如下,它会依次调用 cacheHandler 维护的所有 EventHandler:
1 2 3 4 5 6 7 8 9 10 11 12 13 func (h *cacheHandler) onEvent (curr interface {}) error { if err := h.client.checkReadyForEvents(curr); err != nil { return err } for _, f := range h.handlers { err := f.Resource.NotifyWithIndex(h.schema) if err != nil { return err } } return nil }
EventHandler 在 Informer 监测到发生变化时,cacheHandler 会将回调函数推送进入回调函数队列 ,等待调用 EventHandler.Resource.NotifyWithIndex。
1 2 3 4 5 6 7 type Handler interface { NotifyWithIndex(schema collection.Schema) error } type EventHandler struct { Resource Handler }
NotifyWithIndex 在 NotifyWithIndex 方法中,进行的工作就是:
生成推送的消息 ,对于每一次推送,对于某一种 CRD 都进行全量的推送。
增加 revision,用于标记推送消息的进度。
记录最近的推送 ,当客户端主动请求推送时,会将这个推送立即推送给客户端。否则,一起等待下一次的推送。
对每一个连接的客户端,都进行一次推送 (送入客户端连接的 RawRuleQueue 中,等待客户端连接的发送)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 type PushContext struct { rootNamespace string mutex *sync.Mutex revision map [string ]int64 storage *storage.Storage cache ConfigStoreCache } func (p *PushContext) NotifyWithIndex (schema collection.Schema) error { gvk := schema.Resource().GroupVersionKind() configs, err := p.cache.List(gvk, NamespaceAll) data := make ([]model.Config, 0 ) p.mutex.Lock() p.revision[gvk.String()]++ p.mutex.Unlock() origin := &storage.OriginImpl{ Gvk: gvk.String(), Rev: p.revision[gvk.String()], Data: data, } p.storage.Mutex.Lock() defer p.storage.Mutex.Unlock() p.storage.LatestRules[gvk.String()] = origin for _, c := range p.storage.Connection { c.RawRuleQueue.Add(origin) } return nil }
规则下发 在客户端连接到 dds 后,会专门开启一个连接 ,专门用于推送规则。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (s *DdsServer) Observe (stream dds.RuleService_ObserveServer) error { c := &GrpcEndpointConnection{ stream: stream, stopChan: make (chan struct {}), sendTimeout: s.Config.Dds.SendTimeout, } p, ok := peer.FromContext(stream.Context()) if !ok { logger.Sugar().Errorf("[DDS] failed to get peer from context" ) return fmt.Errorf("failed to get peer from context" ) } endpoints, err := endpoint2.ExactEndpoint(stream.Context(), s.CertStorage, s.Config, s.CertClient) if err != nil { logger.Sugar().Errorf("[DDS] failed to get endpoint from context: %v. RemoteAddr: %s" , err, p.Addr) return err } c.endpoint = endpoints logger.Sugar().Infof("[DDS] New observe storage from %s" , endpoints) s.Storage.Connected(endpoints, c) <-c.stopChan return nil }
推送规则的触发来自于两个方面:
Informer 机制监听到 CRD 资源发生变化,主动推送 消息给客户端。
客户端发生 Request 请求,dds 服务器进行响应 。
所以连接开启了两个协程 ,专门用于处理上述两种情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func (s *Storage) Connected (endpoint *endpoint.Endpoint, connection EndpointConnection) { s.Mutex.Lock() defer s.Mutex.Unlock() c := &Connection{ mutex: &sync.RWMutex{}, status: Connected, EndpointConnection: connection, Endpoint: endpoint, TypeListened: map [string ]bool {}, RawRuleQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "raw-dds" ), ExpectedRules: map [string ]*VersionedRule{}, ClientRules: map [string ]*ClientStatus{}, blockedPushedMutex: &sync.RWMutex{}, Generator: s.Generators, } s.Connection = append (s.Connection, c) go s.listenConnection(c) go c.listenRule() }
listenRule listenRule 方法用于处理 Informer 监听到资源变化的情况,它不断的从 RawRuleQueue 中读取信息,并调用 handleRule 方法进行实际的推送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func (c *Connection) listenRule () { for { obj, shutdown := c.RawRuleQueue.Get() if shutdown { return } func (obj interface {}) { defer c.RawRuleQueue.Done(obj) var key Origin var ok bool if key, ok = obj.(Origin); !ok { logger.Sugar().Errorf("[DDS] expected dds.Origin in workqueue but got %#v" , obj) return } if err := c.handleRule(key); err != nil { logger.Sugar().Errorf("[DDS] error syncing '%s': %s" , key, err.Error()) return } logger.Sugar().Infof("[DDS] Successfully synced '%s'" , key) }(obj) } }
handleRule handleRule 进行实际的发送推送操作:
首先根据 generator,根据 CRD 生成推送信息 。
在每一种资源变更时,会产生一个 revision,dds 会拒绝推送当前客户端对应资源的 revision 大于等于资源 revision 的消息 (表明客户端已经最新)。
生成 Nonce 以便下一次客户端发送请求,携带 revision,生成响应信息,并发送推送消息 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func (c *Connection) handleRule (rawRule Origin) error { targetRule, err := rawRule.Exact(c.Generator, c.Endpoint) if err != nil { return err } if _, ok := c.TypeListened[targetRule.Type]; !ok { return nil } cr := c.ClientRules[targetRule.Type] for cr.PushingStatus == Pushing { cr.PushQueued = true time.Sleep(1 * time.Second) logger.Sugar().Infof("[DDS] Client %s %s rule is pushing, wait for 1 second" , c.Endpoint.Ips, targetRule.Type) } cr.PushQueued = false if cr.ClientVersion.Data != nil && (reflect.DeepEqual(cr.ClientVersion.Data, targetRule.Data) || cr.ClientVersion.Revision >= targetRule.Revision) { logger.Sugar().Infof("[DDS] Client %s %s dds is up to date" , c.Endpoint.Ips, targetRule.Type) return nil } newVersion := atomic.AddInt64(&cr.NonceInc, 1 ) r := &dds.ObserveResponse{ Nonce: strconv.FormatInt(newVersion, 10 ), Type: targetRule.Type, Revision: targetRule.Revision, Data: targetRule.Data, } logger.Sugar().Infof("[DDS] Receive new version dds. Client %s %s dds is pushing." , c.Endpoint.Ips, targetRule.Type) return c.EndpointConnection.Send(targetRule, cr, r) }
listenConnection listenConnection 方法用于处理客户端主动发起请求的情况,在收到客户端的请求后会读取针对某一 CRD 的最近一次推送,如果有则推入 RawRuleQueue 队列等待发送,否则等待下一次一起进行推送。
1 2 3 4 latestRule := s.LatestRules[req.Type] if latestRule != nil { c.RawRuleQueue.Add(latestRule) }
状态维护 dds 需要维护资源更新的状态,包括每一种资源自身的状态和客户端的状态。
每一个 CRD 资源都有一个 revision 版本号 ,当 CRD 发生变化时会自增 一次版本号,这是资源自身的版本状态。
1 2 3 4 5 6 7 type PushContext struct { rootNamespace string mutex *sync.Mutex revision map [string ]int64 storage *storage.Storage cache ConfigStoreCache }
对于每一个客户端连接 ,每一个 CRD 都会维护一个客户端状态 ,其中就存储了向客户端推送的版本号 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 type Connection struct { Generator map [string ]DdsResourceGenerator mutex *sync.RWMutex status ConnectionStatus EndpointConnection EndpointConnection Endpoint *endpoint.Endpoint TypeListened map [string ]bool RawRuleQueue workqueue.RateLimitingInterface ExpectedRules map [string ]*VersionedRule ClientRules map [string ]*ClientStatus blockedPushedMutex *sync.RWMutex } type ClientStatus struct { sync.RWMutex PushQueued bool PushingStatus PushingStatus NonceInc int64 ClientVersion *VersionedRule LastPushedTime int64 LastPushedVersion *VersionedRule LastPushNonce string }
二者组合在一起,可以知晓客户端是否处于最新的状态,控制 dds 推送消息的发送。