ThreadPoolExecutor 源码解析

问题

抱着问题,看源码,不至于让自己在看源码时懵逼,看的同时参考网上的源码分析,加深自己的理解

  1. 线程池的核心线程数,最大线程数,队列间的关系?
  2. 设置核心线程数,能大于最大线程数么?
  3. 释放线程时,怎样才能释放核心线程?
  4. 线程池为什么能维持线程不释放,随时运行各种任务?

简介

线程池常用的创建方法有那么几种:

newFixedThreadPool()
newSingleThreadExecutor()
newCachedThreadPool()
newScheduledThreadPool()

线程池的创建方式,是装饰模式的一种体现,底层封装ThreadPoolExecutor这个类
在阿里巴巴开放的代码规范里,因为线程池默认使用无界队列,会导致线程无限增长导致应用oom,所以要显示的创建 ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler);
    }

参数说明

  1. corePoolSize 代表核心线程池的个数,当线程池当前的个数大于核心线程池的时候,线程池会回收多出来的线程
  2. maximumPoolSize 代表最大的线程池个数,当线程池需要执行的任务大于核心线程池的时候,会创建更多的线程,但是最大不能超过这个数
  3. keepAliveTime 代表空余的线程存活的时间,当多余的线程完成任务的时候,需要多长时间进行回收,时间单位是unit 去控制
  4. workQueue 非常重要,这个工作队列会存放所有待执行的Runnable对象ArrayBlockingQueue:有界队列。是一种基于数组的有界阻塞队列LinkedBlockingQueue:无界队列。是一个基于链表的阻塞队列,吞吐量通常要高于ArrayBlockingQueue,newSingleThreadExecutor,newFixedThreadPool默认队列SynchronousQueue:直接提交。是一种不存储元素的阻塞队列,每个插入操作必须等另一个线程调用移除操作,否则插入操作一直处于阻塞状态。newCachedThreadPool使用了这个队列PriorityBlockingQueue:是一种具有优先权的阻塞队列,优先级大的任务可以先执行,用户由此可以控制任务的执行顺序DelayedWorkQueue:优先级队列,newScheduledThreadPool默认这个队列
  5. threadFactory 创建线程时,使用的工厂DefaultThreadFactory 默认,创建一个同线程组且默认优先级的线程PrivilegedThreadFactory,使用访问权限创建一个权限控制的线程
  6. handler 线程池满时,提供的拒绝策略AbortPolicy:默认,抛出异常 "Task r.toString() rejected from e.toString()"DiscardPolicy:不抛异常,直接丢弃当前线程DiscardOldestPolicy::如果线程池没有中止,移除队列第一个任务,再执行当前任务CallerRunsPolicy:如果线程池没有中止,直接用调用者的线程资源执行任务

执行过程

  1. 判断是否大于核心线程数,小于,创建线程
  2. 超过核心线程数,判断队列是否满,不满,创建线程
  3. 队列已满,判断是否超过最大线程数,没有超过,创建线程
  4. 超过最大线程数,执行拒绝策略

类说明

