Dawn's Blogs

分享技术 记录成长

0%

线程池

线程池就是管理线程的资源池,有任务需要处理时,直接从线程池中取得线程进行处理,处理完成后线程会被放回线程池中,而不是立即销毁。Java 已经提供了线程,为什么还需要使用线程池?

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

如何创建线程池

Java 中创建线程池有两种方法:ThreadPoolExecutor 和 Executor 框架。

  1. 使用 ThreadPoolExecutor 来创建线程池(推荐)。使用这种方式可以使得程序员更加明确线程池的运行规则,规避资源耗尽的风险。

通过构造方法实现

  1. 通过 Executor 框架的工具类 Executors 来创建,可以构造多种类型的线程池。这其实就是队 ThreadPoolExecutor 构造函数的封装。

img

使用 Executors 创建线程池的弊端如下:

  • FixedThreadPoolSingleThreadExecutor:使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

    CachedThreadPool:使用的是同步队列 SynchronousQueue,允许创建的线程数量为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。

    ScheduledThreadPoolSingleThreadScheduledExecutor:使用的无界的延迟阻塞队列 DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

阅读全文 »

简介

ThreadLocal 使得每一个线程有自己专属的本地变量。如果创建了一个 ThreadLocal 变量,那么访问这个变量的每个线程都会有这个变量的本地副本,这也是ThreadLocal变量名的由来。可以使用 get()set() 方法来获取默认值或将其值更改为当前线程所存的副本的值,从而避免了线程安全问题。

阅读全文 »

AQS

AQS(AbstractQueuedSynchronizer,抽象队列同步器),这个类在 java.util.concurrent.locks 包下面。AQS 就是一个抽象类,主要用来构建锁和同步器,如 ReentrantLock、Semaphore 等,都是基于 AQS 的。

img

核心思想

AQS 使用 int 变量 state 表示同步状态,并通过 CLH 队列来完成阻塞线程的等待。如果当前状态空闲,则将当前请求资源的线程设置为有效的工作线程;否则使用 CLH 锁,将当前线程阻塞,当资源资源再次空闲时,通过内置的 FIFO 来完成线程排队获取资源的工作。

1
2
// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;

CLH 锁是对自旋锁的一种改进,是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个结点(Node)来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。

CLH 队列

状态信息 state 可以通过 protected 类型的getState()setState()compareAndSetState() 进行操作。并且,这几个方法都是 final 修饰的,在子类中无法被重写。

1
2
3
4
5
6
7
8
9
10
11
12
13
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

阅读全文 »

volatile 关键字

volatile 关键字可以保证变量的可见性,表示变量是共享且不稳定的,每次使用它都到主存中进行读取。volatile 关键字可以保证变量的可见性,但是不能保证原子性,synchronized 关键字二者都可以保证。

volatile 关键字有两个作用:

  1. 保证变量的可见性
  2. 防止 JVM 的指令重排

单例模式

单例模式的实现如下,其中单例的对象 uniqueInstance 使用了关键字 volatile 进行修饰。uniqueInstance = new Singleton(); 代码分为三步执行:

  1. uniqueInstance 分配内存空间。
  2. 初始化 uniqueInstance
  3. uniqueInstance 指向分配的内存地址。

但是由于 JVM 具有指令重排的特性,执行顺序有可能变成 1->3->2。指令重排在单线程环境下不会出现问题,但是在多线程环境下会导致一个线程获得还没有初始化的实例。例如,线程 T1 执行了 1 和 3,此时 T2 调用 getUniqueInstance() 后发现 uniqueInstance 不为空,因此返回 uniqueInstance,但此时 uniqueInstance 还未被初始化。

使用 volatile 关键字可以防止上述这种指令重排的情形。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Singleton {

private volatile static Singleton uniqueInstance;

private Singleton() {
}

public static Singleton getUniqueInstance() {
//先判断对象是否已经实例过,没有实例化过才进入加锁代码
if (uniqueInstance == null) {
//类对象加锁
synchronized (Singleton.class) {
if (uniqueInstance == null) {
uniqueInstance = new Singleton();
}
}
}
return uniqueInstance;
}
}
阅读全文 »

Java 线程

多线程并不是银弹!在单核 CPU 上,如果线程是 CPU 密集型的,那么多个线程同时运行会导致频繁的线程切换,增加了系统的开销,降低了效率。如果线程是 IO 密集型的,那么多个线程同时运行可以利用 CPU 在等待 IO 时的空闲时间,提高了效率(线程数量也不能超过系统所能承载的上限)。

Java 线程和系统线程

