Java并发源码分析 - 锁
(注:文章里涉及到的代码分析,基于jdk1.7.0_10 Hotspot 64-Bit)
基本概念
Java同步机制除了内置的synchronized(包含Object.wait/notify)以外,还通过concurrent包提供了多种锁,包含ReentrantLock、Semaphore、ReentrantReadWriteLock等,以及跟Object.wait/notify类似语义的Condition接口。
接口定义
具体的接口(Lock,Condition)就不在这里赘述,只做个简单总结:
- Lock接口提供三种不同类型的获取锁接口:不响应中断(interrupt)、响应中断、可以设置超时;
- Condition接口提供类似Object.wait语义的四种await接口:不响应中断(interrupt)、响应中断、可以设置超时、可以设置deadline;不管哪一种await,都必须在调用前持有跟该Condition对象关联的锁,Condition的实现会保证await调用在进入阻塞状态前释放锁,并且在await调用返回时,重新持有锁。
锁类型
- 同synchronized一样,concurrent包里提供的锁都是可重入的(reentrant):一个线程在持有一个锁时,在不释放该锁的前提下,可多次重新持有该锁;
- 互斥锁和共享锁:在一个线程持有锁的时候,如果其它线程不能再持有该锁,则为互斥锁,否则为共享锁;concurrent包里的ReentrantLock为互斥锁,Semaphore为共享锁,ReentrantReadWriteLock是共享锁及互斥锁的结合;
- 公平锁和非公平锁:公平锁保证线程以FIFO的顺序持有锁(不包含tryLock接口),但非公平锁不保证这点:在有线程在排队等待获取当前锁的时候,新的线程可以直接竞争成功并持有锁;
基本框架
简单查看一下ReetrantLock、Semaphore等类的实现,会发现都依赖于AbstractQueuedSynchronizer(AQS)这个类,这个其实是concurrent包里实现同步机制的一个核心框架,可以通过这篇论文来了解这个框架。该框架的核心实现要素包含以下三点:
- 同步状态的原子性管理
- 等待队列的管理
- 线程的阻塞和唤醒
同步状态的原子性管理
AQS将状态定义为一个整型变量(volatile int state),对它的修改AQS提供了两个接口,一个是基于volatile语义:
549 | protected final void setState(int newState) { |
另外一个依赖于Unsafe.compareAndSwapInt:
564 | protected final boolean compareAndSetState(int expect, int update) { |
那什么时候用setState,什么时候用compareAndSetState呢?简单看了下调用关系,有如下特征:
- 初始化state时一般用setState,比如:Semaphore、CountDownLatch、ReentrantReadWriteLock等的AQS子类初始化;
- 互斥锁的可重入处理逻辑中一般调用setState,比如:ReentrantLock的tryAcquire,ReentrantReadWriteLock的tryAcquire;
- 互斥锁的释放锁操作一般调用setState,比如:ReentrantLock的tryRelease,ReentrantReadWriteLock的tryRelease;
- 其它情况下都调用compareAndSetState。
从以上的情况来看,应该是在基本无竞争(初始化,重入处理、互斥锁的释放)的情况下调用setState;竞争比较激烈的情况下调用compareAndSetState。
等待队列的管理
AQS使用CLH队列的变种来管理等待线程,每个等待线程为一个结点(AbstractQueuedSynchronizer.Node),后文会混用结点和线程。
CLH队列中结点之间并不存在实际的连接,后继结点在等待锁的时候只是在前续结点的状态字段上自旋,直到获取锁。论文对AQS使用prev及next字段的解释是:
- prev主要为了完成超时及取消语义:如果前继结点取消,那么就是向前找到一个未取消的前继结点;
- next的主要作用在于优化后继结点的查找,避免每次都需要从tail结点向前反向查找。
线程的阻塞和唤醒
依赖于LockSupport.park(阻塞当前线程,实际调用Unsafe.park)及LockSupport.unpark(唤醒指定线程,实际调用Unsafe.unpark);根据LockSupport的Java doc可以了解到以下内容:
- park与unpark使用类似Semaphore的许可机制,如果当前线程拥有许可,那个park会消费掉该许可,并立即返回;如果当前线程没有许可,则当前线程会阻塞;unpark会导致指定线程的许可可用;
- 许可不会累加,最多只有一个,也就是说连续多次的unpark并不会导致许可变多,也就是说如下代码还是会导致当前线程阻塞:
1 | LockSupport.unpark(Thread.currentThread()); |
- 关于park()和park(Object blocker)的区别,Object blocker参数的作用在于允许记录当前线程被阻塞的原因,以便监控分析工具进行分析。官方的文档中也更建议使用park(Object blocker)。
AQS实现
分析AQS之前先了解下concurrent包里的类是如何使用AQS的。AQS是抽象类,ReentrantLock、Semaphore等类会在使用时定义一个子类(Sync,一般还会根据是否是公平锁定义FireSync、NonfairSync),根据具体的需要重写AQS定义的四个protected接口:
1 | /** |
注意返回值上,只有tryAcquireShared的返回值为int:大于0时,代表当前获取锁成功,后续的获取锁请求也可能会成功;等于0时,代表当前获取锁成功,后续获取锁请求必须等待;小于0时,代表当前获取锁失败,必须等待;其它返回值都为boolean,true则成功,false失败。
上述这几个接口的主要作用是什么呢?将管理锁(或者其它实现)的状态的任务交给具体实现类,这样AQS就不需要知道各个不同锁机制的状态之间的差别,从而简化AQS的实现。
然后具体的锁实现会调用AQS定义的几个公有方法来获取或者释放锁:
1 | /** |
addWaiter:等待队列的加入
605 | private Node addWaiter(Node mode) { |
605 | private Node enq(final Node node) { |
从上面的代码可以知道,结点的加入只是简单的通过CAS更新队列的tail字段:保证prev跟tail的原子更新,但不保证tail与next的原子更新。
acquire:互斥锁获取
1196 | public final void acquire(int arg) { |
855 | final boolean acquireQueued(final Node node, int arg) { |
release:互斥锁的释放
1259 | public final boolean release(int arg) { |
acquireShared:共享锁获取
946 | public final void acquireShared(int arg) { |
946 | private void doAcquireShared(int arg) { |
708 | private void setHeadAndPropagate(Node node, int propagate) { |
setHeadAndPropagate除了将head设置为当前持有锁的结点外,还需要保证在后面这两种情况下向后传播可以获取锁的信息:
- propagate > 0(也就是tryAcquireShared > 0,表示后续的获取锁操作也可能成功);
- 原始head结点的waitStatus < 0,也就是以前有某个结点希望释放锁的操作向后传播。
releaseShared:共享锁的释放
1339 | public final boolean releaseShared(int arg) { |
670 | private void doReleaseShared() { |
可以看到,doReleaseShared需要保证两点:
- 要么至少唤醒一个等待的结点:waitStatus == Node.SIGNAL;
- 要么将当前head结点的waitStatus设置成Node.PROPAGATE,以保证在后续线程持有到锁后,可以向后传播此次释放锁事件(见setHeadAndPropagate的分析)。
具体锁实现
ReentrantLock
互斥模式,state代表互斥锁的状态:为0说明当前锁可用;为1说明当前锁已经被某个线程持有,其它线程必须等待。获取锁等价于将state设置成1;释放锁等价于将state设置为0。
公平锁获取
236 | protected final boolean tryAcquire(int acquires) { |
非公平锁获取
672170 | protected final boolean tryAcquire(int acquires) { |
133 | final boolean nonfairTryAcquire(int acquires) { |
ReentrantReadWriteLock
共享互斥模式结合:写锁对应互斥锁,读锁对应共享锁。state被分为两部分:高16位代表读锁持有数量;低16位代表写锁持有数量。
主要的实现逻辑跟ReentrantLock类似,但因为同时有两个锁,所以有些不同:
- 在写锁被当前线程持有的情况下,其它线程不同持有任意锁;
- 在写锁被当前线程持有的情况下,当前线程可以继续请求获取读锁和写锁;
- 在读锁被当前线程持有的情况下,其它线程可以持有读锁,不能持有写锁;
- 在读锁被当前线程持有的情况下,当前线程和其它持有读锁的线程可以继续请求获取读锁,不能请求获取写锁。
代码就不详细说明了。
Semaphore
共享模式,state代表许可的个数,初始为许可的个数,每一次的acquire,许可减1。注意:tryAcquireShared返回为int,这里会返回剩余的许可个数。
公平与非公平的处理与ReentrantLock处理逻辑类似,不再详细分析。
CountDownLatch
共享模式,state代表count个数,初始为count个数。下面为核心代码:
177 | protected int tryAcquireShared(int acquires) { |
可以看到,在初始情况下,所有的tryAcquireShared(CountDownLatch.await会调用此方法)都会阻塞(getState == count,不为0);每一次的tryReleaseShared(CountDownLatch.countDown会调用此方法)将count减1,直到为0并且会返回true(nextc == 0),这时acquireShared会调用doReleaseShared唤醒被阻塞的线程(getState == 0保证tryAcquireShared肯定会成功)。
FutureTask
共享模式,state代表任务的完成状态:0代表任务已经准备就绪,1代表任务正在运行,2代表任务已经完成,4代表任务取消。
223 | /** |
由上面代码可以看到在任务没有完成时,任何调用tryAcquireShared(FutureTask.get会调用此方法)的线程都会阻塞;tryReleaseShared永远返回true。
任务执行完成后,会将state设置成2(正常完成或者出现异常)或者4(任务被取消):innerIsDone方法在这两种情况下都会返回true。