跳转到内容
123xiao | 无名键客

《Java 并发编程实战:用 CompletableFuture 重构中台聚合接口的异步调用链》

字数: 0 阅读时长: 1 分钟

Java 并发编程实战:用 CompletableFuture 重构中台聚合接口的异步调用链

中台系统里最常见的一类接口,不是“自己算”,而是“到处取,再拼起来”。比如一个商品详情页聚合接口,往往要同时查商品基础信息、库存、价格、营销标签、店铺信息、用户个性化推荐。
如果这些调用串行执行,延迟会非常难看;如果到处手写线程池和 Future.get(),代码又会越来越难维护。

这篇文章我就从一个典型的中台聚合接口出发,带你用 CompletableFuture 把异步调用链重构一遍。重点不是 API 罗列,而是:

  • 什么时候并行,什么时候串联
  • 如何收敛超时、异常和线程池
  • 怎么写出“可运行、可排查、可上线”的代码

前置知识与环境准备

你需要了解

在开始前,建议你已经熟悉:

  • Java 8 及以上
  • 线程池基础:ThreadPoolExecutor
  • Future / Callable 的基本用法
  • REST/RPC 调用的超时概念

示例环境

本文代码基于:

  • JDK 11+
  • 单文件可运行示例
  • 不依赖 Spring,也能直接跑通核心逻辑

如果你在 Spring Boot 项目里使用,迁移也很直接。


背景与问题

我们先看一个典型场景:一个中台聚合接口 /product/detail,对外返回完整商品详情。

它内部需要调用多个下游服务:

  1. 商品服务:查基础信息
  2. 库存服务:查库存
  3. 价格服务:查价格
  4. 营销服务:查活动标签
  5. 推荐服务:查相关推荐

其中有些调用互相独立,有些调用依赖上一步结果。比如:

  • 商品基础信息、库存、价格、营销可以并行
  • 推荐服务可能依赖商品类目或品牌,得等商品基础信息返回后再查

如果我们写成串行代码,通常会像这样:

ProductInfo product = productService.getProduct(productId);
Stock stock = stockService.getStock(productId);
Price price = priceService.getPrice(productId);
Promotion promotion = promotionService.getPromotion(productId);
Recommendation recommendation = recommendationService.recommend(product.getCategoryId());

问题很明显:

  • 总耗时接近所有下游耗时之和
  • 任一调用卡顿,整个接口都被拖慢
  • 容错逻辑分散在各行代码里,不好统一治理
  • 后面想加监控、超时、降级,会越来越乱

典型耗时对比

假设下游平均耗时如下:

服务耗时
商品服务80ms
库存服务50ms
价格服务60ms
营销服务70ms
推荐服务90ms

串行总耗时大约:350ms
如果合理并行,理论上可以接近:80ms + 90ms = 170ms 左右

这也是聚合接口最值得优化的一类地方:不是单点算法慢,而是调用编排方式不合理


核心原理

CompletableFuture 可以把异步任务之间的关系表达得更清楚。你可以把它理解成:

  • supplyAsync:启动一个异步任务并返回结果
  • thenApply:把上一步结果做同步转换
  • thenCompose:基于上一步结果,再发起一个新的异步任务
  • thenCombine:合并两个异步任务结果
  • allOf:等待一组任务都完成
  • exceptionally / handle:处理异常和降级

1. 并行拆分:独立任务一起发

独立调用可以并行发起,不要互相阻塞。

flowchart LR
    A[请求进入聚合接口] --> B[商品服务]
    A --> C[库存服务]
    A --> D[价格服务]
    A --> E[营销服务]
    B --> F[推荐服务]
    B --> G[结果聚合]
    C --> G
    D --> G
    E --> G
    F --> G

2. 串联依赖:有依赖就用 thenCompose

如果 B 依赖 A 的结果,就不要在外部 join() 再继续,那会把异步链打断。
正确姿势是把依赖关系写进链路里。

CompletableFuture<ProductInfo> productFuture = CompletableFuture.supplyAsync(
    () -> productService.getProduct(productId), executor);

CompletableFuture<Recommendation> recommendationFuture = productFuture.thenCompose(
    product -> CompletableFuture.supplyAsync(
        () -> recommendationService.recommend(product.getCategoryId()), executor)
);

