有了Runnable,为什么还要Callable?
我们先来看下Callable的接口:
public interface Callable {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
复制代码
第一点是不能返回值,对于 Runnable 而言,它不能返回一个返回值,虽然可以利用其他的一些办法,比如在 Runnable 方法中写入日志文件或者修改某个共享的对象的办法,来达到保存线程执行结果的目的,但这种解决问题的行为千曲百折,属于曲线救国,效率着实不高。
实际上,在很多情况下执行一个线程时,我们都希望能得到执行的任务的结果,也就是说,我们是需要得到返回值的,比如请求网络、查询数据库等。我们看接口中的V就代表返回值。
第二点是不能抛异常,我们看下Callable接口定义的时候throw了Exception,而 Runnable是没有的,Runnable只能这样写,在里面try catch掉:
Runnable runnable = new Runnable() {
/**
* run方法上无法声明 throws 异常,且run方法内无法 throw 出 checked Exception,除非使用try catch进行处理
*/
@Override
public void run() {
try {
throw new IOException();
} catch (IOException e) {
e.printStackTrace();
}
}
}
复制代码
最后对比一下Runnable和Callable
下面就来介绍一下Future,上面说到Callable是可以返回值的,那这个返回值怎么拿呢?就是通过 Future 类的 get 方法来获取 。
因此,Future 就相当于一个存储器,它存储了 Callable 的 call 方法的任务结果。除此之外,我们还可以通过 Future 的 isDone 方法来判断任务是否已经执行完毕了,还可以通过 cancel 方法取消这个任务,或限时获取任务的结果等,总之 Future 的功能比较丰富。
如何创建Future
一种是通过线程池,之前在讲线程池的时候也提到过, 《线程池源码精讲》
ExecutorService service = Executors.newFixedThreadPool(10);
Future future = service.submit(new CallableTask());
//阻塞获得结果
Integer rs = future.get();
复制代码
还有一种是通过FutureTask创建
FutureTask integerFutureTask = new FutureTask<>(new CallableTask());
//启动线程
new Thread(integerFutureTask).start();
//阻塞获得结果
Integer rs=integerFutureTask.get();
复制代码
有了宏观上的认识,我们来看下Future里面的方法:
public interface Future {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码
get() 方法
isDone() 方法
该方法是用来判断当前这个任务是否执行完毕了。需要注意的是,这个方法如果返回 true,则代表执行完成了,如果返回 false 则代表还没完成。
但这里如果返回 true,并不代表这个任务是成功执行的,比如说任务执行到一半抛出了异常。那么在这种情况下,对于这个 isDone 方法而言,它其实也是会返回 true 的,因为对它来说,虽然有异常发生了,但是这个任务在未来也不会再被执行,它确实已经执行完毕了。所以 isDone 方法在返回 true 的时候,不代表这个任务是成功执行的,只代表它执行完毕了。
cancel()方法
isCancelled() 方法
判断是否被取消,它和 cancel 方法配合使用,比较简单。
下面看下FutureTask的类图:
我们看了上面的代码其实也能猜到,既然 futureTask 能丢到 Thread 类里面去执行,那它肯定继承了Runnable接口,实现了run方法;既然能够调用get()方法,肯定是继承了Future接口,与上面的类图吻合。
我们看下源码,看下run()方法,很简单,里面执行的逻辑就是Callable里面的call方法,最终将计算出来的结果保存到outcome里面去,然后唤醒阻塞的线程。
看下get()方法,很简单,如果任务结束完成了,直接把outcome里的值返回,否则加入到阻塞队列,类似于AQS。《ReentrantLock介绍及AQS源码精讲》
最后看下流程图:
上面介绍了Future/Callable的使用和原理,下面介绍下CompletableFuture。
CompletableFuture对Future做了改进,主要是在get()方法上,主线程如果需要依赖该任务执行结果继续后续操作时,不再需要等待,而是可以直接传入一个回调对象,当异步任务执行完成后,自动调用该回调对象,相当于实现了异步回调通知功能。
除此之外,CompletableFuture还提供了非常强大的功能,比如对于回调对象的执行,可以放到非任务线程中执行,也能用任务线程执行;提供了函数式编程能力,简化了异步编程的复杂性;提供了多个CompletableFuture的组合与转化功能。
看下类图,实现了CompletionStage和Future接口。
Future就是上面讲的Future,里面有5个方法。CompletionStage表示任务执行的一个阶段,每个异步任务都会返回一个新的CompletionStage对象,我们可以针对多个CompletionStage对象进行串行、并行或者聚合的方式来进行后续下一阶段的操作,简单来说,就是实现异步任务执行后的自动回调功能。
CompletableFuture提供了四个静态方法来构建一个异步事件,方法如下。
下面看下CompletableFuture的简单用法:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf1 = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":异步执行一个任务");
});
//通过阻塞获取执行结果
System.out.println(cf1.get());
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> "Hello World").thenAccept(result -> {
System.out.println(result);
});
//继续做其他事情
//...
}
复制代码
cf1就是执行一个任务,用的是默认ForkJoinPool的线程池,不带返回值,cf1.get()是阻塞获取值,因为不带返回值,所以获取的是null。
cf2是执行一个带返回值的任务,里面就干一件事return hello world,此时主线程可以继续往下执行做其他事情,待任务执行完以后,thenAccept方法接收到返回的hello world,然后打印出来。
我们可以看下CompletableFuture类里面有38个方法,十分的多,下面和大家分类介绍一下。
CompletableFuture类实现了Future接口,所以它开始可以像Future那样主动通过阻塞或者轮询的方式来获得执行结果。
在CompletableFuture类中还有一个比较有意思的方法 complete(T value) ,它表示完成完成计算, 也就是把 value 设置为CompletableFuture的返回值并且唤醒在上述方法阻塞的线程。
我们看下下面的例子,就是创建两个线程t1和t2,线程里面通过completableFuture.get()方法阻塞,当我们调用cf.complete("Finish")方法的时候,相当于往里面赋值了,get()方法取到值了,才能继续往下走。
public class CompleteExample {
static class ClientThread implements Runnable {
private CompletableFuture completableFuture;
public ClientThread(CompletableFuture completableFuture) {
this.completableFuture = completableFuture;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ":" +
completableFuture.get()); //阻塞
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
CompletableFuture cf = new CompletableFuture();
new Thread(new ClientThread(cf), "t1").start();
new Thread(new ClientThread(cf), "t2").start();
//执行某段逻辑
cf.complete("Finish");
//exception
//cf.completeExceptionally(e);
}
}
复制代码
纯消费类型的方法,指依赖上一个异步任务的结果作为当前函数的参数进行下一步计算,它的特点是不返回新的计算值,这类的方法都包含 Accept 这个关键字。在CompletionStage中包含9个Accept关键字的方法,这9个方法又可以分为三类:依赖单个CompletionStage任务完成,依赖两个CompletionStage任务都完成,依赖两个CompletionStage中的任何一个完成。
//当前线程同步执行
public CompletionStage thenAccept(Consumer<? super T> action);
//使用ForkJoinPool.commonPool线程池执行action
public CompletionStage thenAcceptAsync(Consumer<? super T> action);
//使用自定义线程池执行action
public CompletionStage thenAcceptAsync(Consumer<? super T>
action,Executor executor);
public CompletionStage thenAcceptBoth(CompletionStage<? extends U>
other,BiConsumer<? super T, ? super U> action);
public CompletionStage thenAcceptBothAsync(CompletionStage<?
extends U> other,BiConsumer<? super T, ? super U> action);
public CompletionStage thenAcceptBothAsync(CompletionStage<?
extends U> other,BiConsumer<? super T, ? super U> action,Executor executor);
public CompletionStage acceptEither(CompletionStage<? extends T>
other,Consumer<? super T> action);
public CompletionStage acceptEitherAsync(CompletionStage<? extends T>
other,Consumer<? super T> action);
public CompletionStage acceptEitherAsync(CompletionStage<? extends T>
other,Consumer<? super T> action,Executor executor);
复制代码
thenAccept上面演示过了,下面演示下thenAcceptBoth() 方法,当task1和task2都返回值以后,然后再一起打印出来。
public class AcceptExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture task1 = CompletableFuture.supplyAsync(() -> "AcceptBot");
CompletableFuture task2 = CompletableFuture.supplyAsync(() -> "Message");
task1.thenAcceptBoth(task2, (r1, r2) -> {
System.out.println("result: " + r1 + r2);
});
}
}
复制代码
有返回值类型的方法,就是用上一个异步任务的执行结果进行下一步计算,并且会产生一个新的有返回值的CompletionStage对象。
在CompletionStage中,定义了9个带有返回结果的方法,同样也可以分为三个类型:依赖单个CompletionStage任务完成,依赖两个CompletionStage任务都完成,依赖两个CompletionStage中的任何一个完成。
public CompletionStage thenApply(Function<? super T,? extends U> fn);
public CompletionStage thenApplyAsync(Function<? super T,? extends U>
fn);
public CompletionStage thenApplyAsync(Function<? super T,? extends U>
fn,Executor executor);
public CompletionStage thenCombine(CompletionStage<? extends U>
other,BiFunction<? super T,? super U,? extends V> fn);
public CompletionStage thenCombineAsync(CompletionStage<? extends
U> other,BiFunction<? super T,? super U,? extends V> fn);
public CompletionStage thenCombineAsync(CompletionStage<? extends
U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
public CompletionStage applyToEither(CompletionStage<? extends T>
other,Function<? super T, U> fn);
public CompletionStage applyToEitherAsync(CompletionStage<? extends
T> other,Function<? super T, U> fn);
public CompletionStage applyToEitherAsync(CompletionStage<? extends
T> other,Function<? super T, U> fn,Executor executor);
复制代码
thenApply() 方法
这新建一个任务return hello,thenApply在拿到值以后再和world拼接,然后再返回值,然后通过get获取到值。
public class ApplyExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "hello ").thenApply(r -> {
return r + "world";
});
System.out.println(cf.get());
}
}
复制代码
thenCombineAsync() 方法
thenCombineAsync的作用就是将task1和task2的值都拿到以后返回值。
public class CombineDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "Combine")
.thenCombineAsync(CompletableFuture.supplyAsync(() -> "Message"), (r1, r2) -> r1 + r2);
System.out.println(cf.get());
}
}
复制代码
也是9个方法
public CompletionStage thenRun(Runnable action);
public CompletionStage thenRunAsync(Runnable action);
public CompletionStage thenRunAsync(Runnable action,Executor
executor);
public CompletionStage runAfterBoth(CompletionStage<?> other,Runnable
action);
public CompletionStage runAfterBothAsync(CompletionStage<?>
other,Runnable action);
public CompletionStage runAfterBothAsync(CompletionStage<?>
other,Runnable action,Executor executor);
public CompletionStage runAfterEither(CompletionStage<?>
other,Runnable action);
public CompletionStage runAfterEitherAsync(CompletionStage<?>
other,Runnable action);
public CompletionStage runAfterEitherAsync(CompletionStage<?>
other,Runnable action,Executor executor);
复制代码
这里新建两个任务,一个是return Both,一个是return Message,在都执行结束以后,因为run是不消费也不返回的,所以入参为0,不需要你们的参数,也不返回,所以没有return。
public class RunExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cxf = CompletableFuture.supplyAsync(() -> "Both")
.runAfterBoth(CompletableFuture.supplyAsync(() -> "Message"), () -> {
System.out.println("Done");
});
System.out.println(cxf.get());
}
}
复制代码
public CompletionStage thenCompose(Function<? super T, ? extends
CompletionStage> fn);
public CompletionStage thenComposeAsync(Function<? super T, ? extends
CompletionStage> fn);
public CompletionStage thenComposeAsync(Function<? super T, ? extends
CompletionStage> fn,Executor executor);
复制代码
异常处理一共三个方法
这里写在一个例子里面,具体用哪种类型,小伙伴按照具体场景具体选取。
最后CompletableFuture里面的方法十分的多,本文介绍了几个,抛砖引玉,更多的是小伙伴在实际开发过程中慢慢的用,熟能生巧,有些方法缺少应用场景也很难举出例子来,以及这些方法里面传的参数都是函数式接口,java8新特性lambda表达式,这个也是需要学会的,否则会看不懂。感谢收看~
作者:小杰博士
链接:https://juejin.cn/post/7035616558604877855
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
页面更新:2024-03-05
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号