/**
 * An {@link ExecutorService} that executes each submitted task using
 * one of possibly several pooled threads, normally configured
 * using {@link Executors} factory methods.
 *
 * 线程池主要解决两个问题:在执行大量异步线程时改善性能,应付减少每个任务的调用开销,
 * 并且提供了一种限制和管理资源(包含线程)的方法。ThreadPoolExecutor提供了一些基本的统计,
 * 比如完成任务的数量。
 * 
 * 

Thread pools address two different problems: they usually * provide improved performance when executing large numbers of * asynchronous tasks, due to reduced per-task invocation overhead, * and they provide a means of bounding and managing the resources, * including threads, consumed when executing a collection of tasks. * Each {@code ThreadPoolExecutor} also maintains some basic * statistics, such as the number of completed tasks. * * 在大部分的上下文场景里,这个类提供了很多可调整的参数和可扩展的hook。不管怎样,开发者可以更方便的创建无界线可回收的线程池,固定的线程池,单个后台线程,这些都是为大部分应用场景设置的默认线程池配置。 * 如果以上线程方法不足以满足应用场景,可以手动配置和调校线程池ThreadPoolExecutor *

To be useful across a wide range of contexts, this class * provides many adjustable parameters and extensibility * hooks. However, programmers are urged to use the more convenient * {@link Executors} factory methods {@link * Executors#newCachedThreadPool} (unbounded thread pool, with * automatic thread reclamation), {@link Executors#newFixedThreadPool} * (fixed size thread pool) and {@link * Executors#newSingleThreadExecutor} (single background thread), that * preconfigure settings for the most common usage * scenarios. Otherwise, use the following guide when manually * configuring and tuning this class: * *

* 核心和最大线程池数量 *
Core and maximum pool sizes
* * 线程池执行器将会根据核心线程池数量和最大线程池数量自动地调整线程池大小。 *
A {@code ThreadPoolExecutor} will automatically adjust the * pool size (see {@link #getPoolSize}) * according to the bounds set by * corePoolSize (see {@link #getCorePoolSize}) and * maximumPoolSize (see {@link #getMaximumPoolSize}). * * 当一个新的线程提交到方法里,如果当前执行的线程小于核心线程数,会根据请求新建一个线程,即便其他线程仍旧闲置。 * 如果多于核心线程数但小于最大线程数线程在执行。一个新线程将被创建,直到队列满。 * 如果需要设置maximumPoolSize为无界的,比如Integer.MAX_VALUE,那么将允许线程池容纳任意数量的任务并发执行。 * 在典型的场景中,corePoolSize和maximumPoolSize 仅仅在构造中设置,但是我们也可以动态的调整用#setCorePoolSize和#setMaximumPoolSize函数 * When a new task is submitted in method {@link #execute(Runnable)}, * and fewer than corePoolSize threads are running, a new thread is * created to handle the request, even if other worker threads are * idle. If there are more than corePoolSize but less than * maximumPoolSize threads running, a new thread will be created only * if the queue is full. By setting corePoolSize and maximumPoolSize * the same, you create a fixed-size thread pool. By setting * maximumPoolSize to an essentially unbounded value such as {@code * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary * number of concurrent tasks. Most typically, core and maximum pool * sizes are set only upon construction, but they may also be changed * dynamically using {@link #setCorePoolSize} and {@link * #setMaximumPoolSize}.
* *
On-demand construction
* * *
By default, even core threads are initially created and * started only when new tasks arrive, but this can be overridden * dynamically using method {@link #prestartCoreThread} or {@link * #prestartAllCoreThreads}. You probably want to prestart threads if * you construct the pool with a non-empty queue.
* *
Creating new threads
* *
New threads are created using a {@link ThreadFactory}. If not * otherwise specified, a {@link Executors#defaultThreadFactory} is * used, that creates threads to all be in the same {@link * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and * non-daemon status. By supplying a different ThreadFactory, you can * alter the thread's name, thread group, priority, daemon status, * etc. If a {@code ThreadFactory} fails to create a thread when asked * by returning null from {@code newThread}, the executor will * continue, but might not be able to execute any tasks. Threads * should possess the "modifyThread" {@code RuntimePermission}. If * worker threads or other threads using the pool do not possess this * permission, service may be degraded: configuration changes may not * take effect in a timely manner, and a shutdown pool may remain in a * state in which termination is possible but not completed.
* *
Keep-alive times
* * 如果当前线程数大于核心线程数,那些超过keepAliveTime时间的限制线程将会被终止。 * 当线程池没有充分利用的情况下, 此策略可以减少资源的消耗 * 如果线程池变得活跃,新的线程将会被构造。 * 我们可以用setKeepAliveTime 参数动态改变,用一个 Long.MAX_VALU,TimeUnit#NANOSECONDS 作为保活时间,那么空闲的线程可以避免在线程池关闭之前被终止。 * 保活策略只有在当前线程池线程数量大于 核心线程池数量时,才起作用。 * #allowCoreThreadTimeOut用于控制当任务线程空闲时,是否允许线程等待 keepAliveTime时间,以便在这个过程中,有新的任务进来 *
If the pool currently has more than corePoolSize threads, * excess threads will be terminated if they have been idle for more * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}). * This provides a means of reducing resource consumption when the * pool is not being actively used. If the pool becomes more active * later, new threads will be constructed. This parameter can also be * changed dynamically using method {@link #setKeepAliveTime(long, * TimeUnit)}. Using a value of {@code Long.MAX_VALUE} {@link * TimeUnit#NANOSECONDS} effectively disables idle threads from ever * terminating prior to shut down. By default, the keep-alive policy * applies only when there are more than corePoolSize threads. But * method {@link #allowCoreThreadTimeOut(boolean)} can be used to * apply this time-out policy to core threads as well, so long as the * keepAliveTime value is non-zero.
* *
Queuing
* * BlockingQueu用于存放提交的任务,队列的实际容量与线程池大小相关联 *
Any {@link BlockingQueue} may be used to transfer and hold * submitted tasks. The use of this queue interacts with pool sizing: * *
    * 如果当前线程池小于核心线程数,执行器总是优先创建一个新线程,而不是从队列获取 *
  • If fewer than corePoolSize threads are running, the Executor * always prefers adding a new thread * rather than queuing.
  • * * 如果当前线程池大于核心线程数,执行器是从队列获取空闲线程,而不是从新建一个空闲线程 *
  • If corePoolSize or more threads are running, the Executor * always prefers queuing a request rather than adding a new * thread.
  • * * 如果当前线程池任务数量大于核心线程池数量,且队列中无空闲任务线程,将会创建一个任务线程,直到超出最大线程数后,则任务将会被拒绝 *
  • If a request cannot be queued, a new thread is created unless * this would exceed maximumPoolSize, in which case, the task will be * rejected.
  • * *
* ThreadPoolExecutor有3中出队列策略 * There are three general strategies for queuing: *
    * *
  1. Direct handoffs. A good default choice for a work * queue is a {@link SynchronousQueue} that hands off tasks to threads * without otherwise holding them. Here, an attempt to queue a task * will fail if no threads are immediately available to run it, so a * new thread will be constructed. This policy avoids lockups when * handling sets of requests that might have internal dependencies. * Direct handoffs generally require unbounded maximumPoolSizes to * avoid rejection of new submitted tasks. This in turn admits the * possibility of unbounded thread growth when commands continue to * arrive on average faster than they can be processed.
  2. * *
  3. Unbounded queues. Using an unbounded queue (for * example a {@link LinkedBlockingQueue} without a predefined * capacity) will cause new tasks to wait in the queue when all * corePoolSize threads are busy. Thus, no more than corePoolSize * threads will ever be created. (And the value of the maximumPoolSize * therefore doesn't have any effect.) This may be appropriate when * each task is completely independent of others, so tasks cannot * affect each others execution; for example, in a web page server. * While this style of queuing can be useful in smoothing out * transient bursts of requests, it admits the possibility of * unbounded work queue growth when commands continue to arrive on * average faster than they can be processed.
  4. * *
  5. Bounded queues. A bounded queue (for example, an * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when * used with finite maximumPoolSizes, but can be more difficult to * tune and control. Queue sizes and maximum pool sizes may be traded * off for each other: Using large queues and small pools minimizes * CPU usage, OS resources, and context-switching overhead, but can * lead to artificially low throughput. If tasks frequently block (for * example if they are I/O bound), a system may be able to schedule * time for more threads than you otherwise allow. Use of small queues * generally requires larger pool sizes, which keeps CPUs busier but * may encounter unacceptable scheduling overhead, which also * decreases throughput.
  6. * *
* *
* 拒绝策略 *
Rejected tasks
* * 当执行器关闭时,或者执行器用有界的最大线程池数量和任务队列容量饱 和时,新提交的任务将会被拒绝。在其他情况下,execute方法将调用RejectedExecutionHandler 的rejectedExecution方法处理任务。有四种处理策略提供如下: *
New tasks submitted in method {@link #execute(Runnable)} will be * rejected when the Executor has been shut down, and also when * the Executor uses finite bounds for both maximum threads and work queue * capacity, and is saturated. In either case, the {@code execute} method * invokes the {@link * RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)} * method of its {@link RejectedExecutionHandler}. Four predefined handler * policies are provided: * *
    * *
  1. In the default {@link ThreadPoolExecutor.AbortPolicy}, the * handler throws a runtime {@link RejectedExecutionException} upon * rejection.
  2. * *
  3. In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread * that invokes {@code execute} itself runs the task. This provides a * simple feedback control mechanism that will slow down the rate that * new tasks are submitted.
  4. * *
  5. In {@link ThreadPoolExecutor.DiscardPolicy}, a task that * cannot be executed is simply dropped.
  6. * *
  7. In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the * executor is not shut down, the task at the head of the work queue * is dropped, and then execution is retried (which can fail again, * causing this to be repeated.)
  8. * *
* * It is possible to define and use other kinds of {@link * RejectedExecutionHandler} classes. Doing so requires some care * especially when policies are designed to work only under particular * capacity or queuing policies.
* *
Hook methods
* *
This class provides {@code protected} overridable * {@link #beforeExecute(Thread, Runnable)} and * {@link #afterExecute(Runnable, Throwable)} methods that are called * before and after execution of each task. These can be used to * manipulate the execution environment; for example, reinitializing * ThreadLocals, gathering statistics, or adding log entries. * Additionally, method {@link #terminated} can be overridden to perform * any special processing that needs to be done once the Executor has * fully terminated. * *

If hook or callback methods throw exceptions, internal worker * threads may in turn fail and abruptly terminate.

* *
Queue maintenance
* *
Method {@link #getQueue()} allows access to the work queue * for purposes of monitoring and debugging. Use of this method for * any other purpose is strongly discouraged. Two supplied methods, * {@link #remove(Runnable)} and {@link #purge} are available to * assist in storage reclamation when large numbers of queued tasks * become cancelled.
* *
Finalization
* *
A pool that is no longer referenced in a program AND * has no remaining threads will be {@code shutdown} automatically. If * you would like to ensure that unreferenced pools are reclaimed even * if users forget to call {@link #shutdown}, then you must arrange * that unused threads eventually die, by setting appropriate * keep-alive times, using a lower bound of zero core threads and/or * setting {@link #allowCoreThreadTimeOut(boolean)}.
* *
* *

Extension example. Most extensions of this class * override one or more of the protected hook methods. For example, * here is a subclass that adds a simple pause/resume feature: * *

 {@code
 * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 *   private boolean isPaused;
 *   private ReentrantLock pauseLock = new ReentrantLock();
 *   private Condition unpaused = pauseLock.newCondition();
 *
 *   public PausableThreadPoolExecutor(...) { super(...); }
 *
 *   protected void beforeExecute(Thread t, Runnable r) {
 *     super.beforeExecute(t, r);
 *     pauseLock.lock();
 *     try {
 *       while (isPaused) unpaused.await();
 *     } catch (InterruptedException ie) {
 *       t.interrupt();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void pause() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = true;
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void resume() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = false;
 *       unpaused.signalAll();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 * }}
* * @since 1.5 * @author Doug Lea */

