AQS 是 Java 并发包的基础抽象框架,它将多线程同步的通用逻辑(如线程排队、阻塞、唤醒、状态原子性维护)进行了统一封装。基于这一框架,开发者能以极低的代码成本,高效地实现诸如 ReentrantLockSemaphoreCountDownLatchReentrantReadWriteLock 等各类线程同步工具。

1 问题背景:JUC 同步器的共同底座

ReentrantLockSemaphoreCountDownLatchReentrantReadWriteLock,乃至 ThreadPoolExecutor 内部的 Worker,底层都绕不开同一个类:AbstractQueuedSynchronizer(AQS)。

直接打开 AQS 源码,2300 多行、doAcquireInterruptibly / doAcquireNanos / doAcquireShared 一堆变体,很容易在细节里迷路。这里真正需要关注的是:AQS 把两件事拆开了——

  • 状态语义(这把锁有没有被持有、还剩几个 permit、倒计时还剩几次)交给子类用 state 表达;
  • 排队、阻塞、唤醒、取消 固化成一套模板,子类只需实现几个 tryXxx 钩子。

读完这条主线,再回头看各 JUC 组件,就只是「同一个骨架上的不同 state 含义」。

2 AQS 的设计模型:模板方法 + 单一 state

2.1 核心字段与钩子

AQS 用一个 volatile int state 表示同步状态,通过 getState / setState / compareAndSetState 读写。子类按需实现以下 protected 方法:

tryAcquire(int arg)          // 独占获取
tryRelease(int arg)          // 独占释放
tryAcquireShared(int arg)    // 共享获取,返回值 >= 0 表示成功
tryReleaseShared(int arg)    // 共享释放
isHeldExclusively()          // 是否被当前线程独占持有(Condition 需要)

AQS 自身把 acquire / release / acquireShared / releaseShared 及其可中断、超时变体都声明为 final,子类不能改排队逻辑,只能改状态判断。

2.2 全局骨架

类注释里给出了独占模式的伪代码:

Acquire:
    while (!tryAcquire(arg)) {
       enqueue thread if it is not already queued;
       possibly block current thread;
    }

Release:
    if (tryRelease(arg))
       unblock the first queued thread;

翻译成工程语言:先乐观尝试 CAS 抢状态;抢不到就入队;入队后自旋 + park 交替重试;释放成功则唤醒队首后继。 共享模式类似,但 release 可能触发级联传播(后文第 6 节)。

2.3 组件层次

下图展示 AQS 在 JUC 中的位置:对外暴露的是 LockSemaphore 等 API,对内是各组件的 Sync 内部类继承 AQS 并实现 tryXxx

image-20260618224206634

3 原始 CLH 锁:它要解决什么问题

Doug Lea 在 Node 类注释中写明:AQS 等待队列是 CLH(Craig、Landin、Hagersten)锁队列的变体。要读懂 AQS 的 prev / next / waitStatus,得先知道原始 CLH 在做什么。

3.1 工程动机

Test-and-Set(TAS)自旋锁让所有线程盯着同一块内存自旋。锁释放时,缓存行在核间来回失效,线程越多性能越差。

CLH 的思路是:把等待者排成 FIFO 逻辑队列,每个线程只自旋检查前驱节点上的状态位。缓存竞争从「全员盯一把锁」收敛为「只盯自己的前驱」。

原始 CLH 是自旋锁,适合临界区极短、线程数可控的场景。AQS 借用了它的排队战术,但把自旋换成了 LockSupport.park/unpark 做阻塞。

3.2 结构:逻辑队列,物理上只有 tail

普通链表队列会维护 headtail 和节点间的 next 指针。原始 CLH 刻意不做这件事:全局只暴露一个 tail,前驱关系靠入队时 getAndSet 的返回值临时建立——节点之间没有 next 链,也没有 head

从行为上看,等待者仍按 FIFO 顺序排队、依次获取锁;从数据结构上看,「谁排在谁前面」只存在于各线程的 myPred 里。

