Dawn's Blogs

Sharing technology, recording growth

Kuma Study Notes (7): Resource Manager

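Kuma models the objects it works with as resources (Resource), such as Zone, Ingress, Dataplane, and RateLimit. The Resource Manager performs create, read, update, and delete operations on them through the Resource Store.

The Resource Store hides the differences between storage backends from the layers above it. Kuma uses two storage modes: the Kubernetes API Server and a PostgreSQL database. The Resource Manager, in turn, hides the differences in how each resource type is handled.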

Resource Store

Interface definition

The ResourceStore interface defines the underlying storage for resources. An implementation must provide the Create, Update, Delete, Get, and List operations.

type ResourceStore interface {
	Create(context.Context, model.Resource, ...CreateOptionsFunc) error
	Update(context.Context, model.Resource, ...UpdateOptionsFunc) error
	Delete(context.Context, model.Resource, ...DeleteOptionsFunc) error
	Get(context.Context, model.Resource, ...GetOptionsFunc) error
	List(context.Context, model.ResourceList, ...ListOptionsFunc) error
}

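Implementations

In Kuma, the ResourceStore interface is implemented by the k8s, memory, postgres, and remote packages:

  • K8s: in Kubernetes mode, resources are stored through the Kubernetes API Server.
  • Postgres: in Universal mode, PostgreSQL is the underlying database.
  • Memory: an in-memory store, used for testing.
  • Remote: the resource store used by kumactl, which operates on kuma-cp through its RESTful HTTP API.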
├── k8s
├── memory
├── postgres
└── remote

CustomizableResourceStore

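A Customizable Resource Store lets a specific resource type use its own Resource Store:

  • Customize: assigns a resource store to resources of the given model.ResourceType.
  • ResourceStore: returns the resource store registered for the given model.ResourceType; if none was registered for that type, it returns the default resource store.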
// ResourceStoreWrapper is a function that takes a ResourceStore and returns a wrapped ResourceStore.
// The wrapped ResourceStore can be used to modify or augment the behavior of the original ResourceStore.
type ResourceStoreWrapper = func(delegate ResourceStore) ResourceStore

type CustomizableResourceStore interface {
	ResourceStore
	ResourceStore(typ model.ResourceType) ResourceStore
	DefaultResourceStore() ResourceStore
	Customize(typ model.ResourceType, store ResourceStore)
	WrapAll(wrapper ResourceStoreWrapper)
}
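
The lookup-with-fallback behavior described above can be sketched as follows. This is not Kuma's implementation: ResourceType, Store, namedStore, and customizableStore are hypothetical stand-ins, the store interface is reduced to a single Backend method, and the backend names are made up for the example.

```go
package main

import "fmt"

// ResourceType and Store are hypothetical stand-ins for model.ResourceType
// and ResourceStore; Store is reduced to one method to keep the sketch short.
type ResourceType string

type Store interface {
	Backend() string
}

// namedStore reports a fixed backend name.
type namedStore string

func (n namedStore) Backend() string { return string(n) }

// customizableStore maps resource types to stores, with a default fallback.
type customizableStore struct {
	defaultStore Store
	custom       map[ResourceType]Store
}

func newCustomizableStore(defaultStore Store) *customizableStore {
	return &customizableStore{
		defaultStore: defaultStore,
		custom:       make(map[ResourceType]Store),
	}
}

// Customize registers a dedicated store for one resource type.
func (c *customizableStore) Customize(typ ResourceType, s Store) {
	c.custom[typ] = s
}

// ResourceStore returns the dedicated store for typ, or the default one.
func (c *customizableStore) ResourceStore(typ ResourceType) Store {
	if s, ok := c.custom[typ]; ok {
		return s
	}
	return c.defaultStore
}

func main() {
	stores := newCustomizableStore(namedStore("postgres"))
	stores.Customize("Secret", namedStore("secret-store"))

	fmt.Println(stores.ResourceStore("Secret").Backend()) // customized type
	fmt.Println(stores.ResourceStore("Zone").Backend())   // falls back to the default
}
```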

Resource Manager

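Interface definition

The Resource Manager operates on resources (Zone, Dataplane, and so on). Its methods largely mirror those of the Resource Store: create, read, update, and delete, plus DeleteAll. The read operations are split out into a separate ReadOnlyResourceManager interface.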

type ReadOnlyResourceManager interface {
	Get(context.Context, model.Resource, ...store.GetOptionsFunc) error
	List(context.Context, model.ResourceList, ...store.ListOptionsFunc) error
}

type ResourceManager interface {
	ReadOnlyResourceManager
	Create(context.Context, model.Resource, ...store.CreateOptionsFunc) error
	Update(context.Context, model.Resource, ...store.UpdateOptionsFunc) error
	Delete(context.Context, model.Resource, ...store.DeleteOptionsFunc) error
	DeleteAll(context.Context, model.ResourceList, ...store.DeleteAllOptionsFunc) error
}
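
Because ResourceManager embeds ReadOnlyResourceManager, any full manager can be handed to code that only needs read access. The sketch below illustrates that with toy types; ReadOnlyManager, Manager, mapManager, and describe are hypothetical names with much simpler signatures than the real interfaces.

```go
package main

import (
	"errors"
	"fmt"
)

// ReadOnlyManager and Manager are hypothetical, simplified analogues of
// ReadOnlyResourceManager and ResourceManager.
type ReadOnlyManager interface {
	Get(name string) (string, error)
}

type Manager interface {
	ReadOnlyManager // embedding: every Manager is also a ReadOnlyManager
	Create(name, spec string) error
}

// mapManager is a toy Manager backed by a map.
type mapManager struct {
	items map[string]string
}

func (m *mapManager) Get(name string) (string, error) {
	spec, ok := m.items[name]
	if !ok {
		return "", errors.New("not found: " + name)
	}
	return spec, nil
}

func (m *mapManager) Create(name, spec string) error {
	if _, ok := m.items[name]; ok {
		return errors.New("already exists: " + name)
	}
	m.items[name] = spec
	return nil
}

// Compile-time check that *mapManager satisfies the full interface.
var _ Manager = (*mapManager)(nil)

// describe only needs read access, so it asks for the narrower interface.
func describe(ro ReadOnlyManager, name string) string {
	spec, err := ro.Get(name)
	if err != nil {
		return "missing"
	}
	return name + "=" + spec
}

func main() {
	var m Manager = &mapManager{items: map[string]string{}}
	_ = m.Create("east", "zone")
	fmt.Println(describe(m, "east"))
	fmt.Println(describe(m, "west"))
}
```

Accepting the narrower interface also means a different read path, such as a caching layer, can be substituted without the caller noticing.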

CustomizableResourceManager

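A Customizable Resource Manager lets a specific resource type use its own Resource Manager:

  • Customize: assigns a resource manager to resources of the given model.ResourceType.
  • ResourceManager: returns the resource manager registered for the given model.ResourceType; if none was registered for that type, it returns the default resource manager.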
type ResourceManagerWrapper = func(delegate ResourceManager) ResourceManager

type CustomizableResourceManager interface {
	ResourceManager
	Customize(model.ResourceType, ResourceManager)
	ResourceManager(model.ResourceType) ResourceManager
	WrapAll(ResourceManagerWrapper)
}
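
A rough sketch of how per-type dispatch and WrapAll could fit together is shown below. It rests on two assumptions that the interface alone does not confirm: that the customizable manager's own operations dispatch to the manager registered for the resource's type, and that WrapAll decorates the default manager as well as every customized one. Manager, recordingManager, requireName, and customizableManager are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

type ResourceType string

// Manager is a hypothetical one-method stand-in for ResourceManager.
type Manager interface {
	Create(typ ResourceType, name string) error
}

type ManagerWrapper = func(delegate Manager) Manager

// recordingManager appends "<label>:<name>" to a shared log on every Create.
type recordingManager struct {
	label string
	log   *[]string
}

func (m recordingManager) Create(_ ResourceType, name string) error {
	*m.log = append(*m.log, m.label+":"+name)
	return nil
}

// requireName is a wrapper that rejects empty names before delegating.
type requireName struct{ delegate Manager }

func (r requireName) Create(typ ResourceType, name string) error {
	if name == "" {
		return errors.New("name must not be empty")
	}
	return r.delegate.Create(typ, name)
}

type customizableManager struct {
	defaultManager Manager
	custom         map[ResourceType]Manager
}

func (c *customizableManager) Customize(typ ResourceType, m Manager) {
	c.custom[typ] = m
}

func (c *customizableManager) ResourceManager(typ ResourceType) Manager {
	if m, ok := c.custom[typ]; ok {
		return m
	}
	return c.defaultManager
}

// Create dispatches to whichever manager is registered for the type
// (assumed behavior, see the note above).
func (c *customizableManager) Create(typ ResourceType, name string) error {
	return c.ResourceManager(typ).Create(typ, name)
}

// WrapAll decorates the default manager and every customized one
// (assumed behavior, see the note above).
func (c *customizableManager) WrapAll(wrap ManagerWrapper) {
	c.defaultManager = wrap(c.defaultManager)
	for typ, m := range c.custom {
		c.custom[typ] = wrap(m)
	}
}

func main() {
	var log []string
	cm := &customizableManager{
		defaultManager: recordingManager{label: "default", log: &log},
		custom:         map[ResourceType]Manager{},
	}
	cm.Customize("Mesh", recordingManager{label: "mesh", log: &log})
	cm.WrapAll(func(d Manager) Manager { return requireName{delegate: d} })

	_ = cm.Create("Mesh", "default") // handled by the customized manager
	_ = cm.Create("Zone", "east")    // handled by the default manager
	err := cm.Create("Mesh", "")     // rejected by the wrapper
	fmt.Println(log, err)
}
```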

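Initialization

When kuma-cp initializes the Resource Manager, it registers a dedicated resource manager for each resource type that needs special handling; every other type falls back to the default manager. At the end, reads are routed through a Cached Manager if the store cache is enabled.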

func initializeResourceManager(cfg kuma_cp.Config, builder *core_runtime.Builder) error {
	defaultManager := core_manager.NewResourceManager(builder.ResourceStore())
	customizableManager := core_manager.NewCustomizableResourceManager(defaultManager, nil)

	customizableManager.Customize(
		mesh.MeshType,
		mesh_managers.NewMeshManager(
			builder.ResourceStore(),
			customizableManager,
			builder.CaManagers(),
			registry.Global(),
			builder.ResourceValidators().Mesh,
			cfg.Store.UnsafeDelete,
			builder.Extensions(),
		),
	)

	rateLimitValidator := ratelimit_managers.RateLimitValidator{
		Store: builder.ResourceStore(),
	}
	customizableManager.Customize(
		mesh.RateLimitType,
		ratelimit_managers.NewRateLimitManager(builder.ResourceStore(), rateLimitValidator),
	)

	externalServiceValidator := externalservice_managers.ExternalServiceValidator{
		Store: builder.ResourceStore(),
	}
	customizableManager.Customize(
		mesh.ExternalServiceType,
		externalservice_managers.NewExternalServiceManager(builder.ResourceStore(), externalServiceValidator),
	)

	customizableManager.Customize(
		mesh.DataplaneType,
		dataplane.NewDataplaneManager(builder.ResourceStore(), builder.Config().Multizone.Zone.Name, builder.ResourceValidators().Dataplane),
	)

	customizableManager.Customize(
		mesh.DataplaneInsightType,
		dataplaneinsight.NewDataplaneInsightManager(builder.ResourceStore(), builder.Config().Metrics.Dataplane),
	)

	customizableManager.Customize(
		system.ZoneType,
		zone.NewZoneManager(builder.ResourceStore(), zone.Validator{Store: builder.ResourceStore()}, builder.Config().Store.UnsafeDelete),
	)

	customizableManager.Customize(
		system.ZoneInsightType,
		zoneinsight.NewZoneInsightManager(builder.ResourceStore(), builder.Config().Metrics.Zone),
	)

	customizableManager.Customize(
		mesh.ZoneIngressInsightType,
		zoneingressinsight.NewZoneIngressInsightManager(builder.ResourceStore(), builder.Config().Metrics.Dataplane),
	)

	customizableManager.Customize(
		mesh.ZoneEgressInsightType,
		zoneegressinsight.NewZoneEgressInsightManager(builder.ResourceStore(), builder.Config().Metrics.Dataplane),
	)

	var cipher secret_cipher.Cipher
	switch cfg.Store.Type {
	case store.KubernetesStore:
		cipher = secret_cipher.None() // deliberately turn encryption off on Kubernetes
	case store.MemoryStore, store.PostgresStore:
		cipher = secret_cipher.TODO() // get back to encryption in universal case
	default:
		return errors.Errorf("unknown store type %s", cfg.Store.Type)
	}
	var secretValidator secret_manager.SecretValidator
	if cfg.IsFederatedZoneCP() {
		secretValidator = secret_manager.ValidateDelete(func(ctx context.Context, secretName string, secretMesh string) error { return nil })
	} else {
		secretValidator = secret_manager.NewSecretValidator(builder.CaManagers(), builder.ResourceStore())
	}

	customizableManager.Customize(
		system.SecretType,
		secret_manager.NewSecretManager(builder.SecretStore(), cipher, secretValidator, cfg.Store.UnsafeDelete),
	)

	customizableManager.Customize(
		system.GlobalSecretType,
		secret_manager.NewGlobalSecretManager(builder.SecretStore(), cipher),
	)

	builder.WithResourceManager(customizableManager)

	if builder.Config().Store.Cache.Enabled {
		cachedManager, err := core_manager.NewCachedManager(
			customizableManager,
			builder.Config().Store.Cache.ExpirationTime.Duration,
			builder.Metrics(),
			builder.Tenants(),
		)
		if err != nil {
			return err
		}
		builder.WithReadOnlyResourceManager(cachedManager)
	} else {
		builder.WithReadOnlyResourceManager(customizableManager)
	}
	return nil
}

Cached Manager

Cached Manager implements the read-only resource manager (ReadOnlyResourceManager) and caches the results of Get and List operations, with an expiration time.

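  • delegate: the underlying ReadOnlyResourceManager. On a cache miss, data is read from the delegate and recorded in the cache.
  • cache: the core data structure of the cached manager, holding query results. Keys have the form <op>:<resource_type>:<opts.HashCode()>:<tenantID>, where the hash is computed from the resource name and mesh. For a Get:
cacheKey := fmt.Sprintf("GET:%s:%s:%s", res.Descriptor().Name, opts.HashCode(), tenantID)
  • mutexes: one lock per key, so that access to each key-value pair is mutually exclusive.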
// Cached version of the ReadOnlyResourceManager designed to be used only for use cases of eventual consistency.
// This cache is NOT consistent across instances of the control plane.
//
// When retrieving elements from cache, they point to the same instances of the resources.
// We cannot do deep copies because it would consume lots of memory, therefore you need to be extra careful to NOT modify the resources.
type cachedManager struct {
	delegate ReadOnlyResourceManager
	cache    *cache.Cache
	metrics  *prometheus.CounterVec

	mutexes  map[string]*sync.Mutex
	mapMutex sync.Mutex // guards "mutexes" field
	tenants  multitenant.Tenants
}