问题
AQS操作类有哪些?
答案
1. 核心概念
AQS(AbstractQueuedSynchronizer)是 JUC 包中众多并发工具类的基础框架。基于 AQS 实现的同步器包括:
- ReentrantLock(可重入锁)
- ReentrantReadWriteLock(读写锁)
- Semaphore(信号量)
- CountDownLatch(倒计数器)
- CyclicBarrier(循环栅栏,间接基于 AQS)
- ThreadPoolExecutor.Worker(线程池的工作线程)
2. 主要AQS操作类详解
2.1 ReentrantLock(可重入锁)
核心特性:
- 独占模式
- 支持公平/非公平锁
- 支持可中断、超时、尝试获取
- 支持多个条件变量
使用场景:
- 替代 synchronized,提供更灵活的锁机制
- 需要高级特性(超时、中断、公平锁)的场景
示例:
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
// 临界区代码
} finally {
lock.unlock();
}
// 可中断获取
try {
lock.lockInterruptibly();
// 业务代码
} finally {
lock.unlock();
}
// 超时获取
if (lock.tryLock(3, TimeUnit.SECONDS)) {
try {
// 业务代码
} finally {
lock.unlock();
}
}
2.2 ReentrantReadWriteLock(读写锁)
核心特性:
- 同时支持独占模式(写锁)和共享模式(读锁)
- 读锁可被多个线程同时持有
- 写锁是独占的
- 支持锁降级(写锁 → 读锁)
使用场景:
- 读多写少的场景
- 需要细粒度控制的数据结构(如缓存)
示例:
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock readLock = rwLock.readLock();
Lock writeLock = rwLock.writeLock();
// 读操作
readLock.lock();
try {
// 读取数据(多个线程可同时读)
} finally {
readLock.unlock();
}
// 写操作
writeLock.lock();
try {
// 修改数据(独占)
} finally {
writeLock.unlock();
}
state 的巧妙设计:
// 高 16 位表示读锁持有次数,低 16 位表示写锁持有次数
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 读锁持有次数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 写锁持有次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
2.3 Semaphore(信号量)
核心特性:
- 共享模式
- 控制同时访问某个资源的线程数量
- state 表示可用许可数
使用场景:
- 限流(控制并发数)
- 资源池管理(如数据库连接池)
- 停车场管理(车位数量限制)
示例:
// 创建一个有 3 个许可的信号量
Semaphore semaphore = new Semaphore(3);
// 获取许可
semaphore.acquire();
try {
// 访问资源(最多 3 个线程同时访问)
} finally {
semaphore.release(); // 释放许可
}
// 尝试获取
if (semaphore.tryAcquire()) {
try {
// 业务代码
} finally {
semaphore.release();
}
}
// 一次获取多个许可
semaphore.acquire(2);
try {
// 业务代码
} finally {
semaphore.release(2);
}
实际案例:限流:
public class RateLimiter {
private final Semaphore semaphore;
public RateLimiter(int maxConcurrent) {
this.semaphore = new Semaphore(maxConcurrent);
}
public void execute(Runnable task) throws InterruptedException {
semaphore.acquire();
try {
task.run();
} finally {
semaphore.release();
}
}
}
// 使用
RateLimiter limiter = new RateLimiter(10); // 最多 10 个并发
limiter.execute(() -> {
// 业务逻辑
});
2.4 CountDownLatch(倒计数器)
核心特性:
- 共享模式
- 一次性使用(不可重置)
- state 表示剩余计数
- 线程等待计数归零
使用场景:
- 主线程等待多个子线程完成
- 并行任务的协调
- 批量任务的同步
示例:
// 创建一个计数为 3 的 CountDownLatch
CountDownLatch latch = new CountDownLatch(3);
// 3 个工作线程
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
// 执行任务
System.out.println(Thread.currentThread().getName() + " 完成任务");
} finally {
latch.countDown(); // 计数 -1
}
}, "Worker-" + i).start();
}
// 主线程等待所有工作线程完成
latch.await();
System.out.println("所有任务完成");
实际案例:并行初始化:
public class ParallelInitializer {
public void initialize() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
// 初始化数据库连接
new Thread(() -> {
initDatabase();
latch.countDown();
}).start();
// 初始化缓存
new Thread(() -> {
initCache();
latch.countDown();
}).start();
// 初始化消息队列
new Thread(() -> {
initMQ();
latch.countDown();
}).start();
// 等待所有初始化完成
latch.await();
System.out.println("系统初始化完成");
}
}
2.5 CyclicBarrier(循环栅栏)
核心特性:
- 基于 ReentrantLock + Condition(间接基于 AQS)
- 可重复使用
- 线程到达栅栏后等待其他线程
- 所有线程到齐后一起继续执行
使用场景:
- 多线程计算任务的阶段同步
- 游戏中的多人就绪
- 分布式任务的屏障点
示例:
// 创建一个等待 3 个线程的栅栏
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程到达栅栏,开始执行任务");
});
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 到达栅栏");
barrier.await(); // 等待其他线程
System.out.println(Thread.currentThread().getName() + " 继续执行");
} catch (Exception e) {
e.printStackTrace();
}
}, "Thread-" + i).start();
}
// 输出:
// Thread-0 到达栅栏
// Thread-1 到达栅栏
// Thread-2 到达栅栏
// 所有线程到达栅栏,开始执行任务
// Thread-0 继续执行
// Thread-1 继续执行
// Thread-2 继续执行
实际案例:多线程计算:
public class ParallelComputer {
public void compute() {
int workerCount = 4;
CyclicBarrier barrier = new CyclicBarrier(workerCount, () -> {
System.out.println("阶段 1 完成,开始阶段 2");
});
for (int i = 0; i < workerCount; i++) {
new Thread(() -> {
try {
// 阶段 1:数据加载
loadData();
barrier.await(); // 等待所有线程完成阶段 1
// 阶段 2:数据处理
processData();
barrier.await(); // 等待所有线程完成阶段 2
// 阶段 3:结果输出
outputResult();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
2.6 ThreadPoolExecutor.Worker
核心特性:
- 基于 AQS 实现的独占锁
- 用于线程池中的工作线程管理
- 实现了简单的不可重入锁
实现简化:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // 初始状态为 -1,禁止中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// 简单的独占锁实现
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void run() {
runWorker(this);
}
}
3. AQS操作类对比
| 类名 | 模式 | 可重复使用 | 主要用途 | state 含义 |
|---|---|---|---|---|
| ReentrantLock | 独占 | ✅ | 互斥锁 | 锁的持有次数 |
| ReentrantReadWriteLock | 独占+共享 | ✅ | 读写锁 | 高16位:读锁次数 低16位:写锁次数 |
| Semaphore | 共享 | ✅ | 限流、资源池 | 可用许可数 |
| CountDownLatch | 共享 | ❌ | 一次性同步 | 剩余计数 |
| CyclicBarrier | - | ✅ | 阶段同步 | 基于 ReentrantLock |
| Worker | 独占 | ✅ | 线程池管理 | 锁状态 |
4. 选择指南
需要互斥锁:
- 简单场景:
synchronized - 需要高级特性:
ReentrantLock
读多写少:
ReentrantReadWriteLock
限流/资源池:
Semaphore
等待多任务完成:
- 一次性:
CountDownLatch - 可重复:
CyclicBarrier
5. 实际应用案例
5.1 数据库连接池
public class ConnectionPool {
private final Semaphore semaphore;
private final List<Connection> connections;
public ConnectionPool(int poolSize) {
this.semaphore = new Semaphore(poolSize);
this.connections = new ArrayList<>(poolSize);
// 初始化连接
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire();
return connections.remove(0);
}
public void releaseConnection(Connection conn) {
connections.add(conn);
semaphore.release();
}
}
5.2 缓存系统
public class Cache<K, V> {
private final Map<K, V> cache = new HashMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public V get(K key) {
lock.readLock().lock();
try {
return cache.get(key);
} finally {
lock.readLock().unlock();
}
}
public void put(K key, V value) {
lock.writeLock().lock();
try {
cache.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
}
6. 总结
基于 AQS 的操作类包括:
- ReentrantLock:可重入锁,独占模式,功能强大
- ReentrantReadWriteLock:读写锁,支持读共享写独占
- Semaphore:信号量,限流和资源池管理
- CountDownLatch:倒计数器,一次性同步工具
- CyclicBarrier:循环栅栏,可重复的阶段同步
- ThreadPoolExecutor.Worker:线程池工作线程管理
面试要点:
- 列举常见的 AQS 操作类及其用途
- 说明独占模式(ReentrantLock)和共享模式(Semaphore)的区别
- 理解 CountDownLatch 和 CyclicBarrier 的差异(一次性 vs 可重复)
- 能够根据场景选择合适的并发工具
记忆口诀:
- Lock 独占保互斥,ReadWrite 读写分
- Semaphore 限流量,Latch 倒数一次完
- Barrier 循环可复用,Worker 线程池来管