Java 中基于 CompletableFuture 与线程池的异步任务编排实战与性能优化
在日常 Java 服务开发里,**“把几个慢操作并发做掉”**几乎是高频需求:查用户信息、拉订单、查库存、拼装推荐数据、调用多个下游接口……如果还按串行方式写,响应时间通常会被最长链路拖垮。
很多人第一次接触 CompletableFuture 时,会觉得它很强,但一上手就容易出现几个问题:
- 代码能跑,但线程池用得不对,性能反而更差
- 任务一多,日志难追踪,异常像“消失了”
join()、get()、allOf()、thenCompose()分不清- 线上偶尔超时,CPU 又不高,问题不好定位
这篇文章我不打算只讲 API,而是从**“为什么要这样编排”**出发,带你写一个可运行的示例,并把性能优化、常见坑、排查方式一起串起来。
背景与问题
先看一个非常典型的业务场景:构建商品详情页。
一次请求里,可能要拿到:
- 商品基础信息
- 价格信息
- 库存信息
- 营销标签
- 推荐商品列表
如果这些数据都来自数据库或远程服务,串行代码大概是这样:
ProductInfo info = productService.queryInfo(productId);
Price price = priceService.queryPrice(productId);
Stock stock = stockService.queryStock(productId);
Marketing marketing = marketingService.queryMarketing(productId);
List<RecommendItem> recommend = recommendService.queryRecommend(productId);
假设每个调用平均 100ms,那么总耗时接近 500ms,页面体验就不太理想。
但这里面很多步骤其实是互不依赖的,可以并发执行;有些步骤又有前后依赖,比如拿到商品信息后,才能根据类目查推荐。这就进入了“异步任务编排”的范畴。
什么叫异步任务编排?
不是简单地“开几个线程”,而是把任务之间的关系表达清楚:
- 哪些可以并发
- 哪些必须串行
- 哪些结果需要合并
- 哪些异常要兜底
- 哪些必须限时返回
如果你只会 submit() 然后 Future.get(),写起来很快就会陷入“线程开了不少,结果还是像同步代码”的窘境。
前置知识与环境准备
本文示例基于:
- JDK 8+
- Maven / Gradle 均可
- 读者已了解:
- Java 线程基础
- 线程池基本参数
- Lambda 表达式
如果你在生产环境使用,建议至少是 JDK 11+,因为工具链、监控、性能表现通常更成熟。
核心原理
CompletableFuture 本质上做了两件事:
- 表示一个未来结果
- 在结果完成后继续执行后续动作
相比 Future,它最大的价值不是“异步”,而是可组合。
1. 常见编排方式
并行执行:supplyAsync
适合启动一个有返回值的异步任务。
CompletableFuture<User> future = CompletableFuture.supplyAsync(() -> loadUser(userId), executor);
串行依赖:thenApply / thenCompose
thenApply:对上一步结果做转换thenCompose:把“异步套异步”拍平
CompletableFuture<Order> orderFuture = queryUser(userId)
.thenCompose(user -> queryLatestOrder(user.getId()));
如果你这里写成 thenApply,返回的就可能是 CompletableFuture<CompletableFuture<Order>>,这通常不是你想要的。
合并结果:thenCombine
两个异步任务互不依赖,但最终结果要拼在一起。
userFuture.thenCombine(orderFuture, (user, order) -> buildView(user, order));
等全部完成:allOf
多个任务一起收口,适合聚合页、批量请求等。
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
注意:allOf 本身不返回各任务结果,结果仍要从各自 future 里取。
2. 为什么线程池是关键
很多人图省事,直接用:
CompletableFuture.supplyAsync(() -> doSomething());
这会默认使用 ForkJoinPool.commonPool()。它不是不能用,但在服务端开发里往往不够稳妥,原因有三点:
- 线程池隔离性差:公共线程池会和别的异步任务互相影响
- 不适合大量阻塞 IO:例如 RPC、数据库、HTTP 调用
- 问题排查不直观:线程名、队列行为、监控粒度都不够友好
更推荐的方式是:为业务场景自定义线程池。
3. 一张图看懂任务关系
flowchart LR
A[接收商品详情请求] --> B[并发查询商品信息]
A --> C[并发查询价格]
A --> D[并发查询库存]
A --> E[并发查询营销信息]
B --> F[根据商品类目查询推荐]
C --> G[汇总结果]
D --> G
E --> G
F --> G
G --> H[返回详情页DTO]
这类图在设计接口聚合服务时非常有用。你会很快看出哪些链路可以拆并行,哪些不能。
核心 API 怎么选
这一段我用“怎么想”来讲,比死记方法名更实用。
runAsync vs supplyAsync
runAsync:没有返回值supplyAsync:有返回值
如果后面还要做组合,大多数情况下你会用 supplyAsync。
get() vs join()
两者都能拿结果,但区别是:
get()抛受检异常,必须显式处理join()抛运行时异常CompletionException
在业务代码中,join() 写起来更顺;但如果你在边界层做统一异常转换,get() 也未尝不可。
thenApply vs thenCompose
记一个简单规则:
- 同步转换用
thenApply - 异步衔接用
thenCompose
exceptionally vs handle vs whenComplete
exceptionally:出错时给默认值handle:不管成功失败,都能转换结果whenComplete:更像做日志、埋点、清理动作,不改变结果
实战代码(可运行)
下面我们写一个完整示例:模拟商品详情聚合服务。
功能目标:
- 并发查商品、价格、库存、营销
- 查到商品信息后,再按类目查推荐
- 对营销接口设置降级
- 使用自定义线程池
- 输出总耗时
你可以直接复制到一个 main 方法里运行。
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class CompletableFutureOrchestrationDemo {
private static final Random RANDOM = new Random();
private static final ThreadPoolExecutor BIZ_EXECUTOR = new ThreadPoolExecutor(
8,
16,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
new NamedThreadFactory("detail-pool"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) {
try {
ProductDetail detail = buildProductDetail(1001L);
System.out.println("\n=== 最终结果 ===");
System.out.println(detail);
} finally {
BIZ_EXECUTOR.shutdown();
}
}
public static ProductDetail buildProductDetail(Long productId) {
long start = System.currentTimeMillis();
CompletableFuture<ProductInfo> infoFuture =
CompletableFuture.supplyAsync(() -> queryProductInfo(productId), BIZ_EXECUTOR);
CompletableFuture<Price> priceFuture =
CompletableFuture.supplyAsync(() -> queryPrice(productId), BIZ_EXECUTOR);
CompletableFuture<Stock> stockFuture =
CompletableFuture.supplyAsync(() -> queryStock(productId), BIZ_EXECUTOR);
CompletableFuture<Marketing> marketingFuture =
CompletableFuture.supplyAsync(() -> queryMarketing(productId), BIZ_EXECUTOR)
.exceptionally(ex -> {
System.out.println("营销信息查询失败,降级返回默认值: " + ex.getMessage());
return new Marketing("默认标签");
});
CompletableFuture<List<RecommendItem>> recommendFuture =
infoFuture.thenCompose(info ->
CompletableFuture.supplyAsync(() -> queryRecommend(info.getCategoryId()), BIZ_EXECUTOR)
);
CompletableFuture<Void> allDone = CompletableFuture.allOf(
infoFuture, priceFuture, stockFuture, marketingFuture, recommendFuture
);
ProductDetail detail = allDone.thenApply(v -> new ProductDetail(
infoFuture.join(),
priceFuture.join(),
stockFuture.join(),
marketingFuture.join(),
recommendFuture.join()
)).join();
long cost = System.currentTimeMillis() - start;
System.out.println("\n总耗时: " + cost + " ms");
return detail;
}
private static ProductInfo queryProductInfo(Long productId) {
sleep(120);
log("查询商品基础信息");
return new ProductInfo(productId, "机械键盘", 10L);
}
private static Price queryPrice(Long productId) {
sleep(100);
log("查询价格");
return new Price(new BigDecimal("399.00"));
}
private static Stock queryStock(Long productId) {
sleep(80);
log("查询库存");
return new Stock(58);
}
private static Marketing queryMarketing(Long productId) {
sleep(150);
log("查询营销信息");
if (RANDOM.nextInt(10) < 3) {
throw new RuntimeException("营销服务超时");
}
return new Marketing("满300减40");
}
private static List<RecommendItem> queryRecommend(Long categoryId) {
sleep(90);
log("查询推荐商品");
return Arrays.asList(
new RecommendItem(2001L, "游戏鼠标"),
new RecommendItem(2002L, "桌面音箱"),
new RecommendItem(2003L, "显示器挂灯")
);
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程被中断", e);
}
}
private static void log(String msg) {
System.out.println(Thread.currentThread().getName() + " | " + msg);
}
static class NamedThreadFactory implements ThreadFactory {
private final String prefix;
private int index = 1;
NamedThreadFactory(String prefix) {
this.prefix = prefix;
}
@Override
public synchronized Thread newThread(Runnable r) {
Thread t = new Thread(r, prefix + "-" + index++);
t.setDaemon(false);
return t;
}
}
static class ProductInfo {
private final Long productId;
private final String name;
private final Long categoryId;
public ProductInfo(Long productId, String name, Long categoryId) {
this.productId = productId;
this.name = name;
this.categoryId = categoryId;
}
public Long getProductId() {
return productId;
}
public String getName() {
return name;
}
public Long getCategoryId() {
return categoryId;
}
@Override
public String toString() {
return "ProductInfo{productId=" + productId + ", name='" + name + "', categoryId=" + categoryId + "}";
}
}
static class Price {
private final BigDecimal amount;
public Price(BigDecimal amount) {
this.amount = amount;
}
@Override
public String toString() {
return "Price{amount=" + amount + "}";
}
}
static class Stock {
private final int available;
public Stock(int available) {
this.available = available;
}
@Override
public String toString() {
return "Stock{available=" + available + "}";
}
}
static class Marketing {
private final String label;
public Marketing(String label) {
this.label = label;
}
@Override
public String toString() {
return "Marketing{label='" + label + "'}";
}
}
static class RecommendItem {
private final Long productId;
private final String name;
public RecommendItem(Long productId, String name) {
this.productId = productId;
this.name = name;
}
@Override
public String toString() {
return "RecommendItem{productId=" + productId + ", name='" + name + "'}";
}
}
static class ProductDetail {
private final ProductInfo productInfo;
private final Price price;
private final Stock stock;
private final Marketing marketing;
private final List<RecommendItem> recommends;
public ProductDetail(ProductInfo productInfo, Price price, Stock stock,
Marketing marketing, List<RecommendItem> recommends) {
this.productInfo = productInfo;
this.price = price;
this.stock = stock;
this.marketing = marketing;
this.recommends = recommends;
}
@Override
public String toString() {
return "ProductDetail{" +
"productInfo=" + productInfo +
", price=" + price +
", stock=" + stock +
", marketing=" + marketing +
", recommends=" + recommends.stream().map(Object::toString).collect(Collectors.toList()) +
'}';
}
}
}
逐步拆解这段实战代码
第一步:先把“天然并行”的任务拆出来
商品、价格、库存、营销信息互不依赖,所以可以直接并行启动:
CompletableFuture<ProductInfo> infoFuture =
CompletableFuture.supplyAsync(() -> queryProductInfo(productId), BIZ_EXECUTOR);
类似地,再开 price、stock、marketing。
这一步的收益最直观:把总耗时从“相加”变成“取最大值附近”。
第二步:有依赖的异步链,使用 thenCompose
推荐列表依赖商品类目,必须等 infoFuture 完成:
CompletableFuture<List<RecommendItem>> recommendFuture =
infoFuture.thenCompose(info ->
CompletableFuture.supplyAsync(() -> queryRecommend(info.getCategoryId()), BIZ_EXECUTOR)
);
这里如果你写成 thenApply,逻辑也不是完全错,但 future 嵌套会让后续使用变麻烦。
第三步:非核心链路做降级
营销信息经常是“锦上添花”,不是主链路核心数据。这个时候不要让一个营销服务超时把整个详情页拖死。
CompletableFuture<Marketing> marketingFuture =
CompletableFuture.supplyAsync(() -> queryMarketing(productId), BIZ_EXECUTOR)
.exceptionally(ex -> new Marketing("默认标签"));
这是我非常建议你在业务里明确区分的一点:
- 强依赖:失败就整体失败
- 弱依赖:失败可降级、可兜底
把依赖分层,异步编排才真正有业务价值。
第四步:统一收口
CompletableFuture<Void> allDone = CompletableFuture.allOf(
infoFuture, priceFuture, stockFuture, marketingFuture, recommendFuture
);
当所有任务结束后,再统一拼结果:
ProductDetail detail = allDone.thenApply(v -> new ProductDetail(
infoFuture.join(),
priceFuture.join(),
stockFuture.join(),
marketingFuture.join(),
recommendFuture.join()
)).join();
这种写法可读性不错,也便于后面扩展字段。
再看一张时序图
sequenceDiagram
participant Client as 客户端
participant Service as 聚合服务
participant Pool as 业务线程池
participant P as 商品服务
participant R as 价格服务
participant S as 库存服务
participant M as 营销服务
participant RC as 推荐服务
Client->>Service: 请求商品详情
Service->>Pool: 提交商品/价格/库存/营销任务
Pool->>P: 查询商品信息
Pool->>R: 查询价格
Pool->>S: 查询库存
Pool->>M: 查询营销
P-->>Service: 返回商品信息
Service->>Pool: 基于类目提交推荐任务
Pool->>RC: 查询推荐商品
R-->>Service: 返回价格
S-->>Service: 返回库存
M-->>Service: 返回营销/异常降级
RC-->>Service: 返回推荐列表
Service-->>Client: 聚合后的详情结果
线程池怎么配才更靠谱
这里是很多线上问题的根源。
一个常用原则:线程池要按任务类型分开
不要把所有异步任务都扔一个池子里。至少要按以下思路隔离:
- CPU 密集型:比如复杂计算、规则引擎
- IO 密集型:比如 RPC、HTTP、数据库查询
- 关键链路 与 非关键链路
因为这几类任务对线程数和队列长度的需求完全不同。
参数怎么估
对于 IO 密集型任务,一个经验值是线程数可以适当大于 CPU 核数。但别上来就配几百上千,先看:
- 平均响应时间
- 峰值并发
- 下游最大承载
- 是否存在大量阻塞等待
一个简化思路:
线程数 ≈ CPU核心数 * (1 + 平均等待时间 / 平均计算时间)
这不是绝对公式,但比拍脑袋强很多。
队列不要无脑用无界队列
如果你这么配:
new LinkedBlockingQueue<>()
默认是超大容量,问题在于:
- 请求高峰时任务一直堆积
- 线程池不扩容
- 延迟越来越大
- 最后可能把内存拖爆
更务实的方式是:
- 用有界队列
- 配合理性的拒绝策略
- 在上游做限流、降级、熔断
拒绝策略别只会 AbortPolicy
本文示例用了:
new ThreadPoolExecutor.CallerRunsPolicy()
它的特点是:线程池满了以后,让提交任务的线程自己执行,起到一定的“反压”效果。
适用场景:
- 可以接受请求处理线程变慢
- 希望在过载时自然削峰
但也有边界:
- 如果提交线程本身就是 Tomcat/Netty 业务线程,可能拖慢入口吞吐
- 对强实时接口不一定合适
所以拒绝策略没有万能答案,要结合入口线程模型一起看。
常见坑与排查
这一节我尽量讲得“像线上故障”。
坑 1:用了 CompletableFuture,结果还是串行
错误写法很常见:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> task1(), executor);
String r1 = f1.join();
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> task2(), executor);
String r2 = f2.join();
看起来异步,实际上你每启动一个就立刻阻塞等待,最后还是串行。
正确思路是:先都发出去,再统一收口。
坑 2:默认线程池把服务拖慢
如果你没传自定义线程池,默认会走公共池。在线上服务中,一旦任务多、阻塞多,很容易出现:
- 请求耗时抖动
- 部分异步任务排队
- 难以定位哪个业务把池子打满
排查建议:
- 打印线程名,确认是否跑在
ForkJoinPool.commonPool - 观察线程池活跃线程数、队列长度、拒绝次数
- 区分是 CPU 打满,还是大量线程阻塞
坑 3:异常被吞,日志里像没发生过
例如:
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("出错");
}, executor);
如果你没有 join()、没有异常处理、没有日志钩子,异常可能就这么悄悄结束了。
建议至少加一种:
future.whenComplete((result, ex) -> {
if (ex != null) {
logError(ex);
}
});
或者:
future.exceptionally(ex -> {
logError(ex);
return defaultValue;
});
坑 4:allOf() 完成了,但结果不好取
很多人以为:
CompletableFuture.allOf(f1, f2, f3)
能直接拿到三个结果。实际上不行,它只表示“都完成了”。
你仍需要:
f1.join();
f2.join();
f3.join();
所以更好的做法是:在 allOf() 之后集中组装 DTO。
坑 5:线程池里的任务相互等待,造成“假死”
这是我踩过一次印象很深的坑:线程池大小只有 8,但每个异步任务里又去提交子任务,并马上 join() 等待,结果池里的线程彼此等来等去,吞吐突然大降。
典型风险场景:
- 小线程池
- 任务内部再次提交同池任务
- 同步等待子任务完成
解决方向:
- 避免在池内任务里阻塞等待同池子任务
- 子任务拆到独立线程池
- 尽量改成链式编排,而不是中途
join()
一张“异常与降级”流程图
flowchart TD
A[异步任务启动] --> B{执行成功?}
B -- 是 --> C[返回正常结果]
B -- 否 --> D{是否核心依赖?}
D -- 是 --> E[记录异常并快速失败]
D -- 否 --> F[降级默认值]
E --> G[统一组装响应]
F --> G
C --> G
这张图的重点是:不是所有异常都应该同样处理。核心链路和边缘链路,策略必须不同。
安全/性能最佳实践
这里我把一些更贴近生产的建议集中列出来。
1. 一定要设置超时
如果你使用的是 JDK 9+,可以考虑:
future.orTimeout(300, TimeUnit.MILLISECONDS)
.exceptionally(ex -> defaultValue);
或者:
future.completeOnTimeout(defaultValue, 300, TimeUnit.MILLISECONDS);
如果是 JDK 8,也建议在业务层实现超时控制,或者依赖底层 RPC/HTTP 客户端的超时设置。
重点:异步不等于不会超时,反而更容易因为“多个下游一起慢”而放大问题。
2. 不要把阻塞 IO 和计算密集任务混在一个池里
例如:
- 查数据库、调 HTTP:IO 密集型
- 大对象序列化、复杂规则计算:CPU 密集型
混在一个池子里,往往表现为:
- CPU 任务抢不到线程
- IO 阻塞把池子占满
- 延迟长尾明显
所以分池隔离是最朴素也最有效的优化。
3. 线程名一定要有业务语义
比如:
new NamedThreadFactory("detail-pool")
看似小事,排查时非常重要。日志里一眼看到 detail-pool-3,你马上知道这是详情聚合线程,不至于在一堆 pool-17-thread-4 里迷路。
4. 谨慎使用大队列
大队列会制造一种“系统还能扛”的假象,实际上只是把压力延后。最终表现为:
- 平均耗时看似还行
- P99/P999 飙升
- 业务超时大量出现
在高并发服务里,我更倾向于:
- 小一些的有界队列
- 清晰的拒绝策略
- 失败可观测
- 降级比堆积更重要
5. 异步链路要可观测
至少记录这些指标:
- 线程池活跃线程数
- 队列长度
- 任务提交数 / 完成数 / 拒绝数
- 各下游调用耗时
- 降级次数
- 超时次数
如果有链路追踪系统,最好把异步任务也串上 TraceId。否则你会发现:主请求日志很清楚,但异步子任务像“断线”了一样。
6. 上下文传递要显式处理
比如这些内容:
- TraceId / 请求 ID
- 用户上下文
- MDC 日志上下文
- 安全上下文
线程切换后,这些上下文通常不会自动传递。如果你依赖 MDC 打日志,不做处理就容易出现异步日志没有 traceId 的情况。
解决方式通常有:
- 包装
Runnable/Callable - 自定义线程池装饰器
- 使用支持上下文透传的框架或工具库
7. 不要为了“全异步”而异步
这是个很现实的建议。
如果一个接口只有两个本地轻量计算,串行执行更简单、可维护性更高。异步编排更适合:
- 多个慢 IO 调用
- 聚合查询
- 有明显依赖关系的任务流水线
- 需要降级和超时控制的链路
如果收益不明显,别把代码复杂度平白拉高。
一个简短的性能对比思路
假设:
- 商品信息:120ms
- 价格:100ms
- 库存:80ms
- 营销:150ms
- 推荐:90ms(依赖商品)
串行耗时
120 + 100 + 80 + 150 + 90 = 540ms
合理异步编排后的耗时
第一阶段并行:
- 商品 120ms
- 价格 100ms
- 库存 80ms
- 营销 150ms
第二阶段依赖商品查推荐:
- 推荐 90ms
总耗时约:
max(120,100,80,150) 与 商品->推荐链取更长
≈ max(150, 120 + 90)
≈ 210ms
从 540ms 到 210ms,收益很可观。这也是为什么聚合接口特别适合 CompletableFuture。
当然,真实线上还要考虑:
- 线程切换成本
- 下游限流
- 队列排队
- 超时与重试
- JVM 压力
所以压测必不可少。
逐步验证清单
如果你准备把这套方式落到项目里,可以按这个清单做:
开发阶段
- 区分清楚任务依赖关系
- 所有异步任务都明确指定线程池
- 核心依赖与非核心依赖分开处理
- 对关键任务设置超时与降级
- 日志包含线程名、请求标识、耗时
联调阶段
- 验证并发逻辑是否真的并行
- 模拟某个下游超时,看是否按预期降级
- 模拟线程池满,确认拒绝策略行为
- 检查异常是否都能被观测到
上线前
- 配置线程池监控
- 设置告警阈值:队列积压、拒绝数、超时数
- 做一次压测,关注 P95/P99
- 评估下游服务是否承受新增并发
总结
CompletableFuture 真正的价值,不是“把同步代码改成异步语法”,而是把任务关系、失败策略、资源隔离表达清楚。
你可以记住这几个最重要的落地建议:
- 先分析依赖关系,再写异步代码
- 永远优先使用自定义线程池,不要默认公共池
- 先全部发起,再统一收口,避免伪并行
- 核心链路失败就失败,非核心链路要敢于降级
- 线程池参数、超时、监控要一起设计,不要只写业务代码
- 异步不是银弹,适合慢 IO、多依赖聚合场景
如果你现在正维护一个“接口里连调多个下游”的服务,完全可以先挑一个最典型的详情聚合接口试试:把天然并行的步骤拆出来,用独立线程池执行,再补齐超时、降级、监控。通常第一版就能看到明显收益。
真正难的从来不是 API,而是:你是否知道哪些地方该并发,哪些地方必须克制。