背景与问题
在 Java 后端里,一个请求要同时查多个下游服务,这几乎是家常便饭:
- 查用户基本信息
- 查账户余额
- 查订单摘要
- 查营销权益
如果这些调用都串行执行,总耗时通常是各个子任务耗时之和;而如果它们彼此独立,其实完全可以并发跑起来,把总耗时压到接近“最慢的那个任务”。
很多同学第一次接触异步时,会直接上 new Thread(),或者把 CompletableFuture.supplyAsync() 当成“银弹”。结果上线后常见问题就来了:
- 线程池打满,接口 RT 飙升
join()一把梭,异常被包装得看不清- 上下文丢失,日志里 traceId 断了
- 某个子任务超时,主流程被卡死
- 混用 CPU 密集和 IO 密集任务,吞吐量越来越差
我当时做聚合接口时就踩过一个很典型的坑:业务逻辑是异步了,但线程池没设计好,最后只是“并发地变慢”。所以这篇文章不只讲 API 怎么写,更会把线程池选型、异常治理、链路追踪和排查思路一起串起来。
前置知识
建议你至少熟悉这些概念:
- Java 8+ Lambda
ExecutorServiceCompletableFuture常见方法- 基本日志规范,如 MDC / traceId
- 对接口超时、线程池参数有基本认知
环境准备
本文示例可在以下环境运行:
- JDK 17(JDK 8 也基本兼容,个别 API 可替换)
- Maven / Gradle 任意
- 日志框架:
slf4j + logback - 示例中不依赖 Spring,但我会补充 Spring 场景建议
核心原理
1. CompletableFuture 解决的核心问题
CompletableFuture 本质上做了两件事:
- 表达异步结果
- 表达任务之间的依赖关系
也就是说,它不只是“开个异步线程”,更重要的是能把任务关系写清楚:
- 串行:
thenApply/thenCompose - 并行汇聚:
thenCombine/allOf - 竞争返回:
applyToEither - 异常兜底:
exceptionally/handle
2. 任务编排的三种常见关系
串行依赖
后一个任务依赖前一个任务的结果。
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> loadUser(userId), executor);
CompletableFuture<Address> addressFuture = userFuture.thenApplyAsync(
user -> loadAddress(user.getAddressId()),
executor
);
并行汇聚
多个任务互不依赖,并行执行后聚合结果。
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> loadUser(userId), executor);
CompletableFuture<Account> accountFuture = CompletableFuture.supplyAsync(() -> loadAccount(userId), executor);
CompletableFuture<UserView> viewFuture = userFuture.thenCombine(accountFuture,
(user, account) -> new UserView(user, account)
);
多任务统一收口
适合聚合查询场景。
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();
注意:allOf() 返回的是 CompletableFuture<Void>,并不会自动帮你收集结果,还得自己从各子任务里取值。
3. 为什么线程池决定了异步是否真的高效
很多人默认用 ForkJoinPool.commonPool(),代码确实短:
CompletableFuture.supplyAsync(() -> query());
但线上通常不建议这样做,原因很现实:
- 公共线程池可能被其他任务污染
- 不便于隔离不同业务
- 不便于监控、限流、压测和动态调参
- IO 阻塞任务放进 commonPool,容易拖垮整体吞吐
结论很直接:生产环境里,异步任务尽量显式指定线程池。
4. 一张图看懂异步编排链路
flowchart LR
A[请求进入] --> B[主线程构建异步任务]
B --> C1[查询用户信息]
B --> C2[查询账户余额]
B --> C3[查询订单摘要]
C1 --> D[结果聚合]
C2 --> D
C3 --> D
D --> E[异常处理/降级]
E --> F[返回聚合响应]
5. CompletableFuture 常用方法怎么选
| 方法 | 用途 | 是否消费前序结果 | 是否返回新结果 |
|---|---|---|---|
runAsync | 无返回值异步任务 | 否 | 否 |
supplyAsync | 有返回值异步任务 | 否 | 是 |
thenApply | 同步转换结果 | 是 | 是 |
thenCompose | 扁平化串联异步任务 | 是 | 是 |
thenCombine | 合并两个独立任务结果 | 是 | 是 |
allOf | 等多个任务全部结束 | 否 | 否 |
exceptionally | 出错兜底 | 否 | 是 |
handle | 成功/失败统一处理 | 是 | 是 |
whenComplete | 结果回调,不改结果 | 是 | 否 |
一个经验口诀:
- 转换结果用
thenApply - 后面还是异步任务用
thenCompose - 两个独立任务合并用
thenCombine - 批量收口用
allOf
实战代码(可运行)
下面我们做一个“用户首页聚合接口”的完整示例,包含:
- 自定义线程池
- 异步并发查询
- 超时控制
- 异常降级
- traceId 链路追踪
- 性能统计
1. 完整示例代码
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Supplier;
public class CompletableFutureOrchestrationDemo {
// ===== 简单版 Trace 上下文,真实项目可接 MDC / SkyWalking / Sleuth =====
static final class TraceContext {
private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();
public static void set(String traceId) {
TRACE_ID.set(traceId);
}
public static String get() {
return TRACE_ID.get();
}
public static void clear() {
TRACE_ID.remove();
}
}
// ===== 支持上下文透传的线程池包装 =====
static final class ContextAwareExecutor implements Executor {
private final Executor delegate;
ContextAwareExecutor(Executor delegate) {
this.delegate = delegate;
}
@Override
public void execute(Runnable command) {
String traceId = TraceContext.get();
delegate.execute(() -> {
String old = TraceContext.get();
try {
TraceContext.set(traceId);
command.run();
} finally {
if (old == null) {
TraceContext.clear();
} else {
TraceContext.set(old);
}
}
});
}
}
// ===== 模拟返回对象 =====
record User(Long id, String name, Long levelId) {}
record Account(Long userId, BigDecimal balance) {}
record OrderSummary(Long userId, int count) {}
record Benefit(Long userId, List<String> coupons) {}
record UserHomeVO(
Long userId,
String userName,
BigDecimal balance,
int orderCount,
List<String> coupons,
boolean degraded,
String traceId,
LocalDateTime generatedAt
) {}
// ===== 业务线程池:适合 IO 密集型任务 =====
private static final ThreadPoolExecutor RAW_EXECUTOR = new ThreadPoolExecutor(
16,
32,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
r -> {
Thread t = new Thread(r);
t.setName("biz-async-" + t.threadId());
return t;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
private static final Executor EXECUTOR = new ContextAwareExecutor(RAW_EXECUTOR);
public static void main(String[] args) {
String traceId = UUID.randomUUID().toString();
TraceContext.set(traceId);
try {
long begin = System.currentTimeMillis();
UserHomeVO vo = buildUserHome(1001L);
long cost = System.currentTimeMillis() - begin;
log("最终结果: " + vo);
log("总耗时(ms): " + cost);
} finally {
TraceContext.clear();
RAW_EXECUTOR.shutdown();
}
}
public static UserHomeVO buildUserHome(Long userId) {
long start = System.currentTimeMillis();
CompletableFuture<User> userFuture = supplyAsyncWithLog("queryUser",
() -> queryUser(userId))
.orTimeout(800, TimeUnit.MILLISECONDS);
CompletableFuture<Account> accountFuture = supplyAsyncWithLog("queryAccount",
() -> queryAccount(userId))
.completeOnTimeout(new Account(userId, BigDecimal.ZERO), 500, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("account 降级: " + unwrap(ex).getMessage());
return new Account(userId, BigDecimal.ZERO);
});
CompletableFuture<OrderSummary> orderFuture = supplyAsyncWithLog("queryOrderSummary",
() -> queryOrderSummary(userId))
.exceptionally(ex -> {
log("order 降级: " + unwrap(ex).getMessage());
return new OrderSummary(userId, 0);
});
CompletableFuture<Benefit> benefitFuture = supplyAsyncWithLog("queryBenefit",
() -> queryBenefit(userId))
.completeOnTimeout(new Benefit(userId, List.of()), 300, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("benefit 降级: " + unwrap(ex).getMessage());
return new Benefit(userId, List.of());
});
CompletableFuture<UserHomeVO> resultFuture = userFuture.thenCombine(accountFuture, (user, account) ->
new Object[]{user, account})
.thenCombine(orderFuture, (arr, order) ->
new Object[]{arr[0], arr[1], order})
.thenCombine(benefitFuture, (arr, benefit) -> {
User user = (User) arr[0];
Account account = (Account) arr[1];
OrderSummary order = (OrderSummary) arr[2];
boolean degraded = account.balance().compareTo(BigDecimal.ZERO) == 0
|| order.count() == 0
|| benefit.coupons().isEmpty();
return new UserHomeVO(
user.id(),
user.name(),
account.balance(),
order.count(),
benefit.coupons(),
degraded,
TraceContext.get(),
LocalDateTime.now()
);
})
.handle((vo, ex) -> {
long cost = System.currentTimeMillis() - start;
if (ex != null) {
Throwable root = unwrap(ex);
log("主流程失败, cost=" + cost + "ms, err=" + root.getMessage());
return new UserHomeVO(
userId,
"UNKNOWN",
BigDecimal.ZERO,
0,
List.of(),
true,
TraceContext.get(),
LocalDateTime.now()
);
}
log("主流程成功, cost=" + cost + "ms");
return vo;
});
return resultFuture.join();
}
private static <T> CompletableFuture<T> supplyAsyncWithLog(String taskName, Supplier<T> supplier) {
return CompletableFuture.supplyAsync(() -> {
long begin = System.currentTimeMillis();
log("开始任务: " + taskName);
try {
T result = supplier.get();
log("结束任务: " + taskName + ", cost=" + (System.currentTimeMillis() - begin) + "ms");
return result;
} catch (Exception e) {
log("任务异常: " + taskName + ", err=" + e.getMessage());
throw e;
}
}, EXECUTOR);
}
// ===== 模拟下游调用 =====
private static User queryUser(Long userId) {
sleep(120);
return new User(userId, "Alice", 3L);
}
private static Account queryAccount(Long userId) {
sleep(200);
return new Account(userId, new BigDecimal("1024.88"));
}
private static OrderSummary queryOrderSummary(Long userId) {
sleep(260);
return new OrderSummary(userId, 7);
}
private static Benefit queryBenefit(Long userId) {
sleep(150);
if (userId % 2 == 1) {
throw new RuntimeException("营销服务不可用");
}
return new Benefit(userId, List.of("COUPON-10", "COUPON-20"));
}
private static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程被中断", e);
}
}
private static Throwable unwrap(Throwable ex) {
if (ex instanceof CompletionException || ex instanceof ExecutionException) {
return ex.getCause() != null ? ex.getCause() : ex;
}
return ex;
}
private static void log(String msg) {
System.out.printf("[%s] [thread=%s] [traceId=%s] %s%n",
LocalDateTime.now(),
Thread.currentThread().getName(),
TraceContext.get(),
msg
);
}
}
2. 代码里几个关键点,建议你重点看
显式使用业务线程池
CompletableFuture.supplyAsync(() -> queryUser(userId), EXECUTOR);
这比直接用默认线程池更可控。
超时控制
.orTimeout(800, TimeUnit.MILLISECONDS)
.completeOnTimeout(defaultValue, 500, TimeUnit.MILLISECONDS)
两者区别:
orTimeout:超时后抛异常completeOnTimeout:超时后直接返回默认值
如果你的业务允许降级,我更推荐后者。
异常兜底
.exceptionally(ex -> new Account(userId, BigDecimal.ZERO))
这类写法适合“局部失败不影响主流程”的子任务。
主流程统一收口
.handle((vo, ex) -> { ... })
handle 很适合在链路末尾统一记录耗时、异常和返回兜底结果。
3. 异步调用时序图
sequenceDiagram
participant Client as 调用方
participant Main as 主线程
participant Pool as 业务线程池
participant U as 用户服务
participant A as 账户服务
participant O as 订单服务
participant B as 营销服务
Client->>Main: 请求用户首页
Main->>Pool: 提交 queryUser
Main->>Pool: 提交 queryAccount
Main->>Pool: 提交 queryOrderSummary
Main->>Pool: 提交 queryBenefit
Pool->>U: 查询用户
Pool->>A: 查询账户
Pool->>O: 查询订单
Pool->>B: 查询权益
U-->>Pool: User
A-->>Pool: Account
O-->>Pool: OrderSummary
B-->>Pool: 异常/超时/Benefit
Pool-->>Main: 聚合结果
Main-->>Client: UserHomeVO
逐步验证清单
如果你准备把这套模式落到项目里,我建议按下面顺序验证,而不是一上来就全接入。
第一步:先验证并发是否生效
看总耗时是不是接近最慢任务,而不是所有任务耗时之和。
例如:
- 用户 120ms
- 账户 200ms
- 订单 260ms
- 权益 150ms
串行大约 730ms;并发后通常应接近 260~320ms。
第二步:验证异常是否能被隔离
故意让某个子任务报错,比如营销服务抛异常,确认:
- 聚合接口仍能返回
- 日志里能看到明确异常
- 返回结果是降级值,不是整页失败
第三步:验证 traceId 是否透传成功
检查线程切换后日志中 traceId 是否一致。如果主线程有、异步线程没有,说明上下文透传丢了。
第四步:压测线程池
重点关注:
- 活跃线程数
- 队列积压
- 拒绝次数
- 接口 TP99
- 下游 RT 与错误率
常见坑与排查
这一部分很重要。我见过不少项目“用了 CompletableFuture”,但问题都出在这些细节上。
1. 默认线程池被误用
现象
- 接口偶发变慢
- 系统里其他异步任务互相影响
- 压测时吞吐量不稳定
原因
没有显式传入线程池,任务跑到了 ForkJoinPool.commonPool()。
排查
搜索代码:
CompletableFuture.supplyAsync(...)
CompletableFuture.runAsync(...)
看是否都传了 executor 参数。
建议
生产环境统一封装异步入口,禁止裸用默认线程池。
2. join() / get() 导致主线程阻塞
现象
异步代码写了一大堆,但最终还是在中间反复 join()。
错误示例
User user = userFuture.join();
Account account = accountFuture.join();
OrderSummary order = orderFuture.join();
这会让你的代码虽然“创建了异步任务”,却很容易在中间阶段退化成同步拼装。
更好的方式
优先使用组合式 API:
userFuture.thenCombine(accountFuture, ...)
只在最终出口 join() 一次。
3. 异常被包装,看不清根因
现象
日志里经常是:
CompletionExceptionExecutionException
真正下游报错信息没看到。
排查建议
统一解包:
private static Throwable unwrap(Throwable ex) {
if (ex instanceof CompletionException || ex instanceof ExecutionException) {
return ex.getCause() != null ? ex.getCause() : ex;
}
return ex;
}
经验建议
异常日志一定要打根因,而不是只打外层包装异常。
4. 线程池参数拍脑袋配置
现象
- 核心线程太小,队列堆积
- 队列太大,问题被掩盖,延迟越来越高
- 最大线程太大,导致上下文切换严重
一个实用思路
如果主要是 IO 密集任务,可以从下面的经验值起步:
- 核心线程数:
CPU核数 * 2 ~ 4 - 队列长度:根据峰值流量和超时时间估算
- 拒绝策略:优先明确业务语义,别默认无脑吞掉
如果是纯 CPU 密集任务,线程数一般不要远超 CPU 核数。
5. 上下文丢失,链路追踪断裂
现象
主线程日志有 traceId,异步线程日志没有。
原因
ThreadLocal 不会自动跨线程传递。
解决思路
- 手动包装
Executor - 使用 MDC 透传工具
- Spring 场景可考虑
TaskDecorator - 引入专业链路追踪方案
本文示例里的 ContextAwareExecutor 就是最小实现。
6. 子任务超时后仍在执行
这点很容易被忽略。
orTimeout() 和 completeOnTimeout() 更多是对 CompletableFuture 结果层面的控制,不一定真的中断底层任务。如果底层任务是阻塞 IO、数据库调用或远程请求,它可能还在继续跑。
建议
- 下游 HTTP/RPC 客户端必须设置自身超时
- 数据库查询设置超时
- 任务代码要处理中断信号
- 不要只依赖
CompletableFuture表层超时
7. allOf() 之后结果丢失
错误认知
很多人以为:
CompletableFuture.allOf(f1, f2, f3)
就能拿到所有结果。
其实不能。你还是得自己 join() 每个 future。
正确写法
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
CompletableFuture<List<Object>> result = all.thenApply(v ->
List.of(f1.join(), f2.join(), f3.join())
);
安全/性能最佳实践
这里我把线上比较实用的建议集中列出来。
1. 线程池隔离业务类型
不要把这些任务混在一个线程池里:
- 用户请求实时查询
- 大批量报表任务
- 消息消费
- 定时任务
更推荐按业务隔离:
queryExecutorrpcExecutorreportExecutor
这样出现故障时,影响范围更可控。
2. 区分 CPU 密集和 IO 密集
这是性能优化里最容易被忽略的一点。
CPU 密集任务
如:
- JSON 大对象序列化
- 复杂加解密
- 大量规则计算
建议线程数接近 CPU 核数。
IO 密集任务
如:
- 调用数据库
- 调用 Redis
- 调用外部 HTTP/RPC 服务
可适当提高线程数,但前提是你知道下游承载能力。
3. 给每个子任务设置超时与降级策略
一个成熟的异步编排,不能只追求“并发快”,还得考虑“坏的时候怎么退”。
建议对每类下游定义:
- 超时时间
- 默认值
- 是否允许失败
- 失败后的监控告警级别
例如:
| 子任务 | 超时 | 降级策略 |
|---|---|---|
| 用户信息 | 800ms | 不允许失败,失败则主流程兜底 |
| 账户余额 | 500ms | 返回 0 |
| 订单摘要 | 600ms | 返回空摘要 |
| 营销权益 | 300ms | 返回空券列表 |
4. 不要吞异常,要结构化记录
推荐至少记录这些字段:
- traceId
- taskName
- cost
- timeout / exception type
- fallback 是否触发
- 线程池名称
- 业务主键,如 userId / orderId
这样排查时才能串起来。
5. 拒绝策略要有业务含义
本文示例用了:
new ThreadPoolExecutor.CallerRunsPolicy()
它的特点是:线程池忙不过来时,由提交任务的线程自己执行,能形成一定“反压”。
但这不是所有场景都合适:
- 对 RT 敏感的在线接口,可能会把主线程拖慢
- 对后台任务,可能还能接受
如果你不知道怎么选,至少不要默默丢任务。相比“无声失败”,明确失败、记录监控更好。
6. 监控比 API 更重要
真正线上治理,核心不是 thenCombine 写得多优雅,而是你能不能看到这些数据:
- 线程池活跃线程数
- 队列长度
- 任务拒绝次数
- 平均耗时 / TP95 / TP99
- 各子任务异常率
- 降级比例
- traceId 覆盖率
如果没有监控,再漂亮的异步代码也很难维护。
7. 敏感信息不要在线程上下文里乱传
链路追踪上下文适合放:
- traceId
- spanId
- requestId
不适合放:
- 明文手机号
- Token 原文
- 身份证号
- 大对象用户详情
原因很简单:
- 有泄露风险
- 增加线程切换时的复制成本
- 容易造成上下文污染
一张图看懂异常与降级策略
stateDiagram-v2
[*] --> 提交子任务
提交子任务 --> 正常完成
提交子任务 --> 超时
提交子任务 --> 异常
正常完成 --> 聚合结果
超时 --> 返回默认值
异常 --> 返回降级值
返回默认值 --> 聚合结果
返回降级值 --> 聚合结果
聚合结果 --> 主流程成功
聚合结果 --> 主流程兜底
主流程成功 --> [*]
主流程兜底 --> [*]
方案落地建议
如果你准备在实际项目里推广,我建议按这个顺序落地。
阶段 1:先收口线程池
统一封装异步工具类,例如:
public final class Asyncs {
private static final Executor EXECUTOR = ...;
public static <T> CompletableFuture<T> supply(Supplier<T> supplier) {
return CompletableFuture.supplyAsync(supplier, EXECUTOR);
}
}
先解决“线程池不可控”的问题。
阶段 2:补齐超时与异常模板
把这些能力做成统一规范:
- 默认超时
- 统一异常日志
- 统一 traceId 透传
- 统一降级工具
这样团队写法会更稳定,不容易每个人一套风格。
阶段 3:接入监控与链路系统
至少让每个异步任务都有:
- 任务名
- 耗时
- 成功/失败状态
- traceId
如果已经有 SkyWalking、Zipkin、OpenTelemetry,这一步会非常值。
总结
CompletableFuture 真正强的地方,不是“把同步代码改成异步”这么简单,而是它能把任务依赖、并发关系、异常收口和降级策略写成一条清晰的执行链。
这篇文章里你可以记住 5 个最实用的结论:
- 生产环境尽量不要裸用默认线程池
- 异步编排优先用组合式 API,中间少
join() - 每个子任务都要有超时、异常和降级策略
- 线程上下文不会自动透传,traceId 要主动处理
- 性能优化先看线程池和下游瓶颈,不是只看语法
最后给几个可执行建议,比较接地气:
- 聚合接口里,先挑 2~4 个独立下游改造成并行,不要一次性全面异步化
- 给每个异步任务打上任务名和耗时日志
- 压测时一定盯住线程池队列、拒绝数和 TP99
- 如果某个下游不稳定,优先加超时和降级,而不是一味扩线程
- 对强依赖任务,宁可失败得明确,也不要返回“看起来成功但数据错了”的结果
如果你能把这套思路真正用到项目里,CompletableFuture 就不只是“会用 API”,而是真正成为可维护、可观测、可演进的异步编排工具。