Dawn's Blogs

分享技术 记录成长

0%

原子类都存放在java.util.concurrent.atomic下:

JUC原子类概览

根据操作的数据类型,可以将 JUC 包中的原子类分为 4 类:

基本类型

  • AtomicInteger:整型原子类。
  • AtomicLong:长整型原子类。
  • AtomicBoolean:布尔型原子类。

数组类型

  • AtomicIntegerArray:整型数组原子类。
  • AtomicLongArray:长整型数组原子类。
  • AtomicReferenceArray:引用类型数组原子类。

引用类型

  • AtomicReference:引用类型原子类。
  • AtomicMarkableReference:原子更新带有标记的引用类型,该类将 boolean 标记与引用关联起来。
  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题

对象的属性修改类型

  • AtomicIntegerFieldUpdater:原子更新整型字段的更新器。
  • AtomicLongFieldUpdater:原子更新长整型字段的更新器。
  • AtomicReferenceFieldUpdater:原子更新引用类型里的字段。

Java 的悲观锁可以使用 ReentrantLock,乐观锁采用版本号或者 CAS 实现。

CAS 的 ABA 问题:

如果一个变量 V 初次读取的时候是 A 值,并且在准备赋值的时候检查到它仍然是 A 值,那我们就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回 A,那 CAS 操作就会误认为它从来没有被修改过。这个问题被称为 CAS 操作的 ABA 问题。

ABA 问题的解决思路是在变量前面追加上版本号或者时间戳。JDK 1.5 以后的 AtomicStampedReference 类就是用来解决 ABA 问题的,其中的 compareAndSet() 方法就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

1
2
3
4
5
6
7
8
9
10
11
12
public boolean compareAndSet(V   expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}

Feature

Feature 的思想是异步调用,当执行一个耗时很长的任务时,可以将这个耗时任务交给一个子线程去异步执行,然后通过 Feature 类获取任务的执行结果

在 Java 中 Feature 是一个泛型接口,位于 java.util.concurrent 包下,其中定义了 5 个方法,主要包括下面这 4 个功能:

  • 取消任务;
  • 判断任务是否被取消;
  • 判断任务是否已经执行完成;
  • 获取任务执行结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// V 代表了Future执行的任务返回值的类型
public interface Future<V> {
// 取消任务执行
// 成功取消返回 true,否则返回 false
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否被取消
boolean isCancelled();
// 判断任务是否已经执行完成
boolean isDone();
// 获取任务执行结果
V get() throws InterruptedException, ExecutionException;
// 指定时间内没有返回计算结果就抛出 TimeOutException 异常
V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutExceptio

}

FeatureTask 类是 Feature 的具体实现。

CompletableFuture

Future 在实际使用过程中存在一些局限性比如不支持异步任务的编排组合、获取计算结果的 get() 方法为阻塞调用。Java 8 才被引入CompletableFuture 类可以解决Future 的这些缺陷。

CompletableFuture 除了提供了更为好用和强大的 Future 特性之外,还提供了函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力。

img

线程池

线程池就是管理线程的资源池,有任务需要处理时,直接从线程池中取得线程进行处理,处理完成后线程会被放回线程池中,而不是立即销毁。Java 已经提供了线程,为什么还需要使用线程池?

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

如何创建线程池

Java 中创建线程池有两种方法:ThreadPoolExecutor 和 Executor 框架。

  1. 使用 ThreadPoolExecutor 来创建线程池(推荐)。使用这种方式可以使得程序员更加明确线程池的运行规则,规避资源耗尽的风险。

通过构造方法实现

  1. 通过 Executor 框架的工具类 Executors 来创建,可以构造多种类型的线程池。这其实就是队 ThreadPoolExecutor 构造函数的封装。

img

使用 Executors 创建线程池的弊端如下:

  • FixedThreadPoolSingleThreadExecutor:使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

    CachedThreadPool:使用的是同步队列 SynchronousQueue,允许创建的线程数量为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。

    ScheduledThreadPoolSingleThreadScheduledExecutor:使用的无界的延迟阻塞队列 DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

阅读全文 »

简介

ThreadLocal 使得每一个线程有自己专属的本地变量。如果创建了一个 ThreadLocal 变量,那么访问这个变量的每个线程都会有这个变量的本地副本,这也是ThreadLocal变量名的由来。可以使用 get()set() 方法来获取默认值或将其值更改为当前线程所存的副本的值,从而避免了线程安全问题。

阅读全文 »

AQS

AQS(AbstractQueuedSynchronizer,抽象队列同步器),这个类在 java.util.concurrent.locks 包下面。AQS 就是一个抽象类,主要用来构建锁和同步器,如 ReentrantLock、Semaphore 等,都是基于 AQS 的。

img

核心思想

AQS 使用 int 变量 state 表示同步状态,并通过 CLH 队列来完成阻塞线程的等待。如果当前状态空闲,则将当前请求资源的线程设置为有效的工作线程;否则使用 CLH 锁,将当前线程阻塞,当资源资源再次空闲时,通过内置的 FIFO 来完成线程排队获取资源的工作。

1
2
// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;

CLH 锁是对自旋锁的一种改进,是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个结点(Node)来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。

