Future、FutureTask、CompletableFuture

Future

Feature接口

public interface Future<V> {

    /**
	 * 取消任务的执行
	 * 如果任务已经完成,或者已经被取消,或者因为其他原因不能被取消,则返回失败
	 * 如果任务在调用时还未启动,那么返回成功
	 * 如果任务已经在执行过程中,则根据参数确定此执行任务的线程能否被中断,来试图停止任务的执行
	 * @param mayInterruptIfRunning
	 * @return
	 */
    boolean cancel(boolean mayInterruptIfRunning);

	/**
     * 判断任务是否已经取消,任务正常完成前将其取消,则返回true
     * @return
     */
    boolean isCancelled();

     /**
     * 判断任务是否已经完成,需要注意的是如果任务正常、异常或取消,都返回true
     * @return
     */
    boolean isDone();
 	/**
     * 等待任务执行结束,并返回结果
     * @return
     * @throws InterruptedException  线程被中断异常
     * @throws ExecutionException 任务执行异常
     */
    V get() throws InterruptedException, ExecutionException;

   /**
     * 等待任务执行结束,并返回结果,同上面get方法的区别是设置了超时时间,
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     * @throws TimeoutException
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask介绍

  • Future只是一个接口,无法直接创建对象,因此有了FutureTask。RunnableFuture继承了Runnable和Future接口,而FutureTask实现了RunnableFuture接口。
  • Future是一个接口, FutureTask类是Future 的一个实现类,并实现了Runnable,因此FutureTask可以传递到线程对象Thread中新建一个线程执行。
  • FutureTask实现了RunnableFuture接口,而RunnableFuture继承了Runnable和Future,也就是说FutureTask既是Runnable,也是Future。

    public class FutureTask<V> implements RunnableFuture<V> {
    	......
    }
    
    public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
    }
    

CompletableFuture(组合式异步编程)

  • future存在局限性:Future很难直接表述多个Future 结果之间的依赖性,开发中,我们经常需要达成以下目的:
  • 将两个异步计算合并为一个(这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果)
  • 等待 Future 集合中的所有任务都完成。
  • 仅等待 Future 集合中最快结束的任务完成,并返回它的结果。
  • future的缺点也很明显:
  • 实现了异步获取执行结果的需求,但是没有提供通知,无法知道任务什么时候完成;
  • get方法获取结果的时候,会进入等待阻塞状态,这时候又变成同步操作,如果使用isDone循环判断任务是否完成,会耗费cpu资源。

  • complatableFuture弥补了Future模式的缺点,在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接将异步处理的结果交给另外一个异步事件处理线程来处理。

  • CompletableFuture是一个静态辅助方法,以下四个静态方法用来为一段异步执行的代码创建CompletableFuture对象:

    	//使用ForkJoinPool.commonPool()作为它的线程池执行异步代码,异步操作有返回值
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
     
    //使用指定的thread pool执行异步代码,异步操作有返回值
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }
     
    //使用ForkJoinPool.commonPool()作为他的线程池执行异步代码
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }
     
    //使用指定的thread pool执行异步代码
    public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }
    
    	//demo
    	//使用ForkJoinPool.commonPool()来执行
    	CompletableFuture<FileTransferResponseDto> responseDtoFuture = CompletableFuture.supplyAsync(()-> uploadFile(file,urn));
    	//使用自定义线程池来执行
    	CompletableFuture<FileTransferResponseDto> responseDtoFuture = CompletableFuture.supplyAsync(()-> uploadFile(file,urn),threadPoolExecutor);
    

run方法不支持返回值。 supply可以支持返回值。

  • CompletableFuture.complete方法: 在执行future.get()方法时,如果执行的结果还没有出来,会一直处于阻塞的状态,这时候可以调用complete方法会立即执行,但是complete方法只会执行一次,对于重复调用只会获取到第一次的结果,同时如果future已经能够返回结果,那么调用complete方法也会无效

    public boolean complete(T value) {
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    }
    

CompletableFuture处理方法

  • 串行 then 直译【然后】,也就是表示下一步,所以通常是一种串行关系体现, then 后面的单词(比如 run /apply/accept)的方法的参数为对应的函数式接口,它的作用和那几个函数式接口的作用一样

    CompletableFuture<Void> thenRun(Runnable action)
    CompletableFuture<Void> thenRunAsync(Runnable action)
    CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
          
    <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
    <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
    <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
          
    CompletableFuture<Void> thenAccept(Consumer<? super T> action) 
    CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
    CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
          
    <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)  
    <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
    <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
    
  • 并行 并行可理解成组合的聚合处理。

  • 组合-and聚合 combine… with… 和 both…and… 都是要求两者都满足,也就是and关系

    <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
    <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
    <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
         
    <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
    <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
    <U> CompletableFuture<Void> thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
          
    CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
    CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
    CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
    
  • 组合-or聚合 Either…or… 表示两者中的一个,也就是 Or 的体现

    <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
    <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
    <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)
         
    CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
    CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
    CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
         
    CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
    CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)
    CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)
    
  • 异常处理

  • whenComplete 和 handle 的区别在于接受的参数函数式接口不同,前者使用Comsumer, 自然也就不会2有返回值;后者使用 Function,自然也就会有返回值

  • 另外exceptionally->try/catch , whenComplete/handle->try/finally

    CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
    CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn)
    CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor)
    
    CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
    CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
    CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
    
    <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
    <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
    <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
    
  • DEMO

  • runAsync,入参为runnable,无结果

  • supplyAsync,异步处理,返回结果

  • thenAccept,串行后续处理上一异步结果,不返回结果

  • thenRun,串行不处理上一异步结果,不返回结果

  • thenApplyAsync为thenRun的异步变体

  • thenCompose,类似于lambda的flatMap,将结果“拍平”,将原本返回CompletableFuture>的结果返回为CompletableFuture

  • thenCombine,and组合俩个独立的结果,有返回CompletableFuture对象

  • allOf和anyOf

  • exceptionally,就相当于 catch,出现异常,将会跳过 thenApply 的后续操作,直接捕获异常,进行一场处理

  • handle,用多线程,良好的习惯是使用 try/finally 范式,handle 就可以起到 finally 的作用

参考:https://blog.csdn.net/Mr_Flouxetin/article/details/107485191