批量 FutureTask
在Future模式的讲解中,说到future模式是一种支持异步计算并可以返回计算结果,并在返回结果前可以阻塞调用者的线程对象模型。
批量Future模式是在对Future模式的扩展。比如有,一批FutureTask,我要把这批task打包到一起,等待整个任务包执行完后,在返回给调用者,或在规定时间内返回计算结果。
在Jdk中实现的方法是:
// 支持超时的批量任务模式
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
JDK的一种简单实现方式
Jdk在实现这种模式时,用了一个ArrayList存放task包,每个task还是单独的送入到调度器中,然后在等待每个task的计算结果,等到所有的task计算完成后在返回包含每个task的计算结果集合,否则阻塞当前调用线程
下面是jdk的实现源码:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
// 存放task的集合
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
// 每个task都送入到调度器
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
// 等待每一个task返回计算结果
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
// 如果其中有task被取消了,则忽略
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}