b2c信息网

您现在的位置是:首页 > 热点问题 > 正文

热点问题

java线程池的源码(java线程池源码解析)

hacker2022-08-11 03:10:26热点问题83
本文目录一览:1、【Java基础】线程池的原理是什么?2、

本文目录一览:

【Java基础】线程池的原理是什么?

什么是线程池?

总归为:池化技术 ---》数据库连接池 缓存架构 缓存池 线程池 内存池,连接池,这种思想演变成缓存架构技术--- JDK设计思想有千丝万缕的联系

首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。

Java 中的 ThreadPoolExecutor 类

java.uitl.concurrent.ThreadPoolExecutor 类是线程池中最核心的一个类,因此如果要透彻地了解Java 中的线程池,必须先了解这个类。下面我们来看一下 ThreadPoolExecutor 类的具体实现源码。

在 ThreadPoolExecutor 类中提供了四个构造方法:

从上面的代码可以得知,ThreadPoolExecutor 继承了 AbstractExecutorService 类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

下面解释下一下构造器中各个参数的含义:

corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads() 或者 prestartCoreThread()方法,从这 2 个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中;

maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于 corePoolSize 时,keepAliveTime 才会起作用,直到线程池中的线程数不大于 corePoolSize,即当线程池中的线程数大于 corePoolSize 时,如果一个线程空闲的时间达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。但是如果调用了 allowCoreThreadTimeOut(boolean) 方法,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为0;

unit:参数 keepAliveTime 的时间单位,有 7 种取值,在 TimeUnit 类中有 7 种静态属性:

workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

ArrayBlockingQueue 和 PriorityBlockingQueue 使用较少,一般使用 LinkedBlockingQueue 和 Synchronous。线程池的排队策略与 BlockingQueue 有关。

threadFactory:线程工厂,主要用来创建线程;

handler:表示当拒绝处理任务时的策略,有以下四种取值:

具体参数的配置与线程池的关系将在下一节讲述。

从上面给出的 ThreadPoolExecutor 类的代码可以知道,ThreadPoolExecutor 继承了AbstractExecutorService,我们来看一下 AbstractExecutorService 的实现:

AbstractExecutorService 是一个抽象类,它实现了 ExecutorService 接口。

我们接着看 ExecutorService 接口的实现:

而 ExecutorService 又是继承了 Executor 接口,我们看一下 Executor 接口的实现:

java线程池怎么实现

要想理解清楚java线程池实现原理,明白下面几个问题就可以了:

(1):线程池存在哪些状态,这些状态之间是如何进行切换的呢?

(2):线程池的种类有哪些?

(3):创建线程池需要哪些参数,这些参数的具体含义是什么?

(4):将任务添加到线程池之后运行流程?

(5):线程池是怎么做到重用线程的呢?

(6):线程池的关闭

首先回答第一个问题:线程池存在哪些状态;

查看ThreadPoolExecutor源码便知晓:

[java] view plain copy

// runState is stored in the high-order bits

private static final int RUNNING    = -1  COUNT_BITS;

private static final int SHUTDOWN   =  0  COUNT_BITS;

private static final int STOP       =  1  COUNT_BITS;

private static final int TIDYING    =  2  COUNT_BITS;

private static final int TERMINATED =  3  COUNT_BITS;

存在5种状态:

1Running:可以接受新任务,同时也可以处理阻塞队列里面的任务;

2Shutdown:不可以接受新任务,但是可以处理阻塞队列里面的任务;

3Stop:不可以接受新任务,也不处理阻塞队列里面的任务,同时还中断正在处理的任务;

4Tidying:属于过渡阶段,在这个阶段表示所有的任务已经执行结束了,当前线程池中是不存在有效的线程的,并且将要调用terminated方法;

5Terminated:终止状态,这个状态是在调用完terminated方法之后所处的状态;

那么这5种状态之间是如何进行转换的呢?查看ThreadPoolExecutor源码里面的注释便可以知道啦:

[java] view plain copy

* RUNNING - SHUTDOWN

*    On invocation of shutdown(), perhaps implicitly in finalize()

* (RUNNING or SHUTDOWN) - STOP

*    On invocation of shutdownNow()

* SHUTDOWN - TIDYING

*    When both queue and pool are empty

* STOP - TIDYING

*    When pool is empty

* TIDYING - TERMINATED

*    When the terminated() hook method has completed

从上面可以看到,在调用shutdown方法的时候,线程池状态会从Running转换成Shutdown;在调用shutdownNow方法的时候,线程池状态会从Running/Shutdown转换成Stop;在阻塞队列为空同时线程池为空的情况下,线程池状态会从Shutdown转换成Tidying;在线程池为空的情况下,线程池状态会从Stop转换成Tidying;当调用terminated方法之后,线程池状态会从Tidying转换成Terminate;