3. 聚合收口:allOf 负责等待,join 负责取结果

CompletableFuture.allOf(...) 只负责“等大家都完成”,不负责返回聚合对象。
最终结果仍然需要从各个 future 中 join() 取出。

4. 异常与超时:不能只追求快,还要可控

中台接口最怕两种情况:

  • 某个下游偶发超时,把主链路拖死
  • 异常直接抛出,导致整个聚合接口失败

所以异步编排不仅是“提速”,更是“把失败约束在边界内”。


一张图看懂重构前后

sequenceDiagram
    participant Client as 调用方
    participant Agg as 聚合服务
    participant P as 商品服务
    participant S as 库存服务
    participant R as 价格服务
    participant M as 营销服务
    participant Rec as 推荐服务

    Client->>Agg: 请求商品详情
    rect rgb(245,245,245)
    Note over Agg,P: 重构后:并发发起独立调用
    Agg->>P: 查商品
    Agg->>S: 查库存
    Agg->>R: 查价格
    Agg->>M: 查营销
    P-->>Agg: 商品信息
    Agg->>Rec: 基于类目查推荐
    S-->>Agg: 库存
    R-->>Agg: 价格
    M-->>Agg: 营销
    Rec-->>Agg: 推荐
    end
    Agg-->>Client: 聚合结果

实战代码(可运行)

下面我们写一个完整可运行的示例。
这个例子会模拟多个下游服务的调用延迟,并演示:

  • 并发拉取独立数据
  • 依赖式异步调用
  • 超时与降级
  • 统一聚合返回

你可以直接复制运行。

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Supplier;

public class CompletableFutureAggregationDemo {

    public static void main(String[] args) {
        ProductAggregationService aggregationService = new ProductAggregationService();

        long start = System.currentTimeMillis();
        ProductDetailDTO detail = aggregationService.getProductDetail(1001L);
        long cost = System.currentTimeMillis() - start;

        System.out.println("聚合结果:");
        System.out.println(detail);
        System.out.println("总耗时: " + cost + " ms");

        aggregationService.shutdown();
    }

    // ========== 聚合服务 ==========
    static class ProductAggregationService {
        private final ExecutorService executor = new ThreadPoolExecutor(
                8,
                16,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new NamedThreadFactory("aggregate-pool"),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        private final ProductService productService = new ProductService();
        private final StockService stockService = new StockService();
        private final PriceService priceService = new PriceService();
        private final PromotionService promotionService = new PromotionService();
        private final RecommendationService recommendationService = new RecommendationService();

        public ProductDetailDTO getProductDetail(Long productId) {
            CompletableFuture<ProductInfo> productFuture = supplyAsyncWithTimeout(
                    () -> productService.getProduct(productId),
                    200,
                    TimeUnit.MILLISECONDS,
                    ProductInfo.defaultValue(productId),
                    "productService"
            );

            CompletableFuture<Stock> stockFuture = supplyAsyncWithTimeout(
                    () -> stockService.getStock(productId),
                    150,
                    TimeUnit.MILLISECONDS,
                    Stock.defaultValue(productId),
                    "stockService"
            );

            CompletableFuture<Price> priceFuture = supplyAsyncWithTimeout(
                    () -> priceService.getPrice(productId),
                    150,
                    TimeUnit.MILLISECONDS,
                    Price.defaultValue(productId),
                    "priceService"
            );

            CompletableFuture<Promotion> promotionFuture = supplyAsyncWithTimeout(
                    () -> promotionService.getPromotion(productId),
                    150,
                    TimeUnit.MILLISECONDS,
                    Promotion.defaultValue(productId),
                    "promotionService"
            );

            CompletableFuture<Recommendation> recommendationFuture = productFuture.thenCompose(product ->
                    supplyAsyncWithTimeout(
                            () -> recommendationService.recommend(product.getCategoryId()),
                            180,
                            TimeUnit.MILLISECONDS,
                            Recommendation.defaultValue(product.getCategoryId()),
                            "recommendationService"
                    )
            );

            CompletableFuture.allOf(
                    productFuture,
                    stockFuture,
                    priceFuture,
                    promotionFuture,
                    recommendationFuture
            ).join();

            return new ProductDetailDTO(
                    productFuture.join(),
                    stockFuture.join(),
                    priceFuture.join(),
                    promotionFuture.join(),
                    recommendationFuture.join(),
                    LocalDateTime.now()
            );
        }

        private <T> CompletableFuture<T> supplyAsyncWithTimeout(
                Supplier<T> supplier,
                long timeout,
                TimeUnit unit,
                T fallback,
                String serviceName
        ) {
            return CompletableFuture.supplyAsync(() -> {
                        log("调用开始: " + serviceName);
                        T result = supplier.get();
                        log("调用成功: " + serviceName);
                        return result;
                    }, executor)
                    .completeOnTimeout(fallback, timeout, unit)
                    .exceptionally(ex -> {
                        log("调用失败: " + serviceName + ", ex=" + ex.getMessage());
                        return fallback;
                    });
        }

        public void shutdown() {
            executor.shutdown();
        }

        private void log(String msg) {
            System.out.printf("[%s][%s] %s%n",
                    Thread.currentThread().getName(),
                    LocalDateTime.now(),
                    msg);
        }
    }

