Java 中基于 CompletableFuture 的异步编排实战:并行调用、超时控制与异常处理最佳实践
在 Java 后端开发里,CompletableFuture 基本算是“异步编排”的入门必修课。很多系统一开始是串行调用:查用户、查订单、查库存、查营销信息,一个接口顺着一个接口走,逻辑清晰,但响应时间会被最长链路拖垮。
而当业务逐渐复杂后,我们就会遇到几个非常现实的问题:
- 多个远程调用其实互不依赖,能不能并行?
- 某个下游接口很慢,能不能超时后快速返回默认值?
- 并行任务中一个失败了,是直接整体失败,还是部分降级?
- 线程池到底该怎么配,才能不把服务拖死?
这篇文章我会从“做一个聚合接口”这个常见场景出发,带你把 CompletableFuture 用顺:并行调用、超时控制、异常处理、线程池配置,以及线上最容易踩的坑。
背景与问题
假设我们要实现一个“商品详情聚合接口”,一个请求需要同时获取:
- 商品基础信息
- 价格信息
- 库存信息
- 推荐信息
如果串行执行,整体耗时大概是各个调用时间之和:
总耗时 = 商品信息 + 价格 + 库存 + 推荐
假设每个调用平均耗时如下:
- 商品信息:80ms
- 价格:120ms
- 库存:100ms
- 推荐:150ms
串行大约要 450ms,但如果并行,理论上可以接近最长的那个调用,也就是 150ms 左右,再加上一点线程调度和组装开销。
这就是异步编排最直接的价值:把等待时间重叠起来。
不过,真正落地时不能只会 allOf(),因为业务还有这些要求:
- 推荐失败了不影响主流程
- 库存超过 200ms 没结果就走默认值
- 商品基础信息失败必须整体失败
- 线程不能无限创建
- 日志里要看得出是哪个子任务超时或异常
前置知识与环境准备
本文示例基于:
- JDK 9+(因为会用到
orTimeout/completeOnTimeout) - 普通 Java 项目即可运行
- 不依赖 Spring,也方便你直接复制验证
如果你在 JDK 8 环境中,后面我也会说明兼容思路。
核心原理
先别急着写代码,先把几个核心 API 的职责理清楚。
1. supplyAsync:异步启动任务
用于有返回值的异步任务:
CompletableFuture.supplyAsync(() -> queryPrice(), executor);
如果是无返回值任务,用 runAsync。
2. thenApply / thenCompose / thenCombine
这是最容易混的地方。
thenApply:把上一个结果做一次同步转换thenCompose:把“异步套异步”拍平thenCombine:合并两个独立 Future 的结果
可以这样理解:
- 有依赖链:优先考虑
thenCompose - 无依赖并行后合并:优先考虑
thenCombine或allOf
3. allOf:等待多个任务完成
CompletableFuture.allOf(f1, f2, f3)
注意:allOf 返回的是 CompletableFuture<Void>,它只表示“都完成了”,不会自动帮你收集结果,结果还得自己从各个 Future 里取。
4. 超时控制:orTimeout 与 completeOnTimeout
orTimeout(timeout, unit):超时则抛异常completeOnTimeout(value, timeout, unit):超时则返回默认值
这两个的语义差别非常关键:
- 核心链路必须成功:用
orTimeout - 可降级链路允许兜底:用
completeOnTimeout
5. 异常处理:exceptionally、handle、whenComplete
它们看起来很像,但职责不同:
exceptionally:异常时给默认值handle:无论成功失败都能处理,并返回新结果whenComplete:更像“旁路通知”,适合打日志,不改变结果
我的经验是:
- 业务降级:优先用
exceptionally或handle - 日志埋点:优先用
whenComplete
一张图看懂异步编排流程
flowchart LR
A[请求进入] --> B[并行发起商品/价格/库存/推荐]
B --> C1[商品信息]
B --> C2[价格信息]
B --> C3[库存信息]
B --> C4[推荐信息]
C1 --> D[等待聚合]
C2 --> D
C3 --> D
C4 --> D
D --> E{关键任务是否成功}
E -- 是 --> F[组装详情DTO]
E -- 否 --> G[返回异常或降级结果]
实战代码(可运行)
下面做一个完整可运行示例。我们模拟四个下游服务,并实现一个商品详情聚合逻辑。
1. 完整示例代码
import java.util.concurrent.*;
import java.util.function.Supplier;
public class CompletableFutureOrchestrationDemo {
private static final ExecutorService IO_POOL = new ThreadPoolExecutor(
8,
16,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
r -> {
Thread t = new Thread(r);
t.setName("io-pool-" + t.getId());
return t;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) {
try {
ProductDetail detail = getProductDetail(1001L);
System.out.println("聚合结果:");
System.out.println(detail);
} finally {
IO_POOL.shutdown();
}
}
public static ProductDetail getProductDetail(Long productId) {
CompletableFuture<ProductInfo> productFuture = CompletableFuture
.supplyAsync(() -> query("商品信息", () -> ProductService.getProductInfo(productId)), IO_POOL)
.orTimeout(300, TimeUnit.MILLISECONDS)
.whenComplete((result, ex) -> log("商品信息完成", ex));
CompletableFuture<PriceInfo> priceFuture = CompletableFuture
.supplyAsync(() -> query("价格信息", () -> PriceService.getPrice(productId)), IO_POOL)
.orTimeout(300, TimeUnit.MILLISECONDS)
.whenComplete((result, ex) -> log("价格信息完成", ex));
CompletableFuture<StockInfo> stockFuture = CompletableFuture
.supplyAsync(() -> query("库存信息", () -> StockService.getStock(productId)), IO_POOL)
.completeOnTimeout(new StockInfo(productId, 0, true), 200, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("库存信息异常,使用默认库存", ex);
return new StockInfo(productId, 0, true);
});
CompletableFuture<RecommendInfo> recommendFuture = CompletableFuture
.supplyAsync(() -> query("推荐信息", () -> RecommendService.getRecommend(productId)), IO_POOL)
.completeOnTimeout(new RecommendInfo(productId, "默认推荐"), 180, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("推荐信息异常,使用默认推荐", ex);
return new RecommendInfo(productId, "默认推荐");
});
CompletableFuture<Void> all = CompletableFuture.allOf(
productFuture, priceFuture, stockFuture, recommendFuture
);
try {
all.join();
} catch (CompletionException ex) {
throw new RuntimeException("商品详情聚合失败: " + rootMessage(ex));
}
ProductInfo product = productFuture.join();
PriceInfo price = priceFuture.join();
StockInfo stock = stockFuture.join();
RecommendInfo recommend = recommendFuture.join();
return new ProductDetail(product, price, stock, recommend);
}
private static <T> T query(String name, Supplier<T> supplier) {
long start = System.currentTimeMillis();
try {
T result = supplier.get();
long cost = System.currentTimeMillis() - start;
System.out.println(name + " 调用成功,耗时 " + cost + "ms");
return result;
} catch (Exception e) {
long cost = System.currentTimeMillis() - start;
System.out.println(name + " 调用失败,耗时 " + cost + "ms, error=" + e.getMessage());
throw e;
}
}
private static void log(String message, Throwable ex) {
if (ex == null) {
System.out.println("[INFO] " + message);
} else {
System.out.println("[ERROR] " + message + " - " + rootMessage(ex));
}
}
private static String rootMessage(Throwable ex) {
Throwable current = ex;
while (current.getCause() != null) {
current = current.getCause();
}
return current.getClass().getSimpleName() + ": " + current.getMessage();
}
static class ProductService {
static ProductInfo getProductInfo(Long productId) {
sleep(80);
return new ProductInfo(productId, "机械键盘");
}
}
static class PriceService {
static PriceInfo getPrice(Long productId) {
sleep(120);
return new PriceInfo(productId, 399.00);
}
}
static class StockService {
static StockInfo getStock(Long productId) {
sleep(250);
return new StockInfo(productId, 20, false);
}
}
static class RecommendService {
static RecommendInfo getRecommend(Long productId) {
sleep(100);
if (productId % 2 == 1) {
throw new RuntimeException("推荐服务不可用");
}
return new RecommendInfo(productId, "猜你喜欢");
}
}
static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程中断");
}
}
static class ProductInfo {
Long productId;
String name;
ProductInfo(Long productId, String name) {
this.productId = productId;
this.name = name;
}
@Override
public String toString() {
return "ProductInfo{productId=" + productId + ", name='" + name + "'}";
}
}
static class PriceInfo {
Long productId;
double price;
PriceInfo(Long productId, double price) {
this.productId = productId;
this.price = price;
}
@Override
public String toString() {
return "PriceInfo{productId=" + productId + ", price=" + price + "}";
}
}
static class StockInfo {
Long productId;
int stock;
boolean fromFallback;
StockInfo(Long productId, int stock, boolean fromFallback) {
this.productId = productId;
this.stock = stock;
this.fromFallback = fromFallback;
}
@Override
public String toString() {
return "StockInfo{productId=" + productId + ", stock=" + stock + ", fromFallback=" + fromFallback + "}";
}
}
static class RecommendInfo {
Long productId;
String content;
RecommendInfo(Long productId, String content) {
this.productId = productId;
this.content = content;
}
@Override
public String toString() {
return "RecommendInfo{productId=" + productId + ", content='" + content + "'}";
}
}
static class ProductDetail {
ProductInfo productInfo;
PriceInfo priceInfo;
StockInfo stockInfo;
RecommendInfo recommendInfo;
ProductDetail(ProductInfo productInfo, PriceInfo priceInfo, StockInfo stockInfo, RecommendInfo recommendInfo) {
this.productInfo = productInfo;
this.priceInfo = priceInfo;
this.stockInfo = stockInfo;
this.recommendInfo = recommendInfo;
}
@Override
public String toString() {
return "ProductDetail{" +
"productInfo=" + productInfo +
", priceInfo=" + priceInfo +
", stockInfo=" + stockInfo +
", recommendInfo=" + recommendInfo +
'}';
}
}
}
代码拆解:这段编排为什么这么写
关键点 1:核心任务与可降级任务分开处理
在示例里:
- 商品信息、价格信息:主链路,必须拿到,所以用
orTimeout - 库存、推荐信息:可降级,所以用
completeOnTimeout
这其实是异步编排最重要的思路之一:不要把所有任务都当成同样重要。
如果你对所有下游都一视同仁,要么过于脆弱,要么过度降级。
关键点 2:异常兜底不要放到最后统一处理
很多人会写成这样:
CompletableFuture.allOf(f1, f2, f3).exceptionally(...)
这通常不够细,因为你已经分不清是哪个任务出了问题,也不方便对不同子任务做不同降级。
更合理的做法是:每个子任务就地定义自己的超时和异常策略。
关键点 3:最终组装阶段用 join()
为什么不用 get()?
get()会抛受检异常,代码会更啰嗦join()抛的是CompletionException,在聚合场景里更顺手
但要注意,join() 抛出的异常常常包了好几层,所以示例里专门写了 rootMessage() 来提取根因。
再看一张时序图:并行调用与超时降级
sequenceDiagram
participant Client as 调用方
participant App as 聚合服务
participant P as 商品服务
participant R as 价格服务
participant S as 库存服务
participant Rec as 推荐服务
Client->>App: 请求商品详情
App->>P: 异步请求商品信息
App->>R: 异步请求价格
App->>S: 异步请求库存
App->>Rec: 异步请求推荐
P-->>App: 80ms 返回成功
R-->>App: 120ms 返回成功
Rec-->>App: 异常/超时
Note over App,Rec: 返回默认推荐
S-->>App: 超过 200ms
Note over App,S: 使用默认库存
App-->>Client: 返回聚合结果
逐步验证清单
如果你想自己跑一遍,建议按下面顺序验证:
场景 1:正常成功
把 RecommendService 里的异常去掉,库存睡眠改为 100ms,观察:
- 所有任务成功
- 总耗时接近最慢任务,而不是总和
场景 2:推荐失败但整体成功
保留推荐异常,观察:
- 推荐任务异常
- 最终接口仍返回默认推荐
场景 3:库存超时降级
将库存调用保持 250ms,观察:
- 库存在
200ms超时后返回默认值 - 总体接口不会被库存拖慢
场景 4:主链路失败导致整体失败
把 ProductService.getProductInfo() 改成抛异常,观察:
all.join()会失败- 整体聚合抛出异常,而不是静默返回残缺数据
常见坑与排查
这部分我建议你认真看。很多线上问题都不是 API 不会用,而是“看起来能跑,实际会炸”。
坑 1:默认线程池用着用着就卡了
如果你直接写:
CompletableFuture.supplyAsync(() -> query());
没有传线程池,默认会走 ForkJoinPool.commonPool()。这在简单 CPU 任务里还行,但在 I/O 密集型场景(远程调用、数据库、缓存)里经常不够用。
问题表现:
- 接口 RT 飙高
- 任务排队
- 看起来像“异步了”,实际更慢
建议:I/O 任务自己配线程池,不要依赖 commonPool。
坑 2:线程池配太大,机器直接抖动
很多人知道不能用默认线程池,于是把线程池改成:
new ThreadPoolExecutor(200, 500, ...)
如果机器核数不高、下游响应又不稳定,这种配置很容易导致:
- 上下文切换剧增
- 内存上涨
- 队列积压
- 下游也被打爆
建议:
- I/O 池大小结合机器资源和下游容量评估
- 一定设置有界队列
- 一定配置拒绝策略
- 监控活跃线程数、队列长度、任务拒绝次数
坑 3:allOf() 完成了,不代表结果没异常
这是非常经典的误区。
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();
如果其中某个 Future 异常了,all.join() 也会异常。但如果你在子任务里已经 exceptionally 兜底了,那么 allOf() 可能会成功完成。
所以排查时要先分清:
- 是子任务已经被吞掉并返回默认值?
- 还是异常一路冒泡到聚合层?
坑 4:异常被“吃掉”,日志里看不到根因
比如:
future.exceptionally(ex -> defaultValue);
这样虽然业务继续执行了,但如果你没有记录异常,线上就只剩“怎么老走默认值”这种玄学问题。
建议:任何降级都要带日志、指标或埋点。
至少记住:
- 哪个任务降级了
- 原始异常是什么
- 是否超时触发
- 降级比例是多少
坑 5:链式调用里混用同步/异步方法,线程行为超出预期
比如:
future.thenApply(...)
.thenApply(...)
.thenApply(...);
这些默认可能沿用上一个任务的执行线程。如果中间处理逻辑变重,可能把 I/O 线程占住。
如果某一步计算明显更重,可以显式使用:
thenApplyAsync(...)
并指定合适线程池。
但也别滥用 Async,否则线程切换过多也会损耗性能。
安全/性能最佳实践
这一部分讲“线上可落地”的建议。
1. 按任务类型拆线程池
不要把所有异步任务都丢进同一个池子。最常见的做法是:
- I/O 调用池
- CPU 计算池
- 定时/调度池
这样某类任务堆积时,不至于把另一类也拖死。
2. 为每个下游设置独立超时
不要只在最外层接口设一个总超时。因为总超时只能告诉你“整体慢了”,不能阻止某个子调用无限拖延。
更好的方式是:
- HTTP 客户端本身设置连接/读超时
CompletableFuture层再补一层业务超时- 主链路和可降级链路区别对待
也就是说,超时要做双层控制。
3. 降级值必须“业务可接受”
默认值不是随便填个 null 就完了。
比如:
- 库存默认值可以是
0,但要标记fromFallback = true - 推荐默认值可以是“暂无推荐”
- 价格一般不能随便降级,可能涉及交易风险
一句话:降级不是技术问题,是业务决策。
4. 保留可观测性
我自己比较推荐在每个子任务上至少打这些信息:
- 任务名
- 耗时
- 是否超时
- 是否异常
- 是否降级
- traceId / requestId
没有这些信息,异步链路一旦出问题,排查体验会非常差。
5. 注意上下文传递
在真实项目中,你可能还会依赖这些上下文:
- TraceId
- 登录用户信息
- MDC 日志上下文
- ThreadLocal 变量
而异步任务切换线程后,这些上下文默认不会自动传递。
所以如果你在线程池任务里发现:
- 日志 traceId 丢了
- 用户上下文为空
- ThreadLocal 失效
不要怀疑人生,这就是线程切换带来的正常现象。需要额外做上下文包装或使用框架能力传递。
6. 对外部依赖增加限流与熔断
CompletableFuture 解决的是编排问题,不是治理全部问题。
如果下游本身不稳定,仅靠异步并不能拯救系统,反而可能把失败并发放大。在线上建议配合:
- 限流
- 熔断
- 隔离
- 重试(慎用,避免放大流量)
一张图总结异常处理策略
stateDiagram-v2
[*] --> 发起异步任务
发起异步任务 --> 成功完成
发起异步任务 --> 超时
发起异步任务 --> 异常失败
超时 --> 核心链路失败
超时 --> 可降级返回默认值
异常失败 --> 核心链路失败
异常失败 --> 可降级返回默认值
成功完成 --> 聚合结果
核心链路失败 --> 返回错误
可降级返回默认值 --> 聚合结果
JDK 8 怎么办?
如果你还在 JDK 8,没有 orTimeout 和 completeOnTimeout,常见做法有两种:
- 配合
ScheduledExecutorService手动构造超时 Future - 使用成熟框架做超时与熔断治理
手动实现并不复杂,但样板代码会明显增多。所以如果条件允许,JDK 9+ 在 CompletableFuture 这一块体验会好很多。
适用边界:什么时候不该强行用 CompletableFuture
虽然它很好用,但也不是所有场景都应该上。
更适合用它的场景
- 聚合多个独立下游接口
- 有明确并行机会
- 需要细粒度超时和降级
- 逻辑上仍是“一次请求内的编排”
不太适合的场景
- 特别复杂的异步工作流,步骤很多、状态很多
- 涉及大量消息驱动、跨服务长事务
- 流式处理和背压需求明显
这些情况下,可能要考虑:
- 消息队列
- Reactor / WebFlux
- 工作流引擎
- 更完整的任务调度系统
总结
如果你只记住三件事,我建议是这三条:
- 先区分任务重要性:核心任务失败就失败,可降级任务才兜底
- 每个子任务就地处理超时和异常:不要把所有问题堆到
allOf()之后 - 一定使用自定义线程池和可观测性:否则异步只会把问题藏起来
CompletableFuture 真正的价值,不只是“把代码改成异步”,而是让你能显式表达并行、依赖、超时、失败和降级策略。这点在聚合接口、推荐接口、首页装配接口里特别实用。
如果你正在改造一个串行接口,我建议按下面顺序落地:
- 第一步:识别可并行的子调用
- 第二步:为每个子调用定义超时和降级策略
- 第三步:接入独立线程池
- 第四步:补齐日志、耗时、异常、降级指标
- 第五步:压测验证线程池与下游承载能力
这样做下来,CompletableFuture 才不只是“会用”,而是真能在线上站住。