代码解析

代码分析

        // 1. 如果少于核心线程数,则试着用给定的第一个task启动线程。对addworker以原子方式检查运行状态和任务统计,通过返回false,来防止在增加线程时出现错误警报
        // 2. 如果一个task能成功排队,这是我们仍旧需要两次检查,无论我们是否应该加入线程(因为最后检查时存在的已经死掉)或者在刚进入时线程池就关掉。因此我们重新检查状态,以确保停止时回滚回队列;如果没有,则启动新线程,
        // 3. 如果不能从队列拿去,我们尝试新建线程。如果失败,我们就知道已经停止或者饱和并且丢弃这个任务
        int c = ctl.get();
        // 如果少于核心线程数,则创建
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果正在运行,并且可以进入队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

核心工作方法 addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
    
    try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
    } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
}

查看 Worker这个类

// 包装类, 调用thread.start()方法时候,实际调用的就是Worker类的run()方法
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
        
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    {
        public void run() {
            runWorker(this);
        }
    }

具体干活的方法 runWorker

// 死循环,等条件,执行线程
final void runWorker(Worker w) {
    while (task != null || (task = getTask()) != null) {
                w.lock();
                try {
                } finally {
                 w.unlock();
                }
    }
}

判断条件 getTask()

// 死循环,拿到任务才返回
// 如果线程超时或者大于核心线程数设置, 释放线程。 
// 否则队列为空,则一直阻塞
private Runnable getTask() {
    for (;;) {
    
        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 略
        Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
        // 略
    }
}

