跳到主要内容

05-CompletableFuture 由浅入深

CompletableFuture 是 Java 8 引入用于支持异步编程和非阻塞操作的类。

  • Completable:可完成
  • Future:未来/将来

这两个单词体现了它设计的目的:提供一种可完成的异步计算。

classDiagram
`Future<T>` <|.. `CompletableFuture<T>` : 实现
`CompletionStage<T>` <|.. `CompletableFuture<T>` : 实现

Future 接口

CompletableFuture实现自JDK 5出现的Future接口,该接口属于java.util.concurrent包,这个包提供了用于并发编程的一些基础设施,其中就包括 Future 接口。Future接口的目的是表示异步计算的结果,它允许你提交一个任务给一个 Executor(执行器),并在稍后获取任务的结果。尽管 Future 提供了一种机制来检查任务是否完成、等待任务完成,并获取其结果,但它的设计也有一些局限性,比如无法取消任务、无法组合多个任务的结果等。

Future接口为CompletableFuture提供了以下功能:

  • 异步任务的提交:通过Future的接口,可以提交异步任务,并在稍后获取任务的结果,这是 Future 接口最基本的功能之一。

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
  • 检查任务完成状态: 使用 isDone 方法可以检查任务是否已经完成。

    boolean isDone = future.isDone();
  • 等待任 务完成: 通过get方法,阻塞当前线程,直到异步任务完成并获取其结果。

System.out.println("main Thread"); //开启异步线程 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello"); //阻塞异步线程执行完成 String result = future.get();
  • 取消任务: 通过 cancel 方法,你可以尝试取消异步任务的执行。这是 Future 接口的一项功能,但在实际使用中,由于限制和不确定性,这个方法并不总是能够成功取消任务。
boolean canceled = future.cancel(true);

CompletionStage接口

CompletableFuture同时也实现自CompletionStage接口,CompletionStage 接口是 Java 8 中引入的,在CompletableFuture中用于表示一个步骤,这个步骤可能是由另外一个CompletionStage触发的,随当前步骤的完成,可以触发其他CompletionStage的执行。CompletableFuture 类实现了 CompletionStage 接口,因此继承了这些功能。以下是 CompletionStageCompletableFuture 提供的一些关键功能:

1、链式操作:CompletionStage 定义了一系列方法,如 thenApply, thenAccept, thenRun,允许你在一个异步操作完成后,基于其结果进行进一步的操作。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");  CompletableFuture<Integer> lengthFuture = future.thenApply(String::length);

2、组合多个阶段CompletionStage 提供了 thenCombine, thenCompose, thenAcceptBoth 等方法,用于组合多个阶段的结果,形成新的 CompletionStage

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "word");
CompletableFuture<String> combineFuture = future1.thenCombine(future2, (s1, s2) -> s1 + s2);

3、异常处理CompletionStage 提供了一系列处理异常的方法,如 exceptionally, handle,用于在异步计算过程中处理异常情况。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Test Error!");
});
CompletableFuture<String> resultFuture = future.exceptionally(ex -> "Handle some exception " + ex.getMessage());

4、顺序执行thenApply, thenAccept, thenRun 等方法可以用于在上一个阶段完成后执行下一个阶段,形成顺序执行的链式操作。

image-20240715234255503

CompletableFuture-tryFire

tryFire 方法是 CompletableFuture 内部的一个关键方法,用于尝试触发异步操作链中的下一个阶段。这个方法的主要作用是在合适的时机执行异步操作链中的后续阶段,将计算结果传递给下一个阶段。

为什么先介绍这个方法呢?因为这个方法的大部分API都是基于该方法的基础上实现的。

/**
* Performs completion action if triggered, returning a
* dependent that may need propagation, if one exists.
*
* @param mode SYNC, ASYNC, or NESTED
*/
abstract CompletableFuture<?> tryFire(int mode);

1、触发方式( mode ):

tryFire 方法接收一个 mode 参数,表示触发的方式。常见的触发方式包括同步触发(SYNC)、异步触发(ASYNC)以及嵌套触发(NESTED)。

2、触发下一个阶段:

tryFire 方法中,通过 next 字段获取下一个阶段的引用,然后调用下一个阶段的 tryFire 方法,将当前阶段的计算结果传递给下一个阶段。

