上一篇介绍了ThreadPoolExecutor,提供了一个execute方法执行任务,但execute方法没有返回值,不能批量提交任务,不能设置超时时间。AbstractExecutorService是ThreadPoolExecutor的抽象父类,提供了一些额外方法,使得执行任务的功能得到增强。方法可分为三类:有返回的提交(submit),执行全部(invokeAll),执行任意一个(invokeAny)。这三类方法也是基于execute。
submit方法
作用
submit用来提交一个任务,并异步返回任务的执行结果。
重载方法
submit有三种重载方法,可以提交一个Runnable或Callable类型对象作为参数。
| 方法 | 参数 | 返回 | 说明 |
|---|---|---|---|
| submit | Callable |
Future |
提交一个Callable对象,返回对应类型Future |
| submit | Runnable task | Future<?> | 提交一个Runnable 对象,返回Void类型Future |
| submit | Runnable task, T result | Future |
提交一个Runnable 对象,并指定返回Future类型 |
源码分析
submit方法传入一个Callable类型对象task,接着通过newTaskFor方法将task转换成RunnableFuture类型对象ftask,然后调用execute(ftask),最后返回ftask。

Callable接口
其中,j.u.c.Callable是一个函数式接口,只包含一个call方法。与Runnable.run相比,call方法拥有返回值,类型即为泛型V。

Future接口
Future接口提供了一些方法,其中get方法可以在任务执行完毕后获得返回值。

RunnableFuture接口
由于RunnableFuture实现了Runnable和Future,因此这个ftask既可以是execute的方法参数,又可以是submit的返回值。

newTaskFor方法
newTaskFor有两种重载方法,这两种方法都是将一个Callable或Runnable 对象封装成了FutureTask对象。而FutureTask实现了RunnableFuture接口。
| 方法 | 参数 | 返回 | 说明 |
|---|---|---|---|
| newTaskFor | Callable |
RunnableFuture |
将callable封装进RunnableFuture |
| newTaskFor | Runnable runnable, T value | RunnableFuture |
将runnable封装进RunnableFuture,并通过value指定泛型 |
分别调用了两个FutureTask的构造方法


由之前对execute(Runnable command)方法的分析可知,线程最终执行的是command.run()方法。而这里传入的command是FutureTask类型。
FutureTask
FutureTask中起关键作用的是run、get和report方法
run方法
在run方法中,先执行callable.call()方法获得结果result,再将result保存到成员变量outcome中


get方法
get方法用来获取结果,包括两个重载方法
无超时时间
任务结束前会一直阻塞

有超时时间
如果超时了任务还没完成,则抛出异常

report方法
任务执行完成,会调用report方法,返回outcome

invokeAny方法
作用
invokeAny方法用来提交多个任务,只要这些任务中有一个成功,就返回执行结果,其它任务都会被取消。如果所有任务都执行失败,则抛出异常。
invokeAny方法适合执行某些时效性要求高的任务。比如向多台机器发出同样的计算请求,结果都一样,但性能高或网络好的机器会最先返回,那其他任务可以取消。
重载方法
invokeAny包含两个重载方法,提交一个Callable<T>类型任务集合。区别是是否允许超时。
| 方法 | 参数 | 返回 | 说明 |
|---|---|---|---|
| invokeAny | Collection<? extends Callable |
T | 提交任务集合,只要有任意一个成功则返回,其余任务取消 |
| invokeAny | Collection<? extends Callable |
T | 超时则抛出异常 |
这两个方法内部都是调用的doInvokeAny方法。
doInvokeAny方法
doInvokeAny是invokeAny的具体实现。
逻辑
1.首先构建一个ECS实例,提交第一个任务,然后进入for循环,循环体包含两个if语句;
2.进入循环后,开始尝试从ecs的队列中获取一个future,此时存在以下几种情况:
2.1还没有任务执行完成,future=null,进入第一个if语句,判断剩余任务是否都已提交
2.1.1如果还没提交完,则提交一个,进入下一轮循环;
2.1.2如果已经提交完,判断是否设置了超时时间
2.1.2.1如果设置了,则等待超时时间,没有结果就抛出异常,有结果就执行第二个if语句;
2.1.2.2如果没有设置,则阻塞等待。
2.2已有任务已执行完成,future!=null,执行后续的f.get操作,
2.2.1如果get获取失败,则记录异常,进入下一轮循环;
2.2.2如果get获取成功,则结束循环,返回执行结果
3.如果循环结束所有任务都执行失败,则抛出最后记录的异常;
4.最后中断所有已提交的任务。
源码分析
1 | private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, |
ExecutorCompletionService类
在doInvokeAny方法中,创建了一个j.u.c.ExecutorCompletionService对象ecs。

ecs包含三个成员变量,其中
executor就是调用方ThreadPoolExecutor对象aes为向上转型的AbstractExecutorService对象completionQueue是BlockingQueue<Future>类型,用来保存执行结果
QueueingFuture类
ExecutorCompletionService提供了submit,poll,take方法。
其中submit方法创建了一个ExecutorCompletionService.QueueingFuture对象,QueueingFuture继承自FutureTask,重写了父类的done方法,用于将task添加到completionQueue中。


而done方法会在FutureTask.run方法执行后被调用。因此completionQueue中保存有任务执行结果。
因此AbstractExecutorService可以通过ECS的poll,take方法从completionQueue中获取执行结果。

invokeAll方法
作用
与invokeAny不同,invokeAll方法用来提交多个任务,并等待所有任务返回。
重载方法
invokeAll包含两个重载方法,都是提交一个Callable<T>类型任务集合。区别是是否允许超时。
| 方法 | 参数 | 返回 | 说明 |
|---|---|---|---|
| invokeAll | Collection<? extends Callable |
List<Future |
提交任务集合,阻塞直到所以任务都完成 |
| invokeAll | Collection<? extends Callable |
List<Future |
超时则直接返回 |
源码分析
不设置超时
会等待所有任务执行完毕,并同步返回执行结果。一旦有任务提交失败,或者执行失败,则取消所有任务。
1 | public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) |
设置超时
会等待所有任务执行完毕,并同步返回执行结果。一旦有任务提交失败,或者超时,则取消所有任务。
1 | public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, |


