Dawn's Blogs

分享技术 记录成长

0%

延迟消息投递

开源的 NSQ、RabbitMQ、ActiveMQ 和 Pulsar 也都内置了延迟消息的处理能力。虽然每个 MQ 项目的使用和实现方式不同,但核心实现思路都一样:Producer 将一个延迟消息发送到某个 Topic 中,Broker 将延迟消息放到临时存储进行暂存,延迟跟踪服务(Delayed Tracker Service)会检查消息是否到期,将到期的消息进行投递

图片

延迟消息投递是要暂缓对当前消息的处理,在未来的某个时间点再触发投递,实际的应用场景非常多,比如异常检测重试、订单超时取消、预约提醒等。

Pulsar 最早是在 2.4.0 引入了延迟消息投递的特性,在 Pulsar 中使用延迟消息,可以精确指定延迟投递的时间,有 deliverAfter 和 deliverAt 两种方式。其中 deliverAt 可以指定具体的时间戳;deliverAfter 可以指定在当前多长时间后执行。

在 Pulsar 中,可以支持跨度很大的延时消息,比方说一个月、半年;同时在一个 Topic 里,既支持延时消息,也支持非延时消息。下图展示了 Pulsar 中延迟消息的具体过程:

图片

实现原理

Pulsar 实现延迟消息投递的方式比较简单,Delayed Message Tracker 在堆外内存维护着一个 delayed index 优先级队列,根据延迟时间进行堆排序,延迟时间最短的会放在头上,时间越长越靠后。

Consumer 在消费时,会先去 Delayed Message Tracker 检查,是否有到期需要投递的消息,如果有到期的消息,则从 Tracker 中拿出对应的 index,找到对应的消息进行消费;如果没有到期的消息,则直接消费正常的消息。

如果集群出现 Broker 宕机,Pulsar 会重建 delayed index 队列,来保证延迟投递的消息能够正常工作。

图片

风险

从 Pulsar 的延迟消息投递实现原理可以看出,该方法简单高效,对 Pulsar 内核侵入性较小,可以支持到任意时间的延迟消息。但同时发现,Pulsar 的实现方案无法支持大规模使用延迟消息,主要有以下两个原因:

1. delayed index队列受到内存限制

2. delayed index队列重建时间开销

未来工作

Pulsar 目前的延迟消息投递方案简单高效,但处理大规模延迟消息时仍然存在风险。关于延迟消息投递,社区和数据平台部 MQ 团队下一步将聚焦在支持大规模延迟消息。目前讨论的方案是在 delayed index 队列加入时间分区,Broker 只加载当前较近的时间片 delayed index 到内存,其余时间片分区持久化磁盘,

Consumer 订阅 Topic 的时候,通过订阅模式来控制消息的使用模式,指定如何将消息投递给一个组一个的或多个的 Consumer。Pulsar 支持 4 种订阅模式:

  • exclusive(独占模式)
  • failover(故障转移模式,也叫灾备模式)
  • shared(共享模式)
  • key-shared(基于key的共享模式)

订阅模式

exclusive

独占模式,只能有一个 Consumer 绑定到订阅上。如果多于一个 Consumer 尝试以使用相同的订阅订阅 Topic,就会抛出异常且无法连接。

pulsar-exclusive-subscriptions.png

failover

在灾备订阅模式中,多个 Consumer 可以绑定到同一个订阅上, Consumer 将会按字典顺序排序第一个 Consumer 被初始化为唯一接受消息的消费者,被称为 Master Consumer。 当 Master Consumer 断开时,所有的未被确认和后续进入的消息将会被投递给下一个 Consumer。

灾备模式提供了高可用性。

pulsar-failover-subscriptions.png

shared

在共享模式中,多个 Consumer 可以绑定到同一个订阅上。 消息通过 round robin 轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。 当一个消费者断开连接时,所有已经投递给它但还没有被确认的消息将被重新投递,分发给其它存活的消费者。

pulsar-shared-subscriptions.png

key-shared

key-shared 模式是共享模式的一种特例,它也允许多个 Consumer 可以绑定到同一个订阅上,与共享模式中的 round robin 轮询消费消息不同,key-shared 模式增加了一个辅助 key,确保具有相同 key 的消息被交付给相同的消费者

Pulsar 在逻辑上的层级,从上到下依次为租户、命名空间、Topic。

每个租户都可以有单独的认证和授权机制,可以针对租户设置存储配额、消息生存时间 TTL 和隔离策略。

pulsar-logical-arch2.png

默认情况下一个 Topic 中的消息只能由 Pulsar 服务层中的一个 Broker 提供服务,因此单个 Topic 的吞吐量受限于为其提供服务的 Broker 的计算能力。

好在 Pulsar 还支持分区主题,这允许多个 Broker 提供服务,就可以将负载分布到多台机器上。在 Pulsar 内部,分区主题被实现为 N 个内部 Topic,N 是分区数量,分区跨 Broker 的分布由 Pulsar 自动管理,这对用户来说是完全透明的。 Pulsar 将分区主题实现为多个内部主题,这样就可以在需要增加分区数量的时候不必重新平衡整个主题,Pulsar 只需在其内部创建新的内部主题就能够立即接收新的消息,使客户端能在不中断的情况下对现有分区进行消息读写。

当生产者发布消息到分区主题时,不需要特意指定路由模式,默认以轮循的方式将消息均匀分布到各个分区。支持以下3种路由模式:

  • SinglePartition:如果没有消息 key 提供,生产者将会随机选择一个固定的分区来发布消息(同一个生产者生产在同一个分区中),可用于将来自特定生产者的消息分组在一起,以在没有键值时维护消息顺序。
  • RoundRobinPartition:如果没有消息 key 提供,将以轮循的方式将消息均匀分布到各个分区。
  • CustomPartition:定制实现路由模式,控制消息分发到 Topic 的分区中。