    // ========== 模拟下游服务 ==========
    static class ProductService {
        public ProductInfo getProduct(Long productId) {
            sleep(80);
            return new ProductInfo(productId, "机械键盘", 10L, "KEYBOARD");
        }
    }

    static class StockService {
        public Stock getStock(Long productId) {
            sleep(50);
            return new Stock(productId, 128, false);
        }
    }

    static class PriceService {
        public Price getPrice(Long productId) {
            sleep(60);
            return new Price(productId, new BigDecimal("399.00"), "CNY");
        }
    }

    static class PromotionService {
        public Promotion getPromotion(Long productId) {
            sleep(70);
            return new Promotion(productId, "满300减30");
        }
    }

    static class RecommendationService {
        public Recommendation recommend(String categoryId) {
            sleep(90);
            List<String> items = Arrays.asList("电竞鼠标", "桌垫", "键帽套装");
            return new Recommendation(categoryId, items);
        }
    }

    // ========== DTO / Model ==========
    static class ProductDetailDTO {
        private final ProductInfo productInfo;
        private final Stock stock;
        private final Price price;
        private final Promotion promotion;
        private final Recommendation recommendation;
        private final LocalDateTime assembledAt;

        public ProductDetailDTO(ProductInfo productInfo, Stock stock, Price price,
                                Promotion promotion, Recommendation recommendation,
                                LocalDateTime assembledAt) {
            this.productInfo = productInfo;
            this.stock = stock;
            this.price = price;
            this.promotion = promotion;
            this.recommendation = recommendation;
            this.assembledAt = assembledAt;
        }

        @Override
        public String toString() {
            return "ProductDetailDTO{" +
                    "productInfo=" + productInfo +
                    ", stock=" + stock +
                    ", price=" + price +
                    ", promotion=" + promotion +
                    ", recommendation=" + recommendation +
                    ", assembledAt=" + assembledAt +
                    '}';
        }
    }

    static class ProductInfo {
        private final Long productId;
        private final String name;
        private final Long shopId;
        private final String categoryId;

        public ProductInfo(Long productId, String name, Long shopId, String categoryId) {
            this.productId = productId;
            this.name = name;
            this.shopId = shopId;
            this.categoryId = categoryId;
        }

        public String getCategoryId() {
            return categoryId;
        }

        public static ProductInfo defaultValue(Long productId) {
            return new ProductInfo(productId, "未知商品", -1L, "DEFAULT");
        }

        @Override
        public String toString() {
            return "ProductInfo{" +
                    "productId=" + productId +
                    ", name='" + name + '\'' +
                    ", shopId=" + shopId +
                    ", categoryId='" + categoryId + '\'' +
                    '}';
        }
    }

    static class Stock {
        private final Long productId;
        private final int available;
        private final boolean degraded;

        public Stock(Long productId, int available, boolean degraded) {
            this.productId = productId;
            this.available = available;
            this.degraded = degraded;
        }

        public static Stock defaultValue(Long productId) {
            return new Stock(productId, 0, true);
        }

