四、并发编程实战之基础构建模块
发布日期:2021-09-12 09:57:53 浏览次数:34 分类:技术文章

本文共 21462 字,大约阅读时间需要 71 分钟。

文章目录

基础构建模块

上一章介绍了构造线程安全类采用的一些技术,比如将线程安全性委托给现有的线程安全类。委托是创建线程安全类的一个最有效的策略:只需让现有的线程安全类管理所有的状态即可。

Java平台类库包含了丰富的并发基础构建模块,例如线程安全的容器类以及各种用于协调多个相互协作的线程控制流的同步工具类(Synchronizer)。

一、同步容器类

同步容器类包括Vector和Hashtable。

此外还包括在JDK1.2中添加的一些功能相似的类,这些同步的封装器类是由Collections.synchronizedXxx等工厂方法创建的,这些类实现线程安全的方式是:
将他们的状态封装起来,并对每个公有方法都进行同步,使得每一次只有一个线程能访问容器的状态。

1、同步容器类的问题

首先容器中常见的复合操作包括:迭代、跳转(根据指定顺序找到当前原始的下一个元素)、以及类似“若没有则添加”的条件运算。在同步容器类中,这些复合操作在单线程操作下仍然是安全的,单并发修改容器时,它们可能会出现出乎意料的意外。因此,同步容器类可能需要额外的客户端加锁来保护复合操作。

public static Object getLast(Vector list)	{
int lastIndex = list.size() - 1; return list.get(lastIndex); } public static void deleteLast(Vector list) {
int lastIndex = list.size() -1 ; list.remove(lastIndex); }

线程A在包含10个元素的Vector上面调用getLast。同时线程B在同一个Vector上面调用deleteLast,A,B经过判断得到的大小都是10,然后线程B调用remove(9),此时线程A调用get(9)的话就会抛出ArrayIndexOutOfBoundsException异常。

在对象的组合中客户端加锁方式,通过获得容器类list的锁,可以使getLast、deleteLast方法成为原子操作,并确保Vector的大小在调用size和get之间不会发生变化。

public static Object getLast(Vector list)	synchronized (list)	{
int lastIndex = list.size() - 1; return list.get(lastIndex); } public static void deleteLast(Vector list) synchronized (list) {
int lastIndex = list.size() -1 ; list.remove(lastIndex); }

再例如:可能抛出ArrayIndexOutOfBoundsException的迭代操作。在调用size、 get的迭代过程中,若有另外一个线程修改了Vector,将会抛出ArrayIndexOutOfBoundsException。

//可能抛出ArrayIndexOutOfBoundsException的迭代操作	for (int i = 0; i< vector.size(); i++)	{
doSomething(vector.get(i)); }

客户端加锁方式改进:

synchronized(vector){
for (int i = 0; i< vector.size(); i++) {
doSomething(vector.get(i)); } }

2、迭代器与ConcurrentModificationException

Vector是一个较老的类,但是在很多新的容器类中也没有解决复合操作中的问题。

在许多现代的容器类中也有类似Vector中的同步问题,"当发现容器在迭代过程中被修改时,就会抛出一个ConcurrentModificationException。
无论直接迭代还是使用for-each循环,对容器类进行迭代的标准方式都是使用Iterator,Iterator机制的实现方式是将计数器的变化和容器关联起来,"如果迭代期间计数器被修改,那么hasNext或next方法就会抛出异常。 "
如果有多个线程并发修改容器,采用迭代器时需要在迭代期间对容器加锁。

但是,长时间地对容器加锁会降低程序的可伸缩性,迭代期持有的锁时间越长,在锁上竞争可能越激烈,迭代器客户端加锁方式性能会受到影响。

如果不希望迭代期对容器加锁,还有一种替代方法就是“克隆”容器,并在副本上进行迭代。副本会被封闭与线程内
,因此其他线程不会在迭代期间对副本修改,避免出现抛出ConcurrentModificationException。此方法关键在于
在克隆过程中进行对容器的加锁,但在克隆容器时会存在显著的性能开销。