如果提供了消息 key,会以 key 做 hash,然后分配消息到指定分区。因此分区中消息的顺序与路由模式和消息的key有关,所有拥有相同的 key 的消息有序,将会被发送到相同的分区

Apache Pulsar是一个支持多租户的、高性能的、分布式多租户消息系统,最初由雅虎开发,现在是Apache软件基金会的顶级项目。 Pulsar提供非常低的消息发布和端到端的延迟、保障消息可靠传递零丢失。

分布式消息系统

Pulsar 是基于段的分布式消息队列。

一般的消息系统在逻辑上都可分为服务层和存储层两层:

  • 服务层:直接与消息的生产者和消费者交互,接收传入的消息并将消息路由到一个或多个目的地。服务层通过支持的消息协议进行通讯,例如AMQP。服务层在消息协议转换上依赖CPU,在通信上依赖网络带宽。
  • 存储层:负责消息的存储。存储层与服务层交互提供服务层请求的消息,存储层保持消息的正确顺序。存储层严重依赖硬盘来存储消息。

分布式消息系统与传统单点消息系统在架构上的主要差别在于存储层的设计方式。在分布式消息传递系统中,数据分布在集群中的多台机器上,允许在单个主题中保留超过单个机器存储容量的消息。分布式消息系统存储层的关键架构抽象是 write-ahead-log,它将存储的消息看成是单个仅追加的数据结构。 对于分布式消息系统,当新消息发布到主题时,从逻辑的角度来看,消息将被追加到日志的末尾;从物理的角度来看,消息可能写入集群中的任何服务器。

partion-based-msg.png

分布式消息系统的好处是将负载分散到多台机器上,可以提高消息的生产和消费的吞吐量,每台服务器都有自己硬盘和写入路径,将提供更好的写入速率。 在分布式消息系统集群中分布消息数据时,有两种不同的方法: 基于分区(Partion-based)和基于段(Segment-based)

基于分区

Kafka 采用的是基于分区的消息存储架构。

在基于分区的消息存储结构中,Topic 被划分为固定数量的分区,发布到 Topic 中的数据均匀的分布在分区中,同时为了确保数据冗余,每个分区会被复制到不同的节点

Topic 中消息的总存储量为 Topic 的分区数乘以分区的大小,如果达到这个限制,需要向集群添加更多的节点同时增加 Topic 的分区数量才能继续向分区中添加数据,增加分区的数量还需要执行重新平衡,这是一个十分复杂和耗时的过程。

在基于分区的消息存储结构的分布式消息系统中,一般在创建 Topic 的时候需要预先确定分区数量,但这样做有一些缺点,因为单个分区只会存储在集群中单个集群节点上,因此单个分区的大小就受限于该节点上的硬盘空间大小,由于 Topic 中的数据均匀分布在所有分区中,所以如果集群节点的硬盘容量不一样的话,那么 Topic 的每个分区的大小将限制为最小硬盘容量的节点。当 Topic 达到容量限制后,唯一能做的就是增加 Topic 的分区数量,但这个扩容的过程包括重新平衡整个 Topic,Topic 中的数据将被重新分布到该 Topic 的所有分区中,平衡数据的过程十分消耗网络带宽和磁盘 I/O

kafka-partitions

基于段

Pulsar 采用的是基于段的消息存储结构。

Pulsar 中服务层合存储层都是无状态的,可以任意的进行水平扩容。引入 Zookeeper 集群,用于集群级别的配置和协调,Zookeeper 中存储 Pulsar 集群的所有元数据(例如 Topic 元数据、Broker 负载数据等等)

Pulsar 依赖 Apache BookKeeper 项目来实现消息的持久存储,BookKeeper 的逻辑存储模型是基于无限流记录作为顺序日志存储的概念。

在BookKeeper中,每个日志被分解成更小的数据块,称为段(Segament),这些数据块又由多个日志条目组成。然后,为了实现冗余和扩展,这些段会在存储层中被称为 bookies 的多个节点上写入。可以将段放在集群节点具有足够硬盘容量的任何位置,当没有足够的空间用于存储新的段时,可以方便地添加节点并立即存储数据。基于段的存储架构的优点在于可以实现真正的水平伸缩,段可以被无限创建并存储在任何位置。

pulsar-segaments

Kuma 将使用的对象抽象为各种资源(Resource),如 Zone,Ingress,Dataplain,RateLimit 等,由资源管理器(Resource Manager)对资源存储(Resource Store)进行增删改查等操作

Resource Store 向上屏蔽了底层存储的差异,在 Kuma 中使用两种存储模式,kubernetes API Server 和 PostgreSQL 数据库。而 Resource Manager 屏蔽了对于各个资源操作的差异

Resource Store

接口定义

ResourceStore 接口用于定义资源的底层存储,ResourceStore 需要实现 Create、Update、Delete、Get、List 操作。

1
2
3
4
5
6
7
type ResourceStore interface {
Create(context.Context, model.Resource, ...CreateOptionsFunc) error
Update(context.Context, model.Resource, ...UpdateOptionsFunc) error
Delete(context.Context, model.Resource, ...DeleteOptionsFunc) error
Get(context.Context, model.Resource, ...GetOptionsFunc) error
List(context.Context, model.ResourceList, ...ListOptionsFunc) error
}

接口实现

在 Kuma 中,ResourceStore 接口由 k8s、memory、postgres、remote 实现。其中:

  • K8s:在 Kubernetes 模式下,以 Kubernetes API Server 存储资源。
  • Postgres:在 Universal 模式下,以 PostgreSQL 作为底层数据库存储数据。
  • Memory:内存数据库,用于测试。
  • Remote:为 kuma-ctl 做资源存储,负责以 HTTP Restful API 的方式操作 kuma-cp。
1
2
3
4
├── k8s
├── memory
├── postgres
└── remote

CustomizableResourceStore

