线程池体系(二)-- ForkJoinPool
发布日期:2021-05-06 17:45:27 浏览次数:15 分类:技术文章

本文共 9697 字,大约阅读时间需要 32 分钟。

一 特性

1 ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。

2 ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等
3 ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。
4 工作窃取

二 原理

1 ForkJoinPool主要组成:

1)ForkJoinPool:是forkjoin框架里面的管理者,最原始的任务都要交给它才能处理。它负责控制整个forkjoin有多少个workerThread,workerThread的创建,激活都是由它来掌控。它还负责workQueue队列的创建和分配,每当创建一个workerThread,它负责分配相应的workQueue。然后它把接到的活都交给workerThread去处理,它可以说是整个forkjoin的容器。

2)ForkJoinWorkerThread:forkjoin里面真正干活的"工人",是一个线程。里面有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue。然后就从workQueue里面拿任务出来处理。它是依附于ForkJoinPool而存活,如果ForkJoinPool的销毁了,它也会跟着结束。

3)ForkJoinPool.WorkQueue: 双端队列就是它,它负责存储接收的任务。

4)ForkJoinTask:是RecursiveAction与RecursiveTask的父类, ForkJoinTask中使用了模板模式进行设计,将ForkJoinTask的执行相关的代码进行隐藏,我们一般用它的两个子类RecursiveTask、RecursiveAction。这两个区别在于RecursiveTask任务是有返回值,RecursiveAction没有返回值。任务的处理逻辑和任务的切分都集中在compute()方法里面。

ForkJoinTask的常用方法有:

invokeAll方法:在fork/join模式中,我们在子任务中常常使用fork方法来让子任务采取异步方法执行,但是这不是高效的实现方法,尤其是对于forkjoinPool在线程有限的情况下,子任务直接使用fork方法执行时间比使用invokeAll执行时间要长。因为pool里面线程数量是固定的,那么调用子任务的fork方法相当于A先分工给B,然后A当监工不干活,B去完成A交代的任务。所以上面的模式相当于浪费了一个线程。那么如果使用invokeAll相当于A分工给B后,A和B都去完成工作。这样缩短了执行的时间。

join方法:用于让当前线程阻塞,直到对应的子任务完成运行并返回执行结果。或者,如果这个子任务存在于当前线程的任务等待队列(work queue)中,则取出这个子任务进行“递归”执行。其目的是尽快得到当前子任务的运行结果,然后继续执行。

ForkJoinPool执行任务的方法有:

execute(ForkJoinTask) :异步执行tasks,无返回值。

invoke(ForkJoinTask) :有Join, tasks会被同步到主进程。

submit(ForkJoinTask) :异步执行,且带Task返回值,可通过task.get 实现同步到主线程。

2 fork() 和 join() 的作用

fork():开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。

join():等待该任务的处理线程处理完毕,获得返回值。
疑问:当任务分解得越来越细时,所需要的线程数就会越来越多,而且大部分线程处于等待状态?

但是如果我们在上面的示例代码加入以下代码

System.out.println(pool.getPoolSize());

这会显示当前线程池的大小,在我的机器上这个值是4,也就是说只有4个工作线程。甚至即使我们在初始化 pool 时指定所使用的线程数为1时,上述程序也没有任何问题——除了变成了一个串行程序以外。

这个矛盾可以导出,我们的假设是错误的,并不是每个 fork() 都会促成一个新线程被创建,而每个 join() 也不是一定会造成线程被阻塞。Fork/Join Framework 的实现算法并不是那么“显然”,而是一个更加复杂的算法——这个算法的名字就叫做work stealing 算法

ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。

每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
在既没有自己的任务,也没有可以窃取的任务时,进入休眠。

3 submit() 和 fork()

submit() 和 fork() 其实没有本质区别,只是提交对象变成了 submitting queue 而已(还有一些同步,初始化的操作)。submitting queue 和其他 work queue 一样,是工作线程”窃取“的对象,因此当其中的任务被一个工作线程成功窃取时,就意味着提交的任务真正开始进入执行阶段。

5 ForkJoinPool common 如何配置