        @Override
        public String toString() {
            return "Stock{" +
                    "productId=" + productId +
                    ", available=" + available +
                    ", degraded=" + degraded +
                    '}';
        }
    }

    static class Price {
        private final Long productId;
        private final BigDecimal amount;
        private final String currency;

        public Price(Long productId, BigDecimal amount, String currency) {
            this.productId = productId;
            this.amount = amount;
            this.currency = currency;
        }

        public static Price defaultValue(Long productId) {
            return new Price(productId, BigDecimal.ZERO, "CNY");
        }

        @Override
        public String toString() {
            return "Price{" +
                    "productId=" + productId +
                    ", amount=" + amount +
                    ", currency='" + currency + '\'' +
                    '}';
        }
    }

    static class Promotion {
        private final Long productId;
        private final String label;

        public Promotion(Long productId, String label) {
            this.productId = productId;
            this.label = label;
        }

        public static Promotion defaultValue(Long productId) {
            return new Promotion(productId, "无活动");
        }

        @Override
        public String toString() {
            return "Promotion{" +
                    "productId=" + productId +
                    ", label='" + label + '\'' +
                    '}';
        }
    }

    static class Recommendation {
        private final String categoryId;
        private final List<String> items;

        public Recommendation(String categoryId, List<String> items) {
            this.categoryId = categoryId;
            this.items = items;
        }

        public static Recommendation defaultValue(String categoryId) {
            return new Recommendation(categoryId, List.of());
        }

        @Override
        public String toString() {
            return "Recommendation{" +
                    "categoryId='" + categoryId + '\'' +
                    ", items=" + items +
                    '}';
        }
    }

    // ========== 工具类 ==========
    static class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private int counter = 1;

        public NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public synchronized Thread newThread(Runnable r) {
            Thread t = new Thread(r, prefix + "-" + counter++);
            t.setDaemon(false);
            return t;
        }
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("线程被中断", e);
        }
    }
}

代码是怎么一步步工作的

上面这段代码里,有几个点很值得你注意。

1. 独立任务并发启动

CompletableFuture<Stock> stockFuture = supplyAsyncWithTimeout(...);
CompletableFuture<Price> priceFuture = supplyAsyncWithTimeout(...);
CompletableFuture<Promotion> promotionFuture = supplyAsyncWithTimeout(...);

这些服务彼此无依赖,所以应当第一时间并发发起。

2. 依赖任务用 thenCompose 串起来

CompletableFuture<Recommendation> recommendationFuture = productFuture.thenCompose(product ->
        supplyAsyncWithTimeout(() -> recommendationService.recommend(product.getCategoryId()), ...)
);

这里如果你写成:

ProductInfo product = productFuture.join();
Recommendation recommendation = recommendationService.recommend(product.getCategoryId());

虽然功能上能跑,但会提早阻塞主线程,异步编排的意义就打折了。

3. allOf 用于“收口”

CompletableFuture.allOf(...).join();

这一步代表:等所有依赖的结果都准备好,再统一组装 DTO。

4. completeOnTimeout 做软超时降级

.completeOnTimeout(fallback, timeout, unit)

这是聚合接口里很好用的能力:
不要让一个下游慢请求拖垮整个接口,而是超时后返回默认值。

我自己在项目里经常这么做,尤其是营销、推荐、画像这种“非核心但影响体验”的字段,非常适合超时降级。


再画一张:异步任务状态流转

stateDiagram-v2
    [*] --> Created
    Created --> Running: supplyAsync
    Running --> Success: 正常返回
    Running --> TimeoutFallback: completeOnTimeout
    Running --> ExceptionFallback: exceptionally
    Success --> Joined
    TimeoutFallback --> Joined
    ExceptionFallback --> Joined
    Joined --> [*]

常见坑与排查

CompletableFuture 很好用,但也特别容易写出“表面异步、实际堵死”的代码。下面这些坑,我基本都见过。

1. 默认线程池乱用,导致互相干扰

如果你直接写:

CompletableFuture.supplyAsync(() -> remoteCall());