Customizable Resource Store 为某一种具体的资源提供定制化的 Resource Store:

  • Customize 方法:为 model.ReourceType 类型的资源指定资源存储器。
  • ResourceStore 方法:返回 model.ResourceType 类型的资源存储器,如果没有该类型,则会返回默认的资源存储器。
1
2
3
4
5
6
7
8
9
10
11
// ResourceStoreWrapper is a function that takes a ResourceStore and returns a wrapped ResourceStore.
// The wrapped ResourceStore can be used to modify or augment the behavior of the original ResourceStore.
type ResourceStoreWrapper = func(delegate ResourceStore) ResourceStore

type CustomizableResourceStore interface {
ResourceStore
ResourceStore(typ model.ResourceType) ResourceStore
DefaultResourceStore() ResourceStore
Customize(typ model.ResourceType, store ResourceStore)
WrapAll(wrapper ResourceStoreWrapper)
}

Resource Manager

接口定义

Resource Manager 用于对资源(Zone、Dataplain 等)进行操作,定义的方法与 Resource Store 差不多,都是增删改查。

1
2
3
4
5
6
7
8
9
10
11
12
type ReadOnlyResourceManager interface {
Get(context.Context, model.Resource, ...store.GetOptionsFunc) error
List(context.Context, model.ResourceList, ...store.ListOptionsFunc) error
}

type ResourceManager interface {
ReadOnlyResourceManager
Create(context.Context, model.Resource, ...store.CreateOptionsFunc) error
Update(context.Context, model.Resource, ...store.UpdateOptionsFunc) error
Delete(context.Context, model.Resource, ...store.DeleteOptionsFunc) error
DeleteAll(context.Context, model.ResourceList, ...store.DeleteAllOptionsFunc) error
}

CustomizableResourceManager

Customizable Resource Manager 为某一种具体的资源提供定制化的 Resource Manager:

  • Customize 方法:为 model.ReourceType 类型的资源指定资源管理器。
  • ResourceManager 方法:返回 model.ResourceType 类型的资源管理器,如果没有该类型,则会返回默认的资源管理器。
1
2
3
4
5
6
7
8
type ResourceManagerWrapper = func(delegate ResourceManager) ResourceManager

type CustomizableResourceManager interface {
ResourceManager
Customize(model.ResourceType, ResourceManager)
ResourceManager(model.ResourceType) ResourceManager
WrapAll(ResourceManagerWrapper)
}

初始化

在 kuma-cp 初始化 Resource Manager 时,会为每一种资源指定一种资源管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
func initializeResourceManager(cfg kuma_cp.Config, builder *core_runtime.Builder) error {
defaultManager := core_manager.NewResourceManager(builder.ResourceStore())
customizableManager := core_manager.NewCustomizableResourceManager(defaultManager, nil)

customizableManager.Customize(
mesh.MeshType,
mesh_managers.NewMeshManager(
builder.ResourceStore(),
customizableManager,
builder.CaManagers(),
registry.Global(),
builder.ResourceValidators().Mesh,
cfg.Store.UnsafeDelete,
builder.Extensions(),
),
)

rateLimitValidator := ratelimit_managers.RateLimitValidator{
Store: builder.ResourceStore(),
}
customizableManager.Customize(
mesh.RateLimitType,
ratelimit_managers.NewRateLimitManager(builder.ResourceStore(), rateLimitValidator),
)

externalServiceValidator := externalservice_managers.ExternalServiceValidator{
Store: builder.ResourceStore(),
}
customizableManager.Customize(
mesh.ExternalServiceType,
externalservice_managers.NewExternalServiceManager(builder.ResourceStore(), externalServiceValidator),
)

customizableManager.Customize(
mesh.DataplaneType,
dataplane.NewDataplaneManager(builder.ResourceStore(), builder.Config().Multizone.Zone.Name, builder.ResourceValidators().Dataplane),
)

customizableManager.Customize(
mesh.DataplaneInsightType,
dataplaneinsight.NewDataplaneInsightManager(builder.ResourceStore(), builder.Config().Metrics.Dataplane),
)

customizableManager.Customize(
system.ZoneType,
zone.NewZoneManager(builder.ResourceStore(), zone.Validator{Store: builder.ResourceStore()}, builder.Config().Store.UnsafeDelete),
)

customizableManager.Customize(
system.ZoneInsightType,
zoneinsight.NewZoneInsightManager(builder.ResourceStore(), builder.Config().Metrics.Zone),
)

customizableManager.Customize(
mesh.ZoneIngressInsightType,
zoneingressinsight.NewZoneIngressInsightManager(builder.ResourceStore(), builder.Config().Metrics.Dataplane),
)

customizableManager.Customize(
mesh.ZoneEgressInsightType,
zoneegressinsight.NewZoneEgressInsightManager(builder.ResourceStore(), builder.Config().Metrics.Dataplane),
)

var cipher secret_cipher.Cipher
switch cfg.Store.Type {
case store.KubernetesStore:
cipher = secret_cipher.None() // deliberately turn encryption off on Kubernetes
case store.MemoryStore, store.PostgresStore:
cipher = secret_cipher.TODO() // get back to encryption in universal case
default:
return errors.Errorf("unknown store type %s", cfg.Store.Type)
}
var secretValidator secret_manager.SecretValidator
if cfg.IsFederatedZoneCP() {
secretValidator = secret_manager.ValidateDelete(func(ctx context.Context, secretName string, secretMesh string) error { return nil })
} else {
secretValidator = secret_manager.NewSecretValidator(builder.CaManagers(), builder.ResourceStore())
}

customizableManager.Customize(
system.SecretType,
secret_manager.NewSecretManager(builder.SecretStore(), cipher, secretValidator, cfg.Store.UnsafeDelete),
)

customizableManager.Customize(
system.GlobalSecretType,
secret_manager.NewGlobalSecretManager(builder.SecretStore(), cipher),
)

builder.WithResourceManager(customizableManager)

if builder.Config().Store.Cache.Enabled {
cachedManager, err := core_manager.NewCachedManager(
customizableManager,
builder.Config().Store.Cache.ExpirationTime.Duration,
builder.Metrics(),
builder.Tenants(),
)
if err != nil {
return err
}
builder.WithReadOnlyResourceManager(cachedManager)
} else {
builder.WithReadOnlyResourceManager(customizableManager)
}
return nil
}

