AQS 是 Java 并发包的基础抽象框架,它将多线程同步的通用逻辑(如线程排队、阻塞、唤醒、状态原子性维护)进行了统一封装。基于这一框架,开发者能以极低的代码成本,高效地实现诸如 ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock 等各类线程同步工具。
1 问题背景:JUC 同步器的共同底座
ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock,乃至 ThreadPoolExecutor 内部的 Worker,底层都绕不开同一个类:AbstractQueuedSynchronizer(AQS)。
直接打开 AQS 源码,2300 多行、doAcquireInterruptibly / doAcquireNanos / doAcquireShared 一堆变体,很容易在细节里迷路。这里真正需要关注的是:AQS 把两件事拆开了——
- 状态语义(这把锁有没有被持有、还剩几个 permit、倒计时还剩几次)交给子类用
state表达; - 排队、阻塞、唤醒、取消 固化成一套模板,子类只需实现几个
tryXxx钩子。
读完这条主线,再回头看各 JUC 组件,就只是「同一个骨架上的不同 state 含义」。
2 AQS 的设计模型:模板方法 + 单一 state
2.1 核心字段与钩子
AQS 用一个 volatile int state 表示同步状态,通过 getState / setState / compareAndSetState 读写。子类按需实现以下 protected 方法:
tryAcquire(int arg) // 独占获取
tryRelease(int arg) // 独占释放
tryAcquireShared(int arg) // 共享获取,返回值 >= 0 表示成功
tryReleaseShared(int arg) // 共享释放
isHeldExclusively() // 是否被当前线程独占持有(Condition 需要)
AQS 自身把 acquire / release / acquireShared / releaseShared 及其可中断、超时变体都声明为 final,子类不能改排队逻辑,只能改状态判断。
2.2 全局骨架
类注释里给出了独占模式的伪代码:
Acquire:
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}
Release:
if (tryRelease(arg))
unblock the first queued thread;
翻译成工程语言:先乐观尝试 CAS 抢状态;抢不到就入队;入队后自旋 + park 交替重试;释放成功则唤醒队首后继。 共享模式类似,但 release 可能触发级联传播(后文第 6 节)。
2.3 组件层次
下图展示 AQS 在 JUC 中的位置:对外暴露的是 Lock、Semaphore 等 API,对内是各组件的 Sync 内部类继承 AQS 并实现 tryXxx。

3 原始 CLH 锁:它要解决什么问题
Doug Lea 在 Node 类注释中写明:AQS 等待队列是 CLH(Craig、Landin、Hagersten)锁队列的变体。要读懂 AQS 的 prev / next / waitStatus,得先知道原始 CLH 在做什么。
3.1 工程动机
Test-and-Set(TAS)自旋锁让所有线程盯着同一块内存自旋。锁释放时,缓存行在核间来回失效,线程越多性能越差。
CLH 的思路是:把等待者排成 FIFO 逻辑队列,每个线程只自旋检查前驱节点上的状态位。缓存竞争从「全员盯一把锁」收敛为「只盯自己的前驱」。
原始 CLH 是自旋锁,适合临界区极短、线程数可控的场景。AQS 借用了它的排队战术,但把自旋换成了 LockSupport.park/unpark 做阻塞。
3.2 结构:逻辑队列,物理上只有 tail
普通链表队列会维护 head、tail 和节点间的 next 指针。原始 CLH 刻意不做这件事:全局只暴露一个 tail,前驱关系靠入队时 getAndSet 的返回值临时建立——节点之间没有 next 链,也没有 head。
从行为上看,等待者仍按 FIFO 顺序排队、依次获取锁;从数据结构上看,「谁排在谁前面」只存在于各线程的 myPred 里。
| 组件 | 作用 |
|---|---|
AtomicReference<QNode> tail | 全线程共享,始终指向队尾 |
| dummy 哨兵节点 | 构造时创建,locked=false,给第一个入队线程提供前驱 |
ThreadLocal<QNode> myNode | 每线程复用自身节点,避免 lock 热路径分配 |
ThreadLocal<QNode> myPred | 记录本次入队时得到的前驱 |
QNode.locked | 是否仍在排队;true 表示举着牌子等锁,false 表示已放行 |
入队的核心是 tail.getAndSet(myNode),它在一次原子操作里完成两件事:把 tail 更新为当前节点;返回更新前的旧 tail,即逻辑前驱 pred。随后线程执行 while (pred.locked)——只自旋前驱的状态位。这就是 3.1 中「缓存竞争从全员盯一把锁,收敛为只盯自己的前驱」在结构上的落点。
三线程先后 lock 后,全局引用与各线程视角可以这样理解:

- T1 自旋盯哨兵的
locked;T2 盯 T1 节点;T3 盯 T2 节点 - T1 的节点里没有指向 T2 的指针;后继只通过各自的
myPred知道该盯谁 - 不维护
head:获取锁只需前驱放行,无需从队头遍历,也少一个全局竞争点
3.3 简易 Java 实现
public class CLHSpinLock {
private final AtomicReference<QNode> tail =
new AtomicReference<>(new QNode());
private final ThreadLocal<QNode> myNode =
ThreadLocal.withInitial(QNode::new);
private final ThreadLocal<QNode> myPred = new ThreadLocal<>();
public void lock() {
QNode node = myNode.get();
node.locked = true;
QNode pred = tail.getAndSet(node);
myPred.set(pred);
while (pred.locked) { /* spin */ }
}
public void unlock() {
myNode.get().locked = false;
myNode.set(myPred.get());
}
static class QNode { volatile boolean locked; }
}
| 步骤 | 含义 |
|---|---|
node.locked = true | 声明自己在排队 |
tail.getAndSet(node) | 原子入队:当前节点成为新 tail,返回值是前驱 pred |
myPred.set(pred) | 把前驱记入 ThreadLocal,供 unlock 复用节点 |
while (pred.locked) | 只盯前驱——CLH 缓存友好的根源 |
myNode.get().locked = false | unlock:放下牌子,后继自旋结束 |
myNode.set(myPred.get()) | 把 ThreadLocal 节点换成前驱对象复用,避免快速重入时后继仍盯着已被当前线程再次 lock 的同一节点而死锁 |
3.4 三线程时序
下面用时序图把 3.2 的入队与 3.3 的自旋串起来:每次 getAndSet 的返回值决定「盯谁」,每次 unlock 的 locked=false 决定「谁被放行」。
sequenceDiagram
participant S as 哨兵节点
participant T1 as Thread1
participant T2 as Thread2
participant T3 as Thread3
Note over S: tail 指向哨兵,locked=false
T1->>S: getAndSet(T1.node),pred=哨兵
T1->>S: 哨兵.locked=false,T1 获得锁
T1->>S: T1 执行临界区
T2->>S: getAndSet(T2.node),pred=T1.node
T2->>T1: 自旋 T1.node.locked
T3->>S: getAndSet(T3.node),pred=T2.node
T3->>T2: 自旋 T2.node.locked
T1->>T1: unlock:T1.node.locked=false,复用哨兵节点
T2->>T2: 自旋结束,获得锁
T2->>T2: unlock:T2.node.locked=false
T3->>T3: 自旋结束,获得锁3.5 原始 CLH 与 AQS 变体对照
| 维度 | 原始 CLH 自旋锁 | AQS CLH 变体 |
|---|---|---|
| 等待方式 | 自旋 while (pred.locked) | 先自旋,然后 LockSupport.park 阻塞 |
| 队列指针 | 仅 tail;逻辑前驱 | 显式 head + tail + prev + next |
| 状态字段 | QNode.locked | Node.waitStatus(SIGNAL/CANCELLED/…) |
| 节点归属 | ThreadLocal 复用 | 每次等待 new Node(thread) |
| 获取语义 | 严格 FIFO 自旋锁 | 队列 FIFO,但允许 barging 插队 |
| 取消/超时 | 不支持 | CANCELLED + prev 链修补 |
| 唤醒 | 前驱 locked=false | release 时 unparkSuccessor |
| dummy 节点 | 构造时创建哨兵 | 首次竞争时延迟初始化 |
AQS 注释指出:prev 链在原始 CLH 中不存在,主要为处理取消;next 链则为阻塞唤醒服务。
4 AQS 的 CLH 变体等待队列
4.1 Node 与 waitStatus
static final class Node {
volatile int waitStatus; // SIGNAL=-1, CANCELLED=1, CONDITION=-2, PROPAGATE=-3
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
waitStatus 承担原始 CLH 里「前驱状态决定后继是否继续等」的角色,但语义更丰富:
- SIGNAL:当前节点的后继需要被 unpark;在 acquire 失败路径里由前驱设置
- CANCELLED:节点因超时或中断取消,不再参与排队
- CONDITION:节点在条件队列上(
ConditionObject) - PROPAGATE:共享模式下 release 需继续向后传播
4.2 入队与出队
tryAcquire 失败后会走 addWaiter 入队;获取成功则在 acquireQueued 里调用 setHead,把当前节点提升为 head(逻辑出队)。与第 3 节原始 CLH 相比,AQS 显式维护 head + tail + prev + next,不再只靠 getAndSet 的返回值建立前驱关系。
入队入口 addWaiter:先走快路径——tail 非空时直接 CAS 挂到队尾;失败或队列尚未初始化则退到 enq 自旋重试。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq:tail == null 时 CAS 初始化 dummy header(thread == null 的哨兵节点),否则 CAS 把节点挂到 tail,并双向链接 prev / next。CAS 失败则 for (;;) 重试。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
| 步骤 | 含义 |
|---|---|
compareAndSetHead(new Node()) | 首次竞争时延迟创建 dummy header |
tail = head | 初始化后 head 与 tail 同指哨兵 |
node.prev = t | 先写 prev,再 CAS tail——与 CLH 先 getAndSet 再拿返回值类似 |
compareAndSetTail(t, node) | 原子确认 tail 仍是 t,才把 node 挂到队尾 |
t.next = node | CAS 成功后才写 next,保证 prev 链始终完整 |
setHead:获取成功后把等待节点提升为新的 dummy head,清空 thread / prev,避免旧节点被 GC 根引用住,也减少多余的唤醒遍历。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
注意:setHead 只清 prev,不清 next——acquireQueued 成功路径里会由调用方执行 p.next = null 断开旧 head 与后继的链接。
5 独占模式主链路:以 ReentrantLock 为样本
5.1 acquire 调用链
ReentrantLock.lock() → Sync.acquire(1) → 模板:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
sequenceDiagram
participant T as Thread
participant RL as ReentrantLock.Sync
participant AQS as AbstractQueuedSynchronizer
T->>RL: lock()
RL->>AQS: acquire(1)
AQS->>AQS: tryAcquire(1)
alt CAS 成功
AQS-->>T: 直接持有
else 失败
AQS->>AQS: addWaiter(EXCLUSIVE)
AQS->>AQS: acquireQueued(node, 1)
loop 直到成功或取消
AQS->>AQS: 前驱是 head 则 tryAcquire
AQS->>AQS: shouldParkAfterFailedAcquire
AQS->>AQS: LockSupport.park
end
end5.2 acquireQueued:争用权不等于成功
final boolean acquireQueued(final Node node, int arg) {
// ...
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 前驱是 head 才有资格 tryAcquire
if (p == head && tryAcquire(arg)) {
setHead(node); // 逻辑出队
p.next = null;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
}
前驱是 head 才有资格 tryAcquire——这叫获得争用权,但不保证成功。新到来的线程可以在入队前先 tryAcquire(barging),插队抢到锁。AQS 注释认为默认 barging 策略吞吐更高,代价是不保证严格 FIFO 获取顺序。
5.3 shouldParkAfterFailedAcquire:先标记再 park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) { /* 跳过 CANCELLED 前驱 */ }
else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
若直接 park 而不先把前驱标为 SIGNAL,可能出现:当前线程 park 的同时持有锁的线程 release 并 unpark——信号丢失。因此标准路径是「CAS 设 SIGNAL → 再 tryAcquire 一次 → 仍失败才 park」。
5.4 release 与唤醒
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// head 节点不可能为 CANCELLED
// 所以 waitStatus != 0 表示需要唤醒后继
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock 的 state 表示重入次数。tryRelease 减到 0 时清空 exclusiveOwnerThread 并返回 true,触发 unparkSuccessor。
5.5 公平与非公平
ReentrantLock 通过两个 Sync 子类区分策略:非公平在 lock() 热路径先 CAS 抢锁(barging);公平所有获取都走 acquire(1),且在 tryAcquire 里检查队列前是否有等待者。
非公平 NonfairSync:lock() 先乐观 CAS,失败才入队;tryAcquire 委托给 nonfairTryAcquire,不检查队列。
static final class NonfairSync extends Sync {
final void lock() {
// 热路径先 cas 抢锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
公平 FairSync:lock() 直接 acquire(1),不做提前 CAS;state == 0 时还需 !hasQueuedPredecessors() 才允许抢锁。
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
两者重入逻辑相同;差异只在「锁空闲时是否允许插队」。公共的 nonfairTryAcquire 被非公平 tryAcquire 和 tryLock() 共用:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// ReentrantLock.tryLock() — 无论公平/非公平构造,均走 nonfairTryAcquire
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
公平锁依赖 AQS 的 hasQueuedPredecessors() 判断当前线程是否是队首等待者:
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// h != t 则队列至少一个真实排队线程
return h != t &&
// s == null 则并发场景下刚有线程 addWaiter 入队,CAS 修改 tail 成功,但 head.next 还未完成指向,判定当前存在还未完全入队的前驱
((s = h.next) == null || s.thread != Thread.currentThread());
}
| 对比项 | 非公平 | 公平 |
|---|---|---|
lock() 入口 | 先 CAS,失败再 acquire | 直接 acquire(1) |
| 空闲时抢锁 | 允许 barging 插队 | 队列非空则拒绝 |
tryLock() | nonfairTryAcquire,可插队 | 不遵守公平策略 |
| 吞吐 | 更高,少一次队列操作 | 较低,获取时间方差更小 |
公平锁降低吞吐、减小获取时间方差,但不等于 JVM 线程调度公平——持公平锁的线程仍可连续多次获取;tryLock() 在锁可用时直接成功,即使队列里有等待者。
6 共享模式:Semaphore 与信号传播
6.1 返回值语义
| 方法 | 返回值含义 |
|---|---|
tryAcquireShared | >= 0 成功,值为剩余资源;< 0 失败 |
tryReleaseShared | true 表示可能唤醒等待者 |
Semaphore 的 state 表示可用 permit 数。tryAcquireShared 用 CAS 减计数;tryReleaseShared 用 CAS 加计数。
6.2 传播:PROPAGATE 的作用
独占 release 只需 unparkSuccessor(head) 唤醒一个后继;共享模式下一次 release 可能让多个等待线程同时满足 tryAcquireShared(例如 Semaphore release 多个 permit、CountDownLatch 倒数到 0),因此需要 PROPAGATE 级联传播:release 侧 doReleaseShared 唤醒队首,acquire 侧 setHeadAndPropagate 在获取成功后继续向后传播。
共享 release 模板:子类 tryReleaseShared 返回 true 后,AQS 调用 doReleaseShared 而非独占模式的单次 unparkSuccessor。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
doReleaseShared:循环检查 head 的 waitStatus。若为 SIGNAL 则 unparkSuccessor;若为 0 则 CAS 设为 PROPAGATE,保证后续 acquire 继续传播。循环是为应对并发下 head 被替换或新节点入队。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
| head.waitStatus | 动作 |
|---|---|
SIGNAL | CAS 清为 0,调用 unparkSuccessor(h) 唤醒后继 |
0 | CAS 设为 PROPAGATE,标记「后续 acquire 应继续传播」 |
其他 / h == tail | 本轮不唤醒,检查 head 是否变化后退出 |
setHeadAndPropagate:共享 acquire 成功时(doAcquireShared 路径),在 setHead 之后判断是否需要继续传播。propagate > 0 表示 tryAcquireShared 返回值 ≥ 0(仍有剩余资源);h.waitStatus < 0 则覆盖 PROPAGATE 已转为 SIGNAL 的情况。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
// ...
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
// ...
return;
}
}
// shouldParkAfterFailedAcquire + park ...
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
Node.PROPAGATE = -3 的语义:仅设在 head 上,表示下一次 releaseShared 应无条件继续传播。
sequenceDiagram
participant R as Release 线程
participant AQS as AQS
participant H as head 后继
participant S2 as 共享等待者2
R->>AQS: releaseShared(1)
AQS->>AQS: tryReleaseShared 成功
AQS->>AQS: doReleaseShared
AQS->>H: unparkSuccessor(head)
H->>AQS: tryAcquireShared 成功
AQS->>AQS: setHeadAndPropagate
AQS->>S2: 继续 doReleaseShared 级联唤醒以 Semaphore 为例:tryReleaseShared 把 permit 加回 state 后返回 true;tryAcquireShared 返回减完后的剩余 permit(≥ 0 表示成功)。CountDownLatch 则是 state 倒数到 0 时 tryReleaseShared 返回 true,等待线程批量通过:
// CountDownLatch.Sync
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0; // 仅倒数到 0 时触发 doReleaseShared
}
}
7 ConditionObject:双队列与最佳实践
7.1 机制:同步队列 + 条件队列
ConditionObject 是 AQS 的内部类,每个 Condition 实例维护自己的条件队列(firstWaiter / lastWaiter,单向 nextWaiter 链),与 AQS 的同步队列(head / tail,prev / next 链)并存。Condition 必须绑定独占 Lock(通常 ReentrantLock.newCondition())——signal / await 都依赖 isHeldExclusively() 校验当前线程持锁。
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
// ...
}
await 主路径:入条件队列 → 完全释放锁 → park 等待 signal → 被转移到同步队列 → acquireQueued 按 savedState 重新抢锁。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
| 步骤 | 方法 | 含义 |
|---|---|---|
| 1 | addConditionWaiter | 节点进入条件队列,waitStatus = CONDITION |
| 2 | fullyRelease | 用 release(savedState) 原子释放锁,保存重入次数 |
| 3 | LockSupport.park | 在条件队列上阻塞,直到被 signal 转移 |
| 4 | isOnSyncQueue | 检测节点是否已被 transferForSignal 挂到同步队列 |
| 5 | acquireQueued(node, savedState) | 按保存的重入次数重新获取锁 |
addConditionWaiter:尾插条件队列,节点通过 nextWaiter 串联(与同步队列的 next 字段独立)。
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
fullyRelease:保存当前 state,调用 release(savedState) 一次性释放全部重入;失败则标 CANCELLED 并抛 IllegalMonitorStateException。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
signal 与 transferForSignal:signal 要求当前线程独占持锁;doSignal 把队首条件节点转移到同步队列。转移时 CAS 把 waitStatus 从 CONDITION 改为 0,再 enq 挂到同步队列 tail,并尝试把前驱标为 SIGNAL。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
注意:signal 只把节点转移到同步队列并 unpark,并不直接授予锁。被唤醒线程返回 await 后仍需 acquireQueued 与其他竞争者重新抢锁——因此不保证按条件等待顺序执行。
7.2 最佳实践
(1)标准模板
lock.lock();
try {
while (!predicate()) // 必须 while,不能 if
condition.await();
// 修改共享状态
condition.signal(); // 状态改完后再 signal
} finally {
lock.unlock();
}
为何必须 while:spurious wakeup
Spurious wakeup(虚假唤醒) 指线程从等待中返回,但并不是因为业务条件已成立,甚至可能没有收到针对自己的 signal。Condition 接口明确允许这种现象,作为向底层平台语义(Object.wait、pthread_cond_wait)的妥协;实现可以消除它,但规范要求应用代码始终假设它可能发生。
AQS 的 await() 在条件队列上调用 LockSupport.park(),而 LockSupport 文档写明:park 可能 spuriously(for no reason)returns,调用方必须重新检查等待条件。AQS 用内层循环消化「park 无故醒来但尚未被 signal 转移」:
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
线程会继续 park,不会因此从 await() 返回。但对应用代码而言,await() 正常返回时只保证两件事:节点已被转移到同步队列;线程通过 acquireQueued 重新持锁。并不保证业务谓词已成立:
| 场景 | 发生了什么 |
|---|---|
| 平台虚假唤醒 | park 无故返回;AQS 内层 while 挡住大部分,接口仍要求应用侧防御 |
| signal 后谓词已变 | T1 signal 唤醒 T2,T2 抢锁前 T3 取走数据 → 队列又空 |
| 多等待者共用一个 Condition | signal 只唤醒一个,多个线程可能先后 await 返回,都需重检 |
| 先 signal 后改状态(见(2)) | 写法错误,不是平台虚假唤醒;while 能兜底但多一次抢锁/阻塞 |
因此 await 返回只表示「可以重新检查谓词」,不表示条件已成立。if 与 while 的差别:
// 错误:await 返回就往下执行,谓词可能仍为 false
if (queue.isEmpty())
notEmpty.await();
// 正确:每次醒来都重新检查
while (queue.isEmpty())
notEmpty.await();
(2)先改状态,再 signal
ArrayBlockingQueue 把「改状态」和「signal」拆到 enqueue / dequeue 私有方法里:先更新 count 与数组槽位,完成后才 signal。put / take 的 while 循环只负责等待,入队/出队逻辑委托给这两个方法。
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); // 状态已更新后再唤醒
}
private E dequeue() {
final Object[] items = this.items;
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
notFull.signal(); // 腾出空位后再唤醒
return x;
}
若 signal 写在 count++ 之前,被唤醒线程抢锁后检查 count == 0 仍成立,会立刻再次 await——while 能兜底,但多一次无谓的抢锁与阻塞。
(3)多 Condition 拆分,优先 signal
ArrayBlockingQueue 用两个 Condition 把「等空位」与「等数据」分开,构造时绑定同一把 ReentrantLock:
final ReentrantLock lock;
private final Condition notEmpty; // 等待 take
private final Condition notFull; // 等待 put
public ArrayBlockingQueue(int capacity, boolean fair) {
// ...
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
put 等 notFull,唤醒消费者 notEmpty;take 反之。全程用 signal() 精确唤醒一个等待者——源码中没有 signalAll():
public void put(E e) throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e); // 内部 notEmpty.signal()
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue(); // 内部 notFull.signal()
} finally {
lock.unlock();
}
}
signalAll 适合全局状态翻转、无法判断哪些等待者满足谓词的场景;日常有界缓冲优先 signal 降低惊群。
(4)中断与超时
可中断等待用 lockInterruptibly() 获取锁,配合 await();超时版本在 while 中递减 awaitNanos 返回值:
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos); // 返回剩余等待时间
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
捕获 InterruptedException 后应向上抛出或 Thread.currentThread().interrupt(),不要吞掉。
(5)反模式
| 反模式 | 后果 |
|---|---|
if (!predicate) await() | 虚假唤醒导致谓词未满足就继续执行 |
未持锁 signal() | IllegalMonitorStateException |
默认 signalAll() | 不必要唤醒,锁竞争加剧 |
| 一个 Condition 管多种谓词 | 无法精确唤醒 |
| 把 Condition 当 synchronized 用 | 与关联 Lock 语义无关(L126–135) |
| 能用 BlockingQueue 却手写 | 重复踩 AQS 细节 |
(6)何时值得用
适合自定义有界缓冲、连接池等待、单锁多谓词协作。需求可由 BlockingQueue、Semaphore 表达时,优先用现成组件。前提是 Lock 支持 newCondition()——ReentrantLock 可以,Semaphore 不行。
Condition.java 中的 BoundedBuffer 是标准模板的最小完整版(put + take),ArrayBlockingQueue 是其生产级实现:
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
8 非常规用法:ThreadPoolExecutorWorker
Worker 直接继承 AQS,而非组合 Sync 内部类。类注释说明设计意图:用独占状态区分「正在执行任务」与「空闲等任务」,控制中断时机;实现非可重入互斥,防止任务内调用 setCorePoolSize 等池控制方法时再次抢锁。
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. ...
* We implement a simple non-reentrant mutual exclusion lock rather
* than use ReentrantLock because we do not want worker tasks to be
* able to reacquire the lock when they invoke pool control methods
* like setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in runWorker).
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
}
| state 值 | 含义 |
|---|---|
-1 | 构造后、runWorker 开始前:抑制 interruptIdleWorkers |
0 | 空闲等任务:可被 tryLock 中断 |
1 | 正在执行任务:中断应作用于任务,而非唤醒等任务的 park |
runWorker:启动时 w.unlock() 把 state 从 -1 清到 0;每执行一个任务前 w.lock(),结束后 w.unlock()。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts — state: -1 → 0
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock(); // state: 0 → 1,标记正在执行
try {
// ... beforeExecute / task.run() / afterExecute ...
} finally {
task = null;
w.completedTasks++;
w.unlock(); // state: 1 → 0
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
interruptIdleWorkers:只对能 tryLock 成功的 Worker 发中断——正在执行任务的 Worker(state == 1)tryAcquire 失败,不会被误中断。
private void interruptIdleWorkers(boolean onlyOne) {
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
这不是「锁」的业务语义,而是借用 AQS 的 tryAcquire / tryRelease 与 park/unpark 做执行期保护。
9 取消、中断与超时路径
主路径 acquire / acquireQueued 处理正常排队;超时、可中断 acquire 和节点取消走独立变体。AQS 注释说明不可简单合并——中断异常语义、超时递减、tryAcquire 抛异常时的 cancelAcquire 交互,合并会伤害性能。
9.1 不可中断 acquire 与 selfInterrupt
acquire 忽略中断,但 park 期间收到中断会记录标志,成功后补设中断位:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
for (;;) {
// ...
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 记录,不抛异常
}
// return interrupted;
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
9.2 可中断 acquire
acquireInterruptibly 在 park 期间检测到中断立即抛 InterruptedException,失败路径 cancelAcquire:
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
9.3 超时 acquire
doAcquireNanos 用 deadline 递减剩余时间;极短超时(≤ spinForTimeoutThreshold,1000ns)自旋重试而非 park,减少系统调用:
static final long spinForTimeoutThreshold = 1000L;
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
// ...
for (;;) {
// ... 前驱是 head 则 tryAcquire ...
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
}
9.4 cancelAcquire
节点因超时或中断取消时,标为 CANCELLED,沿 prev 链跳过已取消前驱,必要时 unparkSuccessor 唤醒后继:
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
shouldParkAfterFailedAcquire 在排队路径上也会跳过 CANCELLED 前驱(ws > 0 分支),与 cancelAcquire 配合保证队列不因取消节点而卡住。
10 边界与常见误区
- AQS 不保证 FIFO 获取顺序,除非子类在
tryAcquire中检查hasQueuedPredecessors Condition.signal只保证节点转移到同步队列;被唤醒线程仍需重新抢锁,不保证按条件等待顺序执行- 公平锁降低吞吐,不等于线程调度公平
state为int,重入次数或许可证溢出需子类处理(抛Error)- 状态语义过于复杂时,AQS 注释建议降级到
Atomic*+ 自定义Queue+LockSupport(见下)
10.1 何时不必强行用 AQS
AQS 类注释指出:它是为能用单个 int state + acquire/release 参数 + 内置 FIFO 等待队列表达的同步器提供的高效底座。当这套模型装不下你的状态语义时,不必硬套 AQS,应退回更底层的三件套自行组装:
| 组件 | 职责 |
|---|---|
AtomicInteger / AtomicLong / AtomicReference 等 | 维护同步状态,支持比单个 int 更丰富的语义 |
自定义 Queue | 按业务需要组织等待者(优先级、分段、每 key 一条队列等) |
LockSupport | 阻塞与唤醒,替代 AQS 固化的 park/unpark 模板 |
AQS 擅长什么:ReentrantLock(state = 重入次数)、Semaphore(state = permit 数)、CountDownLatch(state = 倒计时)——状态是一个整数,获取/释放可用 tryAcquire / tryRelease 描述,排队策略接受 CLH FIFO + barging。
什么时候考虑降级:
- 状态不是一个
int能表达:需要long版本戳(如StampedLock的 stamp + 读写位)、多个字段组合状态、按阶段/代际(generation)管理等待者(Phaser更接近此类) - 获取/释放模型对不上:不是简单的「减 1 / 加 1」或「0/1 互斥」,而是复杂谓词、批量放行规则、与外部资源生命周期强绑定
- 队列语义要定制:不要全局 FIFO,要优先级队列、LIFO 栈式唤醒、或「每个 key 独立等待队列」
Condition前提不满足:ConditionObject要求独占release(savedState)能完全释放、acquire(savedState)能恢复——语义对不上就不要挂Condition
降级并不意味着「手写一套完整 AQS」——而是只拿你需要的零件:用 Atomic* 保证状态原子性,用自选队列结构管理等待关系,在合适点 LockSupport.park/unpark。代价是你要自己处理 signal 丢失、取消、超时、公平性等 AQS 已固化的边界;收益是状态与排队策略不再被 int state + CLH 模板约束。
JUC 里已有不少「非 AQS 底座」的同步器:StampedLock(long 状态 + 队列)、Phaser(分阶段协调)、部分场景直接用 BlockingQueue / Semaphore 组合即可,无需再继承 AQS。
11 总结
理解 AQS 可以压缩成三条主线:
- state 语义在子类——锁、信号量、倒计时只是
tryXxx的不同实现 - 排队唤醒在 AQS——CLH 变体队列 +
acquireQueued/unparkSuccessor模板 - 阻塞靠 LockSupport——自旋与 park 交替,兼顾短持有与高并发
建议阅读顺序:先跟通 acquire → acquireQueued → release → unparkSuccessor;再读共享传播的 doReleaseShared;最后看 ConditionObject 的双队列转移。在此之上对照 ReentrantLock、Semaphore、ArrayBlockingQueue 的 Sync 实现,就能把 JUC 同步器串成一张完整的图。