在明白了线程池的各个状态以及状态之间是怎么进行切换之后,我们来看看第二个问题,线程池的种类:

(1):CachedThreadPool:缓存线程池,该类线程池中线程的数量是不确定的,理论上可以达到Integer.MAX_VALUE个,这种线程池中的线程都是非核心线程,既然是非核心线程,那么就存在超时淘汰机制了,当里面的某个线程空闲时间超过了设定的超时时间的话,就会回收掉该线程;

(2):FixedThreadPool:固定线程池,这类线程池中是只存在核心线程的,对于核心线程来说,如果我们不设置allowCoreThreadTimeOut属性的话是不存在超时淘汰机制的,这类线程池中的corePoolSize的大小是等于maximumPoolSize大小的,也就是说,如果线程池中的线程都处于活动状态的话,如果有新任务到来,他是不会开辟新的工作线程来处理这些任务的,只能将这些任务放到阻塞队列里面进行等到,直到有核心线程空闲为止;

(3):ScheduledThreadPool:任务线程池,这种线程池中核心线程的数量是固定的,而对于非核心线程的数量是不限制的,同时对于非核心线程是存在超时淘汰机制的,主要适用于执行定时任务或者周期性任务的场景;

(4):SingleThreadPool:单一线程池,线程池里面只有一个线程,同时也不存在非核心线程,感觉像是FixedThreadPool的特殊版本,他主要用于确保任务在同一线程中的顺序执行,有点类似于进行同步吧;

接下来我们来看第三个问题,创建线程池需要哪些参数:

同样查看ThreadPoolExecutor源码,查看创建线程池的构造函数:

[java] view plain copy

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueueRunnable workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler)

不管你调用的是ThreadPoolExecutor的哪个构造函数,最终都会执行到这个构造函数的,这个构造函数有7个参数,正是由于对这7个参数值的赋值不同,造成生成不同类型的线程池,比如我们常见的CachedThreadPoolExecutor、FixedThreadPoolExecutor

SingleThreadPoolExecutor、ScheduledThreadPoolExecutor,我们老看看这几个参数的具体含义:

1corePoolSize:线程池中核心线程的数量;当提交一个任务到线程池的时候,线程池会创建一个线程来执行执行任务,即使有其他空闲的线程存在,直到线程数达到corePoolSize时不再创建,这时候会把提交的新任务放入到阻塞队列中,如果调用了线程池的preStartAllCoreThreads方法,则会在创建线程池的时候初始化出来核心线程;

2maximumPoolSize:线程池允许创建的最大线程数;如果阻塞队列已经满了,同时已经创建的线程数小于最大线程数的话,那么会创建新的线程来处理阻塞队列中的任务;

3keepAliveTime:线程活动保持时间,指的是工作线程空闲之后继续存活的时间,默认情况下,这个参数只有线程数大于corePoolSize的时候才会起作用,即当线程池中的线程数目大于corePoolSize的时候,如果某一个线程的空闲时间达到keepAliveTime,那么这个线程是会被终止的,直到线程池中的线程数目不大于corePoolSize;如果调用allowCoreThreadTimeOut的话,在线程池中线程数量不大于corePoolSize的时候,keepAliveTime参数也可以起作用的,知道线程数目为0为止;

4unit:参数keepAliveTime的时间单位;

5workQueue:阻塞队列;用于存储等待执行的任务,有四种阻塞队列类型,ArrayBlockingQueue(基于数组的有界阻塞队列)、LinkedBlockingQueue(基于链表结构的阻塞队列)、SynchronousQueue(不存储元素的阻塞队列)、PriorityBlockingQueue(具有优先级的阻塞队列);

6threadFactory:用于创建线程的线程工厂;

7handler:当阻塞队列满了,且没有空闲线程的情况下,也就是说这个时候,线程池中的线程数目已经达到了最大线程数量,处于饱和状态,那么必须采取一种策略来处理新提交的任务,我们可以自己定义处理策略,也可以使用系统已经提供给我们的策略,先来看看系统为我们提供的4种策略,AbortPolicy(直接抛出异常)、CallerRunsPolicy(只有调用者所在的线程来运行任务)、DiscardOldestPolicy(丢弃阻塞队列中最近的一个任务,并执行当前任务)、Discard(直接丢弃);

接下来就是将任务添加到线程池之后的运行流程了;

