第十四章-构建自己的同步工具
发布日期:2021-09-12 09:58:05 浏览次数:18 分类:技术文章

本文共 10329 字,大约阅读时间需要 34 分钟。

文章目录

第十四章 构建自定义的同步工具

一、状态依赖性的管理

像FutureTask、Semaphore和BlockingQueue类中的操作有基于状态为前提的类称为状态依赖性类。例如,不能从一个空的队列中删除元素、或者获取一个尚未结束的任务的计算结果。(依赖状态的个人理解:某个操作依赖于/等着什么状态条件改变,状态依赖性的管理需要考虑:在依赖条件不满足时该干什么?如何监测依赖状态的改变不浪费CPU资源,响应性高?)

  例如在生产者—消费者的设计经常会像ArrayBlockingQueue的有界缓存,在有界缓存的put和take操作中都有状态依赖条件:不能从空缓存中获取元素,也不能将元素放入已满缓存中。当依赖条件不满足时,这些依赖状态的操作put、take可以抛出一个异常或者返回一个错误,也可保持阻塞直到对象进入正确的状态。

1、依赖条件不满足时,可将依赖条件的失败传递给调用者

下面通过BaseBoundedBuffer有界缓存对依赖条件的失败不同处理介绍有界缓存的几种实现。

@ ThreadSafepublic abstract class BaseBoundedBuffer
{
@GuardeBy( "this" ) private final E[] buf; @GuardeBy( "this" ) private int tail; @GuardeBy( "this" ) private int head; @GuardeBy( "this" ) private int count; protected BaseBoundedBuffer( int capacity) //构造函数 {
this .buf = (E[]) new Object[capacity]; } protected synchronized final void doPut(E E) {
buf[tail] = E; if (++tail == buf.length) {
tail = 0; } ++count; } protected synchronized final E doTake() {
E E = buf[head]; buf[head] = null ; if (++head == buf.length) {
head = 0; } --count; return E; } public synchronized final boolean isFull() {
return count == buf.length; } public synchronized final boolean isEmpty() {
return count == 0; }}

GrumpyBoundedBuffer将前提条件的失败传递给调用者,虽然这种方法实现起来很简单,但是调用者必须做好捕获异常的准备,并且当如果在多个地方都要调用put和take方法时,并且前提条件还失败,这样就要不断的重试。

@ ThreadSafepublic class GrumpyBoundedBuffer
extends BaseBoundedBuffer
{
public GrumpyBoundedBuffer( int size) {
super (size); } public synchronized void put(V v) {
if (isFull()) {
throw new BufferFullException (); } doPut(v); } public synchronized V take() {
if (isEmpty()) throw new BufferEmptyExeption (); return doTake(); }}

调用者要调用GrumpyBoundedBuffer的take和put方法,取出值进行操作,可以采用两种方式:

  • 1、在循环中不断重试,这种方法称为忙等待或者自旋等待。自旋导致CPU时钟周期浪费。
  • 2、当缓存状态不满足时,进行休眠。低响应性。
while (true ){
//自旋不断重试 try {
V item = buffer.take(); // 对于item执行一些操作 break ; } catch (BufferEmptyException e) {
Thread. sleep(SLEEP_GRANULARITY ); }}

自旋有稍微好点的方法,在不满足依赖条件时,调用Thread.yield,相当于告诉调度器:现在需要让出一定的时间使另一个线程运行。当然即使使用了thread.yield()来让渡,但也只是减缓对CPU,上下文切换的消耗。

while(!message){
Thread.yield();}

2、依赖条件不满足时,可通过轮询和休眠实现简单阻塞

SleepyBoundedBuffer就是通过“轮询与休眠”重试机制实现put、take方法,从而使调用者无需每次调用都实现重试的逻辑。如果依赖条件不满足,那么当前执行的线程首先会释放锁并且休眠一段时间,从而让其他线程能够访问缓存。当线程醒来时,它将重新请求锁并重新尝试操作,因而线程能反复在休眠以及测试状态条件过程中切换,直到可以执行位置。

  从调用者看来,这种方法能很好的运行。但是如何选择合适的休眠时间间隔,就要在响应性和CPU使用率之间权衡,休眠时间间隔越小,响应性越高,但消耗的CPU资源越高。

@ ThreadSafepublic class SleepyBoundedBuffer
extends BaseBoundedBuffer
{
public SleepyBoundedBuffer( int size) {
super (size); } public void put(V v) throws InterruptedException{
while (true ){
synchronized (this ){
if (!isFull()){
doPut(v); return ; } } Thread.sleep(SLEEP_GRANULARITY); } } public V take() throws InterruptedException{
while (true ){
synchronized (this ){
if (!isEmpty()){
return doTake(); } } Thread.sleep(SLEEP_GRANULARITY); } }}

3、条件队列(Condition Queue)能及时响应依赖状态的改变且不浪费CPU

条件队列可以协调不同线程之间的工作,通过某种方式等待特定的条件变成真。

