Java 中基于 CompletableFuture 的并发编排实战:从异步聚合到超时控制与线程池调优
在 Java 后端开发里,CompletableFuture 几乎是“异步编排”的标配工具之一。很多人第一次接触它,往往停留在 supplyAsync()、thenApply() 这些 API 上;但一到线上,就会遇到真正棘手的问题:
- 多个远程接口怎么并发聚合?
- 某个下游接口很慢,怎么设置超时,不要把整体请求拖死?
- 线程池到底该怎么配,为什么 CPU 不高却还是慢?
join()、get()、allOf()一起用时,异常为什么看起来“消失了”?- 为什么本地压测很顺,线上却突然线程打满、延迟抖动?
这篇文章我不打算只讲 API 名字,而是带你从一个典型的“聚合查询”场景出发,把 CompletableFuture 真正在项目里怎么用、怎么避坑、怎么调优,完整走一遍。
背景与问题
先看一个很常见的业务场景:用户打开商品详情页,服务端需要同时查询:
- 商品基础信息
- 库存信息
- 价格信息
- 营销活动信息
如果按串行方式调用,下游每个接口假设耗时分别是:
- 商品:80ms
- 库存:120ms
- 价格:60ms
- 营销:150ms
串行总耗时大约就是:
80 + 120 + 60 + 150 = 410ms
而在大多数场景下,这 4 个调用彼此独立,完全可以并发发起。理想情况下,总耗时接近最长的那个,也就是 150ms + 少量调度开销。
这就是 CompletableFuture 最典型的价值:把彼此独立的任务并发编排,再把结果安全地汇总起来。
但真实世界没这么理想。问题通常出在:
- 某个接口偶发超时
- 某个任务失败导致整体失败
- 使用默认线程池,结果被别的异步任务“污染”
- 业务线程里
join()用太早,把并发又写回串行 - 下游太慢,线程池堆积,触发雪崩
所以,学会 CompletableFuture 的关键,不是“会用”,而是知道什么时候并发、怎么收口、如何兜底、以及如何让线程池稳定运行。
前置知识与环境准备
适用版本
本文示例建议使用:
- JDK 11+
- 更推荐 JDK 17+
因为超时控制相关 API(如 orTimeout、completeOnTimeout)在新版本里更顺手。
你需要知道的基础
如果你已经了解下面几个概念,读起来会很顺:
- Java 线程池基础:
ThreadPoolExecutor - Lambda 表达式
- 异常传播机制
- 基本的微服务调用模型
核心原理
1. CompletableFuture 到底解决什么问题
Future 只能“拿结果”,但不擅长“结果到了之后继续处理”。
CompletableFuture 则更像是一个可组合的异步任务容器:
- 可以异步执行任务
- 可以串联后续处理
- 可以合并多个任务
- 可以统一处理异常
- 可以设置超时与默认值
简单理解:
supplyAsync():异步生产一个结果thenApply():拿到结果后继续加工thenCompose():把异步嵌套展开thenCombine():合并两个任务结果allOf():等待一批任务完成anyOf():谁先完成就用谁exceptionally()/handle():兜底异常
2. 并发编排的常见模式
模式一:并发聚合
多个独立任务一起跑,最后统一汇总。
flowchart LR
A[请求到达] --> B[查商品]
A --> C[查库存]
A --> D[查价格]
A --> E[查营销]
B --> F[聚合结果]
C --> F
D --> F
E --> F
模式二:依赖链路编排
前一个任务的结果,是后一个任务的输入。
例如:
- 先查用户信息
- 再根据用户等级查专属优惠
这时更适合 thenCompose()。
模式三:竞速返回
两个来源都能提供相同结果,谁快用谁。
例如:
- 优先查缓存副本
- 同时查主数据源
- 谁先返回先用谁
这时可用 anyOf()。
3. thenApply、thenCompose、thenCombine 的区别
这个点很容易混,我建议你这么记:
thenApply
同步映射,输入一个值,输出一个新值。
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "hello")
.thenApply(s -> s + " world");
thenCompose
如果后续处理本身也返回一个 CompletableFuture,就用它“拍平”。
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "user-1")
.thenCompose(userId -> CompletableFuture.supplyAsync(() -> "profile-of-" + userId));
如果这里误用 thenApply,你会得到 CompletableFuture<CompletableFuture<T>>,嵌套起来很难受。
thenCombine
两个独立任务都完成后,把结果合并。
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> c = a.thenCombine(b, (x, y) -> x + y);
4. 异常与超时为什么这么重要
在并发编排里,真正复杂的不是“成功路径”,而是失败路径。
典型场景:
- 库存服务超时了,但商品详情页仍然想展示其他信息
- 营销服务挂了,不应该导致整个详情页 500
- 价格服务失败必须快速失败,不能静默吞掉
所以你需要先定义好:
- 哪些任务失败可以降级?
- 哪些任务失败必须整体失败?
- 超时时间是统一的还是分任务的?
- 是返回默认值,还是中断整个聚合?
这会直接影响你使用 exceptionally()、handle()、orTimeout()、completeOnTimeout() 的方式。
实战代码(可运行)
下面我们写一个可运行的小示例:模拟商品详情聚合服务。
目标:
- 并发查询商品、库存、价格、营销
- 为慢任务设置超时
- 对可降级服务返回默认值
- 使用自定义线程池,而不是默认公共线程池
- 最终输出聚合结果与总耗时
示例代码
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class CompletableFutureOrchestrationDemo {
public static void main(String[] args) {
ThreadPoolExecutor ioExecutor = new ThreadPoolExecutor(
8,
16,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
new NamedThreadFactory("cf-io"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
try {
ProductDetailService service = new ProductDetailService(ioExecutor);
long start = System.currentTimeMillis();
ProductDetail detail = service.queryProductDetail(1001L);
long cost = System.currentTimeMillis() - start;
System.out.println("聚合结果:");
System.out.println(detail);
System.out.println("总耗时: " + cost + " ms");
} finally {
ioExecutor.shutdown();
}
}
static class ProductDetailService {
private final Executor executor;
ProductDetailService(Executor executor) {
this.executor = executor;
}
public ProductDetail queryProductDetail(Long productId) {
CompletableFuture<ProductInfo> productFuture =
CompletableFuture.supplyAsync(() -> getProductInfo(productId), executor);
CompletableFuture<StockInfo> stockFuture =
CompletableFuture.supplyAsync(() -> getStockInfo(productId), executor)
.completeOnTimeout(StockInfo.defaultStock(productId), 200, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("库存查询失败,降级:" + ex.getMessage());
return StockInfo.defaultStock(productId);
});
CompletableFuture<PriceInfo> priceFuture =
CompletableFuture.supplyAsync(() -> getPriceInfo(productId), executor)
.orTimeout(300, TimeUnit.MILLISECONDS);
CompletableFuture<PromotionInfo> promotionFuture =
CompletableFuture.supplyAsync(() -> getPromotionInfo(productId), executor)
.completeOnTimeout(PromotionInfo.noPromotion(productId), 180, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("营销查询失败,降级:" + ex.getMessage());
return PromotionInfo.noPromotion(productId);
});
CompletableFuture<Void> all = CompletableFuture.allOf(
productFuture, stockFuture, priceFuture, promotionFuture
);
try {
all.join();
} catch (CompletionException e) {
throw new RuntimeException("聚合查询失败: " + rootMessage(e), e);
}
return new ProductDetail(
productFuture.join(),
stockFuture.join(),
priceFuture.join(),
promotionFuture.join(),
LocalDateTime.now()
);
}
private ProductInfo getProductInfo(Long productId) {
sleep(80);
log("商品信息查询完成");
return new ProductInfo(productId, "机械键盘", "一把很能打的键盘");
}
private StockInfo getStockInfo(Long productId) {
sleep(120);
log("库存信息查询完成");
return new StockInfo(productId, 58, false);
}
private PriceInfo getPriceInfo(Long productId) {
sleep(100);
log("价格信息查询完成");
return new PriceInfo(productId, new BigDecimal("399.00"), "CNY");
}
private PromotionInfo getPromotionInfo(Long productId) {
sleep(220); // 故意制造超时
log("营销信息查询完成");
return new PromotionInfo(productId, "618 限时立减 40 元");
}
}
static class ProductDetail {
private final ProductInfo productInfo;
private final StockInfo stockInfo;
private final PriceInfo priceInfo;
private final PromotionInfo promotionInfo;
private final LocalDateTime queryTime;
public ProductDetail(ProductInfo productInfo, StockInfo stockInfo, PriceInfo priceInfo,
PromotionInfo promotionInfo, LocalDateTime queryTime) {
this.productInfo = productInfo;
this.stockInfo = stockInfo;
this.priceInfo = priceInfo;
this.promotionInfo = promotionInfo;
this.queryTime = queryTime;
}
@Override
public String toString() {
return "ProductDetail{" +
"productInfo=" + productInfo +
", stockInfo=" + stockInfo +
", priceInfo=" + priceInfo +
", promotionInfo=" + promotionInfo +
", queryTime=" + queryTime +
'}';
}
}
static class ProductInfo {
private final Long productId;
private final String name;
private final String description;
public ProductInfo(Long productId, String name, String description) {
this.productId = productId;
this.name = name;
this.description = description;
}
@Override
public String toString() {
return "{productId=" + productId + ", name='" + name + "', description='" + description + "'}";
}
}
static class StockInfo {
private final Long productId;
private final int available;
private final boolean degraded;
public StockInfo(Long productId, int available, boolean degraded) {
this.productId = productId;
this.available = available;
this.degraded = degraded;
}
static StockInfo defaultStock(Long productId) {
return new StockInfo(productId, 0, true);
}
@Override
public String toString() {
return "{productId=" + productId + ", available=" + available + ", degraded=" + degraded + "}";
}
}
static class PriceInfo {
private final Long productId;
private final BigDecimal price;
private final String currency;
public PriceInfo(Long productId, BigDecimal price, String currency) {
this.productId = productId;
this.price = price;
this.currency = currency;
}
@Override
public String toString() {
return "{productId=" + productId + ", price=" + price + ", currency='" + currency + "'}";
}
}
static class PromotionInfo {
private final Long productId;
private final String promotionText;
public PromotionInfo(Long productId, String promotionText) {
this.productId = productId;
this.promotionText = promotionText;
}
static PromotionInfo noPromotion(Long productId) {
return new PromotionInfo(productId, "暂无活动");
}
@Override
public String toString() {
return "{productId=" + productId + ", promotionText='" + promotionText + "'}";
}
}
static class NamedThreadFactory implements ThreadFactory {
private final String prefix;
private final AtomicInteger counter = new AtomicInteger(1);
NamedThreadFactory(String prefix) {
this.prefix = prefix;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(prefix + "-" + counter.getAndIncrement());
return t;
}
}
static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程被中断", e);
}
}
static void log(String msg) {
System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
}
static String rootMessage(Throwable e) {
Throwable cur = e;
while (cur.getCause() != null) {
cur = cur.getCause();
}
return cur.getMessage();
}
}
代码运行后你应该看到什么
这个例子里:
- 商品、库存、价格并发执行
- 营销接口故意 sleep 220ms
- 但我们给营销设置了
180ms超时,并用默认值降级 - 所以最终不会因为营销慢而拖垮整个聚合
你会发现总耗时接近:
- 最长有效任务的时间
- 而不是所有任务相加的总和
这就是异步聚合最直接的收益。
一步一步理解这个例子
1. 为什么不用默认线程池
很多示例直接这样写:
CompletableFuture.supplyAsync(() -> doSomething());
这会走 ForkJoinPool.commonPool()。它不是不能用,但在线上服务里,我一般不建议这么干,原因很现实:
- 它是全局共享资源
- 可能被项目里其他异步逻辑占满
- 线程名字不好识别,排查困难
- 对于 IO 阻塞型任务并不理想
如果你的异步任务是 RPC、HTTP、数据库、缓存访问这类 IO 型任务,更建议用自定义线程池。
2. completeOnTimeout 和 orTimeout 的区别
这是实战里经常要分清的点。
completeOnTimeout(defaultValue, timeout, unit)
超时后,返回一个默认值,任务整体仍算“正常完成”。
适合:
- 库存允许展示默认值
- 营销信息允许降级
- 推荐列表可以为空
orTimeout(timeout, unit)
超时后,抛出超时异常。
适合:
- 核心价格必须准确
- 支付、风控、库存锁定等关键链路
- 不能默默降级的关键结果
flowchart TD
A[异步任务开始] --> B{是否超时}
B -- 否 --> C[正常返回结果]
B -- 是 --> D{选择策略}
D -- completeOnTimeout --> E[返回默认值]
D -- orTimeout --> F[抛出超时异常]
这两个 API 的选择,本质上不是技术问题,而是业务语义问题。
3. 为什么 allOf() 后还要 join()
CompletableFuture.allOf() 返回的是 CompletableFuture<Void>,它只表示:
“这些任务都结束了”
但它不会自动帮你把每个任务的结果收集成列表或对象。
所以我们通常这么写:
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();
Result result = new Result(f1.join(), f2.join(), f3.join());
注意:这里后面的 join() 之所以安全,是因为 all.join() 已经保证所有任务完成了,不会再阻塞太久。
进阶:依赖型编排示例
并发聚合之外,还有一种很常见的情况:后续请求依赖前一个结果。
比如:
- 先查用户信息
- 再按用户等级查优惠券
这种情况不要硬塞进 allOf(),而应该用 thenCompose()。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThenComposeDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> queryUserLevel(101L), executor)
.thenCompose(level ->
CompletableFuture.supplyAsync(() -> queryCoupon(level), executor)
);
System.out.println(result.join());
executor.shutdown();
}
static String queryUserLevel(Long userId) {
sleep(80);
return "VIP";
}
static String queryCoupon(String level) {
sleep(50);
return "用户等级 " + level + ",可领取 20 元优惠券";
}
static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
线程池调优:别把并发写成“自我攻击”
CompletableFuture 本身不会凭空提升性能,它只是让你更容易把任务并发起来。
如果线程池不合适,结果往往是:
- 任务排队
- 延迟飙升
- 上下游互相拖累
- GC 压力变大
- 服务吞吐反而下降
一个实用的判断原则
先判断任务类型:
1. CPU 密集型
比如:
- JSON 大对象计算
- 图片压缩
- 加密/解密
- 复杂规则运算
线程数通常接近:
CPU 核数 或 CPU 核数 + 1
2. IO 密集型
比如:
- HTTP/RPC 调用
- 数据库查询
- Redis 操作
- 文件读取
线程数通常可以高于 CPU 核数,因为大量时间在线程阻塞等待 IO。
经验上常见起点:
核心线程数 = CPU核数 * 2 ~ 4
但这只是起点,最终还是要看压测和监控。
线程池关键参数怎么理解
以这个构造器为例:
new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler
)
corePoolSize
常驻线程数。
并不是越大越好,太大意味着空闲线程也占资源。
maximumPoolSize
高峰期可扩容到的线程上限。
如果设置过大,遇到下游雪崩时可能把自己也拖死。
workQueue
等待队列。
队列太大容易“看起来没报错,其实请求已经排队很久”。
我自己更倾向于:
- 小到中等容量队列
- 配合清晰的拒绝策略
- 尽早暴露系统压力
RejectedExecutionHandler
拒绝策略。常见有:
AbortPolicy:直接抛异常CallerRunsPolicy:调用方线程自己执行DiscardPolicy:悄悄丢弃DiscardOldestPolicy:丢弃最老任务
线上服务里,DiscardPolicy 我基本不推荐,因为太容易“无声失败”。
一个实际可参考的线程池配置思路
对于详情聚合这类 IO 型场景,可以从下面思路起步:
int core = 8;
int max = 16;
int queueSize = 200;
然后观察:
- 活跃线程数
- 队列积压长度
- 拒绝次数
- 接口 TP99
- 下游超时比例
如果你发现:
- 活跃线程常年接近最大线程数
- 队列持续堆积
- TP99 明显变差
那通常说明:
- 线程池太小
- 下游太慢
- 超时设置过长
- 没有限流/隔离
- 并发量已经超过系统可承受范围
常见坑与排查
这部分我尽量讲得“接地气”一点,因为这些坑真的很常见。
坑一:在循环里立刻 join(),把并发写成串行
错误写法:
for (Long id : ids) {
String value = CompletableFuture.supplyAsync(() -> query(id), executor).join();
System.out.println(value);
}
这其实每次都在“创建任务后立刻等待”,整体还是串行。
正确思路:
List<CompletableFuture<String>> futures = ids.stream()
.map(id -> CompletableFuture.supplyAsync(() -> query(id), executor))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
for (CompletableFuture<String> future : futures) {
System.out.println(future.join());
}
坑二:误用默认线程池,导致任务互相干扰
现象:
- 某些接口偶发慢
- 线程堆栈里全是
ForkJoinPool.commonPool-worker-* - 一个模块异步任务暴涨,另一个模块也跟着抖
排查方式:
- 看线程名
- 看异步任务是否显式传了 executor
- 看 commonPool 使用情况
- 用线程 dump 看阻塞点
坑三:异常被包装,看不懂真正原因
join() 抛的是 CompletionException,
get() 抛的是受检异常 ExecutionException。
真正的根因通常藏在 cause 里。
排查建议:
try {
future.join();
} catch (CompletionException e) {
Throwable root = e;
while (root.getCause() != null) {
root = root.getCause();
}
System.err.println("根因: " + root.getMessage());
}
我自己线上排查时,第一步就是先把“最里层 cause”打出来,否则日志全是包装异常,很难看。
坑四:超时只超了 Future,没有取消底层任务
这是一个很容易误判的问题。
orTimeout() 或 completeOnTimeout() 让 CompletableFuture 层面结束了,
但如果底层任务是一个已经发出的阻塞 IO,它未必真的停下来。
也就是说:
- 调用方已经返回了
- 但底层线程可能还在执行
- 如果慢请求很多,线程池还是会被拖住
这时要配合:
- HTTP 客户端超时
- RPC 框架超时
- 数据库查询超时
- 限流/熔断/隔离
不要把 CompletableFuture 的超时当成“万能取消器”。
坑五:线程池队列过大,问题被延后暴露
很多人喜欢把队列设得非常大,比如几千几万。
短期看起来“稳”,因为不容易触发拒绝;但实际上:
- 请求在队列里排很久
- 用户已经超时了
- 服务还在慢慢处理旧任务
- 整体延迟越来越差
这个坑我见过很多次。队列太大不是稳定,是把故障藏起来。
常见排查路径
如果线上出现 CompletableFuture 相关性能问题,我一般会按这个顺序看:
sequenceDiagram
participant U as 用户请求
participant S as 聚合服务
participant TP as 线程池
participant D as 下游服务
U->>S: 发起请求
S->>TP: 提交多个异步任务
TP->>D: 并发调用下游
D-->>TP: 部分慢/超时/失败
TP-->>S: Future完成或异常
S-->>U: 返回结果或降级结果
排查清单
-
先看接口维度
- TP50 / TP99 是否抖动
- 错误率是否升高
- 是否集中在某个下游
-
看线程池指标
- activeCount
- queueSize
- taskCount
- completedTaskCount
- rejectCount
-
看超时配置是否一致
- Future 超时
- HTTP/RPC 超时
- 数据库超时
- 网关超时
-
看异常日志
- 是超时?
- 拒绝执行?
- 线程中断?
- 下游业务异常?
-
看是否存在阻塞
join()用得是否过早- 是否在异步线程里又做了阻塞等待
- 是否线程池嵌套调用导致“自己等自己”
安全/性能最佳实践
这部分给你一些可以直接落地的建议。
1. 为不同类型任务做线程池隔离
不要把所有异步任务都塞进一个池子里。
建议至少分开:
- IO 聚合线程池
- CPU 计算线程池
- 大任务/低优先级任务线程池
这样即使某一类任务暴涨,也不至于把所有业务拖垮。
2. 给每类下游设置明确的超时与降级策略
不要只写一个统一超时值。
更好的方式是按业务重要性区分:
- 核心链路:短超时 + 快失败
- 非核心链路:短超时 + 默认值降级
- 可选信息:超时直接忽略
3. 避免在异步链路中混入大量阻塞操作
如果你用了 CompletableFuture,但中间每一步都在:
- 阻塞 IO
- 长时间 sleep
- 同步远程调用后立刻等待
那就会出现“看起来异步,其实只是换了个地方阻塞”。
4. 日志里打印线程名、任务名、耗时
异步问题最怕“看不见”。
建议日志至少包含:
- traceId
- 任务名称
- 线程名
- 开始/结束时间
- 是否超时/异常
- 降级结果来源
这样你在线上排查时,基本能少走很多弯路。
5. 慎用 exceptionally() 吞异常
exceptionally() 很方便,但也很容易把关键错误“吃掉”。
例如:
future.exceptionally(ex -> defaultValue);
如果没有日志,这个失败对调用方来说就像“从没发生过”。
更稳妥的写法:
future.exceptionally(ex -> {
log("任务失败: " + ex.getMessage());
return defaultValue;
});
对于核心任务,甚至不该降级,而应该显式失败。
6. 监控比代码技巧更重要
如果线上没有这些指标,再漂亮的编排代码也很难长期稳定:
- 线程池活跃数
- 队列长度
- 拒绝次数
- 平均/分位耗时
- 各下游成功率、超时率
- 降级触发次数
我的经验是:
并发问题 30% 靠代码设计,70% 靠监控和压测发现。
逐步验证清单
如果你想把本文内容真正落地,我建议按下面顺序自己练一遍。
第一步:先实现最简单的并发聚合
- 2~3 个独立任务
- 使用
supplyAsync() - 使用
allOf()收口
第二步:给非核心任务加超时降级
- 使用
completeOnTimeout() - 用默认值兜底
第三步:给核心任务加快速失败
- 使用
orTimeout() - 让异常冒出来
第四步:切换到自定义线程池
- 不再使用默认 commonPool
- 观察线程名和任务分布
第五步:做压测
关注:
- 总耗时
- TP99
- 线程池活跃数
- 队列长度
- 超时比例
第六步:模拟故障
可以人为制造:
- 某个下游变慢
- 某个任务抛异常
- 队列打满
- 超时值过小/过大
你会比单纯看 API 文档收获大得多。
总结
CompletableFuture 最有价值的地方,不只是“异步执行”,而是它能把多个任务按业务语义编排起来:
- 独立任务并发聚合,用
allOf() - 依赖任务串联,用
thenCompose() - 多结果合并,用
thenCombine() - 非核心任务超时降级,用
completeOnTimeout() - 核心任务快速失败,用
orTimeout()
但要记住,真正决定线上效果的,往往不是 API 本身,而是这几件事:
- 是否使用了合适的自定义线程池
- 是否给不同任务定义了明确的超时与降级策略
- 是否避免了“伪异步”和过早阻塞
- 是否有足够的监控指标来观察线程池与下游状态
如果你现在正在做接口聚合、批量并发查询、异步任务编排,我建议你先从一个小场景开始改造,不要一口气把所有链路都异步化。先跑通、先压测、先观测,再逐步推广,成功率会高很多。
最后给一个很实用的判断标准:
如果一个任务失败后,你说不清“应该降级、重试、还是整体失败”,那先别急着写并发编排,先把业务语义定清楚。
因为在 CompletableFuture 的世界里,技术动作不难,难的是你是否真的知道自己想要什么结果。