我们可以调用submit或者execute方法,两者最大的区别在于,调用submit方法的话,我们可以传入一个实现Callable接口的对象,进而能在当前任务执行结束之后通过Future对象获得任务的返回值,submit内部实际上还是执行的execute方法;而调用execute方法的话,是不能获得任务执行结束之后的返回值的;此外,调用submit方法的话是可以抛出异常的,但是调用execute方法的话,异常在其内部得到了消化,也就是说异常在其内部得到了处理,不会向外传递的;

因为submit方法最终也是会执行execute方法的,因此我们只需要了解execute方法就可以了:

在execute方法内部会分三种情况来进行处理:

1:首先判断当前线程池中的线程数量是否小于corePoolSize,如果小于的话,则直接通过addWorker方法创建一个新的Worker对象来执行我们当前的任务;

2:如果说当前线程池中的线程数量大于corePoolSize的话,那么会尝试将当前任务添加到阻塞队列中,然后第二次检查线程池的状态,如果线程池不在Running状态的话,会将刚刚添加到阻塞队列中的任务移出,同时拒绝当前任务请求;如果第二次检查发现当前线程池处于Running状态的话,那么会查看当前线程池中的工作线程数量是否为0,如果为0的话,就会通过addWorker方法创建一个Worker对象出来处理阻塞队列中的任务;

3:如果原先线程池就不处于Running状态或者我们刚刚将当前任务添加到阻塞队列的时候出现错误的话,那么会去尝试通过addWorker创建新的Worker来处理当前任务,如果添加失败的话,则拒绝当前任务请求;

可以看到在上面的execute方法中,我们仅仅只是检查了当前线程池中的线程数量有没有超过corePoolSize的情况,那么当前线程池中的线程数量有没有超过maximumPoolSize是在哪里检测的呢?实际上是在addWorker方法里面了,我们可以看下addWorker里面的一段代码:

[java] view plain copy

if (wc = CAPACITY ||

wc = (core ? corePoolSize : maximumPoolSize))

return false;

如果当前线程数量超过maximumPoolSize的话,直接就会调用return方法,返回false;

其实到这里我们很明显可以知道,一个线程池中线程的数量实际上就是这个线程池中Worker的数量,如果Worker的大小超过了corePoolSize,那么任务都在阻塞队列里面了,Worker是Java对我们任务的一个封装类,他的声明是酱紫的:

[java] view plain copy

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable

可以看到他实现了Runnable接口,他是在addWorker方法里面通过new Worker(firstTask)创建的,我们来看看他的构造函数就知道了:

[java] view plain copy

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

而这里的firstTask其实就是我们调用execute或者submit的时候传入的那个参数罢了,一般来说这些参数是实现Callable或者Runnable接口的;

在通过addWorker方法创建出来Worker对象之后,这个方法的最后会执行Worker内部thread属性的start方法,而这个thread属性实际上就是封装了Worker的Thread,执行他的start方法实际上执行的是Worker的run方法,因为Worker是实现了Runnable接口的,在run方法里面就会执行runWorker方法,而runWorker方法里面首先会判断当前我们传入的任务是否为空,不为空的话直接就会执行我们通过execute或者submit方法提交的任务啦,注意一点就是我们虽然会通过submit方法提交实现了Callable接口的对象,但是在调用submit方法的时候,其实是会将Callable对象封装成实现了Runnable接口对象的,不信我们看看submit方法源码是怎么实现的:

[java] view plain copy

public T FutureT submit(CallableT task) {

if (task == null) throw new NullPointerException();

RunnableFutureT ftask = newTaskFor(task);

execute(ftask);

return ftask;

}

看到没有呢,实际上在你传入实现了Callable接口对象的时候,在submit方法里面是会将其封装成RunnableFuture对象的,而RunnableFuture接口是继承了Runnable接口的;那么说白了其实就是直接执行我们提交任务的run方法了;如果为空的话,则会通过getTask方法从阻塞队列里面拿出一个任务去执行;在任务执行结束之后继续从阻塞队列里面拿任务,直到getTask的返回值为空则退出runWorker内部循环,那么什么情况下getTask返回为空呢?查看getTask方法的源码注释可以知道:在Worker必须需要退出的情况下getTask会返回空,具体什么情况下Worker会退出呢?(1):当Worker的数量超过maximumPoolSize的时候;(2):当线程池状态为Stop的时候;(3):当线程池状态为Shutdown并且阻塞队列为空的时候;(4):使用等待超时时间从阻塞队列中拿数据,但是超时之后仍然没有拿到数据;

