美团技术团队关于线程池的文章
线程池的使用场景#
java中经常需要用到多线程来处理一些业务,我们非常不建议单纯使用继承Thread或者实现Runnable接口的方式来创建线程,那样势必有创建及销毁线程耗费资源、线程上下文切换问题。同时创建过多的线程也可能引发资源耗尽的风险,这个时候引入线程池比较合理,方便线程任务的管理。java中涉及到线程池的相关类均在jdk1.5开始的java.util.concurrent包中,涉及到的几个核心类及接口包括:Executor、Executors、ExecutorService、ThreadPoolExecutor、FutureTask、Callable、Runnable等。
加快请求响应(响应时间优先)
比如用户在饿了么上查看某商家外卖,需要聚合商品库存、店家、价格、红包优惠等等信息返回给用户,接口逻辑涉及到聚合、级联等查询,从这个角度来看接口返回越快越好,那么就可以使用多线程方式,把聚合/级联查询等任务采用并行方式执行,从而缩短接口响应时间。这种场景下使用线程池的目的就是为了缩短响应时间,往往不去设置队列去缓冲并发的请求,而是会适当调高corePoolSize和maxPoolSize去尽可能的创造线程来执行任务。
加快处理大任务(吞吐量优先)
比如业务中台每10分钟就调用接口统计每个系统/项目的PV/UV等指标然后写入多个sheet页中返回,这种情况下往往也会使用多线程方式来并行统计。和时间优先场景不同,这种场景的关注点不在于尽可能快的返回,而是关注利用有限的资源尽可能的在单位时间内处理更多的任务,即吞吐量优先。这种场景下我们往往会设置队列来缓冲并发任务,并且设置合理的corePoolSize和maxPoolSize参数,这个时候如果设置了太大的corePoolSize和maxPoolSize可能还会因为线程上下文频繁切换降低任务处理速度,从而导致吞吐量降低。
以上两种使用场景和JVM里的ParallelScavenge和CMS垃圾收集器有较大的类比性,
ParallelScavenge垃圾收集器关注点在于达到可观的吞吐量,而CMS垃圾收集器重点关注尽可能缩短GC停顿时间。
线程池的创建及重要参数#
线程池可以自动创建也可以手动创建,自动创建体现在Executors工具类中,常见的可以创建
newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor、newScheduledThreadPool;手动创建体现在可以灵活设置线程池的各个参数,体现在代码中即ThreadPoolExecutor类构造器上各个实参的不同:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//设置了corePoolSize=maxPoolSize,keepAliveTime=0(此时该参数没作用),无界队列,任务可以无限放入,当请求过多时(任务处理速度跟不上任务提交速度造成请求堆积)可能导致占用过多内存或直接导致OOM异常
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L,
TimeUnit.MILLISECONDS , new LinkedBlockingQueue());
}
//基本同newFixedThreadPool,但是将线程数设置为了1,单线程,弊端和newFixedThreadPool一致
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS , new LinkedBlockingQueue()));
}
//maxPoolSize为int的最大值,同步移交队列,也就是说不维护常驻线程(核心线程),每次来请求直接创建新线程来处理任务,也不使用队列缓冲,会自动回收多余线程,由于将maxPoolSize设置成Integer.MAX_VALUE,当请求很多时就可能创建过多的线程,导致资源耗尽OOM
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS , new SynchronousQueue());
}
//支持定时周期性执行,使用的是延迟队列,弊端同newCachedThreadPool一致
public static ScheduledExecutorService newScheduledThreadPool(int var0) {
return new ScheduledThreadPoolExecutor(var0);
}
所以根据上面分析我们可以看到, FixedThreadPool和SigleThreadExecutor中之所以用LinkedBlockingQueue无界队列, 是因为设置了corePoolSize=maxPoolSize, 线程数无法动态扩展, 于是就设置了无界阻塞队列来应对不可知的任务量; 而CachedThreadPool则使用的是SynchronousQueue同步移交队列, 为什么使用这个队列呢? 因为CachedThreadPool设置了corePoolSize=0, maxPoolSize=Integer.MAX_VALUE , 来一个任务就创建一个线程来执行任务, 用不到队列来存储任务; SchduledThreadPool用的是延迟队列DelayedWorkQueue。 在实际项目开发中也是推荐使用手动创建线程池的方式, 而不用默认方式, 关于这点在《 阿里巴巴开发规范》 中是这样描述的:
【 强制】 线程池不允许使用 Executors 去创建, 而是通过 ThreadPoolExecutor 的方式, 这样的处理方式让写的同学更加明确线程池的运行规则, 规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE , 可能会堆积大量的请求, 从而导致 OOM。
2) CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE , 可能会创建大量的线程, 从而导致 OOM
copy
1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {…… }
copy
ThreadPoolExecutor中重要的几个参数详解
corePoolSize:核心线程数,也是线程池中常驻的线程数,线程池初始化时默认是没有线程的,当任务来临时才开始创建线程去执行任务
maximumPoolSize:最大线程数,在核心线程数的基础上可能会额外增加一些非核心线程,需要注意的是只有当workQueue队列填满时才会创建多于corePoolSize的线程(线程池总线程数不超过maxPoolSize)
keepAliveTime:非核心线程的空闲时间超过keepAliveTime就会被自动终止回收掉,注意当corePoolSize=maxPoolSize时,keepAliveTime参数也就不起作用了(因为不存在非核心线程);
unit:keepAliveTime的时间单位
workQueue:用于保存任务的队列,可以为无界、有界、同步移交三种队列类型之一,当池子里的工作线程数大于corePoolSize时,这时新进来的任务会被放到队列中
threadFactory:创建线程的工厂类,默认使用Executors.defaultThreadFactory(),也可以使用guava库的ThreadFactoryBuilder来创建
handler:线程池无法继续接收任务(队列已满且线程数达到maximunPoolSize)时的饱和策略,取值有AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy
线程池中的线程创建流程图#
举例:现有一个线程池,corePoolSize=10,maxPoolSize=20,队列长度为100,那么当任务过来会先创建10个核心线程数,接下来进来的任务会进入到队列中直到队列满了,会创建额外的线程来执行任务(最多20个线程),这个时候如果再来任务就会执行拒绝策略。
workQueue队列#
SynchronousQueue(同步移交队列):队列不作为任务的缓冲方式,可以简单理解为队列长度为零
LinkedBlockingQueue(无界队列):队列长度不受限制,当请求越来越多时(任务处理速度跟不上任务处理速度造成请求堆积)可能导致内存占用过多或OOM
ArrayBlockintQueue(有界队列):队列长度受限,当队列满了就需要创建多余的线程来执行任务
handler拒绝策略#
AbortPolicy:中断抛出异常
DiscardPolicy:默默丢弃任务,不进行任何通知
DiscardOldestPolicy:丢弃掉在队列中存在时间最久的任务
CallerRunsPolicy:让提交任务的线程去执行任务(对比前三种比较友好一丢丢)
关闭线程池#
shutdownNow():立即关闭线程池(暴力),正在执行中的及队列中的任务会被中断,同时该方法会返回被中断的队列中的任务列表
shutdown():平滑关闭线程池,正在执行中的及队列中的任务能执行完成,后续进来的任务会被执行拒绝策略
isTerminated():当正在执行的任务及对列中的任务全部都执行(清空)完就会返回true
线程池实现线程复用的原理#
手动创建线程池(推荐)#
那么上面说了使用Executors工具类创建的线程池有隐患,那如何使用才能避免这个隐患呢?对症下药,建立自己的线程工厂类,灵活设置关键参数:
这里默认拒绝策略为AbortPolicy
1
private static ExecutorService executor = new ThreadPoolExecutor(10,10,60L, TimeUnit.SECONDS ,new ArrayBlockingQueue(10));
copy
使用guava包中的ThreadFactoryBuilder工厂类来构造线程池:
1
2
3
4
5
//通过guava的ThreadFactory工厂类可以指定线程组名称,这对后期定位错误有帮助
private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat ("thread-pool-d%" ).build ();
private static ExecutorService executorService = new ThreadPoolExecutor(10, 10, 60L,
TimeUnit.SECONDS , new ArrayBlockingQueue<Runnable>(10),
threadFactory, new ThreadPoolExecutor.AbortPolicy ());
copy
Springboot中使用线程池#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
//方式一:自定义线程池配置类
@Configuration
public class ThreadPoolConfig {
/**
* 配置线程池(方法名应改成模块名字)
* @return
*/
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
int corePoolSize = Runtime.getRuntime ().availableProcessors ();
return new ThreadPoolExecutor(
Integer.valueOf (corePoolSize),
Integer.valueOf (corePoolSize * 2),
Integer.valueOf (60),
TimeUnit.SECONDS ,
new LinkedBlockingQueue(Integer.valueOf (1000)),
new ZzwThreadFactory("zzw" ),//改成模块的名字
new ThreadPoolExecutor.CallerRunsPolicy ());
}
/**
* 构建线程池工厂(类名改成模块名工厂)
*/
static class ZzwThreadFactory implements ThreadFactory {
private ThreadGroup group;
private String namePrefix;
private final AtomicInteger threadNumber = new AtomicInteger(1);
ZzwThreadFactory(String name) {
group = Thread.currentThread ().getThreadGroup ();
if (null == name || "" .equals (name.trim ())){
name = "pool" ;
}
// 命名方式:pool(线程池名字)-1(线程池数)-thread-1(线程数) :pool-1-thread-1
this .namePrefix = name +"-" + threadNumber.getAndIncrement () + "-thread" ;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + "-" + threadNumber.getAndIncrement (), 0);
//设置为非守护线程
if (t.isDaemon ()) {
t.setDaemon (false );
}
//设置为非守护线程
if (t.getPriority () != Thread.NORM_PRIORITY ) {
t.setPriority (Thread.NORM_PRIORITY );
}
return t;
}
}
}
copy
使用方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Resource(name = "threadPoolExecutor" )
private ThreadPoolExecutor threadPoolExecutor;
//使用匿名函数
threadPoolExecutor.execute (new Runnable() {
@Override
public void run() {
//TODO
}
});
//用lambda表达式简写
threadPoolExecutor.execute (() -> {
// TODO
});
copy
方法二:
1
2
3
4
5
6
7
8
9
10
@Configuration
public class ThreadPoolConfig {
@Bean(value = "threadPoolInstance" )
public ExecutorService createThreadPoolInstance() {
//通过guava类库的ThreadFactoryBuilder来实现线程工厂类并设置线程名称
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat ("thread-pool-%d" ).build ();
ExecutorService threadPool = new ThreadPoolExecutor(10, 16, 60L, TimeUnit.SECONDS , new ArrayBlockingQueue<Runnable>(100), threadFactory, new ThreadPoolExecutor.AbortPolicy ());
return threadPool;
}
}
copy
使用方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//通过name=threadPoolInstance引用线程池实例
@Resource(name = "threadPoolInstance" )
private ExecutorService executorService;
//使用匿名函数
executorService.execute (new Runnable() {
@Override
public void run() {
//TODO
}
});
//匿名函数,用lambda简写后就是这样
operationThreadPoolExecutor.execute (() -> {
// TODO
});
copy
其它相关#
在ThreadPoolExecutor类中有两个比较重要的方法引起了我们的注意:beforeExecute和afterExecute
1
2
protected void beforeExecute(Thread var1, Runnable var2) {}
protected void afterExecute(Runnable var1, Throwable var2) {}
copy
这两个方法是protected修饰的,很显然是留给开发人员去重写方法体实现自己的业务逻辑,非常适合做钩子函数,在任务run方法的前后增加业务逻辑,比如添加日志、统计等。这个和我们springmvc中拦截器的preHandle和afterCompletion方法很类似,都是对方法进行环绕,类似于spring的AOP,参考下图:
Callable和Runnable#
Runnable和Callable都可以理解为任务,里面封装着任务的具体逻辑,用于提交给线程池执行,区别在于Runnable任务执行没有返回值,且Runnable任务逻辑中不能通过throws抛出cheched异常(但是可以try catch),而Callable可以获取到任务的执行结果返回值且抛出checked异常。
1
2
3
4
5
6
7
8
9
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
copy
Future和FutureTask#
Future接口用来表示执行异步任务的结果存储器,当一个任务的执行时间过长就可以采用这种方式:把任务提交给子线程去处理,主线程不用同步等待,当向线程池提交了一个Callable或Runnable任务时就会返回Future,用Future可以获取任务执行的返回结果。
Future的主要方法包括:
get()方法:返回任务的执行结果,若任务还未执行完,则会一直阻塞直到完成为止,如果执行过程中发生异常,则抛出异常,但是主线程是感知不到并且不受影响的,除非调用get()方法进行获取结果则会抛出ExecutionException异常;
get(long timeout, TimeUnit unit):在指定时间内返回任务的执行结果,超时未返回会抛出TimeoutException,这个时候需要显式的取消任务;
cancel(boolean mayInterruptIfRunning):取消任务,boolean类型入参表示如果任务正在运行中是否强制中断;
isDone():判断任务是否执行完毕,执行完毕不代表任务一定成功执行,比如任务执行失但也执行完毕、任务被中断了也执行完毕都会返回true,它仅仅表示一种状态说后面任务不会再执行了;
isCancelled():判断任务是否被取消;
Future的用法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool (10);
Future<Integer> future = executorService.submit (new Task());
Integer integer = future.get ();
System.out .println (integer);
executorService.shutdown ();
}
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out .println ("子线程开始计算" );
int sum = 0;
for (int i = 0; i <= 100; i++) {
sum += i;
}
return sum;
}
}
copy
FutureTask的用法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool (10);
FutureTask<Integer> futureTask = new FutureTask<>(new Task());
executorService.submit (futureTask);
Integer integer = futureTask.get ();
System.out .println (integer);
executorService.shutdown ();
}
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out .println ("子线程开始计算" );
int sum = 0;
for (int i = 0; i <= 100; i++) {
sum += i;
}
return sum;
}
}
copy
参考
线程池架构图#
Executor#
它是执行者
接口,它是来执行任务的。准确的说,Executor提供了execute()接口来执行已提交的 Runnable 任务的对象。Executor存在的目的是提供一种将"任务提交"与"任务如何运行"分离开来的机制。
它只包含一个函数接口:
void execute(Runnable command)
注意区分Executors
和Executor
。
ExecutorService#
ExecutorService继承于Executor。它是"执行者服务"接口,它是为"执行者接口Executor"服务而存在的;准确的话,ExecutorService提供了"将任务提交给执行者的接口(submit方法)",“让执行者执行任务(invokeAll, invokeAny方法)“的接口等等。
ExecutorService的函数列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
boolean awaitTermination(long timeout, TimeUnit unit)
// 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
// 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
// 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
// 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
// 如果此执行程序已关闭,则返回 true。
boolean isShutdown()
// 如果关闭后所有任务都已完成,则返回 true。
boolean isTerminated()
// 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
void shutdown()
// 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
List<Runnable> shutdownNow()
// 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
<T> Future<T> submit(Callable<T> task)
// 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
Future<?> submit(Runnable task)
// 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
<T> Future<T> submit(Runnable task, T result)
copy
AbstractExecutorService#
AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。
AbstractExecutorService存在的目的是为ExecutorService中的函数接口提供了默认实现。
ThreadPoolExecutor#
ThreadPoolExecutor就是大名鼎鼎的"线程池”。它继承于AbstractExecutorService抽象类。
ThreadPoolExecutor函数列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// 用给定的初始参数和默认的线程工厂及被拒绝的执行处理程序创建新的 ThreadPoolExecutor。
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
// 用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
// 用给定的初始参数和默认被拒绝的执行处理程序创建新的 ThreadPoolExecutor。
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
// 用给定的初始参数创建新的 ThreadPoolExecutor。
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
// 基于完成执行给定 Runnable 所调用的方法。
protected void afterExecute(Runnable r, Throwable t)
// 如果在保持活动时间内没有任务到达,新任务到达时正在替换(如果需要),则设置控制核心线程是超时还是终止的策略。
void allowCoreThreadTimeOut(boolean value)
// 如果此池允许核心线程超时和终止,如果在 keepAlive 时间内没有任务到达,新任务到达时正在替换(如果需要),则返回 true。
boolean allowsCoreThreadTimeOut()
// 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
boolean awaitTermination(long timeout, TimeUnit unit)
// 在执行给定线程中的给定 Runnable 之前调用的方法。
protected void beforeExecute(Thread t, Runnable r)
// 在将来某个时间执行给定任务。
void execute(Runnable command)
// 当不再引用此执行程序时,调用 shutdown。
protected void finalize()
// 返回主动执行任务的近似线程数。
int getActiveCount()
// 返回已完成执行的近似任务总数。
long getCompletedTaskCount()
// 返回核心线程数。
int getCorePoolSize()
// 返回线程保持活动的时间,该时间就是超过核心池大小的线程可以在终止前保持空闲的时间值。
long getKeepAliveTime(TimeUnit unit)
// 返回曾经同时位于池中的最大线程数。
int getLargestPoolSize()
// 返回允许的最大线程数。
int getMaximumPoolSize()
// 返回池中的当前线程数。
int getPoolSize()
// 返回此执行程序使用的任务队列。
BlockingQueue<Runnable> getQueue()
// 返回用于未执行任务的当前处理程序。
RejectedExecutionHandler getRejectedExecutionHandler()
// 返回曾计划执行的近似任务总数。
long getTaskCount()
// 返回用于创建新线程的线程工厂。
ThreadFactory getThreadFactory()
// 如果此执行程序已关闭,则返回 true。
boolean isShutdown()
// 如果关闭后所有任务都已完成,则返回 true。
boolean isTerminated()
// 如果此执行程序处于在 shutdown 或 shutdownNow 之后正在终止但尚未完全终止的过程中,则返回 true。
boolean isTerminating()
// 启动所有核心线程,使其处于等待工作的空闲状态。
int prestartAllCoreThreads()
// 启动核心线程,使其处于等待工作的空闲状态。
boolean prestartCoreThread()
// 尝试从工作队列移除所有已取消的 Future 任务。
void purge()
// 从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。
boolean remove(Runnable task)
// 设置核心线程数。
void setCorePoolSize(int corePoolSize)
// 设置线程在终止前可以保持空闲的时间限制。
void setKeepAliveTime(long time, TimeUnit unit)
// 设置允许的最大线程数。
void setMaximumPoolSize(int maximumPoolSize)
// 设置用于未执行任务的新处理程序。
void setRejectedExecutionHandler(RejectedExecutionHandler handler)
// 设置用于创建新线程的线程工厂。
void setThreadFactory(ThreadFactory threadFactory)
// 按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。
void shutdown()
// 尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。
List<Runnable> shutdownNow()
// 当 Executor 已经终止时调用的方法。
protected void terminated()
copy
ScheduledExecutorService#
ScheduledExecutorService是一个接口,它继承于于ExecutorService。它相当于提供了"延时"和"周期执行"功能的ExecutorService。
ScheduledExecutorService提供了相应的函数接口,可以安排任务在给定的延迟后执行,也可以让任务周期的执行。
ScheduledExecutorService函数列表
1
2
3
4
5
6
7
8
// 创建并执行在给定延迟后启用的 ScheduledFuture。
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
// 创建并执行在给定延迟后启用的一次性操作。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
// 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
// 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
copy
ScheduledThreadPoolExecutor#
ScheduledThreadPoolExecutor继承于ThreadPoolExecutor,并且实现了ScheduledExecutorService接口。它相当于提供了"延时"和"周期执行"功能的ScheduledExecutorService。
ScheduledThreadPoolExecutor类似于Timer,但是在高并发程序中,ScheduledThreadPoolExecutor的性能要优于Timer。
ScheduledThreadPoolExecutor函数列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)
// 使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)
// 使用给定的初始参数创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)
// 使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)
// 修改或替换用于执行 callable 的任务。
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task)
// 修改或替换用于执行 runnable 的任务。
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task)
// 使用所要求的零延迟执行命令。
void execute(Runnable command)
// 获取有关在此执行程序已 shutdown 的情况下、是否继续执行现有定期任务的策略。
boolean getContinueExistingPeriodicTasksAfterShutdownPolicy()
// 获取有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略。
boolean getExecuteExistingDelayedTasksAfterShutdownPolicy()
// 返回此执行程序使用的任务队列。
BlockingQueue<Runnable> getQueue()
// 从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。
boolean remove(Runnable task)
// 创建并执行在给定延迟后启用的 ScheduledFuture。
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
// 创建并执行在给定延迟后启用的一次性操作。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
// 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
// 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
// 设置有关在此执行程序已 shutdown 的情况下是否继续执行现有定期任务的策略。
void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)
// 设置有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略。
void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)
// 在以前已提交任务的执行中发起一个有序的关闭,但是不接受新任务。
void shutdown()
// 尝试停止所有正在执行的任务、暂停等待任务的处理,并返回等待执行的任务列表。
List<Runnable> shutdownNow()
// 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
<T> Future<T> submit(Callable<T> task)
// 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
Future<?> submit(Runnable task)
// 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
<T> Future<T> submit(Runnable task, T result)
copy
Executors#
Executors是个静态工厂类。它通过静态工厂方法返回ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等类的对象。
Executors函数列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 返回 Callable 对象,调用它时可运行给定特权的操作并返回其结果。
static Callable<Object> callable(PrivilegedAction<?> action)
// 返回 Callable 对象,调用它时可运行给定特权的异常操作并返回其结果。
static Callable<Object> callable(PrivilegedExceptionAction<?> action)
// 返回 Callable 对象,调用它时可运行给定的任务并返回 null。
static Callable<Object> callable(Runnable task)
// 返回 Callable 对象,调用它时可运行给定的任务并返回给定的结果。
static <T> Callable<T> callable(Runnable task, T result)
// 返回用于创建新线程的默认线程工厂。
static ThreadFactory defaultThreadFactory()
// 创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
static ExecutorService newCachedThreadPool()
// 创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们,并在需要时使用提供的 ThreadFactory 创建新线程。
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
// 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
static ExecutorService newFixedThreadPool(int nThreads)
// 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程。
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
// 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
// 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
// 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
static ExecutorService newSingleThreadExecutor()
// 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程,并在需要时使用提供的 ThreadFactory 创建新线程。
static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
// 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
static ScheduledExecutorService newSingleThreadScheduledExecutor()
// 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
// 返回 Callable 对象,调用它时可在当前的访问控制上下文中执行给定的 callable 对象。
static <T> Callable<T> privilegedCallable(Callable<T> callable)
// 返回 Callable 对象,调用它时可在当前的访问控制上下文中,使用当前上下文类加载器作为上下文类加载器来执行给定的 callable 对象。
static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable)
// 返回用于创建新线程的线程工厂,这些新线程与当前线程具有相同的权限。
static ThreadFactory privilegedThreadFactory()
// 返回一个将所有已定义的 ExecutorService 方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法。
static ExecutorService unconfigurableExecutorService(ExecutorService executor)
// 返回一个将所有已定义的 ExecutorService 方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法。
static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor)
copy
线程池示例#
下面通过示例来对线程池的使用做简单演示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
public class ThreadPoolDemo1 {
public static void main(String[] args) {
// 创建一个可重用固定线程数的线程池
ExecutorService pool = Executors.newFixedThreadPool (2);
// 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
Thread ta = new MyThread();
Thread tb = new MyThread();
Thread tc = new MyThread();
Thread td = new MyThread();
Thread te = new MyThread();
// 将线程放入池中进行执行
pool.execute (ta);
pool.execute (tb);
pool.execute (tc);
pool.execute (td);
pool.execute (te);
// 关闭线程池
pool.shutdown ();
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out .println (Thread.currentThread ().getName ()+ " is running." );
}
}
copy
运行结果
pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.
结果说明:
主线程中创建了线程池pool,线程池的容量是2。即,线程池中最多能同时运行2个线程。
紧接着,将ta、tb、tc、td、te这5个线程添加到线程池中运行。
最后,通过shutdown()关闭线程池。
线程池原理#
ThreadPoolExecutor#
ThreadPoolExecutor是线程池类。对于线程池,可以通俗的将它理解为存放一定数量线程的一个线程集合
。
线程池允许多个线程同时运行,允许同时运行的线程数量就是线程池的容量;当添加的到线程池中的线程超过它的容量时,会有一部分线程阻塞等待。
线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。
ThreadPoolExecutor的数据结构如下图所示:
各个数据在ThreadPoolExecutor.java中的定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 阻塞队列。
private final BlockingQueue<Runnable> workQueue;
// 互斥锁
private final ReentrantLock mainLock = new ReentrantLock();
// 线程集合。一个Worker对应一个线程。
private final HashSet<Worker> workers = new HashSet<Worker>();
// “终止条件”,与“mainLock”绑定。
private final Condition termination = mainLock.newCondition ();
// 线程池中线程数量曾经达到过的最大值。
private int largestPoolSize;
// 已完成任务数量
private long completedTaskCount;
// ThreadFactory对象,用于创建线程。
private volatile ThreadFactory threadFactory;
// 拒绝策略的处理句柄。
private volatile RejectedExecutionHandler handler;
// 保持线程存活时间。
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
// 核心池大小
private volatile int corePoolSize;
// 最大池大小
private volatile int maximumPoolSize;
copy
workers
workers是HashSet<Work>
类型,即它是一个Worker集合。而一个Worker对应一个线程,也就是说线程池通过workers包含了"一个线程集合”。当Worker对应的线程池启动时,它会执行线程池中的任务;当执行完一个任务后,它会从线程池的阻塞队列中取出一个阻塞的任务来继续运行。
wokers的作用是,线程池通过它实现了"允许多个线程同时运行"。
workQueue
workQueue是BlockingQueue类型,即它是一个阻塞队列。当线程池中的线程数超过它的容量的时候,线程会进入阻塞队列进行阻塞等待。
通过workQueue,线程池实现了阻塞功能。
mainLock
mainLock是互斥锁,通过mainLock实现了对线程池的互斥访问。
corePoolSize和maximumPoolSize
corePoolSize是"核心池大小",maximumPoolSize是"最大池大小"。它们的作用是调整"线程池中实际运行的线程的数量"。
例如,当新任务提交给线程池时(通过execute方法)。
如果此时,线程池中运行的线程数量< corePoolSize,则创建新线程来处理请求。
如果此时,线程池中运行的线程数量> corePoolSize,但是却< maximumPoolSize;则仅当阻塞队列满时才创建新线程。
如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心池大小和最大池大小的值是在创建线程池设置的;但是,也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。
poolSize
poolSize是当前线程池的实际大小,即线程池中任务的数量。
allowCoreThreadTimeOut和keepAliveTime
allowCoreThreadTimeOut表示是否允许"线程在空闲状态时,仍然能够存活";而keepAliveTime是当线程池处于空闲状态的时候,超过keepAliveTime时间之后,空闲的线程会被终止。
threadFactory
threadFactory是ThreadFactory对象。它是一个线程工厂类,“线程池通过ThreadFactory创建线程”。
handler
handler是RejectedExecutionHandler类型。它是"线程池拒绝策略"的句柄,也就是说"当某任务添加到线程池中,而线程池拒绝该任务时,线程池会通过handler进行相应的处理"。
综上所说,线程池通过workers来管理"线程集合",每个线程在启动后,会执行线程池中的任务;当一个任务执行完后,它会从线程池的阻塞队列中取出任务来继续运行。
阻塞队列是管理线程池任务的队列,当添加到线程池中的任务超过线程池的容量时,该任务就会进入阻塞队列进行等待。
线程池调度#
我们通过下面的图看看下面线程池中任务的调度策略,加深对线程池的理解。
图01
图02
说明:
在"图-01"中,线程池中有N个任务。“任务1”, “任务2”, “任务3"这3个任务在执行,而"任务4"到"任务N"在阻塞队列中等待。
正在执行的任务,在workers集合中,workers集合包含3个Worker,每一个Worker对应一个Thread线程,Thread线程每次处理一个任务。
当workers集合中处理完某一个任务之后,会从阻塞队列中取出一个任务来继续执行,如图-02所示。
图-02表示"任务1"处理完毕之后,线程池将"任务4"从阻塞队列中取出,放到workers中进行处理。
线程池示例#
在分析线程池之前,先看一个简单的线程池示例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
public class ThreadPoolDemo1 {
public static void main(String[] args) {
// 创建一个可重用固定线程数的线程池
ExecutorService pool = Executors.newFixedThreadPool (2);
// 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
Thread ta = new MyThread();
Thread tb = new MyThread();
Thread tc = new MyThread();
Thread td = new MyThread();
Thread te = new MyThread();
// 将线程放入池中进行执行
pool.execute (ta);
pool.execute (tb);
pool.execute (tc);
pool.execute (td);
pool.execute (te);
// 关闭线程池
pool.shutdown ();
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out .println (Thread.currentThread ().getName ()+ " is running." );
}
}
copy
运行结果:
pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.
示例中,包括了线程池的创建,将任务添加到线程池中,关闭线程池这3个主要的步骤。稍后,我们会从这3个方面来分析ThreadPoolExecutor。
Executors和ThreadPoolExecutor完整源码#
参考jdk
Executors
ThreadPoolExecutor
线程池源码分析#
创建“线程池”#
下面以newFixedThreadPool()介绍线程池的创建过程。
newFixedThreadPool()#
newFixedThreadPool()在Executors.java中定义,源码如下:
1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS ,
new LinkedBlockingQueue<Runnable>());
}
copy
说明:newFixedThreadPool(int nThreads)的作用是创建一个线程池,线程池的容量是nThreads。
newFixedThreadPool()在调用ThreadPoolExecutor()时,会传递一个LinkedBlockingQueue()对象,而LinkedBlockingQueue是单向链表实现的阻塞队列。
在线程池中,就是通过该阻塞队列来实现"当线程池中任务数量超过允许的任务数量时,部分任务会阻塞等待”。
ThreadPoolExecutor()#
ThreadPoolExecutor()在ThreadPoolExecutor.java中定义,源码如下:
1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory (), defaultHandler);
}
copy
说明:该函数实际上是调用ThreadPoolExecutor的另外一个构造函数。该函数的源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null )
throw new NullPointerException();
// 核心池大小
this .corePoolSize = corePoolSize;
// 最大池大小
this .maximumPoolSize = maximumPoolSize;
// 线程池的等待队列
this .workQueue = workQueue;
this .keepAliveTime = unit.toNanos (keepAliveTime);
// 线程工厂对象
this .threadFactory = threadFactory;
// 拒绝策略的句柄
this .handler = handler;
}
copy
说明:在ThreadPoolExecutor()的构造函数中,进行的是初始化工作。
corePoolSize、maximumPoolSize、unit、keepAliveTime和workQueue这些变量的值是已知的,它们都是通过newFixedThreadPool()传递而来。下面看看threadFactory和handler对象。
ThreadFactory#
线程池中的ThreadFactory是一个线程工厂,线程池创建线程都是通过线程工厂对象(threadFactory)来完成的。
上面所说的threadFactory对象,是通过 Executors.defaultThreadFactory()返回的。Executors.java中的defaultThreadFactory()源码如下:
1
2
3
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
copy
defaultThreadFactory()返回DefaultThreadFactory对象。Executors.java中的DefaultThreadFactory()源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager ();
group = (s != null ) ? s.getThreadGroup () :
Thread.currentThread ().getThreadGroup ();
namePrefix = "pool-" +
poolNumber.getAndIncrement () +
"-thread-" ;
}
// 提供创建线程的API。
public Thread newThread(Runnable r) {
// 线程对应的任务是Runnable对象r
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement (),
0);
// 设为“非守护线程”
if (t.isDaemon ())
t.setDaemon (false );
// 将优先级设为“Thread.NORM_PRIORITY”
if (t.getPriority () != Thread.NORM_PRIORITY )
t.setPriority (Thread.NORM_PRIORITY );
return t;
}
}
copy
说明:ThreadFactory的作用就是提供创建线程的功能的线程工厂。
它是通过newThread()提供创建线程功能的,下面简单说说newThread()。newThread()创建的线程对应的任务是Runnable对象,它创建的线程都是“非守护线程”而且“线程优先级都是Thread.NORM_PRIORITY”。
RejectedExecutionHandler#
handler是ThreadPoolExecutor中拒绝策略的处理句柄。所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。
线程池默认会采用的是defaultHandler策略,即AbortPolicy策略。在AbortPolicy策略中,线程池拒绝任务时会抛出异常!
defaultHandler的定义如下:
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
AbortPolicy的源码如下:
1
2
3
4
5
6
7
8
9
10
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
// 直接抛出异常,没别的代码
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString () +
" rejected from " +
e.toString ());
}
}
copy
添加任务到“线程池”#
execute()#
execute()定义在ThreadPoolExecutor.java中,源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void execute(Runnable command) {
// 如果任务为null,则抛出异常。
if (command == null )
throw new NullPointerException();
// 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息
int c = ctl.get ();
// 当线程池中的任务数量 < "核心池大小"时,即线程池中少于corePoolSize个任务。
// 则通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true ))
return ;
c = ctl.get ();
}
// 当线程池中的任务数量 >= "核心池大小"时,
// 而且,"线程池处于允许状态"时,则尝试将任务添加到阻塞队列中。
if (isRunning(c) && workQueue.offer (command)) {
// 再次确认“线程池状态”,若线程池异常终止了,则删除任务;然后通过reject()执行相应的拒绝策略的内容。
int recheck = ctl.get ();
if (! isRunning(recheck) && remove(command))
reject(command);
// 否则,如果"线程池中任务数量"为0,则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null。
else if (workerCountOf(recheck) == 0)
addWorker(null , false );
}
// 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
// 如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false ))
reject(command);
}
copy
说明:execute()的作用是将任务添加到线程池中执行。它会分为3种情况进行处理:
情况1 – 如果"线程池中任务数量" < “核心池大小"时,即线程池中少于corePoolSize个任务;此时就新建一个线程,并将该任务添加到线程中进行执行。
情况2 – 如果"线程池中任务数量” >= “核心池大小”,并且"线程池是允许状态";此时,则将任务添加到阻塞队列中阻塞等待。在该情况下,会再次确认"线程池的状态",如果"第2次读到的线程池状态"和"第1次读到的线程池状态"不同,则从阻塞队列中删除该任务。
情况3 – 非以上两种情况。在这种情况下,尝试新建一个线程,并将该任务添加到线程中进行执行。如果执行失败,则通过reject()拒绝该任务。
addWorker()#
addWorker()的源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 更新"线程池状态和计数"标记,即更新ctl。
for (;;) {
// 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息
int c = ctl.get ();
// 获取线程池状态。
int rs = runStateOf(c);
// 有效性检查
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty ()))
return false ;
for (;;) {
// 获取线程池中任务的数量。
int wc = workerCountOf(c);
// 如果"线程池中任务的数量"超过限制,则返回false。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false ;
// 通过CAS函数将c的值+1。操作失败的话,则退出循环。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get (); // Re-read ctl
// 检查"线程池状态",如果与之前的状态不同,则从retry重新开始。
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false ;
boolean workerAdded = false ;
Worker w = null ;
// 添加任务到线程池,并启动任务所在的线程。
try {
final ReentrantLock mainLock = this .mainLock ;
// 新建Worker,并且指定firstTask为Worker的第一个任务。
w = new Worker(firstTask);
// 获取Worker对应的线程。
final Thread t = w.thread ;
if (t != null ) {
// 获取锁
mainLock.lock ();
try {
int c = ctl.get ();
int rs = runStateOf(c);
// 再次确认"线程池状态"
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null )) {
if (t.isAlive ()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将Worker对象(w)添加到"线程池的Worker集合(workers)"中
workers.add (w);
// 更新largestPoolSize
int s = workers.size ();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true ;
}
} finally {
// 释放锁
mainLock.unlock ();
}
// 如果"成功将任务添加到线程池"中,则启动任务所在的线程。
if (workerAdded) {
t.start ();
workerStarted = true ;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
// 返回任务是否启动。
return workerStarted;
}
copy
说明:
addWorker(Runnable firstTask, boolean core) 的作用是将任务(firstTask)添加到线程池中,并启动该任务。
core为true的话,则以corePoolSize为界限,若"线程池中已有任务数量>=corePoolSize",则返回false;core为false的话,则以maximumPoolSize为界限,若"线程池中已有任务数量>=maximumPoolSize",则返回false。
addWorker()会先通过for循环不断尝试更新ctl状态,ctl记录了"线程池中任务数量和线程池状态"。
更新成功之后,再通过try模块来将任务添加到线程池中,并启动任务所在的线程。
从addWorker()中,我们能清晰的发现:线程池在添加任务时,会创建任务对应的Worker对象;而一个Workder对象包含一个Thread对象。(01) 通过将Worker对象添加到"线程的workers集合"中,从而实现将任务添加到线程池中。 (02) 通过启动Worker对应的Thread线程,则执行该任务。
submit()#
补充说明一点,submit()实际上也是通过调用execute()实现的,源码如下:
1
2
3
4
5
6
public Future<?> submit(Runnable task) {
if (task == null ) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null );
execute(ftask);
return ftask;
}
copy
关闭“线程池”#
shutdown()的源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void shutdown() {
final ReentrantLock mainLock = this .mainLock ;
// 获取锁
mainLock.lock ();
try {
// 检查终止线程池的“线程”是否有权限。
checkShutdownAccess();
// 设置线程池的状态为关闭状态。
advanceRunState(SHUTDOWN);
// 中断线程池中空闲的线程。
interruptIdleWorkers();
// 钩子函数,在ThreadPoolExecutor中没有任何动作。
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
// 释放锁
mainLock.unlock ();
}
// 尝试终止线程池
tryTerminate();
}
copy
说明:shutdown()的作用是关闭线程池。
线程池的状态#
线程有5种状态:新建状态,就绪状态,运行状态,阻塞状态,死亡状态。
线程池的5种状态是:Running, shutdown, stop, tidying, terminated。
ThreadPoolExecutor线程池状态定义如下:
1
2
3
4
5
6
7
8
9
10
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int ctlOf(int rs, int wc) { return rs | wc; }
copy
说明:
ctl是一个AtomicInteger类型的原子对象。ctl记录了"线程池中的任务数量"和"线程池状态"2个信息。
ctl共包括32位。其中,高3位表示"线程池状态",低29位表示"线程池中的任务数量"。
RUNNING – 对应的高3位值是111。
SHUTDOWN – 对应的高3位值是000。
STOP – 对应的高3位值是001。
TIDYING – 对应的高3位值是010。
TERMINATED – 对应的高3位值是011。
RUNNING
(01) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态!
道理很简单,在ctl的初始化代码中(如下),就将它初始化为RUNNING状态,并且"任务数量"初始化为0。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
SHUTDOWN
(01) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(02) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
STOP
(01) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
(02) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
TIDYING
(01) 状态说明:当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
(02) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
TERMINATED
(01) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(02) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
拒绝策略#
线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施。
当任务添加到线程池中之所以被拒绝,可能是由于:第一,线程池异常关闭。第二,任务数量超过线程池的最大限制。
线程池共包括4种拒绝策略,它们分别是:
AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy和DiscardPolicy。
AbortPolicy:当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
CallerRunsPolicy:当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。
DiscardOldestPolicy:当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
DiscardPolicy:当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。
线程池默认的处理策略是AbortPolicy!
下面通过示例,分别演示线程池的4种拒绝策略。
DiscardPolicy 示例#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.lang.reflect.Field;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
public class DiscardPolicyDemo {
private static final int THREADS_SIZE = 1;
private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS ,
new ArrayBlockingQueue<Runnable>(CAPACITY));
// 设置线程池的拒绝策略为"丢弃"
pool.setRejectedExecutionHandler (new ThreadPoolExecutor.DiscardPolicy ());
// 新建10个任务,并将它们添加到线程池中。
for (int i = 0; i < 10; i++) {
Runnable myrun = new MyRunnable("task-" +i);
pool.execute (myrun);
}
// 关闭线程池
pool.shutdown ();
}
}
class MyRunnable implements Runnable {
private String name;
public MyRunnable(String name) {
this .name = name;
}
@Override
public void run() {
try {
System.out .println (this .name + " is running." );
Thread.sleep (100);
} catch (Exception e) {
e.printStackTrace ();
}
}
}
copy
运行结果:
task-0 is running.
task-1 is running.
结果说明:线程池pool的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),这意味着"线程池能同时运行的任务数量最大只能是1"。
线程池pool的阻塞队列是ArrayBlockingQueue,ArrayBlockingQueue是一个有界的阻塞队列,ArrayBlockingQueue的容量为1。这也意味着线程池的阻塞队列只能有一个线程池阻塞等待。
根据"“中分析的execute()代码可知:线程池中共运行了2个任务。第1个任务直接放到Worker中,通过线程去执行;第2个任务放到阻塞队列中等待。其他的任务都被丢弃了!
DiscardOldestPolicy 示例#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.lang.reflect.Field;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy;
public class DiscardOldestPolicyDemo {
private static final int THREADS_SIZE = 1;
private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS ,
new ArrayBlockingQueue<Runnable>(CAPACITY));
// 设置线程池的拒绝策略为"DiscardOldestPolicy"
pool.setRejectedExecutionHandler (new ThreadPoolExecutor.DiscardOldestPolicy ());
// 新建10个任务,并将它们添加到线程池中。
for (int i = 0; i < 10; i++) {
Runnable myrun = new MyRunnable("task-" +i);
pool.execute (myrun);
}
// 关闭线程池
pool.shutdown ();
}
}
class MyRunnable implements Runnable {
private String name;
public MyRunnable(String name) {
this .name = name;
}
@Override
public void run() {
try {
System.out .println (this .name + " is running." );
Thread.sleep (200);
} catch (Exception e) {
e.printStackTrace ();
}
}
}
copy
运行结果:
task-0 is running.
task-9 is running.
结果说明:将"线程池的拒绝策略"由DiscardPolicy修改为DiscardOldestPolicy之后,当有任务添加到线程池被拒绝时,线程池会丢弃阻塞队列中末尾的任务,然后将被拒绝的任务添加到末尾。
AbortPolicy 示例#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.lang.reflect.Field;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.RejectedExecutionException;
public class AbortPolicyDemo {
private static final int THREADS_SIZE = 1;
private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS ,
new ArrayBlockingQueue<Runnable>(CAPACITY));
// 设置线程池的拒绝策略为"抛出异常"
pool.setRejectedExecutionHandler (new ThreadPoolExecutor.AbortPolicy ());
try {
// 新建10个任务,并将它们添加到线程池中。
for (int i = 0; i < 10; i++) {
Runnable myrun = new MyRunnable("task-" +i);
pool.execute (myrun);
}
} catch (RejectedExecutionException e) {
e.printStackTrace ();
// 关闭线程池
pool.shutdown ();
}
}
}
class MyRunnable implements Runnable {
private String name;
public MyRunnable(String name) {
this .name = name;
}
@Override
public void run() {
try {
System.out .println (this .name + " is running." );
Thread.sleep (200);
} catch (Exception e) {
e.printStackTrace ();
}
}
}
copy
(某一次)运行结果:
1
2
3
4
5
6
7
java.util .concurrent .RejectedExecutionException
at java.util .concurrent .ThreadPoolExecutor$AbortPolicy .rejectedExecution (ThreadPoolExecutor.java :1774)
at java.util .concurrent .ThreadPoolExecutor .reject (ThreadPoolExecutor.java :768)
at java.util .concurrent .ThreadPoolExecutor .execute (ThreadPoolExecutor.java :656)
at AbortPolicyDemo.main (AbortPolicyDemo.java :27)
task-0 is running.
task-1 is running.
copy
结果说明:将"线程池的拒绝策略"由DiscardPolicy修改为AbortPolicy之后,当有任务添加到线程池被拒绝时,会抛出RejectedExecutionException。
CallerRunsPolicy 示例#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.lang.reflect.Field;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
public class CallerRunsPolicyDemo {
private static final int THREADS_SIZE = 1;
private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS ,
new ArrayBlockingQueue<Runnable>(CAPACITY));
// 设置线程池的拒绝策略为"CallerRunsPolicy"
pool.setRejectedExecutionHandler (new ThreadPoolExecutor.CallerRunsPolicy ());
// 新建10个任务,并将它们添加到线程池中。
for (int i = 0; i < 10; i++) {
Runnable myrun = new MyRunnable("task-" +i);
pool.execute (myrun);
}
// 关闭线程池
pool.shutdown ();
}
}
class MyRunnable implements Runnable {
private String name;
public MyRunnable(String name) {
this .name = name;
}
@Override
public void run() {
try {
System.out .println (this .name + " is running." );
Thread.sleep (100);
} catch (Exception e) {
e.printStackTrace ();
}
}
}
copy
(某一次)运行结果:
1
2
3
4
5
6
7
8
9
10
task-2 is running.
task-3 is running.
task-4 is running.
task-5 is running.
task-6 is running.
task-7 is running.
task-8 is running.
task-9 is running.
task-0 is running.
task-1 is running.
copy
结果说明:将"线程池的拒绝策略"由DiscardPolicy修改为CallerRunsPolicy之后,当有任务添加到线程池被拒绝时,线程池会将被拒绝的任务添加到"线程池正在运行的线程"中去运行。
Callable 和 Future 简介#
Callable 和 Future 是比较有趣的一对组合。当我们需要获取线程的执行结果时,就需要用到它们。Callable用于产生结果,Future用于获取结果。
Callable#
Callable 是一个接口,它只包含一个call()方法。Callable是一个返回结果并且可能抛出异常的任务。
为了便于理解,我们可以将Callable比作一个Runnable接口,而Callable的call()方法则类似于Runnable的run()方法。
Callable的源码如下:
1
2
3
public interface Callable<V> {
V call() throws Exception;
}
copy
说明:从中我们可以看出Callable支持泛型。
Future#
Future 是一个接口。它用于表示异步计算的结果。提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。Future的源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Future<V> {
// 试图取消对此任务的执行。
boolean cancel(boolean mayInterruptIfRunning)
// 如果在任务正常完成前将其取消,则返回 true。
boolean isCancelled()
// 如果任务已完成,则返回 true。
boolean isDone()
// 如有必要,等待计算完成,然后获取其结果。
V get() throws InterruptedException, ExecutionException;
// 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
copy
说明: Future用于表示异步计算的结果。它的实现类是FutureTask,在讲解FutureTask之前,我们先看看Callable、Future、FutureTask它们之间的关系图,如下:
说明:
RunnableFuture是一个接口,它继承了Runnable和Future这两个接口。RunnableFuture的源码如下:
1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
copy
FutureTask实现了RunnableFuture接口。所以,我们也说它实现了Future接口。
示例和源码分析#
我们先通过一个示例看看Callable和Future的基本用法,然后再分析示例的实现原理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
class MyCallable implements Callable {
@Override
public Integer call() throws Exception {
int sum = 0;
// 执行任务
for (int i=0; i<100; i++)
sum += i;
//return sum;
return Integer.valueOf (sum);
}
}
public class CallableTest1 {
public static void main(String[] args)
throws ExecutionException, InterruptedException{
//创建一个线程池
ExecutorService pool = Executors.newSingleThreadExecutor ();
//创建有返回值的任务
Callable c1 = new MyCallable();
//执行任务并获取Future对象
Future f1 = pool.submit (c1);
// 输出结果
System.out .println (f1.get ());
//关闭线程池
pool.shutdown ();
}
}
copy
运行结果:
4950
结果说明:
在主线程main中,通过newSingleThreadExecutor()新建一个线程池。接着创建Callable对象c1,然后再通过pool.submit(c1)将c1提交到线程池中进行处理,并且将返回的结果保存到Future对象f1中。然后,我们通过f1.get()获取Callable中保存的结果;最后通过pool.shutdown()关闭线程池。
submit()#
submit()在 java/util/concurrent/AbstractExecutorService.java
中实现,它的源码如下:
1
2
3
4
5
6
7
8
9
public <T> Future<T> submit(Callable<T> task) {
if (task == null ) throw new NullPointerException();
// 创建一个RunnableFuture对象
RunnableFuture<T> ftask = newTaskFor(task);
// 执行“任务ftask”
execute(ftask);
// 返回“ftask”
return ftask;
}
copy
说明:submit()通过newTaskFor(task)创建了RunnableFuture对象ftask。它的源码如下:
1
2
3
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
copy
FutureTask的构造函数#
1
2
3
4
5
6
7
8
public FutureTask(Callable<V> callable) {
if (callable == null )
throw new NullPointerException();
// callable是一个Callable对象
this .callable = callable;
// state记录FutureTask的状态
this .state = NEW; // ensure visibility of callable
}
copy
FutureTask的run()方法#
我们继续回到submit()的源码中。
在newTaskFor()新建一个ftask对象之后,会通过execute(ftask)执行该任务。此时ftask被当作一个Runnable对象进行执行,最终会调用到它的run()方法;ftask的run()方法在java/util/concurrent/FutureTask.java
中实现,源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject (this , runnerOffset,
null , Thread.currentThread ()))
return ;
try {
// 将callable对象赋值给c。
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行Callable的call()方法,并保存结果到result中。
result = c.call ();
ran = true ;
} catch (Throwable ex) {
result = null ;
ran = false ;
setException(ex);
}
// 如果运行成功,则将result保存
if (ran)
set(result);
}
} finally {
runner = null ;
// 设置“state状态标记”
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
copy
说明:run()中会执行Callable对象的call()方法,并且最终将结果保存到result中,并通过set(result)将result保存。
之后调用FutureTask的get()方法,返回的就是通过set(result)保存的值。
参考
CompletableFuture#
使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
CompletableFuture.join()
和get()
的区别#
join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。会将异常包装成CompletionException异常 /CancellationException异常,但是本质原因还是代码内存在的真正的异常。
1
2
3
4
5
6
7
8
public static void main(String[] args) {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync (() -> {
int i =1/0;
return 1;
});
CompletableFuture.allOf (f1).join ();
System.out .println ("CompletableFuture Test" );
}
copy
get()方法抛出的是 checked 异常,ExecutionException、InterruptedException。需要用户手动处理(抛出或者 try catch)
1
2
3
4
5
6
7
8
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync (() -> {
int i =1/0;
return 1;
});
f1.get ();
System.out .println ("CompletableFuture Test" );
}
copy
创建异步任务#
runAsync 和 supplyAsync方法#
没有指定Executor的方法会使用ForkJoinPool.commonPool()
作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
CompletableFuture 提供了四个静态方法来创建一个异步操作。
1
2
3
4
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
copy
runAsync 不支持返回值。
supplyAsync 支持返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//无返回值
public static void runAsync() throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync (() -> {
try {
TimeUnit.SECONDS .sleep (1);
} catch (InterruptedException e) {
}
System.out .println ("run end ..." );
});
future.get ();
}
//有返回值
public static void supplyAsync() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync (() -> {
try {
TimeUnit.SECONDS .sleep (1);
} catch (InterruptedException e) {
}
System.out .println ("run end ..." );
return System.currentTimeMillis ();
});
long time = future.get ();
System.out .println ("time = " +time);
}
copy
异步回调#
whenComplete#
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:
1
2
3
4
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
copy
可以看到Action的类型是BiConsumer<? super T,? super Throwable>
它可以处理正常的计算结果,或者异常情况。
whenComplete:whenComplete 的任务,交给当前线程执行 。
whenCompleteAsync:whenCompleteAsync 的任务,交给线程池执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void whenComplete() throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync (() -> {
try {
TimeUnit.SECONDS .sleep (1);
} catch (InterruptedException e) {
}
if (new Random().nextInt ()%2>=0) {
int i = 12/0;
}
System.out .println ("run end ..." );
});
future.whenComplete (new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void t, Throwable action) {
System.out .println ("执行完成!" );
}
});
future.exceptionally (new Function<Throwable, Void>() {
@Override
public Void apply(Throwable t) {
System.out .println ("执行失败!" +t.getMessage ());
return null ;
}
});
TimeUnit.SECONDS .sleep (2);
}
copy
thenApply#
当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
1
2
3
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
copy
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static void thenApply() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync (new Supplier<Long>() {
@Override
public Long get() {
long result = new Random().nextInt (100);
System.out .println ("result1=" +result);
return result;
}
}).thenApply (new Function<Long, Long>() {
@Override
public Long apply(Long t) {
long result = t*5;
System.out .println ("result2=" +result);
return result;
}
});
long result = future.get ();
System.out .println (result);
}
copy
第二个任务依赖第一个任务的结果。
handle#
handle 是执行任务完成时对结果的处理。
handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
1
2
3
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
copy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void handle() throws Exception{
CompletableFuture<Integer> future = CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
int i= 10/0;
return new Random().nextInt (10);
}
}).handle (new BiFunction<Integer, Throwable, Integer>() {
@Override
public Integer apply(Integer param, Throwable throwable) {
int result = -1;
if (throwable==null ){
result = param * 2;
}else {
System.out .println (throwable.getMessage ());
}
return result;
}
});
System.out .println (future.get ());
}
copy
从示例中可以看出,在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply 方法,如果上个任务出现错误,则不会执行 thenApply 方法。
thenAccept#
接收任务的处理结果,并消费处理,无返回结果。
1
2
3
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
copy
1
2
3
4
5
6
7
8
9
10
11
public static void thenAccept() throws Exception{
CompletableFuture<Void> future = CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
return new Random().nextInt (10);
}
}).thenAccept (integer -> {
System.out .println (integer);
});
future.get ();
}
copy
从示例代码中可以看出,该方法只是消费上一个执行完成的任务,并可以根据上面的任务返回的结果进行处理。并没有后续的输出操作。
thenRun#
跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenRun。
1
2
3
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
copy
1
2
3
4
5
6
7
8
9
10
11
public static void thenRun() throws Exception{
CompletableFuture<Void> future = CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
return new Random().nextInt (10);
}
}).thenRun (() -> {
System.out .println ("thenRun ..." );
});
future.get ();
}
copy
该方法同 thenAccept 方法类似。不同的是上个任务处理完成后,并不会把计算的结果传给 thenRun 方法。只是处理完任务后,执行 thenRun 的后续操作。
thenCombine 合并任务#
thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
1
2
3
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
copy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static void thenCombine() throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync (new Supplier<String>() {
@Override
public String get() {
return "hello" ;
}
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync (new Supplier<String>() {
@Override
public String get() {
return "hello" ;
}
});
CompletableFuture<String> result = future1.thenCombine (future2, new BiFunction<String, String, String>() {
@Override
public String apply(String t, String u) {
return t+" " +u;
}
});
System.out .println (result.get ());
}
copy
thenAcceptBoth#
当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗
1
2
3
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
copy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static void thenAcceptBoth() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt (3);
try {
TimeUnit.SECONDS .sleep (t);
} catch (InterruptedException e) {
e.printStackTrace ();
}
System.out .println ("f1=" +t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt (3);
try {
TimeUnit.SECONDS .sleep (t);
} catch (InterruptedException e) {
e.printStackTrace ();
}
System.out .println ("f2=" +t);
return t;
}
});
f1.thenAcceptBoth (f2, new BiConsumer<Integer, Integer>() {
@Override
public void accept(Integer t, Integer u) {
System.out .println ("f1=" +t+";f2=" +u+";" );
}
});
}
copy
和thenCombine相比,thenAcceptBoth是没有返回值的。
runAfterBoth#
两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)
1
2
3
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
copy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private static void runAfterBoth() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt (3);
try {
TimeUnit.SECONDS .sleep (t);
} catch (InterruptedException e) {
e.printStackTrace ();
}
System.out .println ("f1=" +t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt (3);
try {
TimeUnit.SECONDS .sleep (t);
} catch (InterruptedException e) {
e.printStackTrace ();
}
System.out .println ("f2=" +t);
return t;
}
});
f1.runAfterBoth (f2, new Runnable() {
@Override
public void run() {
System.out .println ("上面两个任务都执行完成了。" );
}
});
}
copy
applyToEither#
两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。
1
2
3
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
copy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private static void applyToEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt (3);
try {
TimeUnit.SECONDS .sleep (t);
} catch (InterruptedException e) {
e.printStackTrace ();
}
System.out .println ("f1=" +t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt (3);
try {
TimeUnit.SECONDS .sleep (t);
} catch (InterruptedException e) {
e.printStackTrace ();
}
System.out .println ("f2=" +t);
return t;
}
});
CompletableFuture<Integer> result = f1.applyToEither (f2, new Function<Integer, Integer>() {
@Override
public Integer apply(Integer t) {
System.out .println (t);
return t * 2;
}
});
System.out .println (result.get ());
}
copy
acceptEither#
两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。
1
2
3
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
copy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static void acceptEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt (3);
try {
TimeUnit.SECONDS .sleep (t);
} catch (InterruptedException e) {
e.printStackTrace ();
}
System.out .println ("f1=" +t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt (3);
try {
TimeUnit.SECONDS .sleep (t);
} catch (InterruptedException e) {
e.printStackTrace ();
}
System.out .println ("f2=" +t);
return t;
}
});
f1.acceptEither (f2, new Consumer<Integer>() {
@Override
public void accept(Integer t) {
System.out .println (t);
}
});
}
copy
acceptEither没有返回值。
runAfterEither#
两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)。
1
2
3
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
copy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private static void runAfterEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt (3);
try {
TimeUnit.SECONDS .sleep (t);
} catch (InterruptedException e) {
e.printStackTrace ();
}
System.out .println ("f1=" +t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt (3);
try {
TimeUnit.SECONDS .sleep (t);
} catch (InterruptedException e) {
e.printStackTrace ();
}
System.out .println ("f2=" +t);
return t;
}
});
f1.runAfterEither (f2, new Runnable() {
@Override
public void run() {
System.out .println ("上面有一个已经完成了。" );
}
});
}
copy
thenCompose#
thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。
1
2
3
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
copy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static void thenCompose() throws Exception {
CompletableFuture<Integer> f = CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt (3);
System.out .println ("t1=" +t);
return t;
}
}).thenCompose (new Function<Integer, CompletionStage<Integer>>() {
@Override
public CompletionStage<Integer> apply(Integer param) {
return CompletableFuture.supplyAsync (new Supplier<Integer>() {
@Override
public Integer get() {
int t = param *2;
System.out .println ("t2=" +t);
return t;
}
});
}
});
System.out .println ("thenCompose result : " +f.get ());
}
copy
allOf()#
anyOf 可以接受多个 CompletableFuture 参数。所有 CompletableFuture 结束,才会执行后续的代码。因为每个 CompletableFuture 的返回值类型可能不同,所以无法确定返回类型是什么,所以 allOf 的返回值是CompletableFuture<void>
类型。 考虑下面的例子。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void allOf1(){
CompletableFuture<String> future1 = CompletableFuture.supplyAsync (() -> {
try {
TimeUnit.SECONDS .sleep (1);
} catch (InterruptedException e) {
e.printStackTrace ();
}
return "future1的执行结果" ;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync (() -> {
try {
TimeUnit.SECONDS .sleep (2);
} catch (InterruptedException e) {
e.printStackTrace ();
}
return "future2的执行结果" ;
});
// join方法会阻塞
CompletableFuture.allOf (future1,future2).join ();
}
copy
会等待所有 future 执行完成,才会返回。
这种情况相当于异步执行了两个任务,且等待两个任务都执行完成才返回。
上面 allof 只接收了两个参数,如果多个参数如何接收呢?
1
2
3
4
5
6
7
8
9
10
11
12
13
public static void allOf2(){
List<Integer> list = new ArrayList<>();
list.add (1);
list.add (2);
list.add (3);
CompletableFuture[] futures = list.stream ()
// 在这里异步执行任务
.map (i -> CompletableFuture.supplyAsync (() -> i + 1))
.toArray (CompletableFuture[]::new );
// 调用阻塞方法
CompletableFuture.allOf (futures).join ();
}
copy
上面的方法没有返回值,如果需要每个任务的返回值呢?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void allOf3(){
List<Integer> list = new ArrayList<>();
list.add (1);
list.add (2);
list.add (3);
List<CompletableFuture<Integer>> futureList = list.stream ()
// 在这里异步执行任务
.map (i -> CompletableFuture.supplyAsync (() -> i + 1))
.collect (Collectors.toList ());
CompletableFuture<Void> allOf = CompletableFuture
.allOf (futureList.toArray (new CompletableFuture[futureList.size ()]));
List<Integer> resultList = allOf
.thenApply (i -> futureList.stream ()
// 获取返回值
.map (j -> j.join ())
.collect (Collectors.toList ()))
.join ();
// 打印返回值
for (Integer i : resultList) {
System.out .println (i);
}
}
copy
对上面的方法进行封装:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Data
@AllArgsConstructor
public static class FutureTaskWorker<T, R> {
/**
* 需要异步执行的任务
*/
private List<T> taskList;
/**
* 需要执行的方法
*/
private Function<T, CompletableFuture<R>> workFunction;
/**
* 搜集执行结果
*/
public List<R> getAllResult() {
List<CompletableFuture<R>> futureList = taskList.stream ().map (workFunction).collect (Collectors.toList ());
CompletableFuture<Void> allCompletableFuture = CompletableFuture.allOf (futureList.toArray (new CompletableFuture[futureList.size ()]));
return allCompletableFuture.thenApply (e -> futureList.stream ().map (CompletableFuture::join).collect (Collectors.toList ())).join ();
}
}
// 调用封装的方法
public static void allOf4(){
List<Integer> list = new ArrayList<>();
list.add (1);
list.add (2);
list.add (3);
FutureTaskWorker<Integer, Integer> worker = new FutureTaskWorker<>(list,
i -> CompletableFuture.supplyAsync (() -> i + 1));
List<Integer> result = worker.getAllResult ();
System.out .println (result);
}
copy
anyOf()#
anyOf 可以接受多个 CompletableFuture 参数。其中任意一个 CompletableFuture 结束,就可以执行后续的代码,无须像 allOf() 那样,需要等待所有的 CompletableFuture 结束。
因为每个 CompletableFuture 的返回值类型可能不同,所以无法确定返回类型是什么,所以 anyOf 的返回值是CompletableFuture<Object>
类型。 考虑下面的例子。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 测试anyOf
*/
public static void anyOf() throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync (() -> {
try {
TimeUnit.SECONDS .sleep (3);
} catch (InterruptedException e) {
e.printStackTrace ();
}
return "future1的执行结果" ;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync (() -> {
try {
TimeUnit.SECONDS .sleep (2);
} catch (InterruptedException e) {
e.printStackTrace ();
}
return "future2的执行结果" ;
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync (() -> {
try {
TimeUnit.SECONDS .sleep (1);
} catch (InterruptedException e) {
e.printStackTrace ();
}
return "future3的执行结果" ;
});
CompletableFuture<Object> anyOf = CompletableFuture.anyOf (future1, future2, future3);
System.out .println (anyOf.get ());
}
copy
3 个 future 中,future3 睡眠时间最短,会最先执行完成,anyOfFuture.get()
获取的也就是 future2 的内容。future1、future3 的 返回结果被丢弃了。
ForkJoin#
Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。
举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行:
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
这就是Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。
主要有两步:
线程池中的每个线程都有自己的工作队列,当自己队列中的任务都完成以后,会从其它线程的工作队列中偷一个任务执行,这样可以充分利用资源。
PS:这一点和ThreadPoolExecutor不同,ThreadPoolExecutor是所有线程公用一个工作队列,所有线程都从这个工作队列中取任务。
工作窃取(work-stealing)#
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
那么为什么需要使用工作窃取算法呢?
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
核心类#
ForkJoinPool#
ForkJoinPool与其它的ExecutorService区别主要在于它使用“工作窃取”:线程池中的所有线程都企图找到并执行提交给线程池的任务。当大量的任务产生子任务的时候,或者同时当有许多小任务被提交到线程池中的时候,这种处理是非常高效的。特别的,当在构造方法中设置asyncMode为true的时候这种处理更加高效。
WorkQueue是一个ForkJoinPool中的内部类,它是线程池中线程的工作队列的一个封装,支持任务窃取。
ForkJoinTask#
ForkJoinTask代表运行在ForkJoinPool中的任务。
主要方法:
1
2
3
4
5
6
7
8
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread ()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue .push (this );
else
ForkJoinPool.common .externalPush (this );
return this ;
}
copy
1
2
3
4
5
6
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
copy
invoke():开始执行任务,如果必要,等待计算完成。
1
2
3
4
5
6
public final V invoke() {
int s;
if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
copy
子类:
RecursiveAction:一个递归无结果的ForkJoinTask(没有返回值)
RecursiveTask:一个递归有结果的ForkJoinTask(有返回值)
ForkJoinWorkerThread#
ForkJoinWorkerThread代表ForkJoinPool线程池中的一个执行任务的线程。
1
2
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
copy
从代码中我们可以清楚地看到,ForkJoinWorkThread持有ForkJoinPool和ForkJoinPool.WorkQueue的引用,以表明该线程属于哪个线程池,它的工作队列是哪个。
对大数求和:需要子线程的返回结果#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class ForkJoinTest {
public static void main(String[] args) throws Exception {
// 创建2000个随机数组成的数组:
long [] array = new long [2000];
long expectedSum = 0;
for (int i = 0; i < array.length ; i++) {
array[i] = random();
expectedSum += array[i];
}
System.out .println ("预期结果:Expected sum: " + expectedSum);
// fork/join:
ForkJoinTask<Long> task = new SumTask(array, 0, array.length );
long startTime = System.currentTimeMillis ();
Long result = ForkJoinPool.commonPool ().invoke (task);
long endTime = System.currentTimeMillis ();
System.out .println ("Fork/join的结果 sum: " + result + " 执行时间是 " + (endTime - startTime) + " ms." );
}
static Random random = new Random(0);
static long random() {
return random.nextInt (10000);
}
}
class SumTask extends RecursiveTask<Long> {
//阈值
static final int THRESHOLD = 500;
long [] array;
int start;
int end;
SumTask(long [] array, int start, int end) {
this .array = array;
this .start = start;
this .end = end;
}
@Override
protected Long compute() {
// 如果任务足够小,直接计算:
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += this .array [i];
// 故意放慢计算速度:
try {
Thread.sleep (1);
} catch (InterruptedException e) {
}
}
return sum;
}
// 任务太大,一分为二
int middle = (end + start) / 2;
System.out .println (String.format ("split %d~%d ==> %d~%d, %d~%d" , start, end, start, middle, middle, end));
SumTask subTask1 = new SumTask(this .array , start, middle);
SumTask subTask2 = new SumTask(this .array , middle, end);
invokeAll(subTask1, subTask2);
Long subResult1 = subTask1.join ();
Long subResult2 = subTask2.join ();
Long result = subResult1 + subResult2;
System.out .println ("result = " + subResult1 + " + " + subResult2 + " ==> " + result);
return result;
}
}
copy
批量发送消息:不需要子线程的返回值#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ForkJoinPoolDemo {
public static void main(String[] args) throws InterruptedException {
List<String> list = new ArrayList<>();
for (int i = 0; i < 123; i++) {
list.add (String.valueOf (i+1));
}
ForkJoinPool pool = new ForkJoinPool();
pool.submit (new ForkJoinPoolDemo().new SendMsgTask(0, list.size (), list));
pool.awaitTermination (10, TimeUnit.SECONDS );
pool.shutdown ();
}
class SendMsgTask extends RecursiveAction {
private final int THRESHOLD = 10;
private int start;
private int end;
private List<String> list;
public SendMsgTask(int start, int end, List<String> list) {
this .start = start;
this .end = end;
this .list = list;
}
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
for (int i = start; i < end; i++) {
System.out .println (Thread.currentThread ().getName () + ": " + list.get (i));
}
}else {
int middle = (start + end) / 2;
invokeAll(new SendMsgTask(start, middle, list), new SendMsgTask(middle, end, list));
}
}
}
}
copy
数组排序#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.wjy.test;
import java.util.Arrays;
import java.util.concurrent.*;
public class RecursiveActionDemo {
private static class SortTask extends RecursiveAction {
static final int THRESHOLD = 100;
final long [] array;
final int lo, hi;
public SortTask(long [] array, int lo, int hi) {
this .array = array;
this .lo = lo;
this .hi = hi;
}
public SortTask(long [] array) {
this (array, 0, array.length );
}
public void sortSequentially(int lo, int hi) {
Arrays.sort (array, lo, hi);
}
public void merge(int lo, int mid, int hi) {
long [] buf = Arrays.copyOfRange (array, lo, mid);
for (int i = 0, j = lo, k = mid; i < buf.length ; j++) {
array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++];
}
}
@Override
protected void compute() {
if (hi - lo < THRESHOLD) {
sortSequentially(lo, hi);
}else {
int mid = (lo + hi) >>> 1;
invokeAll(new SortTask(array, lo, mid), new SortTask(array, mid, hi));
merge(lo, mid, hi);
}
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
long [] array = new long [120];
for (int i = 0; i < array.length ; i++) {
array[i] = (long ) (Math.random () * 1000);
}
System.out .println ("生成一个数组:" + Arrays.toString (array));
ForkJoinPool pool = new ForkJoinPool();
pool.submit (new SortTask(array));
pool.awaitTermination (5, TimeUnit.SECONDS );
pool.shutdown ();
System.out .println ("数组排序完毕:" + Arrays.toString (array));
}
}
copy
斐波那契数列#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.wjy.test;
import java.util.concurrent.*;
public class ForkJoinPoolDemo {
private static class Fibonacci extends RecursiveTask<Integer> {
final int n;
public Fibonacci(int n) {
this .n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork ();
Fibonacci f2 = new Fibonacci(n - 1);
return f2.compute () + f1.join ();
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool pool = new ForkJoinPool();
Future<Integer> future = pool.submit (new Fibonacci(10));
System.out .println (future.get ());
pool.shutdown ();
}
}
copy
参考Doug Lea本人写的pdf
java中的几种线程池#
Java通过Executors提供5种线程池,分别为:
newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
ExecutorService executorService = Executors.newFixedThreadPool(2);
newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。可能导致内存溢出,一般使用newFixedThreadPool代替。
ExecutorService executorService = Executors.newCachedThreadPool();
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
ExecutorService executorService = Executors.newScheduledThreadPool(2);
newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
ExecutorService executorService = Executors.newSingleThreadExecutor();
newSingleThreadScheduledExecutor
创建只有一条线程的线程池,他可以在指定延迟后执行线程任务
newWorkStealingPool(jdk1.8新增)
stealing 翻译为抢断、窃取的意思,上边四种线程池是通过ThreadPoolExecutor
实现的,而newWorkStealingPool
是通过ForkJoinPool
实现的。
newWorkStealingPool会根据所需的并行层次来动态创建和关闭线程。这是一个并行线程池,参数中传入的是一个线程并发的数量。
这个线程池和其他线程池有很明显的区别,前面4种线程池都有核心线程数、最大线程数等等,而这个线程池只使用一个并发线程数来解决问题。
从介绍中,还说明这个线程池不会保证任务的顺序执行,也就是 WorkStealing 的意思,抢占式的工作。即任务的执行是无序的,哪个线程抢到任务,就由它执行。
ExecutorService executorService = Executors.newWorkStealingPool();
newSingleThreadExecutor#
1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS ,
new LinkedBlockingQueue<Runnable>()));
}
copy
LinkedBlockingQueue
是一个无界阻塞队列,可能会堆积大量请求,不建议使用该线程池。
创建只有一个线程的线程池,且线程的存活时间是无限的;当该线程正繁忙时,对于新任务会进入阻塞队列中(无界的阻塞队列)。适用:只有一个任务的场景(那我还用你干嘛?)
1
2
3
4
5
6
7
8
//创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
public static void singleTheadPoolTest() {
ExecutorService pool = Executors.newSingleThreadExecutor ();
for (int i = 0; i < 10; i++) {
final int ii = i;
pool.execute (() -> out.println (Thread.currentThread ().getName () + "=>" + ii));
}
}
copy
执行结果:
线程名称:pool-1-thread-1,执行0
线程名称:pool-1-thread-1,执行1
线程名称:pool-1-thread-1,执行2
线程名称:pool-1-thread-1,执行3
线程名称:pool-1-thread-1,执行4
线程名称:pool-1-thread-1,执行5
线程名称:pool-1-thread-1,执行6
线程名称:pool-1-thread-1,执行7
线程名称:pool-1-thread-1,执行8
线程名称:pool-1-thread-1,执行9
由于我们的线程池中使用的从始至终都是单个线程,所以这里的线程名字都是相同的,而且下载任务都是一个一个的来,直到有空闲线程时,才会继续执行任务,否则都是等待状态。
newFixedThreadPool#
1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS ,
new LinkedBlockingQueue<Runnable>());
}
copy
LinkedBlockingQueue
是一个无界阻塞队列,可能会堆积大量请求,不建议使用该线程池。
创建可容纳固定数量线程的线程池,每个线程的存活时间是无限的,当池子满了就不在添加线程了;如果池中的所有线程均在繁忙状态,对于新任务会进入阻塞队列中(无界的阻塞队列)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 1.创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小<br>
* 2.线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程<br>
* 3.因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字,和线程名称<br>
*/
public static void fixTheadPoolTest() {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool (3);
for (int i = 0; i < 10; i++) {
final int ii = i;
fixedThreadPool.execute (() -> {
out.println ("线程名称:" + Thread.currentThread ().getName () + ",执行" + ii);
try {
Thread.sleep (2000);
} catch (InterruptedException e) {
e.printStackTrace ();
}
});
}
}
copy
执行结果:
线程名称:pool-1-thread-3,执行2
线程名称:pool-1-thread-1,执行0
线程名称:pool-1-thread-2,执行3
线程名称:pool-1-thread-3,执行4
线程名称:pool-1-thread-1,执行5
线程名称:pool-1-thread-2,执行6
线程名称:pool-1-thread-3,执行7
线程名称:pool-1-thread-1,执行8
线程名称:pool-1-thread-3,执行9
我们通过newFixedThreadPool(2)给它传入了 2 个核心线程数,看看下载效果如何:
显然,它就可以做到并发的下载,我们两个下载任务可以同时进行,并且所用的线程始终都只有两个,因为它的最大线程数等于核心线程数,不会再去创建新的线程了。
newCachedThreadPool#
1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE ,
60L, TimeUnit.SECONDS ,
new SynchronousQueue<Runnable>());
}
copy
maximumPoolSize为Integer.MAX_VALUE,可能会堆积大量请求,不建议使用该线程池。
可以进行缓存的线程池,意味着它的线程数是最大的,无限的。
当有新任务到来,则插入到SynchronousQueue中,由于SynchronousQueue是同步队列,因此会在池中寻找可用线程来执行,若有可以线程则执行,若没有可用线程则创建一个线程来执行该任务;若池中线程空闲时间超过指定大小,则该线程会被销毁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 1.创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程
* 2.当任务数增加时,此线程池又可以智能的添加新线程来处理任务
* 3.此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小
*/
public static void cacheThreadPool() {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool ();
for (int i = 1; i <= 10; i++) {
final int ii = i;
try {
Thread.sleep (ii * 1);
} catch (InterruptedException e) {
e.printStackTrace ();
}
cachedThreadPool.execute (()->System.out .println ("线程名称:" + Thread.currentThread ().getName () + ",执行" + ii));
}
}
copy
执行结果:
线程名称:pool-1-thread-1,执行1
线程名称:pool-1-thread-1,执行2
线程名称:pool-1-thread-1,执行3
线程名称:pool-1-thread-1,执行4
线程名称:pool-1-thread-1,执行5
线程名称:pool-1-thread-1,执行6
线程名称:pool-1-thread-1,执行7
线程名称:pool-1-thread-1,执行8
线程名称:pool-1-thread-1,执行9
线程名称:pool-1-thread-1,执行10
因为没有最大线程数限制,所以一点全部开始下载,就会创建出 5 条新的线程同时执行任务,从上图的例子看出,每条线程都不一样。
由于这种线程池创建时初始化时,最大线程数和任务的阻塞队列都是无界的值,所以这可能会耗尽cpu资源。
因为空闲的线程的摧毁时间是60秒,为了证明线程的复用效果,我这里又添加了一个按钮,在这个按钮中继续添加后面两个下载任务。
当线程下载完毕时,空闲线程就会复用,结果显示如下,复用线程池的空闲线程:
另一种情况,当线程池中没有空闲线程时,这时又加了新的任务,它就会创建出新的线程来执行任务,结果如下:
newScheduledThreadPool#
1
2
3
4
5
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super (corePoolSize, Integer.MAX_VALUE , 0, NANOSECONDS,
new ScheduledThreadPoolExecutor.DelayedWorkQueue (), threadFactory);
}
copy
maximumPoolSize为Integer.MAX_VALUE,可能会创建大量线程,从而导致 OOM,不建议使用该线程池。
创建一个固定大小的线程池,线程池内线程存活时间无限制,线程池可以支持定时及周期性任务执行,如果所有线程均处于繁忙状态,对于新任务会进入DelayedWorkQueue队列中,这是一种按照超时时间排序的队列结构。
适用:定时及周期性任务执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 创建一个定长线程池,支持定时及周期性任务执行。延迟执行
*/
public static void sceduleThreadPool() {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool (5);
Runnable r1 = () -> out.println ("线程名称:" + Thread.currentThread ().getName () + ",执行:3秒后执行" );
scheduledThreadPool.schedule (r1, 3, TimeUnit.SECONDS );
Runnable r2 = () -> out.println ("线程名称:" + Thread.currentThread ().getName () + ",执行:延迟2秒后每3秒执行一次" );
scheduledThreadPool.scheduleAtFixedRate (r2, 2, 3, TimeUnit.SECONDS );
Runnable r3 = () -> out.println ("线程名称:" + Thread.currentThread ().getName () + ",执行:普通任务" );
for (int i = 0; i < 5; i++) {
scheduledThreadPool.execute (r3);
}
}
copy
执行结果:
线程名称:pool-1-thread-1,执行:普通任务
线程名称:pool-1-thread-5,执行:普通任务
线程名称:pool-1-thread-4,执行:普通任务
线程名称:pool-1-thread-3,执行:普通任务
线程名称:pool-1-thread-2,执行:普通任务
线程名称:pool-1-thread-1,执行:延迟2秒后每3秒执行一次
线程名称:pool-1-thread-5,执行:3秒后执行
线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
newWorkStealingPool#
最明显的用意就是它是一个并行的线程池,参数中传入的是一个线程并发的数量,这里和之前就有很明显的区别,
前面4种线程池都有核心线程数、最大线程数等等,而这就使用了一个并发线程数解决问题。
从介绍中,还说明这个线程池不会保证任务的顺序执行,也就是 WorkStealing 的意思,抢占式的工作。
gif图参考
多线程实际应用#
需求:批量插入100w数据,使用线程池。
思路:将数据进行分组。
方法:一次性取出全部数据,放到内存中,在内存中将数据进行分组,然后线程池处理数据。
如果数据太多,可以先分批从数据库中取数据,然后在内存中分组。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @author: wjy
* @description: 给没有班级的学校,批量添加60个班级
*/
@Override
public void addSquadForSchool() {
//没有班级的学校数据
List<Integer> list = schoolMapper.getSchoolWithoutSquad ();
for (int i = 0; i < list.size (); i+=100) {
//100条数据一组
int toIndex = i + 100 > list.size () ? list.size () : i+100;
List<Integer> subList = list.subList (i,toIndex);
// 每1000条数据为一组,此处也可以用这种方式分组
// List<List<Long>> lists = Lists.partition(idList, 1000);
//线程池处理数据
executorService.execute (()->{
//批量添加班级
SquadUtils.batchInsertSquad (subList);
});
}
}
copy
线程池监控#
美团技术团队关于线程池的文章
其他参考
线程池测试题#
1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool (5);
ClassPushQuestionDTO dto = new ClassPushQuestionDTO();
for (int i = 0; i < 3; i++) {
dto.setClassId (i);
executorService.execute (()->{
System.out .println (dto.getClassId ());
});
}
}
copy
以上程序如何打印。