跳转到内容
123xiao | 无名键客

《Java 中利用 CompletableFuture 优化并发编排的实战指南》

字数: 0 阅读时长: 1 分钟

Java 中利用 CompletableFuture 优化并发编排的实战指南

在 Java 里写并发,很多人一开始都走过同一条路:new Thread()、线程池、Future.get(),最后代码不是“能跑”,而是“勉强能跑”。一旦业务里出现多个远程调用并行、结果汇总、超时降级、异常兜底,传统写法就会迅速变得难维护。

CompletableFuture 的价值,恰恰就在这里:它不是单纯“开异步任务”的工具,而是一套并发编排能力。你可以把它理解成“用声明式方式描述异步依赖关系”。

这篇文章我会从实战角度带你走一遍:

  • 为什么需要 CompletableFuture
  • 它的核心模型是什么
  • 怎么写出可运行的并发编排代码
  • 常见坑怎么排查
  • 在线上环境里如何兼顾性能与稳定性

如果你已经会用线程池,但对 thenApplythenComposeallOf 总觉得“知道一点但不够顺手”,这篇会比较适合你。


背景与问题

先看一个常见业务场景:商品详情页聚合查询

一次请求需要并发拿到:

  1. 商品基础信息
  2. 库存信息
  3. 价格信息
  4. 营销信息

如果串行调用,大致是这样:

Product product = productService.getProduct(productId);
Stock stock = stockService.getStock(productId);
Price price = priceService.getPrice(productId);
Promotion promotion = promotionService.getPromotion(productId);

这类代码的问题很直接:

  • 总耗时等于多个远程调用之和
  • 某个接口慢,全链路一起慢
  • 异常处理分散,超时不好统一控制
  • 后续如果有“先查 A,再基于 A 查 B”的依赖,代码会越来越绕

有些同学会改成 Future + 线程池

Future<Product> f1 = executor.submit(() -> productService.getProduct(productId));
Future<Stock> f2 = executor.submit(() -> stockService.getStock(productId));
Future<Price> f3 = executor.submit(() -> priceService.getPrice(productId));

然后在后面挨个 get()

这比串行快一些,但仍然有几个典型问题:

  • get() 是阻塞的,写多了还是“同步思维”
  • 多任务依赖关系靠人工管理,易乱
  • 异常包装层级深,调试不直观
  • 超时、降级、链式转换写起来不顺手

CompletableFuture 的优势在于:
它把“任务之间怎么衔接”这件事,从控制流代码里抽出来,变成了一套 API。


前置知识与环境准备

适合的 Java 版本

建议至少使用:

  • Java 8CompletableFuture 基础能力齐全
  • Java 9+:可用 orTimeoutcompleteOnTimeout 等更方便的超时控制
  • Java 17+:长期支持版本,更推荐生产使用

示例环境

本文代码基于:

  • JDK 17
  • 普通 main 方法可运行
  • 不依赖 Spring,方便你本地直接试

学习这篇前最好已掌握

  • Java 线程池基础
  • lambda 表达式
  • 异常处理
  • 基本的 HTTP/RPC 调用模型概念

核心原理

1. CompletableFuture 不只是 Future

Future 更像是“异步结果容器”,你能查它是否完成、能阻塞等待结果。
CompletableFuture 则更进一步:

  • 能主动完成结果
  • 能注册完成后的回调
  • 能进行链式转换
  • 能组合多个异步任务
  • 能统一处理异常和超时

可以把它理解成一个“未来结果 + 编排动作”的组合体。


2. 三类最常用能力

一类:启动异步任务

  • runAsync():执行任务,无返回值
  • supplyAsync():执行任务,有返回值

二类:结果转换与依赖编排

  • thenApply():把上一步结果转换成另一个值
  • thenAccept():消费结果,无返回值
  • thenRun():不关心结果,只在完成后执行
  • thenCompose():把“异步套异步”拉平
  • thenCombine():组合两个独立异步结果

三类:汇总与异常控制

  • allOf():等待所有任务完成
  • anyOf():任意一个完成就继续
  • exceptionally():出现异常时给默认值
  • handle():无论成功失败都处理
  • whenComplete():适合做日志、埋点、清理

