【JDK源码分析系列】ThreadPoolExecutor 源码解析 -- 基本属性分析
发布日期:2021-05-07 20:51:23 浏览次数:22 分类:原创文章

本文共 4965 字,大约阅读时间需要 16 分钟。

【JDK源码分析系列】ThreadPoolExecutor 源码解析 -- 基本属性分析


【1】ThreadPoolExecutor 的基本属性 -- 常量与变量


//ctl 线程池状态控制字段,由两部分组成://1:workerCount  有效的线程数,workerCount 最大是到(2^29)-1,大概 5 亿个线程//2:runState 线程池的状态,提供了生命周期的控制,源码中有很多关于状态的校验,状态枚举如下://RUNNING(-536870912):接受新任务或者处理队列里的任务;//SHUTDOWN(0):不接受新任务,但仍在处理已经在队列里面的任务;//STOP(536870912):不接受新任务,也不处理队列中的任务,对正在执行的任务进行中断;//TIDYING(1073741824): 所以任务都被中断,workerCount 是 0,整理状态;//TERMINATED(1610612736): terminated() 已经完成的时候;//runState 之间的转变过程://RUNNING -> SHUTDOWN:调用 shudown(),finalize()//(RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()//SHUTDOWN -> TIDYING -> workerCount ==0//STOP -> TIDYING -> workerCount ==0//TIDYING -> TERMINATED -> terminated() 执行完成之后private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;// 29private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// =(2^29)-1=536870911// Packing and unpacking ctl// 对 ctl 字段进行拆包与封装处理,获取其中的 workercount 和 runstateprivate static int ctlOf(int rs, int wc) { return rs | wc; }private static int workerCountOf(int c)  { return c & CAPACITY; }private static int runStateOf(int c)     { return c & ~CAPACITY; }// runState is stored in the high-order bits// runstate 的各种状态的定义值private static final int RUNNING    = -1 << COUNT_BITS;//-536870912private static final int SHUTDOWN   =  0 << COUNT_BITS;//0private static final int STOP       =  1 << COUNT_BITS;//536870912private static final int TIDYING    =  2 << COUNT_BITS;//1073741824private static final int TERMINATED =  3 << COUNT_BITS;//1610612736// 已完成任务的计数volatile long completedTasks;// 线程池最大容量private int largestPoolSize;// 已经完成的任务数private long completedTaskCount;// 用户可控制的参数都是 volatile 修饰的// 创建失败一般不抛出异常,只有在 OutOfMemoryError 时候才会// 可以使用 threadFactory 创建 threadprivate volatile ThreadFactory threadFactory;// 饱和或者运行中拒绝任务的 handler 处理类private volatile RejectedExecutionHandler handler;// 线程存活时间设置private volatile long keepAliveTime;// 设置 true 的话,核心线程空闲 keepAliveTime 时间后,也会被回收private volatile boolean allowCoreThreadTimeOut;// coreSize// 核心线程数量private volatile int corePoolSize;// maxSize 最大限制 (2^29)-1private volatile int maximumPoolSize;// 默认的拒绝策略private static final RejectedExecutionHandler defaultHandler =    new AbortPolicy();// 队列会 hold 住任务,并且利用队列的阻塞的特性,来保持线程的存活周期private final BlockingQueue<Runnable> workQueue;// 大多数情况下是控制对 workers 的访问权限// 可重入锁以及对应的条件变量private final ReentrantLock mainLock = new ReentrantLock();private final Condition termination = mainLock.newCondition();// 包含线程池中所有的工作线程private final HashSet<Worker> workers = new HashSet<Worker>();

【2】ThreadPoolExecutor 的基本属性 -- Worker 类


// 线程池中任务运行的最小单元// 维护着运行中的任务的线程锁和可中断状态// Worker 继承 AQS,具有锁功能// Worker 实现 Runnable,本身是一个可执行的任务//// 1. Worker 任务的代理,在线程池中,最小的执行单位就是 Worker,因此 Worker 实现了 Runnable 接口,实现了 run 方法;// 2. 在 Worker 初始化时 this.thread = getThreadFactory ().newThread (this) 这行代码//    把当前 Worker 作为线程的构造器入参,//    在后续的实现中会发现这样的代码:Thread t = w.thread;t.start (),//    此时的 w 是 Worker 的引用申明,此处 t.start 实际上执行的就是 Worker 的 run 方法;// 3. Worker 本身也实现了 AQS,所以其本身也是一个锁,其在执行任务的时候,会锁住自己,任务执行完成之后,会释放自己;private final class Worker    extends AbstractQueuedSynchronizer    implements Runnable{    // 任务运行的线程    final Thread thread;    // 需要执行的任务    Runnable firstTask;    // 非常巧妙的设计,Worker本身是个 Runnable,把自己作为任务传递给 thread    // 内部有个属性又设置了 Runnable    Worker(Runnable firstTask) {        setState(-1); // inhibit interrupts until runWorker        this.firstTask = firstTask;        // 把 Worker 自己作为 thread 运行的任务        this.thread = getThreadFactory().newThread(this);    }    /** Worker 本身是 Runnable,run 方法是 Worker 执行的入口, runWorker 是外部的方法 */    public void run() {        runWorker(this);    }    private static final long serialVersionUID = 6138294804551838833L;    // Lock methods    // 0 代表没有锁住,1 代表锁住    protected boolean isHeldExclusively() {        return getState() != 0;    }    // 尝试加锁,CAS 赋值为 1,表示锁住    // 重写了 AbstractQueuedSynchronizer 类的 tryAcquire 方法    protected boolean tryAcquire(int unused) {        if (compareAndSetState(0, 1)) {            setExclusiveOwnerThread(Thread.currentThread());            return true;        }        return false;    }    // 尝试释放锁,释放锁没有 CAS 校验,可以任意的释放锁    // 重写了 AbstractQueuedSynchronizer 类的 tryRelease 方法    protected boolean tryRelease(int unused) {        setExclusiveOwnerThread(null);        setState(0);        return true;    }    //加锁方法    //调用了 AbstractQueuedSynchronizer 的 acquire 方法    public void lock()        { acquire(1); }    //尝试加锁方法    public boolean tryLock()  { return tryAcquire(1); }    //解锁方法    public void unlock()      { release(1); }    //判断是否已经获取锁    public boolean isLocked() { return isHeldExclusively(); }    //若线程已经启动则将该线程中断    void interruptIfStarted() {        Thread t;        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {            try {                t.interrupt();            } catch (SecurityException ignore) {            }        }    }}

参考致谢
本博客为博主的学习实践总结,并参考了众多博主的博文,在此表示感谢,博主若有不足之处,请批评指正。


【1】

上一篇:【JDK源码分析系列】ThreadPoolExecutor 源码解析 -- 任务提交与运行
下一篇:【JDK源码分析系列】FutureTask 源码解析

发表评论

最新留言

路过,博主的博客真漂亮。。
[***.116.15.85]2025年04月18日 23时21分17秒