组件作用
AtomicReference<QNode> tail全线程共享,始终指向队尾
dummy 哨兵节点构造时创建,locked=false,给第一个入队线程提供前驱
ThreadLocal<QNode> myNode每线程复用自身节点,避免 lock 热路径分配
ThreadLocal<QNode> myPred记录本次入队时得到的前驱
QNode.locked是否仍在排队;true 表示举着牌子等锁,false 表示已放行

入队的核心是 tail.getAndSet(myNode),它在一次原子操作里完成两件事:把 tail 更新为当前节点;返回更新前的旧 tail,即逻辑前驱 pred。随后线程执行 while (pred.locked)——只自旋前驱的状态位。这就是 3.1 中「缓存竞争从全员盯一把锁,收敛为只盯自己的前驱」在结构上的落点。

三线程先后 lock 后,全局引用与各线程视角可以这样理解:

image-20260619103637653

  • T1 自旋盯哨兵locked;T2 盯 T1 节点;T3 盯 T2 节点
  • T1 的节点里没有指向 T2 的指针;后继只通过各自的 myPred 知道该盯谁
  • 不维护 head:获取锁只需前驱放行,无需从队头遍历,也少一个全局竞争点

3.3 简易 Java 实现

public class CLHSpinLock {
    private final AtomicReference<QNode> tail =
        new AtomicReference<>(new QNode());
    private final ThreadLocal<QNode> myNode =
        ThreadLocal.withInitial(QNode::new);
    private final ThreadLocal<QNode> myPred = new ThreadLocal<>();

    public void lock() {
        QNode node = myNode.get();
        node.locked = true;
        QNode pred = tail.getAndSet(node);
        myPred.set(pred);
        while (pred.locked) { /* spin */ }
    }

    public void unlock() {
        myNode.get().locked = false;
        myNode.set(myPred.get());
    }

    static class QNode { volatile boolean locked; }
}
步骤含义
node.locked = true声明自己在排队
tail.getAndSet(node)原子入队:当前节点成为新 tail,返回值是前驱 pred
myPred.set(pred)把前驱记入 ThreadLocal,供 unlock 复用节点
while (pred.locked)只盯前驱——CLH 缓存友好的根源
myNode.get().locked = falseunlock:放下牌子,后继自旋结束
myNode.set(myPred.get())把 ThreadLocal 节点换成前驱对象复用,避免快速重入时后继仍盯着已被当前线程再次 lock 的同一节点而死锁

3.4 三线程时序

下面用时序图把 3.2 的入队与 3.3 的自旋串起来:每次 getAndSet 的返回值决定「盯谁」,每次 unlocklocked=false 决定「谁被放行」。

sequenceDiagram
    participant S as 哨兵节点
    participant T1 as Thread1
    participant T2 as Thread2
    participant T3 as Thread3

    Note over S: tail 指向哨兵,locked=false
    T1->>S: getAndSet(T1.node),pred=哨兵
    T1->>S: 哨兵.locked=false,T1 获得锁
    T1->>S: T1 执行临界区
    T2->>S: getAndSet(T2.node),pred=T1.node
    T2->>T1: 自旋 T1.node.locked
    T3->>S: getAndSet(T3.node),pred=T2.node
    T3->>T2: 自旋 T2.node.locked
    T1->>T1: unlock:T1.node.locked=false,复用哨兵节点
    T2->>T2: 自旋结束,获得锁
    T2->>T2: unlock:T2.node.locked=false
    T3->>T3: 自旋结束,获得锁

3.5 原始 CLH 与 AQS 变体对照

维度原始 CLH 自旋锁AQS CLH 变体
等待方式自旋 while (pred.locked)先自旋,然后 LockSupport.park 阻塞
队列指针tail;逻辑前驱显式 head + tail + prev + next
状态字段QNode.lockedNode.waitStatus(SIGNAL/CANCELLED/…)
节点归属ThreadLocal 复用每次等待 new Node(thread)
获取语义严格 FIFO 自旋锁队列 FIFO,但允许 barging 插队
取消/超时不支持CANCELLED + prev 链修补
唤醒前驱 locked=falserelease 时 unparkSuccessor
dummy 节点构造时创建哨兵首次竞争时延迟初始化