大于核心线程数时,会根据心跳时间释放队列的线程
当小于等于核心线程数,会卡在workQueue.take()方法,直到拿到Runnable 然后返回

总结

  1. 线程池的核心线程数,最大线程数,队列,线程间的关系?
    答:创建时,当小于核心线程数,无论队列是否有空闲线程,总会新建线程。当大于核心线程数,优先获取队列的空闲线程,当队列满了,则新建线程,如果大于最大线程数,则该任务被拒绝。
    自动销毁时,大于核心线程数,会根据KeepAliveTime时间,释放队列中的线程,直至线程数总数等于核心线程数,不再释放,核心线程数通过循环维持
  2. 设置核心线程数,能大于最大线程数么?
    答:不能,看源码
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
            .....
    }   
            
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
            .....
    }
      
  1. 释放线程时,怎样才能释放核心线程?
    allowCoreThreadTimeOut 设置为true
    释放线程时,条件判断:
private Runnable getTask() {
...
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // allowCoreThreadTimeOut 或者超过核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
...
}
  1. 线程池为什么能维持线程不释放,随时运行各种任务?
    答:核心线程是在一系列的死循环里,通过队列阻塞保留。从而可以不断接受任务来进行
// 死循环,拿到任务才返回
// 如果线程超时或者大于核心线程数设置, 释放线程。 
// 否则队列为空,则一直阻塞
private Runnable getTask() {
    for (;;) {
    
        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 略
        Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
        // 略
    }
}
展开阅读全文

页面更新:2024-04-22

标签:源码   优先级   队列   线程   个数   数量   核心   策略   时间   方法

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top