如果runWorker方法退出了它里面的循环,那么就说明当前阻塞队列里面是没有任务可以执行的了,你可以看到在runWorker方法内部的finally语句块中执行了processWorkerExit方法,用来对Worker对象进行回收操作,这个方法会传入一个参数表示需要删除的Worker对象;在进行Worker回收的时候会调用tryTerminate方法来尝试关闭线程池,在tryTerminate方法里面会检查是否有Worker在工作,检查线程池的状态,没问题的话就会将当前线程池的状态过渡到Tidying,之后调用terminated方法,将线程池状态更新到Terminated;

从上面的分析中,我们可以看出线程池运行的4个阶段:

(1):poolSize corePoolSize,则直接创建新的线程(核心线程)来执行当前提交的任务;

(2):poolSize = corePoolSize,并且此时阻塞队列没有满,那么会将当前任务添加到阻塞队列中,如果此时存在工作线程(非核心线程)的话,那么会由工作线程来处理该阻塞队列中的任务,如果此时工作线程数量为0的话,那么会创建一个工作线程(非核心线程)出来;

(3):poolSize = corePoolSize,并且此时阻塞队列已经满了,那么会直接创建新的工作线程(非核心线程)来处理阻塞队列中的任务;

(4):poolSize = maximumPoolSize,并且此时阻塞队列也满了的话,那么会触发拒绝机制,具体决绝策略采用的是什么就要看我们创建ThreadPoolExecutor的时候传入的RejectExecutionHandler参数了;

接下来就是线程池是怎么做到重用线程的呢?

个人认为线程池里面重用线程的工作是在getTask里面实现的,在getTask里面是存在两个for死循环嵌套的,他会不断的从阻塞对列里面取出需要执行的任务,返回给我们的runWorker方法里面,而在runWorker方法里面只要getTask返回的任务不是空就会执行该任务的run方法来处理它,这样一直执行下去,直到getTask返回空为止,此时的情况就是阻塞队列里面没有任务了,这样一个线程处理完一个任务之后接着再处理阻塞队列中的另一个任务,当然在线程池中的不同线程是可以并发处理阻塞队列中的任务的,最后在阻塞队列内部不存在任务的时候会去判断是否需要回收Worker对象,其实Worker对象的个数就是线程池中线程的个数,至于什么情况才需要回收,上面已经说了,就是四种情况了;

最后就是线程池是怎样被关闭的呢?

涉及到线程池的关闭,需要用到两个方法,shutdown和shutdownNow,他们都是位于ThreadPoolExecutor里面的,对于shutdown的话,他会将线程池状态切换成Shutdown,此时是不会影响对阻塞队列中任务执行的,但是会拒绝执行新加进来的任务,同时会回收闲置的Worker;而shutdownNow方法会将线程池状态切换成Stop,此时既不会再去处理阻塞队列里面的任务,也不会去处理新加进来的任务,同时会回收所有Worker;

线程池参数 java 核心

总结起来就是:最大线程数参数,是在已经达到核心线程池参数,并且任务队列已经满的... java线程池类型及选择 通过查看源码,java中存在以下线程池

java线程池原理

线程池把先前创建的线程重用于当前任务。这就解决了需要太多线程的问题,因此内存不足不是一个选择。您甚至可以把线程池视为回收系统。它不止消除了用尽内存的选项,而且还使应用程序非常快速地响应,原因是当请求到达时已经存在一个线程。

工作流程步骤:

创建要执行的任务

使用执行程序创建执行程序池

把任务传递给执行程序池

关闭执行程序池

Java线程池中的核心线程是如何被重复利用的

Java线程池中的核心线程是如何被重复利用的?

引言

在Java开发中,经常需要创建线程去执行一些任务,实现起来也非常方便,但如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。此时,我们很自然会想到使用线程池来解决这个问题。

使用线程池的好处:

降低资源消耗。java中所有的池化技术都有一个好处,就是通过复用池中的对象,降低系统资源消耗。设想一下如果我们有n多个子任务需要执行,如果我们为每个子任务都创建一个执行线程,而创建线程的过程是需要一定的系统消耗的,最后肯定会拖慢整个系统的处理速度。而通过线程池我们可以做到复用线程,任务有多个,但执行任务的线程可以通过线程池来复用,这样减少了创建线程的开销,系统资源利用率得到了提升。

降低管理线程的难度。多线程环境下对线程的管理是最容易出现问题的,而线程池通过框架为我们降低了管理线程的难度。我们不用再去担心何时该销毁线程,如何最大限度的避免多线程的资源竞争。这些事情线程池都帮我们代劳了。

提升任务处理速度。线程池中长期驻留了一定数量的活线程,当任务需要执行时,我们不必先去创建线程,线程池会自己选择利用现有的活线程来处理任务。

