跳转到内容
123xiao | 无名键客

《Java 中基于 CompletableFuture 的异步编排实战:并行调用、超时控制与异常处理最佳实践》

字数: 0 阅读时长: 1 分钟

Java 中基于 CompletableFuture 的异步编排实战:并行调用、超时控制与异常处理最佳实践

在 Java 后端开发里,CompletableFuture 基本算是“异步编排”的入门必修课。很多系统一开始是串行调用:查用户、查订单、查库存、查营销信息,一个接口顺着一个接口走,逻辑清晰,但响应时间会被最长链路拖垮。

而当业务逐渐复杂后,我们就会遇到几个非常现实的问题:

  • 多个远程调用其实互不依赖,能不能并行?
  • 某个下游接口很慢,能不能超时后快速返回默认值?
  • 并行任务中一个失败了,是直接整体失败,还是部分降级?
  • 线程池到底该怎么配,才能不把服务拖死?

这篇文章我会从“做一个聚合接口”这个常见场景出发,带你把 CompletableFuture 用顺:并行调用、超时控制、异常处理、线程池配置,以及线上最容易踩的坑。


背景与问题

假设我们要实现一个“商品详情聚合接口”,一个请求需要同时获取:

  1. 商品基础信息
  2. 价格信息
  3. 库存信息
  4. 推荐信息

如果串行执行,整体耗时大概是各个调用时间之和:

总耗时 = 商品信息 + 价格 + 库存 + 推荐

假设每个调用平均耗时如下:

  • 商品信息:80ms
  • 价格:120ms
  • 库存:100ms
  • 推荐:150ms

串行大约要 450ms,但如果并行,理论上可以接近最长的那个调用,也就是 150ms 左右,再加上一点线程调度和组装开销。

这就是异步编排最直接的价值:把等待时间重叠起来

不过,真正落地时不能只会 allOf(),因为业务还有这些要求:

  • 推荐失败了不影响主流程
  • 库存超过 200ms 没结果就走默认值
  • 商品基础信息失败必须整体失败
  • 线程不能无限创建
  • 日志里要看得出是哪个子任务超时或异常

前置知识与环境准备