3、隐藏迭代器

你必须记住所有对共享容器进行迭代的地方都需要加锁,实际情况会更加复杂,“因为某些情况下,迭代器会隐藏起来。”

书中提供了HiddenIterator 例子,在调用System.out.println("debug: added ten elements to "+set); 过程中会执行对set的toString方法,标准容器的toString方法会隐式迭代容器。TestHiddenIteratorThread
是自己编的进行测试,Thread1在执行System.out.println("debug: added ten elements to "+set);
隐式调用了set的迭代器,但是Thread2可能又同时更改了set,因此会出现ConcurrentModificationException。

public class HiddenIterator {
private final Set
set = new HashSet<>(); public synchronized void add(Integer i) {
set.add(i);} public synchronized void remove(Integer i) {
set.remove(i);} public void addTenThings() {
Random random = new Random(); for(int i = 0; i<10; i++) {
add(random.nextInt()); System.out.println("debug: added ten elements to "+set); } }}public class TestHiddenIteratorThread implements Runnable{
HiddenIterator hiddenIterator = new HiddenIterator(); @Override public void run() {
System.out.println(Thread.currentThread().getName()); hiddenIterator.addTenThings(); } public static void main(String[] args) {
TestHiddenIteratorThread testHiddenIteratorThread = new TestHiddenIteratorThread(); new Thread(testHiddenIteratorThread, "Thread1").start(); new Thread(testHiddenIteratorThread, "Thread2").start(); }}

容器的hashCode、equals以及ContainsAll、romoveAll等方法也会间接的执行迭代操作。

二、并发容器

虽然已经有了同步容器类(Vector、Stack、HashTable和Collections类中提供的静态工厂方法创建的类),但是同步容器类基本上的方法都采用了synchronized进行了同步,很明显必然会影响到执行性能"。于是乎,java5.0提供了多种并发容器类,例如ConcurrentHashMap、CopyOnWriteArrayList。

上面讲的是同步容器,这里讲的是并发容器,并发容器是针对多个线程并发访问设计的。

ConcurrentHashMap用来替换基于同步且基于散列的Map、以及CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的List。
在新的ConcurrentMap接口中增加了对一些常见复合操作的支持,比如"若没有则添加"、替换以及有条件删除等。

Java5.0增加了两种新的容器类型Queue、BlockingQueue。

Queue提供了几种实现,包括ConcurrentLinkedQueue和PriorityQueue(非并发队列)。Queue上的操作不会阻塞,如果为空,则返回null。
BlockingQueue是阻塞的,队列为空,则一直等待,直到出现可用的元素,如果已满,那么插入的操作将一直等待,知道出现可用的空间。在生产者-消费者设计模式中,阻塞队列是非常有用的。

1、ConcurrentHashMap

HashMap线程不安全,Hashtable线程安全但是并发操作独占一把锁,性能不行。因此,新增了强大的ConcurrentHashMap并发容器类,ConcurrentHashMap。并不是将每个方法都用在同一个锁上同步使每次一个线程访问容器,而是采用一种粒度更细的加锁机制(分段锁)来实现更大程度的共享,任意数量的读写线程可以并发访问Map,并且ConcurrentHashMap提供的迭代器具有"弱一致性",不会抛出ConcurrentModificationException,迭代过程不需要进行加锁。

相比Hashtable和synchronizedMap,ConcurrentHashMap拥有更多的优势。ConcurrentHashMap比synchronizedMap有着更好的伸缩性(扩展性)。只有当应用程序需要加锁Map进行独占访问,才应该放弃使用ConcurrentHashMap。同时由于ConcurrentHashMap不能加锁来执行独占访问,我们也无法使用客户端来创建新的原子复合操作,但是ConcurrentHashMap也比较人性,一般基本复合操作例如“若没有则添加、若相等则移除”都在ConcurrentMap接口中声明,而ConcurrentHashMap刚好实现了该接口。

