func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
    go s.WorkloadEntryController.Run(stopCh)
    go s.handleUpdates(stopCh)
    go s.periodicRefreshMetrics(stopCh)
    go s.sendPushes(stopCh)
    go s.Cache.Run(stopCh)
}
Pushing updates to Envoy
The flow for pushing updates to Envoy is as follows:

When configuration or services change, the Config Controller and the Service Controller notify the Discovery Server through registered callbacks, and the Discovery Server puts the resulting change event onto its push channel.
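As a rough illustration of this callback-to-channel flow, the following self-contained sketch uses simplified, hypothetical types (pushRequest, discoveryServer, configController are stand-ins, not Istio's real API): a controller fires its registered handlers when it observes a change, and the handler hands a push request to the server's push channel.

// A minimal sketch of the callback-to-channel flow; the types below are hypothetical
// stand-ins for model.PushRequest, xds.DiscoveryServer, and the Config/Service Controllers.
package main

import "fmt"

// pushRequest stands in for model.PushRequest.
type pushRequest struct {
    Full   bool
    Reason string
}

// discoveryServer stands in for xds.DiscoveryServer; only the push channel is kept.
type discoveryServer struct {
    pushChannel chan *pushRequest
}

// ConfigUpdate mirrors the role of DiscoveryServer.ConfigUpdate: enqueue the request.
func (s *discoveryServer) ConfigUpdate(req *pushRequest) {
    s.pushChannel <- req
}

// configController stands in for the Config/Service Controller: it stores callbacks
// and fires them whenever it observes a change.
type configController struct {
    handlers []func()
}

func (c *configController) RegisterEventHandler(h func()) {
    c.handlers = append(c.handlers, h)
}

func (c *configController) notifyChange() {
    for _, h := range c.handlers {
        h()
    }
}

func main() {
    s := &discoveryServer{pushChannel: make(chan *pushRequest, 1)}
    ctl := &configController{}

    // Wire the controller to the server: a change triggers a full push request.
    ctl.RegisterEventHandler(func() {
        s.ConfigUpdate(&pushRequest{Full: true, Reason: "config update"})
    })

    ctl.notifyChange()
    fmt.Printf("queued push: %+v\n", <-s.pushChannel)
}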
// ConfigUpdate implements ConfigUpdater interface, used to request pushes.
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
    if len(model.ConfigsOfKind(req.ConfigsUpdated, kind.Address)) > 0 {
        // This is a bit like clearing EDS cache on EndpointShard update. Because Address
        // types are fetched dynamically, they are not part of the same protections, so we need to clear
        // the cache.
        s.Cache.ClearAll()
    }
    inboundConfigUpdates.Increment()
    s.InboundUpdates.Inc()
    s.pushChannel <- req
}
for {
    select {
    case <-freeCh:
        free = true
        pushWorker()
    case r := <-ch:
        // If reason is not set, record it as an unknown reason
        if len(r.Reason) == 0 {
            r.Reason = model.NewReasonStats(model.UnknownTrigger)
        }
        if !opts.enableEDSDebounce && !r.Full {
            // trigger push now, just for EDS
            go func(req *model.PushRequest) {
                pushFn(req)
                updateSent.Inc()
            }(r)
            continue
        }
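The loop above is only an excerpt of Pilot's debounce logic. The following self-contained sketch shows the underlying idea with hypothetical names (debounce, debounceAfter, pushFn); the real implementation also enforces a maximum debounce duration. Change events are merged, and a single push is flushed once no new event has arrived for a quiet period.

// A simplified debounce sketch; debounce, debounceAfter, and pushFn are hypothetical names.
package main

import (
    "fmt"
    "time"
)

func debounce(ch <-chan string, debounceAfter time.Duration, pushFn func([]string)) {
    var pending []string       // change events merged since the last push
    var timer <-chan time.Time // fires once the channel has been quiet long enough

    for {
        select {
        case r, ok := <-ch:
            if !ok {
                if len(pending) > 0 {
                    pushFn(pending) // flush whatever is left when the channel closes
                }
                return
            }
            pending = append(pending, r)      // merge the new event with the queued ones
            timer = time.After(debounceAfter) // restart the quiet-period timer
        case <-timer:
            if len(pending) > 0 {
                pushFn(pending) // one merged push for the whole burst
                pending = nil
            }
            timer = nil
        }
    }
}

func main() {
    ch := make(chan string)
    done := make(chan struct{})
    go func() {
        debounce(ch, 100*time.Millisecond, func(reqs []string) {
            fmt.Printf("pushing %d merged change events\n", len(reqs))
        })
        close(done)
    }()

    // A burst of changes results in a single merged push.
    ch <- "VirtualService default/a updated"
    ch <- "DestinationRule default/b updated"
    close(ch)
    <-done
}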
// Push is called to push changes on config updates using ADS.
func (s *DiscoveryServer) Push(req *model.PushRequest) {
    if !req.Full {
        req.Push = s.globalPushContext()
        s.dropCacheForRequest(req)
        s.AdsPushAll(req)
        return
    }
    // Reset the status during the push.
    oldPushContext := s.globalPushContext()
    if oldPushContext != nil {
        oldPushContext.OnConfigChange()
        // Push the previous push Envoy metrics.
        envoyfilter.RecordMetrics()
    }
    // PushContext is reset after a config change. Previous status is
    // saved.
    t0 := time.Now()
    versionLocal := s.NextVersion()
    push, err := s.initPushContext(req, oldPushContext, versionLocal)
    if err != nil {
        return
    }
    initContextTime := time.Since(t0)
    log.Debugf("InitContext %v for push took %s", versionLocal, initContextTime)
    pushContextInitTime.Record(initContextTime.Seconds())
// AdsPushAll will send updates to all nodes, for a full config or incremental EDS.
func (s *DiscoveryServer) AdsPushAll(req *model.PushRequest) {
    if !req.Full {
        log.Infof("XDS: Incremental Pushing ConnectedEndpoints:%d Version:%s",
            s.adsClientCount(), req.Push.PushVersion)
    } else {
        totalService := len(req.Push.GetAllServices())
        log.Infof("XDS: Pushing Services:%d ConnectedEndpoints:%d Version:%s",
            totalService, s.adsClientCount(), req.Push.PushVersion)
        monServices.Record(float64(totalService))

        // Make sure the ConfigsUpdated map exists
        if req.ConfigsUpdated == nil {
            req.ConfigsUpdated = make(sets.Set[model.ConfigKey])
        }
    }

    s.startPush(req)
}
// Send a signal to all connections, with a push event.
func (s *DiscoveryServer) startPush(req *model.PushRequest) {
    // Push config changes, iterating over connected envoys.
    if log.DebugEnabled() {
        currentlyPending := s.pushQueue.Pending()
        if currentlyPending != 0 {
            log.Debugf("Starting new push while %v were still pending", currentlyPending)
        }
    }
    req.Start = time.Now()
    for _, p := range s.AllClients() {
        s.pushQueue.Enqueue(p, req)
    }
}
func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
    for {
        select {
        case <-stopCh:
            return
        default:
            // We can send to it until it is full, then it will block until a push finishes and reads from it.
            // This limits the number of pushes that can happen concurrently.
            semaphore <- struct{}{}

            // Get the next proxy to push. This will block if there are no updates required.
            client, push, shuttingdown := queue.Dequeue()
            if shuttingdown {
                return
            }
            recordPushTriggers(push.Reason)
            // Signals that a push is done by reading from the semaphore, allowing another send on it.
            doneFunc := func() {
                queue.MarkDone(client)
                <-semaphore
            }
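doSendPushes limits concurrency with a buffered channel used as a counting semaphore. The standalone sketch below shows the same pattern in isolation (maxConcurrentPushes and the worker body are made up for illustration): sending on the channel acquires a slot before a push starts, and the slot is released when the push finishes.

// A standalone sketch of the counting-semaphore pattern used by doSendPushes;
// maxConcurrentPushes and the worker body are hypothetical.
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    const maxConcurrentPushes = 2
    semaphore := make(chan struct{}, maxConcurrentPushes)

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        semaphore <- struct{}{} // blocks once maxConcurrentPushes pushes are in flight
        wg.Add(1)
        go func(proxy int) {
            defer wg.Done()
            defer func() { <-semaphore }() // release the slot so the next push can start
            fmt.Printf("pushing to proxy %d\n", proxy)
            time.Sleep(50 * time.Millisecond) // stand-in for the actual xDS push
        }(i)
    }
    wg.Wait()
}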
// Send pushes to all generators
// Each Generator is responsible for determining if the push event requires a push
wrl, ignoreEvents := con.pushDetails()
for _, w := range wrl {
    if err := s.pushXds(con, w, pushRequest); err != nil {
        return err
    }
}
Responding to xDS requests initiated by Envoy

After an Envoy connects, a dedicated goroutine is started to handle the xDS requests that Envoy initiates on its own.
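Before the actual code, here is a distilled, self-contained sketch of this two-goroutine layout (fakeStream, reqChan, and pushChan are hypothetical stand-ins, not the real gRPC ADS stream or Connection fields): because Recv() blocks, one goroutine does nothing but read requests and forward them over a channel, leaving a second loop free to select over both client requests and server-initiated push events.

// A distilled sketch of the two-goroutine layout; fakeStream, reqChan, and pushChan
// are hypothetical stand-ins, not the real gRPC ADS stream or Connection fields.
package main

import "fmt"

// fakeStream stands in for the gRPC ADS stream; Recv blocks like Stream.Recv().
type fakeStream struct{ msgs chan string }

func (s *fakeStream) Recv() string { return <-s.msgs }

func main() {
    stream := &fakeStream{msgs: make(chan string, 1)}
    reqChan := make(chan string)     // requests read from the client
    pushChan := make(chan string, 1) // server-initiated push events

    // Goroutine 1: blocks in Recv() and forwards client requests to reqChan.
    go func() {
        for {
            reqChan <- stream.Recv()
        }
    }()

    stream.msgs <- "CDS request" // the client sends a request
    pushChan <- "config changed" // meanwhile a config change triggers a push

    // The main loop serves both sources from a single select, which is only possible
    // because the blocking Recv() call lives in its own goroutine.
    for handled := 0; handled < 2; handled++ {
        select {
        case req := <-reqChan:
            fmt.Println("handling client request:", req)
        case ev := <-pushChan:
            fmt.Println("handling push event:", ev)
        }
    }
}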
// Do not call: defer close(con.pushChannel). The push channel will be garbage collected
// when the connection is no longer used. Closing the channel can cause subtle race conditions
// with push. According to the spec: "It's only necessary to close a channel when it is important
// to tell the receiving goroutines that all data have been sent."

// Block until either a request is received or a push is triggered.
// We need 2 go routines because 'read' blocks in Recv().
go s.receive(con, ids)
firstRequest := true
for {
    req, err := con.stream.Recv()
    if err != nil {
        if istiogrpc.IsExpectedGRPCError(err) {
            log.Infof("ADS: %q %s terminated", con.peerAddr, con.conID)
            return
        }
        con.errorChan <- err
        log.Errorf("ADS: %q %s terminated with error: %v", con.peerAddr, con.conID, err)
        totalXDSInternalErrors.Increment()
        return
    }
    // This should be only set for the first request. The node id may not be set - for example malicious clients.
    if firstRequest {
        // probe happens before envoy sends first xDS request
        if req.TypeUrl == v3.HealthInfoType {
            log.Warnf("ADS: %q %s send health check probe before normal xDS request", con.peerAddr, con.conID)
            continue
        }
        firstRequest = false
        if req.Node == nil || req.Node.Id == "" {
            con.errorChan <- status.New(codes.InvalidArgument, "missing node information").Err()
            return
        }
        if err := s.initConnection(req.Node, con, identities); err != nil {
            con.errorChan <- err
            return
        }
        defer s.closeConnection(con)
        log.Infof("ADS: new connection for node:%s", con.conID)
    }

    select {
    case con.reqChan <- req:
    case <-con.stream.Context().Done():
        log.Infof("ADS: %q %s terminated with stream closed", con.peerAddr, con.conID)
        return
    }
}
Listening on reqChan to respond to Envoy
The Discovery Server listens on reqChan and calls processRequest to handle the requests Envoy initiates.
select {
case req, ok := <-con.reqChan:
    if ok {
        if err := s.processRequest(req, con); err != nil {
            return err
        }
    } else {
        // Remote side closed connection or error processing the request.
        return <-con.errorChan
    }
case <-con.stop:
    return nil
default:
}
// processRequest handles one discovery request. This is currently called from the 'main' thread, which also
// handles 'push' requests and close - the code will eventually call the 'push' code, and it needs more mutex
// protection. Original code avoided the mutexes by doing both 'push' and 'process requests' in same thread.
func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error {
    stype := v3.GetShortType(req.TypeUrl)
    log.Debugf("ADS:%s: REQ %s resources:%d nonce:%s version:%s ",
        stype, con.conID, len(req.ResourceNames), req.ResponseNonce, req.VersionInfo)
    if req.TypeUrl == v3.HealthInfoType {
        s.handleWorkloadHealthcheck(con.proxy, req)
        return nil
    }

    // For now, don't let xDS piggyback debug requests start watchers.
    if strings.HasPrefix(req.TypeUrl, v3.DebugType) {
        return s.pushXds(con,
            &model.WatchedResource{TypeUrl: req.TypeUrl, ResourceNames: req.ResourceNames},
            &model.PushRequest{Full: true, Push: con.proxy.LastPushContext})
    }
    if s.StatusReporter != nil {
        s.StatusReporter.RegisterEvent(con.conID, req.TypeUrl, req.ResponseNonce)
    }
    shouldRespond, delta := s.shouldRespond(con, req)
    if !shouldRespond {
        return nil
    }
    request := &model.PushRequest{
        Full:   true,
        Push:   con.proxy.LastPushContext,
        Reason: model.NewReasonStats(model.ProxyRequest),

        // The usage of LastPushTime (rather than time.Now()), is critical here for correctness; This time
        // is used by the XDS cache to determine if an entry is stale. If we use Now() with an old push context,
        // we may end up overriding active cache entries with stale ones.
        Start: con.proxy.LastPushTime,
        Delta: delta,
    }

    // SidecarScope for the proxy may not have been updated based on this pushContext.
    // It can happen when `processRequest` comes after push context has been updated (s.initPushContext),
    // but proxy's SidecarScope has been updated (s.computeProxyState -> SetSidecarScope) due to
    // optimizations that skip sidecar scope computation.
    if con.proxy.SidecarScope != nil && con.proxy.SidecarScope.Version != request.Push.PushVersion {
        s.computeProxyState(con.proxy, request)
    }
    return s.pushXds(con, con.Watched(req.TypeUrl), request)
}