0%

【Java】ExecutorService 线程池

并发编程 ExecutorService 线程池的使用

ExecutorService

介绍

接口 java.util.concurrent.ExecutorService 表述了异步执行的机制,并且可以让任务在后台执行。
在 java.util.concurrent 包中的 ExecutorService 的实现就是一个线程池的实现。

关系?

ExecutorService接口继承了Executor接口,定义了一些生命周期的方法。

why?

许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务,每当一个请求到达就创建一个新线程,然后在新线程中为请求服务,但是频繁创建新线程、销毁新线程、线程切换既花费较多的时间,影响相应速度,又消耗大量的系统资源,且有时服务器无法处理过多请求导致崩溃。

一种情形:假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。

ExecutorService是一个线程池,请求到达时,线程已经存在,响应延迟低,多个任务复用线程,避免了线程的重复创建和销毁,并且可以规定线程数目,请求数目超过阈值时强制其等待直到有空闲线程。

优缺


  • 重用存在的线程,减少对象创建、消亡的开销,性能佳。
    可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
    提供定时执行、定期执行、单线程、并发数控制等功能。

  • 上手难度比较高,要求基础扎实。

场景

当我们有大量短小的任务需要多线程来完成时,将任务(实现Runnable、callable接口、继承Thread类的对象)提交给ExecutorService,以提高服务器性能。

使用

构造方法

ThreadPoolExecutor

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
....
}
name explain
corePoolSize 核心线程数,一旦创建将不会再释放
如果创建的线程数还没有达到指定的核心线程数量,将会继续创建新的核心线程,直到达到最大核心线程数后,核心线程数将不在增加;
如果没有空闲的核心线程,同时又未达到最大线程数,则将继续创建非核心线程;如果核心线程数等于最大线程数,则当核心线程都处于激活状态时,任务将被挂起,等待有空闲线程时再执行。
maximumPoolSize 最大线程数,允许创建的最大线程数量
如果最大线程数等于核心线程数,则无法创建非核心线程;
如果非核心线程处于空闲时,超过设置的空闲时间,则将被回收,释放占用的资源。
keepAliveTime 当线程空闲时,所允许保存的最大时间
超过这个时间,线程将被释放销毁,但只针对于非核心线程。
unit 时间单位,TimeUnit.SECONDS等。
workQueue 任务队列,用于保存等待执行的任务的阻塞队列。
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,必须设置容量。此队列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,可以设置容量,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入offer操作必须等到另一个线程调用移除poll操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。
PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
threadFactory 线程工厂,用于创建线程。
handler 线程边界和队列容量已经达到最大时,用于处理阻塞时的程序

newCachedThreadPool

可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

通过它的创建方式可以知道,创建的都是非核心线程,而且最大线程数为Interge的最大值,空闲线程存活时间是1分钟。
SynchronousQueue队列,一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作。
所以,当我们提交第一个任务的时候,是加入不了队列的。这就满足了,一个线程池条件“当无法加入队列的时候,且任务没有达到maxsize时,我们将新开启一个线程任务”。
即当线程不够用的时候会不断创建新线程,如果线程无限增长,会导致内存溢出。
所以我们的maxsize是big big。时间是60s,当一个线程没有任务执行会暂时保存60s超时时间,如果没有的新的任务的话,会从cache中remove掉。
因此长时间不提交任务的CachedThreadPool不会占用系统资源。就是缓冲区为1的生产者消费者模式。

newSingleThreadExecutor

单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