在 Java 1.2 之前线程基于绿色线程(Green Thread)实现的,这是一种用户级别的线程;而从 Java 1.2 开始使用原生线程(Native Thread),这是内核级的线程,由操作系统内核进行线程的调度和管理。

在 Windows 和 Linux 等主流操作系统中,Java 线程采用的是一对一的线程模型,也就是一个 Java 线程对应一个系统内核线程。Solaris 系统是一个特例(Solaris 系统本身就支持多对多的线程模型),HotSpot VM 在 Solaris 上支持多对多和一对一。

一句话概括 Java 线程和操作系统线程的关系:现在的 Java 线程的本质其实就是操作系统的线程

阅读全文 »

Kubernetes 服务发现

Kubernetes Service 服务发现模型

在 Kubernetes 中,服务发现通过 Service 实现。每一个 Service 对象都被分配了一个唯一的 VIP,访问 Service.name 通过 CoreDNS 既可以访问到 VIP,kube-proxy 通过 iptables 将 VIP 映射为具体的某个 Pod IP 地址。流程为:

  1. 客户端通过 CoreDNS 解析 Kubernetes-Service 对应的 VIP。
  2. 客户端向 VIP 发起 HTTP 请求。
  3. HTTP 请求被 kube-proxy 拦截,通过 kube-proxy 创建的 iptables 将请求的目标地址映射到 Pod 的 IP 地址。

img

为什么无法使用 Kubernetes Service 作为 Dubbo 接口级服务发现

在 Dubbo 中接口级服务发现,无法直接使用 Kubernetes 作为注册中心主要有以下几点原因:

  1. 在 Dubbo 调用时,需要元数据信息。但是 Kubernetes-Service 的服务描述字段中并不包含 Dubbo 元数据信息。
  2. Dubbo 的服务注册是基于每一个进程的,每一个 Dubbo 进程均需要独立的注册
  3. Kubernetes-Service 默认为服务创建 VIP,提供 round-robin 的负载策略也与 dubbo-go自有的负载策略形成了冲突
阅读全文 »

Watchdog

什么是 Watchdog

在 Kuma KDS 中,当 Zone CP 和 Global CP 建立 streaming 连接后,需要推送资源的一方在每一条连接开启一个 Watchdog。

Watchdog 用于收集变化的资源,并定期更新 xds-cache(xds-cache 的更新会引发 xds 推送),完成从 Global CP 与 Zone CP 之间的被更新资源的主动推送过程

image-20240308110246222

Watchdog 数据结构

Watchdog 的数据结构如下:

  • Node:表示一个 envoy 节点,这里是一个 Zone CP 节点。
  • EventBus:事件总线,Watchdog 通过事件总线订阅资源更新的事件。
  • Reconciler:调和器,用于重新计算配置资源,并更新 xds-cache。
  • ProvidedTypes:需要同步的所有资源类型。
1
2
3
4
5
6
7
8
9
10
type EventBasedWatchdog struct {
Ctx context.Context
Node *envoy_core.Node
EventBus events.EventBus
Reconciler reconcile.Reconciler
ProvidedTypes map[model.ResourceType]struct{}
Log logr.Logger
NewFlushTicker func() *time.Ticker
NewFullResyncTicker func() *time.Ticker
}

流程

在 Start 方法中定义了 Watchdog 的工作流程:

  1. 首先,从事件总线中订阅 ResourceChangedEvent 资源更新事件
1
2
3
4
5
6
7
8
9
10
listener := e.EventBus.Subscribe(func(event events.Event) bool {
resChange, ok := event.(events.ResourceChangedEvent)
if !ok {
return false
}
if _, ok := e.ProvidedTypes[resChange.Type]; !ok {
return false
}
return true
})
  1. changedTypes 记录已经修改的资源,第一次同步的是全量的资源
1
2
3
4
5
// for the first reconcile assign all types
changedTypes := maps.Clone(e.ProvidedTypes)
reasons := map[string]struct{}{
ReasonResync: {},
}
  1. 开启两个定时器 flushTicker 和 fullResyncTicker。flushTicker 用于定期向 xds-cache 中同步被修改的资源;fullResyncTicker 用于定期向 xds-cache 进行全量的资源同步。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