3. thenApply 和 thenCompose 的区别

这是很多人刚学时最容易混的地方。

thenApply

适合:同步转换

CompletableFuture<String> future =
    CompletableFuture.supplyAsync(() -> "java")
        .thenApply(String::toUpperCase);

这里 toUpperCase 只是普通转换,所以用 thenApply

thenCompose

适合:上一步结果出来后,还要发起新的异步任务

CompletableFuture<User> future =
    CompletableFuture.supplyAsync(() -> 1001L)
        .thenCompose(userService::getUserAsync);

如果你这里用 thenApply,得到的会是:

CompletableFuture<CompletableFuture<User>>

这就是典型的“异步嵌套异步”。
thenCompose 的作用就是把它拍平。


4. 并发编排的基本模型

可以把它简化成三种关系:

  1. 并行执行:多个任务互不依赖,同时发起
  2. 串行依赖:后一个任务依赖前一个结果
  3. 汇总聚合:多个任务结束后,组装成最终结果

下面这张图能帮助你快速建立心智模型。

flowchart LR
    A[收到商品详情请求] --> B[并发查商品信息]
    A --> C[并发查库存]
    A --> D[并发查价格]
    A --> E[并发查营销]
    B --> F[聚合结果]
    C --> F
    D --> F
    E --> F
    F --> G[返回详情页DTO]

5. 线程执行到底在哪个池里?

这是生产里非常关键的问题。

如果你用:

CompletableFuture.supplyAsync(() -> doSomething());

没有传入自定义线程池,它默认使用的是:

  • ForkJoinPool.commonPool()

问题在于:

  • 它是全局共享的
  • 适合 CPU 密集型任务
  • 如果你拿它跑大量 IO 阻塞任务,容易把池子拖慢
  • 和应用里其他地方共用时,行为不可控

我的建议很明确:

线上业务异步编排,尽量显式传入自定义线程池,不要长期依赖默认 commonPool。


一张图看懂常见 API 关系

classDiagram
    class CompletableFuture {
        +runAsync()
        +supplyAsync()
        +thenApply()
        +thenCompose()
        +thenCombine()
        +allOf()
        +anyOf()
        +exceptionally()
        +handle()
        +whenComplete()
    }

    class AsyncTask {
        <<start>>
    }

    class Transform {
        <<convert>>
    }

    class Combine {
        <<aggregate>>
    }

    class ErrorHandle {
        <<fallback>>
    }

    CompletableFuture --> AsyncTask
    CompletableFuture --> Transform
    CompletableFuture --> Combine
    CompletableFuture --> ErrorHandle

实战代码(可运行)

下面我们实现一个简化版的商品详情聚合服务
目标:

  • 商品、库存、价格并发查询
  • 营销信息依赖商品分类后再异步查询
  • 对价格接口设置超时降级
  • 对整个流程做统一日志与异常处理

完整示例

import java.math.BigDecimal;
import java.time.LocalTime;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Supplier;

public class CompletableFutureOrchestrationDemo {