AQS 注释指出:prev 链在原始 CLH 中不存在,主要为处理取消;next 链则为阻塞唤醒服务。

4 AQS 的 CLH 变体等待队列

4.1 Node 与 waitStatus

static final class Node {
    volatile int waitStatus;  // SIGNAL=-1, CANCELLED=1, CONDITION=-2, PROPAGATE=-3
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
}

waitStatus 承担原始 CLH 里「前驱状态决定后继是否继续等」的角色,但语义更丰富:

  • SIGNAL:当前节点的后继需要被 unpark;在 acquire 失败路径里由前驱设置
  • CANCELLED:节点因超时或中断取消,不再参与排队
  • CONDITION:节点在条件队列上(ConditionObject
  • PROPAGATE:共享模式下 release 需继续向后传播

4.2 入队与出队

tryAcquire 失败后会走 addWaiter 入队;获取成功则在 acquireQueued 里调用 setHead,把当前节点提升为 head(逻辑出队)。与第 3 节原始 CLH 相比,AQS 显式维护 head + tail + prev + next,不再只靠 getAndSet 的返回值建立前驱关系。

入队入口 addWaiter:先走快路径——tail 非空时直接 CAS 挂到队尾;失败或队列尚未初始化则退到 enq 自旋重试。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

enqtail == null 时 CAS 初始化 dummy header(thread == null 的哨兵节点),否则 CAS 把节点挂到 tail,并双向链接 prev / next。CAS 失败则 for (;;) 重试。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
步骤含义
compareAndSetHead(new Node())首次竞争时延迟创建 dummy header
tail = head初始化后 head 与 tail 同指哨兵
node.prev = t先写 prev,再 CAS tail——与 CLH 先 getAndSet 再拿返回值类似
compareAndSetTail(t, node)原子确认 tail 仍是 t,才把 node 挂到队尾
t.next = nodeCAS 成功后才写 next,保证 prev 链始终完整

setHead:获取成功后把等待节点提升为新的 dummy head,清空 thread / prev,避免旧节点被 GC 根引用住,也减少多余的唤醒遍历。

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

注意:setHead 只清 prevnext——acquireQueued 成功路径里会由调用方执行 p.next = null 断开旧 head 与后继的链接。

5 独占模式主链路:以 ReentrantLock 为样本

5.1 acquire 调用链

ReentrantLock.lock()Sync.acquire(1) → 模板:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
sequenceDiagram
    participant T as Thread
    participant RL as ReentrantLock.Sync
    participant AQS as AbstractQueuedSynchronizer

    T->>RL: lock()
    RL->>AQS: acquire(1)
    AQS->>AQS: tryAcquire(1)
    alt CAS 成功
        AQS-->>T: 直接持有
    else 失败
        AQS->>AQS: addWaiter(EXCLUSIVE)
        AQS->>AQS: acquireQueued(node, 1)
        loop 直到成功或取消
            AQS->>AQS: 前驱是 head 则 tryAcquire
            AQS->>AQS: shouldParkAfterFailedAcquire
            AQS->>AQS: LockSupport.park
        end
    end

5.2 acquireQueued:争用权不等于成功

final boolean acquireQueued(final Node node, int arg) {
    // ...
    for (;;) {
      	// 获取前驱节点
        final Node p = node.predecessor();
      	// 前驱是 head 才有资格 tryAcquire
        if (p == head && tryAcquire(arg)) {
            setHead(node); // 逻辑出队
            p.next = null;
            return interrupted;
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
            interrupted = true;
    }
}

前驱是 head 才有资格 tryAcquire——这叫获得争用权,但不保证成功。新到来的线程可以在入队前先 tryAcquire(barging),插队抢到锁。AQS 注释认为默认 barging 策略吞吐更高,代价是不保证严格 FIFO 获取顺序。

5.3 shouldParkAfterFailedAcquire:先标记再 park

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) { /* 跳过 CANCELLED 前驱 */ }
    else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

若直接 park 而不先把前驱标为 SIGNAL,可能出现:当前线程 park 的同时持有锁的线程 release 并 unpark——信号丢失。因此标准路径是「CAS 设 SIGNAL → 再 tryAcquire 一次 → 仍失败才 park」。

5.4 release 与唤醒

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
      	// head 节点不可能为 CANCELLED
      	// 所以 waitStatus != 0 表示需要唤醒后继
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

ReentrantLockstate 表示重入次数。tryRelease 减到 0 时清空 exclusiveOwnerThread 并返回 true,触发 unparkSuccessor

5.5 公平与非公平

ReentrantLock 通过两个 Sync 子类区分策略:非公平lock() 热路径先 CAS 抢锁(barging);公平所有获取都走 acquire(1),且在 tryAcquire 里检查队列前是否有等待者。

非公平 NonfairSynclock() 先乐观 CAS,失败才入队;tryAcquire 委托给 nonfairTryAcquire,不检查队列。

static final class NonfairSync extends Sync {
    final void lock() {
      	// 热路径先 cas 抢锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

公平 FairSynclock() 直接 acquire(1),不做提前 CAS;state == 0 时还需 !hasQueuedPredecessors() 才允许抢锁。

static final class FairSync extends Sync {
    final void lock() {
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

两者重入逻辑相同;差异只在「锁空闲时是否允许插队」。公共的 nonfairTryAcquire 被非公平 tryAcquiretryLock() 共用:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

// ReentrantLock.tryLock() — 无论公平/非公平构造,均走 nonfairTryAcquire
public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

公平锁依赖 AQS 的 hasQueuedPredecessors() 判断当前线程是否是队首等待者:

public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
  	// h != t 则队列至少一个真实排队线程
    return h != t &&
        // s == null 则并发场景下刚有线程 addWaiter 入队,CAS 修改 tail 成功,但 head.next 还未完成指向,判定当前存在还未完全入队的前驱
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
对比项非公平公平
lock() 入口先 CAS,失败再 acquire直接 acquire(1)
空闲时抢锁允许 barging 插队队列非空则拒绝
tryLock()nonfairTryAcquire,可插队不遵守公平策略
吞吐更高,少一次队列操作较低,获取时间方差更小

公平锁降低吞吐、减小获取时间方差,但不等于 JVM 线程调度公平——持公平锁的线程仍可连续多次获取;tryLock() 在锁可用时直接成功,即使队列里有等待者。

6 共享模式:Semaphore 与信号传播

6.1 返回值语义

方法返回值含义
tryAcquireShared>= 0 成功,值为剩余资源;< 0 失败
tryReleaseSharedtrue 表示可能唤醒等待者

Semaphorestate 表示可用 permit 数。tryAcquireShared 用 CAS 减计数;tryReleaseShared 用 CAS 加计数。

6.2 传播:PROPAGATE 的作用

独占 release 只需 unparkSuccessor(head) 唤醒一个后继;共享模式下一次 release 可能让多个等待线程同时满足 tryAcquireShared(例如 Semaphore release 多个 permit、CountDownLatch 倒数到 0),因此需要 PROPAGATE 级联传播:release 侧 doReleaseShared 唤醒队首,acquire 侧 setHeadAndPropagate 在获取成功后继续向后传播。

共享 release 模板:子类 tryReleaseShared 返回 true 后,AQS 调用 doReleaseShared 而非独占模式的单次 unparkSuccessor

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

doReleaseShared:循环检查 head 的 waitStatus。若为 SIGNAL 则 unparkSuccessor;若为 0 则 CAS 设为 PROPAGATE,保证后续 acquire 继续传播。循环是为应对并发下 head 被替换或新节点入队。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
head.waitStatus动作
SIGNALCAS 清为 0,调用 unparkSuccessor(h) 唤醒后继
0CAS 设为 PROPAGATE,标记「后续 acquire 应继续传播」
其他 / h == tail本轮不唤醒,检查 head 是否变化后退出

setHeadAndPropagate:共享 acquire 成功时(doAcquireShared 路径),在 setHead 之后判断是否需要继续传播。propagate > 0 表示 tryAcquireShared 返回值 ≥ 0(仍有剩余资源);h.waitStatus < 0 则覆盖 PROPAGATE 已转为 SIGNAL 的情况。

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    // ...
    for (;;) {
        final Node p = node.predecessor();
        if (p == head) {
            int r = tryAcquireShared(arg);
            if (r >= 0) {
                setHeadAndPropagate(node, r);
                p.next = null; // help GC
                // ...
                return;
            }
        }
        // shouldParkAfterFailedAcquire + park ...
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

Node.PROPAGATE = -3 的语义:仅设在 head 上,表示下一次 releaseShared 应无条件继续传播。

sequenceDiagram
    participant R as Release 线程
    participant AQS as AQS
    participant H as head 后继
    participant S2 as 共享等待者2

    R->>AQS: releaseShared(1)
    AQS->>AQS: tryReleaseShared 成功
    AQS->>AQS: doReleaseShared
    AQS->>H: unparkSuccessor(head)
    H->>AQS: tryAcquireShared 成功
    AQS->>AQS: setHeadAndPropagate
    AQS->>S2: 继续 doReleaseShared 级联唤醒

Semaphore 为例:tryReleaseShared 把 permit 加回 state 后返回 truetryAcquireShared 返回减完后的剩余 permit(≥ 0 表示成功)。CountDownLatch 则是 state 倒数到 0 时 tryReleaseShared 返回 true,等待线程批量通过:

// CountDownLatch.Sync
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;  // 仅倒数到 0 时触发 doReleaseShared
    }
}

7 ConditionObject:双队列与最佳实践

7.1 机制:同步队列 + 条件队列

ConditionObject 是 AQS 的内部类,每个 Condition 实例维护自己的条件队列firstWaiter / lastWaiter,单向 nextWaiter 链),与 AQS 的同步队列head / tailprev / next 链)并存。Condition 必须绑定独占 Lock(通常 ReentrantLock.newCondition())——signal / await 都依赖 isHeldExclusively() 校验当前线程持锁。

public class ConditionObject implements Condition, java.io.Serializable {
    private transient Node firstWaiter;
    private transient Node lastWaiter;
    // ...
}

await 主路径:入条件队列 → 完全释放锁 → park 等待 signal → 被转移到同步队列 → acquireQueuedsavedState 重新抢锁。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
步骤方法含义
1addConditionWaiter节点进入条件队列,waitStatus = CONDITION
2fullyReleaserelease(savedState) 原子释放锁,保存重入次数
3LockSupport.park在条件队列上阻塞,直到被 signal 转移
4isOnSyncQueue检测节点是否已被 transferForSignal 挂到同步队列
5acquireQueued(node, savedState)按保存的重入次数重新获取锁

addConditionWaiter:尾插条件队列,节点通过 nextWaiter 串联(与同步队列的 next 字段独立)。

private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

fullyRelease:保存当前 state,调用 release(savedState) 一次性释放全部重入;失败则标 CANCELLED 并抛 IllegalMonitorStateException

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

signaltransferForSignalsignal 要求当前线程独占持锁;doSignal 把队首条件节点转移到同步队列。转移时 CAS 把 waitStatusCONDITION 改为 0,再 enq 挂到同步队列 tail,并尝试把前驱标为 SIGNAL

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

注意:signal 只把节点转移到同步队列unpark,并不直接授予锁。被唤醒线程返回 await 后仍需 acquireQueued 与其他竞争者重新抢锁——因此不保证按条件等待顺序执行。

7.2 最佳实践

(1)标准模板

lock.lock();
try {
    while (!predicate())       // 必须 while,不能 if
        condition.await();
    // 修改共享状态
    condition.signal();        // 状态改完后再 signal
} finally {
    lock.unlock();
}

为何必须 while:spurious wakeup

Spurious wakeup(虚假唤醒) 指线程从等待中返回,但并不是因为业务条件已成立,甚至可能没有收到针对自己的 signalCondition 接口明确允许这种现象,作为向底层平台语义(Object.waitpthread_cond_wait)的妥协;实现可以消除它,但规范要求应用代码始终假设它可能发生

AQS 的 await() 在条件队列上调用 LockSupport.park(),而 LockSupport 文档写明:park 可能 spuriously(for no reason)returns,调用方必须重新检查等待条件。AQS 用内层循环消化「park 无故醒来但尚未被 signal 转移」:

while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

线程会继续 park,不会因此从 await() 返回。但对应用代码而言,await() 正常返回时只保证两件事:节点已被转移到同步队列;线程通过 acquireQueued 重新持锁。并保证业务谓词已成立:

场景发生了什么
平台虚假唤醒park 无故返回;AQS 内层 while 挡住大部分,接口仍要求应用侧防御
signal 后谓词已变T1 signal 唤醒 T2,T2 抢锁前 T3 取走数据 → 队列又空
多等待者共用一个 Conditionsignal 只唤醒一个,多个线程可能先后 await 返回,都需重检
先 signal 后改状态(见(2))写法错误,不是平台虚假唤醒;while 能兜底但多一次抢锁/阻塞

因此 await 返回只表示「可以重新检查谓词」,不表示条件已成立。ifwhile 的差别:

// 错误:await 返回就往下执行,谓词可能仍为 false
if (queue.isEmpty())
    notEmpty.await();

// 正确:每次醒来都重新检查
while (queue.isEmpty())
    notEmpty.await();

(2)先改状态,再 signal

ArrayBlockingQueue 把「改状态」和「signal」拆到 enqueue / dequeue 私有方法里:先更新 count 与数组槽位,完成后signalput / takewhile 循环只负责等待,入队/出队逻辑委托给这两个方法。

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();   // 状态已更新后再唤醒
}

private E dequeue() {
    final Object[] items = this.items;
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    notFull.signal();    // 腾出空位后再唤醒
    return x;
}

signal 写在 count++ 之前,被唤醒线程抢锁后检查 count == 0 仍成立,会立刻再次 await——while 能兜底,但多一次无谓的抢锁与阻塞。

(3)多 Condition 拆分,优先 signal

ArrayBlockingQueue 用两个 Condition 把「等空位」与「等数据」分开,构造时绑定同一把 ReentrantLock

final ReentrantLock lock;
private final Condition notEmpty;  // 等待 take
private final Condition notFull;   // 等待 put

public ArrayBlockingQueue(int capacity, boolean fair) {
    // ...
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull  = lock.newCondition();
}

putnotFull,唤醒消费者 notEmptytake 反之。全程用 signal() 精确唤醒一个等待者——源码中没有 signalAll()

public void put(E e) throws InterruptedException {
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);              // 内部 notEmpty.signal()
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();        // 内部 notFull.signal()
    } finally {
        lock.unlock();
    }
}