for {
select {
case <-stop:
if err := e.Reconciler.Clear(e.Ctx, e.Node); err != nil {
e.Log.Error(err, "reconcile clear failed")
}
listener.Close()
return
case <-flushTicker.C:
if len(changedTypes) == 0 {
continue
}
reason := strings.Join(util_maps.SortedKeys(reasons), "_and_")
e.Log.V(1).Info("reconcile", "changedTypes", changedTypes, "reason", reason)
err, _ := e.Reconciler.Reconcile(e.Ctx, e.Node, changedTypes, e.Log)
if err != nil && errors.Is(err, context.Canceled) {
e.Log.Error(err, "reconcile failed", "changedTypes", changedTypes, "reason", reason)
} else {
changedTypes = map[model.ResourceType]struct{}{}
reasons = map[string]struct{}{}
}
case <-fullResyncTicker.C:
e.Log.V(1).Info("schedule full resync")
changedTypes = maps.Clone(e.ProvidedTypes)
reasons[ReasonResync] = struct{}{}
case event := <-listener.Recv():
resChange := event.(events.ResourceChangedEvent)
e.Log.V(1).Info("schedule sync for type", "typ", resChange.Type)
changedTypes[resChange.Type] = struct{}{}
reasons[ReasonEvent] = struct{}{}
}
}

Reconciler 调和器

Reconciler 用于重新计算给定 node 的配置资源信息,并同步到 xds-cache 中,接口定义如下:

  • Reconcile 方法:调和已经修改的资源类型,从老的 Snapshot 中取未改变的资源,再重新查询已经改变的资源,并以此创建新的 Snapshot 并更新给 xds-cache。
  • Clear 方法:在 xds-cache 中清理 Snapshot,在 streaming 结束时调用。
1
2
3
4
5
6
7
// Reconciler re-computes configuration for a given node.
type Reconciler interface {
// Reconcile reconciles state of node given changed resource types.
// Returns error and bool which is true if any resource was changed.
Reconcile(context.Context, *envoy_core.Node, map[model.ResourceType]struct{}, logr.Logger) (error, bool)
Clear(context.Context, *envoy_core.Node) error
}

Reconcile 方法

Reconcile 方法调和已经修改的资源类型,从老的 Snapshot 中取未改变的资源,再重新查询已经改变的资源,并以此创建新的 Snapshot 并更新给 xds-cache。具体流程如下:

  1. 首先从 xds-cache 中查询老的 Snapshot。
1
2
id := r.hasher.ID(node)
old, _ := r.cache.GetSnapshot(id)
  1. 对于未变更的资源,直接从老的 Snapshot 中获取。接着调用 r.generator.GenerateSnapshot 方法,重新查询已经修改的资源,并根据已变更和未变更的资源创建新的 Snapshot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// construct builder with unchanged types from the old snapshot
builder := cache.NewSnapshotBuilder()
if old != nil {
for _, typ := range util_dds.GetSupportedTypes() {
resType := core_model.ResourceType(typ)
if _, ok := changedTypes[resType]; ok {
continue
}

oldRes := old.GetResources(typ)
if len(oldRes) > 0 {
builder = builder.With(resType, maps.Values(oldRes))
}
}
}

new, err := r.generator.GenerateSnapshot(ctx, node, builder, changedTypes)
if err != nil {
return err, false
}
if new == nil {
return errors.New("nil snapshot"), false
}
  1. 若新的 Snapshot 相比于老的发生了修改,则同步到 xds-cache 中。
1
2
3
4
5
6
7
new, changed := r.Version(new, old)
if changed {
r.logChanges(logger, new, old, node)
r.meterConfigReadyForDelivery(new, old, node.Id)
return r.cache.SetSnapshot(ctx, id, new), true
}
return nil, false

延迟消息投递

开源的 NSQ、RabbitMQ、ActiveMQ 和 Pulsar 也都内置了延迟消息的处理能力。虽然每个 MQ 项目的使用和实现方式不同,但核心实现思路都一样:Producer 将一个延迟消息发送到某个 Topic 中,Broker 将延迟消息放到临时存储进行暂存,延迟跟踪服务(Delayed Tracker Service)会检查消息是否到期,将到期的消息进行投递

图片

延迟消息投递是要暂缓对当前消息的处理,在未来的某个时间点再触发投递,实际的应用场景非常多,比如异常检测重试、订单超时取消、预约提醒等。

Pulsar 最早是在 2.4.0 引入了延迟消息投递的特性,在 Pulsar 中使用延迟消息,可以精确指定延迟投递的时间,有 deliverAfter 和 deliverAt 两种方式。其中 deliverAt 可以指定具体的时间戳;deliverAfter 可以指定在当前多长时间后执行。

在 Pulsar 中,可以支持跨度很大的延时消息,比方说一个月、半年;同时在一个 Topic 里,既支持延时消息,也支持非延时消息。下图展示了 Pulsar 中延迟消息的具体过程:

图片

实现原理

Pulsar 实现延迟消息投递的方式比较简单,Delayed Message Tracker 在堆外内存维护着一个 delayed index 优先级队列,根据延迟时间进行堆排序,延迟时间最短的会放在头上,时间越长越靠后。

