Dawn's Blogs

分享技术 记录成长

0%

Java面试之并发编程 (3) AQS和其实现的常用同步器

AQS

AQS(AbstractQueuedSynchronizer,抽象队列同步器),这个类在 java.util.concurrent.locks 包下面。AQS 就是一个抽象类,主要用来构建锁和同步器,如 ReentrantLock、Semaphore 等,都是基于 AQS 的。

img

核心思想

AQS 使用 int 变量 state 表示同步状态,并通过 CLH 队列来完成阻塞线程的等待。如果当前状态空闲,则将当前请求资源的线程设置为有效的工作线程;否则使用 CLH 锁,将当前线程阻塞,当资源资源再次空闲时,通过内置的 FIFO 来完成线程排队获取资源的工作。

1
2
// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;

CLH 锁是对自旋锁的一种改进,是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个结点(Node)来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。

CLH 队列

状态信息 state 可以通过 protected 类型的getState()setState()compareAndSetState() 进行操作。并且,这几个方法都是 final 修饰的,在子类中无法被重写。

1
2
3
4
5
6
7
8
9
10
11
12
13
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

常用同步器

ReentrantLock 可重入锁

可重入的互斥锁 ReentrantLock,用 AQS 中的 state 变量,用来表示锁的占用状态。state 的初始值为 0,表示锁处于未锁定状态。

ReentrantLock 和 synchronized 的异同:

相同点:二者都是可重入锁。

不同点:

  • synchronized 依赖于 JVM 是虚拟机层面的,而 ReentrantLock 依赖于 API 是 JDK 层面的。
  • ReentrantLock 提供了一些高级功能:
    • 等待可中断 :ReentrantLock 提供了 lockInterruptibly 方法来实现中段等待锁。也就是说正在等待的线程可以选择放弃等待,改为处理其他事情。
    • 可实现公平锁:Reentrant 可以指定公平锁还是非公平锁,而 synchronized 是能是 非公平锁。
    • 可实现选择性通知:synchronized 与 wait 和 notify/notifyAll 结合可以实现等待/通知机制,而 ReentrantLock 借助 Condition 接口也可以实现,并且是选择性的通知(Condition 的 signalAll 方法,只会唤醒注册在该 Condition 实例中的所有等待线程,实现选择性)。

加锁 解锁

当线程 A 调用 lock() 方法时,会尝试通过 tryAcquire() 方法独占该锁,并让 state 的值加 1。如果成功了,那么线程 A 就获取到了锁。

如果失败了,那么线程 A 就会被加入到一个等待队列(CLH 队列)中,直到其他线程释放该锁。

可重入性

假设线程 A 获取锁成功了,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加)。这就是可重入性的体现:一个线程可以多次获取同一个锁而不会被阻塞。

Semaphore 信号量

Semaphore 定义多个资源,因此控制访问资源的线程数量,使用 acquire() 方法获取一个资源,release() 当资源数量为 1 时退化为排他锁。Semaphore 有两种模式:

  • 公平模式:调用 acquire() 方法的顺序就是获取资源的顺序,遵循 FIFO;
  • 非公平模式:抢占式的,默认创建的是非公平的。
1
2
3
4
5
6
7
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

可以看到,semaphore 基于 Sync 类同步器 sync 实现。Sync 继承了 AbstractQueuedSynchronizer 并重写了其中的某些方法。并且,Sync 对应的还有两个子类 NonfairSync(对应非公平模式) 和 FairSync(对应公平模式)。

1
2
3
4
5
6
7
8
9
private static final class Sync extends AbstractQueuedSynchronizer {
// ...
}
static final class NonfairSync extends Sync {
// ...
}
static final class FairSync extends Sync {
// ...
}

acquire 获取

调用semaphore.acquire() ,线程尝试获取许可证,如果 state > 0 的话,则表示可以获取成功,如果 state <= 0 的话,则表示许可证数量不足,获取失败。

如果可以获取成功的话(state > 0 ),会尝试使用 CAS 操作去修改 state 的值 state=state-1。如果获取失败则会创建一个 Node 节点加入等待队列,挂起当前线程。

1
2
3
4
5
6
7
8
9
10
// 获取1个许可证
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// 获取一个或者多个许可证
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

acquireSharedInterruptibly 方法是 AbstractQueuedSynchronizer 中的默认实现,如下所示。其中,tryAcquireShared 方法是 Sync 重写的方法。

1
2
3
4
5
6
7
8
9
// 共享模式下获取许可证,获取成功则返回,失败则加入等待队列,挂起线程
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取许可证,arg为获取许可证个数,当获取失败时,则创建一个节点加入等待队列,挂起当前线程。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

release 释放

调用semaphore.release(); ,线程尝试释放许可证,并使用 CAS 操作去修改 state 的值 state=state+1。释放许可证成功之后,同时会唤醒等待队列中的一个线程。被唤醒的线程会重新尝试去修改 state 的值 state=state-1 ,如果 state > 0 则获取令牌成功,否则重新进入等待队列,挂起线程。

1
2
3
4
5
6
7
8
9
10
// 释放一个许可证
public void release() {
sync.releaseShared(1);
}

// 释放一个或者多个许可证
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}

releaseShared 方法是 AbstractQueuedSynchronizer 中的默认实现,如下所示。其中,tryReleaseShared 方法是 Sync 重写的方法。

1
2
3
4
5
6
7
8
9
10
11
// 释放共享锁
// 如果 tryReleaseShared 返回 true,就唤醒等待队列中的一个或多个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//释放当前节点的后置等待节点
doReleaseShared();
return true;
}
return false;
}

CountDownLatch 倒计时器

CountDownLatch 类似于 Golang 中的 WaitGroup,允许 n 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。CountDown 中计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,不能再次使用。

当线程调用 countDown 方法时,将 state 减一。当 state 为 0 时,表示所有的线程都调用了 countDown 方法,在 CountDownLatch 上等待的线程就会被唤醒。

当线程调用 await 方法时,如果 state 不为 0,那就证明任务还没有执行完毕,当前线程会被加入到 CLI 队列中阻塞。

CyclicBarrier 循环栅栏

CyclicBarrier 和 CountDownLatch 类似,但是功能更加强大。

CountDownLatch 基于 AQS 实现的,而 CyclicBarrier 是基于 ReentrantLock(ReentrantLock 也是基于 AQS 的)和 Condition 的。

CyclicBarrier 定义了 parties 和 count,count 作为计数器被初始化为 parties,当一个线程到了栅栏这里了,那么就将计数器减一,如果 count 为零了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务,并且再次将 count 设置为 parties 进行拦截。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//每次拦截的线程数
private final int parties;
//计数器
private int count;

public CyclicBarrier(int parties) {
this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}