很显然,线程池一个很显著的特征就是“长期驻留了一定数量的活线程”,避免了频繁创建线程和销毁线程的开销,那么它是如何做到的呢?我们知道一个线程只要执行完了run()方法内的代码,这个线程的使命就完成了,等待它的就是销毁。既然这是个“活线程”,自然是不能很快就销毁的。为了搞清楚这个“活线程”是如何工作的,下面通过追踪源码来看看能不能解开这个疑问。

分析方法

在分析源码之前先来思考一下要怎么去分析,源码往往是比较复杂的,如果知识储备不够丰厚,很有可能会读不下去,或者读岔了。一般来讲要时刻紧跟着自己的目标来看代码,跟目标关系不大的代码可以不理会它,一些异常的处理也可以暂不理会,先看正常的流程。就我们现在要分析的源码而言,目标就是看看线程是如何被复用的。那么对于线程池的状态的管理以及非正常状态下的处理代码就可以不理会,具体来讲,在ThreadPollExcutor类中,有一个字段 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 是对线程池的运行状态和线程池中有效线程的数量进行控制的, 它包含两部分信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),还有几个对ctl进行计算的方法:

// 获取运行状态

private static int runStateOf(int c)     { return c ~CAPACITY; }

// 获取活动线程数

private static int workerCountOf(int c)  { return c CAPACITY; }123456

以上两个方法在源码中经常用到,结合我们的目标,对运行状态的一些判断及处理可以不用去管,而对当前活动线程数要加以关注等等。

下面将遵循这些原则来分析源码。

解惑

当我们要向线程池添加一个任务时是调用ThreadPollExcutor对象的execute(Runnable command)方法来完成的,所以先来看看ThreadPollExcutor类中的execute(Runnable command)方法的源码:

public void execute(Runnable command) {

   if (command == null)

       throw new NullPointerException();

   int c = ctl.get();

   if (workerCountOf(c) corePoolSize) {

       if (addWorker(command, true))

           return;

       c = ctl.get();

   }

   if (isRunning(c) workQueue.offer(command)) {

       int recheck = ctl.get();

       if (! isRunning(recheck) remove(command))

           reject(command);

       else if (workerCountOf(recheck) == 0)

           addWorker(null, false);

   }

   else if (!addWorker(command, false))

       reject(command);

}123456789101112131415161718192021

按照我们在分析方法中提到的一些原则,去掉一些相关性不强的代码,看看核心代码是怎样的。

// 为分析而简化后的代码

public void execute(Runnable command) {

   int c = ctl.get();

   if (workerCountOf(c) corePoolSize) {

       // 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中,并把任务添加到该线程中

       if (addWorker(command, true))

           return;

       c = ctl.get();

   }

   // 如果当前活动线程数大于等于corePoolSize,则尝试将任务放入缓存队列

   if (workQueue.offer(command)) {

       int recheck = ctl.get();

       if (workerCountOf(recheck) == 0)

           addWorker(null, false);

   }else {

       // 缓存已满,新建一个线程放入线程池,并把任务添加到该线程中(此时新建的线程相当于非核心线程)

       addWorker(command, false)

   }

}12345678910111213141516171819202122

这样一看,逻辑应该清晰很多了。

如果 当前活动线程数 指定的核心线程数,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于核心线程);

如果 当前活动线程数 = 指定的核心线程数,且缓存队列未满,则将任务添加到缓存队列中;

如果 当前活动线程数 = 指定的核心线程数,且缓存队列已满,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于非核心线程);

接下来看 addWorker(Runnable firstTask, boolean core)方法

private boolean addWorker(Runnable firstTask, boolean core) {

   retry:

   for (;;) {

       int c = ctl.get();

       int rs = runStateOf(c);

       // Check if queue empty only if necessary.

       if (rs = SHUTDOWN

           ! (rs == SHUTDOWN

              firstTask == null

              ! workQueue.isEmpty()))

           return false;

       for (;;) {

           int wc = workerCountOf(c);

           if (wc = CAPACITY ||

               wc = (core ? corePoolSize : maximumPoolSize))

               return false;

           if (compareAndIncrementWorkerCount(c))

               break retry;

           c = ctl.get();  // Re-read ctl

           if (runStateOf(c) != rs)

               continue retry;

           // else CAS failed due to workerCount change; retry inner loop

       }

   }

   boolean workerStarted = false;

   boolean workerAdded = false;

   Worker w = null;

   try {

       w = new Worker(firstTask);

       final Thread t = w.thread;

       if (t != null) {

           final ReentrantLock mainLock = this.mainLock;

           mainLock.lock();

           try {

               // Recheck while holding lock.

               // Back out on ThreadFactory failure or if

               // shut down before lock acquired.

               int rs = runStateOf(ctl.get());

               if (rs SHUTDOWN ||

                   (rs == SHUTDOWN firstTask == null)) {

                   if (t.isAlive()) // precheck that t is startable

                       throw new IllegalThreadStateException();

                   workers.add(w);

                   int s = workers.size();

                   if (s largestPoolSize)

                       largestPoolSize = s;

                   workerAdded = true;

               }

           } finally {

               mainLock.unlock();

           }

           if (workerAdded) {

               t.start();

               workerStarted = true;

           }

       }

   } finally {

       if (! workerStarted)

           addWorkerFailed(w);

   }

   return workerStarted;

}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667