2、CopyOnWriteList(写时复制)

CopyOnWriteArrayList用于替代同步List,这个容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步同步。

在修改的时候,会创建并重新发布一个新的容器,从而实现可变性,"写入时复制"容器的迭代器保留一个指向底层基础数组的引用,并且这个数组当前位于迭代器的起始位置。
由于它不会被修改,因此对其进行同步时只需要保证数组内容的可见性。写时复制的迭代器不会抛出ConcurrentModificationException异常,并且返回的元素与迭代器创建时的元素完全一致。

import java.util.ArrayList;import java.util.List;import java.util.UUID;import java.util.concurrent.CopyOnWriteArrayList;public class Test{
/** * 1、故障现象 * java.util.ConcurrentModificationException * 2、导致原因 * 并发争抢修改导致 * 3、解决方案 * 3.1 new Vector() * 3.2 Collections.synchronized(new ArrayList<>()); * 3.3 new CopyOnWriteArrayList<>() * * 4、优化建议 * @param args */ public static void main(String[] args) {
List
list = new ArrayList<>(); //Vector Synchronized for (int i=1;i<=30;i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,8)); System.out.println(list); },String.valueOf(i)).start(); } }}

CopyOnWriteArrayList的add源码

public boolean add(E e){
final ReetrantLock lock = this.lock; lock.lock(); try {
Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements,len+1); newElements[len] = e; setArray(newElements); return true; } finally {
lock.unlock(); }}

三、阻塞队列和生产者-消费者模式

生产者-消费者模式有如下几点优点:

  • “解耦”,假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。
    将来如果消费者的代码发生变化, 可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也 就相应降低了。
  • “支持并发”,由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区作为桥梁连接,生产者只需要往缓冲区里丢数据,就可以继续生产
    下一个数据,而消费者只需要从缓冲区了拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。

BlockingQueue的核心方法:

1.放入数据

  • offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回fals(本方法不阻塞当前执行方法 的线程);
  • offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
  • put(an Object):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续。
  1. 获取数据
  • poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
  • poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
  • take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
  • drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率,不需要多次分批加锁或释放锁。

LinkedBlockingQueue 和ArrayBlockingQueue是FIFO队列,两者分别与LinkedList和ArrayList相似,但比同步List有更好的并发性能。

PriorityBlockingQueue是一个按照优先级排序的队列。当你希望按照某种顺序而不是FIFO来处理元素的时候,这个队列将非常有用。
“SynchronousQueue :一种无缓冲的等待队列,因此take和put会一直阻塞,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。”

1、串行线程封闭

对于可变对象,生产者-消费者这种设计与阻塞队列一起,促进了串行线程封闭,从而将对象所有权从生产者交付给消费者,线程封闭对象只能单个线程拥有,但是可以通过安全的发布该对象来转移所有权。在转移所有权之后,也只有另外一个线程能获得这个对象的访问权限。

2、双端队列与工作密取

Java6增加了两种容器类型,Deque和BlockingDeque,他们分别对Queue和BlockingQue进行了扩展,Deque是一个双端队列,实现了在队列头和队列尾的高效插入和移除。具体实现包括ArrayDeque和LinkedBlockingDeque。

四、阻塞方法与中断方法

线程可能会阻塞或者暂停,原因有多种:等待I/O操作结束,等待获得一个锁,等待从Thread.sleep方法中醒来,或者是等待另一个线程的计算结果。

当线程阻塞时,它常常被挂起,并处于某种阻塞状态(BLOCKED、WAITING或者TIMED_WAITING)。
“阻塞操作与执行很长时间的普通操作的差别在于:”
被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行。
比如:等待I/O操作完成、等待某个锁变成可用、或者等待外部计算的结束。

interrupt 英[ˌɪntəˈrʌpt] 阻塞

BlockingQueue的put和take等方法会"抛出受检查异常Interrupted-Exception,这与类库中其他一些方法的做法相同,比如Thread.sleep"。

“当一个方法抛出Interrupted-Exception的时候,表示这个方法是一个阻塞方法”,如果这个方法被中断,那么他将努力提前结束阻塞状态。
"Thread提供了interrupt方法,用于中断线程或者查询线程是否已经被中断。"每一个线程都有一个布尔类型的属性,表示线程的状态,当中断线程时将设置这个状态。
中断是一种协作机制,一个线程不能强制要求其他线程停止正在执行的操作而去执行其他操作。当线程A中断线程B时,A只是要求B在执行到某个可以暂
停的地方停止正在执行的操作。但是实际怎样处理中断是由线程B自己决定的。

当在代码中调用了一个将抛出InterruptedException异常的方法时,你自己的方法也就变成了一个阻塞方法,并且必须要处理中断的响应,有两种选择:

  • 传递:避开这个异常通常是最明智的策略,只需要将InterruptedException传递给方法的调用者,你可以根本不捕获该异常,或者捕获该异常然
    后执行某种简单的清理工作后再次抛出这个异常。
  • 恢复中断:有些interruptedException情况下,例如当代码是Runnable的一部分时,只能必须捕获InterruptedException可通过调用当前线程上
    的interrupt方法恢复中断状态,这样在调用栈中更高层的代码中将看到引发了一个中断。

中断的必要条件:

“只有阻塞方法才可以中断,因为它提供了中断响应的策略,非堵塞方法强制中断线程不安全”。中断后不会产生问题。但是在非中断方法中,比如
线程在执行一个排序算法,那么在排序过程中并不会发生中断,因为此时中断会产生问题即排序的数组中一部分有序一部分无序。此时如果在排序过程中必须处理才能中断。此时如果要强制结束线程必须调用Thread.stop()或者Thread.destroy(),但是这样强制结束线程是不安全的.例如下面的例子:
当线程中断时,还是会继续输出2和3。当下面的Thread.currentThread().interrupt();改成 Thread.currentThread().stop();或者 Thread.currentThread().destroy();,才只输出结果1。

public class InterruptedExample implements Runnable  {
@Override public void run() {
System.out.println("1"); Thread.currentThread().interrupt(); System.out.println("2"); System.out.println("3"); } public static void main(String[] args) {
Thread interruptedThread = new Thread(new InterruptedExample()); interruptedThread.start(); } }

interrupt方法详解

interrupt()方法的简单理解:

interrupt() 方法只是改变中断状态而已,它不会中断一个正在运行的线程。
这一方法实际完成的是,给受阻塞的线程发出一个中断信号,这样受阻线程就得以退出阻塞的状态。
更确切的说,如果线程被Object.wait,Thread.join和Thread.sleep三种方法之一阻塞,此时调用该线程的interrupt()方法,那么该线程将抛出一
个 InterruptedException中断异常(该线程必须事先预备好处理此异常),从而提早地终结被阻塞状态。如果线程没有被阻塞,这时调用interrupt()将不起作用,直到执行到wait(),sleep(),join()时,才马上会抛出 InterruptedException。

五、同步工具类

在容器类中,阻塞队列是一种独特的类,它们不仅能作为保存对象的容器,还能协调生产者和消费者等线程之间的控制流,因为take和put等方法将阻塞,直到队列达到期望的状态(队列既非空也非满)。同步工具类可以是任何一个对象,只要它根据其自身状态来协调线程的控制流。同步工具类可以是阻塞队列、信号量Semaphore、栅栏Barrier、闭锁。

1、闭锁

闭锁是一个同步工具类,可以延迟线程的进度直到其到达终止状态。简单地说,闭锁可以用来确保某些活动直到其它活动都完成后才继续执行。

闭锁作用相当于一扇门:当闭锁达到结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并且允许所有的线程通过。当闭锁达到结束状态后,将不会再改变状态,因此此扇门将永远保持打开状态。闭锁可以用来确保某些活动指导其他活动都完成后才继续执行。
例如:

  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行。
  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。
  • 等待直到某个操作的所有参与者都就绪再继续执行。

CountDownLatch是一种灵活的闭锁实现,可以在上面a、b、c情形中使用,它可以是一个或者多个线程等待一组时间发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件发生了,而await方法等待计数器达到零,表示需要等待的事件都已发生,如果计数器的值非零,那么await会一直阻塞直到计数器为0,或者等待中的线程中断、或者等待超时。

CountDownLatch的用法:

典型用法1:

  • 某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为n new CountDownLatch(n) ,每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
    典型用法2:
  • 实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计数器初始化为1,多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。

CountDownLatch的不足

CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

import java.util.concurrent.CountDownLatch;public class CountDownLatchDemo{
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6); for (int i=1;i<=6;i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t 上完自习,离开教室"); countDownLatch.countDown(); },String.valueOf(i)).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName()+"\t *****************88班长最后关门走人"); //班长在其他人走之前不能离开。 }}

注意点(CountDownLatch关注的是事件,不一定非要是多个线程调用方法,其实一个线程调用多次方法就可以了)

在一个线程中也可以

import java.util.concurrent.CountDownLatch;public class TestTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(4); for (int i=1;i<=4;i++) {
countDownLatch.countDown(); } countDownLatch.await(); System.out.println(11111); }}

