
Insights from Reading dubbo-kubernetes (5): Control Plane Overview

Control Plane Functionality

In dubbo-kubernetes, the control plane is called dubbo-cp. Its functionality covers the following:

  • Security: certificate management and CA certificate issuance.
  • Dynamic admission webhook server: performs Pod injection when Dubbo applications are created.
  • DDS: just as xDS in Envoy provides discovery of various resources, here it is called DDS (Dubbo DS). The watched resources are the AuthenticationPolicy, AuthorizationPolicy, ServiceNameMapping, ConditionRoute, DynamicConfig, and TagRoute CRDs. Its work consists of two parts:
    • Accepting observe RPC requests from Dubbo applications, which watch resources over long-lived connections.
    • Using the Kubernetes Informer mechanism to watch these resources for changes and pushing the changes to the Dubbo applications.
  • SNP (Service Name Mapping): when a Dubbo application starts, it sends a registerServiceAppMapping RPC to the control plane to register the mapping from interface names to application names. The control plane records this mapping in the SNP CRD; once DDS observes the SNP resource change, it pushes it to every Dubbo application, which uses the SNP for traffic control (a rough sketch of these two RPCs follows below).
  • Admin: HTTP APIs for control plane operators.

In the Dubbo ecosystem, dubbo-cp acts as the control plane, while Dubbo applications act as the data plane.
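
To make the DDS and SNP interaction above a bit more concrete, the sketch below expresses the two RPCs as a Go interface. This is purely illustrative: the type and method names are invented for this post and are not the actual dubbo-kubernetes API (which is defined as gRPC services).

// Hypothetical sketch only: the shape of the DDS/SNP interaction described above,
// not the real dubbo-kubernetes gRPC definitions.
type ResourceEvent struct {
    Kind string // e.g. "ServiceNameMapping", "TagRoute"
    Name string
    Spec []byte
}

type ControlPlane interface {
    // Observe keeps a long-lived stream open for one application and pushes
    // changes of the watched CRDs (AuthenticationPolicy, ConditionRoute, ...).
    Observe(appName string, kinds []string) (<-chan ResourceEvent, error)

    // RegisterServiceAppMapping records the interface-name -> application-name
    // mapping, which the control plane persists as a ServiceNameMapping CR.
    RegisterServiceAppMapping(appName string, interfaceNames []string) error
}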

Multiple Components

dubbo-cp delivers all of its control plane functionality by aggregating multiple components: certificate management, the CA, the webhook, DDS, SNP, and so on. dubbo-kubernetes manages these components through a component manager, the Manager interface.

Component Manager

Interface Definition

The component manager interface is defined as follows; it covers adding components and starting the registered components.

type Manager interface {
    // Add registers a component, i.e. gRPC Server, HTTP server, reconciliation loop.
    Add(...Component) error

    // Start starts registered components and blocks until the Stop channel is closed.
    // Returns an error if there is an error starting any component.
    // If there are any GracefulComponent, it waits until all components are done.
    Start(<-chan struct{}) error
}

The implementation of the Manager interface is shown below; it holds a LeaderElector that is responsible for leader election.

type manager struct {
    leaderElector LeaderElector

    sync.Mutex // protects access to fields below
    components []Component
    started    bool
    stopCh     <-chan struct{}
    errCh      chan error
}

The Add method is straightforward and not covered here; the focus below is on the Start method that runs the component manager.

The Start Method

The Start method is simple: it first runs the components that do not require leader election via startNonLeaderComponents, and then runs the leader-elected components via startLeaderComponents.

func (cm *manager) Start(stop <-chan struct{}) error {
    errCh := make(chan error)

    cm.Lock()
    cm.startNonLeaderComponents(stop, errCh)
    cm.started = true
    cm.stopCh = stop
    cm.errCh = errCh
    cm.Unlock()
    // this has to be called outside of lock because it can be leader at any time, and it locks in leader callbacks.
    cm.startLeaderComponents(stop, errCh)

    defer cm.waitForDone()
    select {
    case <-stop:
        return nil
    case err := <-errCh:
        return err
    }
}
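
To see how these semantics are meant to be used, here is a minimal wiring sketch. It relies only on the Manager and Component interfaces shown in this post; the runWith helper and the signal handling are this post's own illustration, not dubbo-cp code.

// Illustrative wiring, assumed to live in the same package as Manager.
// Uses only the standard library ("os", "os/signal", "syscall").
// runWith is a name made up for this sketch; dubbo-cp's real bootstrap differs.
func runWith(cm Manager, components ...Component) error {
    stop := make(chan struct{})
    go func() {
        sig := make(chan os.Signal, 1)
        signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
        <-sig
        close(stop) // closing stop makes Start return nil and stops every component
    }()

    if err := cm.Add(components...); err != nil {
        return err
    }
    // Blocks here: returns nil when stop is closed, or the first component error.
    return cm.Start(stop)
}

Closing the stop channel is the only shutdown signal a component ever sees, which is why Component.Start is required to return once the channel is closed.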

startNonLeaderComponents simply picks the components that do not need leader election and starts each of them (a sketch is given after the code below), while startLeaderComponents has to kick off leader election and register LeaderCallbacks so that, once this instance is elected leader, all components that require leader election are started.

func (cm *manager) startLeaderComponents(stop <-chan struct{}, errCh chan error) {
    // leader stop channel needs to be stored in atomic because it will be written by leader elector goroutine
    // and read by the last goroutine in this function.
    // we need separate channel for leader components because they can be restarted
    mutex := sync.Mutex{}
    leaderStopCh := make(chan struct{})
    closeLeaderCh := func() {
        mutex.Lock()
        defer mutex.Unlock()
        if !channels.IsClosed(leaderStopCh) {
            close(leaderStopCh)
        }
    }

    cm.leaderElector.AddCallbacks(LeaderCallbacks{
        OnStartedLeading: func() {
            logger.Sugar().Info("leader acquired")
            mutex.Lock()
            defer mutex.Unlock()
            leaderStopCh = make(chan struct{})

            cm.Lock()
            defer cm.Unlock()
            for _, component := range cm.components {
                if component.NeedLeaderElection() {
                    go func(c Component) {
                        if err := c.Start(leaderStopCh); err != nil {
                            errCh <- err
                        }
                    }(component)
                }
            }
        },
        OnStoppedLeading: func() {
            logger.Sugar().Info("leader lost")
            closeLeaderCh()
        },
    })
    go cm.leaderElector.Start(stop)
    go func() {
        <-stop
        closeLeaderCh()
    }()
}
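
startNonLeaderComponents is not shown in this post; based on the description above, it plausibly looks like the following, mirroring the loop inside OnStartedLeading but using the process-wide stop channel directly. This is a reconstruction, not the actual source.

// Reconstructed sketch of startNonLeaderComponents (the real dubbo-cp code may differ):
// start every component that does not need leader election in its own goroutine,
// forwarding any error to the shared error channel read by Start.
func (cm *manager) startNonLeaderComponents(stop <-chan struct{}, errCh chan error) {
    for _, component := range cm.components {
        if !component.NeedLeaderElection() {
            go func(c Component) {
                if err := c.Start(stop); err != nil {
                    errCh <- err
                }
            }(component)
        }
    }
}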

Components

A component in dubbo-cp is defined as follows. Components are divided by whether they require leader election (two toy examples follow the interface definition below):

  • Components that do not need leader election can run in every dubbo-cp replica.
  • Components that do need leader election are subject to leader election among the dubbo-cp replicas; only the replica elected as leader runs them.
// Component defines a process that will be run in the application
// Component should be designed in such a way that it can be stopped by stop channel and started again (for example when instance is reelected for a leader).
type Component interface {
    // Start blocks until the channel is closed or an error occurs.
    // The component will stop running when the channel is closed.
    Start(<-chan struct{}) error

    // NeedLeaderElection indicates if component should be run only by one instance of Control Plane even with many Control Plane replicas.
    NeedLeaderElection() bool
}
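
To make the split concrete, below are two toy components written for this post rather than taken from dubbo-cp: a health endpoint that every replica can serve, and a reconcile loop that only the leader should run because it mutates shared state.

// Illustrative components only; names and behavior are invented for this sketch.
// Needs "net/http" and "time" from the standard library.

// healthServer can run on every replica, so it opts out of leader election.
type healthServer struct{ addr string }

func (h *healthServer) NeedLeaderElection() bool { return false }

func (h *healthServer) Start(stop <-chan struct{}) error {
    srv := &http.Server{Addr: h.addr, Handler: http.HandlerFunc(
        func(w http.ResponseWriter, _ *http.Request) { _, _ = w.Write([]byte("ok")) },
    )}
    errCh := make(chan error, 1)
    go func() { errCh <- srv.ListenAndServe() }()
    select {
    case <-stop:
        return srv.Close() // stop channel closed: shut down and return
    case err := <-errCh:
        return err
    }
}

// reconcileLoop writes shared cluster state, so only the elected leader runs it.
type reconcileLoop struct{ interval time.Duration }

func (r *reconcileLoop) NeedLeaderElection() bool { return true }

func (r *reconcileLoop) Start(stop <-chan struct{}) error {
    ticker := time.NewTicker(r.interval)
    defer ticker.Stop()
    for {
        select {
        case <-stop:
            return nil // leadership lost or the process is stopping
        case <-ticker.C:
            // reconcile desired vs. actual state here
        }
    }
}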

Leader Election

Interface Definition

The leader election interface is defined as follows:

// LeaderCallbacks defines callbacks for events from LeaderElector
// It is guaranteed that each methods will be executed from the same goroutine, so only one method can be run at once.
type LeaderCallbacks struct {
    OnStartedLeading func()
    OnStoppedLeading func()
}

type LeaderElector interface {
    AddCallbacks(LeaderCallbacks)
    // IsLeader should be used for diagnostic reasons (metrics/API info), because there may not be any leader elector for a short period of time.
    // Use Callbacks to write logic to execute when Leader is elected.
    IsLeader() bool

    // Start blocks until the channel is closed or an error occurs.
    Start(stop <-chan struct{})
}
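
The contract is easiest to see with a trivial implementation. The elector below simply "wins" immediately and reports lost leadership when stopped; it is a sketch for illustration (useful mostly in single-replica or test setups), not a component shipped by dubbo-cp.

// alwaysLeader is an illustrative LeaderElector: it becomes leader as soon as
// Start runs and gives leadership up when the stop channel closes.
// Needs "sync" from the standard library. Not part of dubbo-cp.
type alwaysLeader struct {
    mu        sync.Mutex
    leader    bool
    callbacks []LeaderCallbacks
}

func (a *alwaysLeader) AddCallbacks(cb LeaderCallbacks) {
    a.mu.Lock()
    defer a.mu.Unlock()
    a.callbacks = append(a.callbacks, cb)
}

func (a *alwaysLeader) IsLeader() bool {
    a.mu.Lock()
    defer a.mu.Unlock()
    return a.leader
}

func (a *alwaysLeader) Start(stop <-chan struct{}) {
    a.mu.Lock()
    a.leader = true
    cbs := append([]LeaderCallbacks(nil), a.callbacks...)
    a.mu.Unlock()
    for _, cb := range cbs {
        if cb.OnStartedLeading != nil {
            cb.OnStartedLeading()
        }
    }
    <-stop // block until the process is told to stop
    a.mu.Lock()
    a.leader = false
    a.mu.Unlock()
    for _, cb := range cbs {
        if cb.OnStoppedLeading != nil {
            cb.OnStoppedLeading()
        }
    }
}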

Kubernetes Leader Election

In a Kubernetes environment, the leaderelection package from the Kubernetes client SDK (client-go) is used: it implements a distributed lock backed by ConfigMap/Lease resources, which in turn provides leader election.

type KubeLeaderElection struct {
    leader    int32
    namespace string
    name      string
    callbacks []component.LeaderCallbacks
    client    kubernetes.Interface
    ttl       time.Duration

    // Records which "cycle" the election is on. This is incremented each time an election is won and then lost
    // This is mostly just for testing
    cycle      *atomic.Int32
    electionID string
}

Start: Beginning the Election

Electing a leader is a loop: each iteration calls the create method to build a leaderelection.LeaderElector and then runs the election with Run.

// Start will start leader election, calling all runFns when we become the leader.
func (l *KubeLeaderElection) Start(stop <-chan struct{}) {
    logger.Sugar().Info("starting Leader Elector")
    for {
        le, err := l.create()
        if err != nil {
            // This should never happen; errors are only from invalid input and the input is not user modifiable
            panic("KubeLeaderElection creation failed: " + err.Error())
        }
        l.cycle.Inc()
        ctx, cancel := context.WithCancel(context.Background())
        go func() {
            <-stop
            cancel()
        }()
        le.Run(ctx)
        select {
        case <-stop:
            // We were told to stop explicitly. Exit now
            return
        default:
            cancel()
            // Otherwise, we may have lost our lock. In practice, this is extremely rare; we need to have the lock, then lose it
            // Typically this means something went wrong, such as API server downtime, etc
            // If this does happen, we will start the cycle over again
            logger.Sugar().Errorf("Leader election cycle %v lost. Trying again", l.cycle.Load())
        }
    }
}

func (l *KubeLeaderElection) create() (*leaderelection.LeaderElector, error) {
    callbacks := leaderelection.LeaderCallbacks{
        OnStartedLeading: func(ctx context.Context) {
            l.setLeader(true)
            for _, f := range l.callbacks {
                if f.OnStartedLeading != nil {
                    go f.OnStartedLeading()
                }
            }
        },
        OnStoppedLeading: func() {
            logger.Sugar().Infof("leader election lock lost: %v", l.electionID)
            l.setLeader(false)
            for _, f := range l.callbacks {
                if f.OnStoppedLeading != nil {
                    go f.OnStoppedLeading()
                }
            }
        },
    }
    lock, err := resourcelock.New(resourcelock.ConfigMapsLeasesResourceLock,
        l.namespace,
        l.electionID,
        l.client.CoreV1(),
        l.client.CoordinationV1(),
        resourcelock.ResourceLockConfig{
            Identity: l.name,
        },
    )
    if err != nil {
        return nil, err
    }
    return leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
        Lock:          lock,
        LeaseDuration: l.ttl,
        RenewDeadline: l.ttl / 2,
        RetryPeriod:   l.ttl / 4,
        Callbacks:     callbacks,
        // When exits, the lease will be dropped. This is more likely to lead to a case where
        // to instances are both considered the leaders. As such, if this is intended to be use for mission-critical
        // usages (rather than avoiding duplication of work), this may need to be re-evaluated.
        ReleaseOnCancel: true,
    })
}