private static ForkJoinPool makeCommonPool() {           int parallelism = -1;        ForkJoinWorkerThreadFactory factory = null;        UncaughtExceptionHandler handler = null;        try {     // ignore exceptions in accessing/parsing properties            String pp = System.getProperty                ("java.util.concurrent.ForkJoinPool.common.parallelism");            String fp = System.getProperty                ("java.util.concurrent.ForkJoinPool.common.threadFactory");            String hp = System.getProperty                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");            if (pp != null)                parallelism = Integer.parseInt(pp);            if (fp != null)                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.                           getSystemClassLoader().loadClass(fp).newInstance());            if (hp != null)                handler = ((UncaughtExceptionHandler)ClassLoader.                           getSystemClassLoader().loadClass(hp).newInstance());        } catch (Exception ignore) {           }        if (factory == null) {               if (System.getSecurityManager() == null)                factory = defaultForkJoinWorkerThreadFactory;            else // use security-managed default                factory = new InnocuousForkJoinWorkerThreadFactory();        }        if (parallelism < 0 && // default 1 less than #cores            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)            parallelism = 1;        if (parallelism > MAX_CAP)            parallelism = MAX_CAP;        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,                                "ForkJoinPool.commonPool-worker-");    }

5.1parallelism

parallelism(即配置线程池个数)

可以通过java.util.concurrent.ForkJoinPool.common.parallelism进行配置,最大值不能超过MAX_CAP,即32767.
static final int MAX_CAP = 0x7fff; //32767
1 parallelism :默认值 Runtime.getRuntime().availableProcessors() - 1
自定义:代码指定(必须得在commonPool初始化之前注入进去,否则无法生效)
System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “8”);
// 或者启动参数指定
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8

5.2 threadFactory:

默认为defaultForkJoinWorkerThreadFactory,没有securityManager的话。

5.3 exceptionHandler

如果没有设置,默认为null

5.4 asyncMode:异步模式,控制是FIFO还是LIFO

ForkJoin框架中为每一个独立工作的线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。asyncMode确定队列中待执行的任务的工作模式,当asyncMode设置为ture的时候,队列采用先进先出方式工作;反之则是采用后进先出的方式工作,该值默认为false。WorkQueue:

ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
queue capacity:队列容量

三 源码解析

1 ForkJoinTask

//任务状态,初始值为0volatile int status; // accessed directly by pool and workers// 任务状态的掩码static final int DONE_MASK   = 0xf0000000;  // 正常状态,负数,标识任务已经完成static final int NORMAL      = 0xf0000000;  // 任务取消,非负,
= 1<<16,有其他任务依赖当前任务,任务结束前,通知其他任务join当前任务的结果。static final int SIGNAL = 0x00010000; // 低位掩码static final int SMASK = 0x0000ffff;

主要方法实现

ForkJoinTask的主要方法有异步执行方法fork(),

public final ForkJoinTask
fork() { Thread t; //如果线程类型为ForkJoinWorkerThread,则将任务推入workQueue进行处理 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); //否则,交由ForkJoinPool的common线程池进行处理 else ForkJoinPool.common.externalPush(this); return this;}

获取结果方法join()

public final V join() {       int s;    //调用doJoin()进行任务的执行,若任务结果为非正常完成,则根据状态抛出不同的异常,    //如若状态为CANCELLED,则抛出CancellationException(),异常;    //若状态为EXCEPTIONAL,则抛出包装后的异常    if ((s = doJoin() & DONE_MASK) != NORMAL)        reportException(s);    return getRawResult();}private int doJoin() {       int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;    //1、若任务状态为正常完成(status < 0),则返回任务的正常完成状态;    //2、若执行任务的当前线程类型为ForkJoinWorkerThread,且将任务从线程的工作队列中移除成功,    //则调用doExec()执行任务,若任务执行状态为正常结束,则返回状态,否则awaitJoin()等待任务结束。    //3、否则调用externalAwaitDone()等待任务执行完成。    return (s = status) < 0 ? s :        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?        (w = (wt = (ForkJoinWorkerThread)t).workQueue).        tryUnpush(this) && (s = doExec()) < 0 ? s :        wt.pool.awaitJoin(w, this, 0L) :        externalAwaitDone();}final int doExec() {       int s; boolean completed;    //任务未完成?    if ((s = status) >= 0) {           try {               //执行任务            completed = exec();        } catch (Throwable rex) {               return setExceptionalCompletion(rex);        }        //设置任务状态为正常完成        if (completed)            s = setCompletion(NORMAL);    }    return s;}private int externalAwaitDone() {       //任务处理    int s = ((this instanceof CountedCompleter) ? // try helping             ForkJoinPool.common.externalHelpComplete(                 (CountedCompleter
)this, 0) : ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); if (s >= 0 && (s = status) >= 0) { boolean interrupted = false; do { //等待任务完成 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) { try { wait(0L); } catch (InterruptedException ie) { interrupted = true; } } //任务完成后通知其他依赖的任务 else notifyAll(); } } } while ((s = status) >= 0); if (interrupted) Thread.currentThread().interrupt(); } return s;}

2 ForkJoinWorkerThread

3 WorkQueue

scanState:如果WorkQueue没有属于自己的owner(下标为偶数的都没有),该值为 inactive 也就是一个负数;如果有自己的owner,该值的初始值为其在WorkQueue[]数组中的下标,也肯定是个奇数;如果这个值,变成了偶数,说明该队列所属的Thread正在执行Task。

stackPred:前任池(WorkQueue[])索引,由此构成一个栈;
config:index | mode。 如果下标为偶数的WorkQueue,则其mode是共享类型。如果有自己的owner 默认是 LIFO;
qlock: 锁标识,在多线程往队列中添加数据,会有竞争,使用此标识抢占锁。1: locked, < 0: terminate; else 0
base:worker steal的偏移量,因为其他的线程都可以偷该队列的任务,所有base使用volatile标识。
top:owner执行任务的偏移量。
parker:如果 owner 挂起,则使用该变量做记录挂起owner的线程。
currentJoin:当前正在join等待结果的任务。
currentSteal:当前执行的任务是steal过来的任务,该变量做记录。

4 ForkJoinPool

四 工作窃取模式优点

使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?

使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。

总结

在了解了 Fork/Join Framework 的工作原理之后,相信很多使用上的注意事项就可以从原理中找到原因。例如:为什么在 ForkJoinTask 里最好不要存在 I/O 等会阻塞线程的行为?,这个各位读者可以思考思考了。

还有一些延伸阅读的内容,在此仅提及一下:
ForkJoinPool 有一个 Async Mode ,效果是工作线程在处理本地任务时也使用 FIFO 顺序。这种模式下的 ForkJoinPool 更接近于是一个消息队列,而不是用来处理递归式的任务。
在需要阻塞工作线程时,可以使用 ManagedBlocker
Java 1.8 新增加的 CompletableFuture 类可以实现类似于 Javascript 的 promise-chain,内部就是使用 ForkJoinPool 来实现的

参考:

1 https://blog.csdn.net/m0_37542889/article/details/92640903

2 https://segmentfault.com/a/1190000021345819
3 https://www.cnblogs.com/suxuan/p/4970498.html
4 https://www.jianshu.com/p/577cd6dc05b6

上一篇:ThreadLocal
下一篇:Kafka消息保留-清理策略

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2025年04月06日 16时57分32秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章