默认会使用 ForkJoinPool.commonPool()
问题在于:

  • 这个线程池是全局共享的
  • 你项目里别的模块也可能在用
  • RPC/HTTP 这种阻塞 IO 任务并不适合直接扔进 commonPool

建议:聚合接口使用独立业务线程池。

排查思路

  • 看线程名是否是 ForkJoinPool.commonPool-worker-*
  • 线上出现偶发慢请求时,检查 commonPool 是否被其他任务打满

2. 在异步链中途 join(),把并发写成串行

错误示例:

CompletableFuture<ProductInfo> productFuture = ...;
ProductInfo product = productFuture.join(); // 过早阻塞
CompletableFuture<Recommendation> recFuture = CompletableFuture.supplyAsync(
    () -> recommendationService.recommend(product.getCategoryId()), executor
);

这会让主线程提前等待,后续编排不再自然流动。

正确做法thenCompose


3. allOf() 后以为能直接拿结果

很多人第一次用会写成:

CompletableFuture<Object> future = CompletableFuture.allOf(f1, f2);

allOf() 返回的是 CompletableFuture<Void>,不携带业务结果。
结果还是要从 f1.join()f2.join() 里分别取。


4. 异常被吞掉,日志也没打

如果你只写:

future.exceptionally(ex -> fallback);

但没有日志,后面排查时会非常痛苦。因为你只看到了“怎么总是降级”,不知道为什么降级。

建议

  • 记录服务名、请求参数、耗时、异常摘要
  • 如果有链路追踪,带上 traceId
  • 区分超时降级与业务异常降级

5. 线程池太小或队列太大

线程池配置不是“越大越好”。
中台聚合接口的下游通常是 IO 型调用,线程数要结合机器核数、下游 RT、接口 QPS 来评估。

一个常见误区是:

  • 核心线程很小,瞬时流量来时排队严重
  • 队列很大,任务看似没拒绝,实际上延迟越来越高

经验建议

  • IO 密集型线程池可适当大于 CPU 核数
  • 队列不要无限大
  • 一定要有拒绝策略和监控

6. 忘记处理中断

如果你在下游调用封装里吞掉 InterruptedException,任务取消和关闭过程会很别扭。
示例代码里我做了这件事:

Thread.currentThread().interrupt();

这是个小细节,但很重要。


常见排查清单

当聚合接口“偶发很慢”时,我一般按这个顺序查:

1. 看总耗时构成

先确认是:

  • 单个下游慢
  • 线程池排队慢
  • 异步依赖链写错,实际串行了

2. 打印每个 future 的开始/结束时间

至少记录:

  • 服务名
  • 开始时间
  • 结束时间
  • 是否超时
  • 是否异常
  • 使用线程名

3. 检查线程池指标

重点看:

  • activeCount
  • queue size
  • reject count
  • 最大线程数是否经常打满

4. 检查超时配置是否层层失控

很常见的情况是:

  • HTTP 客户端超时 3s
  • 聚合接口超时 500ms
  • completeOnTimeout 只是主线程返回了,但底层请求还在跑

这种场景会造成“主链路看似快了,系统整体资源却更紧张”。


安全/性能最佳实践

这一部分很关键。很多文章只讲 CompletableFuture 怎么写,却不讲怎么稳。真正上线时,稳定性比语法重要。

1. 区分核心字段与非核心字段

不是所有字段都值得等待到最后一毫秒。

比如商品详情接口里:

  • 核心字段:商品信息、价格、库存
  • 非核心字段:营销文案、推荐列表、扩展画像

建议策略:

  • 核心字段:失败可重试、超时阈值略高
  • 非核心字段:快速降级、宁可缺省也别拖主链路

2. 每个下游设置独立超时

不要只靠接口最外层超时。
应该给每个下游服务设置自己的 timeout,否则一个慢服务会持续占着线程资源。

3. 线程池隔离

不同类型的任务最好隔离:

  • 聚合计算线程池
  • 下游 HTTP/RPC 调用线程池
  • 大对象转换或序列化线程池

这样一个模块雪崩时,不至于拖死全部异步任务。

4. 谨慎使用 join()get()

  • join():抛出非受检异常,写法简洁
  • get():会抛出受检异常,适合需要显式处理时使用