Cached Manager

Cached Manager 实现了只读资源管理器 Read Only Resource Manager,为对资源的 Get 和 List 操作进行缓存(有过期时间)。

  • delegate:底层 ReadOnlyResourceManager,若缓存未命中则会从底层只读资源管理器中读取数据,并记录在缓存中。
  • cache:cached manager 的核心数据结构,用于缓存查询结果。key 为 <op>:<resource_type>:<ops.hashcode(resource_name+resource_mesh)>:tenantID
1
cacheKey := fmt.Sprintf("GET:%s:%s:%s", res.Descriptor().Name, opts.HashCode(), tenantID)
  • mutexes:为每一个 key 单独加锁,保证每一个 k-v 都互斥访问。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Cached version of the ReadOnlyResourceManager designed to be used only for use cases of eventual consistency.
// This cache is NOT consistent across instances of the control plane.
//
// When retrieving elements from cache, they point to the same instances of the resources.
// We cannot do deep copies because it would consume lots of memory, therefore you need to be extra careful to NOT modify the resources.
type cachedManager struct {
delegate ReadOnlyResourceManager
cache *cache.Cache
metrics *prometheus.CounterVec

mutexes map[string]*sync.Mutex
mapMutex sync.Mutex // guards "mutexes" field
tenants multitenant.Tenants
}

Kuma-cp 的 Intercp(Control Plane Intercommunication)组件用于 kuma 控制面之间的通信,旨在实现 kuma-cp 的水平扩展。

kuma 的水平扩展包含三个方面:

  • Leader 选举机制:在 kuma 中,leader 选举机制非常简单,就是选择最新加入的 kuma-cp 节点作为 Leader。作为 Leader,会接收成员的心跳,并且定期监测超时的 kuma-cp 成员。
  • 心跳机制:作为成员,会定期向 Leader 发送心跳,表明自己还存活。
  • EnvoyAdminServer:对于 Global Cp 的水平扩展多实例部署,会将 EnvoyAdmin 请求转发给 Leader Global Kuma-cp 执行。对于 Zone Cp 的水平扩展多实例部署,会在本实例执行对 envoy 发起 Admin 请求获取响应。

下文中的实例,表示 kuma-cp 水平扩展中的某一实例。

可以发现,kuma-cp 在水平扩展时,如果 leader 下线了没有自动故障切换机制,除非原来的 leader 再次上线或者有新的 kuma-cp 实例上线,leader 才会正常工作。

阅读全文 »

在微服务架构中,如何感知后端服务实例的动态上下线,就是服务发现(Service Discovery)。业界比较有代表性的微服务框架如 SpringCloud、Dubbo 等都抽象了强大的动态地址发现能力,并且为了满足微服务业务场景的需求,绝大多数框架的地址发现都是基于自己设计的一套机制来实现。在 SpringCloud 中使用 Eureka 作为注册中心,Dubbo 通常采用 zookeeper 和 nacos 作为注册中心。注册中心不仅仅记录了 IP+Port,还包括微服务的元信息,如序列化类型,实例方法列表,各个方法级的定制化配置等。

服务发现包含三种角色:服务提供者(Provider)、服务消费者(Consumer)和注册中心(Registry)。不同框架之间的区别在于如何组织注册中心中的数据。

dubbo中应用,服务和实例的概念区分:

应用是一个独立的逻辑单元,一个应用可以包含多个服务,每个服务可以包含多个实例

业界服务发现方式

Spring Cloud

Spring Cloud 通过注册中心只同步了应用与实例地址,消费方可以基于实例地址与服务提供方建立链接,但是消费方对于如何发起 HTTP 调用(SpringCloud 基于 rest 通信)一无所知,比如对方有哪些 HTTP endpoint,需要传入哪些参数等。

RPC 服务这部分信息目前都是通过线下约定或离线的管理系统来协商的。这种架构的优缺点总结如下。

  • 优势: 部署结构清晰、地址推送量小。
  • 缺点: 地址订阅需要指定应用名, provider 应用变更(拆分)需消费端感知;RPC 调用无法全自动同步。

img

Dubbo

Dubbo 通过注册中心同时同步了实例地址和 RPC 方法,因此其能实现 RPC 过程的自动同步,面向 RPC 编程、面向 RPC 治理,对后端应用的拆分消费端无感知,其缺点则是地址推送数量变大,和 RPC 方法成正比。

img

Dubbo + Kubernetes

Kubernetes Service 作为一个抽象概念,怎么映射到 Dubbo 是一个值得讨论的点。

  1. Service Name - > Application Name:Dubbo 应用和 Kubernetes 服务一一对应,对于微服务运维和建设环节透明,与开发阶段解耦(对应于应用服务发现)。
  2. Service Name - > Dubbo RPC Service:维护的 Service 数量变多,一个 Dubbo 应用可以运行多个接口,即一个 Dubbo 应用可以创建多个 Kubernetes Service(如 dubbo-app-service-1,dubbo-app- service-2 等)。

Dubbo 3 服务发现

服务自省

是什么?

以 Dubbo 之前的地址发现数据格式为例,它是“RPC 服务粒度”的,它是以 RPC 服务作为 key,以实例列表作为 value 来组织数据的:

1
2
3
4
5
6
7
8
"RPC Service1": [  
{"name":"instance1", "ip":"127.0.0.1", "metadata":{"timeout":1000}},
{"name":"instance2", "ip":"127.0.0.1", "metadata":{"timeout":2000}},
{"name":"instance3", "ip":"127.0.0.1", "metadata":{"timeout":3000}}
],