Consumer 在消费时,会先去 Delayed Message Tracker 检查,是否有到期需要投递的消息,如果有到期的消息,则从 Tracker 中拿出对应的 index,找到对应的消息进行消费;如果没有到期的消息,则直接消费正常的消息。

如果集群出现 Broker 宕机,Pulsar 会重建 delayed index 队列,来保证延迟投递的消息能够正常工作。

图片

风险

从 Pulsar 的延迟消息投递实现原理可以看出,该方法简单高效,对 Pulsar 内核侵入性较小,可以支持到任意时间的延迟消息。但同时发现,Pulsar 的实现方案无法支持大规模使用延迟消息,主要有以下两个原因:

1. delayed index队列受到内存限制

2. delayed index队列重建时间开销

未来工作

Pulsar 目前的延迟消息投递方案简单高效,但处理大规模延迟消息时仍然存在风险。关于延迟消息投递,社区和数据平台部 MQ 团队下一步将聚焦在支持大规模延迟消息。目前讨论的方案是在 delayed index 队列加入时间分区,Broker 只加载当前较近的时间片 delayed index 到内存,其余时间片分区持久化磁盘,

Consumer 订阅 Topic 的时候,通过订阅模式来控制消息的使用模式,指定如何将消息投递给一个组一个的或多个的 Consumer。Pulsar 支持 4 种订阅模式:

  • exclusive(独占模式)
  • failover(故障转移模式,也叫灾备模式)
  • shared(共享模式)
  • key-shared(基于key的共享模式)

订阅模式

exclusive

独占模式,只能有一个 Consumer 绑定到订阅上。如果多于一个 Consumer 尝试以使用相同的订阅订阅 Topic,就会抛出异常且无法连接。

pulsar-exclusive-subscriptions.png

failover

在灾备订阅模式中,多个 Consumer 可以绑定到同一个订阅上, Consumer 将会按字典顺序排序第一个 Consumer 被初始化为唯一接受消息的消费者,被称为 Master Consumer。 当 Master Consumer 断开时,所有的未被确认和后续进入的消息将会被投递给下一个 Consumer。

灾备模式提供了高可用性。

pulsar-failover-subscriptions.png

shared

在共享模式中,多个 Consumer 可以绑定到同一个订阅上。 消息通过 round robin 轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。 当一个消费者断开连接时,所有已经投递给它但还没有被确认的消息将被重新投递,分发给其它存活的消费者。

pulsar-shared-subscriptions.png

key-shared

key-shared 模式是共享模式的一种特例,它也允许多个 Consumer 可以绑定到同一个订阅上,与共享模式中的 round robin 轮询消费消息不同,key-shared 模式增加了一个辅助 key,确保具有相同 key 的消息被交付给相同的消费者

Pulsar 在逻辑上的层级,从上到下依次为租户、命名空间、Topic。

每个租户都可以有单独的认证和授权机制,可以针对租户设置存储配额、消息生存时间 TTL 和隔离策略。

pulsar-logical-arch2.png

默认情况下一个 Topic 中的消息只能由 Pulsar 服务层中的一个 Broker 提供服务,因此单个 Topic 的吞吐量受限于为其提供服务的 Broker 的计算能力。

好在 Pulsar 还支持分区主题,这允许多个 Broker 提供服务,就可以将负载分布到多台机器上。在 Pulsar 内部,分区主题被实现为 N 个内部 Topic,N 是分区数量,分区跨 Broker 的分布由 Pulsar 自动管理,这对用户来说是完全透明的。 Pulsar 将分区主题实现为多个内部主题,这样就可以在需要增加分区数量的时候不必重新平衡整个主题,Pulsar 只需在其内部创建新的内部主题就能够立即接收新的消息,使客户端能在不中断的情况下对现有分区进行消息读写。

当生产者发布消息到分区主题时,不需要特意指定路由模式,默认以轮循的方式将消息均匀分布到各个分区。支持以下3种路由模式:

  • SinglePartition:如果没有消息 key 提供,生产者将会随机选择一个固定的分区来发布消息(同一个生产者生产在同一个分区中),可用于将来自特定生产者的消息分组在一起,以在没有键值时维护消息顺序。
  • RoundRobinPartition:如果没有消息 key 提供,将以轮循的方式将消息均匀分布到各个分区。
  • CustomPartition:定制实现路由模式,控制消息分发到 Topic 的分区中。

如果提供了消息 key,会以 key 做 hash,然后分配消息到指定分区。因此分区中消息的顺序与路由模式和消息的key有关,所有拥有相同的 key 的消息有序,将会被发送到相同的分区