
【JDK源码分析系列】ThreadPoolExecutor 源码解析 -- 基本属性分析
发布日期:2021-05-07 20:51:23
浏览次数:22
分类:原创文章
本文共 4965 字,大约阅读时间需要 16 分钟。
【JDK源码分析系列】ThreadPoolExecutor 源码解析 -- 基本属性分析
【1】ThreadPoolExecutor 的基本属性 -- 常量与变量
//ctl 线程池状态控制字段,由两部分组成://1:workerCount 有效的线程数,workerCount 最大是到(2^29)-1,大概 5 亿个线程//2:runState 线程池的状态,提供了生命周期的控制,源码中有很多关于状态的校验,状态枚举如下://RUNNING(-536870912):接受新任务或者处理队列里的任务;//SHUTDOWN(0):不接受新任务,但仍在处理已经在队列里面的任务;//STOP(536870912):不接受新任务,也不处理队列中的任务,对正在执行的任务进行中断;//TIDYING(1073741824): 所以任务都被中断,workerCount 是 0,整理状态;//TERMINATED(1610612736): terminated() 已经完成的时候;//runState 之间的转变过程://RUNNING -> SHUTDOWN:调用 shudown(),finalize()//(RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()//SHUTDOWN -> TIDYING -> workerCount ==0//STOP -> TIDYING -> workerCount ==0//TIDYING -> TERMINATED -> terminated() 执行完成之后private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;// 29private static final int CAPACITY = (1 << COUNT_BITS) - 1;// =(2^29)-1=536870911// Packing and unpacking ctl// 对 ctl 字段进行拆包与封装处理,获取其中的 workercount 和 runstateprivate static int ctlOf(int rs, int wc) { return rs | wc; }private static int workerCountOf(int c) { return c & CAPACITY; }private static int runStateOf(int c) { return c & ~CAPACITY; }// runState is stored in the high-order bits// runstate 的各种状态的定义值private static final int RUNNING = -1 << COUNT_BITS;//-536870912private static final int SHUTDOWN = 0 << COUNT_BITS;//0private static final int STOP = 1 << COUNT_BITS;//536870912private static final int TIDYING = 2 << COUNT_BITS;//1073741824private static final int TERMINATED = 3 << COUNT_BITS;//1610612736// 已完成任务的计数volatile long completedTasks;// 线程池最大容量private int largestPoolSize;// 已经完成的任务数private long completedTaskCount;// 用户可控制的参数都是 volatile 修饰的// 创建失败一般不抛出异常,只有在 OutOfMemoryError 时候才会// 可以使用 threadFactory 创建 threadprivate volatile ThreadFactory threadFactory;// 饱和或者运行中拒绝任务的 handler 处理类private volatile RejectedExecutionHandler handler;// 线程存活时间设置private volatile long keepAliveTime;// 设置 true 的话,核心线程空闲 keepAliveTime 时间后,也会被回收private volatile boolean allowCoreThreadTimeOut;// coreSize// 核心线程数量private volatile int corePoolSize;// maxSize 最大限制 (2^29)-1private volatile int maximumPoolSize;// 默认的拒绝策略private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();// 队列会 hold 住任务,并且利用队列的阻塞的特性,来保持线程的存活周期private final BlockingQueue<Runnable> workQueue;// 大多数情况下是控制对 workers 的访问权限// 可重入锁以及对应的条件变量private final ReentrantLock mainLock = new ReentrantLock();private final Condition termination = mainLock.newCondition();// 包含线程池中所有的工作线程private final HashSet<Worker> workers = new HashSet<Worker>();
【2】ThreadPoolExecutor 的基本属性 -- Worker 类
// 线程池中任务运行的最小单元// 维护着运行中的任务的线程锁和可中断状态// Worker 继承 AQS,具有锁功能// Worker 实现 Runnable,本身是一个可执行的任务//// 1. Worker 任务的代理,在线程池中,最小的执行单位就是 Worker,因此 Worker 实现了 Runnable 接口,实现了 run 方法;// 2. 在 Worker 初始化时 this.thread = getThreadFactory ().newThread (this) 这行代码// 把当前 Worker 作为线程的构造器入参,// 在后续的实现中会发现这样的代码:Thread t = w.thread;t.start (),// 此时的 w 是 Worker 的引用申明,此处 t.start 实际上执行的就是 Worker 的 run 方法;// 3. Worker 本身也实现了 AQS,所以其本身也是一个锁,其在执行任务的时候,会锁住自己,任务执行完成之后,会释放自己;private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ // 任务运行的线程 final Thread thread; // 需要执行的任务 Runnable firstTask; // 非常巧妙的设计,Worker本身是个 Runnable,把自己作为任务传递给 thread // 内部有个属性又设置了 Runnable Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 把 Worker 自己作为 thread 运行的任务 this.thread = getThreadFactory().newThread(this); } /** Worker 本身是 Runnable,run 方法是 Worker 执行的入口, runWorker 是外部的方法 */ public void run() { runWorker(this); } private static final long serialVersionUID = 6138294804551838833L; // Lock methods // 0 代表没有锁住,1 代表锁住 protected boolean isHeldExclusively() { return getState() != 0; } // 尝试加锁,CAS 赋值为 1,表示锁住 // 重写了 AbstractQueuedSynchronizer 类的 tryAcquire 方法 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 尝试释放锁,释放锁没有 CAS 校验,可以任意的释放锁 // 重写了 AbstractQueuedSynchronizer 类的 tryRelease 方法 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } //加锁方法 //调用了 AbstractQueuedSynchronizer 的 acquire 方法 public void lock() { acquire(1); } //尝试加锁方法 public boolean tryLock() { return tryAcquire(1); } //解锁方法 public void unlock() { release(1); } //判断是否已经获取锁 public boolean isLocked() { return isHeldExclusively(); } //若线程已经启动则将该线程中断 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }}
参考致谢
本博客为博主的学习实践总结,并参考了众多博主的博文,在此表示感谢,博主若有不足之处,请批评指正。
【1】