条件队列:装入的数据项是等待先验条件成立而被挂起的线程。不同于传统的队列存放数据。每个java对象可以作为一个锁,每个对象同样可以作为一个条件队列,并且Object中的wait、notify和notifyAll方法就构成内部条件队列的api。对象的内置锁和内部条件队列是相互关联的,要调用某个对象中条件队列的wait、notify和notifyAll任何一个方法,必须先持有该对象上的锁。

  Object.wait会自动释放锁,同时当前对象会请求操作系统挂起当前线程,此时对象的对象锁就可用了,允许其余等待线程进入。
  “条件队列中的线程一定是执行不下去了才处于等待状态”,这个"执行不下去的条件"叫做"条件谓词。
  需要注意的是,wait()方法的返回并不一定意味着正在等待的条件谓词变成真了。举个列子:假设现在有三个线程在等待同一个条件谓词变成真,然后另外一个线程调用了notifyAll()方法。此时,只能有一个线程离开条件队列,另外两个线程将仍然需要处于等待状态,这就是在代码中使用while(conditioin is not true){this.wait();}而不使用if(condition id not true){this.wait();}的原因。另外一种情况是:同一个条件队列与多个条件谓词互相关联。这个时候,当调用此条件队列的notifyAll()方法时,某些条件谓词根本就不会变成真。综上,这就是为什么每当线程被从wait中唤醒时,都必须再次测试条件谓词。
  下面的BoundedBuffer中,当执行notifyAll()方法时,线程会从wait()地方唤醒(本来是不满足条件谓词,执行wait然后挂起了线程)。使用条件队列会明显比“休眠”有界缓存更加高效,响应性也更高。

@ ThreadSafepublic class BoundedBuffer
extends BaseBoundedBuffer
{
// 条件谓词:not-full (!isFull()) // 条件谓词:not-empty (!isEmpty()) public BoundedBuffer( int size) {
super (size); } // 阻塞并直道:not-full public synchronized void put(V v) throws InterruptedException{
while (isFull()){
wait(); } doPut(v); notifyAll(); } // 阻塞并直道:not-empty public synchronized V take() throws InterruptedException{
while (isEmpty()){
wait(); } V v = doTake(); notifyAll(); return v; }}

二、如何使用条件队列

1、条件等待的标准形式

锁、条件谓词、条件队列三者关系并不复杂,但是wait方法返回并不意味着线程正在等待的条件谓词变真了,一个条件队列与多个条件谓词相关是很常见的情况。因此,每次线程都从wait方法唤醒,都必须再次测试条件谓词,由于现场在条件谓词不为真的情况下也可以反复醒来,因此必须在一个循环中调用wait,并且每次迭代中都测试条件谓词。条件等待的标准形式如下:

void stateDependentMethod() throws InterruptedException{
synchronized(lock){
while(!conditionPredition) lock.wait(); doSomething(); }}

2、条件等待的通知 notify 与notifyAll

信号丢失:指线程必须等待一个已经为真的条件,但在开始等待之前没有检测条件谓词。好比:启动了烤面包机去拿报纸,当烤面包机铃声响了,你没有听到还在等待烤面包机的铃声,因此可能会等待很长的时间。信号丢失也是一种活跃性故障。

  条件等待的包括等待和通知。在BoundedBuffer中,在缓存变非空时,为了使take解除阻塞,必须确保每条使缓存变非空的代码路径都发出一个通知。在BoundedBuffer中,只有一条代码路径即put方法,因此在put成功一个元素到缓存后,将调用notifyAll。同样在成功take一个元素后也要调用notifyAll,向正在等待“不为满”条件的线程发出通知:缓存已经不满了。
  通知是采用notifyAll和notify方法,无论调用哪个,都必须持有与条件队列对象相关的锁。调用notify时,JVM会从条件队列中等待的多个线程选择一个来唤醒,而调用notifyAll会唤醒这个条件队列上等待的所有线程。由于多个线程可以基于不同条件谓词在同一个条件队列上等待,那么如果使用notify而不是notifyAll将是一种危险的操作(可能唤醒的是错的,唤醒的不是想要去唤醒的),容易导致信号丢失。
  
  只有同时满足下面两个条件,才能用notify而不是notifyAll。但是大多数的类并不满足下面两个条件,因此普遍认可做法是notifyAll,虽然会比notify低效,但是可以确保正确。

  • 只有一个条件谓词与条件队列相关,并且每个线程从wait返回后都执行相同的操作。
  • 在条件变量上的每次通知,最多只能唤醒一个线程来执行。

三、显式的Condition对象提供更加细粒度的条件队列