同样,我们也来简化一下:

// 为分析而简化后的代码

private boolean addWorker(Runnable firstTask, boolean core) {

   int wc = workerCountOf(c);

   if (wc = (core ? corePoolSize : maximumPoolSize))

       // 如果当前活动线程数 = 指定的核心线程数,不创建核心线程

       // 如果当前活动线程数 = 指定的最大线程数,不创建非核心线程  

       return false;

   boolean workerStarted = false;

   boolean workerAdded = false;

   Worker w = null;

   try {

       // 新建一个Worker,将要执行的任务作为参数传进去

       w = new Worker(firstTask);

       final Thread t = w.thread;

       if (t != null) {

           workers.add(w);

           workerAdded = true;

           if (workerAdded) {

               // 启动刚刚新建的那个worker持有的线程,等下要看看这个线程做了啥

               t.start();

               workerStarted = true;

           }

       }

   } finally {

       if (! workerStarted)

           addWorkerFailed(w);

   }

   return workerStarted;

}1234567891011121314151617181920212223242526272829303132

看到这里,我们大概能猜测到,addWorker方法的功能就是新建一个线程并启动这个线程,要执行的任务应该就是在这个线程中执行。为了证实我们的这种猜测需要再来看看Worker这个类。

private final class Worker

   extends AbstractQueuedSynchronizer

   implements Runnable{

   // ....

}

Worker(Runnable firstTask) {

   setState(-1); // inhibit interrupts until runWorker

   this.firstTask = firstTask;

   this.thread = getThreadFactory().newThread(this);

}123456789101112

从上面的Worker类的声明可以看到,它实现了Runnable接口,以及从它的构造方法中可以知道待执行的任务赋值给了它的变量firstTask,并以它自己为参数新建了一个线程赋值给它的变量thread,那么运行这个线程的时候其实就是执行Worker的run()方法,来看一下这个方法:

   public void run() {

       runWorker(this);

   }

   final void runWorker(Worker w) {

   Thread wt = Thread.currentThread();

   Runnable task = w.firstTask;

   w.firstTask = null;

   w.unlock(); // allow interrupts

   boolean completedAbruptly = true;

   try {

       while (task != null || (task = getTask()) != null) {

           w.lock();

           // If pool is stopping, ensure thread is interrupted;

           // if not, ensure thread is not interrupted.  This

           // requires a recheck in second case to deal with

           // shutdownNow race while clearing interrupt

           if ((runStateAtLeast(ctl.get(), STOP) ||

                (Thread.interrupted()

                 runStateAtLeast(ctl.get(), STOP)))

               !wt.isInterrupted())

               wt.interrupt();

           try {

               beforeExecute(wt, task);

               Throwable thrown = null;

               try {

                   task.run();

               } catch (RuntimeException x) {

                   thrown = x; throw x;

               } catch (Error x) {

                   thrown = x; throw x;

               } catch (Throwable x) {

                   thrown = x; throw new Error(x);

               } finally {

                   afterExecute(task, thrown);

               }

           } finally {

               task = null;

               w.completedTasks++;

               w.unlock();

           }

       }

       completedAbruptly = false;

   } finally {

       processWorkerExit(w, completedAbruptly);

   }

}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748

在run()方法中只调了一下 runWorker(this) 方法,再来简化一下这个 runWorker() 方法

// 为分析而简化后的代码

final void runWorker(Worker w) {

   Runnable task = w.firstTask;

   w.firstTask = null;

   while (task != null || (task = getTask()) != null) {

           try {

               task.run();

           } finally {

               task = null;

           }

       }

}12345678910111213

很明显,runWorker()方法里面执行了我们新建Worker对象时传进去的待执行的任务,到这里为止貌似这个worker的run()方法就执行完了,既然执行完了那么这个线程也就没用了,只有等待虚拟机销毁了。那么回顾一下我们的目标:Java线程池中的核心线程是如何被重复利用的?好像并没有重复利用啊,新建一个线程,执行一个任务,然后就结束了,销毁了。没什么特别的啊,难道有什么地方漏掉了,被忽略了?再仔细看一下runWorker()方法的代码,有一个while循环,当执行完firstTask后task==null了,那么就会执行判断条件 (task = getTask()) != null,我们假设这个条件成立的话,那么这个线程就不止只执行一个任务了,可以执行多个任务了,也就实现了重复利用了。答案呼之欲出了,接着看getTask()方法

