AQS详解:Java并发编程的核心骨架


AQS详解:Java并发编程的核心骨架

在Java并发编程中,AbstractQueuedSynchronizer(简称AQS)是一个极其重要的基础框架。它几乎支撑了Java并发包中所有的同步器实现,如ReentrantLockCountDownLatchSemaphore等。本文将深入探讨AQS的设计原理、核心组件以及实现机制。

一、AQS概述

AQS是一个抽象类,它定义了一套用于实现同步器的基础框架。其核心思想是:

通过维护一个共享状态(state)和一个FIFO队列来实现线程间的同步。

AQS为我们提供了一种模板方法设计模式,它定义了同步器的通用结构,而将具体的同步逻辑延迟到子类中实现。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    // 内部类:Node表示等待队列中的一个节点
    static final class Node {
        // 节点状态定义
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        // 其他成员变量和方法...
    }
    
    // 同步状态
    private volatile int state;
    
    // 队列头节点
    private transient volatile Node head;
    
    // 队列尾节点
    private transient volatile Node tail;
    
    // 核心方法...
}

二、AQS核心组件

2.1 同步状态(State)

AQS使用一个整型变量state来表示同步状态,通过CAS操作来修改这个状态。不同的同步器可以根据自己的需要定义state的含义:

  • ReentrantLock:state表示锁的重入次数
  • ReadWriteLock:state的高16位表示读锁状态,低16位表示写锁状态
  • CountDownLatch:state表示计数器的值
  • Semaphore:state表示可用许可证的数量
// 获取当前状态
protected final int getState() {
    return state;
}

// 设置状态
protected final void setState(int newState) {
    state = newState;
}

// CAS方式设置状态
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

2.2 等待队列(CLH队列)

AQS内部维护了一个FIFO的双向链表队列,称为CLH队列(Craig, Landin, and Hagersten locks)。当线程获取锁失败时,会被包装成一个Node节点,并添加到队列末尾等待。

AQS CLH队列结构

Node节点的主要属性:

static final class Node {
    // 共享模式标记
    static final Node SHARED = new Node();
    // 独占模式标记
    static final Node EXCLUSIVE = null;
    
    // 节点状态
    volatile int waitStatus;
    // 前驱节点
    volatile Node prev;
    // 后继节点
    volatile Node next;
    // 当前线程
    volatile Thread thread;
    
    // 等待队列中的下一个节点
    Node nextWaiter;
    
    // 是否为共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    
    // 其他方法...
}

Node节点的状态(waitStatus)有以下几种:

  • CANCELLED (1):当前节点已取消
  • SIGNAL (-1):当前节点的后继节点需要被唤醒
  • CONDITION (-2):当前节点在等待条件
  • PROPAGATE (-3):共享模式下,状态需要向后传播
  • 0:初始状态

三、AQS的工作原理

3.1 独占模式(Exclusive Mode)

独占模式是指在同一时刻,只有一个线程可以获取同步状态。ReentrantLock就是一个典型的独占模式同步器。

3.1.1 获取锁(acquire)