"RPC Service2": [Instance list of RPC Service2],
"RPC ServiceN": [Instance list of RPC ServiceN]

而 Dubbo 3 中的服务发现,是“应用粒度的服务发现”,它以应用名(Application)作为 key,以这个应用部署的一组实例(Instance)列表作为 value。这带来两点不同:

  1. 数据映射关系变了,从 RPC Service -> Instance 变为 Application -> Instance。
  2. 数据变少了,注册中心没有了 RPC Service 及其相关配置信息。
1
2
3
4
5
"application1": [
{"name":"instance1", "ip":"127.0.0.1", "metadata":{}},
{"name":"instance2", "ip":"127.0.0.1", "metadata":{}},
{"name":"instanceN", "ip":"127.0.0.1", "metadata":{}}
]

Dubbo 之前的服务发现粒度更细,在注册中心产生的数据条目也会更多(与 RPC 服务成正比),同时也存在一定的数据冗余

接着解释它为什么会被叫做“服务自省”?其实这还是得从它的工作原理说起,上面提到,应用粒度服务发现的数据模型有几个以下明显变化:数据中心的数据量少了,RPC 服务相关的数据在注册中心没有了,现在只有 application - instance 这两个层级的数据。

为了保证这部分缺少的 RPC 服务数据仍然能被 Consumer 端正确的感知,我们在 Consumer 和 Provider 间建立了一条单独的通信通道:Consumer 和 Provider 两两之间通过特定端口交换信息,我们把这种 Provider 自己主动暴露自身信息的行为认为是一种内省机制,因此整个机制命名为:服务自省。

为什么需要?

为什么需要服务自省,这会带来以下优势:

  1. 与业界主流微服务模型对齐,比如 SpringCloud、Kubernetes Service 等。
  2. 提升性能与可伸缩性
    • 注册中心的数据减少了,大幅度的减轻注册中心的存储、推送压力,进而减少 Dubbo Consumer 侧的地址计算压力。
    • 服务发现的数据规模,以及集群规模也开始变得可预测、可评估(与 RPC 接口数量无关,只与实例部署规模相关)。

根据统计,平均情况下 Consumer 订阅的 3 个接口来自同一个 Provider 应用,如此计算下来,如果以应用粒度为地址通知和选址基本单位,则平均地址推送和计算量将下降 60% 还要多。而在极端情况下,也就是当 Consumer 端消费的接口更多的来自同一个应用时,这个地址推送与内存消耗的占用将会进一步得到降低,甚至可以超过 80% 以上。

典型的例子是 API 网关,可能一个 API 网关有几十个甚至上百个定义的服务。

工作原理

以下是服务自省的一个完整工作流程图,详细描述了服务注册、服务发现、MetadataService、RPC 调用间的协作流程。

  • 服务提供者启动,首先解析应用定义的“普通服务”并依次注册为 RPC 服务,紧接着注册内建的 MetadataService 服务,最后打开 TCP 监听端口。
  • 启动完成后,将实例信息注册到注册中心(仅限 ip、port 等实例相关数据),提供者启动完成。
  • 服务消费者启动,首先依据其要“消费的 provider 应用名”到注册中心查询地址列表,并完成订阅(以实现后续地址变更自动通知)。
  • 消费端拿到地址列表后,紧接着对 MetadataService 发起调用,返回结果中包含了所有应用定义的“普通服务”及其相关配置信息
  • 至此,消费者可以接收外部流量,并对提供者发起 Dubbo RPC 调用

img

设计原则

接口级服务发现的好处是数据量小,但是接口粒度的服务治理能力还是要继续保留。这就需要两点设计原则:

  1. 新的服务发现模型要实现对原有 Dubbo 消费端开发者的无感知迁移,即 Dubbo 继续面向 RPC 服务编程、面向 RPC 服务治理,做到对用户侧完全无感知。
  2. 建立 Consumer 与 Provider 间的自动化 RPC 服务元数据协调机制,解决传统微服务模型无法同步 RPC 级接口配置的缺点。

基本原理

  1. 注册中心的数据组织方式:以应用名为 key,实例列表为 value 组织数据。元数据只包含实例级别的元数据,不包含接口级别的元数据。注册中心的一个实例条目如下,仅仅包含实例地址,端口,实例级别的元数据信息等基本信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"name": "provider-app-name",
"id": "192.168.0.102:20880",
"address": "192.168.0.102",
"port": 20880,
"sslPort": null,
"payload": {
"id": null,
"name": "provider-app-name",
"metadata": {
"metadataService": "{\"dubbo\":{\"version\":\"1.0.0\",\"dubbo\":\"2.0.2\",\"release\":\"2.7.5\",\"port\":\"20881\"}}",
"endpoints": "[{\"port\":20880,\"protocol\":\"dubbo\"}]",
"storage-type": "local",
"revision": "6785535733750099598",
}
},
"registrationTimeUTC": 1583461240877,
"serviceType": "DYNAMIC",
"uriSpec": null
}
  1. 服务调用者和提供者自行协商 RPC 方法信息。在注册中心不再同步 RPC 服务信息后,服务自省在服务消费端和提供端之间建立了一条内置的 RPC 服务信息协商机制。服务端实例会暴露一个预定义的 MetadataService RPC 服务,消费端通过调用 MetadataService 获取每个实例 RPC 方法相关的配置信息。当前 MetadataService 返回的数据格式如下:
1
2
3
4
5
[
"dubbo://192.168.0.102:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=9585&release=2.7.5&side=provider&timestamp=1583469714314",
"dubbo://192.168.0.102:20880/org.apache.dubbo.demo.HelloService?anyhost=true&application=demo-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=9585&release=2.7.5&side=provider&timestamp=1583469714314",
"dubbo://192.168.0.102:20880/org.apache.dubbo.demo.WorldService?anyhost=true&application=demo-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=9585&release=2.7.5&side=provider&timestamp=1583469714314"
]