private Runnable getTask() {

   boolean timedOut = false; // Did the last poll() time out?

   for (;;) {

       int c = ctl.get();

       int rs = runStateOf(c);

       // Check if queue empty only if necessary.

       if (rs = SHUTDOWN (rs = STOP || workQueue.isEmpty())) {

           decrementWorkerCount();

           return null;

       }

       int wc = workerCountOf(c);

       // Are workers subject to culling?

       boolean timed = allowCoreThreadTimeOut || wc corePoolSize;

       if ((wc maximumPoolSize || (timed timedOut))

            (wc 1 || workQueue.isEmpty())) {

           if (compareAndDecrementWorkerCount(c))

               return null;

           continue;

       }

       try {

           Runnable r = timed ?

               workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

               workQueue.take();

           if (r != null)

               return r;

           timedOut = true;

       } catch (InterruptedException retry) {

           timedOut = false;

       }

   }

}1234567891011121314151617181920212223242526272829303132333435363738

老规矩,简化一下代码来看:

// 为分析而简化后的代码

private Runnable getTask() {

   boolean timedOut = false;

   for (;;) {

       int c = ctl.get();

       int wc = workerCountOf(c);

       // timed变量用于判断是否需要进行超时控制。

       // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;

       // wc corePoolSize,表示当前线程池中的线程数量大于核心线程数量;

       // 对于超过核心线程数量的这些线程,需要进行超时控制

       boolean timed = allowCoreThreadTimeOut || wc corePoolSize;

       if (timed timedOut) {

           // 如果需要进行超时控制,且上次从缓存队列中获取任务时发生了超时,那么尝试将workerCount减1,即当前活动线程数减1,

           // 如果减1成功,则返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了

           if (compareAndDecrementWorkerCount(c))

               return null;

           continue;

       }

       try {

           Runnable r = timed ?

               workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

               workQueue.take();

           // 注意workQueue中的poll()方法与take()方法的区别

           //poll方式取任务的特点是从缓存队列中取任务,最长等待keepAliveTime的时长,取不到返回null

           //take方式取任务的特点是从缓存队列中取任务,若队列为空,则进入阻塞状态,直到能取出对象为止

           if (r != null)

               return r;

           timedOut = true;

       } catch (InterruptedException retry) {

           timedOut = false;

       }

   }

}123456789101112131415161718192021222324252627282930313233343536373839

从以上代码可以看出,getTask()的作用是

如果当前活动线程数大于核心线程数,当去缓存队列中取任务的时候,如果缓存队列中没任务了,则等待keepAliveTime的时长,此时还没任务就返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了。因此只要线程池中的线程数大于核心线程数就会这样一个一个地销毁这些多余的线程。

如果当前活动线程数小于等于核心线程数,同样也是去缓存队列中取任务,但当缓存队列中没任务了,就会进入阻塞状态,直到能取出任务为止,因此这个线程是处于阻塞状态的,并不会因为缓存队列中没有任务了而被销毁。这样就保证了线程池有N个线程是活的,可以随时处理任务,从而达到重复利用的目的。

小结

通过以上的分析,应该算是比较清楚地解答了“线程池中的核心线程是如何被重复利用的”这个问题,同时也对线程池的实现机制有了更进一步的理解:

当有新任务来的时候,先看看当前的线程数有没有超过核心线程数,如果没超过就直接新建一个线程来执行新的任务,如果超过了就看看缓存队列有没有满,没满就将新任务放进缓存队列中,满了就新建一个线程来执行新的任务,如果线程池中的线程数已经达到了指定的最大线程数了,那就根据相应的策略拒绝任务。

当缓存队列中的任务都执行完了的时候,线程池中的线程数如果大于核心线程数,就销毁多出来的线程,直到线程池中的线程数等于核心线程数。此时这些线程就不会被销毁了,它们一直处于阻塞状态,等待新的任务到来。

注意: 

本文所说的“核心线程”、“非核心线程”是一个虚拟的概念,是为了方便描述而虚拟出来的概念,在代码中并没有哪个线程被标记为“核心线程”或“非核心线程”,所有线程都是一样的,只是当线程池中的线程多于指定的核心线程数量时,会将多出来的线程销毁掉,池中只保留指定个数的线程。那些被销毁的线程是随机的,可能是第一个创建的线程,也可能是最后一个创建的线程,或其它时候创建的线程。一开始我以为会有一些线程被标记为“核心线程”,而其它的则是“非核心线程”,在销毁多余线程的时候只销毁那些“非核心线程”,而“核心线程”不被销毁。这种理解是错误的。