上面的代码也会输出111

2、FutureTask

FutureTask也可以用作闭锁。(FutureTask实现了Future语义,表示一种抽象的可生成结果的计算。FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行(Waiting to run),正在运行(Running)和运行完成(Completed)。执行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask进入完成状态后,它会停止在这个状态上。

FutureTask 有点类似Runnable,都可以通过Thread来启动,不过FutureTask可以返回执行完毕的数据,并且FutureTask的get方法支持阻塞。由于FutureTask可以返回执行完毕的数据,并且FutureTask的get方法支持阻塞这两个特性,我们可以用来预先加载一些可能用到资源,然后要用的时候,调用get方法获取(如果资源加载完,直接返回;否则继续等待其加载完成)。

public class Preloader {
private final FutureTask
future = new FutureTask
(new Callable
{
public ProductInfo call() throws DataLoadException {
return loadProductInfo(); } }); private final Thread thread = new Thread(future); public void start() {
thread.start(); System.out.println("start"); } public ProductInfo get() throws DataLoadException, InterruptedException {
try {
System.out.println("get start"); ProductInfo per = future.get(); System.out.println("get end"); } catch (ExecutionException e) {
Throwable cause = e.getCause(); if (cause instanceof DataLoadException) throw (DataLoadException) cause; else {
throw LaunderThrowable.launderThrowable(cause); // return null; } } }}

3、信号量

信号量主要用于两个目的:

  • 用于多个共享资源的互斥使用
  • 另一个用于并发线程数的控制

计数信号量(Counting Semaphore)用来控制同时访问特定资源的操作数目,或者同时执行某个指定操作的数量。Semaphore中管理者一组虚拟的许可,许可的初始数量可通过构造函数来指定,在执行操作时可以首先获得许可,并在使用之后释放许可,如果没有许可,那么acquire将阻塞直到有许可(或者被中断或者操作超时)。Semaphore可以用于实现资源池,例如数据库连接池。我们可以构造一个固定长度的资源池,当从池中获取一个资源之前首先调用acquire方法获取一个许可,在资源返回给池之后调用release释放许可,那么acquire将一直阻塞直到资源池不为空。

可以使用Semaphore将任何一种容器变成有界阻塞容器,Semaphore就是操作系统中的信号量,是为了解决资源分配问题,限制同一时刻访问某一资源的线程个数。

import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;public class SemaphoreDemo{
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);//模拟3个停车位 for(int i=1;i<=6;i++) //模拟6辆车 {
new Thread(()->{
try {
semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"\t抢到车位"); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"\t停车三秒后离开车位"); } catch (InterruptedException e) {
e.printStackTrace(); } finally {
semaphore.release(); } },String.valueOf(i)).start(); } }}