我的经验是:
业务代码里常用 join(),但要确保异常已经在链路上被处理或包装过。

5. 补齐监控维度

至少监控这些指标:

  • 聚合接口总 RT / TP99
  • 每个下游服务 RT / 错误率 / 超时率
  • 降级命中率
  • 线程池活跃线程数、队列长度、拒绝次数

如果没有这些指标,出了问题基本就是“猜”。

6. 防止请求风暴下的级联放大

聚合接口天然会放大下游压力。
例如一次入口请求,要打 5 个下游;入口 QPS 1000,就可能放大成 5000 次下游调用。

建议结合:

  • 本地缓存
  • 批量接口替代多次单查
  • 热点数据预计算
  • 熔断、限流、隔离

7. 注意上下文传递

如果你依赖:

  • TraceId
  • 用户上下文
  • MDC 日志上下文
  • ThreadLocal 中的租户信息

那在异步线程里可能丢失。
这点在 Spring 项目里特别常见。

解决方式通常包括:

  • 显式参数传递
  • 包装 Executor
  • 使用支持上下文透传的工具

我自己踩过这个坑:日志里主线程有 traceId,异步线程全没了,定位问题非常费劲。


方案对比:为什么不用 Future 或手工线程编排?

方案一:串行调用

优点:

  • 最简单
  • 易读

缺点:

  • 延迟高
  • 无法发挥并发能力

方案二:Future + 手工 get()

优点:

  • 比串行快
  • 能实现基础并发

缺点:

  • 编排关系不清晰
  • 依赖链很丑
  • 异常处理分散

方案三:CompletableFuture

优点:

  • 同时表达并行、串联、聚合、异常处理
  • 更适合复杂聚合接口
  • 扩展性好

缺点:

  • 一旦写得太“链式魔法”,阅读门槛会上升
  • 对线程池、超时、上下文要求更高

我的建议是:
中台聚合接口只要超过 3 个下游、且存在依赖关系,CompletableFuture 通常就是很合适的选择。


逐步验证清单

如果你准备把现网接口从串行重构为异步,建议按下面步骤推进,不要一次改太大。

第一步:先梳理调用依赖图

列清楚:

  • 哪些服务独立
  • 哪些服务依赖前置结果
  • 哪些字段允许降级
  • 哪些字段必须强一致

第二步:先并行独立调用

先改最容易的部分,比如库存、价格、营销并行。

第三步:再改依赖链

把“查商品后再查推荐”这类逻辑改成 thenCompose

第四步:补超时与降级

不要等上线后再补。
异步一旦跑起来,没有超时和降级会更难收拾。

第五步:压测验证

重点观察:

  • 平均 RT、P95、P99
  • 错误率
  • 线程池饱和情况
  • 下游调用量是否被放大

第六步:灰度上线

建议按流量比例逐步放开,观察降级率和队列积压情况。


总结

CompletableFuture 重构中台聚合接口,本质上是在做三件事:

  1. 把独立调用并行化,降低总 RT
  2. 把依赖关系显式化,让调用链更清楚
  3. 把超时、异常、降级收敛起来,提升稳定性

如果你只记住几个最重要的实践,我建议是这几点:

  • 独立调用用 supplyAsync
  • 有依赖的链路用 thenCompose
  • 聚合收口用 allOf
  • 不要滥用默认线程池
  • 每个下游都要有超时和 fallback
  • 日志、监控、上下文传递必须提前考虑

最后给一个比较务实的边界判断:

  • 下游少、延迟低、关系简单:串行可能已经够用
  • 下游多、RT 可观、存在依赖链:值得上 CompletableFuture
  • 再往上,涉及限流、熔断、隔离、批处理编排:就该从“代码级异步”升级到“治理级异步”了

如果你正在维护一个“看起来不复杂,但总是莫名其妙变慢”的中台聚合接口,那它往往不是业务逻辑有多复杂,而是调用链没有被好好组织。
CompletableFuture,正好就是把这件事做清楚的一把好工具。


分享到:

上一篇
《自动化测试中的测试数据管理实战:从数据构造、隔离到回收的可复用方案》
下一篇
《从源码到部署:基于开源项目 MinIO 搭建高可用对象存储服务的实战指南-220》