signalAll 适合全局状态翻转、无法判断哪些等待者满足谓词的场景;日常有界缓冲优先 signal 降低惊群。

(4)中断与超时

可中断等待用 lockInterruptibly() 获取锁,配合 await();超时版本在 while 中递减 awaitNanos 返回值:

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);  // 返回剩余等待时间
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

捕获 InterruptedException 后应向上抛出或 Thread.currentThread().interrupt(),不要吞掉。

(5)反模式

反模式后果
if (!predicate) await()虚假唤醒导致谓词未满足就继续执行
未持锁 signal()IllegalMonitorStateException
默认 signalAll()不必要唤醒,锁竞争加剧
一个 Condition 管多种谓词无法精确唤醒
把 Condition 当 synchronized 用与关联 Lock 语义无关(L126–135)
能用 BlockingQueue 却手写重复踩 AQS 细节

(6)何时值得用

适合自定义有界缓冲、连接池等待、单锁多谓词协作。需求可由 BlockingQueueSemaphore 表达时,优先用现成组件。前提是 Lock 支持 newCondition()——ReentrantLock 可以,Semaphore 不行。

Condition.java 中的 BoundedBuffer 是标准模板的最小完整版(put + take),ArrayBlockingQueue 是其生产级实现:

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

8 非常规用法:ThreadPoolExecutorWorker