本文示例基于:

  • JDK 9+(因为会用到 orTimeout / completeOnTimeout
  • 普通 Java 项目即可运行
  • 不依赖 Spring,也方便你直接复制验证

如果你在 JDK 8 环境中,后面我也会说明兼容思路。


核心原理

先别急着写代码,先把几个核心 API 的职责理清楚。

1. supplyAsync:异步启动任务

用于有返回值的异步任务:

CompletableFuture.supplyAsync(() -> queryPrice(), executor);

如果是无返回值任务,用 runAsync

2. thenApply / thenCompose / thenCombine

这是最容易混的地方。

  • thenApply:把上一个结果做一次同步转换
  • thenCompose:把“异步套异步”拍平
  • thenCombine:合并两个独立 Future 的结果

可以这样理解:

  • 有依赖链:优先考虑 thenCompose
  • 无依赖并行后合并:优先考虑 thenCombineallOf

3. allOf:等待多个任务完成

CompletableFuture.allOf(f1, f2, f3)

注意:allOf 返回的是 CompletableFuture<Void>,它只表示“都完成了”,不会自动帮你收集结果,结果还得自己从各个 Future 里取。

4. 超时控制:orTimeoutcompleteOnTimeout

  • orTimeout(timeout, unit):超时则抛异常
  • completeOnTimeout(value, timeout, unit):超时则返回默认值

这两个的语义差别非常关键:

  • 核心链路必须成功:用 orTimeout
  • 可降级链路允许兜底:用 completeOnTimeout

5. 异常处理:exceptionallyhandlewhenComplete

它们看起来很像,但职责不同:

  • exceptionally:异常时给默认值
  • handle:无论成功失败都能处理,并返回新结果
  • whenComplete:更像“旁路通知”,适合打日志,不改变结果

我的经验是:

  • 业务降级:优先用 exceptionallyhandle
  • 日志埋点:优先用 whenComplete

一张图看懂异步编排流程

flowchart LR
    A[请求进入] --> B[并行发起商品/价格/库存/推荐]
    B --> C1[商品信息]
    B --> C2[价格信息]
    B --> C3[库存信息]
    B --> C4[推荐信息]

    C1 --> D[等待聚合]
    C2 --> D
    C3 --> D
    C4 --> D

    D --> E{关键任务是否成功}
    E -- 是 --> F[组装详情DTO]
    E -- 否 --> G[返回异常或降级结果]

实战代码(可运行)

下面做一个完整可运行示例。我们模拟四个下游服务,并实现一个商品详情聚合逻辑。

1. 完整示例代码

import java.util.concurrent.*;
import java.util.function.Supplier;

public class CompletableFutureOrchestrationDemo {

    private static final ExecutorService IO_POOL = new ThreadPoolExecutor(
            8,
            16,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            r -> {
                Thread t = new Thread(r);
                t.setName("io-pool-" + t.getId());
                return t;
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public static void main(String[] args) {
        try {
            ProductDetail detail = getProductDetail(1001L);
            System.out.println("聚合结果:");
            System.out.println(detail);
        } finally {
            IO_POOL.shutdown();
        }
    }

    public static ProductDetail getProductDetail(Long productId) {
        CompletableFuture<ProductInfo> productFuture = CompletableFuture
                .supplyAsync(() -> query("商品信息", () -> ProductService.getProductInfo(productId)), IO_POOL)
                .orTimeout(300, TimeUnit.MILLISECONDS)
                .whenComplete((result, ex) -> log("商品信息完成", ex));

        CompletableFuture<PriceInfo> priceFuture = CompletableFuture
                .supplyAsync(() -> query("价格信息", () -> PriceService.getPrice(productId)), IO_POOL)
                .orTimeout(300, TimeUnit.MILLISECONDS)
                .whenComplete((result, ex) -> log("价格信息完成", ex));

        CompletableFuture<StockInfo> stockFuture = CompletableFuture
                .supplyAsync(() -> query("库存信息", () -> StockService.getStock(productId)), IO_POOL)
                .completeOnTimeout(new StockInfo(productId, 0, true), 200, TimeUnit.MILLISECONDS)
                .exceptionally(ex -> {
                    log("库存信息异常,使用默认库存", ex);
                    return new StockInfo(productId, 0, true);
                });

        CompletableFuture<RecommendInfo> recommendFuture = CompletableFuture
                .supplyAsync(() -> query("推荐信息", () -> RecommendService.getRecommend(productId)), IO_POOL)
                .completeOnTimeout(new RecommendInfo(productId, "默认推荐"), 180, TimeUnit.MILLISECONDS)
                .exceptionally(ex -> {
                    log("推荐信息异常,使用默认推荐", ex);
                    return new RecommendInfo(productId, "默认推荐");
                });

        CompletableFuture<Void> all = CompletableFuture.allOf(
                productFuture, priceFuture, stockFuture, recommendFuture
        );

        try {
            all.join();
        } catch (CompletionException ex) {
            throw new RuntimeException("商品详情聚合失败: " + rootMessage(ex));
        }

        ProductInfo product = productFuture.join();
        PriceInfo price = priceFuture.join();
        StockInfo stock = stockFuture.join();
        RecommendInfo recommend = recommendFuture.join();

        return new ProductDetail(product, price, stock, recommend);
    }

    private static <T> T query(String name, Supplier<T> supplier) {
        long start = System.currentTimeMillis();
        try {
            T result = supplier.get();
            long cost = System.currentTimeMillis() - start;
            System.out.println(name + " 调用成功,耗时 " + cost + "ms");
            return result;
        } catch (Exception e) {
            long cost = System.currentTimeMillis() - start;
            System.out.println(name + " 调用失败,耗时 " + cost + "ms, error=" + e.getMessage());
            throw e;
        }
    }

    private static void log(String message, Throwable ex) {
        if (ex == null) {
            System.out.println("[INFO] " + message);
        } else {
            System.out.println("[ERROR] " + message + " - " + rootMessage(ex));
        }
    }

    private static String rootMessage(Throwable ex) {
        Throwable current = ex;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current.getClass().getSimpleName() + ": " + current.getMessage();
    }

    static class ProductService {
        static ProductInfo getProductInfo(Long productId) {
            sleep(80);
            return new ProductInfo(productId, "机械键盘");
        }
    }

    static class PriceService {
        static PriceInfo getPrice(Long productId) {
            sleep(120);
            return new PriceInfo(productId, 399.00);
        }
    }

    static class StockService {
        static StockInfo getStock(Long productId) {
            sleep(250);
            return new StockInfo(productId, 20, false);
        }
    }

    static class RecommendService {
        static RecommendInfo getRecommend(Long productId) {
            sleep(100);
            if (productId % 2 == 1) {
                throw new RuntimeException("推荐服务不可用");
            }
            return new RecommendInfo(productId, "猜你喜欢");
        }
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("线程中断");
        }
    }

    static class ProductInfo {
        Long productId;
        String name;

        ProductInfo(Long productId, String name) {
            this.productId = productId;
            this.name = name;
        }

        @Override
        public String toString() {
            return "ProductInfo{productId=" + productId + ", name='" + name + "'}";
        }
    }

    static class PriceInfo {
        Long productId;
        double price;

        PriceInfo(Long productId, double price) {
            this.productId = productId;
            this.price = price;
        }

        @Override
        public String toString() {
            return "PriceInfo{productId=" + productId + ", price=" + price + "}";
        }
    }

    static class StockInfo {
        Long productId;
        int stock;
        boolean fromFallback;

        StockInfo(Long productId, int stock, boolean fromFallback) {
            this.productId = productId;
            this.stock = stock;
            this.fromFallback = fromFallback;
        }

        @Override
        public String toString() {
            return "StockInfo{productId=" + productId + ", stock=" + stock + ", fromFallback=" + fromFallback + "}";
        }
    }

    static class RecommendInfo {
        Long productId;
        String content;

        RecommendInfo(Long productId, String content) {
            this.productId = productId;
            this.content = content;
        }

        @Override
        public String toString() {
            return "RecommendInfo{productId=" + productId + ", content='" + content + "'}";
        }
    }

    static class ProductDetail {
        ProductInfo productInfo;
        PriceInfo priceInfo;
        StockInfo stockInfo;
        RecommendInfo recommendInfo;

        ProductDetail(ProductInfo productInfo, PriceInfo priceInfo, StockInfo stockInfo, RecommendInfo recommendInfo) {
            this.productInfo = productInfo;
            this.priceInfo = priceInfo;
            this.stockInfo = stockInfo;
            this.recommendInfo = recommendInfo;
        }

        @Override
        public String toString() {
            return "ProductDetail{" +
                    "productInfo=" + productInfo +
                    ", priceInfo=" + priceInfo +
                    ", stockInfo=" + stockInfo +
                    ", recommendInfo=" + recommendInfo +
                    '}';
        }
    }
}

代码拆解:这段编排为什么这么写

关键点 1:核心任务与可降级任务分开处理

在示例里:

  • 商品信息、价格信息:主链路,必须拿到,所以用 orTimeout
  • 库存、推荐信息:可降级,所以用 completeOnTimeout

这其实是异步编排最重要的思路之一:不要把所有任务都当成同样重要

如果你对所有下游都一视同仁,要么过于脆弱,要么过度降级。

关键点 2:异常兜底不要放到最后统一处理

很多人会写成这样:

CompletableFuture.allOf(f1, f2, f3).exceptionally(...)

这通常不够细,因为你已经分不清是哪个任务出了问题,也不方便对不同子任务做不同降级。

更合理的做法是:每个子任务就地定义自己的超时和异常策略

关键点 3:最终组装阶段用 join()

为什么不用 get()

  • get() 会抛受检异常,代码会更啰嗦
  • join() 抛的是 CompletionException,在聚合场景里更顺手

但要注意,join() 抛出的异常常常包了好几层,所以示例里专门写了 rootMessage() 来提取根因。


再看一张时序图:并行调用与超时降级

sequenceDiagram
    participant Client as 调用方
    participant App as 聚合服务
    participant P as 商品服务
    participant R as 价格服务
    participant S as 库存服务
    participant Rec as 推荐服务

    Client->>App: 请求商品详情
    App->>P: 异步请求商品信息
    App->>R: 异步请求价格
    App->>S: 异步请求库存
    App->>Rec: 异步请求推荐

    P-->>App: 80ms 返回成功
    R-->>App: 120ms 返回成功
    Rec-->>App: 异常/超时
    Note over App,Rec: 返回默认推荐
    S-->>App: 超过 200ms
    Note over App,S: 使用默认库存

    App-->>Client: 返回聚合结果

逐步验证清单

如果你想自己跑一遍,建议按下面顺序验证:

场景 1:正常成功

RecommendService 里的异常去掉,库存睡眠改为 100ms,观察:

  • 所有任务成功
  • 总耗时接近最慢任务,而不是总和

场景 2:推荐失败但整体成功

保留推荐异常,观察:

  • 推荐任务异常
  • 最终接口仍返回默认推荐

场景 3:库存超时降级

将库存调用保持 250ms,观察:

  • 库存在 200ms 超时后返回默认值
  • 总体接口不会被库存拖慢

场景 4:主链路失败导致整体失败

ProductService.getProductInfo() 改成抛异常,观察:

  • all.join() 会失败
  • 整体聚合抛出异常,而不是静默返回残缺数据

常见坑与排查

这部分我建议你认真看。很多线上问题都不是 API 不会用,而是“看起来能跑,实际会炸”。

坑 1:默认线程池用着用着就卡了

如果你直接写:

CompletableFuture.supplyAsync(() -> query());

没有传线程池,默认会走 ForkJoinPool.commonPool()。这在简单 CPU 任务里还行,但在 I/O 密集型场景(远程调用、数据库、缓存)里经常不够用。

问题表现:

  • 接口 RT 飙高
  • 任务排队
  • 看起来像“异步了”,实际更慢

建议:I/O 任务自己配线程池,不要依赖 commonPool。


坑 2:线程池配太大,机器直接抖动

很多人知道不能用默认线程池,于是把线程池改成:

new ThreadPoolExecutor(200, 500, ...)

如果机器核数不高、下游响应又不稳定,这种配置很容易导致:

  • 上下文切换剧增
  • 内存上涨
  • 队列积压
  • 下游也被打爆

建议

  • I/O 池大小结合机器资源和下游容量评估
  • 一定设置有界队列
  • 一定配置拒绝策略
  • 监控活跃线程数、队列长度、任务拒绝次数

坑 3:allOf() 完成了,不代表结果没异常

这是非常经典的误区。

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();

如果其中某个 Future 异常了,all.join() 也会异常。但如果你在子任务里已经 exceptionally 兜底了,那么 allOf() 可能会成功完成。

所以排查时要先分清:

  • 是子任务已经被吞掉并返回默认值?
  • 还是异常一路冒泡到聚合层?

坑 4:异常被“吃掉”,日志里看不到根因

比如:

future.exceptionally(ex -> defaultValue);

这样虽然业务继续执行了,但如果你没有记录异常,线上就只剩“怎么老走默认值”这种玄学问题。

建议:任何降级都要带日志、指标或埋点。

至少记住:

  • 哪个任务降级了
  • 原始异常是什么
  • 是否超时触发
  • 降级比例是多少

坑 5:链式调用里混用同步/异步方法,线程行为超出预期

比如:

future.thenApply(...)
      .thenApply(...)
      .thenApply(...);

这些默认可能沿用上一个任务的执行线程。如果中间处理逻辑变重,可能把 I/O 线程占住。

如果某一步计算明显更重,可以显式使用:

thenApplyAsync(...)

并指定合适线程池。

但也别滥用 Async,否则线程切换过多也会损耗性能。


安全/性能最佳实践

这一部分讲“线上可落地”的建议。

1. 按任务类型拆线程池

不要把所有异步任务都丢进同一个池子。最常见的做法是:

  • I/O 调用池
  • CPU 计算池
  • 定时/调度池

这样某类任务堆积时,不至于把另一类也拖死。


2. 为每个下游设置独立超时

不要只在最外层接口设一个总超时。因为总超时只能告诉你“整体慢了”,不能阻止某个子调用无限拖延。

更好的方式是:

  • HTTP 客户端本身设置连接/读超时
  • CompletableFuture 层再补一层业务超时
  • 主链路和可降级链路区别对待

也就是说,超时要做双层控制


3. 降级值必须“业务可接受”

默认值不是随便填个 null 就完了。

比如:

  • 库存默认值可以是 0,但要标记 fromFallback = true
  • 推荐默认值可以是“暂无推荐”
  • 价格一般不能随便降级,可能涉及交易风险

一句话:降级不是技术问题,是业务决策


4. 保留可观测性

我自己比较推荐在每个子任务上至少打这些信息:

  • 任务名
  • 耗时
  • 是否超时
  • 是否异常
  • 是否降级
  • traceId / requestId

没有这些信息,异步链路一旦出问题,排查体验会非常差。


5. 注意上下文传递

在真实项目中,你可能还会依赖这些上下文:

  • TraceId
  • 登录用户信息
  • MDC 日志上下文
  • ThreadLocal 变量

而异步任务切换线程后,这些上下文默认不会自动传递

所以如果你在线程池任务里发现:

  • 日志 traceId 丢了
  • 用户上下文为空
  • ThreadLocal 失效

不要怀疑人生,这就是线程切换带来的正常现象。需要额外做上下文包装或使用框架能力传递。


6. 对外部依赖增加限流与熔断

CompletableFuture 解决的是编排问题,不是治理全部问题。

如果下游本身不稳定,仅靠异步并不能拯救系统,反而可能把失败并发放大。在线上建议配合:

  • 限流
  • 熔断
  • 隔离
  • 重试(慎用,避免放大流量)

一张图总结异常处理策略

stateDiagram-v2
    [*] --> 发起异步任务
    发起异步任务 --> 成功完成
    发起异步任务 --> 超时
    发起异步任务 --> 异常失败

    超时 --> 核心链路失败
    超时 --> 可降级返回默认值

    异常失败 --> 核心链路失败
    异常失败 --> 可降级返回默认值

    成功完成 --> 聚合结果
    核心链路失败 --> 返回错误
    可降级返回默认值 --> 聚合结果

JDK 8 怎么办?

如果你还在 JDK 8,没有 orTimeoutcompleteOnTimeout,常见做法有两种:

  1. 配合 ScheduledExecutorService 手动构造超时 Future
  2. 使用成熟框架做超时与熔断治理

手动实现并不复杂,但样板代码会明显增多。所以如果条件允许,JDK 9+ 在 CompletableFuture 这一块体验会好很多。


适用边界:什么时候不该强行用 CompletableFuture

虽然它很好用,但也不是所有场景都应该上。

更适合用它的场景

  • 聚合多个独立下游接口
  • 有明确并行机会
  • 需要细粒度超时和降级
  • 逻辑上仍是“一次请求内的编排”

不太适合的场景

  • 特别复杂的异步工作流,步骤很多、状态很多
  • 涉及大量消息驱动、跨服务长事务
  • 流式处理和背压需求明显

这些情况下,可能要考虑:

  • 消息队列
  • Reactor / WebFlux
  • 工作流引擎
  • 更完整的任务调度系统

总结

如果你只记住三件事,我建议是这三条:

  1. 先区分任务重要性:核心任务失败就失败,可降级任务才兜底
  2. 每个子任务就地处理超时和异常:不要把所有问题堆到 allOf() 之后
  3. 一定使用自定义线程池和可观测性:否则异步只会把问题藏起来

CompletableFuture 真正的价值,不只是“把代码改成异步”,而是让你能显式表达并行、依赖、超时、失败和降级策略。这点在聚合接口、推荐接口、首页装配接口里特别实用。

如果你正在改造一个串行接口,我建议按下面顺序落地:

  • 第一步:识别可并行的子调用
  • 第二步:为每个子调用定义超时和降级策略
  • 第三步:接入独立线程池
  • 第四步:补齐日志、耗时、异常、降级指标
  • 第五步:压测验证线程池与下游承载能力

这样做下来,CompletableFuture 才不只是“会用”,而是真能在线上站住。


分享到:

上一篇
《Java开发踩坑实战:ThreadLocal在线程池中的内存泄漏与上下文串值排查指南》
下一篇
《Java 中基于 CompletableFuture 与线程池隔离的异步任务编排实战:性能优化、超时控制与异常治理》