CLH 队列

状态信息 state 可以通过 protected 类型的getState()setState()compareAndSetState() 进行操作。并且,这几个方法都是 final 修饰的,在子类中无法被重写。

1
2
3
4
5
6
7
8
9
10
11
12
13
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

阅读全文 »

volatile 关键字

volatile 关键字可以保证变量的可见性,表示变量是共享且不稳定的,每次使用它都到主存中进行读取。volatile 关键字可以保证变量的可见性,但是不能保证原子性,synchronized 关键字二者都可以保证。

volatile 关键字有两个作用:

  1. 保证变量的可见性
  2. 防止 JVM 的指令重排

单例模式

单例模式的实现如下,其中单例的对象 uniqueInstance 使用了关键字 volatile 进行修饰。uniqueInstance = new Singleton(); 代码分为三步执行:

  1. uniqueInstance 分配内存空间。
  2. 初始化 uniqueInstance
  3. uniqueInstance 指向分配的内存地址。

但是由于 JVM 具有指令重排的特性,执行顺序有可能变成 1->3->2。指令重排在单线程环境下不会出现问题,但是在多线程环境下会导致一个线程获得还没有初始化的实例。例如,线程 T1 执行了 1 和 3,此时 T2 调用 getUniqueInstance() 后发现 uniqueInstance 不为空,因此返回 uniqueInstance,但此时 uniqueInstance 还未被初始化。

使用 volatile 关键字可以防止上述这种指令重排的情形。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Singleton {

private volatile static Singleton uniqueInstance;

private Singleton() {
}

public static Singleton getUniqueInstance() {
//先判断对象是否已经实例过,没有实例化过才进入加锁代码
if (uniqueInstance == null) {
//类对象加锁
synchronized (Singleton.class) {
if (uniqueInstance == null) {
uniqueInstance = new Singleton();
}
}
}
return uniqueInstance;
}
}
阅读全文 »

Java 线程

多线程并不是银弹!在单核 CPU 上,如果线程是 CPU 密集型的,那么多个线程同时运行会导致频繁的线程切换,增加了系统的开销,降低了效率。如果线程是 IO 密集型的,那么多个线程同时运行可以利用 CPU 在等待 IO 时的空闲时间,提高了效率(线程数量也不能超过系统所能承载的上限)。

Java 线程和系统线程

在 Java 1.2 之前线程基于绿色线程(Green Thread)实现的,这是一种用户级别的线程;而从 Java 1.2 开始使用原生线程(Native Thread),这是内核级的线程,由操作系统内核进行线程的调度和管理。

在 Windows 和 Linux 等主流操作系统中,Java 线程采用的是一对一的线程模型,也就是一个 Java 线程对应一个系统内核线程。Solaris 系统是一个特例(Solaris 系统本身就支持多对多的线程模型),HotSpot VM 在 Solaris 上支持多对多和一对一。

一句话概括 Java 线程和操作系统线程的关系:现在的 Java 线程的本质其实就是操作系统的线程

阅读全文 »

Kubernetes 服务发现

Kubernetes Service 服务发现模型

在 Kubernetes 中,服务发现通过 Service 实现。每一个 Service 对象都被分配了一个唯一的 VIP,访问 Service.name 通过 CoreDNS 既可以访问到 VIP,kube-proxy 通过 iptables 将 VIP 映射为具体的某个 Pod IP 地址。流程为:

  1. 客户端通过 CoreDNS 解析 Kubernetes-Service 对应的 VIP。
  2. 客户端向 VIP 发起 HTTP 请求。
  3. HTTP 请求被 kube-proxy 拦截,通过 kube-proxy 创建的 iptables 将请求的目标地址映射到 Pod 的 IP 地址。

img

为什么无法使用 Kubernetes Service 作为 Dubbo 接口级服务发现

在 Dubbo 中接口级服务发现,无法直接使用 Kubernetes 作为注册中心主要有以下几点原因:

  1. 在 Dubbo 调用时,需要元数据信息。但是 Kubernetes-Service 的服务描述字段中并不包含 Dubbo 元数据信息。
  2. Dubbo 的服务注册是基于每一个进程的,每一个 Dubbo 进程均需要独立的注册
  3. Kubernetes-Service 默认为服务创建 VIP,提供 round-robin 的负载策略也与 dubbo-go自有的负载策略形成了冲突
阅读全文 »

Watchdog

什么是 Watchdog

在 Kuma KDS 中,当 Zone CP 和 Global CP 建立 streaming 连接后,需要推送资源的一方在每一条连接开启一个 Watchdog。

Watchdog 用于收集变化的资源,并定期更新 xds-cache(xds-cache 的更新会引发 xds 推送),完成从 Global CP 与 Zone CP 之间的被更新资源的主动推送过程

image-20240308110246222

Watchdog 数据结构

Watchdog 的数据结构如下:

  • Node:表示一个 envoy 节点,这里是一个 Zone CP 节点。
  • EventBus:事件总线,Watchdog 通过事件总线订阅资源更新的事件。
  • Reconciler:调和器,用于重新计算配置资源,并更新 xds-cache。
  • ProvidedTypes:需要同步的所有资源类型。
