AQS
AQS(AbstractQueuedSynchronizer,抽象队列同步器),这个类在 java.util.concurrent.locks
包下面。AQS 就是一个抽象类,主要用来构建锁和同步器,如 ReentrantLock、Semaphore 等,都是基于 AQS 的。
核心思想
AQS 使用 int 变量 state 表示同步状态,并通过 CLH 队列来完成阻塞线程的等待。如果当前状态空闲,则将当前请求资源的线程设置为有效的工作线程;否则使用 CLH 锁,将当前线程阻塞,当资源资源再次空闲时,通过内置的 FIFO 来完成线程排队获取资源的工作。
1 | // 共享变量,使用volatile修饰保证线程可见性 |
CLH 锁是对自旋锁的一种改进,是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个结点(Node)来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。
状态信息 state
可以通过 protected
类型的getState()
、setState()
和compareAndSetState()
进行操作。并且,这几个方法都是 final
修饰的,在子类中无法被重写。
1 | //返回同步状态的当前值 |
常用同步器
ReentrantLock 可重入锁
可重入的互斥锁 ReentrantLock
,用 AQS 中的 state
变量,用来表示锁的占用状态。state
的初始值为 0,表示锁处于未锁定状态。
ReentrantLock 和 synchronized 的异同:
相同点:二者都是可重入锁。
不同点:
- synchronized 依赖于 JVM 是虚拟机层面的,而 ReentrantLock 依赖于 API 是 JDK 层面的。
- ReentrantLock 提供了一些高级功能:
- 等待可中断 :ReentrantLock 提供了 lockInterruptibly 方法来实现中段等待锁。也就是说正在等待的线程可以选择放弃等待,改为处理其他事情。
- 可实现公平锁:Reentrant 可以指定公平锁还是非公平锁,而 synchronized 是能是 非公平锁。
- 可实现选择性通知:synchronized 与 wait 和 notify/notifyAll 结合可以实现等待/通知机制,而 ReentrantLock 借助 Condition 接口也可以实现,并且是选择性的通知(Condition 的 signalAll 方法,只会唤醒注册在该 Condition 实例中的所有等待线程,实现选择性)。
加锁 解锁
当线程 A 调用 lock()
方法时,会尝试通过 tryAcquire()
方法独占该锁,并让 state
的值加 1。如果成功了,那么线程 A 就获取到了锁。
如果失败了,那么线程 A 就会被加入到一个等待队列(CLH 队列)中,直到其他线程释放该锁。
可重入性
假设线程 A 获取锁成功了,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加)。这就是可重入性的体现:一个线程可以多次获取同一个锁而不会被阻塞。
Semaphore 信号量
Semaphore 定义多个资源,因此控制访问资源的线程数量,使用 acquire()
方法获取一个资源,release()
当资源数量为 1 时退化为排他锁。Semaphore 有两种模式:
- 公平模式:调用
acquire()
方法的顺序就是获取资源的顺序,遵循 FIFO; - 非公平模式:抢占式的,默认创建的是非公平的。
1 | public Semaphore(int permits) { |
可以看到,semaphore 基于 Sync 类同步器 sync 实现。Sync 继承了 AbstractQueuedSynchronizer 并重写了其中的某些方法。并且,Sync 对应的还有两个子类 NonfairSync(对应非公平模式) 和 FairSync(对应公平模式)。
1 | private static final class Sync extends AbstractQueuedSynchronizer { |
acquire 获取
调用semaphore.acquire()
,线程尝试获取许可证,如果 state > 0
的话,则表示可以获取成功,如果 state <= 0
的话,则表示许可证数量不足,获取失败。
如果可以获取成功的话(state > 0
),会尝试使用 CAS 操作去修改 state
的值 state=state-1
。如果获取失败则会创建一个 Node 节点加入等待队列,挂起当前线程。
1 | // 获取1个许可证 |
acquireSharedInterruptibly 方法是 AbstractQueuedSynchronizer 中的默认实现,如下所示。其中,tryAcquireShared 方法是 Sync 重写的方法。
1 | // 共享模式下获取许可证,获取成功则返回,失败则加入等待队列,挂起线程 |
release 释放
调用semaphore.release();
,线程尝试释放许可证,并使用 CAS 操作去修改 state
的值 state=state+1
。释放许可证成功之后,同时会唤醒等待队列中的一个线程。被唤醒的线程会重新尝试去修改 state
的值 state=state-1
,如果 state > 0
则获取令牌成功,否则重新进入等待队列,挂起线程。
1 | // 释放一个许可证 |
releaseShared 方法是 AbstractQueuedSynchronizer 中的默认实现,如下所示。其中,tryReleaseShared 方法是 Sync 重写的方法。
1 | // 释放共享锁 |
CountDownLatch 倒计时器
CountDownLatch 类似于 Golang 中的 WaitGroup,允许 n 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。CountDown 中计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,不能再次使用。
当线程调用 countDown 方法时,将 state 减一。当 state 为 0 时,表示所有的线程都调用了 countDown 方法,在 CountDownLatch 上等待的线程就会被唤醒。
当线程调用 await 方法时,如果 state 不为 0,那就证明任务还没有执行完毕,当前线程会被加入到 CLI 队列中阻塞。
CyclicBarrier 循环栅栏
CyclicBarrier 和 CountDownLatch 类似,但是功能更加强大。
CountDownLatch 基于 AQS 实现的,而 CyclicBarrier 是基于 ReentrantLock(ReentrantLock 也是基于 AQS 的)和 Condition 的。
CyclicBarrier 定义了 parties 和 count,count 作为计数器被初始化为 parties,当一个线程到了栅栏这里了,那么就将计数器减一,如果 count 为零了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务,并且再次将 count 设置为 parties 进行拦截。
1 | //每次拦截的线程数 |