    private static final ExecutorService IO_POOL = new ThreadPoolExecutor(
            8,
            16,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(200),
            r -> {
                Thread t = new Thread(r);
                t.setName("cf-io-" + t.threadId());
                return t;
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public static void main(String[] args) {
        ProductDetailService service = new ProductDetailService();

        try {
            ProductDetail detail = service.getProductDetail(1001L);
            System.out.println("\n===== 最终结果 =====");
            System.out.println(detail);
        } finally {
            IO_POOL.shutdown();
        }
    }

    static class ProductDetailService {

        private final ProductService productService = new ProductService();
        private final StockService stockService = new StockService();
        private final PriceService priceService = new PriceService();
        private final PromotionService promotionService = new PromotionService();

        public ProductDetail getProductDetail(Long productId) {
            long start = System.currentTimeMillis();

            CompletableFuture<Product> productFuture =
                    CompletableFuture.supplyAsync(logged("query product", () -> productService.getProduct(productId)), IO_POOL);

            CompletableFuture<Stock> stockFuture =
                    CompletableFuture.supplyAsync(logged("query stock", () -> stockService.getStock(productId)), IO_POOL)
                            .exceptionally(ex -> {
                                log("stock fallback: " + ex.getMessage());
                                return new Stock(productId, 0);
                            });

            CompletableFuture<Price> priceFuture =
                    CompletableFuture.supplyAsync(logged("query price", () -> priceService.getPrice(productId)), IO_POOL)
                            .completeOnTimeout(new Price(productId, new BigDecimal("0.00"), true), 800, TimeUnit.MILLISECONDS)
                            .exceptionally(ex -> {
                                log("price fallback: " + ex.getMessage());
                                return new Price(productId, new BigDecimal("0.00"), true);
                            });

            CompletableFuture<Promotion> promotionFuture =
                    productFuture.thenCompose(product ->
                            CompletableFuture.supplyAsync(
                                    logged("query promotion by category", () -> promotionService.getPromotion(product.category())),
                                    IO_POOL
                            )
                    ).exceptionally(ex -> {
                        log("promotion fallback: " + ex.getMessage());
                        return new Promotion("NONE", "无可用活动");
                    });

            CompletableFuture<ProductDetail> detailFuture =
                    CompletableFuture.allOf(productFuture, stockFuture, priceFuture, promotionFuture)
                            .thenApply(v -> new ProductDetail(
                                    productFuture.join(),
                                    stockFuture.join(),
                                    priceFuture.join(),
                                    promotionFuture.join()
                            ))
                            .whenComplete((result, ex) -> {
                                long cost = System.currentTimeMillis() - start;
                                if (ex == null) {
                                    log("assemble success, cost = " + cost + " ms");
                                } else {
                                    log("assemble failed, cost = " + cost + " ms, ex = " + ex.getMessage());
                                }
                            });

            return detailFuture.join();
        }
    }

    static class ProductService {
        public Product getProduct(Long productId) {
            sleep(300);
            return new Product(productId, "机械键盘", "keyboard");
        }
    }

    static class StockService {
        public Stock getStock(Long productId) {
            sleep(400);
            return new Stock(productId, 128);
        }
    }

    static class PriceService {
        public Price getPrice(Long productId) {
            sleep(1000); // 故意慢一点,触发超时降级
            return new Price(productId, new BigDecimal("399.00"), false);
        }
    }

    static class PromotionService {
        public Promotion getPromotion(String category) {
            sleep(350);
            if ("keyboard".equals(category)) {
                return new Promotion("PROMO-618", "满 299 减 40");
            }
            return new Promotion("NONE", "无可用活动");
        }
    }

    record Product(Long productId, String name, String category) {}
    record Stock(Long productId, int available) {}
    record Price(Long productId, BigDecimal amount, boolean fallback) {}
    record Promotion(String code, String description) {}
    record ProductDetail(Product product, Stock stock, Price price, Promotion promotion) {}

    static <T> Supplier<T> logged(String action, Supplier<T> supplier) {
        return () -> {
            long start = System.currentTimeMillis();
            log("start: " + action);
            try {
                T result = supplier.get();
                log("finish: " + action + ", cost = " + (System.currentTimeMillis() - start) + " ms");
                return result;
            } catch (Exception e) {
                log("error: " + action + ", cost = " + (System.currentTimeMillis() - start) + " ms");
                throw e;
            }
        };
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("thread interrupted", e);
        }
    }

    static void log(String msg) {
        System.out.printf("[%s][%s] %s%n", LocalTime.now(), Thread.currentThread().getName(), msg);
    }
}

这段代码里到底优化了什么?

我们一步步拆开看。

1. 商品、库存、价格并发查

CompletableFuture<Product> productFuture = ...
CompletableFuture<Stock> stockFuture = ...
CompletableFuture<Price> priceFuture = ...

这三个任务互不依赖,所以应当立刻并发启动

如果串行执行,理论耗时约为:

  • 300ms + 400ms + 1000ms = 1700ms,再加营销查询

而并发后,主耗时接近最慢的那个任务,也就是价格查询。


2. 营销查询依赖商品分类,用 thenCompose

productFuture.thenCompose(product ->
    CompletableFuture.supplyAsync(() -> promotionService.getPromotion(product.category()), IO_POOL)
)

为什么这里不是 thenApply

因为 getPromotion(...) 本身又是一个异步任务,我们需要的是:

