Java 中利用 CompletableFuture 优化并发编排的实战指南
在 Java 里写并发,很多人一开始都走过同一条路:new Thread()、线程池、Future.get(),最后代码不是“能跑”,而是“勉强能跑”。一旦业务里出现多个远程调用并行、结果汇总、超时降级、异常兜底,传统写法就会迅速变得难维护。
CompletableFuture 的价值,恰恰就在这里:它不是单纯“开异步任务”的工具,而是一套并发编排能力。你可以把它理解成“用声明式方式描述异步依赖关系”。
这篇文章我会从实战角度带你走一遍:
- 为什么需要
CompletableFuture - 它的核心模型是什么
- 怎么写出可运行的并发编排代码
- 常见坑怎么排查
- 在线上环境里如何兼顾性能与稳定性
如果你已经会用线程池,但对 thenApply、thenCompose、allOf 总觉得“知道一点但不够顺手”,这篇会比较适合你。
背景与问题
先看一个常见业务场景:商品详情页聚合查询。
一次请求需要并发拿到:
- 商品基础信息
- 库存信息
- 价格信息
- 营销信息
如果串行调用,大致是这样:
Product product = productService.getProduct(productId);
Stock stock = stockService.getStock(productId);
Price price = priceService.getPrice(productId);
Promotion promotion = promotionService.getPromotion(productId);
这类代码的问题很直接:
- 总耗时等于多个远程调用之和
- 某个接口慢,全链路一起慢
- 异常处理分散,超时不好统一控制
- 后续如果有“先查 A,再基于 A 查 B”的依赖,代码会越来越绕
有些同学会改成 Future + 线程池:
Future<Product> f1 = executor.submit(() -> productService.getProduct(productId));
Future<Stock> f2 = executor.submit(() -> stockService.getStock(productId));
Future<Price> f3 = executor.submit(() -> priceService.getPrice(productId));
然后在后面挨个 get()。
这比串行快一些,但仍然有几个典型问题:
get()是阻塞的,写多了还是“同步思维”- 多任务依赖关系靠人工管理,易乱
- 异常包装层级深,调试不直观
- 超时、降级、链式转换写起来不顺手
CompletableFuture 的优势在于:
它把“任务之间怎么衔接”这件事,从控制流代码里抽出来,变成了一套 API。
前置知识与环境准备
适合的 Java 版本
建议至少使用:
- Java 8:
CompletableFuture基础能力齐全 - Java 9+:可用
orTimeout、completeOnTimeout等更方便的超时控制 - Java 17+:长期支持版本,更推荐生产使用
示例环境
本文代码基于:
- JDK 17
- 普通
main方法可运行 - 不依赖 Spring,方便你本地直接试
学习这篇前最好已掌握
- Java 线程池基础
- lambda 表达式
- 异常处理
- 基本的 HTTP/RPC 调用模型概念
核心原理
1. CompletableFuture 不只是 Future
Future 更像是“异步结果容器”,你能查它是否完成、能阻塞等待结果。
CompletableFuture 则更进一步:
- 能主动完成结果
- 能注册完成后的回调
- 能进行链式转换
- 能组合多个异步任务
- 能统一处理异常和超时
可以把它理解成一个“未来结果 + 编排动作”的组合体。
2. 三类最常用能力
一类:启动异步任务
runAsync():执行任务,无返回值supplyAsync():执行任务,有返回值
二类:结果转换与依赖编排
thenApply():把上一步结果转换成另一个值thenAccept():消费结果,无返回值thenRun():不关心结果,只在完成后执行thenCompose():把“异步套异步”拉平thenCombine():组合两个独立异步结果
三类:汇总与异常控制
allOf():等待所有任务完成anyOf():任意一个完成就继续exceptionally():出现异常时给默认值handle():无论成功失败都处理whenComplete():适合做日志、埋点、清理
3. thenApply 和 thenCompose 的区别
这是很多人刚学时最容易混的地方。
thenApply
适合:同步转换
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> "java")
.thenApply(String::toUpperCase);
这里 toUpperCase 只是普通转换,所以用 thenApply。
thenCompose
适合:上一步结果出来后,还要发起新的异步任务
CompletableFuture<User> future =
CompletableFuture.supplyAsync(() -> 1001L)
.thenCompose(userService::getUserAsync);
如果你这里用 thenApply,得到的会是:
CompletableFuture<CompletableFuture<User>>
这就是典型的“异步嵌套异步”。
thenCompose 的作用就是把它拍平。
4. 并发编排的基本模型
可以把它简化成三种关系:
- 并行执行:多个任务互不依赖,同时发起
- 串行依赖:后一个任务依赖前一个结果
- 汇总聚合:多个任务结束后,组装成最终结果
下面这张图能帮助你快速建立心智模型。
flowchart LR
A[收到商品详情请求] --> B[并发查商品信息]
A --> C[并发查库存]
A --> D[并发查价格]
A --> E[并发查营销]
B --> F[聚合结果]
C --> F
D --> F
E --> F
F --> G[返回详情页DTO]
5. 线程执行到底在哪个池里?
这是生产里非常关键的问题。
如果你用:
CompletableFuture.supplyAsync(() -> doSomething());
但没有传入自定义线程池,它默认使用的是:
ForkJoinPool.commonPool()
问题在于:
- 它是全局共享的
- 适合 CPU 密集型任务
- 如果你拿它跑大量 IO 阻塞任务,容易把池子拖慢
- 和应用里其他地方共用时,行为不可控
我的建议很明确:
线上业务异步编排,尽量显式传入自定义线程池,不要长期依赖默认 commonPool。
一张图看懂常见 API 关系
classDiagram
class CompletableFuture {
+runAsync()
+supplyAsync()
+thenApply()
+thenCompose()
+thenCombine()
+allOf()
+anyOf()
+exceptionally()
+handle()
+whenComplete()
}
class AsyncTask {
<<start>>
}
class Transform {
<<convert>>
}
class Combine {
<<aggregate>>
}
class ErrorHandle {
<<fallback>>
}
CompletableFuture --> AsyncTask
CompletableFuture --> Transform
CompletableFuture --> Combine
CompletableFuture --> ErrorHandle
实战代码(可运行)
下面我们实现一个简化版的商品详情聚合服务。
目标:
- 商品、库存、价格并发查询
- 营销信息依赖商品分类后再异步查询
- 对价格接口设置超时降级
- 对整个流程做统一日志与异常处理
完整示例
import java.math.BigDecimal;
import java.time.LocalTime;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Supplier;
public class CompletableFutureOrchestrationDemo {
private static final ExecutorService IO_POOL = new ThreadPoolExecutor(
8,
16,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
r -> {
Thread t = new Thread(r);
t.setName("cf-io-" + t.threadId());
return t;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) {
ProductDetailService service = new ProductDetailService();
try {
ProductDetail detail = service.getProductDetail(1001L);
System.out.println("\n===== 最终结果 =====");
System.out.println(detail);
} finally {
IO_POOL.shutdown();
}
}
static class ProductDetailService {
private final ProductService productService = new ProductService();
private final StockService stockService = new StockService();
private final PriceService priceService = new PriceService();
private final PromotionService promotionService = new PromotionService();
public ProductDetail getProductDetail(Long productId) {
long start = System.currentTimeMillis();
CompletableFuture<Product> productFuture =
CompletableFuture.supplyAsync(logged("query product", () -> productService.getProduct(productId)), IO_POOL);
CompletableFuture<Stock> stockFuture =
CompletableFuture.supplyAsync(logged("query stock", () -> stockService.getStock(productId)), IO_POOL)
.exceptionally(ex -> {
log("stock fallback: " + ex.getMessage());
return new Stock(productId, 0);
});
CompletableFuture<Price> priceFuture =
CompletableFuture.supplyAsync(logged("query price", () -> priceService.getPrice(productId)), IO_POOL)
.completeOnTimeout(new Price(productId, new BigDecimal("0.00"), true), 800, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("price fallback: " + ex.getMessage());
return new Price(productId, new BigDecimal("0.00"), true);
});
CompletableFuture<Promotion> promotionFuture =
productFuture.thenCompose(product ->
CompletableFuture.supplyAsync(
logged("query promotion by category", () -> promotionService.getPromotion(product.category())),
IO_POOL
)
).exceptionally(ex -> {
log("promotion fallback: " + ex.getMessage());
return new Promotion("NONE", "无可用活动");
});
CompletableFuture<ProductDetail> detailFuture =
CompletableFuture.allOf(productFuture, stockFuture, priceFuture, promotionFuture)
.thenApply(v -> new ProductDetail(
productFuture.join(),
stockFuture.join(),
priceFuture.join(),
promotionFuture.join()
))
.whenComplete((result, ex) -> {
long cost = System.currentTimeMillis() - start;
if (ex == null) {
log("assemble success, cost = " + cost + " ms");
} else {
log("assemble failed, cost = " + cost + " ms, ex = " + ex.getMessage());
}
});
return detailFuture.join();
}
}
static class ProductService {
public Product getProduct(Long productId) {
sleep(300);
return new Product(productId, "机械键盘", "keyboard");
}
}
static class StockService {
public Stock getStock(Long productId) {
sleep(400);
return new Stock(productId, 128);
}
}
static class PriceService {
public Price getPrice(Long productId) {
sleep(1000); // 故意慢一点,触发超时降级
return new Price(productId, new BigDecimal("399.00"), false);
}
}
static class PromotionService {
public Promotion getPromotion(String category) {
sleep(350);
if ("keyboard".equals(category)) {
return new Promotion("PROMO-618", "满 299 减 40");
}
return new Promotion("NONE", "无可用活动");
}
}
record Product(Long productId, String name, String category) {}
record Stock(Long productId, int available) {}
record Price(Long productId, BigDecimal amount, boolean fallback) {}
record Promotion(String code, String description) {}
record ProductDetail(Product product, Stock stock, Price price, Promotion promotion) {}
static <T> Supplier<T> logged(String action, Supplier<T> supplier) {
return () -> {
long start = System.currentTimeMillis();
log("start: " + action);
try {
T result = supplier.get();
log("finish: " + action + ", cost = " + (System.currentTimeMillis() - start) + " ms");
return result;
} catch (Exception e) {
log("error: " + action + ", cost = " + (System.currentTimeMillis() - start) + " ms");
throw e;
}
};
}
static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("thread interrupted", e);
}
}
static void log(String msg) {
System.out.printf("[%s][%s] %s%n", LocalTime.now(), Thread.currentThread().getName(), msg);
}
}
这段代码里到底优化了什么?
我们一步步拆开看。
1. 商品、库存、价格并发查
CompletableFuture<Product> productFuture = ...
CompletableFuture<Stock> stockFuture = ...
CompletableFuture<Price> priceFuture = ...
这三个任务互不依赖,所以应当立刻并发启动。
如果串行执行,理论耗时约为:
- 300ms + 400ms + 1000ms = 1700ms,再加营销查询
而并发后,主耗时接近最慢的那个任务,也就是价格查询。
2. 营销查询依赖商品分类,用 thenCompose
productFuture.thenCompose(product ->
CompletableFuture.supplyAsync(() -> promotionService.getPromotion(product.category()), IO_POOL)
)
为什么这里不是 thenApply?
因为 getPromotion(...) 本身又是一个异步任务,我们需要的是:
- 先拿到商品
- 再异步发起营销查询
- 返回一个“拉平后的最终 Future”
这就是 thenCompose 的典型场景。
3. 价格接口超时降级
.completeOnTimeout(new Price(productId, new BigDecimal("0.00"), true), 800, TimeUnit.MILLISECONDS)
这里特别适合聚合型接口。
商品详情页里,价格如果超时:
- 未必需要整个请求失败
- 可以返回兜底值
- 让页面主体先出来
这类思路在线上非常常见:核心字段强一致,边缘字段弱依赖。
4. allOf 负责“等大家都结束”
CompletableFuture.allOf(productFuture, stockFuture, priceFuture, promotionFuture)
它返回的是 CompletableFuture<Void>,本身不直接给你各自结果。
所以我们常见写法是后面再 join():
.thenApply(v -> new ProductDetail(
productFuture.join(),
stockFuture.join(),
priceFuture.join(),
promotionFuture.join()
))
这里之所以安全,是因为 allOf 已经保证这些任务都结束了。
再看一个时序图:请求是怎么流动的?
sequenceDiagram
participant C as Client
participant A as API
participant P as ProductService
participant S as StockService
participant R as PriceService
participant M as PromotionService
C->>A: 请求商品详情
A->>P: 异步查询商品
A->>S: 异步查询库存
A->>R: 异步查询价格
P-->>A: 返回商品(category=keyboard)
A->>M: 基于category异步查营销
S-->>A: 返回库存
Note over R,A: 超过800ms触发降级
M-->>A: 返回营销信息
A-->>C: 聚合后返回详情
常用编排模式总结
模式一:并行聚合
多个无依赖任务同时执行,最后合并。
CompletableFuture<A> fa = ...;
CompletableFuture<B> fb = ...;
CompletableFuture<Result> future = fa.thenCombine(fb, Result::new);
适合:
- 聚合页
- 报表统计
- 多下游接口并发查询
模式二:串行依赖
后续步骤依赖前一步结果。
CompletableFuture<UserProfile> future =
getUserIdAsync()
.thenCompose(this::getUserAsync)
.thenCompose(this::getProfileAsync);
适合:
- 先鉴权,再查用户,再查画像
- 先建订单,再锁库存,再发优惠券
模式三:竞争返回
多个来源哪个先返回用哪个。
CompletableFuture<Object> future = CompletableFuture.anyOf(sourceA, sourceB, sourceC);
适合:
- 多机房竞速
- 主备源切换
- 多缓存层抢答
不过要注意:anyOf 返回后,其他任务默认不会自动取消。这点很多人会忽略,后面会讲。
常见坑与排查
这一节我尽量讲得“接地气”一点,都是实际项目里很常见的问题。
坑 1:误用默认线程池,导致吞吐异常
现象
- 压测时接口波动大
- CPU 不高,但响应时间抖动明显
- 异步任务堆积,日志线程名常见
ForkJoinPool.commonPool-worker-*
原因
默认 commonPool 并不适合承载大量 IO 阻塞任务,比如:
- 调数据库
- 调 HTTP 接口
- 调 RPC 服务
排查建议
- 打日志输出线程名
- 看是否大量任务跑在
ForkJoinPool.commonPool - 观察线程池活跃数、队列长度、拒绝次数
建议
业务场景中显式传入自定义线程池:
CompletableFuture.supplyAsync(() -> queryRemote(), customExecutor);
坑 2:thenApply 写成了“Future 套 Future”
错误示例
CompletableFuture<CompletableFuture<User>> future =
getUserIdAsync().thenApply(this::getUserAsync);
这时后续链路会很难用。
正确写法
CompletableFuture<User> future =
getUserIdAsync().thenCompose(this::getUserAsync);
判断口诀
返回普通值,用
thenApply;返回 Future,用thenCompose。
坑 3:异常被包装后看不清根因
join() 抛的是 CompletionException,get() 抛的是 ExecutionException。
真正的业务异常,通常藏在 getCause() 里。
示例
try {
future.join();
} catch (CompletionException e) {
Throwable root = e.getCause();
root.printStackTrace();
}
排查建议
线上日志不要只打印:
log.error("future failed: {}", e.getMessage());
而要打印完整堆栈和根因。
坑 4:allOf() 成功了,但取结果时报错
严格说,allOf() 并不是“所有任务都成功”,而是“所有任务都完成”。
如果其中有异常,后续 join() 一样会炸。
建议做法
对可降级任务提前兜底:
future.exceptionally(ex -> defaultValue);
如果是关键任务,就不要吞异常,让主流程感知失败。
坑 5:超时了,但底层任务还在跑
这个坑很真实。
比如你用了:
future.orTimeout(500, TimeUnit.MILLISECONDS)
主流程超时报错了,但底层那个 HTTP/RPC 调用如果本身不支持中断,它仍可能继续占资源。
这意味着什么?
- 超时只是调用方“不等了”
- 不代表下游真的“停了”
建议
- 下游客户端本身也要配置连接超时、读超时
- 不要把
CompletableFuture超时当成唯一超时控制手段 - 真正要“止损”,要结合网络客户端超时与线程池隔离
坑 6:在回调里做重活,拖慢线程池
例如:
future.thenApply(result -> {
// 大量计算或阻塞IO
return heavyProcess(result);
});
如果这一步很重,你可能不希望它继续占用前一个阶段的执行线程。
可以改成:
future.thenApplyAsync(this::heavyProcess, customExecutor);
什么时候用 Async 版本?
- 回调逻辑耗时明显
- 回调里有阻塞操作
- 你想把不同阶段隔离到不同线程池
安全/性能最佳实践
这一节我会给偏“工程化”的建议,适合你直接带回项目里用。
1. 线程池按任务类型隔离
不要把所有异步任务都丢进一个池子。
建议至少区分:
- IO 密集型池:远程调用、数据库访问
- CPU 密集型池:计算、规则匹配、加解密
原因很简单:
- IO 会阻塞线程
- CPU 任务需要尽量少上下文切换
一个粗略经验:
- IO 密集型线程数可略大于 CPU 核数
- CPU 密集型线程数通常接近 CPU 核数
最终还是要靠压测定。
2. 聚合接口必须设置超时与降级边界
聚合接口最怕“被最慢依赖拖死”。
建议给每个下游定义清楚:
- 是否核心依赖
- 最大等待时间
- 超时后的默认值
- 是否允许空结果返回
例如:
| 下游服务 | 是否核心 | 超时策略 | 降级策略 |
|---|---|---|---|
| 商品信息 | 是 | 500ms | 失败即整体失败 |
| 库存信息 | 否 | 300ms | 返回 0 或未知 |
| 价格信息 | 否 | 300ms | 返回兜底价格 |
| 营销信息 | 否 | 200ms | 返回无活动 |
这张表在接口设计时非常有用。
3. 不要滥用 join()
join() 用起来很顺手,但如果你在中间链路过早 join(),就会把异步流程重新变回同步阻塞。
不推荐
Product product = productFuture.join();
CompletableFuture<Promotion> promotionFuture = ...
这样会让当前线程卡住等待商品结果。
更推荐
CompletableFuture<Promotion> promotionFuture =
productFuture.thenCompose(product -> ...);
原则是:
尽量把阻塞留到最外层,只在真正需要拿最终结果时
join()。
4. 给异步链路补上日志上下文
这是线上排查特别关键的一点。
异步一多,请求日志很容易“串不起来”。
建议至少记录:
- 请求 ID / traceId
- 阶段名称
- 开始时间、结束时间
- 执行线程名
- 是否命中降级
如果你在 Spring 项目里,还要特别注意:
ThreadLocal上下文不会自动跨线程传递- MDC 日志上下文可能丢失
- 安全上下文也可能丢失
这时可以考虑:
- 手工透传上下文
- 自定义
Executor - 用装饰器包装任务
5. 对共享状态保持克制
异步回调里最怕随手改共享对象,例如:
Map<String, Object> map = new HashMap<>();
future1.thenAccept(v -> map.put("a", v));
future2.thenAccept(v -> map.put("b", v));
这在多线程下可能有并发问题。
建议:
- 优先使用不可变对象
- 在最终汇总时一次性组装
- 如果确实要共享写入,用线程安全容器并明确边界
6. 不要把 CompletableFuture 当消息队列
它适合:
- 单机内并发编排
- 请求生命周期内的异步组合
它不适合直接承担:
- 持久化任务队列
- 跨进程可靠投递
- 长时间后台任务管理
如果你的任务需要:
- 断点续跑
- 重试补偿
- 持久化状态
- 跨服务异步解耦
那更适合 MQ、调度系统或工作流引擎。
逐步验证清单
如果你准备把 CompletableFuture 用到项目里,我建议按下面顺序验证,而不是一步上复杂编排。
第一步:验证并发是否真的生效
- 看总耗时是否接近最慢任务,而不是所有任务之和
- 打印线程名,确认确实在线程池中运行
第二步:验证异常传播路径
- 人为让某个任务抛异常
- 确认主流程是否按预期失败或降级
- 检查日志里能否看到真实根因
第三步:验证超时行为
- 人为让某个任务 sleep 更久
- 确认是否命中
orTimeout/completeOnTimeout - 确认超时后整体 SLA 是否达标
第四步:验证线程池边界
- 压测时观察活跃线程数、队列长度、拒绝策略
- 确认是否发生线程池打满
- 确认是否有任务堆积导致雪崩
第五步:验证降级结果是否可接受
- 页面/接口拿到默认值后,前端或调用方是否能正确处理
- 不要只关注“程序没报错”,还要看“业务是否能兜住”
一个更贴近生产的取舍建议
很多文章会把 CompletableFuture 讲得像“银弹”,但我想提醒一个边界:
适合用它的情况
- 一个请求里有多个下游调用
- 任务依赖关系清晰
- 需要并发提速
- 需要在本进程内完成聚合与降级
不太适合的情况
- 链路过长,回调嵌套层数太深
- 业务状态机复杂,有很多分支和补偿
- 涉及跨服务长事务
- 需要持久化、重试、审计、人工介入
如果你的业务已经发展到“异步步骤很多、失败恢复复杂”,那就应该考虑:
- 领域工作流
- 任务调度平台
- MQ + 消费者编排
- 工作流引擎
换句话说:
CompletableFuture很适合做“请求内并发编排”,但不应该无限承担“系统级异步流程”的职责。
总结
CompletableFuture 真正厉害的地方,不是“把代码改成异步”,而是让你能更自然地表达这些关系:
- 哪些任务可以并行
- 哪些任务必须依赖前置结果
- 哪些任务失败可以降级
- 哪些任务超时就该止损
如果你刚开始在项目里落地,我给你的可执行建议是:
- 先从聚合查询场景开始,这是最容易见效的入口
- 始终使用自定义线程池,不要默认依赖
commonPool - 优先掌握
thenApply / thenCompose / thenCombine / allOf / exceptionally,这几个就能覆盖大多数场景 - 把超时、降级、日志打通,否则异步代码上线后很难排查
- 只在最外层
join(),中途尽量保持链式编排,不要过早阻塞
最后给你一个简短判断标准:
- 如果你现在的代码是“多个接口串行查、总耗时长、异常处理散”,那
CompletableFuture值得上。 - 如果你的业务已经演变成“复杂工作流 + 补偿 + 状态持久化”,那就别硬扛,应该升级架构手段。
学会 CompletableFuture 后,你会发现并发代码不一定非得写得很“硬核”,也可以写得清楚、克制,而且足够工程化。