另外还有一个重要的接口 BlockingQueue 值得去了解,它定义了一些入队出队同步操作的方法,还可以阻塞,作用很大。

java 线程池 工作队列是如何工作的

使用线程池的好处

1、降低资源消耗

可以重复利用已创建的线程降低线程创建和销毁造成的消耗。

2、提高响应速度

当任务到达时,任务可以不需要等到线程创建就能立即执行。

3、提高线程的可管理性

线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控

线程池的工作原理

首先我们看下当一个新的任务提交到线程池之后,线程池是如何处理的

1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。

2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步

3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务

线程池饱和策略

这里提到了线程池的饱和策略,那我们就简单介绍下有哪些饱和策略:

AbortPolicy

为Java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。

DiscardPolicy

直接抛弃,任务不执行,空方法

DiscardOldestPolicy

从队列里面抛弃head的一个任务,并再次execute 此task。

CallerRunsPolicy

在调用execute的线程里面执行此command,会阻塞入口

用户自定义拒绝策略(最常用)

实现RejectedExecutionHandler,并自己定义策略模式

下我们以ThreadPoolExecutor为例展示下线程池的工作流程图

1.jpg

2.jpg

1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

3、如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

线程池只是并发编程中的一小部分,下图是史上最全面的Java的并发编程学习技术总汇

3.jpg

关键方法源码分析

我们看看核心方法添加到线程池方法execute的源码如下:

//     //Executes the given task sometime in the future.  The task     //may execute in a new thread or in an existing pooled thread.     //     // If the task cannot be submitted for execution, either because this     // executor has been shutdown or because its capacity has been reached,     // the task is handled by the current {@code RejectedExecutionHandler}.     //     // @param command the task to execute     // @throws RejectedExecutionException at discretion of     //         {@code RejectedExecutionHandler}, if the task     //         cannot be accepted for execution     // @throws NullPointerException if {@code command} is null     //    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        //         // Proceed in 3 steps:         //         // 1. If fewer than corePoolSize threads are running, try to         // start a new thread with the given command as its first         // task.  The call to addWorker atomically checks runState and         // workerCount, and so prevents false alarms that would add         // threads when it shouldn't, by returning false.         // 翻译如下:         // 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程,         // 如果能完成新线程创建exexute方法结束,成功提交任务         // 2. If a task can be successfully queued, then we still need         // to double-check whether we should have added a thread         // (because existing ones died since last checking) or that         // the pool shut down since entry into this method. So we         // recheck state and if necessary roll back the enqueuing if         // stopped, or start a new thread if there are none.         // 翻译如下:         // 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态         // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要         // reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;         // 3. If we cannot queue task, then we try to add a new         // thread.  If it fails, we know we are shut down or saturated         // and so reject the task.         // 翻译如下:         // 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池         // 已经达到饱和状态,所以reject这个他任务         //        int c = ctl.get();        // 工作线程数小于核心线程数        if (workerCountOf(c) corePoolSize) {            // 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize            if (addWorker(command, true))                return;            c = ctl.get();        }        // 如果工作线程数大于等于核心线程数        // 线程的的状态未RUNNING并且队列notfull        if (isRunning(c) workQueue.offer(command)) {            // 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除            int recheck = ctl.get();            if (! isRunning(recheck) remove(command))                // 移除成功,拒绝该非运行的任务                reject(command);            else if (workerCountOf(recheck) == 0)                // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。                // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务                addWorker(null, false);        }        // 如果队列满了或者是非运行的任务都拒绝执行        else if (!addWorker(command, false))            reject(command);    }

发表评论

评论列表

  • 离鸢海夕(2022-08-11 14:13:01)回复取消回复

    ut = true;       } catch (InterruptedException retry) {           timedOut = false;       }   }}1234567891

  • 掩吻艳鬼(2022-08-11 08:08:51)回复取消回复

    etTask()) != null) {           try {               task.run();           } finally {               task = null;       

  • 孤鱼萌懂(2022-08-11 14:38:17)回复取消回复

    rivate static final int SHUTDOWN   =  0  COUNT_BITS;private static final int STOP       =  1  COUNT_BITS;

  • 野欢顾执(2022-08-11 13:21:13)回复取消回复

    wn = x; throw x;               } catch (Throwable x) {                   thrown = x;

  • 鹿岛雾月(2022-08-11 13:29:03)回复取消回复

    大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为0;unit:参数 keepAliveTime 的时间单位,有 7 种取值,在 Time