4、栅栏

栅栏Barrier类似闭锁,能阻塞一组线程直到某个时间发生,与阻塞关键区别在于:所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException,如果成功地通过栅栏,那么await将会为每个线程返回一个唯一的达到索引号,我们可以利用这些索引“选举”产生一个领导线程,并在下一次迭代中由该线程领导之下一些特殊工作。

import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo{
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()-> {
System.out.println("召唤神龙");}); for(int i=1;i<=7;i++) {
final int tempInt = i; new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t 收集到第:"+tempInt+"龙珠"); try {
cyclicBarrier.await(); } catch (InterruptedException e) {
e.printStackTrace(); } catch (BrokenBarrierException e) {
e.printStackTrace(); } },String.valueOf(i)).start(); } }}

六、构建高校且可伸缩的结果缓存

几乎所有的服务器都会使用某种形式的缓存,重用之前的计算结果能降低延迟、提高吞吐量,但是需要消耗更多的内存,时间换空间的代价。

书中从HashMap和同步机制来初始化缓存,改进至ConcurrentHashMap替换HashMap,再到基于FutureTask的Memorizing封装器,到最后最终实现。

1、使用HashMap和同步机制来初始化缓存

//首先有一个Computable
接口:里面是一个需要很长时间的一个计算操作 public interface Computable
{
V compute(A arg) throws InterruptedException; } 其次自己测试的一个耗时长的TimeConsumingComputor 实现了Computable
接口。 public class TimeConsumingComputor implements Computable
{
@Override public Integer compute(Integer arg) throws InterruptedException {
for(long i = 0; i < 10000000; i++) ; return arg; } } /*然后出现第一代使用HashMap和同步机制当缓存器,HashMap不是线程安全的,因此采用同步机制确保线程安全,但是出现一个明显的可伸缩性问题,每次只能有一个线程执行compute,当compute计算耗时长时,其他线程可能会阻塞很长时间,显然并发性很糟糕。*/ public class Memoizer1
implements Computable
{ private final Map
cache= new HashMap
(); private final Computable
computor; //接口对象 public Memoizer1(Computable
c) { this.computor = c; } public synchronized V compute(A arg) throws InterruptedException { V result = cache.get(arg); //查看在不在缓存里面,在的话直接返回,不在的话计算 if(result == null) { result = computor.compute(arg); cache.put(arg, result); } return result; } }//下面是测试第一代的缓存器: public class TestMemorizerThread implements Runnable { private TimeConsumingComputor timeConsumingComputor = new TimeConsumingComputor(); //实现了Computable接口 private Memoizer1
memorizer1 = new Memoizer1
(timeConsumingComputor); @Override public void run() { // TODO Auto-generated method stub long startTime = System.currentTimeMillis(); for(int i = 0; i < 100; i++) { try { System.out.println(Thread.currentThread().getName()+" input = "+ i +" result = " + memorizer1.compute(i)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } long endTime = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName()+" cost time is "+String.valueOf(endTime - startTime)); } public static void main(String[] args) throws InterruptedException { TestMemorizerThread testMemorizerThread = new TestMemorizerThread(); new Thread(testMemorizerThread, "thread1").start(); // Thread.sleep(2000); new Thread(testMemorizerThread, "thread2").start(); new Thread(testMemorizerThread, "thread3").start(); } } /*但是实际并发执行时,线程1 2 3都会去计算相同的i,三个线程计算相 同的东西,并且是排队阻塞计算,都要写入缓存,缓存的作用是避免相同结 果计算多次,显然第一代缓存器很糟糕。 (同时执行相同的i,都会计算都 会写入缓存) */

2、用ConcurrentHashMap替换HashMap

//第二代缓存器,用ConcurrentHashMap替换了HashMap和同步机制    public class Memoizer2
implements Computable
{
private final Map
cache = new ConcurrentHashMap
(); private final Computable
computor ; public Memoizer2(Computable
c) { this.computor = c; } @Override public V compute(A arg) throws InterruptedException { V result = cache.get(arg); if(result == null) { result = computor.compute(arg); cache.put(arg, result); } return result; } }

3、使用FutureTask实现真正意义上缓存

真正理想的状态是,线程1正在计算f(1),线程2 3也打算计算f(1),但他们知道最高效的方式是等线程1计算完后,再去查询缓存。而Memoizer2是多个线程并发判断是否f(1)结果存在,有则直接取结果。更加严格意义上的缓存必然是,线程判断f(1)是否有其他线程开始计算,如果还没有线程启动,那么就创建一个FutureTask并注册到Map中,然后启动计算。 通过构建Callable和Future的线程,不同于实现Runnable和继承Thread方式,在任务执行完成后就可以获取执行结果。Callable类似Runnabel接口,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行,ThreadPoolExecutor或ScheduledThreadPoolExecutor都实现了ExcutorService接口,而因此Callable需要和Executor框架中的ExcutorService结合使用,ExecutorService提供了submit提交一个实现Callable接口的任务,并且返回封装了异步计算结果的Future。FutureTask类除了实现了Future接口外还实现了Runnable接口,因此FutureTask也可以直接提交给Executor执行。 当然也可以调用线程直接执行(FutureTask.run())。最终可以通过Future对象.get() 判断异步计算的结果。

public class Memoizer3
implements Computable
{
private final Map
> cache = new ConcurrentHashMap<>(); private final Computable
computor; public Memoizer3(Computable
c) {
this.computor = c;} public V compute(final A arg) throws InterruptedException{
Future
future = cache.get(arg); if(future == null) { Callable
eval = new Callable
() { public V call() throws InterruptedException { return computor.compute(arg); } }; FutureTask
fTask = new FutureTask<>(eval); future = fTask; cache.put(arg, fTask); fTask.run();//调用一个新线程直接执行(FutureTask.run()) } try { return future.get(); //返回Callable中call方法结果 } catch (Exception e) { e.printStackTrace(); throw LaunderThrowable.launderThrowable(e.getCause()); } }}

Memoizer表现出非常好的并发性(基本上是因为发挥了ConcurrentHashMap高效的并发性),若计算结果计算出那么立即返回。如果其他两个线程正在计算该结果,那么新到的线程将一直等待这个结果被计算出来。时间相比Memoizer2时间Memoizer3已经缩短了很多。但是,仍然存在一个小缺陷,即在判断if(future == null)时,仍然可能有两个线程同时执行此处,判断f(1)时不在缓存中,将f(1)的Future放入缓存,再计算f(1)。因为判断是否将future是否存入缓存中的If代码块是非原子的“先检查再执行”,即当两个线程同时没有在缓存中找到期望的值,然后同一时间执行if(){}代码块。所以讲道理还没有严格实现判断其他线程是否同时开始计算。

进一步改进

public class Memoizer4
implements Computable
{
private final Map
> cache = new ConcurrentHashMap<>(); private final Computable
computor; public Memoizer4(Computable
c) {
this.computor = c;} public V compute(final A arg) throws InterruptedException{
while(true){
Future
future = cache.get(arg); if(future == null) { Callable
eval = new Callable
() { public V call() throws InterruptedException{ return computor.compute(arg); } }; FutureTask
fTask = new FutureTask<>(eval); future = cache.putIfAbsent(arg, fTask); if(future == null) { future = fTask; fTask.run();} } try { return future.get(); } catch (CancellationException e) { cache.remove(arg, future); throw LaunderThrowable.launderThrowable(e.getCause()); } catch (ExecutionException e) { e.printStackTrace(); throw LaunderThrowable.launderThrowable(e.getCause()); } } } }

转载地址:https://blog.csdn.net/weixin_38367817/article/details/103764614 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:五、结构化并发应用程序之任务执行(Executor)
下一篇:三、Java并发编程实战之对象的组合

发表评论

最新留言

能坚持,总会有不一样的收获!
[***.219.124.196]2024年04月14日 13时33分56秒