3、递归触发:

tryFire 方法可能会递归调用下一个阶段的 tryFire 方法,以确保整个异步操作链中的阶段能够依次触发。这个递归调用保证了异步操作链的串联执行。

4、触发逻辑的条件判断:

tryFire 方法中通常还包含一些条件判断,用于确定是否应该触发后续的操作。例如,可能会检查当前阶段的状态,如果满足触发条件,则继续触发。

总体而言,tryFire 方法是 CompletableFuture 异步操作链中触发后续阶段的核心方法。通过递归调用,它实现了异步操作链的顺序执行,确保了各个阶段按照期望的顺序执行,并将计算结果传递给下一个阶段。

CompletableFuture结构

字段和常量定义

字段定义

  • result:存储异步计算的结果
  • stack:存储观察者链
  • NEXT:异步调用链中观察者链的管理

常量定义

// Modes for Completion.tryFire. Signedness matters.
static final int SYNC = 0;
static final int ASYNC = 1;
static final int NESTED = -1;

这三个变量用于Completion类中tryFire方法的标志,表示不同的触发模式。

  • SYNC: 表示同步触发(默认触发方式),即当前计算完成后直接执行后续的操作。适用于当前计算的结果已经准备好并且可以直接进行下一步操作的情况。
  • AYSNC: 表示异步触发,当前计算完成后将后续的操作提交到异步线程池中执行。即当前计算完成后将后续的操作提交到异步线程池中执行。适用于需要在不同线程上执行后续操作的情况。
  • NESTED: 嵌套触发,通常表示当前阶段的触发是由另一个阶段触发的,因此无需再次触发后续操作。在某些情况下,可能会避免重复触发。

内部类定义

CompletableFuture 类包含多个内部类,这些内部类用于为CompletableFuture提供不同的API而设计的,用于异步编程中的不同阶段和操作。

常用内部类列举:

1、UniCompletionBiCompletion

UniCompletionBiCompletion 是用于表示异步操作链中的单一阶段和二元阶段的基础抽象类。它们提供了一些通用的方法和字段,用于处理阶段之间的关系,尤其是观察者链的构建和触发。

2、UniApplyUniAcceptUniRun

UniApplyUniAcceptUniRunUniCompletion 的具体子类,分别用于表示异步操作链中的 thenApplythenAcceptthenRun 阶段。它们实现了具体的 tryFire 方法,用于触发阶段的执行。

3、BiApplyBiAcceptBiRun

BiApplyBiAcceptBiRunBiCompletion 的具体子类,分别用于表示异步操作链中的 thenCombinethenAcceptBothrunAfterBoth 阶段。它们同样实现了具体的 tryFire 方法。

4、OrApplyOrAcceptOrRun

OrApplyOrAcceptOrRunBiCompletion 的另一组具体子类,用于表示异步操作链中的 applyToEitheracceptEitherrunAfterEither 阶段。同样,它们实现了具体的 tryFire 方法。

5、Async

AsyncCompletableFuture 内部用于表示异步操作的标志类,用于表示某个阶段需要异步执行。例如,在调用 supplyAsyncrunAsync 等方法时,会生成一个带有 Async 标志的阶段。

异步编程模型

状态转换

volatile Object result;       // Either the result or boxed AltResult volatile Completion stack;    // Top of Treiber stack of dependent actions

CompletableFuture中定义了两个属性:result、stack,result用于表示执行的结果或异常,stack用于表示执行完当前任务后触发的其他步骤。

CompletableFuture中包含两个字段:resultstack。result用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。

image-20240716000900219

上图为 CF 基本结构

这种方式类似“观察者模式”,依赖动作(Dependency Action)都封装在一个单独Completion子类中。下面是Completion类关系结构图。CompletableFuture中的每个方法都对应了图中的一个Completion的子类,Completion本身是观察者的基类。

UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。

BiCompletion继承了UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。

CompletableFuture 中,Completion 对象表示当前的异步操作,它是被观察者。stack 中存储的是后续的步骤对象,这些对象充当观察者的角色。当当前的异步操作执行完成后,会通知 stack 中的观察者获取执行结果。