Worker 直接继承 AQS,而非组合 Sync 内部类。类注释说明设计意图:用独占状态区分「正在执行任务」与「空闲等任务」,控制中断时机;实现非可重入互斥,防止任务内调用 setCorePoolSize 等池控制方法时再次抢锁。

/**
 * Class Worker mainly maintains interrupt control state for
 * threads running tasks, along with other minor bookkeeping.
 * This class opportunistically extends AbstractQueuedSynchronizer
 * to simplify acquiring and releasing a lock surrounding each
 * task execution.  ...
 * We implement a simple non-reentrant mutual exclusion lock rather
 * than use ReentrantLock because we do not want worker tasks to be
 * able to reacquire the lock when they invoke pool control methods
 * like setCorePoolSize.  Additionally, to suppress interrupts until
 * the thread actually starts running tasks, we initialize lock
 * state to a negative value, and clear it upon start (in runWorker).
 */
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
}
state 值含义
-1构造后、runWorker 开始前:抑制 interruptIdleWorkers
0空闲等任务:可被 tryLock 中断
1正在执行任务:中断应作用于任务,而非唤醒等任务的 park

runWorker:启动时 w.unlock() 把 state 从 -1 清到 0;每执行一个任务前 w.lock(),结束后 w.unlock()

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts — state: -1 → 0
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();   // state: 0 → 1,标记正在执行
            try {
                // ... beforeExecute / task.run() / afterExecute ...
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();  // state: 1 → 0
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

interruptIdleWorkers:只对tryLock 成功的 Worker 发中断——正在执行任务的 Worker(state == 1tryAcquire 失败,不会被误中断。

private void interruptIdleWorkers(boolean onlyOne) {
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

这不是「锁」的业务语义,而是借用 AQS 的 tryAcquire / tryReleasepark/unpark执行期保护

9 取消、中断与超时路径

主路径 acquire / acquireQueued 处理正常排队;超时、可中断 acquire 和节点取消走独立变体。AQS 注释说明不可简单合并——中断异常语义、超时递减、tryAcquire 抛异常时的 cancelAcquire 交互,合并会伤害性能。

9.1 不可中断 acquire 与 selfInterrupt

acquire 忽略中断,但 park 期间收到中断会记录标志,成功后补设中断位:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    for (;;) {
        // ...
        if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
            interrupted = true;  // 记录,不抛异常
    }
    // return interrupted;
}

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

9.2 可中断 acquire

acquireInterruptiblypark 期间检测到中断立即抛 InterruptedException,失败路径 cancelAcquire

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

9.3 超时 acquire

doAcquireNanosdeadline 递减剩余时间;极短超时(≤ spinForTimeoutThreshold,1000ns)自旋重试而非 park,减少系统调用:

static final long spinForTimeoutThreshold = 1000L;

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    // ...
    for (;;) {
        // ... 前驱是 head 则 tryAcquire ...
        nanosTimeout = deadline - System.nanoTime();
        if (nanosTimeout <= 0L)
            return false;
        if (shouldParkAfterFailedAcquire(p, node) &&
            nanosTimeout > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if (Thread.interrupted())
            throw new InterruptedException();
    }
}

9.4 cancelAcquire

节点因超时或中断取消时,标为 CANCELLED,沿 prev 链跳过已取消前驱,必要时 unparkSuccessor 唤醒后继:

private void cancelAcquire(Node node) {
    if (node == null)
        return;
    node.thread = null;

    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;

    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

shouldParkAfterFailedAcquire 在排队路径上也会跳过 CANCELLED 前驱(ws > 0 分支),与 cancelAcquire 配合保证队列不因取消节点而卡住。

10 边界与常见误区

  • AQS 不保证 FIFO 获取顺序,除非子类在 tryAcquire 中检查 hasQueuedPredecessors
  • Condition.signal 只保证节点转移到同步队列;被唤醒线程仍需重新抢锁,不保证按条件等待顺序执行
  • 公平锁降低吞吐,不等于线程调度公平
  • stateint,重入次数或许可证溢出需子类处理(抛 Error
  • 状态语义过于复杂时,AQS 注释建议降级到 Atomic* + 自定义 Queue + LockSupport(见下)

10.1 何时不必强行用 AQS

AQS 类注释指出:它是为能用单个 int state + acquire/release 参数 + 内置 FIFO 等待队列表达的同步器提供的高效底座。当这套模型装不下你的状态语义时,不必硬套 AQS,应退回更底层的三件套自行组装:

组件职责
AtomicInteger / AtomicLong / AtomicReference维护同步状态,支持比单个 int 更丰富的语义
自定义 Queue按业务需要组织等待者(优先级、分段、每 key 一条队列等)
LockSupport阻塞与唤醒,替代 AQS 固化的 park/unpark 模板

AQS 擅长什么ReentrantLockstate = 重入次数)、Semaphorestate = permit 数)、CountDownLatchstate = 倒计时)——状态是一个整数,获取/释放可用 tryAcquire / tryRelease 描述,排队策略接受 CLH FIFO + barging。

什么时候考虑降级

  • 状态不是一个 int 能表达:需要 long 版本戳(如 StampedLock 的 stamp + 读写位)、多个字段组合状态、按阶段/代际(generation)管理等待者(Phaser 更接近此类)
  • 获取/释放模型对不上:不是简单的「减 1 / 加 1」或「0/1 互斥」,而是复杂谓词、批量放行规则、与外部资源生命周期强绑定
  • 队列语义要定制:不要全局 FIFO,要优先级队列、LIFO 栈式唤醒、或「每个 key 独立等待队列」
  • Condition 前提不满足ConditionObject 要求独占 release(savedState) 能完全释放、acquire(savedState) 能恢复——语义对不上就不要挂 Condition

降级并不意味着「手写一套完整 AQS」——而是只拿你需要的零件:用 Atomic* 保证状态原子性,用自选队列结构管理等待关系,在合适点 LockSupport.park/unpark。代价是你要自己处理 signal 丢失、取消、超时、公平性等 AQS 已固化的边界;收益是状态与排队策略不再被 int state + CLH 模板约束。

JUC 里已有不少「非 AQS 底座」的同步器:StampedLocklong 状态 + 队列)、Phaser(分阶段协调)、部分场景直接用 BlockingQueue / Semaphore 组合即可,无需再继承 AQS。

11 总结

理解 AQS 可以压缩成三条主线:

  1. state 语义在子类——锁、信号量、倒计时只是 tryXxx 的不同实现
  2. 排队唤醒在 AQS——CLH 变体队列 + acquireQueued / unparkSuccessor 模板
  3. 阻塞靠 LockSupport——自旋与 park 交替,兼顾短持有与高并发

建议阅读顺序:先跟通 acquire → acquireQueued → release → unparkSuccessor;再读共享传播的 doReleaseShared;最后看 ConditionObject 的双队列转移。在此之上对照 ReentrantLockSemaphoreArrayBlockingQueueSync 实现,就能把 JUC 同步器串成一张完整的图。