SNP (Service Name Mapping):在 Dubbo 应用启动时,应用会向控制平面发起 registerServiceAppMapping RPC 请求,注册接口名称与应用名称之间的映射;控制平面将该接口名到应用名的映射记录到 SNP CRD 中。DDS 监听到 SNP 资源变更后,会将其推送至各个 Dubbo 应用,Dubbo 应用再根据 SNP 进行流量控制。
type Manager interface { // Add registers a component, i.e. gRPC Server, HTTP server, reconciliation loop. Add(...Component) error
// Start starts registered components and blocks until the Stop channel is closed. // Returns an error if there is an error starting any component. // If there are any GracefulComponent, it waits until all components are done. Start(<-chanstruct{}) error }
Manager 接口的实现如下,包括一个负责领导选举的 LeaderElector。
1 2 3 4 5 6 7 8 9
type manager struct { leaderElector LeaderElector
sync.Mutex // protects access to fields below components []Component started bool stopCh <-chanstruct{} errCh chan error }
cm.Lock() cm.startNonLeaderComponents(stop, errCh) cm.started = true cm.stopCh = stop cm.errCh = errCh cm.Unlock() // this has to be called outside of lock because it can be leader at any time, and it locks in leader callbacks. cm.startLeaderComponents(stop, errCh)
defer cm.waitForDone() select { case <-stop: returnnil case err := <-errCh: return err } }
func(cm *manager)startLeaderComponents(stop <-chanstruct{}, errCh chan error) { // leader stop channel needs to be stored in atomic because it will be written by leader elector goroutine // and read by the last goroutine in this function. // we need separate channel for leader components because they can be restarted mutex := sync.Mutex{} leaderStopCh := make(chanstruct{}) closeLeaderCh := func() { mutex.Lock() defer mutex.Unlock() if !channels.IsClosed(leaderStopCh) { close(leaderStopCh) } }
// Component defines a process that will be run in the application.
// Component should be designed in such a way that it can be stopped by the stop
// channel and started again (for example when the instance is reelected for a leader).
type Component interface {
	// Start blocks until the channel is closed or an error occurs.
	// The component will stop running when the channel is closed.
	Start(<-chan struct{}) error

	// NeedLeaderElection indicates if the component should be run only by one
	// instance of the Control Plane even with many Control Plane replicas.
	NeedLeaderElection() bool
}
领导选举
接口定义
领导选举接口定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// LeaderCallbacks defines callbacks for events from LeaderElector.
// It is guaranteed that each method will be executed from the same goroutine,
// so only one method can be run at once.
type LeaderCallbacks struct {
	OnStartedLeading func()
	OnStoppedLeading func()
}
type LeaderElector interface { AddCallbacks(LeaderCallbacks) // IsLeader should be used for diagnostic reasons (metrics/API info), because there may not be any leader elector for a short period of time. // Use Callbacks to write logic to execute when Leader is elected. IsLeader() bool
// Start blocks until the channel is closed or an error occurs. Start(stop <-chanstruct{}) }
type KubeLeaderElection struct { leader int32 namespace string name string callbacks []component.LeaderCallbacks client kubernetes.Interface ttl time.Duration
// Records which "cycle" the election is on. This is incremented each time an election is won and then lost // This is mostly just for testing cycle *atomic.Int32 electionID string }
// Start will start leader election, calling all runFns when we become the leader. func(l *KubeLeaderElection)Start(stop <-chanstruct{}) { logger.Sugar().Info("starting Leader Elector") for { le, err := l.create() if err != nil { // This should never happen; errors are only from invalid input and the input is not user modifiable panic("KubeLeaderElection creation failed: " + err.Error()) } l.cycle.Inc() ctx, cancel := context.WithCancel(context.Background()) gofunc() { <-stop cancel() }() le.Run(ctx) select { case <-stop: // We were told to stop explicitly. Exit now return default: cancel() // Otherwise, we may have lost our lock. In practice, this is extremely rare; we need to have the lock, then lose it // Typically this means something went wrong, such as API server downtime, etc // If this does happen, we will start the cycle over again logger.Sugar().Errorf("Leader election cycle %v lost. Trying again", l.cycle.Load()) } } }
func(l *KubeLeaderElection)create()(*leaderelection.LeaderElector, error) { callbacks := leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { l.setLeader(true) for _, f := range l.callbacks { if f.OnStartedLeading != nil { go f.OnStartedLeading() } } }, OnStoppedLeading: func() { logger.Sugar().Infof("leader election lock lost: %v", l.electionID) l.setLeader(false) for _, f := range l.callbacks { if f.OnStoppedLeading != nil { go f.OnStoppedLeading() } } }, } lock, err := resourcelock.New(resourcelock.ConfigMapsLeasesResourceLock, l.namespace, l.electionID, l.client.CoreV1(), l.client.CoordinationV1(), resourcelock.ResourceLockConfig{ Identity: l.name, }, ) if err != nil { returnnil, err } return leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ Lock: lock, LeaseDuration: l.ttl, RenewDeadline: l.ttl / 2, RetryPeriod: l.ttl / 4, Callbacks: callbacks, // When exits, the lease will be dropped. This is more likely to lead to a case where // to instances are both considered the leaders. As such, if this is intended to be use for mission-critical // usages (rather than avoiding duplication of work), this may need to be re-evaluated. ReleaseOnCancel: true, }) }