  • 先拿到商品
  • 再异步发起营销查询
  • 返回一个“拉平后的最终 Future”

这就是 thenCompose 的典型场景。


3. 价格接口超时降级

.completeOnTimeout(new Price(productId, new BigDecimal("0.00"), true), 800, TimeUnit.MILLISECONDS)

这里特别适合聚合型接口。

商品详情页里,价格如果超时:

  • 未必需要整个请求失败
  • 可以返回兜底值
  • 让页面主体先出来

这类思路在线上非常常见:核心字段强一致,边缘字段弱依赖


4. allOf 负责“等大家都结束”

CompletableFuture.allOf(productFuture, stockFuture, priceFuture, promotionFuture)

它返回的是 CompletableFuture<Void>,本身不直接给你各自结果。
所以我们常见写法是后面再 join()

.thenApply(v -> new ProductDetail(
    productFuture.join(),
    stockFuture.join(),
    priceFuture.join(),
    promotionFuture.join()
))

这里之所以安全,是因为 allOf 已经保证这些任务都结束了。


再看一个时序图:请求是怎么流动的?

sequenceDiagram
    participant C as Client
    participant A as API
    participant P as ProductService
    participant S as StockService
    participant R as PriceService
    participant M as PromotionService

    C->>A: 请求商品详情
    A->>P: 异步查询商品
    A->>S: 异步查询库存
    A->>R: 异步查询价格
    P-->>A: 返回商品(category=keyboard)
    A->>M: 基于category异步查营销
    S-->>A: 返回库存
    Note over R,A: 超过800ms触发降级
    M-->>A: 返回营销信息
    A-->>C: 聚合后返回详情

常用编排模式总结

模式一:并行聚合

多个无依赖任务同时执行,最后合并。

CompletableFuture<A> fa = ...;
CompletableFuture<B> fb = ...;

CompletableFuture<Result> future = fa.thenCombine(fb, Result::new);

适合:

  • 聚合页
  • 报表统计
  • 多下游接口并发查询

模式二:串行依赖

后续步骤依赖前一步结果。

CompletableFuture<UserProfile> future =
    getUserIdAsync()
        .thenCompose(this::getUserAsync)
        .thenCompose(this::getProfileAsync);

适合:

  • 先鉴权,再查用户,再查画像
  • 先建订单,再锁库存,再发优惠券

模式三:竞争返回

多个来源哪个先返回用哪个。

CompletableFuture<Object> future = CompletableFuture.anyOf(sourceA, sourceB, sourceC);

适合:

  • 多机房竞速
  • 主备源切换
  • 多缓存层抢答

不过要注意:anyOf 返回后,其他任务默认不会自动取消。这点很多人会忽略,后面会讲。


常见坑与排查

这一节我尽量讲得“接地气”一点,都是实际项目里很常见的问题。

坑 1:误用默认线程池,导致吞吐异常

现象

  • 压测时接口波动大
  • CPU 不高,但响应时间抖动明显
  • 异步任务堆积,日志线程名常见 ForkJoinPool.commonPool-worker-*

原因

默认 commonPool 并不适合承载大量 IO 阻塞任务,比如:

  • 调数据库
  • 调 HTTP 接口
  • 调 RPC 服务

排查建议