newFixedThreadPool

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 /**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed. At any point,
* at most {@code nThreads} threads will be active processing
* tasks. If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* available. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks. The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}.
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

coresize和maxmumsize相同,超时时间为0,队列用的LinkedBlockingQueue无界的FIFO队列,如果队列里面有线程任务的话就从队列里面取出线程,然后开启一个新的线程开始执行。
很明显,这个线程池始终只有size的线程在运行,大小固定,难以扩展。

newScheduledThreadPool

定长线程池,支持定时及周期性任务执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically. (Note
* however that if this single thread terminates due to a failure
* during execution prior to shutdown, a new one will take its
* place if needed to execute subsequent tasks.) Tasks are
* guaranteed to execute sequentially, and no more than one task
* will be active at any given time. Unlike the otherwise
* equivalent {@code newScheduledThreadPool(1, threadFactory)}
* the returned executor is guaranteed not to be reconfigurable to
* use additional threads.
* @param threadFactory the factory to use when creating new
* threads
* @return a newly created scheduled executor
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory));
}

创建线程的时机

  • 运行的线程少于 corePoolSize,
    Executor 始终首选添加新的线程,而不进行排队。(即如果当前运行的线程小于corePoolSize,则任务根本不会添加到workQueue中)
  • 运行的线程等于或多于 corePoolSize,
    Executor 始终首选将请求加入工作队列,而不添加新的线程。
  • 无法将请求加入workQueue(但是队列已满),
    则创建新的线程,除非创建此线程超出 maximumPoolSize,如果超过,在这种情况下,新的任务将被拒绝。

关闭线程池

执行程序时发现,所有线程执行完毕后,JVM并未结束运行,也就说明线程池没有正常结束。怎样正确关闭线程池呢?

调用 Executor 的 shutdown() 方法会等待线程都执行完毕之后再关闭,但是如果调用的是 shutdownNow() 方法,则相当于调用每个线程的 interrupt() 方法。

如果只执行shutdown(),线程池会等待所有线程全部结束才终止线程池。且!执行shutdown()后,就不能再继续使用ExecutorService来追加新的任务了,如果继续调用execute/submit方法执行新的任务的话,就会抛出RejectedExecutionException异常。

所以调用顺序一般为

  1. shutdown() 停止接收新的任务
  2. awaitTermination() , 判断任务是否执行完毕或者是否在指定时间内
  3. shutdownNow()停止接收新的任务

Demo

创建定长线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
public static void main(String[] args) throws InterruptedException {
int[] a = new int[1];
//创建一个容量为5的线程池

ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 100; i++) {
//向线程池提交一个任务(其实就是通过线程池来启动一个线程)
System.out.println("添加线程:" + i);
executorService.execute(new TestRunnable(a, i));
}

}
}

class TestRunnable extends Thread {
public int i;
public int[] count;

TestRunnable(int[] a, int i) {
this.count = a;
this.i = i;
}

@Override
public void run() {
count[0] = count[0] + 1;
System.out.println("i: " + i + "\t" + Thread.currentThread().getName() + "-线程被调用了"+ " count值为:" + count[0]);
}
}

关闭线程池1

指定时间关闭
未完成的线程会抛出异常 java.lang.InterruptedException: sleep interrupted

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class Test {
public static void main(String[] args) {

ExecutorService pool = Executors.newFixedThreadPool(5);

Runnable task1 = new Runnable() {
public void run() {
try {
System.out.println("task1 start" + Thread.currentThread().getName());
Thread.sleep(8000);
System.out.println("task1 end" + Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println("task1 interrupted: " + e);
}
}
};

Runnable task2 = new Runnable() {
public void run() {
try {
System.out.println("task2 start" + Thread.currentThread().getName());
Thread.sleep(1000);
System.out.println("task2 end" + Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println("task2 interrupted: " + e);
}
}
};
//消耗时间很长的任务 8秒
pool.execute(task1);

//消耗时间1秒
for (int i = 0; i < 1000; ++i) {
pool.execute(task2);
}
System.out.println("当前队列大小:" + ((ThreadPoolExecutor) pool).getQueue().size());

try {
// 告诉线程池,如果所有任务执行完毕则关闭线程池
pool.shutdown();

// 判断线程池是否在限定时间内,或者线程池内线程全部结束
if (!pool.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
// 超时的时候向线程池中所有的线程发出中断(interrupted)。
System.out.println("当前队列大小:" + ((ThreadPoolExecutor) pool).getQueue().size());
pool.shutdownNow();
}
} catch (InterruptedException e) {
System.out.println("awaitTermination interrupted: " + e);
}

System.out.println("end");
}
}

关闭线程池2

所有任务全部完成时关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class Test {
public static void main(String[] args) {

ExecutorService pool = Executors.newFixedThreadPool(5);

Runnable task1 = new Runnable() {
public void run() {
try {
System.out.println("task1 start\t" + Thread.currentThread().getName());
Thread.sleep(8000);
System.out.println("task1 end\t" + Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println("task1 interrupted: " + e);
}
}
};

Runnable task2 = new Runnable() {
public void run() {
try {
System.out.println("task2 start\t" + Thread.currentThread().getName());
Thread.sleep(1000);
System.out.println("task2 end\t" + Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println("task2 interrupted: " + e);
}
}
};
//消耗时间很长的任务 8秒
pool.execute(task1);

//消耗时间1秒
for (int i = 0; i < 100; ++i) {
pool.execute(task2);
}
System.out.println("当前队列大小:" + ((ThreadPoolExecutor) pool).getQueue().size());

// 告诉线程池,如果所有任务执行完毕则关闭线程池
pool.shutdown();
//循环判断线程池是否结束
while (true) {
System.out.println("当前队列大小:" + ((ThreadPoolExecutor) pool).getQueue().size());
if (pool.isTerminated()) {
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.println("end");
}
}

创建可缓存线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
public static void main(String[] args) throws InterruptedException {
int[] a = new int[1];
//创建一个容量为5的线程池

ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
//向线程池提交一个任务(其实就是通过线程池来启动一个线程)
System.out.println("添加线程:" + i);
executorService.execute(new TestRunnable(a, i));
}

}
}

class TestRunnable extends Thread {
public int i;
public int[] count;

TestRunnable(int[] a, int i) {
this.count = a;
this.i = i;
}

@Override
public void run() {
count[0] = count[0] + 1;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("i: " + i + "\t" + Thread.currentThread().getName() + "-线程被调用了"+ " count值为:" + count[0]);
}
}