关键机制

元数据同步机制

Client 与 Server 间在收到地址推送后的配置同步是服务自省的关键环节,目前针对元数据同步有两种具体的可选方案,分别是:

  • 内建的 MetadataService 服务
  • 独立的元数据中心,通过中心化的元数据集群协调数据。Provider 实例启动后,会尝试将内部的 RPC 服务组织成元数据的格式到元数据中心,而 consumer 则在每次收到注册中心推送更新后,主动查询元数据中心。

img

RPC 服务与应用之间的映射关系

从服务粒度到应用粒度,要想用户完全无感知必须知道 RPC 服务与应用名之间的映射关系。所以为了使整个开发流程对老的 Dubbo 用户更透明,同时避免指定 provider 对可扩展性带来的影响,设计了一套 RPC 服务到应用名的映射关系,以尝试在 consumer 自动完成 RPC 服务到 provider 应用名的转换。

  • consumer 通过配置中心以服务名查询应用名
  • consumer 得到应用名,就可以检索注册中心,进行应用级别的服务发现了。

img

服务发现

本节介绍 kuma 如何处理 service mesh 中的服务发现问题,包括数据平面代理和控制平面之间的通信,控制平面代理间的通信。

数据平面代理和控制平面之间的通信

当数据平面代理连接到控制平面时,它会启动到控制平面的 gRPC 流连接(xDS)。它从控制平面检索最新的策略配置并向控制平面发送诊断信息。

  • 独立部署下,kuma-dp 直接连接到 kuma-cp 实例。
  • 多区域部署下,kuma-dp 连接到 zone kuma-cp,zone kuma-cp 将通过 KDS 的 xDS API 扩展连接到 global kuma-cp。在多区域模式下,数据平面代理永远不会连接到全局控制平面,而仅连接到区域控制平面。

数据平面和控制平面之间的连接不在服务请求的执行路径上,这意味着如果数据平面暂时失去与控制平面的连接,服务流量不会受到影响。

数据平面代理间的通信

数据平面会通报每一个服务的 IP 地址:

  • 在 Kubernetes 环境中,IP 地址为 Pod 的地址
  • 在 Universal 环境中,IP地址会检索 inbound listeners(用于配置控制平面代理的入站监听地址)。

数据平面会通报 IP 地址到控制平面,这意味着在任何时间 kuma-cp 都知道每一个服务的每个副本关联的所有 IP 地址是什么。

Kuma 数据平面代理间的通信,在独立部署和多区域部署使用自己的 DNS:

  • 独立部署模式下,直接使用 IP 地址进行通信。
  • 多区域部署模式下,Kuma 将自动解析域名,可以连接到在同一区域中运行的数据平面代理,或者通过 Egress(如果存在)和 另一个区域中的 Ingress 的地址 来实现跨区域连接。这意味着多区域部署模式下,服务之间可以跨集群(不管是 Kubernetes 还是 VM)连接。

DNS

对于每一份服务,都会生成一个以 .mesh 结尾的域名:

  • 对于一个 Kubernetes Service 和关联的 Pod,Kuma 控制面会自动的生成一个注解 kuma.io/service: <name>_<namespace>_svc_<port>,其中 name 和 namespace 还有 port 均来自 Service 信息。

  • 某些情况下 Pods 不属于任何的 Service,仅仅是单纯的一个 Pod(或者在 Universal 下)。在这种情况下,Kuma 会自动生成的注解为 kuma.io/service: <name>_<namespace>_svc ,其中 name 和 namespace 来自于 Pod 资源。

服务的域名被定义为 <kuma.io/service的值>.mesh,默认端口为 80。

非 mesh 流量

传入流量

启用 mTLS 后,来自网格外部的客户端无法访问网格内部的应用程序。如果想允许外部客户端使用服务网格,可以使用 Permissive mTLS 模式(宽松的 mTLS,即允许 mTLS,也允许明文请求)。

所有域名查找均由数据平面代理在本地处理,而不是由控制平面处理。这种方法允许更稳健地处理名称解析,例如,当控制平面关闭时,数据平面代理仍然可以解析 DNS。

数据平面代理 DNS 包括:

  • Envoy DNS 过滤器提供来自网格的 DNS 记录响应。
  • CoreDNS 用于 Envoy DNS 过滤器和原始主机 DNS 之间发送请求
  • iptable 规则将原始 DNS 流量重定向到本地 CoreDNS 实例。

穿出流量

在默认设置中,Kuma 允许任何非网状流量通过 Envoy,而无需应用任何策略。当 networking.outbound.passthroughfalse 时,任何非网格资源的流量都不能离开网格。

使用 ProxyTemplate,可以设置非 mesh 流量的配置。如断路器,超时。

Kuma 通过 KDS(Kuma Discovery Service)进行 Global kuma-cp 和 Zone kuma-cp 的资源交换,以实现跨集群的通信。

在第一节首先会简要说明 KDS 中需要同步的资源,第二节说明 KDS 的实现。

资源

KDS 同步的资源按照资源的流向,可以分为两种类型:

  • ZoneToGlobal:区域 cp 到全局 cp 同步的信息。
  • GlobalToZone:全局 cp 到区域 cp 同步的信息。

ZoneToGlobal

Dataplane

Dataplane:允许 Zone kuma-cp -> Global kuma-cp,Dataplane 定义了一个数据面 sidecar 代理的配置,spec 包括:

  • Network:网络,描述数据面代理的入站和出站地址。
  • Metrics: 定义数据面收集的指标。
  • Probes:暴露健康检查端口,健康检查配置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Dataplane defines a configuration of a side-car proxy.
type Dataplane struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