  • 打日志输出线程名
  • 看是否大量任务跑在 ForkJoinPool.commonPool
  • 观察线程池活跃数、队列长度、拒绝次数

建议

业务场景中显式传入自定义线程池:

CompletableFuture.supplyAsync(() -> queryRemote(), customExecutor);

坑 2:thenApply 写成了“Future 套 Future”

错误示例

CompletableFuture<CompletableFuture<User>> future =
    getUserIdAsync().thenApply(this::getUserAsync);

这时后续链路会很难用。

正确写法

CompletableFuture<User> future =
    getUserIdAsync().thenCompose(this::getUserAsync);

判断口诀

返回普通值,用 thenApply;返回 Future,用 thenCompose


坑 3:异常被包装后看不清根因

join() 抛的是 CompletionExceptionget() 抛的是 ExecutionException
真正的业务异常,通常藏在 getCause() 里。

示例

try {
    future.join();
} catch (CompletionException e) {
    Throwable root = e.getCause();
    root.printStackTrace();
}

排查建议

线上日志不要只打印:

log.error("future failed: {}", e.getMessage());

而要打印完整堆栈和根因。


坑 4:allOf() 成功了,但取结果时报错

严格说,allOf() 并不是“所有任务都成功”,而是“所有任务都完成”。
如果其中有异常,后续 join() 一样会炸。

建议做法

对可降级任务提前兜底:

future.exceptionally(ex -> defaultValue);

如果是关键任务,就不要吞异常,让主流程感知失败。


坑 5:超时了,但底层任务还在跑

这个坑很真实。

比如你用了:

future.orTimeout(500, TimeUnit.MILLISECONDS)

主流程超时报错了,但底层那个 HTTP/RPC 调用如果本身不支持中断,它仍可能继续占资源。

这意味着什么?

  • 超时只是调用方“不等了”
  • 不代表下游真的“停了”

建议

  • 下游客户端本身也要配置连接超时、读超时
  • 不要把 CompletableFuture 超时当成唯一超时控制手段
  • 真正要“止损”,要结合网络客户端超时与线程池隔离

坑 6:在回调里做重活,拖慢线程池

例如:

future.thenApply(result -> {
    // 大量计算或阻塞IO
    return heavyProcess(result);
});

如果这一步很重,你可能不希望它继续占用前一个阶段的执行线程。

可以改成:

future.thenApplyAsync(this::heavyProcess, customExecutor);

什么时候用 Async 版本?

  • 回调逻辑耗时明显
  • 回调里有阻塞操作
  • 你想把不同阶段隔离到不同线程池

安全/性能最佳实践

这一节我会给偏“工程化”的建议,适合你直接带回项目里用。

1. 线程池按任务类型隔离

不要把所有异步任务都丢进一个池子。

建议至少区分:

  • IO 密集型池:远程调用、数据库访问
  • CPU 密集型池:计算、规则匹配、加解密

原因很简单:

  • IO 会阻塞线程
  • CPU 任务需要尽量少上下文切换

一个粗略经验:

  • IO 密集型线程数可略大于 CPU 核数
  • CPU 密集型线程数通常接近 CPU 核数

最终还是要靠压测定。


2. 聚合接口必须设置超时与降级边界

聚合接口最怕“被最慢依赖拖死”。

建议给每个下游定义清楚:

  • 是否核心依赖
  • 最大等待时间
  • 超时后的默认值
  • 是否允许空结果返回

例如:

下游服务是否核心超时策略降级策略
商品信息500ms失败即整体失败
库存信息300ms返回 0 或未知
价格信息300ms返回兜底价格
营销信息200ms返回无活动

这张表在接口设计时非常有用。


3. 不要滥用 join()

join() 用起来很顺手,但如果你在中间链路过早 join(),就会把异步流程重新变回同步阻塞。

不推荐

Product product = productFuture.join();
CompletableFuture<Promotion> promotionFuture = ...

这样会让当前线程卡住等待商品结果。

更推荐

CompletableFuture<Promotion> promotionFuture =
    productFuture.thenCompose(product -> ...);

原则是:

尽量把阻塞留到最外层,只在真正需要拿最终结果时 join()


4. 给异步链路补上日志上下文

这是线上排查特别关键的一点。
异步一多,请求日志很容易“串不起来”。

建议至少记录:

  • 请求 ID / traceId
  • 阶段名称
  • 开始时间、结束时间
  • 执行线程名
  • 是否命中降级

如果你在 Spring 项目里,还要特别注意:

  • ThreadLocal 上下文不会自动跨线程传递
  • MDC 日志上下文可能丢失
  • 安全上下文也可能丢失

这时可以考虑:

