Java 中基于 CompletableFuture 的异步编排实战:从并发优化到异常处理落地
在很多 Java 业务系统里,我们最早写的代码往往是“串行思维”:
- 查用户
- 查订单
- 查库存
- 拼装结果
- 返回
这样写当然没错,逻辑直观、调试方便。但只要这些步骤依赖远程调用、数据库查询或者第三方接口,整体耗时就会被慢请求一层层叠加。
我自己在做聚合接口时就踩过这个坑:单个接口都只要几十毫秒,看起来不慢,但串起来就是三四百毫秒,接口一多,延迟就开始让人难受。
这时候,CompletableFuture 就非常适合上场。它不只是“异步执行一个任务”,更重要的是它能做异步编排:并行、串联、合并、兜底、超时、异常恢复,一套链路都能比较自然地表达出来。
这篇文章我会从“真实开发怎么落地”的角度来讲,重点不放在 API 罗列,而是带你写一个可运行的聚合查询示例,再把常见坑、异常处理方式和性能边界讲透。
背景与问题
先看一个很典型的业务场景:商品详情页聚合接口。
前端请求商品详情时,后端通常需要同时拿到:
- 商品基础信息
- 库存信息
- 价格信息
- 营销信息
如果你用串行方式写,大概像这样:
public ProductDetailVO getDetail(Long productId) {
Product product = productService.getProduct(productId);
Stock stock = stockService.getStock(productId);
Price price = priceService.getPrice(productId);
Promotion promotion = promotionService.getPromotion(productId);
return ProductDetailVO.of(product, stock, price, promotion);
}
问题很明显:
- 总耗时 = 各步骤耗时之和
- 任意一步抖动,整体都会变慢
- 某个非核心服务失败时,很难优雅降级
- 想加超时、重试、兜底,代码会越来越乱
更糟的是,很多人会尝试手写线程池 + Future 来优化,但最后会发现:
Future只能拿结果,编排能力弱- 多任务依赖和异常处理写起来很别扭
- 代码可读性迅速下降
所以我们真正要解决的不是“开线程”,而是:
- 如何把无依赖任务并行化
- 如何把有依赖任务串起来
- 如何在失败、超时、部分结果缺失时仍然可控地返回
- 如何避免线程池被打爆、异常被吞掉、主线程莫名卡住
前置知识与环境准备
本文示例基于:
- JDK 8+
- 建议 JDK 9+,因为有
orTimeout/completeOnTimeout - 一个自定义线程池,不建议直接依赖默认公共线程池
Maven 项目无需额外依赖,CompletableFuture 来自 JDK 标准库。
核心原理
先别急着背 API,我建议先抓住 4 个最关键的能力。
1. 异步执行:把任务扔到线程池里
CompletableFuture.supplyAsync(() -> queryData(), executor);
supplyAsync:有返回值runAsync:无返回值
这一步只是“启动异步任务”。
2. 串行编排:上一步结果作为下一步输入
future.thenApply(result -> transform(result));
常用区别:
thenApply:同步转换结果thenCompose:把“异步套异步”拍平thenAccept:消费结果但不返回值
3. 并行合并:多个任务一起跑,最后汇总
CompletableFuture.allOf(f1, f2, f3)
适合“都完成后再统一收集结果”。
如果只关心“谁先完成用谁”,可以用:
CompletableFuture.anyOf(f1, f2, f3)
4. 异常处理:别让异常失控
这是落地里最重要的一块。
常见方法:
exceptionally:异常兜底,返回替代值handle:不管成功失败都处理whenComplete:做日志、埋点,不改变结果
我一般这样记:
- 要改结果:
exceptionally/handle - 只记录,不改结果:
whenComplete
一张图看懂 CompletableFuture 编排方式
flowchart LR
A[请求商品详情] --> B[并行查商品基础信息]
A --> C[并行查库存]
A --> D[并行查价格]
A --> E[并行查营销]
B --> F[汇总结果]
C --> F
D --> F
E --> F
F --> G[返回详情页数据]
上图是最基础的“并行汇总”。
但真实业务里,常常还会有依赖关系,比如:
- 先查商品基础信息
- 再根据商品分类去查营销规则
这时就要用串行链式编排。
核心 API 的使用心法
thenApply 和 thenCompose 的区别
这是中级开发最容易混淆的点之一。
thenApply
上一步返回 T,这一步把它转成 R。
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> "hello")
.thenApply(s -> s + " world");
thenCompose
上一步返回 T,这一步再发起一个新的异步任务,返回 CompletableFuture<R>。
thenCompose 会帮你自动“拍平”。
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> 1)
.thenCompose(id -> CompletableFuture.supplyAsync(() -> "user-" + id));
如果你用错成 thenApply,就会得到:
CompletableFuture<CompletableFuture<String>>
这几乎总不是你想要的。
异步链路时序图
sequenceDiagram
participant Client as 调用方
participant CF as CompletableFuture编排层
participant P as ProductService
participant S as StockService
participant PR as PriceService
participant M as PromotionService
Client->>CF: 请求商品详情
par 并行查询
CF->>P: 查询商品
CF->>S: 查询库存
CF->>PR: 查询价格
CF->>M: 查询营销
end
P-->>CF: 商品结果
S-->>CF: 库存结果
PR-->>CF: 价格结果
M-->>CF: 营销结果/异常
CF->>CF: 异常兜底/默认值填充
CF-->>Client: 聚合结果
实战代码(可运行)
下面我们写一个可以直接运行的示例:模拟商品详情聚合接口。
功能目标:
- 商品、库存、价格、营销并行查询
- 营销服务偶发失败,使用默认值兜底
- 对整体链路做超时控制
- 汇总结果返回
- 打印总耗时
为了方便本地运行,下面所有代码放在一个文件中也可以跑。
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Supplier;
public class CompletableFutureDemo {
private static final ExecutorService BIZ_EXECUTOR = new ThreadPoolExecutor(
8,
16,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadFactory() {
private int index = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "biz-cf-" + index++);
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) {
ProductFacade facade = new ProductFacade();
long start = System.currentTimeMillis();
ProductDetailVO detail = facade.getProductDetail(1001L);
long cost = System.currentTimeMillis() - start;
System.out.println("最终结果: " + detail);
System.out.println("总耗时: " + cost + " ms");
BIZ_EXECUTOR.shutdown();
}
static class ProductFacade {
private final ProductService productService = new ProductService();
private final StockService stockService = new StockService();
private final PriceService priceService = new PriceService();
private final PromotionService promotionService = new PromotionService();
public ProductDetailVO getProductDetail(Long productId) {
CompletableFuture<Product> productFuture =
CompletableFuture.supplyAsync(wrap(() -> productService.getProduct(productId)), BIZ_EXECUTOR)
.whenComplete((r, e) -> log("productFuture", r, e));
CompletableFuture<Stock> stockFuture =
CompletableFuture.supplyAsync(wrap(() -> stockService.getStock(productId)), BIZ_EXECUTOR)
.whenComplete((r, e) -> log("stockFuture", r, e));
CompletableFuture<Price> priceFuture =
CompletableFuture.supplyAsync(wrap(() -> priceService.getPrice(productId)), BIZ_EXECUTOR)
.whenComplete((r, e) -> log("priceFuture", r, e));
CompletableFuture<Promotion> promotionFuture =
CompletableFuture.supplyAsync(wrap(() -> promotionService.getPromotion(productId)), BIZ_EXECUTOR)
.exceptionally(ex -> {
System.out.println("[WARN] 营销服务失败,使用默认营销信息: " + ex.getMessage());
return new Promotion("默认无活动");
})
.whenComplete((r, e) -> log("promotionFuture", r, e));
CompletableFuture<ProductDetailVO> detailFuture =
CompletableFuture.allOf(productFuture, stockFuture, priceFuture, promotionFuture)
.thenApply(v -> new ProductDetailVO(
productFuture.join(),
stockFuture.join(),
priceFuture.join(),
promotionFuture.join()
))
.orTimeout(1500, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
throw new RuntimeException("聚合商品详情失败: " + ex.getMessage(), ex);
});
return detailFuture.join();
}
private <T> Supplier<T> wrap(Supplier<T> supplier) {
return () -> {
try {
return supplier.get();
} catch (Exception e) {
throw new CompletionException(e);
}
};
}
private void log(String taskName, Object result, Throwable throwable) {
if (throwable != null) {
System.out.println("[ERROR] " + taskName + " failed: " + throwable.getMessage());
} else {
System.out.println("[INFO] " + taskName + " success: " + result);
}
}
}
static class ProductService {
public Product getProduct(Long productId) {
sleep(300);
return new Product(productId, "机械键盘");
}
}
static class StockService {
public Stock getStock(Long productId) {
sleep(500);
return new Stock(productId, 28);
}
}
static class PriceService {
public Price getPrice(Long productId) {
sleep(400);
return new Price(productId, 399.00);
}
}
static class PromotionService {
private final Random random = new Random();
public Promotion getPromotion(Long productId) {
sleep(600);
if (random.nextBoolean()) {
throw new RuntimeException("promotion service unavailable");
}
return new Promotion("满300减30");
}
}
static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("thread interrupted", e);
}
}
static class Product {
private final Long id;
private final String name;
public Product(Long id, String name) {
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "{id=" + id + ", name='" + name + "'}";
}
}
static class Stock {
private final Long productId;
private final int available;
public Stock(Long productId, int available) {
this.productId = productId;
this.available = available;
}
@Override
public String toString() {
return "{productId=" + productId + ", available=" + available + "}";
}
}
static class Price {
private final Long productId;
private final double amount;
public Price(Long productId, double amount) {
this.productId = productId;
this.amount = amount;
}
@Override
public String toString() {
return "{productId=" + productId + ", amount=" + amount + "}";
}
}
static class Promotion {
private final String desc;
public Promotion(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "{desc='" + desc + "'}";
}
}
static class ProductDetailVO {
private final Product product;
private final Stock stock;
private final Price price;
private final Promotion promotion;
public ProductDetailVO(Product product, Stock stock, Price price, Promotion promotion) {
this.product = product;
this.stock = stock;
this.price = price;
this.promotion = promotion;
}
@Override
public String toString() {
return "ProductDetailVO{" +
"product=" + product +
", stock=" + stock +
", price=" + price +
", promotion=" + promotion +
'}';
}
}
}
这段代码里几个关键点,值得你重点看
1. 使用自定义线程池,而不是默认线程池
CompletableFuture.supplyAsync(task, BIZ_EXECUTOR)
默认的 ForkJoinPool.commonPool() 并不是不能用,但业务系统里我更建议显式传入线程池:
- 方便隔离不同业务
- 方便压测和调优
- 线程数、队列长度、拒绝策略都可控
- 避免公共线程池被其他异步任务拖垮
2. 用 allOf 做并行汇总
CompletableFuture.allOf(productFuture, stockFuture, priceFuture, promotionFuture)
注意:allOf 本身不直接返回聚合结果,只表示“都完成了”。
所以后面还要再用 join() 把各自结果取出来。
3. 对非核心依赖做降级
营销信息通常不是强一致核心数据,所以:
.exceptionally(ex -> new Promotion("默认无活动"))
这类兜底非常实用。
我的经验是:不要把所有异常都往上抛,而是按“核心依赖 / 非核心依赖”做分层处理。
4. 超时控制不能省
.orTimeout(1500, TimeUnit.MILLISECONDS)
异步并不意味着不会卡住。
如果底层服务持续慢响应,线程池仍然会被占住,调用方也一样会等死。
进一步扩展:有依赖关系时怎么写
上面的四个查询彼此独立,适合并行。
但如果营销规则依赖商品分类,就要写成“先查商品,再查营销”。
import java.util.concurrent.*;
public class ThenComposeExample {
private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(4);
public static void main(String[] args) {
CompletableFuture<String> resultFuture =
CompletableFuture.supplyAsync(() -> getProductCategory(1001L), EXECUTOR)
.thenCompose(category ->
CompletableFuture.supplyAsync(() -> getPromotionByCategory(category), EXECUTOR)
)
.exceptionally(ex -> "默认分类活动");
System.out.println(resultFuture.join());
EXECUTOR.shutdown();
}
static String getProductCategory(Long productId) {
sleep(200);
return "keyboard";
}
static String getPromotionByCategory(String category) {
sleep(300);
return category + " 类目限时折扣";
}
static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
如果你看到“异步任务 A 完成后,再发起异步任务 B”,优先想到 thenCompose。
状态变化图:一个 Future 是怎么走完生命周期的
stateDiagram-v2
[*] --> Created
Created --> Running: supplyAsync/runAsync
Running --> Success: 正常完成
Running --> Failed: 抛出异常
Running --> Timeout: 超时
Failed --> Recovered: exceptionally/handle兜底
Success --> [*]
Recovered --> [*]
Timeout --> [*]
这张图的意义在于:
异常不是终点,超时也不是终点,关键是你有没有定义“失败后的业务行为”。
常见坑与排查
这部分很重要,很多线上问题不是不会用 API,而是细节没处理好。
坑 1:误用默认线程池,结果线上抖动严重
现象
- 某些接口偶发变慢
- CPU 不高,但请求堆积
- 异步任务互相影响
原因
默认公共线程池和别的任务共用,且不一定适合你的业务模型。
如果任务里还有阻塞 IO(比如远程调用、数据库查询),问题更明显。
建议
- 业务异步任务使用独立线程池
- IO 密集型线程池大小通常可比 CPU 核数更大,但别无脑放大
- 配合监控:活跃线程数、队列长度、拒绝次数、任务耗时
坑 2:join() / get() 用错位置,异步又写回串行了
例如:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> queryA(), executor);
String a = f1.join();
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> queryB(a), executor);
String b = f2.join();
这虽然用了 CompletableFuture,但主线程每步都在等,本质还是串行阻塞。
正确思路
- 能链式编排就链式编排
- 能
allOf汇总就不要中途频繁join
坑 3:异常被 CompletionException 包了一层,看不懂真实原因
这是最常见的排查痛点之一。
future.join();
如果任务异常,join() 会抛出 CompletionException。
真正的业务异常通常在:
ex.getCause()
排查建议
打印日志时不要只打印:
ex.getMessage()
最好完整输出堆栈,并关注根因:
Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
cause.printStackTrace();
坑 4:whenComplete 以为能兜底,实际上它不会吞异常
很多人第一次写会这样理解错:
future.whenComplete((r, e) -> {
if (e != null) {
System.out.println("出错了");
}
});
这只是“看到异常”,并没有改变 Future 的失败状态。
如果你要兜底,应该用:
exceptionallyhandle
坑 5:链路超时了,但底层任务还在跑
orTimeout 只能让 CompletableFuture 在上层感知超时,
不等于底层 IO 调用就真的取消了。
比如你里面执行的是 HTTP 请求、数据库请求、RPC 调用,
它们是否真正停止,还取决于底层客户端自己的超时机制。
实战建议
超时要分层设置:
CompletableFuture编排超时- HTTP/RPC 客户端超时
- 数据库连接和查询超时
这三个最好一起配。
常见排查路径
如果某个异步聚合接口突然变慢或报错,我一般按这个顺序查:
1. 看线程池
重点看:
- 当前线程数
- 活跃线程数
- 队列积压
- 拒绝策略触发次数
2. 看每个子任务耗时
给每个 Future 打日志或埋点:
- 开始时间
- 结束时间
- 是否异常
- 降级是否生效
3. 看异常链
尤其注意:
CompletionExceptionExecutionException- 真正
cause
4. 看是否有隐式阻塞
比如:
- 异步方法里又调用了同步阻塞接口
- 主线程过早
join - 某些公共锁导致线程等待
安全/性能最佳实践
这一节我尽量给“可以直接拿去执行”的建议。
1. 区分核心依赖和非核心依赖
不是所有失败都该直接让接口报错。
一个简单策略:
- 核心依赖:商品、价格、库存
失败就整体失败,或者明确返回错误 - 非核心依赖:营销、推荐、标签
失败则降级,返回默认值
这样用户体验和系统稳定性会好很多。
2. 线程池隔离,避免相互拖垮
建议至少按业务域拆分线程池,例如:
- 商品聚合线程池
- 推荐计算线程池
- 消息通知线程池
不要所有异步任务都塞进一个池子。
为什么这也算“安全”最佳实践
因为线程池被打满,本质上是一种资源争抢。
如果没有隔离,一个边缘业务的高峰可能拖垮核心接口。
3. 不要在异步任务里做长时间阻塞操作
CompletableFuture 很适合编排,但它并不能神奇地消灭阻塞。
如果你在线程池里执行的是:
- 大量同步 HTTP 调用
- 大量数据库慢查询
- 大量
Thread.sleep - 大量锁等待
那最终还是会把线程耗尽。
建议
- 优先优化底层依赖时延
- 为阻塞型任务配置合适线程池
- 有条件时考虑响应式或事件驱动方案,但别为了“新技术”强上
4. 统一超时、降级、日志规范
我很建议团队里沉淀统一模板,例如:
- 每个远程依赖都要有超时
- 每个非核心依赖都要定义默认值
- 每个 Future 都要打成功/失败日志
- 聚合接口统一输出 traceId、子任务耗时、异常原因
这样问题一来,排查速度会快很多。
5. 慎用 join(),更慎用循环里 join()
下面这种写法很危险:
for (Long id : ids) {
CompletableFuture<Result> future = CompletableFuture.supplyAsync(() -> query(id), executor);
results.add(future.join());
}
这又退化成串行了。
更好的做法
先全部提交,再统一收集:
List<CompletableFuture<Result>> futures = ids.stream()
.map(id -> CompletableFuture.supplyAsync(() -> query(id), executor))
.toList();
List<Result> results = futures.stream()
.map(CompletableFuture::join)
.toList();
如果你在 JDK 8,没有 toList(),可以用 Collectors.toList()。
6. 注意上下文传递问题
在 Web 场景里,异步线程经常拿不到主线程上下文,比如:
ThreadLocal- traceId
- 登录态信息
- 租户信息
如果你依赖这些上下文,一定要显式传递或使用支持上下文传播的方案。
否则很容易出现:
- 日志 trace 丢失
- 多租户串数据
- 审计字段缺失
这是我在实际项目里见过很多次的问题,排查起来很烦。
逐步验证清单
如果你准备把 CompletableFuture 用到生产代码里,我建议按这个清单自测一遍:
- 所有异步任务是否都指定了自定义线程池
- 并行任务是否真的并行,没有中途过早
join - 核心依赖和非核心依赖是否区分处理
- 是否配置了超时
- 是否有异常兜底逻辑
- 是否记录了子任务耗时和异常日志
- 线程池参数是否根据压测数据调整过
- 底层 HTTP/RPC/DB 是否也配置了超时
- 是否考虑了上下文传递
- 是否验证过某个子任务失败时整体返回是否符合预期
一个简单的取舍建议:什么时候适合用 CompletableFuture
很适合:
- 聚合查询接口
- 多个独立远程调用并行执行
- 有简单依赖关系的异步链路
- 需要异常兜底、超时控制的中等复杂业务
不太适合单靠它硬扛的场景:
- 超复杂 DAG 编排
- 海量任务调度
- 强依赖非阻塞 IO 模型
- 需要跨进程工作流编排
换句话说,CompletableFuture 非常适合“应用层异步编排”,
但它不是万能工作流引擎,也不是性能灵药。
总结
CompletableFuture 真正的价值,不只是“把代码改成异步”,而是让你能更清晰地表达:
- 哪些任务可以并行
- 哪些任务有前后依赖
- 哪些异常该失败,哪些该兜底
- 哪些调用必须限时结束
如果你只记住几个落地建议,我建议记这 5 条:
- 优先并行化无依赖任务,接口延迟通常会立刻改善
- 一定使用自定义线程池,别把业务命运交给公共线程池
- 核心依赖失败就显式报错,非核心依赖失败就降级
- 超时要分层控制:编排层、RPC/HTTP 层、数据库层都要有
- 排查问题先看线程池、再看子任务耗时、最后看异常根因
最后说句很实在的话:
CompletableFuture 写得好,代码会很优雅;写得不好,异步链会比同步代码更难维护。
所以别一上来就追求“全链路异步化”,先从一个典型聚合接口开始,把并行、超时、兜底、日志这四件事做好,收益通常就已经很明显了。