Java 中基于 CompletableFuture 的异步编排实战:从并行聚合到超时控制与异常恢复
在 Java 服务开发里,“查多个下游,再拼成一个结果返回” 是非常常见的需求。比如一个商品详情页,可能要同时查:
- 商品基础信息
- 价格
- 库存
- 营销活动
- 用户个性化推荐
如果全都串行调用,延迟会被一层层叠加;但如果只是“把线程开起来”,又很容易落入线程池乱用、超时失控、异常吞没的坑。
这篇文章我想换一个更偏实战的角度,带你把 CompletableFuture 真正用起来:先做并行聚合,再做超时控制,最后补上异常恢复与排查手段。你看完后,应该能直接把这套模式带到业务代码里。
背景与问题
先看一个典型场景:聚合商品详情接口。
假设我们有一个 /product/{id} 接口,需要同时调用 4 个远程服务:
- 商品服务:基础信息
- 价格服务:价格
- 库存服务:库存
- 营销服务:优惠信息
如果串行写,大概是这样:
ProductInfo info = productService.getInfo(productId);
Price price = priceService.getPrice(productId);
Stock stock = stockService.getStock(productId);
Promotion promotion = promotionService.getPromotion(productId);
return ProductView.of(info, price, stock, promotion);
问题很直接:
- 总耗时接近四次调用之和
- 某个下游慢,整体就慢
- 某个下游挂,整个请求可能直接失败
- 很难精细控制“哪些字段允许降级,哪些必须成功”
而实际线上系统通常要求:
- 尽量并行
- 必须设置超时
- 允许部分失败时降级返回
- 关键依赖失败时快速失败
- 线程池可控,不拖垮系统
这正是 CompletableFuture 的典型用武之地。
前置知识与环境准备
本文示例基于:
- JDK 9+(会用到
orTimeout/completeOnTimeout) - 更推荐 JDK 17+
- 示例代码可直接运行在普通 Java 项目中
如果你对 Future 很熟,但对 CompletableFuture 还没形成体系,先记住一句:
Future更像“拿结果”,CompletableFuture更像“描述一条异步处理流水线”。
核心原理
1. CompletableFuture 解决的不是“异步”本身,而是“异步之间怎么编排”
它提供了几类关键能力:
- 创建异步任务:
supplyAsync、runAsync - 结果转换:
thenApply、thenCompose - 任务组合:
thenCombine、allOf、anyOf - 异常处理:
exceptionally、handle、whenComplete - 超时控制:
orTimeout、completeOnTimeout
你可以把它理解成:每个异步任务是一个节点,而 CompletableFuture 是把这些节点串起来的“胶水”。
2. 两类最常见编排:并行聚合 vs 依赖串联
并行聚合
多个任务互不依赖,可以同时执行,最后汇总结果。
比如:
- 查价格
- 查库存
- 查营销
这类适合:
allOfthenCombine
依赖串联
后一个任务依赖前一个结果。
比如:
- 先查用户
- 再根据用户等级查优惠
这类适合:
thenCompose
很多人一上来就把所有东西塞进 allOf,这就是第一个误区:没有依赖关系才适合并行,有依赖就该串联。
3. thenApply 和 thenCompose 的区别非常关键
这是我见过最容易混淆的点之一。
thenApply
把结果映射成另一个普通值。
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "42")
.thenApply(Integer::parseInt)
.thenApply(i -> "结果:" + i);
thenCompose
把结果映射成另一个异步任务,并“拍平”。
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> 42L)
.thenCompose(userId -> CompletableFuture.supplyAsync(() -> "user-" + userId));
如果你用错成 thenApply,结果会变成:
CompletableFuture<CompletableFuture<String>>
这通常不是你想要的。
4. 超时不是“任务取消”的同义词
很多同学以为:
orTimeout(300, MILLISECONDS)超时了- 那底层任务就自动停止了
其实不一定。
CompletableFuture 的超时,更多是让未来结果在指定时间后以超时状态结束;但底层任务如果已经在线程里跑,未必会被真正中断。
所以超时控制要分两个层面看:
- 调用链上的结果超时
- 底层任务是否可取消、是否响应中断
这一点在远程调用、数据库调用里尤其重要。
一张图看懂异步聚合流程
flowchart LR
A[收到商品详情请求] --> B[并行调用商品服务]
A --> C[并行调用价格服务]
A --> D[并行调用库存服务]
A --> E[并行调用营销服务]
B --> F[等待所有结果]
C --> F
D --> F
E --> F
F --> G[部分失败则按策略降级]
G --> H[组装 ProductView 返回]
CompletableFuture 常用编排模型
模型一:并行执行后统一汇总
适合多个独立查询。
sequenceDiagram
participant Client
participant API
participant InfoSvc
participant PriceSvc
participant StockSvc
participant PromoSvc
Client->>API: 请求商品详情
API->>InfoSvc: 异步查询基础信息
API->>PriceSvc: 异步查询价格
API->>StockSvc: 异步查询库存
API->>PromoSvc: 异步查询营销
InfoSvc-->>API: 返回
PriceSvc-->>API: 返回
StockSvc-->>API: 返回
PromoSvc-->>API: 返回
API-->>Client: 聚合结果
模型二:先查 A,再根据 A 查 B
CompletableFuture<UserDiscount> future = CompletableFuture
.supplyAsync(() -> userService.getUser(userId), executor)
.thenCompose(user -> CompletableFuture.supplyAsync(
() -> discountService.getDiscount(user.getLevel()),
executor
));
模型三:一个任务失败,但整体可降级
CompletableFuture<Promotion> promotionFuture = CompletableFuture
.supplyAsync(() -> promotionService.getPromotion(productId), executor)
.completeOnTimeout(Promotion.defaultValue(), 300, TimeUnit.MILLISECONDS)
.exceptionally(ex -> Promotion.defaultValue());
这类写法非常适合“锦上添花”的字段,例如推荐、营销角标、埋点增强信息。
实战代码(可运行)
下面我们写一个完整的小型示例,模拟商品详情聚合。
为了方便本地运行,我用 sleep 模拟远程调用延迟。
示例目标
- 4 个下游并行调用
- 价格服务超时则降级
- 营销服务异常则降级
- 商品基础信息失败则整体失败
- 最终拼装为
ProductView
完整示例代码
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.concurrent.*;
import java.util.function.Supplier;
public class CompletableFutureOrchestrationDemo {
private static final ExecutorService BIZ_EXECUTOR = new ThreadPoolExecutor(
8,
16,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
new ThreadFactory() {
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
private int index = 1;
@Override
public Thread newThread(Runnable r) {
Thread t = defaultFactory.newThread(r);
t.setName("biz-exec-" + index++);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) {
try {
ProductView view = getProductView(1001L);
System.out.println("聚合结果 => " + view);
} finally {
BIZ_EXECUTOR.shutdown();
}
}
public static ProductView getProductView(Long productId) {
long start = System.currentTimeMillis();
CompletableFuture<ProductInfo> infoFuture = CompletableFuture
.supplyAsync(logged("queryProductInfo", () -> queryProductInfo(productId)), BIZ_EXECUTOR)
.orTimeout(800, TimeUnit.MILLISECONDS);
CompletableFuture<Price> priceFuture = CompletableFuture
.supplyAsync(logged("queryPrice", () -> queryPrice(productId)), BIZ_EXECUTOR)
.completeOnTimeout(Price.defaultValue(), 300, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
System.out.println("[WARN] 价格查询失败,使用默认价格: " + ex.getMessage());
return Price.defaultValue();
});
CompletableFuture<Stock> stockFuture = CompletableFuture
.supplyAsync(logged("queryStock", () -> queryStock(productId)), BIZ_EXECUTOR)
.orTimeout(500, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
System.out.println("[WARN] 库存查询失败,使用默认库存: " + ex.getMessage());
return Stock.unknown();
});
CompletableFuture<Promotion> promotionFuture = CompletableFuture
.supplyAsync(logged("queryPromotion", () -> queryPromotion(productId)), BIZ_EXECUTOR)
.completeOnTimeout(Promotion.defaultValue(), 200, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
System.out.println("[WARN] 营销查询失败,使用默认营销: " + ex.getMessage());
return Promotion.defaultValue();
});
CompletableFuture<ProductView> resultFuture = CompletableFuture
.allOf(infoFuture, priceFuture, stockFuture, promotionFuture)
.thenApply(v -> ProductView.of(
infoFuture.join(),
priceFuture.join(),
stockFuture.join(),
promotionFuture.join()
))
.whenComplete((result, ex) -> {
long cost = System.currentTimeMillis() - start;
if (ex != null) {
System.out.println("[ERROR] 商品详情聚合失败, cost=" + cost + "ms, ex=" + ex.getMessage());
} else {
System.out.println("[INFO] 商品详情聚合成功, cost=" + cost + "ms");
}
});
return resultFuture.join();
}
private static <T> Supplier<T> logged(String action, Supplier<T> supplier) {
return () -> {
long start = System.currentTimeMillis();
try {
T result = supplier.get();
System.out.println("[INFO] " + action + " success, thread=" + Thread.currentThread().getName()
+ ", cost=" + (System.currentTimeMillis() - start) + "ms");
return result;
} catch (Exception e) {
System.out.println("[ERROR] " + action + " failed, thread=" + Thread.currentThread().getName()
+ ", cost=" + (System.currentTimeMillis() - start) + "ms, ex=" + e.getMessage());
throw e;
}
};
}
private static ProductInfo queryProductInfo(Long productId) {
sleep(120);
return new ProductInfo(productId, "机械键盘", "三模热插拔");
}
private static Price queryPrice(Long productId) {
sleep(350); // 故意超过 completeOnTimeout 300ms
return new Price(new BigDecimal("399.00"), "CNY");
}
private static Stock queryStock(Long productId) {
sleep(180);
return new Stock(56, true);
}
private static Promotion queryPromotion(Long productId) {
sleep(100);
throw new RuntimeException("promotion service unavailable");
}
private static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("thread interrupted", e);
}
}
static class ProductInfo {
private final Long productId;
private final String name;
private final String description;
public ProductInfo(Long productId, String name, String description) {
this.productId = productId;
this.name = name;
this.description = description;
}
@Override
public String toString() {
return "ProductInfo{productId=" + productId + ", name='" + name + "', description='" + description + "'}";
}
}
static class Price {
private final BigDecimal amount;
private final String currency;
public Price(BigDecimal amount, String currency) {
this.amount = amount;
this.currency = currency;
}
public static Price defaultValue() {
return new Price(BigDecimal.ZERO, "CNY");
}
@Override
public String toString() {
return "Price{amount=" + amount + ", currency='" + currency + "'}";
}
}
static class Stock {
private final int available;
private final boolean inStock;
public Stock(int available, boolean inStock) {
this.available = available;
this.inStock = inStock;
}
public static Stock unknown() {
return new Stock(0, false);
}
@Override
public String toString() {
return "Stock{available=" + available + ", inStock=" + inStock + "}";
}
}
static class Promotion {
private final String label;
public Promotion(String label) {
this.label = label;
}
public static Promotion defaultValue() {
return new Promotion("暂无优惠");
}
@Override
public String toString() {
return "Promotion{label='" + label + "'}";
}
}
static class ProductView {
private final ProductInfo info;
private final Price price;
private final Stock stock;
private final Promotion promotion;
private final LocalDateTime generatedAt;
public ProductView(ProductInfo info, Price price, Stock stock, Promotion promotion, LocalDateTime generatedAt) {
this.info = info;
this.price = price;
this.stock = stock;
this.promotion = promotion;
this.generatedAt = generatedAt;
}
public static ProductView of(ProductInfo info, Price price, Stock stock, Promotion promotion) {
return new ProductView(info, price, stock, promotion, LocalDateTime.now());
}
@Override
public String toString() {
return "ProductView{" +
"info=" + info +
", price=" + price +
", stock=" + stock +
", promotion=" + promotion +
", generatedAt=" + generatedAt +
'}';
}
}
}
逐步拆解这段代码
1. 为什么不用默认线程池?
很多示例喜欢这么写:
CompletableFuture.supplyAsync(() -> queryPrice(productId));
如果不传 executor,默认会走 ForkJoinPool.commonPool()。
它不是不能用,但在业务服务里,我通常不建议直接依赖默认线程池,原因有三个:
- 线程资源不可控
- 容易和其他异步任务互相影响
- 排查问题时不容易看出任务来源
所以示例里我显式创建了 BIZ_EXECUTOR。
2. 关键依赖用 orTimeout
商品基础信息是核心字段,没有它,详情页就没有意义:
CompletableFuture<ProductInfo> infoFuture = CompletableFuture
.supplyAsync(() -> queryProductInfo(productId), BIZ_EXECUTOR)
.orTimeout(800, TimeUnit.MILLISECONDS);
它的含义是:
- 800ms 内拿到结果,继续
- 超过 800ms,future 以超时异常结束
这适合必须成功的依赖。
3. 可降级依赖用 completeOnTimeout
价格服务我们做降级处理:
.completeOnTimeout(Price.defaultValue(), 300, TimeUnit.MILLISECONDS)
它和 orTimeout 的区别很重要:
orTimeout:超时后抛异常completeOnTimeout:超时后直接给默认值
所以:
- 核心路径:优先
orTimeout - 可降级字段:优先
completeOnTimeout
4. 异常恢复用 exceptionally
营销服务如果挂了,返回默认营销信息:
.exceptionally(ex -> Promotion.defaultValue())
这非常实用,但别滥用。
如果你对所有异常都“一把梭降级”,最后系统表面上“很稳定”,实际问题全被吃掉了,监控却看不出来。
我自己一般会这么区分:
- 对用户体验影响较小的字段:允许
exceptionally - 对交易、库存、支付等关键链路:谨慎降级,优先失败可见
5. 汇总时为什么用 join() 而不是 get()?
在聚合阶段:
.thenApply(v -> ProductView.of(
infoFuture.join(),
priceFuture.join(),
stockFuture.join(),
promotionFuture.join()
))
这里我更喜欢 join(),因为它不用显式处理受检异常。
区别简记:
get()抛InterruptedException、ExecutionExceptionjoin()抛CompletionException
在流水线式代码里,join() 通常更顺手。
进一步实战:依赖型编排怎么写
并行聚合解决了“横向查多个服务”的问题。
再看一个“先查用户,再查用户折扣”的例子。
import java.math.BigDecimal;
import java.util.concurrent.*;
public class ThenComposeDemo {
private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(4);
public static void main(String[] args) {
try {
CompletableFuture<BigDecimal> future = CompletableFuture
.supplyAsync(() -> getUser(1L), EXECUTOR)
.thenCompose(user -> CompletableFuture.supplyAsync(
() -> getDiscountByLevel(user.level()),
EXECUTOR
))
.orTimeout(500, TimeUnit.MILLISECONDS)
.exceptionally(ex -> BigDecimal.ZERO);
System.out.println("折扣 = " + future.join());
} finally {
EXECUTOR.shutdown();
}
}
static User getUser(Long userId) {
sleep(100);
return new User(userId, "VIP");
}
static BigDecimal getDiscountByLevel(String level) {
sleep(150);
return "VIP".equals(level) ? new BigDecimal("0.85") : BigDecimal.ONE;
}
static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
record User(Long id, String level) {}
}
这里如果你写成 thenApply,逻辑上就不对了,因为第二步本身已经是异步任务。
异常传播与恢复链路
理解异常怎么流动,排查时能省很多时间。
flowchart TD
A[supplyAsync 执行任务] --> B{是否抛异常?}
B -- 否 --> C[正常结果继续 thenApply/thenCompose]
B -- 是 --> D[future 进入异常完成状态]
D --> E{是否有 exceptionally/handle?}
E -- 有 --> F[恢复为默认值或转换结果]
E -- 无 --> G[继续向后传播]
F --> H[后续阶段继续执行]
G --> I[join/get 时抛出异常]
三个常用异常处理方法怎么选?
exceptionally
只在异常时执行,返回兜底值。
future.exceptionally(ex -> defaultValue);
适合简单降级。
handle
不管成功还是失败都会执行,可以统一转换结果。
future.handle((result, ex) -> {
if (ex != null) {
return defaultValue;
}
return result;
});
适合做“统一包装”。
whenComplete
更像回调观察者,通常用于日志、指标,不改结果。
future.whenComplete((result, ex) -> {
if (ex != null) {
log.error("failed", ex);
}
});
这一点很容易误会:
whenComplete 不是拿来做恢复的,它更适合做记录。
常见坑与排查
下面这些坑,我基本都见过,自己也踩过。
坑 1:把阻塞 IO 扔进默认 commonPool
如果你在 supplyAsync 里执行的是:
- HTTP 调用
- JDBC 查询
- Redis 阻塞操作
- 大量磁盘 IO
那它并不适合 ForkJoinPool.commonPool() 的设计初衷。
现象:
- 吞吐下降
- 异步任务排队严重
- CPU 不高但响应很慢
建议:
- 使用独立业务线程池
- IO 密集型线程池大小要结合压测调优
- 给线程命名,方便排查
坑 2:只设 Future 超时,不设下游客户端超时
这是线上很典型的问题。
你写了:
future.orTimeout(300, TimeUnit.MILLISECONDS)
但如果底层 HTTP 客户端没设置:
- 连接超时
- 读超时
那请求线程可能早就“超时返回”了,下游调用线程却还在耗资源。
建议:
超时至少要有两层:
- 客户端超时:如 HTTP/RPC/数据库驱动超时
- 编排超时:如
orTimeout/completeOnTimeout
坑 3:allOf 不会帮你自动收集结果
allOf 返回的是:
CompletableFuture<Void>
所以你还得手动从各个 future 里取结果:
CompletableFuture.allOf(f1, f2, f3)
.thenApply(v -> List.of(f1.join(), f2.join(), f3.join()));
很多人第一次用会疑惑:
“我都 allOf 了,结果去哪了?”
答案是:还在原来的 future 里。
坑 4:异常被包装,看不清根因
join() 常抛的是 CompletionException,根因在 getCause() 里。
排查时别只打印:
ex.getMessage()
最好把完整堆栈和根因打出来。
try {
future.join();
} catch (CompletionException ex) {
Throwable cause = ex.getCause();
cause.printStackTrace();
}
坑 5:降级值不合理,导致脏数据扩散
比如价格超时后返回 0,前端直接展示成“0 元秒杀”,那就出事故了。
降级不是“随便给个默认值”:
- 价格:可能要返回“价格暂不可用”
- 库存:可能要返回“库存状态未知”
- 营销:可以返回“暂无优惠”
降级值必须符合业务语义。
坑 6:线程池队列太大,导致延迟雪崩
很多系统线程池这样配:
new LinkedBlockingQueue<>(100000)
看起来很稳,实际上问题更大:
- 请求高峰时任务拼命进队列
- 不会及时触发拒绝策略
- 最终用户拿到的是“非常慢的成功”甚至超时
我更建议:
- 有界队列
- 合理拒绝策略
- 配合限流、熔断
排查思路:异步聚合接口慢,到底慢在哪?
我一般按这条线排:
第一步:确认是线程池问题还是下游问题
看线程池指标:
- 活跃线程数
- 队列长度
- 拒绝次数
- 任务平均执行时长
如果线程池已经打满,就先别盯着下游服务了。
第二步:给每个子任务打独立耗时日志
不要只记总耗时,应该像示例里那样给每个调用埋点:
queryProductInfo cost=...queryPrice cost=...queryStock cost=...
否则你只知道“聚合慢”,不知道是谁慢。
第三步:区分超时、异常、降级命中率
建议监控至少拆成:
- 下游调用成功率
- 下游调用超时率
- fallback 命中率
- 聚合接口整体成功率
- 聚合接口 P95 / P99 延迟
如果 fallback 命中率突然升高,但接口成功率仍然很高,那说明系统已经在“带病运行”了。
安全/性能最佳实践
这一节我尽量说能落地的。
1. 业务线程池一定要隔离
不同类型任务不要共用一个线程池:
- 页面聚合
- 导出任务
- 消息消费
- 异步通知
这些混在一起,很容易互相拖垮。
2. 给每个依赖定义“重要性等级”
我常用一个简单分层:
- P0:必须成功
失败就直接失败,例如订单创建核心校验 - P1:可短时失败但要告警
如库存快照、非关键画像 - P2:纯增强信息,可静默降级
如推荐位、营销标签
这样你才知道该用:
orTimeoutcompleteOnTimeoutexceptionally
而不是全凭感觉写。
3. 超时值不要拍脑袋
超时配置建议来自:
- 下游服务 SLA
- 压测结果
- P95/P99 延迟
- 用户体验要求
例如接口总预算 500ms,你有 4 个并行任务,不代表每个都配 500ms。
因为还要留出:
- 网络抖动
- 结果组装
- 序列化
- 网关转发
4. 关键任务避免无脑 join()
虽然 join() 很方便,但也要注意:
- 在错误位置调用
join(),会把异步链重新变成阻塞 - 在请求线程里提前
join(),会损失并行收益
正确姿势是:
- 尽量在最后统一汇总
- 中途优先用
thenCompose/thenCombine继续编排
5. 做好上下文透传
异步任务切线程后,容易丢失:
- TraceId
- 用户上下文
- MDC 日志上下文
- 租户信息
这会导致日志串不起来。
如果你在线上排查过一次“明明出错了但 trace 对不上”,就会知道这事多痛苦。
常见方案:
- 自定义
Executor做上下文包装 - 使用日志框架 MDC 透传工具
- 在链路追踪框架里接管线程池
6. 避免在异步任务里操作不安全共享对象
比如多个 future 同时写同一个 ArrayList、HashMap,很容易出现并发问题。
错误示例:
List<String> result = new ArrayList<>();
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> result.add("A"), executor);
CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> result.add("B"), executor);
更稳妥的方式是:
- 每个任务独立返回结果
- 最后统一汇总
- 或使用线程安全容器
7. 只对“值得异步”的地方异步
不是所有逻辑都适合上 CompletableFuture。
如果只是:
- 本地内存计算
- 极轻量同步逻辑
- 明显串行依赖链
强行异步只会:
- 增加代码复杂度
- 增加线程切换
- 增加排查难度
我的经验是:
下游调用明显独立、耗时可观、整体延迟敏感时,再用异步编排最划算。
一份可执行的验证清单
你把 CompletableFuture 用到真实业务前,可以按这份清单自测。
功能正确性
- 并行任务都能正常返回
- 某个非关键任务异常时,整体仍可返回
- 某个关键任务超时时,整体会失败
- 降级字段值符合业务语义
超时与异常
- 编排层设置了超时
- 下游客户端也设置了连接/读超时
- 日志能看到根因异常
- fallback 命中会被监控到
线程池与性能
- 使用独立业务线程池
- 队列是有界的
- 线程名称可识别
- 压测下没有明显排队膨胀
可观测性
- 每个子任务有耗时日志或指标
- 聚合总耗时有指标
- 超时率、异常率、降级率可观测
- TraceId 能贯穿异步线程
什么时候不建议用 CompletableFuture?
虽然它很好用,但也不是银弹。以下情况我会谨慎:
1. 编排关系极其复杂
如果存在:
- 大量条件分支
- 多阶段状态流转
- 补偿逻辑
- 长时任务
这时候 CompletableFuture 链可能会变得非常绕。
更适合考虑:
- 工作流引擎
- 消息驱动编排
- Reactor / 响应式方案
2. 你只是想做简单并发执行
如果只是几个固定任务并发跑,且不需要复杂异常链路,ExecutorService + Future 有时更直白。
不要为了“高级感”把简单问题复杂化。
3. 团队对异步排查经验不足
异步代码最难的不是写出来,而是线上出问题时能不能迅速定位。
如果团队暂时缺少:
- 线程池治理
- 指标监控
- Trace 上下文透传
- 异常链路分析
那就先从小范围、低风险场景落地。
总结
CompletableFuture 最有价值的地方,不只是“开异步”,而是让你能把业务依赖之间的关系写清楚:
- 独立任务并行聚合:用
allOf、thenCombine - 依赖任务串联:用
thenCompose - 关键任务超时失败:用
orTimeout - 非关键任务超时降级:用
completeOnTimeout - 异常恢复:用
exceptionally或handle - 日志与监控:用
whenComplete
如果你要把它真正用在线上,我建议按这个顺序落地:
- 先把独立下游改成并行聚合
- 再给每个依赖补上超时策略
- 然后区分关键依赖和可降级依赖
- 最后补全线程池治理、日志、指标和上下文透传
一句话收尾:
CompletableFuture写得好,是“用更少时间等结果”;写不好,就是“用更多线程等事故”。
如果你现在正准备改一个聚合接口,不妨先从本文的商品详情示例照着跑一遍,再替换成你自己的下游调用。这个过程非常值得。