Java 中基于 CompletableFuture 的异步编排实战:并行任务、超时控制与异常处理
在 Java 后端开发里,很多业务接口本质上都不是“算得慢”,而是“等得多”:等数据库、等缓存、等第三方服务、等消息结果。
如果这些等待是串行的,接口耗时就会被一层层叠加;如果处理不好异常和超时,异步代码还会变成“看起来很高级,排查起来很痛苦”的事故现场。CompletableFuture 正好就是解决这类问题的常用工具。
这篇文章我会从一个真实感比较强的聚合查询场景切入,带你一步步完成:
- 并行执行多个任务
- 设置超时与降级
- 收拢异常,避免线程悄悄失败
- 写出可运行、可维护、可排查的异步编排代码
背景与问题
假设我们要实现一个“用户主页聚合接口”,需要同时获取:
- 用户基本信息
- 用户订单摘要
- 用户优惠券信息
如果串行执行,伪代码大概是这样:
UserProfile profile = userService.getProfile(userId);
OrderSummary orders = orderService.getOrderSummary(userId);
CouponInfo coupons = couponService.getCoupons(userId);
return assemble(profile, orders, coupons);
问题很直接:
- 每个远程调用都要几十到几百毫秒
- 总耗时接近三者之和
- 任意一个服务慢或失败,整体接口就会被拖住
这时候,异步编排的目标就不是“炫技”,而是三个字:更稳、更快。
前置知识与环境准备
本文示例基于:
- JDK 9+(因为会演示
orTimeout/completeOnTimeout) - 标准库
java.util.concurrent.CompletableFuture - 示例代码可直接放在一个
main方法里运行
如果你用的是 JDK 8,也能做同样的事情,只是超时控制需要手动封装,文中后面会提到替代思路。
核心原理
1. CompletableFuture 解决的是什么问题
CompletableFuture 同时具备两层能力:
- Future:代表一个“未来会完成”的结果
- CompletionStage:支持把多个异步阶段串起来做编排
它特别适合下面几类场景:
- 并行拉取多个独立数据源
- 某个任务成功后继续做下游处理
- 给异步链路加统一异常兜底
- 在整体结果返回前完成聚合
2. 常见编排方式怎么选
supplyAsync
有返回值的异步任务。
CompletableFuture<UserProfile> future = CompletableFuture.supplyAsync(() -> userService.getProfile(userId), executor);
runAsync
无返回值任务,比如异步记录日志、发送通知。
thenApply
对上一步结果做同步转换。
thenCompose
把“异步套异步”摊平,避免 CompletableFuture<CompletableFuture<T>>。
thenCombine
合并两个独立任务的结果。
allOf
等待一组任务都完成。
exceptionally / handle / whenComplete
做异常兜底、记录日志、统一收尾。
3. 一张图看懂基本流程
flowchart LR
A[接收请求] --> B[并行查询用户信息]
A --> C[并行查询订单摘要]
A --> D[并行查询优惠券]
B --> E[结果聚合]
C --> E
D --> E
E --> F[返回响应]
4. 异常与超时在链路中的位置
很多同学第一次用 CompletableFuture,更关注“怎么并行”,却忽略了“谁来收异常、谁来做超时兜底”。实际上这两件事才决定代码是否能上生产。
sequenceDiagram
participant Client as 调用方
participant CF as CompletableFuture
participant S1 as 用户服务
participant S2 as 订单服务
participant S3 as 优惠券服务
Client->>CF: 发起聚合请求
CF->>S1: 异步调用
CF->>S2: 异步调用
CF->>S3: 异步调用
S1-->>CF: 返回结果
S2-->>CF: 超时/异常
S3-->>CF: 返回结果
CF->>CF: 异常处理 + 降级
CF-->>Client: 返回聚合结果
实战代码(可运行)
下面我们用一个完整示例,把并行任务、超时控制、异常处理放在一起。
1. 示例目标
实现一个 loadUserDashboard 方法:
- 并行获取用户信息、订单摘要、优惠券
- 订单服务如果超时,返回默认值
- 优惠券服务如果抛异常,记录日志并降级
- 最终拼装成一个统一结果
2. 完整代码
import java.time.LocalTime;
import java.util.concurrent.*;
public class CompletableFutureDemo {
private static final ExecutorService BIZ_EXECUTOR = new ThreadPoolExecutor(
4,
8,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
r -> {
Thread t = new Thread(r);
t.setName("biz-exec-" + t.getId());
return t;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) {
try {
Dashboard dashboard = loadUserDashboard(1001L);
log("最终结果: " + dashboard);
} finally {
BIZ_EXECUTOR.shutdown();
}
}
public static Dashboard loadUserDashboard(Long userId) {
CompletableFuture<UserProfile> profileFuture =
CompletableFuture.supplyAsync(() -> getUserProfile(userId), BIZ_EXECUTOR)
.orTimeout(800, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("用户信息查询失败: " + ex.getMessage());
return UserProfile.defaultProfile(userId);
});
CompletableFuture<OrderSummary> orderFuture =
CompletableFuture.supplyAsync(() -> getOrderSummary(userId), BIZ_EXECUTOR)
.completeOnTimeout(OrderSummary.empty(userId), 500, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("订单摘要查询失败: " + ex.getMessage());
return OrderSummary.empty(userId);
});
CompletableFuture<CouponInfo> couponFuture =
CompletableFuture.supplyAsync(() -> getCouponInfo(userId), BIZ_EXECUTOR)
.orTimeout(700, TimeUnit.MILLISECONDS)
.handle((result, ex) -> {
if (ex != null) {
log("优惠券查询失败,降级为空列表: " + ex.getMessage());
return CouponInfo.empty(userId);
}
return result;
});
CompletableFuture<Dashboard> dashboardFuture =
CompletableFuture.allOf(profileFuture, orderFuture, couponFuture)
.thenApply(v -> new Dashboard(
profileFuture.join(),
orderFuture.join(),
couponFuture.join()
))
.whenComplete((result, ex) -> {
if (ex == null) {
log("聚合完成");
} else {
log("聚合失败: " + ex.getMessage());
}
});
return dashboardFuture.join();
}
private static UserProfile getUserProfile(Long userId) {
sleep(300);
log("用户服务返回");
return new UserProfile(userId, "Alice");
}
private static OrderSummary getOrderSummary(Long userId) {
sleep(900);
log("订单服务返回");
return new OrderSummary(userId, 12);
}
private static CouponInfo getCouponInfo(Long userId) {
sleep(200);
log("优惠券服务开始处理");
throw new RuntimeException("优惠券服务不可用");
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程被中断", e);
}
}
private static void log(String msg) {
System.out.println(LocalTime.now() + " [" + Thread.currentThread().getName() + "] " + msg);
}
static class Dashboard {
private final UserProfile profile;
private final OrderSummary orderSummary;
private final CouponInfo couponInfo;
public Dashboard(UserProfile profile, OrderSummary orderSummary, CouponInfo couponInfo) {
this.profile = profile;
this.orderSummary = orderSummary;
this.couponInfo = couponInfo;
}
@Override
public String toString() {
return "Dashboard{" +
"profile=" + profile +
", orderSummary=" + orderSummary +
", couponInfo=" + couponInfo +
'}';
}
}
static class UserProfile {
private final Long userId;
private final String name;
public UserProfile(Long userId, String name) {
this.userId = userId;
this.name = name;
}
public static UserProfile defaultProfile(Long userId) {
return new UserProfile(userId, "UNKNOWN");
}
@Override
public String toString() {
return "UserProfile{" +
"userId=" + userId +
", name='" + name + '\'' +
'}';
}
}
static class OrderSummary {
private final Long userId;
private final int orderCount;
public OrderSummary(Long userId, int orderCount) {
this.userId = userId;
this.orderCount = orderCount;
}
public static OrderSummary empty(Long userId) {
return new OrderSummary(userId, 0);
}
@Override
public String toString() {
return "OrderSummary{" +
"userId=" + userId +
", orderCount=" + orderCount +
'}';
}
}
static class CouponInfo {
private final Long userId;
private final int availableCoupons;
public CouponInfo(Long userId, int availableCoupons) {
this.userId = userId;
this.availableCoupons = availableCoupons;
}
public static CouponInfo empty(Long userId) {
return new CouponInfo(userId, 0);
}
@Override
public String toString() {
return "CouponInfo{" +
"userId=" + userId +
", availableCoupons=" + availableCoupons +
'}';
}
}
}
3. 这段代码里最关键的几个点
并行执行
三段 supplyAsync(...) 是独立启动的,不会相互等待。
订单超时降级
.completeOnTimeout(OrderSummary.empty(userId), 500, TimeUnit.MILLISECONDS)
意思是:如果 500ms 内没拿到结果,就直接用默认值完成。
这很适合“非核心字段”,比如推荐位、营销信息、统计摘要。
优惠券异常兜底
.handle((result, ex) -> {
if (ex != null) {
return CouponInfo.empty(userId);
}
return result;
})
handle 不管成功失败都会执行,因此很适合统一收口。
聚合阶段使用 allOf
CompletableFuture.allOf(profileFuture, orderFuture, couponFuture)
它本身只表示“都完成了”,不直接返回聚合结果,所以后面要用 join() 把每个 future 的结果取出来。
逐步验证清单
如果你准备把这套写法放进项目,我建议按下面顺序自己跑一遍:
- 三个任务都成功,确认是并行执行
- 让其中一个任务抛异常,确认是否能正常降级
- 让其中一个任务超时,确认响应时间是否被控制住
- 把线程池大小调小,看是否出现排队
- 观察日志里是否能定位到底哪个阶段慢、哪个阶段失败
这是我自己比较常用的一套验证方式,能尽早发现“代码能跑,但线上不稳”的问题。
核心原理再深入一点:thenApply、thenCompose、thenCombine
这三个方法很容易混。
thenApply:同步转换结果
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> "alice")
.thenApply(String::toUpperCase);
输入一个值,输出另一个值,但这一步本身不是新的异步任务。
thenCompose:串联异步任务
比如先查用户,再根据用户等级查推荐:
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> "VIP")
.thenCompose(level ->
CompletableFuture.supplyAsync(() -> "recommend for " + level)
);
如果你用 thenApply 返回另一个 CompletableFuture,就会得到嵌套结构,不好处理;thenCompose 就是专门拿来“拆平”的。
thenCombine:合并两个独立异步结果
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> combined = a.thenCombine(b, (x, y) -> x + y);
当两个任务互不依赖,但结果要一起使用时,thenCombine 很顺手。
一张图理解三者区别
flowchart TD
A[上游结果] --> B[thenApply: 同步转换]
A --> C[thenCompose: 继续发起异步任务]
D[任务1结果] --> E[thenCombine: 合并两个结果]
F[任务2结果] --> E
常见坑与排查
这一节非常重要。我见过不少项目用了 CompletableFuture,结果问题不是出在 API 不会用,而是出在细节上。
坑 1:默认线程池用得太随意
如果你没显式传 executor,很多异步任务会落到 ForkJoinPool.commonPool()。
这在 demo 里没问题,在生产环境里常常有风险:
- 任务类型混杂
- 有阻塞 I/O
- 线程数不受控
- 某一类任务把公共线程池占满
建议:业务异步任务用独立线程池,不要全靠默认池。
坑 2:allOf 完成了,不代表你拿到了业务结果
很多人第一次写会这样:
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();
然后以为结果已经有了。其实 allOf 只保证“都结束”,具体值还得自己取:
Result r1 = f1.join();
Result r2 = f2.join();
Result r3 = f3.join();
坑 3:join() 和 get() 混着用,异常处理变乱
get()抛的是受检异常:InterruptedException,ExecutionExceptionjoin()抛的是运行时异常:CompletionException
在链路里我更常用 join(),代码更干净;但前提是你要知道异常被包了一层。
排查时建议看根因:
try {
future.join();
} catch (CompletionException e) {
Throwable cause = e.getCause();
cause.printStackTrace();
}
坑 4:超时只是 Future 超时,不一定能中断底层任务
这是一个非常容易误解的点。
orTimeout / completeOnTimeout 控制的是 CompletableFuture 的完成状态,不一定能真正取消底层正在执行的远程调用。
也就是说:
- 调用方可能已经返回默认值了
- 但底层线程可能还在跑
- 如果底层是阻塞 I/O,没有超时配置,资源仍然会继续占着
所以,应用层 Future 超时 和 RPC/HTTP/数据库客户端超时 必须一起配。
坑 5:异常被吞掉,日志里什么都没有
比如:
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("boom");
});
如果你既不 join/get,也不加异常处理,这个异常很可能不会以你预期的方式暴露出来。
建议:
- 每条重要链路都要有
exceptionally/handle/whenComplete - 日志要带请求 ID、用户 ID、阶段名
- 不要让关键任务“悄悄失败”
一个实用的排查思路
当异步聚合接口变慢时,我一般按这个顺序查:
- 线程池是否打满
- 队列是否堆积
- 是不是某个下游服务频繁超时
- 异常是否被统一记录
- 降级逻辑是否真的生效
- 底层 HTTP/RPC 客户端有没有设置连接和读超时
如果只看接口总耗时,不拆阶段,往往很难定位。
安全/性能最佳实践
1. 为不同类型任务隔离线程池
不要把数据库查询、HTTP 调用、CPU 密集计算全扔进一个线程池。
常见建议:
- I/O 密集型:线程数可适度大一些
- CPU 密集型:线程数接近 CPU 核数
- 核心链路与非核心链路分池
这样一个推荐服务变慢,不至于把订单主链路拖死。
2. 线程池参数不要只会“无脑加大”
很多事故不是线程太少,而是线程太多导致:
- 上下文切换严重
- 下游被打爆
- 队列堆积更隐蔽
- GC 压力增大
建议至少明确这几个参数的含义:
corePoolSizemaximumPoolSizequeueCapacityRejectedExecutionHandler
并结合压测数据调,不要拍脑袋。
3. 给每个下游设置边界时间
如果一个聚合接口总 SLA 是 800ms,你不能给每个下游都配 800ms 超时,那样整体肯定收不住。
更合理的思路是:
- 用户信息:300ms
- 订单摘要:200ms
- 优惠券:150ms
- 预留聚合与序列化开销
要有整体预算意识,而不是每个服务“各超各的”。
4. 区分核心数据和可降级数据
不是所有失败都该一视同仁。
例如:
- 用户实名认证状态:核心,不能乱降级
- 推荐商品:可降级
- 营销角标:可直接返回空
如果核心字段失败却被你悄悄吞掉,业务可能比接口报错更危险。
5. 日志和监控要围绕“阶段”来做
我很建议给异步编排加这种维度:
- 阶段名:profile/order/coupon
- 开始时间、结束时间
- 是否超时
- 是否降级
- 异常类型
- 线程池活跃线程数、队列长度
这样你在监控平台上会很容易看出:到底是线程池问题,还是某个下游慢。
6. 注意上下文传递
在线上项目里,你可能还需要传递这些上下文:
- TraceId
- 用户身份
- 租户信息
- MDC 日志上下文
CompletableFuture 切线程后,这些上下文默认不一定自动带过去。如果项目依赖链路追踪或审计日志,这一点一定要补齐。
一个更贴近生产的编排建议
如果你要做的是接口聚合,我通常建议按下面这个原则组织:
- 先划分核心与非核心任务
- 核心任务失败直接失败或明确报错
- 非核心任务允许超时降级
- 每个任务独立记录耗时与异常
- 统一在聚合层收口,不要到处 scattered 地 try-catch
这会让代码结构清晰很多。异步编排最怕的不是 API 多,而是逻辑散。
什么时候不适合用 CompletableFuture
虽然它很好用,但也不是所有场景都适合。
不太适合的情况
- 链路极其复杂,分支和回滚很多
- 需要强事务一致性
- 大量流式处理、背压控制
- 团队对异步调试经验不足
这时候可以考虑:
- 更清晰的任务编排框架
- 响应式方案
- 消息队列解耦
- 显式工作流引擎
CompletableFuture 更适合中等复杂度、单机内编排、请求内聚合这类问题。
总结
CompletableFuture 的价值,不只是“把几个任务并行起来”,而是帮我们把异步流程写成可组合、可控制、可降级的结构。
这篇文章你可以直接带走的几个实用结论是:
- 并行任务:独立任务用
supplyAsync+allOf - 结果组合:简单场景用
thenCombine,复杂聚合用allOf后join - 超时控制:优先用
orTimeout/completeOnTimeout - 异常处理:链路里至少放一个
handle、exceptionally或whenComplete - 线程池:生产环境尽量显式配置,不要依赖默认公共线程池
- 边界意识:Future 超时不等于底层调用真的停了,RPC/HTTP 客户端超时也要配
如果你现在正准备把一个串行聚合接口改造成异步版,我建议你按这个顺序落地:
- 先并行化独立调用
- 再补超时
- 再补异常与降级
- 最后加监控与线程池治理
一步一步来,比一上来写一大串链式调用更稳。
我自己踩坑后最大的感受就是:异步代码写得快不难,写得稳才值钱。