这种设计允许异步操作的串联,每个步骤都对应一个 Completion 对象,形成了观察者链。当一个异步操作完成时,它会逐一触发 stack 中的观察者对象执行相应的回调函数,实现了链式的异步操作。这个机制是 CompletableFuture 强大异步编程模型的核心之一。

为印证以上结论,我们来看个例子,追踪下源码:

例子:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> thennedAcceptFuture = future.thenAccept(result ->
System.out.println("result: " + result));

以 JDK 11 为例:

image-20240716001320219

CompletableFuturethenAccept方法中直接调用了uniAcceptStage方法,该方法入参是线程池对象和JDK 8出现的函数式接口Consumer,即上文中的result -> {System.out.println("Result: " + result);}),这段代码的作用是获取到上一阶段的计算结果后,将计算结果传递给消费者操作f,在thenAccept方法中将f转换成一个新的CompletableFuture,将uniAccept推入观察者链中,来表示一个新的thenAccept阶段。

源码:

private CompletableFuture<Void> uniAcceptStage(Executor e,
Consumer<? super T> f) {
if (f == null) throw new NullPointerException();
Object r;
if ((r = result) != null)
return uniAcceptNow(r, e, f);
CompletableFuture<Void> d = newIncompleteFuture();
unipush(new UniAccept<T>(e, d, this, f));
return d;
}

以下代码是将给定的Completion对象推入观察者链:

源码:

/**
* Pushes the given completion unless it completes while trying.
* Caller should first check that result is null.
*/
final void unipush(Completion c) {
if (c != null) {
//尝试将 completion.对象c推入观察者链.如果返回fa1se,
//说明推入的过程中观赛者链发生了变化,可能有其他线程正在修改观聚者链
//这种情况下,通过潘环尝试
while (!tryPushStack(c)) {
// result 对象不为空,表示当前 CompletableFuture 对象已完成,计算结果已存在。
if (result != null) {
NEXT.set(c, null);
break;
}
}
if (result != null)
c.tryFire(SYNC);
}
}

/** Returns true if successfully pushed c onto stack. */
final boolean tryPushStack(Completion c) {
Completion h = stack;
NEXT.set(c, h); // CAS piggyback
return STACK.compareAndSet(this, h, c);
}

前提:判断观察者链是否被其他线程修改是通过被保持线程可见性的类、关键字修饰的。JDK 8使用的是volatile关键字实现简单的变量的原子性和线程可见性。在JDK 11中的CompletableFuture使用的是VarHandle类型定义。

源码:

// VarHandle mechanics
private static final VarHandle RESULT;
private static final VarHandle STACK;
private static final VarHandle NEXT;

CompletableFuture线程池

CompletableFuture 类在执行异步操作时,默认使用 ForkJoinPool.commonPool() 作为线程池。这是一个共享的线程池,通常是一个守护线程池,适用于执行异步任务。该线程池的特性包括自动管理线程数量、支持工作窃取(work-stealing)等。

如果你想要使用自定义的线程池,可以通过传递 Executor 对象作为参数来创建 CompletableFuture 实例。

源码:

public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(defaultExecutor(), action);
}
// 默认线程池
public Executor defaultExecutor() {
return ASYNC_POOL;
}
// 相关实现
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}

ASYNC_POOL中使用了默认的ForkJoinPool去开启一个线程池。

自定义线程池

CompletableFuture中提供了使用自定义线程池的方法,方法中需要传入一个线程池的接口对象,那么我们就可以传入任何一个实现自Executor接口的线程池。

源码:

public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}

并发控制

CompletableFuture 默认使用共享线程池: ForkJoinPool.commonPool() 作为线程池,通过工作窃取算法提高了任务的并行度,同时使用VarHandlevolatile来保证线程间的可见性和原子操作,以上保证了线程安全和高可用。

文章摘录: juejin.cn/post/7300099522202140709

文章作者: FirstMrRight

💡本文声明

转载请注明出处,谢谢合作!转载本文请声明原文章链接如下:

原文链接: https://zhoujun134.github.io/docs/java/06-2024-07-16-00-25-17-java-CompletableFuture

作者: Z 不殊

Z 不殊 致力于分享有价值的信息和知识。我们尊重并保护知识产权。本文仅代表作者观点,不代表任何立场。 如果本文有所侵权,请联系作者删除或修改!

Loading Comments...