Java 中 CompletableFuture 异步编排实战:从并行任务聚合到超时控制与异常处理
在 Java 后端开发里,很多接口慢,并不一定是“单个 SQL 太慢”,更多时候是一个请求要串好几个下游服务:查用户信息、查订单、查库存、查优惠、算风控……如果全都同步串行,响应时间很容易被拉长。
这类场景里,CompletableFuture 很适合做一件事:把多个互相独立的任务并行化,再把结果编排起来。但真正落地时,往往不是 allOf() 一把梭那么简单,问题会很快变成:
- 并行任务怎么聚合结果?
- 某个任务超时了怎么办?
- 一个任务失败,是整体失败还是局部降级?
- 线程池怎么配,才不至于“异步写了个寂寞”?
- 为什么异常看起来被“吞掉”了?
这篇文章我会用一个完整的示例,把这些问题串起来讲清楚。重点不是 API 罗列,而是如何把它用成生产可控的异步编排工具。
背景与问题
假设我们要实现一个“用户首页聚合接口”,需要同时拿到:
- 用户基本信息
- 最近订单
- 推荐商品
这三项彼此独立,很适合并行执行。目标大概是:
- 总耗时接近三者中的最大值,而不是总和
- 某些非核心模块超时后允许降级
- 核心模块失败时能明确返回错误
- 日志里能看见真正的异常来源
如果直接用同步写法,通常是这样:
UserProfile profile = userService.getProfile(userId);
List<Order> orders = orderService.getRecentOrders(userId);
List<Product> recommendations = recommendationService.recommend(userId);
return new HomePage(profile, orders, recommendations);
问题很明显:三个调用顺序执行,总耗时 = 3 段耗时之和。
而用 CompletableFuture 之后,理想流程应该是下面这样。
flowchart LR
A[请求进入] --> B[并行查询用户信息]
A --> C[并行查询最近订单]
A --> D[并行查询推荐商品]
B --> E[聚合结果]
C --> E
D --> E
E --> F[返回首页数据]
前置知识与环境准备
本文示例基于:
- JDK 8+
- 推荐你至少熟悉:
- Java Lambda
ExecutorService- 基本异常处理
如果你是 JDK 9+,可以直接使用
orTimeout()和completeOnTimeout()。
如果你还在 JDK 8,我后面也会给一个通用超时封装思路。
核心原理
CompletableFuture 本质上做了两件事:
- 表示一个“未来会完成”的结果
- 支持对结果进行链式编排
最常用的几组能力可以这样理解:
1. 创建异步任务
supplyAsync(Supplier):有返回值runAsync(Runnable):无返回值
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> "hello");
2. 串联后续处理
thenApply:拿到上一步结果,做转换thenAccept:消费结果,无返回值thenCompose:把“Future 里再套 Future”拍平thenCombine:合并两个独立任务结果
3. 聚合多个任务
allOf(f1, f2, f3):全部完成后继续anyOf(f1, f2, f3):谁先完成用谁
4. 处理异常
exceptionally:出错时兜底handle:不管成功失败都能处理whenComplete:更偏向记录日志、埋点,不改变结果
5. 指定线程池
这是实战里的重点之一。
如果你不传线程池,默认使用 ForkJoinPool.commonPool()。看起来省事,但在服务端代码里,我通常不建议直接依赖默认线程池,因为:
- 不容易做隔离
- 不方便观测
- 阻塞任务可能拖垮公共线程池
一个完整的编排模型
先看一下这类聚合接口的典型执行路径:
sequenceDiagram
participant Client as 调用方
participant API as 聚合接口
participant TP as 业务线程池
participant U as 用户服务
participant O as 订单服务
participant R as 推荐服务
Client->>API: 请求首页数据
API->>TP: 提交用户信息任务
API->>TP: 提交订单任务
API->>TP: 提交推荐任务
TP->>U: 查询用户信息
TP->>O: 查询最近订单
TP->>R: 查询推荐商品
U-->>TP: 返回结果
O-->>TP: 返回结果
R-->>TP: 超时/成功
TP-->>API: 聚合结果
API-->>Client: 返回响应
这里面最关键的是:不是所有任务都必须同等对待。
举个很实用的策略:
- 用户基本信息:核心数据,失败就整体失败
- 最近订单:重要但可兜底,失败后返回空列表
- 推荐商品:非核心,超时后直接降级为空
这就不是简单的“并行执行”了,而是带业务语义的异步编排。
实战代码(可运行)
下面这个例子可以直接运行。它模拟三个远程调用,并演示:
- 并行聚合
- 超时控制
- 异常兜底
- 自定义线程池
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class CompletableFutureDemo {
private static final ExecutorService BIZ_POOL = new ThreadPoolExecutor(
8,
16,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
r -> {
Thread t = new Thread(r);
t.setName("biz-cf-" + t.getId());
return t;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) {
CompletableFutureDemo demo = new CompletableFutureDemo();
HomePage page = demo.buildHomePage("u1001");
System.out.println(page);
BIZ_POOL.shutdown();
}
public HomePage buildHomePage(String userId) {
long start = System.currentTimeMillis();
CompletableFuture<UserProfile> profileFuture =
CompletableFuture.supplyAsync(() -> getProfile(userId), BIZ_POOL)
.orTimeout(800, TimeUnit.MILLISECONDS);
CompletableFuture<List<Order>> ordersFuture =
CompletableFuture.supplyAsync(() -> getRecentOrders(userId), BIZ_POOL)
.orTimeout(700, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("orders fallback: " + unwrap(ex).getMessage());
return Collections.emptyList();
});
CompletableFuture<List<Product>> recommendFuture =
CompletableFuture.supplyAsync(() -> getRecommendations(userId), BIZ_POOL)
.completeOnTimeout(Collections.emptyList(), 500, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("recommend fallback: " + unwrap(ex).getMessage());
return Collections.emptyList();
});
CompletableFuture<Void> all =
CompletableFuture.allOf(profileFuture, ordersFuture, recommendFuture);
try {
all.join();
HomePage result = new HomePage(
profileFuture.join(),
ordersFuture.join(),
recommendFuture.join()
);
log("buildHomePage success, cost=" + (System.currentTimeMillis() - start) + "ms");
return result;
} catch (CompletionException ex) {
Throwable root = unwrap(ex);
log("buildHomePage failed: " + root.getClass().getSimpleName() + ", " + root.getMessage());
throw new RuntimeException("首页聚合失败", root);
}
}
private UserProfile getProfile(String userId) {
sleep(300);
return new UserProfile(userId, "Alice", 28);
}
private List<Order> getRecentOrders(String userId) {
sleep(600);
if (Math.random() > 0.7) {
throw new RuntimeException("订单服务异常");
}
return Arrays.asList(
new Order("O100", 199.0),
new Order("O101", 299.0)
);
}
private List<Product> getRecommendations(String userId) {
sleep(900); // 故意比 500ms 超时更长
return Arrays.asList(
new Product("P100", "Mechanical Keyboard"),
new Product("P101", "Noise Cancelling Headphones")
);
}
private static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程被中断", e);
}
}
private static Throwable unwrap(Throwable ex) {
if (ex instanceof CompletionException || ex instanceof ExecutionException) {
if (ex.getCause() != null) {
return ex.getCause();
}
}
return ex;
}
private static void log(String msg) {
System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
}
static class HomePage {
private final UserProfile profile;
private final List<Order> orders;
private final List<Product> recommendations;
public HomePage(UserProfile profile, List<Order> orders, List<Product> recommendations) {
this.profile = profile;
this.orders = orders;
this.recommendations = recommendations;
}
@Override
public String toString() {
return "HomePage{" +
"profile=" + profile +
", orders=" + orders +
", recommendations=" + recommendations +
'}';
}
}
static class UserProfile {
private final String userId;
private final String name;
private final int age;
public UserProfile(String userId, String name, int age) {
this.userId = userId;
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "UserProfile{" +
"userId='" + userId + '\'' +
", name='" + name + '\'' +
", age=" + age +
'}';
}
}
static class Order {
private final String orderId;
private final double amount;
public Order(String orderId, double amount) {
this.orderId = orderId;
this.amount = amount;
}
@Override
public String toString() {
return "Order{" +
"orderId='" + orderId + '\'' +
", amount=" + amount +
'}';
}
}
static class Product {
private final String productId;
private final String title;
public Product(String productId, String title) {
this.productId = productId;
this.title = title;
}
@Override
public String toString() {
return "Product{" +
"productId='" + productId + '\'' +
", title='" + title + '\'' +
'}';
}
}
}
逐步拆解这段代码
1. 核心任务直接失败,非核心任务降级
CompletableFuture<UserProfile> profileFuture =
CompletableFuture.supplyAsync(() -> getProfile(userId), BIZ_POOL)
.orTimeout(800, TimeUnit.MILLISECONDS);
用户信息是核心数据,所以这里只做超时限制,不做兜底。超时就让它失败。
而订单、推荐则允许降级:
.exceptionally(ex -> Collections.emptyList())
这类写法很适合“可空、可缺省”的模块。
2. allOf() 只负责“等全部完成”,不帮你收集结果
这是一个很容易误解的点。很多人第一次用时以为:
CompletableFuture.allOf(f1, f2, f3)
会直接返回所有结果。实际上不是,它返回的是 CompletableFuture<Void>,意思只是:这些任务都结束了。
所以你后面还得自己取:
all.join();
HomePage result = new HomePage(
profileFuture.join(),
ordersFuture.join(),
recommendFuture.join()
);
如果你嫌这个写法啰嗦,可以自己封装聚合方法。
3. join() 和 get() 怎么选?
两者都能取结果,但区别是:
get()抛受检异常:InterruptedException、ExecutionExceptionjoin()抛非受检异常:CompletionException
在业务代码里,我一般更常用 join(),因为链式代码更顺手,不用到处 try-catch。但也要记住:异常会被包一层,所以你常常要 unwrap。
超时控制的几种方式
超时是异步编排里最容易被忽略,却又最关键的点。
因为“异步”不等于“不会卡住”,如果下游一直慢,你只是把阻塞换了个地方。
1. orTimeout()
超时后直接失败:
future.orTimeout(500, TimeUnit.MILLISECONDS);
适合核心流程,超时就快速报错。
2. completeOnTimeout()
超时后返回默认值:
future.completeOnTimeout(Collections.emptyList(), 500, TimeUnit.MILLISECONDS);
适合推荐列表、标签列表、埋点信息这类非核心数据。
3. JDK 8 的通用超时封装思路
如果你没有 orTimeout(),可以用一个定时任务去完成异常。
import java.util.concurrent.*;
public class TimeoutHelper {
private static final ScheduledExecutorService SCHEDULER =
Executors.newScheduledThreadPool(1);
public static <T> CompletableFuture<T> failAfter(long timeout, TimeUnit unit) {
CompletableFuture<T> promise = new CompletableFuture<>();
SCHEDULER.schedule(() ->
promise.completeExceptionally(new TimeoutException("超时")),
timeout, unit);
return promise;
}
public static <T> CompletableFuture<T> withTimeout(
CompletableFuture<T> future, long timeout, TimeUnit unit) {
return future.applyToEither(failAfter(timeout, unit), t -> t);
}
}
使用方式:
CompletableFuture<String> task =
CompletableFuture.supplyAsync(() -> "result");
CompletableFuture<String> withTimeout =
TimeoutHelper.withTimeout(task, 300, TimeUnit.MILLISECONDS);
异常处理:别只会用 exceptionally
很多团队在 CompletableFuture 里只会一个 exceptionally,然后慢慢把异常处理写成一团。更稳妥的做法,是区分场景。
exceptionally:失败后兜底
future.exceptionally(ex -> defaultValue);
特点:
- 只在异常时执行
- 会把异常转换成正常结果
适合降级返回默认值。
handle:成功失败都能改写结果
future.handle((result, ex) -> {
if (ex != null) {
return defaultValue;
}
return transform(result);
});
适合统一收口,尤其是你想把“成功”和“失败”都映射成一种输出结构时。
whenComplete:更适合记录,不适合兜底
future.whenComplete((result, ex) -> {
if (ex != null) {
log("error: " + ex.getMessage());
}
});
这个方法不会吞掉异常,更适合做:
- 日志
- 指标
- tracing
- 审计
如果你以为 whenComplete 能把异常变成正常结果,通常会踩坑。
多任务编排的几种常见写法
1. 并行后聚合:allOf
最常见,也最适合聚合接口。
2. 两个结果合并:thenCombine
比如先并行查“用户信息”和“会员等级”,再组合成一个对象:
CompletableFuture<UserProfile> f1 = CompletableFuture.supplyAsync(() -> getProfile("u1"));
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> getLevel("u1"));
CompletableFuture<String> combined =
f1.thenCombine(f2, (profile, level) -> profile + ", level=" + level);
3. 依赖上一步结果继续异步:thenCompose
这是避免 Future 嵌套的关键。
错误示例:
CompletableFuture<CompletableFuture<String>> nested =
CompletableFuture.supplyAsync(() -> "u1")
.thenApply(userId -> CompletableFuture.supplyAsync(() -> "detail-" + userId));
正确写法:
CompletableFuture<String> flat =
CompletableFuture.supplyAsync(() -> "u1")
.thenCompose(userId -> CompletableFuture.supplyAsync(() -> "detail-" + userId));
状态变化怎么理解
如果你总觉得 CompletableFuture 的执行过程有点抽象,可以把它看成下面几个状态:
stateDiagram-v2
[*] --> Pending
Pending --> Running: 提交任务
Running --> Success: 正常完成
Running --> Failed: 抛出异常
Running --> TimedOut: 超时完成异常
Failed --> Recovered: exceptionally/handle 降级
TimedOut --> Recovered: completeOnTimeout/兜底
Success --> [*]
Recovered --> [*]
Failed --> [*]
TimedOut --> [*]
理解这个状态图后,很多 API 的职责就更清楚了:
orTimeout:让 Running 进入 TimedOutexceptionally:把 Failed 转成 Recoveredhandle:Success / Failed 都能收口allOf:等所有分支都到终态
常见坑与排查
下面这些问题,我自己和团队里同学都踩过。
坑 1:默认线程池被阻塞,异步反而更慢
如果异步任务里包含:
- HTTP 调用
- JDBC 查询
- Redis 阻塞操作
- 大量
Thread.sleep
那它本质上是阻塞型任务。这时使用 ForkJoinPool.commonPool() 往往不合适。
排查方式
- 看线程名是不是
ForkJoinPool.commonPool-worker-* - 看任务是否存在长时间阻塞
- 看线程池是否出现排队、CPU 利用率不高但 RT 很高
建议
给业务异步任务使用独立线程池,并区分:
- IO 密集型线程池
- CPU 密集型线程池
坑 2:allOf() 之后只知道失败了,不知道谁失败了
allOf() 聚合失败时,异常信息常常不够直观。尤其多任务并行时,你可能只看到一个包装后的 CompletionException。
排查建议
给每个分支都加带上下文的日志:
future.whenComplete((r, ex) -> {
if (ex != null) {
log("profileFuture failed, userId=" + userId + ", ex=" + unwrap(ex).getMessage());
}
});
如果系统里有 traceId,也要一并打印。
坑 3:异常被“吃掉”了
最常见的原因是你在中间用了 exceptionally 返回了默认值,导致链路后面再也看不到失败。
这不一定是错,但你要明确:
一旦兜底,后面拿到的是“正常结果”而不是异常。
如果你既想降级,又想保留可观测性,建议这样写:
future.exceptionally(ex -> {
log("recommend degrade, ex=" + unwrap(ex).getMessage());
return Collections.emptyList();
});
坑 4:任务超时了,但底层调用没停
这个坑非常常见。
orTimeout() 或 completeOnTimeout() 控制的,是 Future 的完成状态,不一定能真正中断底层 IO 调用。
也就是说:
- 上层看起来已经超时返回了
- 但底层线程可能还在跑
这意味着什么?
如果下游调用本身没有超时配置,你的线程资源仍然可能被耗住。
建议
- HTTP 客户端设置连接超时、读超时
- 数据库连接设置查询超时
- 不要把 Future 超时当成唯一超时控制手段
坑 5:链式调用里混用同步/异步方法,线程切换失控
例如:
thenApply():通常在前一个任务完成的线程上继续执行thenApplyAsync():会切到线程池执行
如果你在复杂链路里乱用 Async 后缀,线程切换次数会明显增加,排查时也更乱。
我的经验建议
- 纯计算、轻转换:优先
thenApply - 明确要切线程池隔离时:再用
thenApplyAsync(..., executor)
安全/性能最佳实践
这一节是最偏“生产化”的部分。
1. 一定要用自定义线程池
建议最少做到:
- 核心线程数、最大线程数可配置
- 队列长度可控
- 拒绝策略明确
- 线程名可识别
示例:
ExecutorService pool = new ThreadPoolExecutor(
16, 32,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
r -> {
Thread t = new Thread(r);
t.setName("order-agg-" + t.getId());
return t;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
2. 为每个下游调用设置“双超时”
我通常会强调两个层面的超时:
- 客户端调用超时:真正约束 IO
- CompletableFuture 编排超时:约束聚合链路
两层都要有,缺一不可。
3. 区分核心链路和非核心链路
不要所有任务都“失败即整体失败”,也不要所有任务都“一律降级”。
更实用的思路是:
- 核心:用户身份、库存锁定、支付结果
- 可降级:推荐、标签、画像补充、埋点扩展字段
这其实是业务策略,不只是技术写法。
4. 避免在异步任务里放重 CPU 运算
如果任务本身是 CPU 密集型,比如:
- 大量 JSON 序列化/反序列化
- 大批量加密解密
- 复杂规则计算
那就不要和 IO 任务混在同一个线程池里。否则一边跑计算,一边等网络,很容易互相拖慢。
5. 保留上下文信息
异步之后最难受的事情之一,是日志断链。
建议至少保留:
- userId
- requestId / traceId
- 任务名称
- 开始时间 / 耗时
- 失败原因
如果你的项目用了 MDC,要注意线程切换后上下文可能丢失,需要做传递封装。
6. 不要滥用异步
这是我最后特别想提醒的一点。
不是所有逻辑都值得上 CompletableFuture。
如果只有一个下游调用,或者任务之间强依赖、总耗时本来就短,那为了“显得高级”硬上异步,通常只会:
- 增加复杂度
- 增加排障成本
- 让异常栈更难读
异步编排最适合:多个独立、耗时较高、可并行、可降级的任务。
逐步验证清单
如果你准备把本文的模式搬到自己项目里,可以按这个清单验证:
功能层
- 多个独立任务是否确实已经并行
-
allOf()后是否正确收集了每个结果 - 核心任务失败是否会整体失败
- 非核心任务失败是否能按预期降级
超时层
- 每个下游客户端本身是否设置了超时
- Future 层是否设置了编排超时
- 超时后是否仍有线程长期占用
观测层
- 是否能从日志看出哪个分支失败
- 是否打印了 traceId / requestId
- 是否上报了任务耗时和异常指标
资源层
- 是否使用了独立线程池
- 线程池队列是否可能打满
- 拒绝策略是否符合业务预期
总结
CompletableFuture 真正的价值,不只是“把同步代码改成异步”,而是把原本串行的、互相独立的调用,组织成一个可并行、可超时、可降级、可观测的执行流。
你可以重点记住这几条:
- 并行聚合:用
supplyAsync + allOf - 结果收集:
allOf()不返回结果,要自己join() - 超时控制:核心链路用
orTimeout(),非核心链路可用completeOnTimeout() - 异常处理:
exceptionally兜底,handle收口,whenComplete做日志 - 线程池隔离:服务端不要过度依赖默认公共线程池
- 生产关键点:Future 超时不等于底层 IO 真停,客户端超时也必须配
如果你现在正准备重构一个“聚合查询接口”,我的建议是先从一个简单场景开始:
- 先挑 2~3 个独立下游
- 先做并行化
- 再补超时和降级
- 最后补日志、指标和线程池治理
这样最稳,也最容易看见收益。
很多时候,CompletableFuture 的难点不在 API,而在于你是否把业务优先级、失败策略、资源边界想清楚了。只要这三件事明确,异步编排就会真正变成你的加速器,而不是新的故障源。