Dawn's Blogs

Sharing technology, recording growth


Istio Study Notes (8): Pilot xDS Asynchronous Distribution

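Pilot's xDS server is implemented by DiscoveryServer. Because the gRPC connection between Pilot and Envoy is a bidirectional stream, Pilot can proactively push updates to Envoy, and Envoy can also initiate xDS requests on its own.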

When DiscoveryServer starts, it launches several goroutines. The most important are:

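  • handleUpdates: takes push requests from the server's pushChannel, debounces them, and hands the merged request on to pushQueue
  • sendPushes: takes entries from pushQueue, wraps each one in a push event, and sends it to the pushChannel of the corresponding client connection (the xDS responses themselves are generated later, by the connection's handler)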
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
	go s.WorkloadEntryController.Run(stopCh)
	go s.handleUpdates(stopCh)
	go s.periodicRefreshMetrics(stopCh)
	go s.sendPushes(stopCh)
	go s.Cache.Run(stopCh)
}

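Pushing Updates to Envoy

The flow for pushing updates to Envoy is as follows:

  • When configuration or services change, the Config Controller and Service Controller notify the DiscoveryServer through callbacks, and the DiscoveryServer sends the change message into its pushChannel.
  • A DiscoveryServer goroutine receives change messages from pushChannel and merges changes that arrive in quick succession (debounce). Once no new change message has arrived for a configured period, or the maximum delay has been reached, the merged message is added to a queue, pushQueue.
  • Another goroutine takes change messages from pushQueue, wraps each one in a push event (called XdsEvent in some Istio versions; the code below uses Event), and sends it to the pushChannel of each client connection.
  • For each gRPC client connection, the DiscoveryServer.StreamAggregatedResources handler takes the event from the connection's pushChannel, uses the push context to generate a DiscoveryResponse that conforms to the xDS API, and sends it to Envoy over gRPC.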

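Sending Push Requests to pushChannel

When a service or configuration changes, DiscoveryServer.ConfigUpdate is called, which sends the push request into pushChannel. If the update involves Address-kind configs, the whole xDS cache is cleared first.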

// ConfigUpdate implements ConfigUpdater interface, used to request pushes.
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
	if len(model.ConfigsOfKind(req.ConfigsUpdated, kind.Address)) > 0 {
		// This is a bit like clearing EDS cache on EndpointShard update. Because Address
		// types are fetched dynamically, they are not part of the same protections, so we need to clear
		// the cache.
		s.Cache.ClearAll()
	}
	inboundConfigUpdates.Increment()
	s.InboundUpdates.Inc()
	s.pushChannel <- req
}

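Reading pushChannel and Debouncing

What Is Debounce

Debouncing merges the xDS push requests that arrive within a time window into one, so that a burst of changes does not trigger a flood of xDS pushes that would hurt network performance and consume bandwidth.

Debounce relies on two key durations:

  • Quiet time (opts.debounceAfter in the code): the length of silence required after the most recent request, i.e. a period in which no new request arrives.
  • Maximum delay (opts.debounceMax): the longest time to wait, measured from the first request merged into the current round.

A debounce round ends, and one merged push request is sent, as soon as the time since the last request reaches the quiet time or the time since the first merged request reaches the maximum delay.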

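Debounce Logic

In Istio, the main debounce logic is as follows:

  • When a signal arrives on freeCh, the previous push has finished (its PushRequest has already been handed to pushQueue). pushWorker is therefore called to try to send the pending PushRequest: if the maximum delay or the quiet time has been exceeded, the PushRequest is sent on to pushQueue; otherwise the quiet-time timer is set.

pushWorker checks whether the maximum delay or the quiet time has been exceeded. If so, it sets free to false and starts push in a new goroutine; push calls pushFn (DiscoveryServer.Push), which delivers the PushRequest to pushQueue, and then signals freeCh. Otherwise pushWorker resets the quiet-time timer.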

push := func(req *model.PushRequest, debouncedEvents int, startDebounce time.Time) {
	pushFn(req)
	updateSent.Add(int64(debouncedEvents))
	debounceTime.Record(time.Since(startDebounce).Seconds())
	freeCh <- struct{}{}
}

pushWorker := func() {
	eventDelay := time.Since(startDebounce)
	quietTime := time.Since(lastConfigUpdateTime)
	// it has been too long or quiet enough
	if eventDelay >= opts.debounceMax || quietTime >= opts.debounceAfter {
		if req != nil {
			pushCounter++
			if req.ConfigsUpdated == nil {
				log.Infof("Push debounce stable[%d] %d for reason %s: %v since last change, %v since last push, full=%v",
					pushCounter, debouncedEvents, reasonsUpdated(req),
					quietTime, eventDelay, req.Full)
			} else {
				log.Infof("Push debounce stable[%d] %d for config %s: %v since last change, %v since last push, full=%v",
					pushCounter, debouncedEvents, configsUpdated(req),
					quietTime, eventDelay, req.Full)
			}
			free = false
			go push(req, debouncedEvents, startDebounce)
			req = nil
			debouncedEvents = 0
		}
	} else {
		// Not quiet long enough yet: reset the quiet-time timer for the remaining time
		timeChan = time.After(opts.debounceAfter - quietTime)
	}
}
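  • When a new PushRequest arrives on ch, it is merged into the pending request with Merge (debounce). The first request of a round starts the quiet-time timer and records startDebounce, and every request updates lastConfigUpdateTime. If EDS debouncing is disabled, incremental (non-Full) requests skip debounce and are pushed immediately.
  • When timeChan fires, the quiet-time timer has expired. If no push is in flight (free is true), pushWorker is called to try to send the PushRequest to pushQueue, or to reset the quiet-time timer if new requests arrived in the meantime.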
for {
	select {
	case <-freeCh:
		free = true
		pushWorker()
	case r := <-ch:
		// If reason is not set, record it as an unknown reason
		if len(r.Reason) == 0 {
			r.Reason = model.NewReasonStats(model.UnknownTrigger)
		}
		if !opts.enableEDSDebounce && !r.Full {
			// trigger push now, just for EDS
			go func(req *model.PushRequest) {
				pushFn(req)
				updateSent.Inc()
			}(r)
			continue
		}

		lastConfigUpdateTime = time.Now()
		if debouncedEvents == 0 {
			timeChan = time.After(opts.debounceAfter)
			startDebounce = lastConfigUpdateTime
		}
		debouncedEvents++

		req = req.Merge(r)
	case <-timeChan:
		if free {
			pushWorker()
		}
	case <-stopCh:
		return
	}
}

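Pushing to pushQueue

Push is the method that actually delivers the PushRequest to pushQueue. It first checks whether this is a full push:

  • If it is not a full push (EDS updates only need an incremental push), it attaches the current global PushContext to the request, drops the affected cache entries, and calls AdsPushAll directly.
  • If it is a full push, it notifies the old PushContext of the config change, builds a new PushContext with a new version via initPushContext, attaches it to the PushRequest (it is later used to generate the xDS responses), and then calls AdsPushAll.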
// Push is called to push changes on config updates using ADS.
func (s *DiscoveryServer) Push(req *model.PushRequest) {
	if !req.Full {
		req.Push = s.globalPushContext()
		s.dropCacheForRequest(req)
		s.AdsPushAll(req)
		return
	}
	// Reset the status during the push.
	oldPushContext := s.globalPushContext()
	if oldPushContext != nil {
		oldPushContext.OnConfigChange()
		// Push the previous push Envoy metrics.
		envoyfilter.RecordMetrics()
	}
	// PushContext is reset after a config change. Previous status is
	// saved.
	t0 := time.Now()
	versionLocal := s.NextVersion()
	push, err := s.initPushContext(req, oldPushContext, versionLocal)
	if err != nil {
		return
	}
	initContextTime := time.Since(t0)
	log.Debugf("InitContext %v for push took %s", versionLocal, initContextTime)
	pushContextInitTime.Record(initContextTime.Seconds())

	req.Push = push
	s.AdsPushAll(req)
}
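The full/incremental split reduces to a few lines. In this hypothetical sketch, server, pushContext and pushRequest are illustrative types, and building a snapshot with the next version number stands in for initPushContext. It shows an incremental push reusing the current snapshot while a full push replaces it.

```go
package main

import "fmt"

// pushContext is a hypothetical immutable snapshot of mesh state.
type pushContext struct {
	Version int
}

// pushRequest is a simplified, hypothetical stand-in for model.PushRequest.
type pushRequest struct {
	Full bool
	Push *pushContext
}

// server holds the current global snapshot and the last version issued.
type server struct {
	current *pushContext
	version int
}

// push mirrors the branching in DiscoveryServer.Push: incremental pushes
// reuse the current snapshot; full pushes first build and publish a new one.
func (s *server) push(req *pushRequest) {
	if !req.Full {
		req.Push = s.current
		return
	}
	s.version++                                  // like s.NextVersion()
	s.current = &pushContext{Version: s.version} // like initPushContext
	req.Push = s.current
}

func main() {
	s := &server{current: &pushContext{}}
	full := &pushRequest{Full: true}
	s.push(full)
	eds := &pushRequest{}
	s.push(eds)
	fmt.Println(full.Push.Version, eds.Push.Version, full.Push == eds.Push) // 1 1 true
}
```

Because each snapshot is never mutated after creation in this sketch, a push still in flight can keep reading the old one while the next full push builds its replacement.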

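AdsPushAll: Enqueuing into pushQueue

At the end of Push, AdsPushAll is called to send the update to all clients. It calls startPush, which calls pushQueue.Enqueue for every connected client, enqueuing the client together with the PushRequest.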

// AdsPushAll will send updates to all nodes, for a full config or incremental EDS.
func (s *DiscoveryServer) AdsPushAll(req *model.PushRequest) {
	if !req.Full {
		log.Infof("XDS: Incremental Pushing ConnectedEndpoints:%d Version:%s",
			s.adsClientCount(), req.Push.PushVersion)
	} else {
		totalService := len(req.Push.GetAllServices())
		log.Infof("XDS: Pushing Services:%d ConnectedEndpoints:%d Version:%s",
			totalService, s.adsClientCount(), req.Push.PushVersion)
		monServices.Record(float64(totalService))

		// Make sure the ConfigsUpdated map exists
		if req.ConfigsUpdated == nil {
			req.ConfigsUpdated = make(sets.Set[model.ConfigKey])
		}
	}

	s.startPush(req)
}

// Send a signal to all connections, with a push event.
func (s *DiscoveryServer) startPush(req *model.PushRequest) {
	// Push config changes, iterating over connected envoys.
	if log.DebugEnabled() {
		currentlyPending := s.pushQueue.Pending()
		if currentlyPending != 0 {
			log.Debugf("Starting new push while %v were still pending", currentlyPending)
		}
	}
	req.Start = time.Now()
	for _, p := range s.AllClients() {
		s.pushQueue.Enqueue(p, req)
	}
}
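As far as I understand, Istio's PushQueue is more than a plain FIFO: a proxy that is already queued is not enqueued a second time, and the new request is merged into the pending one. The sketch below illustrates only that idea with hypothetical types (queue, req, string proxy IDs); it leaves out the in-progress tracking that MarkDone handles in the real queue.

```go
package main

import (
	"fmt"
	"sync"
)

// req is a hypothetical stand-in for model.PushRequest.
type req struct {
	Full   bool
	Reason []string
}

// merge combines two requests for the same proxy.
func merge(a, b *req) *req {
	return &req{
		Full:   a.Full || b.Full,
		Reason: append(append([]string{}, a.Reason...), b.Reason...),
	}
}

// queue is a simplified per-proxy push queue: each proxy appears at most
// once, and requests for an already-queued proxy are merged.
type queue struct {
	mu      sync.Mutex
	cond    *sync.Cond
	order   []string        // FIFO of proxy IDs
	pending map[string]*req // merged request per queued proxy
}

func newQueue() *queue {
	q := &queue{pending: map[string]*req{}}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Enqueue adds a proxy, or merges into its pending request if already queued.
func (q *queue) Enqueue(proxy string, r *req) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if old, ok := q.pending[proxy]; ok {
		q.pending[proxy] = merge(old, r) // keep its position in the FIFO
		return
	}
	q.pending[proxy] = r
	q.order = append(q.order, proxy)
	q.cond.Signal()
}

// Dequeue blocks until a proxy is available, then returns it with its request.
func (q *queue) Dequeue() (string, *req) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.order) == 0 {
		q.cond.Wait()
	}
	proxy := q.order[0]
	q.order = q.order[1:]
	r := q.pending[proxy]
	delete(q.pending, proxy)
	return proxy, r
}

func main() {
	q := newQueue()
	q.Enqueue("proxy-a", &req{Reason: []string{"eds"}})
	q.Enqueue("proxy-b", &req{Full: true, Reason: []string{"config"}})
	q.Enqueue("proxy-a", &req{Full: true, Reason: []string{"config"}})
	p, r := q.Dequeue()
	fmt.Println(p, r.Full, r.Reason) // proxy-a true [eds config]
	p, r = q.Dequeue()
	fmt.Println(p, r.Full, r.Reason) // proxy-b true [config]
}
```

Merging per proxy bounds the queue length by the number of connected proxies, no matter how many pushes are triggered while they wait.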

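Draining pushQueue into Client pushChannels

DiscoveryServer starts a separate goroutine, sendPushes, which takes requests from pushQueue and sends them to each client's pushChannel.

In doSendPushes (the loop that sendPushes runs):

  • semaphore is a buffered channel that caps how many pushes can be in flight at once (a concurrency limit rather than a rate limit); a slot is freed when doneFunc runs.
  • Each dequeued request is wrapped in an Event carrying doneFunc and sent to client.pushChannel. If the client's gRPC stream closes first, doneFunc is called right away so the slot is not leaked.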
func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
	for {
		select {
		case <-stopCh:
			return
		default:
			// We can send to it until it is full, then it will block until a pushes finishes and reads from it.
			// This limits the number of pushes that can happen concurrently
			semaphore <- struct{}{}

			// Get the next proxy to push. This will block if there are no updates required.
			client, push, shuttingdown := queue.Dequeue()
			if shuttingdown {
				return
			}
			recordPushTriggers(push.Reason)
			// Signals that a push is done by reading from the semaphore, allowing another send on it.
			doneFunc := func() {
				queue.MarkDone(client)
				<-semaphore
			}

			proxiesQueueTime.Record(time.Since(push.Start).Seconds())
			var closed <-chan struct{}
			if client.stream != nil {
				closed = client.stream.Context().Done()
			} else {
				closed = client.deltaStream.Context().Done()
			}
			go func() {
				pushEv := &Event{
					pushRequest: push,
					done:        doneFunc,
				}

				select {
				case client.pushChannel <- pushEv:
					return
				case <-closed: // grpc stream was closed
					doneFunc()
					log.Infof("Client closed connection %v", client.conID)
				}
			}()
		}
	}
}

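Pushing xDS Messages to Envoy

After a client connects to the xDS server, the per-connection handler listens on the connection's pushChannel and calls pushConnection, which uses the PushContext to generate xDS messages and send them to the client. Afterwards pushEv.done() releases the semaphore slot and marks the client as done in pushQueue.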

case pushEv := <-con.pushChannel:
	err := s.pushConnection(con, pushEv)
	pushEv.done()
	if err != nil {
		return err
	}

pushConnection

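In pushConnection, pushXds is called for each watched xDS type to push it to the client; each generator decides whether the event actually requires a push for its type.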

// Send pushes to all generators
// Each Generator is responsible for determining if the push event requires a push
wrl, ignoreEvents := con.pushDetails()
for _, w := range wrl {
	if err := s.pushXds(con, w, pushRequest); err != nil {
		return err
	}
}

Responding to xDS Requests Initiated by Envoy

After Envoy connects, a separate goroutine is started to receive the xDS requests that Envoy initiates.

// Do not call: defer close(con.pushChannel). The push channel will be garbage collected
// when the connection is no longer used. Closing the channel can cause subtle race conditions
// with push. According to the spec: "It's only necessary to close a channel when it is important
// to tell the receiving goroutines that all data have been sent."

// Block until either a request is received or a push is triggered.
// We need 2 go routines because 'read' blocks in Recv().
go s.receive(con, ids)

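Part of the receive method is shown below. When it receives an xDS request from Envoy, it sends the request into the connection's reqChan. The first request must carry node information, which is used to initialize the connection; health-check probes that arrive before it are skipped.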

firstRequest := true
for {
	req, err := con.stream.Recv()
	if err != nil {
		if istiogrpc.IsExpectedGRPCError(err) {
			log.Infof("ADS: %q %s terminated", con.peerAddr, con.conID)
			return
		}
		con.errorChan <- err
		log.Errorf("ADS: %q %s terminated with error: %v", con.peerAddr, con.conID, err)
		totalXDSInternalErrors.Increment()
		return
	}
	// This should be only set for the first request. The node id may not be set - for example malicious clients.
	if firstRequest {
		// probe happens before envoy sends first xDS request
		if req.TypeUrl == v3.HealthInfoType {
			log.Warnf("ADS: %q %s send health check probe before normal xDS request", con.peerAddr, con.conID)
			continue
		}
		firstRequest = false
		if req.Node == nil || req.Node.Id == "" {
			con.errorChan <- status.New(codes.InvalidArgument, "missing node information").Err()
			return
		}
		if err := s.initConnection(req.Node, con, identities); err != nil {
			con.errorChan <- err
			return
		}
		defer s.closeConnection(con)
		log.Infof("ADS: new connection for node:%s", con.conID)
	}

	select {
	case con.reqChan <- req:
	case <-con.stream.Context().Done():
		log.Infof("ADS: %q %s terminated with stream closed", con.peerAddr, con.conID)
		return
	}
}

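The Connection Listens on reqChan to Respond to Envoy

The connection handler listens on reqChan and calls processRequest to respond to requests initiated by Envoy. If reqChan has been closed, the receive goroutine has exited, and the handler returns the error from errorChan.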

select {
case req, ok := <-con.reqChan:
	if ok {
		if err := s.processRequest(req, con); err != nil {
			return err
		}
	} else {
		// Remote side closed connection or error processing the request.
		return <-con.errorChan
	}
case <-con.stop:
	return nil
default:
}
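The default branch makes this select non-blocking, which gives pending requests priority over pushes: when several cases of a single select are ready, Go picks one at random. In the full handler this check is presumably followed by a blocking select over both channels. A minimal sketch of that two-step pattern with hypothetical string channels follows; a plausible motivation is that requests are often cheap ACKs and a starting client may be waiting on them.

```go
package main

import "fmt"

// nextEvent prefers a pending request over a pending push: the first,
// non-blocking select takes from reqChan if it has a value; only otherwise
// does the second select wait on both channels.
func nextEvent(reqChan <-chan string, pushChannel <-chan string) string {
	select {
	case r := <-reqChan:
		return "request " + r
	default:
	}
	select {
	case r := <-reqChan:
		return "request " + r
	case p := <-pushChannel:
		return "push " + p
	}
}

func main() {
	reqChan := make(chan string, 1)
	pushChannel := make(chan string, 1)
	reqChan <- "cds"
	pushChannel <- "v2"
	// Both are ready; one select would choose randomly, but the request wins.
	fmt.Println(nextEvent(reqChan, pushChannel)) // request cds
	fmt.Println(nextEvent(reqChan, pushChannel)) // push v2
}
```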

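processRequest: Responding to the Client

At the end of processRequest, pushXds is called to respond to the client. Before that, shouldRespond decides whether the request needs a response at all; for example, a plain ACK of an earlier response does not.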

// processRequest handles one discovery request. This is currently called from the 'main' thread, which also
// handles 'push' requests and close - the code will eventually call the 'push' code, and it needs more mutex
// protection. Original code avoided the mutexes by doing both 'push' and 'process requests' in same thread.
func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error {
	stype := v3.GetShortType(req.TypeUrl)
	log.Debugf("ADS:%s: REQ %s resources:%d nonce:%s version:%s ", stype,
		con.conID, len(req.ResourceNames), req.ResponseNonce, req.VersionInfo)
	if req.TypeUrl == v3.HealthInfoType {
		s.handleWorkloadHealthcheck(con.proxy, req)
		return nil
	}

	// For now, don't let xDS piggyback debug requests start watchers.
	if strings.HasPrefix(req.TypeUrl, v3.DebugType) {
		return s.pushXds(con,
			&model.WatchedResource{TypeUrl: req.TypeUrl, ResourceNames: req.ResourceNames},
			&model.PushRequest{Full: true, Push: con.proxy.LastPushContext})
	}
	if s.StatusReporter != nil {
		s.StatusReporter.RegisterEvent(con.conID, req.TypeUrl, req.ResponseNonce)
	}
	shouldRespond, delta := s.shouldRespond(con, req)
	if !shouldRespond {
		return nil
	}

	request := &model.PushRequest{
		Full:   true,
		Push:   con.proxy.LastPushContext,
		Reason: model.NewReasonStats(model.ProxyRequest),

		// The usage of LastPushTime (rather than time.Now()), is critical here for correctness; This time
		// is used by the XDS cache to determine if a entry is stale. If we use Now() with an old push context,
		// we may end up overriding active cache entries with stale ones.
		Start: con.proxy.LastPushTime,
		Delta: delta,
	}

	// SidecarScope for the proxy may not have been updated based on this pushContext.
	// It can happen when `processRequest` comes after push context has been updated(s.initPushContext),
	// but proxy's SidecarScope has been updated(s.computeProxyState -> SetSidecarScope) due to optimizations that skip sidecar scope
	// computation.
	if con.proxy.SidecarScope != nil && con.proxy.SidecarScope.Version != request.Push.PushVersion {
		s.computeProxyState(con.proxy, request)
	}
	return s.pushXds(con, con.Watched(req.TypeUrl), request)
}