  • 手工透传上下文
  • 自定义 Executor
  • 用装饰器包装任务

5. 对共享状态保持克制

异步回调里最怕随手改共享对象,例如:

Map<String, Object> map = new HashMap<>();
future1.thenAccept(v -> map.put("a", v));
future2.thenAccept(v -> map.put("b", v));

这在多线程下可能有并发问题。

建议:

  • 优先使用不可变对象
  • 在最终汇总时一次性组装
  • 如果确实要共享写入,用线程安全容器并明确边界

6. 不要把 CompletableFuture 当消息队列

它适合:

  • 单机内并发编排
  • 请求生命周期内的异步组合

它不适合直接承担:

  • 持久化任务队列
  • 跨进程可靠投递
  • 长时间后台任务管理

如果你的任务需要:

  • 断点续跑
  • 重试补偿
  • 持久化状态
  • 跨服务异步解耦

那更适合 MQ、调度系统或工作流引擎。


逐步验证清单

如果你准备把 CompletableFuture 用到项目里,我建议按下面顺序验证,而不是一步上复杂编排。

第一步:验证并发是否真的生效

  • 看总耗时是否接近最慢任务,而不是所有任务之和
  • 打印线程名,确认确实在线程池中运行

第二步:验证异常传播路径

  • 人为让某个任务抛异常
  • 确认主流程是否按预期失败或降级
  • 检查日志里能否看到真实根因

第三步:验证超时行为

  • 人为让某个任务 sleep 更久
  • 确认是否命中 orTimeout / completeOnTimeout
  • 确认超时后整体 SLA 是否达标

第四步:验证线程池边界

  • 压测时观察活跃线程数、队列长度、拒绝策略
  • 确认是否发生线程池打满
  • 确认是否有任务堆积导致雪崩

第五步:验证降级结果是否可接受

  • 页面/接口拿到默认值后,前端或调用方是否能正确处理
  • 不要只关注“程序没报错”,还要看“业务是否能兜住”

一个更贴近生产的取舍建议

很多文章会把 CompletableFuture 讲得像“银弹”,但我想提醒一个边界:

适合用它的情况

  • 一个请求里有多个下游调用
  • 任务依赖关系清晰
  • 需要并发提速
  • 需要在本进程内完成聚合与降级

不太适合的情况

  • 链路过长,回调嵌套层数太深
  • 业务状态机复杂,有很多分支和补偿
  • 涉及跨服务长事务
  • 需要持久化、重试、审计、人工介入

如果你的业务已经发展到“异步步骤很多、失败恢复复杂”,那就应该考虑:

  • 领域工作流
  • 任务调度平台
  • MQ + 消费者编排
  • 工作流引擎

换句话说:

CompletableFuture 很适合做“请求内并发编排”,但不应该无限承担“系统级异步流程”的职责。


总结

CompletableFuture 真正厉害的地方,不是“把代码改成异步”,而是让你能更自然地表达这些关系:

  • 哪些任务可以并行
  • 哪些任务必须依赖前置结果
  • 哪些任务失败可以降级
  • 哪些任务超时就该止损

如果你刚开始在项目里落地,我给你的可执行建议是:

  1. 先从聚合查询场景开始,这是最容易见效的入口
  2. 始终使用自定义线程池,不要默认依赖 commonPool
  3. 优先掌握 thenApply / thenCompose / thenCombine / allOf / exceptionally,这几个就能覆盖大多数场景
  4. 把超时、降级、日志打通,否则异步代码上线后很难排查
  5. 只在最外层 join(),中途尽量保持链式编排,不要过早阻塞

最后给你一个简短判断标准:

  • 如果你现在的代码是“多个接口串行查、总耗时长、异常处理散”,那 CompletableFuture 值得上。
  • 如果你的业务已经演变成“复杂工作流 + 补偿 + 状态持久化”,那就别硬扛,应该升级架构手段。

学会 CompletableFuture 后,你会发现并发代码不一定非得写得很“硬核”,也可以写得清楚、克制,而且足够工程化。


分享到:

上一篇
《区块链智能合约安全审计实战:从常见漏洞识别到自动化检测流程搭建-211》
下一篇
《从提示工程到 RAG 落地:中级开发者构建企业级 AI 知识问答系统实战指南》