
本文共 9680 字,大约阅读时间需要 32 分钟。
并发笔记传送门:
在
ReentrantLock
和Semaphore
这两个接口之间存在许多共同点。这两个类都可以用做一个“阀门”,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用lock
或acquire
时成功返回),也可以等待(在调用lock
或acquire
时阻塞),还可以取消(在调用tryLock
或tryAcquire
时返回“假”,表示在指定的时间内锁是不可用的或者无法获得许可)。而且,这两个接口都支持可中断的、不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作。
事实上,它们在实现时都使用了一个共同的基类,即
AbstractQueuedSynchronizer
(AQS
),这个类也是其他许多同步类的基类。AQS
是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。不仅ReentrantLock
和Semaphore
是基于AQS
构建的,还包括CountDownLatch
、ReentrantReadWriteLock
、SynchronousQueue
和FutureTask
。
AQS 中的获取与释放
在基于 AQS 构建的同步器类中,最基本的操作包括各种形式的锁获取和释放操作。并且获取操作是一种依赖状态的操作,并且通常会阻塞。
如下伪代码给出了 AQS 获取与释放的简单逻辑。 (Douge Lea 老爷子源码写的太精妙,得慢慢品)boolean acquire() throws InterruptedException{ while (当前状态不允许获取操作){ if (需要阻塞获取请求){ 如果当前线程不再队列中,则将其插入队列 阻塞当前线程 }else { 返回失败 } } 更新同步器的状态 如果线程位于队列中,则将其移除队列 返回成功 } void release(){ 更新同步器的状态 if(新的状态允许某个阻塞的线程获取成功){ 解除队列中一个或多个线程的阻塞状态 } }
一个获取操作包括两部分:
- 首先,同步器判断当前状态是否允许获得操作,如果是,则允许线程执行,否则获取操作将阻塞或失败。这种判断是根据同步器的语义决定的。例如:对于锁来说,如果它没有被某个线程持有,那么就能成功的获取;而对于闭锁来说,如果它处于结束状态,那么也能被成功的获取。
- 其次,就是更新同步器的状态,获取同步器的某个线程可能会对其他线程能否获取该同步器照成影响。例如,当获取一个锁后,锁的状态将『未备持有』变成『已被持有』,而从
Semaphore
中获取许可后,将把许可证的数量减1。然而,当一个线程获取闭锁时,并不会影响其他线程能否获取它。
根据同步器性质的不同,实现的方法各有差异:
- 独占操作(例如:
ReentrantLock
):如果某个同步器支持独占的获取操作,那么需要实现AQS
的tryAcquire
、tryRelease
、tryHeldExeclusively
等方法。 - 非独占操作(例如:
Semphore
,CountDownLatch
):对于支持共享获取的同步器,则应该实现tryAcquireShared
、tryReleaseShared
等方法
AQS
中的的acquire
、acquireShared
、release
、releaseShared
等方法都将调用这些方法在子类中带有前缀try
的版本来判断某个操作是否能够执行。
一个简单的闭锁
OneShotLatch
包含两个公有方法:await
和 signal
,分别对应获取和释放操作。起初,闭锁是关闭的,任何调用 await
的线程都将阻塞并直到闭锁打开。当通过调用 signal
打开闭锁时,所有等待中的线程豆浆被释放,并且随后到达闭锁的线程也允许被执行。
import java.util.concurrent.locks.AbstractQueuedSynchronizer;public class OneShotLatch { private final Sync sync = new Sync(); public void signal() { sync.releaseShared(0); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(0); } private static class Sync extends AbstractQueuedSynchronizer { @Override protected int tryAcquireShared(int arg) { int state = getState(); //如果闭锁是开的(state==1),那么这个操作讲成功,否则失败 System.out.println("state = " + state); return getState() == 1 ? 1 : -1; } @Override protected boolean tryReleaseShared(int arg) { setState(1);//打开闭锁 return true;//其他线程可以获取该闭锁 } } public static void main(String[] args) { OneShotLatch osl = new OneShotLatch(); new Thread(() -> { System.out.println("we are in main 01 thread, and start osl await"); try { osl.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("main 01 thread osl await finished"); }).start(); new Thread(() -> { System.out.println("we are in main 02 thread, and start osl await"); try { osl.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("main 02 thread osl await finished"); }).start(); new Thread(() -> { System.out.println("we are in main 03 thread, and first sleep 5s"); try { Thread.sleep(5000); System.out.println("we are in main 03 thread, and start osl await"); osl.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("main 03 thread osl await finished"); }).start(); new Thread(() -> { System.out.println("we are in work thread,and we start waiting"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("work finish,now we signal main thread"); osl.signal(); }).start(); }}
java.util.concurrent中的 AQS
并发包中有许多的可阻塞类,例如
ReentrantLock
、Semaphore
、CountDownLatch
、ReentrantReadWriteLock
、SynchronousQueue
和FutureTask
等,都是基于AQS
构建的。
ReentrantLock
ReentrantLock
只支持独占方式的获取操作,因此它实现了tryAcquire
、tryRelease
和isHeldExclusively
方法。
ReentrantLock
将同步状态state
用于保存锁获取操作的次数。- 维护了一个
owner
变量来保存当前线程,但是在1.6上进行了重构增加了AbstractOwnableSynchronizer
用exclusiveOwnerThread
来保存当前线程。只有在当前线程刚刚获取到锁,或者正要释放锁的时候,才会修改这个变量。- 在
tryRelease
中检查owner
域,从而确保当前线程在执行unlock
操作前已经获取了锁 - 在
tryAcquire
中将使用owner
域判断获取操作是重入还是竞争的
- 在
非公平锁版本(默认)
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
当一个线程尝试获取锁时,tryAcquire
将首先检查锁的状态:
- 未被持有:通过
compareAndSetState(0, acquires)
原子性的操作尝试更新锁的状态以表示已经被持有。 - 已经持有:判断当前现场是否为锁的拥有者,是:计数递增(所以
ReentrantLock
是可重入锁);不是:获取操作失败。
Semaphore
与 CountDownLatch
Semaphore
与CountDownLatch
是属于支持共享获取的同步器,因此它们实现了tryAcquireShared
和tryReleaseShared
方法
Semaphore
将 state
用于保存当前可用许可数量。
以Semaphore
的非公平锁实现为例:
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; //这个代码很精髓啊 //remaining < 0 :如果没有足够的许可,退出循环 //compareAndSetState设置成功,退出循环;设置失败,重新尝试 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
tryAcquireShared
首先会计算剩余许可的数量。
- 如果数量不足,那么会返回一个值表示获取操作失败。
- 如果还有剩余的许可数量,会通过
compareAndSetState
以原子的方式来降低许可的计数。如果这个操作成功(意味着从上次读取后就没有被修改过),那么就返回一个值表示操作获取成功。
CountDownLatch
使用 AQS
的方式与Semaphore
很相似:同步状态 state
用来保存当前的计数值
await()
调用关系: graph LRawait-->acquireSharedInterruptibly acquireSharedInterruptibly --> tryReleaseShared
await
调用 acquire
,当计数器为0时,acquire
将立即返回,否则将执行doAcquireSharedInterruptibly
进入阻塞。
private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
countDown()
调用关系:
graph LRcountDown-->releaseSharedreleaseShared-->tryReleaseShared
countDown()
调用 tryReleaseShared
来完成计数递减,当计数值为0时,执行doReleaseShared
解除所有等待线程的阻塞。
public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
FutureTask
老爷子新版本并没有直接使用
AQS
,通过sun.misc.Unsafe UNSAFE
来操作实现。
ReentrantReadWriteLock
ReadWriteLock
接口表示存在两个锁:读取锁和写入锁,但在基于AQS
实现的ReentrantReadWriteLock
中,单个AQS
子类将同时管理读取加锁和写入加锁。
ReentrantReadWriteLock
使用了两个16位的状态分别表示写入锁和读取锁的计数。在读取锁上的操作使用共享的获取、释放方式;在写入锁上的操作使用独占的获取、释放方式。
AQS
在内部维护一个等待线程队列,其中记录了某个线程是独占(Node.EXCLUSIVE
)还是共享(Node.SHARE
)访问。 小结
AQS
源码真心复杂,本篇只是粗浅的记录下并发包内的AQS
的使用情况,下一篇争取啃下AQS
的实现原理
Doug Lea
老爷子一个人撸起了 java 并发的大旗,真滴猛。
以下是是网上比较好的 AQS 源码解析,记录一下
发表评论
最新留言
关于作者
