
本文共 9697 字,大约阅读时间需要 32 分钟。
一 特性
1 ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。
2 ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等。 3 ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。 4 工作窃取二 原理
1 ForkJoinPool主要组成:
1)ForkJoinPool:是forkjoin框架里面的管理者,最原始的任务都要交给它才能处理。它负责控制整个forkjoin有多少个workerThread,workerThread的创建,激活都是由它来掌控。它还负责workQueue队列的创建和分配,每当创建一个workerThread,它负责分配相应的workQueue。然后它把接到的活都交给workerThread去处理,它可以说是整个forkjoin的容器。
2)ForkJoinWorkerThread:forkjoin里面真正干活的"工人",是一个线程。里面有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue。然后就从workQueue里面拿任务出来处理。它是依附于ForkJoinPool而存活,如果ForkJoinPool的销毁了,它也会跟着结束。
3)ForkJoinPool.WorkQueue: 双端队列就是它,它负责存储接收的任务。
4)ForkJoinTask:是RecursiveAction与RecursiveTask的父类, ForkJoinTask中使用了模板模式进行设计,将ForkJoinTask的执行相关的代码进行隐藏,我们一般用它的两个子类RecursiveTask、RecursiveAction。这两个区别在于RecursiveTask任务是有返回值,RecursiveAction没有返回值。任务的处理逻辑和任务的切分都集中在compute()方法里面。
ForkJoinTask的常用方法有:invokeAll方法:在fork/join模式中,我们在子任务中常常使用fork方法来让子任务采取异步方法执行,但是这不是高效的实现方法,尤其是对于forkjoinPool在线程有限的情况下,子任务直接使用fork方法执行时间比使用invokeAll执行时间要长。因为pool里面线程数量是固定的,那么调用子任务的fork方法相当于A先分工给B,然后A当监工不干活,B去完成A交代的任务。所以上面的模式相当于浪费了一个线程。那么如果使用invokeAll相当于A分工给B后,A和B都去完成工作。这样缩短了执行的时间。
join方法:用于让当前线程阻塞,直到对应的子任务完成运行并返回执行结果。或者,如果这个子任务存在于当前线程的任务等待队列(work queue)中,则取出这个子任务进行“递归”执行。其目的是尽快得到当前子任务的运行结果,然后继续执行。
ForkJoinPool执行任务的方法有:
execute(ForkJoinTask) :异步执行tasks,无返回值。
invoke(ForkJoinTask) :有Join, tasks会被同步到主进程。
submit(ForkJoinTask) :异步执行,且带Task返回值,可通过task.get 实现同步到主线程。
2 fork() 和 join() 的作用
fork():开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。
join():等待该任务的处理线程处理完毕,获得返回值。 疑问:当任务分解得越来越细时,所需要的线程数就会越来越多,而且大部分线程处于等待状态?但是如果我们在上面的示例代码加入以下代码
System.out.println(pool.getPoolSize());
这会显示当前线程池的大小,在我的机器上这个值是4,也就是说只有4个工作线程。甚至即使我们在初始化 pool 时指定所使用的线程数为1时,上述程序也没有任何问题——除了变成了一个串行程序以外。这个矛盾可以导出,我们的假设是错误的,并不是每个 fork() 都会促成一个新线程被创建,而每个 join() 也不是一定会造成线程被阻塞。Fork/Join Framework 的实现算法并不是那么“显然”,而是一个更加复杂的算法——这个算法的名字就叫做work stealing 算法。
ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。 在既没有自己的任务,也没有可以窃取的任务时,进入休眠。3 submit() 和 fork()
submit() 和 fork() 其实没有本质区别,只是提交对象变成了 submitting queue 而已(还有一些同步,初始化的操作)。submitting queue 和其他 work queue 一样,是工作线程”窃取“的对象,因此当其中的任务被一个工作线程成功窃取时,就意味着提交的任务真正开始进入执行阶段。
5 ForkJoinPool common 如何配置
private static ForkJoinPool makeCommonPool() { int parallelism = -1; ForkJoinWorkerThreadFactory factory = null; UncaughtExceptionHandler handler = null; try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); String fp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.threadFactory"); String hp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); if (pp != null) parallelism = Integer.parseInt(pp); if (fp != null) factory = ((ForkJoinWorkerThreadFactory)ClassLoader. getSystemClassLoader().loadClass(fp).newInstance()); if (hp != null) handler = ((UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(hp).newInstance()); } catch (Exception ignore) { } if (factory == null) { if (System.getSecurityManager() == null) factory = defaultForkJoinWorkerThreadFactory; else // use security-managed default factory = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP; return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-"); }
5.1parallelism
parallelism(即配置线程池个数)
可以通过java.util.concurrent.ForkJoinPool.common.parallelism进行配置,最大值不能超过MAX_CAP,即32767. static final int MAX_CAP = 0x7fff; //32767 1 parallelism :默认值 Runtime.getRuntime().availableProcessors() - 1 自定义:代码指定(必须得在commonPool初始化之前注入进去,否则无法生效) System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “8”); // 或者启动参数指定 -Djava.util.concurrent.ForkJoinPool.common.parallelism=85.2 threadFactory:
默认为defaultForkJoinWorkerThreadFactory,没有securityManager的话。
5.3 exceptionHandler
如果没有设置,默认为null
5.4 asyncMode:异步模式,控制是FIFO还是LIFO
ForkJoin框架中为每一个独立工作的线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。asyncMode确定队列中待执行的任务的工作模式,当asyncMode设置为ture的时候,队列采用先进先出方式工作;反之则是采用后进先出的方式工作,该值默认为false。WorkQueue:
ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。 queue capacity:队列容量三 源码解析
1 ForkJoinTask
//任务状态,初始值为0volatile int status; // accessed directly by pool and workers// 任务状态的掩码static final int DONE_MASK = 0xf0000000; // 正常状态,负数,标识任务已经完成static final int NORMAL = 0xf0000000; // 任务取消,非负,= 1<<16,有其他任务依赖当前任务,任务结束前,通知其他任务join当前任务的结果。static final int SIGNAL = 0x00010000; // 低位掩码static final int SMASK = 0x0000ffff;
主要方法实现
ForkJoinTask的主要方法有异步执行方法fork(),public final ForkJoinTaskfork() { Thread t; //如果线程类型为ForkJoinWorkerThread,则将任务推入workQueue进行处理 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); //否则,交由ForkJoinPool的common线程池进行处理 else ForkJoinPool.common.externalPush(this); return this;}
获取结果方法join()
public final V join() { int s; //调用doJoin()进行任务的执行,若任务结果为非正常完成,则根据状态抛出不同的异常, //如若状态为CANCELLED,则抛出CancellationException(),异常; //若状态为EXCEPTIONAL,则抛出包装后的异常 if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult();}private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; //1、若任务状态为正常完成(status < 0),则返回任务的正常完成状态; //2、若执行任务的当前线程类型为ForkJoinWorkerThread,且将任务从线程的工作队列中移除成功, //则调用doExec()执行任务,若任务执行状态为正常结束,则返回状态,否则awaitJoin()等待任务结束。 //3、否则调用externalAwaitDone()等待任务执行完成。 return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone();}final int doExec() { int s; boolean completed; //任务未完成? if ((s = status) >= 0) { try { //执行任务 completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } //设置任务状态为正常完成 if (completed) s = setCompletion(NORMAL); } return s;}private int externalAwaitDone() { //任务处理 int s = ((this instanceof CountedCompleter) ? // try helping ForkJoinPool.common.externalHelpComplete( (CountedCompleter )this, 0) : ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); if (s >= 0 && (s = status) >= 0) { boolean interrupted = false; do { //等待任务完成 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) { try { wait(0L); } catch (InterruptedException ie) { interrupted = true; } } //任务完成后通知其他依赖的任务 else notifyAll(); } } } while ((s = status) >= 0); if (interrupted) Thread.currentThread().interrupt(); } return s;}
2 ForkJoinWorkerThread
3 WorkQueue
scanState:如果WorkQueue没有属于自己的owner(下标为偶数的都没有),该值为 inactive 也就是一个负数;如果有自己的owner,该值的初始值为其在WorkQueue[]数组中的下标,也肯定是个奇数;如果这个值,变成了偶数,说明该队列所属的Thread正在执行Task。
stackPred:前任池(WorkQueue[])索引,由此构成一个栈; config:index | mode。 如果下标为偶数的WorkQueue,则其mode是共享类型。如果有自己的owner 默认是 LIFO; qlock: 锁标识,在多线程往队列中添加数据,会有竞争,使用此标识抢占锁。1: locked, < 0: terminate; else 0 base:worker steal的偏移量,因为其他的线程都可以偷该队列的任务,所有base使用volatile标识。 top:owner执行任务的偏移量。 parker:如果 owner 挂起,则使用该变量做记录挂起owner的线程。 currentJoin:当前正在join等待结果的任务。 currentSteal:当前执行的任务是steal过来的任务,该变量做记录。4 ForkJoinPool
四 工作窃取模式优点
使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?
使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。总结
在了解了 Fork/Join Framework 的工作原理之后,相信很多使用上的注意事项就可以从原理中找到原因。例如:为什么在 ForkJoinTask 里最好不要存在 I/O 等会阻塞线程的行为?,这个各位读者可以思考思考了。
还有一些延伸阅读的内容,在此仅提及一下: ForkJoinPool 有一个 Async Mode ,效果是工作线程在处理本地任务时也使用 FIFO 顺序。这种模式下的 ForkJoinPool 更接近于是一个消息队列,而不是用来处理递归式的任务。 在需要阻塞工作线程时,可以使用 ManagedBlocker Java 1.8 新增加的 CompletableFuture 类可以实现类似于 Javascript 的 promise-chain,内部就是使用 ForkJoinPool 来实现的参考:
1 https://blog.csdn.net/m0_37542889/article/details/92640903
2 https://segmentfault.com/a/1190000021345819 3 https://www.cnblogs.com/suxuan/p/4970498.html 4 https://www.jianshu.com/p/577cd6dc05b6发表评论
最新留言
关于作者