// Networking describes inbound and outbound interfaces of the data plane
// proxy.
Networking *Dataplane_Networking `protobuf:"bytes,1,opt,name=networking,proto3" json:"networking,omitempty"`
// Configuration for metrics that should be collected and exposed by the
// data plane proxy.
//
// Settings defined here will override their respective defaults
// defined at a Mesh level.
Metrics *MetricsBackend `protobuf:"bytes,2,opt,name=metrics,proto3" json:"metrics,omitempty"`
// Probes describe a list of endpoints that will be exposed without mTLS.
// This is useful to expose the health endpoints of the application so the
// orchestration system (e.g. Kubernetes) can still health check the
// application.
//
// See
// https://kuma.io/docs/latest/policies/service-health-probes/#virtual-probes
// for more information.
Probes *Dataplane_Probes `protobuf:"bytes,3,opt,name=probes,proto3" json:"probes,omitempty"`
}

DataplaneInsight:表示了数据平面的运行时状态,包括:

  • Subscriptions:描述了由数据平面向控制平面创建的 ADS 订阅。
  • MTLS:数据平面代理的 mTLS 状态,如证书过期时间,上次证书的生成时间等。
1
2
3
4
5
6
7
8
9
10
11
// DataplaneInsight defines the observed state of a Dataplane.
type DataplaneInsight struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

// List of ADS subscriptions created by a given Dataplane.
Subscriptions []*DiscoverySubscription `protobuf:"bytes,1,rep,name=subscriptions,proto3" json:"subscriptions,omitempty"`
// Insights about mTLS for Dataplane.
MTLS *DataplaneInsight_MTLS `protobuf:"bytes,2,opt,name=mTLS,proto3" json:"mTLS,omitempty"`
}

Egress

Egress:用于定义 Zone Egress。

  • Zone:所属的 Zone 名称。
  • Network:描述了 Egress 监听的地址和端口。
1
2
3
4
5
6
7
8
9
10
11
12
// ZoneEgress allows us to configure dataplane in the Egress mode.
type ZoneEgress struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

// Zone field contains Zone name where egress is serving, field will be
// automatically set by Global Kuma CP
Zone string `protobuf:"bytes,1,opt,name=zone,proto3" json:"zone,omitempty"`
// Networking defines the address and port of the Egress to listen on.
Networking *ZoneEgress_Networking `protobuf:"bytes,2,opt,name=networking,proto3" json:"networking,omitempty"`
}

EgressInsight:定义了 Zone Egress 的运行时状态,包括:

  • DiscoverySubscription:定义了由 Zone kuma-cp 创建的 ADS 订阅。
1
2
3
4
5
6
7
8
9
// ZoneEgressInsight defines the observed state of a Zone Egress.
type ZoneEgressInsight struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

// List of ADS subscriptions created by a given Zone Kuma CP.
Subscriptions []*DiscoverySubscription `protobuf:"bytes,1,rep,name=subscriptions,proto3" json:"subscriptions,omitempty"`
}

Ingress

Ingress:定义了 Zone Ingress。允许 Zone kuma-cp -> Global kuma-cp,和 Global kuma-cp -> Zone kuma-cp,Global kuma-cp 接收 Zone kuma-cp 的 Ingress 资源,并向其他 Zone kuma-cp 同步 Ingress 信息

  • Zone:标识 Ingress 服务于哪个区域。
  • Networking:定义了 Zone Ingress 的监听地址和端口,以及公开的地址端口。
  • AvailableServices:定义了可以通过该 Zone Ingress 访问的服务,是 Kuma 跨集群通信的基础。
    • Tags:服务 tag。
    • Instances:给定标签可用的服务实例数量。
    • Mesh:给定标签可用的服务实例所属的服务网格名称。
    • ExternalService:表明该服务是否为外部服务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// ZoneIngress allows us to configure dataplane in the Ingress mode. In this
// mode, dataplane has only inbound interfaces. Every inbound interface matches
// with services that reside in that cluster.
type ZoneIngress struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

// Zone field contains Zone name where ingress is serving, field will be
// automatically set by Global Kuma CP
Zone string `protobuf:"bytes,1,opt,name=zone,proto3" json:"zone,omitempty"`
// Networking defines the address and port of the Ingress to listen on.
// Additionally publicly advertised address and port could be specified.
Networking *ZoneIngress_Networking `protobuf:"bytes,2,opt,name=networking,proto3" json:"networking,omitempty"`
// AvailableService contains tags that represent unique subset of
// endpoints
AvailableServices []*ZoneIngress_AvailableService `protobuf:"bytes,3,rep,name=availableServices,proto3" json:"availableServices,omitempty"`
}

type ZoneIngress_AvailableService struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

