博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ThreadPoolExecutor源码解读
阅读量:3576 次
发布时间:2019-05-20

本文共 14547 字,大约阅读时间需要 48 分钟。

ThreadPoolExecuor是JDK中默认线程池的实现。下面我们对其进入深入剖析。首先来一张类图

其继承关系还是很清晰明了的,下面从Executor说起,Executor 这个接口只是定义了一个方法executor(Runnable r),用于向线程池提交一个任务。注意到一点这个方法的返回值还是空的。

public interface Executor {    void execute(Runnable command);}

ExecutorService 这个接口开始有了线程池的一些雏形,定义了一些线程池的基础方法如(状态相关,控制相关),其中比较重要的方法应该是submit(),并且会放回一个Future对象,用于获取执行结果(是否执行完毕)。同时还允许提交一个Callable对象,结合Future对象使用,可以真实获取执行的结果,弥补了Runable 对象执行完毕,无法获得结果的缺陷。后面会对Future及Callable进行深入分析。

public interface ExecutorService extends Executor {      /**      *  关闭空闲的worker 线程,不会影响执行中的任务      *      */    void shutdown();    /**     *  将所有执行中的线程置为中断,并返回未执行的任务     */    List
shutdownNow(); boolean isShutdown(); boolean isTerminated(); /** * 若为停止状态则直接返回,否则等待后再判断返回 */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; /** * * 提交一个有返回结果的任务 */
Future
submit(Callable
task); /** * 提交一个Runnbale对象,以及一个结果 */
Future
submit(Runnable task, T result); /** * 不解释 */ Future
submit(Runnable task); /** * 批量执行 */
List
> invokeAll(Collection
> tasks) throws InterruptedException; /** * * 给点的时间内批量执行 */
List
> invokeAll(Collection
> tasks, long timeout, TimeUnit unit) throws InterruptedException; /** * 提交一堆任务。其中一个有返回结果就返回。 * */
T invokeAny(Collection
> tasks) throws InterruptedException, ExecutionException; /** * 给定的时间范围内,提交一堆任务,其中一个执行完且再时间范围内就返回,若超时则直接返回 */
T invokeAny(Collection
> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}

接下来再看看AbstractExecutorService 同样的套路都是实现一些公共的方法。比较关键的是实现了submit的那几个方法。我们来看看。还是挺清晰明了的,都是将任务封装为一个FutureTask 然后调用execute(Runable command)方法。FutureTask 是啥??我们后面再解释解释。

protected 
RunnableFuture
newTaskFor(Runnable runnable, T value) { return new FutureTask
(runnable, value); } protected
RunnableFuture
newTaskFor(Callable
callable) { return new FutureTask
(callable); } public Future
submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture
ftask = newTaskFor(task, null); execute(ftask); return ftask; } public
Future
submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture
ftask = newTaskFor(task, result); execute(ftask); return ftask; } public
Future
submit(Callable
task) { if (task == null) throw new NullPointerException(); RunnableFuture
ftask = newTaskFor(task); execute(ftask); return ftask;

 

接下来的分析就进入到重头戏了。ThreadPoolExecutor。先来看看主要的成员变量

/**      *  高三位标识线程池状态,低29位标识线程数量      *    * /    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));     /**       * 阻塞队列,存放任务       *       */     private final BlockingQueue
workQueue; /** * 锁对象 进行关键操作的时候进行上锁。 */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 存放封装工作线程的Worker */ private final HashSet
workers = new HashSet
(); /** * Wait condition to support awaitTermination */ private final Condition termination = mainLock.newCondition(); /** * Tracks largest attained pool size. Accessed only under * mainLock. */ private int largestPoolSize; /** * Counter for completed tasks. Updated only on termination of * worker threads. Accessed only under mainLock. */ private long completedTaskCount; /** * 线程工厂 */ private volatile ThreadFactory threadFactory; /** * 拒绝策略 * Handler called when saturated or shutdown in execute. */ private volatile RejectedExecutionHandler handler; /** * 空闲存活时间 */ private volatile long keepAliveTime; /** * 核心线程数量 */ private volatile int corePoolSize; /** * 最大线程数量 */ private volatile int maximumPoolSize;

线程池的状态有以下几个,存保存在ctl的高三位(通过位移操作 左移表示*2,右移表示除以2)

private static final int RUNNING    = -1 << COUNT_BITS;    private static final int SHUTDOWN   =  0 << COUNT_BITS;    private static final int STOP       =  1 << COUNT_BITS;    private static final int TIDYING    =  2 << COUNT_BITS;    private static final int TERMINATED =  3 << COUNT_BITS;

 

接着分析重要方法

ThreadPoolExecutor的构造方法基本上是由以下几个参数组成的

public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue
workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

核心线程数,最大线程数,最大空闲时间,时间单位,阻塞队列,线程工厂,以及拒绝策略

常见的拒绝策略有 

CallerRunsPolicy (直接再提交线程中执行任务)
AbortPolicy (抛异常拒绝)
DiscardPolicy (直接丢弃)
DiscardOldestPolicy (放弃最老的任务,再次提交当前任务)

核心方法

public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        int c = ctl.get();        //线程数量还没到达核心数量,创建新线程        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        // 线程池正在运行中,且成功将任务放进队列        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            // 再一次进行检查,如果非运行状态就删除任务,并拒绝            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                // 没有工作线程就添加一下                addWorker(null, false);        }        // 这里实际上是放不进阻塞队列了才去创建新的工作器,所以对阻塞队列的设置很关键了        // 要不然工作线程永远不会达到最大的线程数        else if (!addWorker(command, false))            // 未能成功创建线程,就拒绝任务            reject(command);    }

再来看看addWorker,addWorker 主要的任务就是创建新的工作线程,其中Worker是ThreadPoolExecutor的一个成员内部类。

我们先来看看这个类,这个类主要就是来执行任务的,所以呢其关键成员是持有一个线程对象,其次比较关键的是他集成了AQS,一个非常重要的类,下次再对这个类进行分析。

private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable    {        /**         * This class will never be serialized, but we provide a         * serialVersionUID to suppress a javac warning.         */        private static final long serialVersionUID = 6138294804551838833L;        /** 真正干活的线程 */        final Thread thread;        /** 该工作器的首个任务,初始化的时候赋予的. */        Runnable firstTask;        /** 干的任务的数量 */        volatile long completedTasks;        /**         * Creates with given first task and thread from ThreadFactory.         * @param firstTask the first task (null if none)         */        Worker(Runnable firstTask) {            setState(-1); // inhibit interrupts until runWorker            this.firstTask = firstTask;            this.thread = getThreadFactory().newThread(this);        }        /** Delegates main run loop to outer runWorker  */        public void run() {            runWorker(this);        }        // Lock methods        //        // The value 0 represents the unlocked state.        // The value 1 represents the locked state.        protected boolean isHeldExclusively() {            return getState() != 0;        }        protected boolean tryAcquire(int unused) {            if (compareAndSetState(0, 1)) {                setExclusiveOwnerThread(Thread.currentThread());                return true;            }            return false;        }        protected boolean tryRelease(int unused) {            setExclusiveOwnerThread(null);            setState(0);            return true;        }        public void lock()        { acquire(1); }        public boolean tryLock()  { return tryAcquire(1); }        public void unlock()      { release(1); }        public boolean isLocked() { return isHeldExclusively(); }        void interruptIfStarted() {            Thread t;            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {                try {                    t.interrupt();                } catch (SecurityException ignore) {                }            }        }    }
private boolean addWorker(Runnable firstTask, boolean core) {        retry:        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);            // 线程是停止的且没任务传过来,不用创建新的工作器,直接放回            if (rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                   firstTask == null &&                   ! workQueue.isEmpty()))                return false;            for (;;) {                int wc = workerCountOf(c);                // 是否已经大于核心线程数或者最大线程数                if (wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize))                    return false;                // cas 添加工作器数量 并跳出循环添加工作器                if (compareAndIncrementWorkerCount(c))                    break retry;                c = ctl.get();  // Re-read ctl                if (runStateOf(c) != rs)                    continue retry;                // else CAS failed due to workerCount change; retry inner loop            }        }        boolean workerStarted = false;        boolean workerAdded = false;        Worker w = null;        try {            // 创建工作器            w = new Worker(firstTask);            final Thread t = w.thread;            if (t != null) {                final ReentrantLock mainLock = this.mainLock;                // 加锁操作                mainLock.lock();                try {                    // Recheck while holding lock.                    // Back out on ThreadFactory failure or if                    // shut down before lock acquired.                    int rs = runStateOf(ctl.get());                    if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive()) // precheck that t is startable                            throw new IllegalThreadStateException();                        workers.add(w);                        int s = workers.size();                        if (s > largestPoolSize)                            largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                if (workerAdded) {                    //启动线程                    t.start();                    workerStarted = true;                }            }        } finally {            if (! workerStarted)                addWorkerFailed(w);        }        return workerStarted;    }

线程启动了之后,就会去执行worker 的run方法。其run()实际上是调用ThreadPoolExecutor的runWork(worker)方法。

final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            // 循环去获取任务,获取不到这里会阻塞,这个方法也是挺关键的            while (task != null || (task = getTask()) != null) {                w.lock();                // If pool is stopping, ensure thread is interrupted;                // if not, ensure thread is not interrupted.  This                // requires a recheck in second case to deal with                // shutdownNow race while clearing interrupt                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    // 钩子方法,执行一些前置操作,这里是空实现                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                            // 同样是空实现                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    //任务数自增                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            // 跳出了循环,可能是线程池关闭。线程等待时间大于最大空闲时间的,关闭该工作线程            processWorkerExit(w, completedAbruptly);        }    }

再来分析一个重要的方法getTask(),为什么说其重要呢。因为它涉及到线程池的一个重要思想,当线程池>coreSize 时或者线程等待时间大于最大空闲时间时,会对线程进行回收操作。

private Runnable getTask() {        //是否超时的一个标志        boolean timedOut = false;         for (;;) {            int c = ctl.get();            int rs = runStateOf(c);            // 线程池关了。就减少工作器数量            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {                decrementWorkerCount();                return null;            }            //当前的工作线程数量            int wc = workerCountOf(c);            // 一个标识,代表需部需要删线程 如等待时间超时了,有空闲线程之类的            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;            if ((wc > maximumPoolSize || (timed && timedOut))                && (wc > 1 || workQueue.isEmpty())) {                if (compareAndDecrementWorkerCount(c))                    return null;                continue;            }            try {                Runnable r = timed ?                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                    workQueue.take();                if (r != null)                    return r;                // 获取超时了。                timedOut = true;            } catch (InterruptedException retry) {                timedOut = false;            }        }    }

线程池状态演变图

 

继续来看看几个重要的接口及相关实现

Future:用来获取异步结果主要的方法应该是get()

public interface Future
{ boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}

Callable 能够获取结果的任务 

public interface Callable
{ V call() throws Exception;}

在之前我们说过无论提交的Runnbale还是Callable 最终转化为的都是FutureTask 。这个FutureTask是个什么呢?我们来看看类图

FutureTask 既实现了Runnable 接口也实现了Future接口。有一点需要注意的是在FutureTask中。Runnable 对象也是统一转化为Callable对象的。这是怎么转换的呢??看看代码就知道了。简单滴适配一下而已。下篇文章将对FutureTask进行深入解读

static final class RunnableAdapter
implements Callable
{ final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }

 

转载地址:http://fhagj.baihongyu.com/

你可能感兴趣的文章
金总公司的网络计划
查看>>
hibernate查询方式
查看>>
hibernate抓取策略(关联级别的延迟加载)
查看>>
DNS_PROBE_FINISHED_NXDOMAIN
查看>>
dom4j解析节点元素的crud和xpath
查看>>
初识Struts
查看>>
多线程打印A12Z34。。。
查看>>
strutsc踩过的坑
查看>>
maven安装和使用踩坑
查看>>
第一次紧张刺激的面试
查看>>
咕泡笔记导读篇
查看>>
eclipse安装maven和简单使用
查看>>
关于交往所思
查看>>
jdbc的封装
查看>>
数据库存入数据变为???
查看>>
实现数据库源的几种方式和开源数据源的使用
查看>>
元数据的获取和 数据库读写操作封装
查看>>
java文件的上传和下载(细节问题)
查看>>
DBUtils框架QueryRunner的 使用
查看>>
springMVC之controller笔记
查看>>