线程池
为什么要使用线程池
使用线程池主要有以下三个原因:
- 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程。
- 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃。(主要原因)
- 可以对线程做统一管理。
线程池的原理
Java中的线程池顶层接口是Executor
接口,ThreadPoolExecutor
是这个接口的实现类。
我们先看看ThreadPoolExecutor
类。
ThreadPoolExecutor提供的构造方法
一共有四个构造方法:
|
|
涉及到5~7个参数,我们先看看必须的5个参数是什么意思:
-
int corePoolSize:该线程池中核心线程数最大值
核心线程:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会一直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会被销毁(临时工)。
-
int maximumPoolSize:该线程池中线程总数最大值 。
该值等于核心线程数量 + 非核心线程数量。
-
long keepAliveTime:非核心线程闲置超时时长。
非核心线程如果处于闲置状态超过该值,就会被销毁。如果设置allowCoreThreadTimeOut(true),则会也作用于核心线程。
-
TimeUnit unit:keepAliveTime的单位。
TimeUnit是一个枚举类型 ,包括以下属性:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000 MICROSECONDS : 1微秒 = 1毫秒 / 1000 MILLISECONDS : 1毫秒 = 1秒 /1000 SECONDS : 秒 MINUTES : 分 HOURS : 小时 DAYS : 天
-
BlockingQueue workQueue:阻塞队列,维护着等待执行的Runnable任务对象。
常用的几个阻塞队列:
-
LinkedBlockingQueue
链式阻塞队列,底层数据结构是链表,默认大小是
Integer.MAX_VALUE
,也可以指定大小。 -
ArrayBlockingQueue
数组阻塞队列,底层数据结构是数组,需要指定队列的大小。
-
SynchronousQueue
同步队列,内部容量为0,每个put操作必须等待一个take操作,反之亦然。
-
DelayQueue
延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。
-
我们将在下一章中重点介绍各种阻塞队列
好了,介绍完5个必须的参数之后,还有两个非必须的参数。
-
ThreadFactory threadFactory
创建线程的工厂 ,用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂。
|
|
-
RejectedExecutionHandler handler
拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略,四种拒绝处理的策略为 :
- ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
ThreadPoolExecutor的策略
线程池本身有一个调度线程,这个线程就是用于管理布控整个线程池里的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。
故线程池也有自己的状态, 分别为RUNNING、SHUTDOWN、STOP、TIDYING 、TERMINATED。
-
线程池创建后处于RUNNING状态。
-
调用shutdown()方法后处于SHUTDOWN状态,线程池不能接受新的任务,清除一些空闲worker,会等待阻塞队列的任务完成。
-
调用shutdownNow()方法后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。
-
当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执行terminated()函数。
ThreadPoolExecutor中有一个控制状态的属性叫ctl,它是一个AtomicInteger类型的变量。
-
线程池处在TIDYING状态时,执行完terminated()方法之后,就会由 TIDYING -> TERMINATED, 线程池被设置为TERMINATED状态。
线程池内部状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl //获取高三位也就是线程池状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //获取低29位也就是线程池中的线程数 private static int workerCountOf(int c) { return c & CAPACITY; } //pack, 根据线程池状态以及线程数打包成ctl。 private static int ctlOf(int rs, int wc) { return rs | wc; }
其中AtomicInteger变量ctl的功能非常强大:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态: 1、RUNNING:
-1 << COUNT_BITS
,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务; 2、SHUTDOWN:0 << COUNT_BITS
,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务; 3、STOP :1 << COUNT_BITS
,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务; 4、TIDYING :2 << COUNT_BITS
,即高3位为010; 5、TERMINATED:3 << COUNT_BITS
,即高3位为011;
线程池主要的任务处理流程
处理任务的核心方法是execute
,我们看看 JDK 1.8 源码中ThreadPoolExecutor
是如何处理线程任务的:
|
|
ctl.get()
是获取线程池状态,用int
类型表示。第二步中,入队前进行了一次isRunning
判断,入队之后,又进行了一次isRunning
判断。
为什么要二次检查线程池的状态?
在多线程的环境下,线程池的状态是时刻发生变化的。很有可能刚获取线程池状态后线程池状态就改变了。判断是否将command
加入workqueue
是线程池之前的状态。倘若没有二次检查,万一线程池处于非RUNNING状态(在多线程环境下很有可能发生),那么command
永远不会执行。
总结一下处理流程
- 线程总数量 < corePoolSize,无论线程是否空闲,都会新建一个核心线程执行任务(让核心线程数量快速达到corePoolSize,在核心线程数量 < corePoolSize时)。注意,这一步需要获得全局锁。
- 线程总数量 >= corePoolSize时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行(体现了线程复用)。
- 当缓存队列满了,说明这个时候任务已经多到爆棚,需要一些“临时工”来执行这些任务了。于是会创建非核心线程去执行这个任务。注意,这一步需要获得全局锁。
- 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取上面提到的拒绝策略进行处理。
整个过程如图所示:
ThreadPoolExecutor如何做到线程复用的
我们知道,一个线程在创建的时候会指定一个线程任务,当执行完这个线程任务之后,线程自动销毁。但是线程池却可以复用线程,即一个线程执行完线程任务后不销毁,继续执行另外的线程任务。那么,线程池如何做到线程复用呢?
原来,ThreadPoolExecutor在创建线程时,会将线程封装成工作线程worker,并放入工作线程组中,然后这个worker反复从阻塞队列中拿任务去执行。话不多说,我们继续看看源码(一定要仔细看,前后有联系)
这里的addWorker
方法是在上面提到的execute
方法里面调用的,先看看上半部分:
|
|
上半部分主要是判断线程数量是否超出阈值,超过了就返回false。我们继续看下半部分:
|
|
创建worker
对象,并初始化一个Thread
对象,然后启动这个线程对象。
我们接着看看Worker
类,仅展示部分源码:
|
|
Worker
类实现了Runnable
接口,所以Worker
也是一个线程任务。在构造方法中,创建了一个线程,线程的任务就是自己。故addWorker
方法调用addWorker方法源码下半部分中的第4步t.start
,会触发Worker
类的run
方法被JVM调用。
我们再看看runWorker
的逻辑:
|
|
首先去执行创建这个worker时就有的任务,当执行完这个任务后,worker的生命周期并没有结束,在while
循环中,worker会不断地调用getTask
方法从阻塞队列中获取任务然后调用task.run()
执行任务,从而达到复用线程的目的。只要getTask
方法不返回null
,此线程就不会退出。
当然,核心线程池中创建的线程想要拿到阻塞队列中的任务,先要判断线程池的状态,如果STOP或者TERMINATED,返回null
。
最后看看getTask
方法的实现:
|
|
核心线程的会一直卡在workQueue.take
方法,被阻塞并挂起,不会占用CPU资源,直到拿到Runnable
然后返回(当然如果allowCoreThreadTimeOut设置为true
,那么核心线程就会去调用poll
方法,因为poll
可能会返回null
,所以这时候核心线程满足超时条件也会被销毁)。
非核心线程会workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超时还没有拿到,下一次循环判断compareAndDecrementWorkerCount就会返回null
,Worker对象的run()
方法循环体的判断为null
,任务结束,然后线程被系统回收 。
四种常见的线程池
Executors
类中提供的几个静态方法来创建线程池。大家到了这一步,如果看懂了前面讲的ThreadPoolExecutor
构造方法中各种参数的意义,那么一看到Executors
类中提供的线程池的源码就应该知道这个线程池是干嘛的。
newCachedThreadPool
CacheThreadPool
的运行流程如下:
- 提交任务进线程池。
- 因为corePoolSize为0的关系,不创建核心线程,线程池最大为Integer.MAX_VALUE。
- 尝试将任务添加到SynchronousQueue队列。
- 如果SynchronousQueue入列成功,等待被当前运行的空闲线程拉取执行。如果当前没有空闲线程,那么就创建一个非核心线程,然后从SynchronousQueue拉取任务并在当前线程执行。
- 如果SynchronousQueue已有任务在等待,入列操作将会阻塞。
当需要执行很多短时间的任务时,CacheThreadPool的线程复用率比较高, 会显著的提高性能。而且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool并不会占用很多资源。
newFixedThreadPool
|
|
核心线程数量和总线程数量相等,都是传入的参数nThreads,所以只能创建核心线程,不能创建非核心线程。因为LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,故如果核心线程空闲,则交给核心线程处理;如果核心线程不空闲,则入列等待,直到核心线程空闲。
与CachedThreadPool的区别:
- 因为 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创建核心线程。 而CachedThreadPool因为corePoolSize=0,所以只会创建非核心线程。
- 在 getTask() 方法,如果队列里没有任务可取,线程会一直阻塞在 LinkedBlockingQueue.take() ,线程不会被回收。 CachedThreadPool会在60s后收回。
- 由于线程不会被回收,会一直卡在阻塞,所以没有任务的情况下, FixedThreadPool占用资源更多。
- 都几乎不会触发拒绝策略,但是原理不同。FixedThreadPool是因为阻塞队列可以很大(最大为Integer最大值),故几乎不会触发拒绝策略;CachedThreadPool是因为线程池很大(最大为Integer最大值),几乎不会导致线程数量大于最大线程数,故几乎不会触发拒绝策略。
newSingleThreadPool
|
|
有且仅有一个核心线程( corePoolSize == maximumPoolSize=1),使用了LinkedBlockingQueue(容量很大),所以,不会创建非核心线程。所有任务按照先来先执行的顺序执行。如果这个唯一的线程不空闲,那么新来的任务会存储在任务队列里等待执行。
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
|
|
四种常见的线程池基本够我们使用了,但是《阿里把把开发手册》不建议我们直接使用Executors类中的线程池,而是通过ThreadPoolExecutor
的方式,这样的处理方式让写的同学需要更加明确线程池的运行规则,规避资源耗尽的风险。
但如果你及团队本身对线程池非常熟悉,又确定业务规模不会大到资源耗尽的程度(比如线程数量或任务队列长度可能达到Integer.MAX_VALUE)时,其实是可以使用JDK提供的这几个接口的,它能让我们的代码具有更强的可读性。
线程池的使用
cpu密集型
又叫计算密集型,cpu使用率较高(也就是一些复杂运算,逻辑处理),所以线程数一般只需要cpu核数的线程就可以了。 这一类型在开发中多出现于一些业务复杂计算和逻辑处理过程中。
I/O密集型
cpu使用率较低,程序中会存在大量I/O操作占据时间,导致线程空余时间出来,所以通常就需要开cpu核数的两倍的线程, 当线程进行I/O操作cpu空暇时启用其他线程继续使用cpu,提高cpu使用率
通过上述可以总结出:线程的最佳数目 = ( ( 线程等待时间+线程CPU时间 ) /线程CPU时间 ) * CPU数目*CPU期望利用率
线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
这一类型在开发中主要出现在一些读写操作频繁的业务逻辑中。
I/O 分阻塞I/O和非阻塞I/O,这里的IO密集型指的是阻塞IO,所以对于一些响应要求高的可以对业务预处理,然后异步处理后续逻辑。
影响最大线程数的因素,在不考虑系统本身限制 的情况下,主要跟JVM一下几点有关
- -Xms 初始堆大小 (在实际生产中,一般把-Xms和-Xmx设置成一样的。)
- -Xmx 最大堆大小
- -Xss 每个线程栈大小
系统层面影响:
- /proc/sys/kernel/pid_max 增大,线程数量增大,pid_max有最高值,超过之后不再改变,而且32,64位也不一样
- /proc/sys/kernel/threads-max 系统可以生成最大线程数量
- max_user_process(ulimit -u)centos系统上才有,没有具体研究
- /proc/sys/vm/max_map_count 增大,数量增多
总结 : JVM线程最大数量由JVM的堆(-Xmx,-Xms)大小、Thread的栈(-Xss)内存大小、 系统最大可创建的线程数的限制参数三个方面影响。 不考虑系统限制,可以通过这个公式估算:
线程数量 = (机器本身可用内存 - (JVM分配的堆内存+JVM元数据区)) / Xss的值。
开发中我们经常会使用到线程池来处理一些业务,而在不新增设备的情况下,我们所能使用的线程资源又不是无限的。那么高并发、任务执行时间短的业务怎样使用线程池?还有并发不高、任务执行时间长的业务怎样使用线程池?高并发、业务执行时间长的业务怎样使用线程池?
接下来我们进行一一分析:
-
高并发、任务执行时间短的业务
线程池线程数可以设置为CPU核数+1,减少线程上下文的切换
为什么+1
对于计算密集型的程序,线程数应当等于核心数,但是再怎么计算密集,总有一些IO吧,所以再加一个线程来把等待IO的CPU时间利用起来
在拥有N个处理器的系统上,当线程池的大小为N+1时,通常能实现最优的效率。(即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保CPU的时钟周期不会被浪费。)
-
并发不高、任务执行时间长
-
业务时间集中在I/O操作上,也就是I/O密集型任务
因为IO操作并不占用CPU,所以不要让所有的CPU闲下来,可以适当加大线程池中的线程数目,让CPU处理更多的业务
-
业务时间集中在计算操作上,也就是计算密集型任务
这个就没办法了,和高并发、任务执行时间短的业务处理一样,线程池中的线程数设置得少一些,减少线程上下文的切换
-
-
高并发、任务执行时间长的业务
线程池的设置参考2
解决这种类型任务的关键不在于线程池而在于整体架构的设计
step1: 针对任务执行时间长
1. 看看这些业务里面某些数据是否能做缓存 2. 看看能不能使用中间件(任务时间过长的可以考虑拆分逻辑放入队列等操作)对任务进行拆分和解耦
step2: 增加服务器(一般政府项目的首先,因为不用对项目工作做大改动,求一个稳,但前提是资金充足)
-
混合型任务
可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。
线程池的动态调整
一般我们使用线程池配置参数是写死在xml里的,这样主要存在两个问题:
- 缺乏动态调控能力:无法根据业务实际情况动态调控,如果要改配置,只能改动代码,走发布流程。
- 业务隔离性较差:业务和业务之间缺乏隔离,比如灰度发布一个业务,和原业务使用同一个线程池可能会影响原先的业务。
解决方案
-
对线程池按照业务分组
-
使用配置中心动态调控线程池
主要用到线程池的shudown方法,当获取线程池时如果已经关闭则重新创建一个新的线程池,使用配置好的参数。