// tags of the service
Tags map[string]string `protobuf:"bytes,1,rep,name=tags,proto3" json:"tags,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
// number of instances available for given tags
Instances uint32 `protobuf:"varint,2,opt,name=instances,proto3" json:"instances,omitempty"`
// mesh of the instances available for given tags
Mesh string `protobuf:"bytes,3,opt,name=mesh,proto3" json:"mesh,omitempty"`
// instance of external service available from the zone
ExternalService bool `protobuf:"varint,4,opt,name=externalService,proto3" json:"externalService,omitempty"`
}

IngressInsight:定义了 ZoneIngress 的运行时状态。允许 Zone kuma-cp -> Global kuma-cp,和 Global kuma-cp -> Zone kuma-cp,Global kuma-cp 接收 Zone kuma-cp 的 IngressInsight 资源,并向其他 Zone kuma-cp 同步 IngressInsight 信息。

1
2
3
4
5
6
7
8
9
// ZoneIngressInsight defines the observed state of a Zone Ingress.
type ZoneIngressInsight struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

// List of ADS subscriptions created by a given Zone Kuma CP.
Subscriptions []*DiscoverySubscription `protobuf:"bytes,1,rep,name=subscriptions,proto3" json:"subscriptions,omitempty"`
}

GlobalToZone

Global kuma-cp 到 zone kuma-cp 的资源,除了 Ingress 外剩下的均是来自于 Global kuma-cp 策略的更新

如 CircuitBreaker,FaultInjection,HealthCheck,Retry 等,这里不再详细说明。

KDS

kds

KDS v1

服务定义

在 KDS v1 中,global 和 zone 之间在一个双向 gRPC 流中进行资源同步,KDS 服务定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
service MultiplexService {
rpc StreamMessage(stream Message) returns (stream Message);
}

message Message {
oneof value {
envoy.api.v2.DiscoveryRequest legacy_request = 1;
envoy.api.v2.DiscoveryResponse legacy_response = 2;
envoy.service.discovery.v3.DiscoveryRequest request = 3;
envoy.service.discovery.v3.DiscoveryResponse response = 4;
}
}

流程

在 zone kuma-cp 和 global kuma-cp 建立连接后:

  • 创建 Session 对象,并启动两个协程用于读写 gRPC streaming。Session 底层抽象分离了 serverStream 和 clientStream 并实现了基于 buffer 的 streaming,用于表示 global 到 zone 和 zone 到 global 的连接。

  • 接着调用 OnSessionStarted 回调函数,开启两个协程用于表示 server(global kuma-cp)和 client(global kuma-cp)。

    • server 实现了 xDS 服务器,用于响应客户端的 xDS 请求。

    • client 在建立时:

      • 首先对所有需要从 zone 到 global(ZoneToGlobal)的资源发起 DiscoveryRequest。
      • 接收 xDS 响应,并同步资源到缓存中。
    • server 和 client 所有读写的数据,均从 serverStream 和 clientStream 中读取/发送。

image-20240128202501932

KDS v2

为什么引入 KDS v2

为什么要重新设计 KDS v2,因为 KDS v1 实现非常复杂并且可能存在一些 bug(如 issues #5373)。KDS v2 主要的变化如下:

  • 分离 kds 中 zone 和 global 之间流的共用(这引入了复杂性和 bug)。
  • 当数据从 zone 同步到 global 时,使用增量更新而不是全量更新。

KDS v1 使用一个双向流从同步 zone 和 global 之间的资源。KDS v2 引入 2 个独立的 RPC 来同步资源来简化逻辑,将简化逻辑和可读性,并且使用增量更新 Delta xDS

1
2
3
4
5
service KDSSyncService {
rpc GlobalToZoneSync(stream envoy.service.discovery.v3.DeltaDiscoveryRequest)
rpc ZoneToGlobalSync(stream envoy.service.discovery.v3.DeltaDiscoveryResponse)
returns (stream envoy.service.discovery.v3.DeltaDiscoveryRequest);
}

KDS v1 zone 向 global 发送 DiscoveryRequest 并等待资源的更新通知。global 将状态存储在缓存中,并在发生变化时发送响应。在 global 到 zone 同步的情况下效果很好,但在大型部署中更多的流量来自从 zone 到 global 的同步。每当有变化时,zone 控制平面需要将所有数据平面资源发送到 global。所以,KDS v2 使用 delta xDS,可以用更高效的方式同步资源

服务定义

KDS v2 分离出了两个接口,GlobalToZoneSync 和 ZoneToGlobalSync,用于 global 和 zone 之间的资源同步,使用 delta xDS。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// KDSSyncService is a service exposed by the control-plane for the
// synchronization of the resources between zone and global control-plane.
service KDSSyncService {
// GlobalToZoneSync is logically a service exposed by global control-plane
// that allows zone control plane to connect and synchronize resources from
// the global control-plane to the zone control-plane. It uses delta xDS from
// go-control-plane and responds only with the changes to the resources.
rpc GlobalToZoneSync(stream envoy.service.discovery.v3.DeltaDiscoveryRequest)
returns (stream envoy.service.discovery.v3.DeltaDiscoveryResponse);
// ZoneToGlobalSync is logically a service exposed by global control-plane
// that allows zone control plane to connect and synchronize resources to
// the global control-plane. It uses delta xDS from go-control-plane and
// responds only with the changes to the resources.
rpc ZoneToGlobalSync(stream envoy.service.discovery.v3.DeltaDiscoveryResponse)
returns (stream envoy.service.discovery.v3.DeltaDiscoveryRequest);
}

流程

GlobalToZoneSync 和 ZoneToGobalSync 在连接开始时,都从 EventBus 订阅了 ZoneWentOffline 事件,接着向 EventBus 发送 ZoneOpenStream 事件(用于通知 HealthCheck 对新的 zone 和 的健康检查),用于结束与 zone 之间的连接(如正常结束,发生错误,健康检查超时)。

  • 在 GlobalToZoneSync 方法中,接着调用 OnGlobalToZoneSyncConnect 回调函数:

    • 开启 delta xDS 服务器,用于响应 zone 到 global 对 GlobalToZone 资源的 xDS 请求。
  • ZoneToGobalSync 方法中,调用 OnGlobalToZoneSyncConnect 回调函数:

    • 对所有 ZoneToGlobal 资源发起 DeltaDiscoveryRequest 请求。
    • 接收 xDS 响应,并同步资源到缓存中。

image-20240128201819505

其他服务

ZoneWatch

ZoneWatch 订阅 ZoneOpenStream 事件,用于添加活跃的 zone 列表,并通知 zone kuma-cp 下线。

  • 每次都会轮询所有的 zone,检查上一次发送 HealthCheck 请求的时间是否超时。
  • 如果超时,则会发送 ZoneWentOffline 事件,通知 KDS 服务器终止与 zone kuma-cp 的连接。

image-20240128210831447

GlobalKDSService

Kuma 提供在在 Global Zone 上获取 zone xDS 配置、状态等信息的 API,所以需要通过 GlobalKDSService 收集 zone 的信息,并暴露方法以进行健康检查。

image-20240128210844323