可重入锁的底层实现
核心原理
可重入锁的底层实现依赖两个关键数据:
- 持有线程标识(Owner Thread):记录当前持有锁的线程
- 重入计数器(Recursion Counter):记录同一线程获取锁的次数
Java 中有两种主要实现方式:
- synchronized:基于 JVM 的 ObjectMonitor
- ReentrantLock:基于 AQS(AbstractQueuedSynchronizer)
一、synchronized 的底层实现
1. ObjectMonitor 数据结构
// HotSpot 源码:src/hotspot/share/runtime/objectMonitor.hpp
ObjectMonitor {
void * _owner; // ✅ 持有锁的线程指针
int _recursions; // ✅ 重入计数器
ObjectWaiter * _EntryList; // 等待获取锁的线程队列
ObjectWaiter * _WaitSet; // 调用 wait() 的线程队列
int _count; // 计数器
// ...
}
2. 加锁流程(monitorenter)
源码分析
// src/hotspot/share/runtime/objectMonitor.cpp
void ObjectMonitor::enter(TRAPS) {
Thread * const Self = THREAD;
// 步骤1:快速尝试获取锁(CAS)
void * cur = Atomic::cmpxchg(Self, &_owner, (void*)NULL);
if (cur == NULL) {
// 成功获取锁
assert(_recursions == 0, "invariant");
return;
}
// 步骤2:检查是否为当前线程(可重入判断)
if (cur == Self) {
// ✅ 可重入:计数器加1
_recursions++;
return;
}
// 步骤3:检查当前线程是否已持有轻量级锁
if (Self->is_lock_owned((address)cur)) {
assert(_recursions == 0, "internal state error");
_recursions = 1; // 轻量级锁膨胀为重量级锁
_owner = Self;
return;
}
// 步骤4:其他线程持有锁,进入阻塞队列
EnterI(THREAD);
}
void ObjectMonitor::EnterI(TRAPS) {
Thread * const Self = THREAD;
// 自旋尝试
if (TrySpin(Self) > 0) {
return; // 自旋成功
}
// 自旋失败,加入 _EntryList 阻塞
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_CXQ;
// 加入队列
ParkEvent * _event = node._event;
for (;;) {
_event->park(); // 阻塞当前线程
// 被唤醒后重新尝试获取锁
if (TryLock(Self) > 0) {
break; // 获取成功
}
}
}
流程图
monitorenter:
┌─────────────────────┐
│ CAS 尝试获取锁 │
└─────────┬───────────┘
│
成功 │ 失败
▼
┌─────────────────────┐
│ _owner == NULL? │
└─────────┬───────────┘
│
是 │ 否
▼
┌─────────────────────┐
│ _owner == 当前线程? │ ◄─ 可重入判断
└─────────┬───────────┘
│
是 │ 否
▼
┌─────────────────────┐
│ _recursions++ │ ◄─ 计数器加1
└─────────────────────┘
│
否 │
▼
┌─────────────────────┐
│ 进入 _EntryList 阻塞 │
└─────────────────────┘
3. 解锁流程(monitorexit)
源码分析
void ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * const Self = THREAD;
// 步骤1:检查当前线程是否持有锁
if (_owner != Self) {
throw IllegalMonitorStateException();
}
// 步骤2:处理重入情况
if (_recursions != 0) {
// ✅ 重入计数器减1,但不释放锁
_recursions--;
return;
}
// 步骤3:计数器为0,真正释放锁
OrderAccess::release_store(&_owner, (void*)NULL);
OrderAccess::fence(); // 内存屏障
// 步骤4:唤醒 _EntryList 中的线程
ObjectWaiter * w = NULL;
// 不同的唤醒策略(QMode)
if (QMode == 2 && _cxq != NULL) {
// 从 _cxq 队列中唤醒
w = _cxq;
ExitEpilog(Self, w);
} else if (QMode == 0 && _EntryList != NULL) {
// 从 _EntryList 队列中唤醒
w = _EntryList;
_EntryList = w->_next;
ExitEpilog(Self, w);
}
}
void ObjectMonitor::ExitEpilog(Thread * Self, ObjectWaiter * Wakee) {
// 唤醒线程
ParkEvent * _event = Wakee->_event;
_event->unpark(); // 唤醒被阻塞的线程
}
关键点
synchronized(obj) { // ← monitorenter
// _owner = 当前线程
// _recursions = 1
synchronized(obj) { // ← monitorenter(重入)
// _owner 不变
// _recursions = 2
// ...
} // ← monitorexit
// _recursions = 1
// 锁未释放
} // ← monitorexit
// _recursions = 0
// _owner = null
// 唤醒 _EntryList
二、ReentrantLock 的底层实现
1. AQS 核心数据结构
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public abstract class AbstractQueuedSynchronizer {
/**
* ✅ 状态变量:表示锁的状态
* - 0: 未锁定
* - n: 被锁定,且重入了 n 次
*/
private volatile int state;
/**
* ✅ 持有锁的线程
*/
private transient Thread exclusiveOwnerThread;
/**
* 等待队列(双向链表)
*/
static final class Node {
volatile Node prev;
volatile Node next;
volatile Thread thread;
// ...
}
private transient volatile Node head; // 队列头
private transient volatile Node tail; // 队列尾
}
2. ReentrantLock 的加锁实现
Sync 内部类(核心逻辑)
// java.util.concurrent.locks.ReentrantLock.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* 非公平锁的快速尝试
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 情况1:锁未被持有
if (c == 0) {
// CAS 尝试获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 情况2:当前线程已持有锁(可重入判断)
else if (current == getExclusiveOwnerThread()) {
// ✅ 重入:state 加上 acquires(通常为1)
int nextc = c + acquires;
if (nextc < 0) { // 溢出检查
throw new Error("Maximum lock count exceeded");
}
setState(nextc); // 更新 state
return true;
}
// 情况3:其他线程持有锁
return false;
}
/**
* 释放锁
*/
protected final boolean tryRelease(int releases) {
// ✅ state 减去 releases(通常为1)
int c = getState() - releases;
// 检查当前线程是否持有锁
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
boolean free = false;
// 只有 state 为 0 时才真正释放锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null); // 清空持有线程
}
setState(c);
return free;
}
}
公平锁 vs 非公平锁
// 非公平锁(默认)
static final class NonfairSync extends Sync {
final void lock() {
// 直接尝试 CAS(可能插队)
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
} else {
acquire(1); // 进入队列
}
}
}
// 公平锁
static final class FairSync extends Sync {
final void lock() {
acquire(1); // 直接进入队列,不插队
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// ✅ 公平锁:检查是否有前驱节点
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入逻辑相同
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
setState(nextc);
return true;
}
return false;
}
}
3. 完整加锁流程
// 1. 用户调用
ReentrantLock lock = new ReentrantLock();
lock.lock();
// 2. lock() 方法
public void lock() {
sync.lock(); // 调用内部 Sync 的 lock
}
// 3. acquire() 方法(AQS)
public final void acquire(int arg) {
// 尝试获取锁
if (!tryAcquire(arg) &&
// 获取失败,加入队列并阻塞
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
// 4. tryAcquire() 方法(上面已分析)
// - 检查 state == 0(无人持有)
// - 检查 exclusiveOwnerThread == current(可重入)
// - 否则返回 false
// 5. addWaiter() 加入队列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速入队
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 完整入队逻辑
enq(node);
return node;
}
// 6. acquireQueued() 阻塞等待
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 只有前驱是 head 才尝试获取锁(公平性)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
三、两种实现的对比
1. 数据结构对比
| 对比项 | synchronized (Monitor) | ReentrantLock (AQS) |
|---|---|---|
| 持有线程 | _owner (void*) | exclusiveOwnerThread |
| 重入计数器 | _recursions (int) | state (int) |
| 等待队列 | _EntryList (单链表) | Node (双向链表) |
| 实现层级 | JVM C++ 代码 | Java 代码 |
| 条件队列 | _WaitSet (单个) | ConditionObject (多个) |
2. 重入机制对比
synchronized
// 加锁
if (_owner == Self) {
_recursions++; // 直接递增
return;
}
// 解锁
if (_recursions != 0) {
_recursions--; // 直接递减
return;
}
ReentrantLock
// 加锁
if (current == getExclusiveOwnerThread()) {
int nextc = getState() + acquires; // state 加上 acquires
setState(nextc);
return true;
}
// 解锁
int c = getState() - releases; // state 减去 releases
if (c == 0) {
setExclusiveOwnerThread(null);
}
setState(c);
3. 性能对比
// 基准测试
@Benchmark
public void synchronizedReentrant() {
synchronized(lock) {
synchronized(lock) {
synchronized(lock) {
// 三次重入
}
}
}
}
@Benchmark
public void reentrantLockReentrant() {
lock.lock();
try {
lock.lock();
try {
lock.lock();
try {
// 三次重入
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
}
结果(现代 JVM):
- synchronized:~50 ns(JVM 内联优化)
- ReentrantLock:~60 ns(方法调用开销)
结论:性能差异很小,选择依据应是功能需求。
四、关键实现细节
1. 为什么用 state 而不是单独的计数器?
AQS 的 state 是多用途的:
- 独占锁:state = 重入次数
- 共享锁:state = 剩余许可数(如 Semaphore)
- 读写锁:state 高 16 位 = 读锁,低 16 位 = 写锁
// ReentrantReadWriteLock 的 state 划分
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 读锁单位
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 读锁计数:state >>> 16
// 写锁计数:state & EXCLUSIVE_MASK
2. 重入计数器的溢出处理
// synchronized:不检查溢出(理论上可达到 2^31-1)
_recursions++; // 可能溢出(实际不会发生)
// ReentrantLock:检查溢出
int nextc = c + acquires;
if (nextc < 0) { // 溢出检查
throw new Error("Maximum lock count exceeded");
}
3. 内存可见性保证
// synchronized:通过内存屏障(monitorenter/monitorexit)
monitorenter
LoadLoad 屏障
临界区代码
StoreStore 屏障
monitorexit
// ReentrantLock:通过 volatile
private volatile int state; // volatile 保证可见性
五、答题总结
问题:可重入锁的底层实现?
回答:
可重入锁的底层实现依赖持有线程标识和重入计数器两个核心数据。
synchronized 实现(ObjectMonitor)
- 数据结构:
_owner:持有锁的线程指针_recursions:重入计数器
- 加锁流程:
- 检查
_owner == NULL:CAS 获取锁 - 检查
_owner == 当前线程:计数器 + 1(可重入) - 否则进入
_EntryList阻塞
- 检查
- 解锁流程:
- 计数器 - 1
- 只有计数器为 0 时才释放锁,唤醒等待线程
ReentrantLock 实现(AQS)
- 数据结构:
state:锁状态(0 = 未锁定,n = 重入 n 次)exclusiveOwnerThread:持有锁的线程
- 加锁流程:
- 检查
state == 0:CAS 设置 state = 1 - 检查
exclusiveOwnerThread == 当前线程:state + 1(可重入) - 否则加入 AQS 队列阻塞
- 检查
- 解锁流程:
- state - 1
- 只有 state == 0 时才释放锁,唤醒队列头部线程
关键点:两者的可重入机制本质相同,都是通过线程标识 + 计数器实现,性能接近,主要区别在于功能特性(公平锁、条件变量等)。