基于JDK中的 Future 实现异步编程
使用FutureTask进行异步计算
FutureTask介绍
FutureTask代表了一个可被取消的异步计算任务,该类实现了Future接口,比如提供了启动和取消任务、查询任务是否完成、获取计算结果的接口。
FutureTask任务的结果只有当任务完成后才能获取,并且只能通过get系列方法获取,当结果还没出来时,线程调用get系列方法会被阻塞。另外,一旦任务被执行完成,任务将不能重启,除非运行时使用了runAndReset方法。FutureTask中的任务可以是Callable类型,也可以是Runnable类型(因为FutureTask实现了Runnable接口),FutureTask类型的任务可以被提交到线程池执行。
向线程池提交任务
- 使用submit
// 调用了线程池的submit方法提交了一个任务到线程池,然后返回了一个futureTask对象
Future<String> futureTask = POOL_EXECUTOR.submit(() -> {
String result = null;
try {
result = doSomethingA();
} catch (Exception e) {
e.printStackTrace();
}
return result;
});
// 执行任务
String taskBResult = doSomethingB();
// 获取结果,同步等待线程A运行结束
String taskAResult = futureTask.get();
- 使用execute
// 创建future任务
FutureTask<String> futureTask = new FutureTask<String>(() -> {
String result = null;
try {
result = doSomethingA();
} catch (Exception e) {
e.printStackTrace();
}
return result;
});
// 添加异步任务A到线程池
POOL_EXECUTOR.execute(futureTask);
// 执行任务B
String taskBResult = doSomethingB();
// 同步等待线程A运行结束
String taskAResult = futureTask.get();
总结
当我们创建一个FutureTask时,其任务状态初始化为NEW,当我们把任务提交到线程或者线程池后,会有一个线程来执行该FutureTask任务,具体是调用其run方法来执行任务。在任务执行过程中,我们可以在其他线程调用FutureTask的get()方法来等待获取结果,如果当前任务还在执行,则调用get的线程会被阻塞然后放入FutureTask内的阻塞链表队列;多个线程可以同时调用get方法,这些线程可能都会被阻塞并放到阻塞链表队列中。当任务执行完毕后会把结果或者异常信息设置到outcome变量,然后会移除和唤醒FutureTask内阻塞链表队列中的线程节点,进而这些由于调用FutureTask的get方法而被阻塞的线程就会被激活。
CompletableFuture 实战 –基础篇
文中的 sleep 方法目的是模拟异步任务执行;
POOL_EXECUTOR 为已初始化完毕的线程池;
为缩短篇幅更明了,所有异常均需自行捕获。
CompletableFuture 概述
CompletableFuture是一个可以通过编程方式显式地设置计算结果和状态以便让任务结束的Future,并且其可以作为一个CompletionStage(计算阶段),当它的计算完成时可以触发一个函数或者行为;当多个线程企图调用同一个CompletableFuture的complete、cancel方式时只有一个线程会成功。
显式设置 CompletableFuture 结果
- 创建一个 CompletableFuture 对象
CompletableFuture<String> future = new CompletableFuture<>();
- 将代码提交到异步线程池中执行;开启线程计算任务结果,并调用 future 的 complete 方法设置 future 的结果,设置完结果后,所有由于调用 future 的 get() 方法而被阻塞的线程会被激活,并返回设置的结果。
POOL_EXECUTOR.execute(() -> {
Thread.sleep(3000);
// 设置计算结果到future:调用future的complete方法设置future的结果,设置完结果后,所有由于调用fture的get()方法而被阻塞的线程会被激活,并返回设置的结果。
future.complete("hello,Raines");
});
- 等待计算结果:调用future的get()方法企图获取future的结果,如果future的结果没有被设置,则调用线程会被阻塞。
log.info(future.get());
基于 CompletableFuture 实现异步计算与结果转换
基于 runAsync 实现无返回值的异步计算
- 如果不指定线程池 poolExecutor 执行,则默认使用整个JVM内唯一的 ForkJoinPool.commonPool() 线程池来执行异步任务。future.get() 返回为 null
CompletableFuture future = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
log.info("over");
}
}, poolExecutor);
// 同步等待异步任务执行结束
System.out.println(future.get());
基于 supplyAsync 实现有返回值的异步执行
- 如果不指定线程池 Executor,则默认使用整个JVM内唯一的 ForkJoinPool.commonPool() 线程池来执行异步任务。
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
Thread.sleep(2000);
// 返回异步计算结果
return "hello";
}
});
// 同步等待异步任务执行结束
System.out.println(future.get());
基于 thenRun 实现 无前者执行结果 无返回值 的回调
- twoFuture 拿不到 oneFuture 的执行结果;
- 在 twoFuture 调用 get() 也会返回 null,因为回调事件是没有返回值的。
CompletableFuture<String> oneFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
Thread.sleep(2000);
// 返回计算结果
return "hello";
}
});
// 在 oneFuture 上施加事件,当 oneFuture 计算完成后回调该事件,并返回新 future
CompletableFuture twoFuture = oneFuture.thenRun(new Runnable() {
@Override
public void run() {
Thread.sleep(1000);
System.out.println("---after oneFuture over doSomething---");
}
});
// 同步等待 twoFuture 对应的任务完成,返回结果固定为null
System.out.println(twoFuture.get());
基于 thenAccept 实现 有前者执行结果 无返回值 的回调
- 基于 thenAccept 实现异步任务 one,执行完毕后,激活异步任务 two 执行,需要注意的是,这种方式激活的异步任务 two 是可以在回调方法 accept(String t) 中的参数 t 中拿到任务 one 的执行结果的。
- 由于 accept(String t) 方法没有返回值,所有 two 上调用 get() 也返回 null。
CompletableFuture<String> oneFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
Thread.sleep(2000);
// 返回计算结果
return "hello";
}
});
// 在future上施加事件,当future计算完成后回调该事件,并返回新future
CompletableFuture twoFuture = oneFuture.thenAccept(new Consumer<String>() {
@Override
public void accept(String t) {
// 对oneFuture返回的结果进行加工
Thread.sleep(1000);
System.out.println("---after oneFuture over doSomething---" + t);
}
});
// 3.同步等待twoFuture对应的任务完成,返回结果固定为null
System.out.println(twoFuture.get());
基于 thenApply 实现 有前者执行结果 有返回值 的回调
- 可以在回调方法 apply(String t) 的参数t中获取 oneFuture 对应的任务结果。
- 由于 apply(String t) 方法有返回值,所以在 twoFuture 上调用 get() 方法最终也会返回回调方法返回的值。
// 创建有返回值的异步执行
CompletableFuture<String> oneFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "hello";
}
});
// 在future上施加事件,当future计算完成后回调该事件,并返回新future
CompletableFuture<String> twoFuture = oneFuture.thenApply(new Function<String, String>() {
// 对oneFuture计算结果基础上进行计算,这里t为oneFuture返回的hello
@Override
public String apply(String t) {
// 返回加工后结果
return t + " jiduo";
}
});
- 默认情况下oneFuture对应的异步任务和在oneFuture上添加的回调事件都是使用ForkJoinPool.commonPool()中的同一个线程来执行的,大家可以使用thenApplyAsync(Function<?super T,?extends U>fn,Executor executor)来指定设置的回调事件使用自定义线程池线程来执行,也就是oneFuture对应的任务与在其上设置的回调执行将不会在同一个线程中执行。
CompletableFuture<String> threeFuture = twoFuture.thenApplyAsync(t2->{
return t2+"three";
},poolExecutor);
基于 whenComplete 设置回调函数,当异步任务执行完毕后进行回调,不会阻塞调用线程
- 在返回的 future 上调用 whenComplete 设置一个回调函数,然后 main 线程就返回了。在整个异步任务的执行过程中,main 函数所在线程是不会被阻塞的,等异步任务执行完毕后会回调设置的回调函数,在回调函数内,如果发现异步任务执行正常则打印执行结果,否则打印异常信息。
- 这里 join() 挂起了 main 函数所在线程,是因为具体执行异步任务的是 ForkJoin 的 commonPool 线程池,其中线程都是 Deamon 线程,所以,当唯一的用户线程 main 线程退出后整个JVM进程就退出了,会导致异步任务得不到执行
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
Thread.sleep(1000);
// 返回计算结果
return "hello,Raines";
}
});
//
future.whenComplete(new BiConsumer<String, Throwable>() {
@Override
public void accept(String t, Throwable u) {
// 如果没有异常,打印异步任务结果
if (u == null) {
System.out.println("1");
} else {
// 打印异常信息
System.out.println(u.getLocalizedMessage());
}
}
});
// 挂起当前线程,等待异步任务执行完毕
Thread.currentThread().join();
CompletableFuture 实战 –提升篇
多个 CompletableFuture 进行组合运算
CompletableFuture 功能强大的原因之一是其可以让两个或者多个 Completable-Future 进行运算来产生结果
基于 thenCompose 实现当一个 CompletableFuture 执行完毕后,可把第一个 Completable 的结果作为参数执行另外一个 CompletableFuture
- 定义异步任务,返回 CompletableFuture
// 开启异步任务one
public static CompletableFuture<String> doSomethingOne(String encodedCompanyId) {
return CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
Thread.sleep(1000);
// 返回结果
String id = encodedCompanyId + ":raines";
return id;
}
});
}
// 开启异步任务two
public static CompletableFuture<String> doSomethingTwo(String companyId) {
return CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
Thread.sleep(3000);
// 返回结果
String str = companyId + "xiaolong";
return str;
}
});
}
- 首先调用方法doSomethingOne(“123”)开启了一个异步任务,并返回了对应的CompletableFuture对象,我们取名为future1。
然后在future1的基础上调用了thenCompose方法,企图让future1执行完毕后,激活使用其结果作为doSomethingTwo(String companyId)方法的参数的任务。
CompletableFuture result = doSomethingOne("123").thenCompose(id -> doSomethingTwo(id));
result.get();
基于 thenCombine 实现当两个并发运行的 CompletableFuture 任务都完成后,使用两者的结果作为参数再执行一个异步任务
- 等doSomethingOne和doSomethingTwo都完成后,使用它们的结果做一件事
CompletableFuture result = doSomethingOne("123").thenCombine(doSomethingTwo("456"), (one, two) -> {
return one + " " + two;
});
result.get()
基于allOf等待多个并发运行的CompletableFuture任务执行完毕
public static void allOf() throws InterruptedException, ExecutionException {
// 1.创建future列表
List<CompletableFuture<String>> futureList = new ArrayList<>();
futureList.add(doSomethingOne("1"));
futureList.add(doSomethingOne("2"));
// 2.转换多个future为一个
CompletableFuture<Void> result = CompletableFuture
.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
// 3.等待所有future都完成
System.out.println(result.get());
// 4.等所有future执行完毕后,获取所有future的计算结果
CompletableFuture<List<String>> finallyResult = result.thenApply(new Function<Void, List<String>>() {
@Override
public List<String> apply(Void t) {
return futureList.stream().map(future -> future.join()).collect(Collectors.toList());
}
});
// 5.打印所有future的结果
for (String str : finallyResult.get()) {
System.out.println(str);
}
}
基于anyOf等多个并发运行的CompletableFuture任务中有一个执行完毕就返回
public static void anyOf() throws InterruptedException, ExecutionException {
// 1.创建future列表
List<CompletableFuture<String>> futureList = new ArrayList<>();
futureList.add(doSomethingOne("1"));
futureList.add(doSomethingOne("2"));
futureList.add(doSomethingTwo("3"));
// 2.转换多个future为一个
CompletableFuture<Object> result = CompletableFuture
.anyOf(futureList.toArray(new CompletableFuture[futureList.size()]));
// 3.等待某一个future完成
System.out.println(result.get());
}
当 Stream 遇见 CompletableFuture(假设读者已经熟练使用 stream 基本操作
实战:消费端对服务提供方集群中的某个服务进行广播调用(轮询调用同一个服务的不同提供者的机器)
- 模拟调用过程
public static String rpcCall(String ip, String param) {
System.out.println(ip + " rpcCall:" + param);
Thread.sleep(1000);
return param;
}
- 通过 map 操作使用 CompletableFuture.supplyAsync 进行异步调用
把同步调用结果转换为了CompletableFuture对象,所以操作符map返回的是一个CompletableFuture,然后collect操作把所有的CompletableFuture对象收集为list后返回。
代码3从 futureList 获取流,然后使用 map 操作符把 future 对象转换为 future 的执行结果,这里是使用 future 的 join 方法来阻塞获取每个异步任务执行完毕,然后返回执行结果,最后使用 collect 操作把所有的结果收集到 resultList
注意:具体这10个rpc请求是否全部并发运行取决于CompletableFuture内线程池内线程的个数,如果你的机器是单核的或者线程池内线程个数为1,那么这10个任务还是会顺序执行的。
// 1.生成ip列表
List<String> ipList = new ArrayList<String>();
for (int i = 1; i <= 10; ++i) {
ipList.add("192.168.0." + i);
}
// 2.并发调用
List<CompletableFuture<String>> futureList = ipList.stream()
.map(ip -> CompletableFuture.supplyAsync(() -> rpcCall(ip, ip))).collect(Collectors.toList());
// 3.获取结果
List<String> resultList = futureList.stream().map(future -> future.join()).collect(Collectors.toList());
// 输出结果
resultList.stream().forEach(r -> System.out.println(r));
异常处理
上面的代码都是当异步任务内可以正常设置任务结果时的情况,但是情况并不总是这样的。如果因为异常而没有正常调用 complete 方法设置结果,将导致调用 get 方法时一直阻塞。
所以我们不仅需要考虑正常设置结果的情况,还需要考虑异常的情况,其实 CompletableFuture 提供了 completeExceptionally 方法来处理异常情况。
- 当出现异常时把异常信息设置到 future 内部,这样调用 get() 时就会在抛出异常后终止。
// 1.创建一个CompletableFuture对象
CompletableFuture<String> future = new CompletableFuture<String>();
// 2.开启线程计算任务结果,并设置
new Thread(() -> {
try {
// 2.1 抛出异常
if (true) {
throw new RuntimeException("excetion test");
}
// 2.2设置正常结果
future.complete("ok");
} catch (Exception e) {
// 2.3 设置异常结果
future.completeExceptionally(e);
}
System.out.println("----" + Thread.currentThread().getName() + " set future result----");
}, "thread-1").start();
// 3.等待计算结果
//System.out.println(future.get());
System.out.println(future.exceptionally(t -> "default").get());// 当出现异常时返回默认值
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!