Java 中基于 CompletableFuture 的异步编排实战:从并行调用到超时兜底与异常聚合
在业务系统里,“查多个服务然后拼结果”几乎是家常便饭。比如商品详情页,要同时查商品基础信息、库存、价格、营销标签、用户画像;又比如风控链路,要并行打多个评分服务,再按规则合成结论。
如果还在用串行调用,延迟会被一步步叠加;如果直接上线程池 + Future,代码又很快变得难读、难排错、难兜底。这个时候,CompletableFuture 就很适合登场了。
这篇文章我会带你从一个真实感较强的“聚合查询”场景出发,逐步完成:
- 并行调用多个下游服务
- 为慢接口设置超时兜底
- 收集异常而不是“静默失败”
- 控制线程池,避免把公共线程池打爆
- 写出一段可运行、可扩展、便于排障的异步编排代码
如果你已经会基础的 CompletableFuture API,这篇内容会帮你把它真正落到工程实践里。
背景与问题
先看一个典型需求:构建“订单确认页”聚合接口,需要同时调用:
- 用户服务:获取用户信息
- 库存服务:获取库存状态
- 优惠券服务:获取可用优惠券
- 推荐服务:获取推荐商品
这些请求互相独立,天然适合并发。
串行调用的问题
串行代码大概会像这样:
User user = userService.getUser(userId);
Stock stock = stockService.getStock(skuId);
Coupon coupon = couponService.getBestCoupon(userId);
Recommendation rec = recommendationService.getRecommendation(userId);
假设每个服务平均耗时分别是 80ms、120ms、150ms、100ms,那么总耗时接近:
80 + 120 + 150 + 100 = 450ms
但如果并行执行,理论上接近最长的那个调用时间,再加一些调度开销,大概是:
max(80, 120, 150, 100) ≈ 150ms
差距很明显。
但并发不是“开几个线程”这么简单
实际工程里,问题往往不是“怎么并行”,而是:
- 某个下游超时了,整个接口要不要一起等死?
- 有的结果可以降级,有的结果必须成功,怎么表达?
- 发生多个异常时,日志里只看到第一个,其他错误去哪了?
- 默认线程池能不能直接用?
join()、get()、exceptionally()、handle()到底怎么配?
这些问题,才是 CompletableFuture 真正的实战重点。
前置知识与环境准备
本文示例基于:
- JDK 11+
- Maven/Gradle 均可
- 只使用 JDK 标准库,不依赖第三方异步框架
你需要对以下概念有基本了解:
- Java 线程池
ExecutorService - Lambda 表达式
- 基本异常处理
文中会用到
orTimeout和completeOnTimeout,它们在 JDK 9+ 可用。
核心原理
先把 CompletableFuture 的几个关键角色理清楚,不然后面很容易“API 都认识,组合起来就晕”。
1. 它既是 Future,也是编排器
传统 Future 更像“一个未来会有结果的票据”,你可以等它完成,但不好继续往后编排。
而 CompletableFuture 不只表示结果,还能描述“结果出来以后接着做什么”。
例如:
CompletableFuture.supplyAsync(() -> "hello")
.thenApply(String::toUpperCase)
.thenAccept(System.out::println);
这不是简单拿结果,而是在定义一条异步处理链。
2. 组合方式分三类
串行依赖:thenApply / thenCompose
thenApply:上一步结果做同步转换thenCompose:上一步结果再触发一个新的异步任务
CompletableFuture<User> userFuture = ...
CompletableFuture<Order> orderFuture =
userFuture.thenCompose(user -> fetchOrderAsync(user.getId()));
并行汇聚:thenCombine / allOf / anyOf
thenCombine:两个任务都完成后合并结果allOf:等一组任务全完成anyOf:任意一个完成就继续
结果兜底与异常处理:exceptionally / handle / whenComplete
exceptionally:只在异常时给默认值handle:无论成功失败都能处理并返回新值whenComplete:无论成功失败都能观察,但不改结果
这是很多人第一次写时最容易混淆的地方。
一张图先看清异步编排
下面这个流程图描述的是本文要实现的聚合接口。
flowchart TD
A[接收聚合请求] --> B[并行发起用户/库存/优惠券/推荐查询]
B --> C1[用户服务]
B --> C2[库存服务]
B --> C3[优惠券服务]
B --> C4[推荐服务]
C1 --> D[超时/异常处理]
C2 --> D
C3 --> D
C4 --> D
D --> E[等待全部完成]
E --> F[聚合成功结果]
E --> G[聚合异常列表]
F --> H[返回响应]
G --> H
核心原理再进一步:成功、失败、超时的生命周期
这个状态图有助于理解:一个异步任务不是只有“成功”一种结局。
stateDiagram-v2
[*] --> Created
Created --> Running: supplyAsync/runAsync
Running --> Success: 正常返回
Running --> Failed: 抛出异常
Running --> TimeoutFallback: completeOnTimeout
Running --> TimeoutFailed: orTimeout
Success --> [*]
Failed --> [*]
TimeoutFallback --> [*]
TimeoutFailed --> [*]
实战代码(可运行)
下面我们实现一个完整示例:订单确认页聚合查询。
目标
- 并行发起 4 个请求
- 库存和用户信息是关键数据:异常需要记录
- 优惠券和推荐是可降级数据:超时后给默认值
- 最终返回聚合结果 + 异常列表
完整代码
你可以直接保存为 CompletableFutureOrchestrationDemo.java 运行。
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Supplier;
public class CompletableFutureOrchestrationDemo {
private static final ExecutorService BIZ_POOL = new ThreadPoolExecutor(
8,
16,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
r -> {
Thread t = new Thread(r);
t.setName("biz-cf-" + t.getId());
return t;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) {
AggregatedResponse response = buildOrderPage("user-1001", "sku-2002");
log("最终聚合结果:\n" + response);
BIZ_POOL.shutdown();
}
public static AggregatedResponse buildOrderPage(String userId, String skuId) {
List<String> errors = new CopyOnWriteArrayList<>();
CompletableFuture<UserInfo> userFuture = supplyAsyncWithTrace(
"userService",
() -> mockUserService(userId),
BIZ_POOL
).orTimeout(300, TimeUnit.MILLISECONDS)
.whenComplete((r, ex) -> recordError("userService", ex, errors));
CompletableFuture<StockInfo> stockFuture = supplyAsyncWithTrace(
"stockService",
() -> mockStockService(skuId),
BIZ_POOL
).orTimeout(250, TimeUnit.MILLISECONDS)
.whenComplete((r, ex) -> recordError("stockService", ex, errors));
CompletableFuture<CouponInfo> couponFuture = supplyAsyncWithTrace(
"couponService",
() -> mockCouponService(userId),
BIZ_POOL
).completeOnTimeout(CouponInfo.defaultCoupon(), 200, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
errors.add("couponService failed: " + rootMessage(ex));
return CouponInfo.defaultCoupon();
});
CompletableFuture<RecommendInfo> recommendFuture = supplyAsyncWithTrace(
"recommendService",
() -> mockRecommendService(userId),
BIZ_POOL
).completeOnTimeout(RecommendInfo.defaultRecommend(), 180, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
errors.add("recommendService failed: " + rootMessage(ex));
return RecommendInfo.defaultRecommend();
});
CompletableFuture<Void> all = CompletableFuture.allOf(
userFuture, stockFuture, couponFuture, recommendFuture
);
try {
all.join();
} catch (CompletionException ex) {
// 这里只说明至少有一个任务失败,详细异常前面已经分别记录
log("allOf 检测到异常:" + rootMessage(ex));
}
UserInfo user = getSafely(userFuture, null);
StockInfo stock = getSafely(stockFuture, null);
CouponInfo coupon = getSafely(couponFuture, CouponInfo.defaultCoupon());
RecommendInfo recommend = getSafely(recommendFuture, RecommendInfo.defaultRecommend());
boolean partialSuccess = user != null && stock != null;
return new AggregatedResponse(
partialSuccess,
user,
stock,
coupon,
recommend,
new ArrayList<>(errors)
);
}
private static <T> CompletableFuture<T> supplyAsyncWithTrace(
String taskName,
Supplier<T> supplier,
Executor executor
) {
return CompletableFuture.supplyAsync(() -> {
log("开始调用 " + taskName);
long start = System.currentTimeMillis();
try {
T result = supplier.get();
log(taskName + " 调用成功,耗时 " + (System.currentTimeMillis() - start) + "ms");
return result;
} catch (Exception e) {
log(taskName + " 调用失败,耗时 " + (System.currentTimeMillis() - start) + "ms, ex=" + e.getMessage());
throw e;
}
}, executor);
}
private static void recordError(String service, Throwable ex, List<String> errors) {
if (ex != null) {
errors.add(service + " failed: " + rootMessage(ex));
}
}
private static <T> T getSafely(CompletableFuture<T> future, T defaultValue) {
try {
return future.join();
} catch (CompletionException ex) {
return defaultValue;
}
}
private static String rootMessage(Throwable ex) {
Throwable current = ex;
while (current.getCause() != null) {
current = current.getCause();
}
return current.getClass().getSimpleName() + ": " + current.getMessage();
}
private static void log(String msg) {
System.out.println(LocalTime.now() + " [" + Thread.currentThread().getName() + "] " + msg);
}
// ===== mock services =====
private static UserInfo mockUserService(String userId) {
sleep(80);
return new UserInfo(userId, "Alice", "VIP");
}
private static StockInfo mockStockService(String skuId) {
sleep(120);
return new StockInfo(skuId, true, 18);
}
private static CouponInfo mockCouponService(String userId) {
sleep(260); // 故意超时,触发 completeOnTimeout
return new CouponInfo("COUPON-50", 50);
}
private static RecommendInfo mockRecommendService(String userId) {
sleep(100);
if (Math.random() > 0.5) {
throw new RuntimeException("recommend engine busy");
}
return new RecommendInfo(List.of("sku-9001", "sku-9002"));
}
private static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("thread interrupted");
}
}
// ===== DTOs =====
static class UserInfo {
private final String userId;
private final String name;
private final String level;
public UserInfo(String userId, String name, String level) {
this.userId = userId;
this.name = name;
this.level = level;
}
@Override
public String toString() {
return "UserInfo{userId='" + userId + "', name='" + name + "', level='" + level + "'}";
}
}
static class StockInfo {
private final String skuId;
private final boolean available;
private final int count;
public StockInfo(String skuId, boolean available, int count) {
this.skuId = skuId;
this.available = available;
this.count = count;
}
@Override
public String toString() {
return "StockInfo{skuId='" + skuId + "', available=" + available + ", count=" + count + "}";
}
}
static class CouponInfo {
private final String couponCode;
private final int discount;
public CouponInfo(String couponCode, int discount) {
this.couponCode = couponCode;
this.discount = discount;
}
public static CouponInfo defaultCoupon() {
return new CouponInfo("NO-COUPON", 0);
}
@Override
public String toString() {
return "CouponInfo{couponCode='" + couponCode + "', discount=" + discount + "}";
}
}
static class RecommendInfo {
private final List<String> skuList;
public RecommendInfo(List<String> skuList) {
this.skuList = skuList;
}
public static RecommendInfo defaultRecommend() {
return new RecommendInfo(List.of());
}
@Override
public String toString() {
return "RecommendInfo{skuList=" + skuList + "}";
}
}
static class AggregatedResponse {
private final boolean success;
private final UserInfo userInfo;
private final StockInfo stockInfo;
private final CouponInfo couponInfo;
private final RecommendInfo recommendInfo;
private final List<String> errors;
public AggregatedResponse(boolean success,
UserInfo userInfo,
StockInfo stockInfo,
CouponInfo couponInfo,
RecommendInfo recommendInfo,
List<String> errors) {
this.success = success;
this.userInfo = userInfo;
this.stockInfo = stockInfo;
this.couponInfo = couponInfo;
this.recommendInfo = recommendInfo;
this.errors = errors;
}
@Override
public String toString() {
return "AggregatedResponse{" +
"success=" + success +
", userInfo=" + userInfo +
", stockInfo=" + stockInfo +
", couponInfo=" + couponInfo +
", recommendInfo=" + recommendInfo +
", errors=" + errors +
'}';
}
}
}
逐步拆解这段代码
这部分我不只解释“它能跑”,而是解释“为什么要这么写”。
1. 为每类任务定义不同失败策略
示例里其实做了两个层次的区分:
必须成功的任务
- 用户信息
- 库存信息
它们使用的是:
.orTimeout(...)
.whenComplete(...)
这意味着:
- 超时直接视为失败
- 不静默吞掉异常
- 最终聚合时如果拿不到,就会影响整体成功标记
可以降级的任务
- 优惠券
- 推荐信息
它们使用的是:
.completeOnTimeout(defaultValue, ...)
.exceptionally(ex -> defaultValue)
这意味着:
- 超时给默认值
- 普通异常也给默认值
- 页面仍然可以继续返回
这一点非常重要:不是所有异步任务都该“一刀切”地失败或降级。
2. allOf() 只是“等待全部完成”,不会帮你聚合结果
很多人第一次用 allOf() 会误会它能返回所有结果,实际上它返回的是:
CompletableFuture<Void>
所以常见写法是:
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();
T1 r1 = f1.join();
T2 r2 = f2.join();
T3 r3 = f3.join();
也就是说:
allOf()负责同步点- 各任务结果还是要自己取
3. 为什么异常要“单独记录”而不是只靠 allOf().join()
因为 allOf() 遇到异常时,只会让你知道“有任务失败了”,但它不适合直接承载完整诊断信息。
真实线上排障时,你更想看到:
- 哪个服务失败
- 是超时、连接异常还是业务异常
- 是否有多个服务同时失败
所以我在每个 future 上都挂了自己的错误记录逻辑:
.whenComplete((r, ex) -> recordError("userService", ex, errors));
这样即使多个服务同时挂掉,最终你也能看到聚合后的错误列表,而不是一个模糊的 CompletionException。
异常传播与聚合过程示意
下面用时序图描述一下整个编排链路,尤其是超时和降级是怎么穿过调用链的。
sequenceDiagram
participant Client as 调用方
participant Agg as 聚合服务
participant U as UserFuture
participant S as StockFuture
participant C as CouponFuture
participant R as RecommendFuture
Client->>Agg: 请求订单确认页
Agg->>U: supplyAsync(userService)
Agg->>S: supplyAsync(stockService)
Agg->>C: supplyAsync(couponService)
Agg->>R: supplyAsync(recommendService)
U-->>Agg: 正常结果
S-->>Agg: 正常结果
C-->>Agg: 超时
Agg->>C: completeOnTimeout(defaultCoupon)
R-->>Agg: 异常/正常结果
Agg->>R: exceptionally(defaultRecommend)
Agg->>Agg: allOf().join()
Agg->>Agg: 汇总结果 + 错误列表
Agg-->>Client: 聚合响应
常见坑与排查
这一节很关键。我自己在项目里踩过的坑,基本都集中在下面这些地方。
坑 1:直接用默认线程池 ForkJoinPool.commonPool()
如果你这样写:
CompletableFuture.supplyAsync(() -> query());
它默认可能走 ForkJoinPool.commonPool()。这在 demo 里没问题,但线上往往不建议直接依赖它,原因有几个:
- 公共线程池可能被其他任务共享
- 线程数不一定适合你的 IO 型任务
- 排查问题时线程名不清晰
- 容易出现“看着是异步,实际上阻塞把池占满了”
建议:总是显式传入业务线程池。
CompletableFuture.supplyAsync(() -> query(), bizExecutor);
坑 2:join() 和 get() 混着用,异常处理越来越乱
两者区别:
get()抛受检异常:InterruptedException,ExecutionExceptionjoin()抛非受检异常:CompletionException
在编排代码里,我更建议统一用 join(),这样链式代码更顺手;但你要记得 unwrap 根因。
catch (CompletionException ex) {
Throwable cause = ex.getCause();
}
坑 3:以为 orTimeout() 会中断底层任务
这是一个很常见的误区。
future.orTimeout(200, TimeUnit.MILLISECONDS)
它的含义更接近:
“200ms 之后如果还没完成,就把这个
CompletableFuture标记为超时失败。”
但这不等于底层执行逻辑一定被物理中断。如果你的任务里是阻塞 IO、外部 HTTP 调用、数据库查询,它们可能还在后台继续跑。
这意味着:
- 你需要在下游客户端层面也配置超时
CompletableFuture超时只是编排层超时,不是万能取消器
坑 4:exceptionally() 用多了,异常被悄悄吞掉
例如:
future.exceptionally(ex -> defaultValue);
这会把异常转换成正常结果。如果你前面没打日志、没埋点,最终线上看起来就像“啥事没有,只是数据偶尔为空”。
建议:降级前先记录异常。
future.exceptionally(ex -> {
log.error("xxx failed", ex);
return defaultValue;
});
在本文示例里,我用 errors.add(...) 做了最基础的聚合记录。
坑 5:把 CPU 密集型和 IO 密集型任务塞进同一个线程池
比如:
- 查远程服务:IO 密集型
- 做复杂 JSON 计算/规则引擎:CPU 密集型
如果全扔同一个池里,慢慢就会互相拖垮。
建议:至少按任务类型拆线程池。
坑 6:allOf() 之后再取结果时,忘了某些 future 已经异常完成
比如:
all.join();
User user = userFuture.join();
如果 userFuture 本身失败了,这里还是会抛异常。
所以你要么:
- 统一前置处理,把异常转成默认值
- 要么像示例一样,用
getSafely()做一次集中兜底
排查思路:线上异步编排出问题时怎么查
如果你的聚合接口“时快时慢、偶发空数据、日志里异常不全”,可以按这个顺序排查。
1. 先看线程池
重点看:
- 活跃线程数
- 队列积压长度
- 拒绝策略是否触发
- 是否出现大量阻塞任务
如果线程池打满,再优雅的 CompletableFuture 也没用。
2. 再看每个 future 的超时设置
确认三件事:
- 编排层超时是否合理
- 下游客户端超时是否更短或一致
- 是否存在“编排层已经超时,但下游仍在跑”的情况
3. 检查异常是否被吞掉
排查代码里是否出现:
exceptionally(ex -> defaultValue)
但没有任何日志、指标或 trace 信息。
4. 看聚合逻辑是否把“可降级失败”误判成“整体成功”
例如:
- 用户和库存都失败了,但优惠券和推荐给了默认值,接口却仍返回成功
- 或者反过来,一个非关键推荐服务失败,却把整页搞成失败
这个问题本质不是技术 API 问题,而是业务成功语义没有定义清楚。
安全/性能最佳实践
虽然这篇主题偏异步编排,但真正落地时,安全和性能习惯也非常重要。
1. 不要在异步链路里泄露敏感信息
错误聚合时,经常有人图省事:
errors.add("userService failed: " + ex.getMessage());
如果异常信息里带了:
- 用户手机号
- token
- SQL 参数
- 内部服务地址
那就可能在日志或返回体里泄露出去。
建议:
- 对外返回通用错误描述
- 详细堆栈只进内部日志
- 敏感字段做脱敏
2. 线程池参数要按任务模型配置
粗略经验:
- IO 密集型:线程数可以大于 CPU 核数
- CPU 密集型:线程数接近 CPU 核数即可
但不要机械套公式。你需要结合:
- 平均 RT
- 峰值 QPS
- 阻塞比例
- 队列可接受长度
如果你是聚合接口,线程池过大不一定更好,因为下游服务也有承载上限。
3. 超时时间不要拍脑袋定
一个比较稳妥的思路是:
- 先看下游 P95 / P99 延迟
- 再结合页面 SLA 反推聚合超时
- 关键路径给更严格控制
- 可降级路径给稍宽但有限的超时
例如页面总 SLA 是 300ms,那么你不可能给每个下游都设 500ms。
4. 给异步任务打上可观测信息
至少包括:
- 任务名
- 开始时间、结束时间
- 耗时
- 成功/失败
- 异常类型
- traceId / requestId
示例里我只是用 log() 简化演示,真实项目里建议接入统一日志和链路追踪。
5. 降级要有边界,不要“默认值成瘾”
默认值很好用,但不能什么都默认。
适合降级的:
- 推荐列表为空
- 优惠券列表为空
- 非核心画像数据为空
不适合随意降级的:
- 库存状态
- 支付状态
- 权限校验结果
- 风控判定结果
一句话:能降级的不一定该降级,关键看业务风险。
一个更工程化的封装思路
如果你的项目里异步聚合很多,建议把“超时 + 降级 + 错误记录”抽成统一方法,不要每个接口都手写一遍。
例如封装成:
public static <T> CompletableFuture<T> asyncCall(
String serviceName,
Supplier<T> supplier,
Executor executor,
long timeoutMs,
T defaultValue,
boolean degradable,
List<String> errors
) {
CompletableFuture<T> future = CompletableFuture
.supplyAsync(supplier, executor);
if (degradable) {
return future.completeOnTimeout(defaultValue, timeoutMs, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
errors.add(serviceName + " failed: " + ex.getMessage());
return defaultValue;
});
} else {
return future.orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
.whenComplete((r, ex) -> {
if (ex != null) {
errors.add(serviceName + " failed: " + ex.getMessage());
}
});
}
}
这样做的好处是:
- 失败策略统一
- 代码更短
- 便于统一埋点和限流扩展
当然,它的边界也很明确:
如果某些服务需要非常个性化的恢复逻辑,就不要强行套通用模板。
逐步验证清单
如果你准备在本地边写边验证,可以按这个顺序来。
第一步:验证并行是否生效
- 把 4 个 mock 服务都设置成固定 sleep
- 比较串行总耗时和并行总耗时
预期:并行耗时应接近最长任务的耗时,而不是总和。
第二步:验证超时兜底
- 让
couponService故意 sleep 超过阈值 - 看是否返回默认优惠券
第三步:验证异常聚合
- 让
recommendService随机抛异常 - 看
errors列表里是否能记录到
第四步:验证关键任务失败语义
- 让
userService或stockService超时/抛异常 - 看
success是否变为false
第五步:验证线程池饱和行为
- 把线程池缩小,增加并发请求量
- 观察队列积压、响应时间、拒绝策略是否生效
总结
CompletableFuture 真正的价值,不只是“把同步代码改成异步”,而是让你能明确表达:
- 哪些任务并行执行
- 哪些任务必须成功
- 哪些任务可以超时降级
- 异常如何记录、传播、聚合
- 线程资源如何受控
这篇文章里,我们做了一套相对完整的异步编排实践:
- 用
supplyAsync并行调用多个下游 - 用显式业务线程池隔离资源
- 用
orTimeout处理关键任务超时 - 用
completeOnTimeout+exceptionally处理可降级任务 - 用
allOf做统一等待点 - 用错误列表聚合多个失败原因
如果你现在就要把它落到项目里,我给三条最实用的建议:
- 第一,别偷懒用默认线程池,先把执行器隔离好。
- 第二,先定义业务成功语义,再写降级策略,不要反过来。
- 第三,异常一定要留痕,否则线上只会看到“偶发空数据”,根因永远埋着。
最后补一句边界条件:
CompletableFuture 很适合单机内的异步编排,但如果你的场景已经涉及复杂工作流、重试补偿、跨服务持久化状态流转,那可能就该考虑消息队列、工作流引擎,或者响应式框架了。别把所有问题都硬塞给它。
如果只是做接口聚合、并行调用、超时兜底,这套思路已经足够实用,而且在大多数 Java 服务里都能很快落地。