
【JDK源码分析系列】FutureTask 源码解析
发布日期:2021-05-07 20:51:22
浏览次数:20
分类:原创文章
本文共 10866 字,大约阅读时间需要 36 分钟。
【JDK源码分析系列】FutureTask 源码解析
【1】Runnable
// Runnable : 无返回值新建任务的方式@FunctionalInterfacepublic interface Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run();}
【2】Callable
// Callable 是一个接口约定了线程要做的事情不过这个线程任务是有返回值的// 返回值是一个泛型,可以定义成任何类型,但使用时不会直接使用 Callable,// 而是会结合 FutureTask 一起使用@FunctionalInterfacepublic interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception;}
【3】Future
// 1. 定义了异步计算的接口,提供了计算是否完成的 check、等待完成和取回等多种方法// 2. 如果想得到结果可以使用 get 方法,此方法(无参方法)会一直阻塞到异步任务计算完成// 3. 取消可以使用 cancel 方法,但一旦任务计算完成,就无法被取消了public interface Future<V> { // 如果任务已经成功了,或已经取消了,是无法再取消的,会直接返回取消成功(true) // 如果任务还没有开始进行时,发起取消,是可以取消成功的 // 如果取消时,任务已经在运行了,mayInterruptIfRunning 为 true 的话,就可以打断运行中的线程 // mayInterruptIfRunning 为 false,表示不能打断直接返回 boolean cancel(boolean mayInterruptIfRunning); // 返回线程是否已经被取消了,true 表示已经被取消了 // 如果线程已经运行结束了,isCancelled 和 isDone 返回的都是 true boolean isCancelled(); // 线程是否已经运行结束了 boolean isDone(); // 等待结果返回 // 如果任务被取消了,抛 CancellationException 异常 // 如果等待过程中被打断了,抛 InterruptedException 异常 V get() throws InterruptedException, ExecutionException; // 等待,但是带有超时时间的,如果超时时间外仍然没有响应,抛 TimeoutException 异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}
【4】RunnableFuture
// RunnableFuture 接口的最大目的是让 Future 可以对 Runnable 进行管理public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run();}
【5】FutureTask
【5.1】FutureTask 的类定义
// FutureTask 可以当做是线程运行的具体任务,// FutureTask 实现了 RunnableFuture 接口// 而 RunnableFuture 又实现了 Runnable(FutureTask 本身就是个 Runnnable), // Future(FutureTask 具备对任务进行管理的功能) 两个接口// // FutureTask 就统一了 Callable 和 Runnablepublic class FutureTask<V> implements RunnableFuture<V>
【5.2】FutureTask 的属性
// 任务状态private volatile int state;private static final int NEW = 0;//线程任务创建private static final int COMPLETING = 1;//任务执行中private static final int NORMAL = 2;//任务执行结束private static final int EXCEPTIONAL = 3;//任务异常private static final int CANCELLED = 4;//任务取消成功private static final int INTERRUPTING = 5;//任务正在被打断中private static final int INTERRUPTED = 6;//任务被打断成功/** The underlying callable; nulled out after running */// Callable 是 FutureTask 的属性// FutureTask 具备了转化 Callable 和 Runnable 的功能private Callable<V> callable;/** The result to return or exception to throw from get() */// 异步线程返回的结果,读写锁保证了其线程安全private Object outcome; // non-volatile, protected by state reads/writes/** The thread running the callable; CASed during run() */// 当前任务所运行的线程private volatile Thread runner;/** Treiber stack of waiting threads */// 记录调用 get 方法时被等待的线程private volatile WaitNode waiters;
【5.3】FutureTask 的构造器
// 构造函数// 使用 Callable 进行初始化public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable}// 使用 Runnable 初始化,并返回 result 结果,一般来说 result 没有用的,置为 null 就好了public FutureTask(Runnable runnable, V result) { // callable 进行适配,把 runnable 适配成 RunnableAdapter,RunnableAdapter 实现了 callable // public static <T> Callable<T> callable(Runnable task, T result) { // if (task == null) // throw new NullPointerException(); // return new RunnableAdapter<T>(task, result); // } // Executors.callable 方法把 runnable 适配成 RunnableAdapter, // RunnableAdapter 实现了 callable 即将 runnable 直接适配成了 callable // RunnableAdapter 是 Executors 的内部静态类实现了 Callable 接口 this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable}
RunnableAdapter
// 转化 Runnable 成 Callable 的工具类// 1. 首先 RunnableAdapter 实现了 Callable,因此 RunnableAdapter 就是 Callable// 2. 其次 Runnable 是 RunnableAdapter 的一个属性,在构造 RunnableAdapter 的时候会传进来// 并且在 call 方法里面调用 Runnable 的 run 方法static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; }}
【5.4】FutureTask 的 get 方法
// get 方法虽然名字叫做 get,但却做了很多 wait 的事情,// 当发现任务还在进行中,没有完成时,就会阻塞当前进程,等待任务完成后再返回结果值//// 阻塞底层使用的是 LockSupport.park 方法,使当前线程进入 WAITING 或 TIMED_WAITING 状态public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { //确保时间单元非空 if (unit == null) throw new NullPointerException(); int s = state; // 如果任务已经在执行中了,并且等待一定的时间后,仍然在执行中,直接抛出异常 if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); // 任务执行成功,返回执行的结果 return report(s);}
FutureTask -- private int awaitDone(boolean timed, long nanos)
// 等待任务执行完成private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 计算等待终止时间,如果一直等待的话,终止时间为 0 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; // 不排队 boolean queued = false; // 无限循环 for (;;) { // 如果线程已经被打断了,删除 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } // 当前任务状态 int s = state; // 当前任务已经执行完了,返回 if (s > COMPLETING) { // 当前任务的线程置空 if (q != null) q.thread = null; return s; } // 如果正在执行,当前线程让出 cpu,重新竞争,防止 cpu 飙高 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 如果第一次运行,新建 waitNode,当前线程就是 waitNode 的属性 else if (q == null) q = new WaitNode(); // 默认第一次都会执行这里,执行成功之后,queued 就为 true,就不会再执行了 // 把当前 waitNode 当做 waiters 链表的第一个 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果设置了超时时间,并过了超时时间的话,从 waiters 链表中删除当前 wait else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } // 没有过超时时间,线程进入 TIMED_WAITING 状态 LockSupport.parkNanos(this, nanos); } // 没有设置超时时间,进入 WAITING 状态 else LockSupport.park(this); }}
FutureTask -- private V report(int s) throws ExecutionException
// 返回已完成任务的结果或抛出异常@SuppressWarnings("unchecked")private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x);}
【5.5】FutureTask 的 run 方法
/** * run 方法可以直接被调用 * 也可以由线程池进行调用 * 如果需要开启子线程的话,只能走线程池,线程池会帮忙开启线程 */// 1. run 方法是没有返回值的,通过给 outcome 属性赋值 (set(result)),// get 时就能从 outcome 属性中拿到返回值// 2. FutureTask 两种构造器,最终都转化成了 Callable,// 因此在 run 方法执行的时候,只需要执行 Callable 的 call 方法即可,// 在执行 c.call() 代码时,// 如果入参是 Runnable 则调用路径为 c.call() -> RunnableAdapter.call() -> Runnable.run()// 如果入参是 Callable 则直接调用public void run() { // 状态不是任务创建或者当前任务已经有线程在执行了 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // callable 在 FutureTask 构造方法中初始化 Callable<V> c = callable; // Callable 不为空并且已经初始化完成 if (c != null && state == NEW) { V result; boolean ran; try { // 调用执行 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } // 给 outcome 赋值 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}
FutureTask -- protected void setException(Throwable t)
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); }}
FutureTask -- private void finishCompletion()
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint}
【5.6】FutureTask 的 cancel 方法
// 设置obj对象中offset偏移地址对应的整型field的值为指定值// 这是一个有序或者有延迟的 putIntVolatile 方法,并且不保证值的改变被其他线程立即看到// 只有在field被 volatile 修饰并且期望被意外修改的时候使用才有用// Object obj : 包含需要修改field的对象// offset : obj 中整型field的偏移量// value : field将被设置的新值// public native void putOrderedInt(Object obj, long offset, int value);// 取消任务,如果正在运行,尝试去打断public boolean cancel(boolean mayInterruptIfRunning) { //任务状态不是创建 并且不能把 new 状态置为取消则直接返回 false if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; // 进行取消操作,打断可能会抛出异常,选择 try finally 的结构 try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state //状态设置成已打断 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 清理线程 finishCompletion(); } return true;}
参考致谢
本博客为博主的学习实践总结,并参考了众多博主的博文,在此表示感谢,博主若有不足之处,请批评指正。
发表评论
最新留言
不错!
[***.144.177.141]2025年03月26日 21时25分30秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
Java 对象引用方式 —— 强引用、软引用、弱引用和虚引用
2019-03-06
hadoop学习(四)----windows环境下安装hadoop
2019-03-06
Mybatis Generator最完整配置详解
2019-03-06
Tree--二叉树BinarySearchTree
2019-03-06
Elasticsearch集群升级指引
2019-03-06
webpack打包less与sass
2019-03-06
[白话解析] 深入浅出熵的概念 & 决策树之ID3算法
2019-03-06
[梁山好汉说IT] 梁山好汉和抢劫银行
2019-03-06
[记录点滴] OpenResty中Redis操作总结
2019-03-06
[源码阅读] 阿里SOFA服务注册中心MetaServer(3)
2019-03-06
[源码解析] 消息队列 Kombu 之 基本架构
2019-03-06
[源码分析] 消息队列 Kombu 之 启动过程
2019-03-06
[源码分析] 消息队列 Kombu 之 Consumer
2019-03-06
[源码分析] 消息队列 Kombu 之 Producer
2019-03-06
[源码分析] 消息队列 Kombu 之 mailbox
2019-03-06
抉择之苦
2019-03-06
kubernetes生产实践之mongodb
2019-03-06
wx.NET CLI wrapper for wxWidgets
2019-03-06