public final void acquire(int arg) {
    // 尝试获取锁,如果失败则入队并等待
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

获取锁的流程:

  1. 调用tryAcquire尝试获取锁(由子类实现)
  2. 如果获取成功,直接返回
  3. 如果获取失败,调用addWaiter创建一个独占模式的节点并添加到队列尾部
  4. 调用acquireQueued使线程在队列中自旋等待获取锁
  5. 如果线程在等待过程中被中断,调用selfInterrupt自我中断

3.1.2 释放锁(release)

public final boolean release(int arg) {
    // 尝试释放锁
    if (tryRelease(arg)) {
        Node h = head;
        // 如果头节点不为空且状态不为0,唤醒后继节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

释放锁的流程:

  1. 调用tryRelease尝试释放锁(由子类实现)
  2. 如果释放成功,获取头节点
  3. 如果头节点不为空且状态不为0,调用unparkSuccessor唤醒后继节点

3.2 共享模式(Shared Mode)

共享模式是指在同一时刻,允许多个线程同时获取同步状态。CountDownLatchSemaphore都支持共享模式。

3.2.1 获取锁(acquireShared)

public final void acquireShared(int arg) {
    // 尝试以共享模式获取锁
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

获取共享锁的流程:

  1. 调用tryAcquireShared尝试获取共享锁(由子类实现)
  2. 如果返回值大于等于0,表示获取成功,直接返回
  3. 如果返回值小于0,表示获取失败,调用doAcquireShared使线程在队列中等待

3.2.2 释放锁(releaseShared)

public final boolean releaseShared(int arg) {
    // 尝试以共享模式释放锁
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

释放共享锁的流程:

  1. 调用tryReleaseShared尝试释放共享锁(由子类实现)
  2. 如果释放成功,调用doReleaseShared唤醒后继节点

四、AQS的主要方法

AQS提供了一系列模板方法,子类需要根据自身需求实现相应的方法:

4.1 独占模式相关方法

  • tryAcquire(int arg):尝试以独占模式获取锁
  • tryRelease(int arg):尝试以独占模式释放锁
  • isHeldExclusively():判断当前线程是否正在独占资源

4.2 共享模式相关方法

  • tryAcquireShared(int arg):尝试以共享模式获取锁
  • tryReleaseShared(int arg):尝试以共享模式释放锁

4.3 组合模式

有些同步器既支持独占模式,又支持共享模式,如ReadWriteLock。在读取操作时使用共享模式,在写入操作时使用独占模式。

五、AQS的实现类

Java并发包中,许多同步器都是基于AQS实现的:

5.1 ReentrantLock

ReentrantLock是一个可重入的独占锁,其内部通过SyncNonfairSyncFairSync三个内部类实现,这三个类都继承自AQS。

公平锁与非公平锁的区别:

  • 公平锁:线程按照请求锁的顺序获取锁
  • 非公平锁:线程可以插队获取锁,不一定按照请求顺序

5.2 CountDownLatch

CountDownLatch是一个同步工具类,用于等待一组事件发生后再执行后续操作。其内部通过Sync内部类实现,该类继承自AQS。

使用场景:

  • 等待所有线程完成初始化后再开始执行任务
  • 等待多个服务启动后再提供服务

5.3 Semaphore

Semaphore是一个计数信号量,用于限制同时访问某个资源的线程数量。其内部通过SyncNonfairSyncFairSync三个内部类实现,这三个类都继承自AQS。

使用场景:

  • 限流
  • 控制并发访问数量

5.4 CyclicBarrier

CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同,CyclicBarrier可以被重用。

使用场景:

  • 多线程并行计算,最后合并结果

六、自定义AQS同步器示例

下面我们来实现一个简单的独占锁,演示如何基于AQS自定义同步器:

public class SimpleLock implements Lock {
    // 自定义同步器
    private final Sync sync = new Sync();
    
    private static class Sync extends AbstractQueuedSynchronizer {
        // 判断是否持有锁
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        
        // 尝试获取锁
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS设置状态为1
            if (compareAndSetState(0, 1)) {
                // 设置当前线程为独占线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        
        // 尝试释放锁
        @Override
        protected boolean tryRelease(int arg) {
            // 只有持有锁的线程才能释放锁
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            // 设置状态为0
            setState(0);
            return true;
        }
        
        // 创建条件变量
        Condition newCondition() {
            return new ConditionObject();
        }
    }
    
    // 实现Lock接口的方法
    @Override
    public void lock() {
        sync.acquire(1);
    }
    
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
    
    @Override
    public void unlock() {
        sync.release(1);
    }
    
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

七、总结

AQS是Java并发编程的核心骨架,它通过维护一个同步状态和一个等待队列,为各种同步器提供了统一的实现基础。其主要特点包括:

  1. 模板方法设计模式:AQS定义了同步器的通用结构,具体同步逻辑由子类实现
  2. 两种获取模式:支持独占模式和共享模式
  3. 高效的等待队列:使用CLH队列实现线程的等待和唤醒
  4. 原子性操作:使用CAS操作保证状态更新的原子性
  5. 可重入设计:支持锁的重入

理解AQS的工作原理,对于掌握Java并发编程至关重要。它不仅可以帮助我们更好地使用Java并发包中的各种同步工具,还能让我们在需要时自定义同步器来满足特定需求。

参考资料


文章作者: lucky845
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 lucky845 !
评论
  目录