Executor与ThreadPoolExecutor的使用

在开发服务端软件项目时,经常需要处理时间很短而数目却非常巨大的请求,如果为每一个请求创建一个新的线程,会导致性能上的瓶颈,因为线程对象的创建和销毁需要JVM频繁的处理,如果执行时间很短,可能花在创建和销毁线程对象上的时间将大于真正执行任务的时间,若这样,系统性能将大大降低。
因此JDK5起提供了线程池的支持,主要作用则时支持高并发的访问处理,并且可以将线程对象进行复用。

Executors

Exwcutor仅仅时一种规范,是一种声明和定义,并没有实现任何的功能,所以需要使用接口的实现类来完成指定的功能,比如ThreadPoolExecutor。ThreadPoolExecutor使用上并不是很方便,实例化时需要传入很多个参数,还要考虑线程并发数等与线程运行效率相关的参数,所以官方建议使用Executors工厂类来创建线程池对象。

newCachedThreadPool()

创建无界线程池,可以进行线程自动回收
所谓“无界”就是指理论上的Integer.MAX_VALUE

1
2
3
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

newCachedThreadPool(ThreadFactory threadFactory)

可通过ThreadFactory定制无界线程池中的线程对象

1
2
3
4
5
6
7
class MyThreadFactory implements ThreadFactory {
int i = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "定制线程对象-" + ++ i);
}
}

newFixedThreadPool(int nThreads)

创建有界线程池,也就是线程池中的线程数可以指定最大值

1
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

增加定制线程功能的有界线程池

newSingleThreadExecutor()

单一线程的线程池,实现队列的方式来执行任务

1
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

newSingleThreadExecutor(ThreadFactory threadFactory)

定制线程功能的单一线程池

ThreadPoolExecutor的使用

常用构造方法

1
2
3
4
5
6
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
corePoolSize:池中保存的线程数,包括空闲线程数
maximumPoolSize:池中允许的最大线程数
keepAliveTime:线程数数量大于corePoolSize时,在没有超过指定时间内时不会从线程池中将空闲线程删除,超过此时间单位,则删除
unit:keepAliveTime的时间单位
workQueue:执行前用于保存任务的队列。此队列仅保持由execute方法提交的Runnable任务

示例注释:
A:execute提交的Runnable任务数量
B:核心线程数corePoolSize
C:最大线程数maximumPoolSize
D:A-B(假设A>=B)
E:new LinkedBlockingDeque();队列,无构造参数
F:new SynchronousQueue();队列
G:keepAliveTime
注:这里E和F时两种类型的队列,指定线程池仅可选择一种

1、如果A<=B,那么马上创建线程运行这个任务,并不放入扩展队列Queue中,其他参数功能忽略
2、如果A>B && A<=C&&E,那么C和G参数忽略,并把D放入E中等待被执行
3、如果A>B && A<=C&&F,那么C和G参数有效,并马上创建线程运行这些任务,而不把D放入F中,D执行完成后在执行时间后放生超时将D进行清除
4、如果A>B && A>C&&E,那么C和G参数忽略,并把D放入E中等待被执行
5、如果A>B && A>C&&F,那么处理C的任务,其他任务则不再处理抛出异常

ThreadPoolExecutor常见的队列
A:ArrayBlockingQueue
B:LinkedBlockingDeque
C:SynchronousQueue
A和B可指定队列存储元素的多少,C是为同步队列,不存储元素

ThreadPoolExecutor拒绝策略
AbortPolicy:当任务添加到线程池中被拒绝时,它将抛出RejectedExecutionException异常
CallerRunsPolicy:当任务添加到线程池中被拒绝时,会使用调用线程池的Thread对象来处理该被决绝的任务
// 也就是说谁发起来这个任务,谁就要负责该任务的执行
DiscardOldestPolicy:当任务添加到线程池被拒绝时,线程池会放弃等待队列中最旧的未处理的任务,然后将被拒绝的任务添加到等待队列中
DiscardPolicy:当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务

注:线程池ThreadPoolExecutor的执行等待队列中的任务是安顺序去除,执行却是乱序的

参数说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
getCorePoolSize()获取核心线程数
getMaximunPoolSize()获取最大线程数
getPoolSize()当前池中线程数
getQueue().size()当前等待执行的线程数
getActiveCount()当前有多少个线程正在执行任务
getCompletedTaskCount()获取已经执行完成的任务数
getTaskCount()获取有多少个任务发送给了线程池
shutdown()当前未执行完的线程继续执行,而不再添加新的任务;方法不会阻塞
shutdownNow()中断所有的任务Task
// 并且抛出InterruptedException异常
// 前提时Runnable中使用if (Thread.currentThread().isInterrupted() == true)语句来判断当前线程的中断状态
// 而在队列中等待执行的线程不再执行,也就是从等待队列中清除
// 如果线程中没有if (Thread.currentThread().isInterrupted() == true)语句及抛出异常的代码,正在执行的线程会正常执行完毕
// 该方法还会返回被清除的线程列表
isShutdown()判断线程池是否已经关闭
isTerminating()可以理解为线程池是否正在关闭
// 当调用shutdown()或shutdownNow()之后处于正在终止但尚未完全终止的过程中时,该方法返回true
isTerminated()线程池是否已经关闭
awaitTermination(long timeout, TimeUnit unit)等待指定时间内,返回线程池是否已经终止工作,
setRejectedExecutionHandler(RejectedExecutionHandler handler)设置任务被拒绝执行时的策略
allowCoreThreadTimeOut(boolean value)设置核心线程数是否也有超时效果
prestartCoreThread()每调用一次就创建一个核心线程,返回是否创建成功
prestartAllCoreThreads()启用全部核心线程,返回实际启动的核心线程数量
// 线程池可重写以下两个方法对线程池中执行的线程对象实现监控
beforeExecute(Thread t, Runnable r)任务执行前
afterExecute(Runnable r, Throwable t)任务执行完成后
remove(Runnable task)删除尚未被执行的Runnable任务
// 但是这个方法需要注意的是,如果通过execute()方法进行提交的任务,且在等待队列中时,才可被删除
// 如果时通过submit()方法提交的任务,且在等待队列中时,不能被删除【原因时submit方法对任务进行了包装】