1
2
3
4
5
6
7
8
9
10
type EventBasedWatchdog struct {
Ctx context.Context
Node *envoy_core.Node
EventBus events.EventBus
Reconciler reconcile.Reconciler
ProvidedTypes map[model.ResourceType]struct{}
Log logr.Logger
NewFlushTicker func() *time.Ticker
NewFullResyncTicker func() *time.Ticker
}

流程

在 Start 方法中定义了 Watchdog 的工作流程:

  1. 首先,从事件总线中订阅 ResourceChangedEvent 资源更新事件
1
2
3
4
5
6
7
8
9
10
listener := e.EventBus.Subscribe(func(event events.Event) bool {
resChange, ok := event.(events.ResourceChangedEvent)
if !ok {
return false
}
if _, ok := e.ProvidedTypes[resChange.Type]; !ok {
return false
}
return true
})
  1. changedTypes 记录已经修改的资源,第一次同步的是全量的资源
1
2
3
4
5
// for the first reconcile assign all types
changedTypes := maps.Clone(e.ProvidedTypes)
reasons := map[string]struct{}{
ReasonResync: {},
}
  1. 开启两个定时器 flushTicker 和 fullResyncTicker。flushTicker 用于定期向 xds-cache 中同步被修改的资源;fullResyncTicker 用于定期向 xds-cache 进行全量的资源同步。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
for {
select {
case <-stop:
if err := e.Reconciler.Clear(e.Ctx, e.Node); err != nil {
e.Log.Error(err, "reconcile clear failed")
}
listener.Close()
return
case <-flushTicker.C:
if len(changedTypes) == 0 {
continue
}
reason := strings.Join(util_maps.SortedKeys(reasons), "_and_")
e.Log.V(1).Info("reconcile", "changedTypes", changedTypes, "reason", reason)
err, _ := e.Reconciler.Reconcile(e.Ctx, e.Node, changedTypes, e.Log)
if err != nil && errors.Is(err, context.Canceled) {
e.Log.Error(err, "reconcile failed", "changedTypes", changedTypes, "reason", reason)
} else {
changedTypes = map[model.ResourceType]struct{}{}
reasons = map[string]struct{}{}
}
case <-fullResyncTicker.C:
e.Log.V(1).Info("schedule full resync")
changedTypes = maps.Clone(e.ProvidedTypes)
reasons[ReasonResync] = struct{}{}
case event := <-listener.Recv():
resChange := event.(events.ResourceChangedEvent)
e.Log.V(1).Info("schedule sync for type", "typ", resChange.Type)
changedTypes[resChange.Type] = struct{}{}
reasons[ReasonEvent] = struct{}{}
}
}

Reconciler 调和器

Reconciler 用于重新计算给定 node 的配置资源信息,并同步到 xds-cache 中,接口定义如下:

  • Reconcile 方法:调和已经修改的资源类型,从老的 Snapshot 中取未改变的资源,再重新查询已经改变的资源,并以此创建新的 Snapshot 并更新给 xds-cache。
  • Clear 方法:在 xds-cache 中清理 Snapshot,在 streaming 结束时调用。
1
2
3
4
5
6
7
// Reconciler re-computes configuration for a given node.
type Reconciler interface {
// Reconcile reconciles state of node given changed resource types.
// Returns error and bool which is true if any resource was changed.
Reconcile(context.Context, *envoy_core.Node, map[model.ResourceType]struct{}, logr.Logger) (error, bool)
Clear(context.Context, *envoy_core.Node) error
}

Reconcile 方法

Reconcile 方法调和已经修改的资源类型,从老的 Snapshot 中取未改变的资源,再重新查询已经改变的资源,并以此创建新的 Snapshot 并更新给 xds-cache。具体流程如下:

  1. 首先从 xds-cache 中查询老的 Snapshot。
1
2
id := r.hasher.ID(node)
old, _ := r.cache.GetSnapshot(id)
  1. 对于未变更的资源,直接从老的 Snapshot 中获取。接着调用 r.generator.GenerateSnapshot 方法,重新查询已经修改的资源,并根据已变更和未变更的资源创建新的 Snapshot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// construct builder with unchanged types from the old snapshot
builder := cache.NewSnapshotBuilder()
if old != nil {
for _, typ := range util_dds.GetSupportedTypes() {
resType := core_model.ResourceType(typ)
if _, ok := changedTypes[resType]; ok {
continue
}

oldRes := old.GetResources(typ)
if len(oldRes) > 0 {
builder = builder.With(resType, maps.Values(oldRes))
}
}
}

new, err := r.generator.GenerateSnapshot(ctx, node, builder, changedTypes)
if err != nil {
return err, false
}
if new == nil {
return errors.New("nil snapshot"), false
}
  1. 若新的 Snapshot 相比于老的发生了修改,则同步到 xds-cache 中。
1
2
3
4
5
6
7
new, changed := r.Version(new, old)
if changed {
r.logChanges(logger, new, old, node)
r.meterConfigReadyForDelivery(new, old, node.Id)
return r.cache.SetSnapshot(ctx, id, new), true
}
return nil, false