public class BoundedBuffer
extends BaseBoundedBuffer
{
public BoundedBuffer(int capacity){
super(capacity); } public synchronized void put(T t) throws InterruptedException{
while(!isFull())//执行该方法的对象在条件谓词 !isFull 上等待 this.wait(); this.doPut(t); this.notifyAll(); } public synchronized T get() throws InterruptedException{
while(!isEmpty())//执行该方法的对象在条件谓词 !isEmpty 上等待 this.wait(); T t=this.doGet(); this.notifyAll(); return t; }}

然而在上面的BoundedBuffer例子中,同一个条件队列上存在两个条件谓词。这样,当调用notifyAll()方法的时候,唤醒的不仅是在!isFull()上等待的线程,还有在!isEmpty()上等待的线程,尽管唤醒在!isEmpty()上等待的线程是没有必要的,这就迫使我们想使用一种更加细颗度的条件队列。在Java中,除了提供内置锁和内置条件队列,还提供显式锁和显式条件队列。其中显式锁为Lock,显示条件队列为Condition对象。

一个Condition是和一个Lock关联起来的,就像一个内置条件队列和一个内置锁关联起来一样。要创建一个Condition,可以在相关联的Lock上调用newCondition()方法。每个内置锁只能有一个与之关联的内置条件队列,与之不同的是,每个Lock上可以有多个与他关联的Condition,这就使得我们对Condition的控制更加细粒度化。 对于上面的BoundedBuffer类,使用显式条件队列进行改进,如下:(signal比signalAll更高效,能极大的减少在每次缓存操作中发生的上下文切换和锁请求的次数。)

一个显式锁只能对应一个条件队列,也就是只能在一个条件队列里。

每个Lock上可以有多个与他关联的Condition,就好像有多个条件一样。

import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ConditionBoundedBuffer
{
private Lock lock=new ReentrantLock(); private Condition notFull=lock.newCondition(); private Condition notEmpty=lock.newCondition(); //条件谓词 private int head,tail,count; private Object[] buf; public ConditionBoundedBuffer(int capacity){
buf=new Object[capacity]; head=0;tail=0;count=0; } public void add(T t) throws InterruptedException{
lock.lock(); try{
while(count==buf.length) notFull.wait(); buf[tail]=t; if(++tail==buf.length) tail=0; count++; notEmpty.signal(); }finally{
lock.unlock(); } } public T get() throws InterruptedException{
lock.lock(); try{
while(count==0) notEmpty.wait(); Object obj=buf[head]; buf[head]=null; if(++head==buf.length) head=0; count--; notFull.signal(); return (T)obj; }finally{
lock.unlock(); } }}

1、每个Lock上可以有多个与他关联的Condition

在ReentrantLock和Semaphore两个接口中存在很多共同点,两个类都可以做一个“阀门”,每次都只允许一定数量线程通过,两者其实都是使用了一个共同的基类,即AbstractQueuedSynchronizer(AQS),ReentrantLock和Semaphore都实现了Synchronizer。

一个常见的练习,使用Lock来实现计数信号量

@ThreadSafepublic class SemaphoreOnLock {
private final Lock lock = new ReentrantLock(); // CONDITION PREDICATE: permitsAvailable (permits > 0) private final Condition permitsAvailable = lock.newCondition(); @GuardedBy("lock") private int permits; SemaphoreOnLock(int initialPermits) {
lock.lock(); try {
permits = initialPermits; } finally {
lock.unlock(); } } // BLOCKS-UNTIL: permitsAvailable public void acquire() throws InterruptedException {
lock.lock(); try {
while (permits <= 0) permitsAvailable.await(); --permits; } finally {
lock.unlock(); } } public void release() {
lock.lock(); try {
++permits; permitsAvailable.signal(); } finally {
lock.unlock(); } }}

AQS (AbstractQueuedSynchronizer)是一个构建锁和同步器的框架,CountDownLatch、SynchronousQueue和FutureTask也是由AQS构建的。

四、AbstractQueuedSynchronizer

大多数的开发者都不会直接用AQS,常见的标准同步器类集合能满足大多数的需求。java.util.concurrent中许多可阻塞的类,例如ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch、SynchronousQueue和FutureTask都是基于AQS构建的,不需要过于深入了解实现的细节。

五、小结

要实现一个包含依赖状态的类--如果没有满足依赖状态的前提条件,那么这个类的方法必须阻塞,那么最好的方式是基于现有的库类来构建,例如CountDownLatch、Semaphore。例如第八章的ValueLatch所示。然而有时,现有的库类不能提供足够的功能,这种情况下,可以使用内置的条件队列、显式的Condition对象或者AbstractQueuedSynchronizer来构建自己的同步器,显式的Condition和Lock相比内置条件队列提供了扩展的功能集,包括每个锁对应多个等待线程集、可中断或者不可中断的条件等待,公平或者非公平队列操作。

转载地址:https://blog.csdn.net/weixin_38367817/article/details/104296178 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:第十五章、原子变量与非阻塞同步机制
下一篇:计算机网络-第九章之无线网络

发表评论

最新留言

能坚持,总会有不一样的收获!
[***.219.124.196]2024年04月14日 17时16分23秒