CompletionService的使用

接口CompletionService的功能是以异步的方式一边生产新任务,一边处理已完成任务的结果,这样可以将执行任务与处理任务分离开来进行处理。使用submit()执行任务,使用take()取得已完成的任务,并按照完成这些任务的时间顺序处理它们。

上一篇文章讲到Future具有阻塞同步性,这样的代码运行效率大打折扣,接口CompletionService可以很好的解决这个问题。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class App {
public static void main(String[] args) {
try {
ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
CompletionService cs = new ExecutorCompletionService(pool);
List<Callable> list = new ArrayList<>();
list.add(() -> { System.out.println("username1");Thread.sleep(5000);return "return username1"; });
list.add(() -> { System.out.println("username2");Thread.sleep(4000);return "return username2"; });
list.add(() -> { System.out.println("username3");Thread.sleep(3000);return "return username3"; });
list.add(() -> { System.out.println("username4");Thread.sleep(2000);return "return username4"; });
list.add(() -> { System.out.println("username5");Thread.sleep(1000);return "return username5"; });

list.forEach(cs::submit);

for (int i = 1; i <= 5; i ++) {
System.out.println("等待第" + i + "个的返回值");
System.out.println(cs.take().get());
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
username1
username2
username3
username4
等待第1个的返回值
username5
return username5
等待第2个的返回值
return username4
等待第3个的返回值
return username3
等待第4个的返回值
return username2
等待第5个的返回值
return username1

从结果来看,CompletionService解决Future阻塞的特性,也就是谁先执行完成,就先处理谁的结果。
当然如果当前没有任务被执行完,cs.take().get()也是呈阻塞特性的

CompletionService执行任务时的各种异常:
如果任务执行异常,则通过对应的Future对象的get()方法抛出,如果不执行Future对象的get()方法,则该异常不抛出

方法介绍

1
2
3
4
5
take()取得最先完成任务的Future对象
pool()获取并移除表示下一个已经完成任务的Future对象,如果不存在这样的任务,则返回null,此方法无阻塞的效果
poll(long timeout, TimeUnit unit)等带指定的时间,在指定的时间内获取到值时立即向下继续执行,如果超时也立即执行
submit(Callable<V> task)
submit(Runnable task, V result)参数V是对应的返回值