跳转到内容
123xiao | 无名键客

《Java 中基于 CompletableFuture 的异步编排实战:从并发优化到异常处理落地》

字数: 0 阅读时长: 1 分钟

Java 中基于 CompletableFuture 的异步编排实战:从并发优化到异常处理落地

在很多 Java 业务系统里,我们最早写的代码往往是“串行思维”:

  1. 查用户
  2. 查订单
  3. 查库存
  4. 拼装结果
  5. 返回

这样写当然没错,逻辑直观、调试方便。但只要这些步骤依赖远程调用、数据库查询或者第三方接口,整体耗时就会被慢请求一层层叠加。
我自己在做聚合接口时就踩过这个坑:单个接口都只要几十毫秒,看起来不慢,但串起来就是三四百毫秒,接口一多,延迟就开始让人难受。

这时候,CompletableFuture 就非常适合上场。它不只是“异步执行一个任务”,更重要的是它能做异步编排:并行、串联、合并、兜底、超时、异常恢复,一套链路都能比较自然地表达出来。

这篇文章我会从“真实开发怎么落地”的角度来讲,重点不放在 API 罗列,而是带你写一个可运行的聚合查询示例,再把常见坑、异常处理方式和性能边界讲透。


背景与问题

先看一个很典型的业务场景:商品详情页聚合接口。

前端请求商品详情时,后端通常需要同时拿到:

  • 商品基础信息
  • 库存信息
  • 价格信息
  • 营销信息

如果你用串行方式写,大概像这样:

public ProductDetailVO getDetail(Long productId) {
    Product product = productService.getProduct(productId);
    Stock stock = stockService.getStock(productId);
    Price price = priceService.getPrice(productId);
    Promotion promotion = promotionService.getPromotion(productId);
    return ProductDetailVO.of(product, stock, price, promotion);
}

问题很明显:

  • 总耗时 = 各步骤耗时之和
  • 任意一步抖动,整体都会变慢
  • 某个非核心服务失败时,很难优雅降级
  • 想加超时、重试、兜底,代码会越来越乱

更糟的是,很多人会尝试手写线程池 + Future 来优化,但最后会发现:

  • Future 只能拿结果,编排能力弱
  • 多任务依赖和异常处理写起来很别扭
  • 代码可读性迅速下降

所以我们真正要解决的不是“开线程”,而是:

  1. 如何把无依赖任务并行化
  2. 如何把有依赖任务串起来
  3. 如何在失败、超时、部分结果缺失时仍然可控地返回
  4. 如何避免线程池被打爆、异常被吞掉、主线程莫名卡住

前置知识与环境准备

本文示例基于:

  • JDK 8+
  • 建议 JDK 9+,因为有 orTimeout / completeOnTimeout
  • 一个自定义线程池,不建议直接依赖默认公共线程池

Maven 项目无需额外依赖,CompletableFuture 来自 JDK 标准库。


核心原理

先别急着背 API,我建议先抓住 4 个最关键的能力。

1. 异步执行:把任务扔到线程池里

CompletableFuture.supplyAsync(() -> queryData(), executor);
  • supplyAsync:有返回值
  • runAsync:无返回值

这一步只是“启动异步任务”。

2. 串行编排:上一步结果作为下一步输入

future.thenApply(result -> transform(result));

常用区别:

  • thenApply:同步转换结果
  • thenCompose:把“异步套异步”拍平
  • thenAccept:消费结果但不返回值

3. 并行合并:多个任务一起跑,最后汇总

CompletableFuture.allOf(f1, f2, f3)

适合“都完成后再统一收集结果”。

如果只关心“谁先完成用谁”,可以用:

CompletableFuture.anyOf(f1, f2, f3)

4. 异常处理:别让异常失控

这是落地里最重要的一块。

常见方法:

  • exceptionally:异常兜底,返回替代值
  • handle:不管成功失败都处理
  • whenComplete:做日志、埋点,不改变结果

我一般这样记:

  • 要改结果exceptionally / handle
  • 只记录,不改结果whenComplete

一张图看懂 CompletableFuture 编排方式

flowchart LR
    A[请求商品详情] --> B[并行查商品基础信息]
    A --> C[并行查库存]
    A --> D[并行查价格]
    A --> E[并行查营销]
    B --> F[汇总结果]
    C --> F
    D --> F
    E --> F
    F --> G[返回详情页数据]

上图是最基础的“并行汇总”。
但真实业务里,常常还会有依赖关系,比如:

  • 先查商品基础信息
  • 再根据商品分类去查营销规则

这时就要用串行链式编排。


核心 API 的使用心法

thenApplythenCompose 的区别

这是中级开发最容易混淆的点之一。

thenApply

上一步返回 T,这一步把它转成 R

CompletableFuture<String> future =
    CompletableFuture.supplyAsync(() -> "hello")
        .thenApply(s -> s + " world");

thenCompose

上一步返回 T,这一步再发起一个新的异步任务,返回 CompletableFuture<R>
thenCompose 会帮你自动“拍平”。

CompletableFuture<String> future =
    CompletableFuture.supplyAsync(() -> 1)
        .thenCompose(id -> CompletableFuture.supplyAsync(() -> "user-" + id));

如果你用错成 thenApply,就会得到:

CompletableFuture<CompletableFuture<String>>

这几乎总不是你想要的。


异步链路时序图

sequenceDiagram
    participant Client as 调用方
    participant CF as CompletableFuture编排层
    participant P as ProductService
    participant S as StockService
    participant PR as PriceService
    participant M as PromotionService

    Client->>CF: 请求商品详情
    par 并行查询
        CF->>P: 查询商品
        CF->>S: 查询库存
        CF->>PR: 查询价格
        CF->>M: 查询营销
    end
    P-->>CF: 商品结果
    S-->>CF: 库存结果
    PR-->>CF: 价格结果
    M-->>CF: 营销结果/异常
    CF->>CF: 异常兜底/默认值填充
    CF-->>Client: 聚合结果

实战代码(可运行)

下面我们写一个可以直接运行的示例:模拟商品详情聚合接口。

功能目标:

  • 商品、库存、价格、营销并行查询
  • 营销服务偶发失败,使用默认值兜底
  • 对整体链路做超时控制
  • 汇总结果返回
  • 打印总耗时

为了方便本地运行,下面所有代码放在一个文件中也可以跑。

import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Supplier;

public class CompletableFutureDemo {

    private static final ExecutorService BIZ_EXECUTOR = new ThreadPoolExecutor(
            8,
            16,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            new ThreadFactory() {
                private int index = 1;
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "biz-cf-" + index++);
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public static void main(String[] args) {
        ProductFacade facade = new ProductFacade();

        long start = System.currentTimeMillis();
        ProductDetailVO detail = facade.getProductDetail(1001L);
        long cost = System.currentTimeMillis() - start;

        System.out.println("最终结果: " + detail);
        System.out.println("总耗时: " + cost + " ms");

        BIZ_EXECUTOR.shutdown();
    }

    static class ProductFacade {

        private final ProductService productService = new ProductService();
        private final StockService stockService = new StockService();
        private final PriceService priceService = new PriceService();
        private final PromotionService promotionService = new PromotionService();

        public ProductDetailVO getProductDetail(Long productId) {
            CompletableFuture<Product> productFuture =
                    CompletableFuture.supplyAsync(wrap(() -> productService.getProduct(productId)), BIZ_EXECUTOR)
                            .whenComplete((r, e) -> log("productFuture", r, e));

            CompletableFuture<Stock> stockFuture =
                    CompletableFuture.supplyAsync(wrap(() -> stockService.getStock(productId)), BIZ_EXECUTOR)
                            .whenComplete((r, e) -> log("stockFuture", r, e));

            CompletableFuture<Price> priceFuture =
                    CompletableFuture.supplyAsync(wrap(() -> priceService.getPrice(productId)), BIZ_EXECUTOR)
                            .whenComplete((r, e) -> log("priceFuture", r, e));

            CompletableFuture<Promotion> promotionFuture =
                    CompletableFuture.supplyAsync(wrap(() -> promotionService.getPromotion(productId)), BIZ_EXECUTOR)
                            .exceptionally(ex -> {
                                System.out.println("[WARN] 营销服务失败,使用默认营销信息: " + ex.getMessage());
                                return new Promotion("默认无活动");
                            })
                            .whenComplete((r, e) -> log("promotionFuture", r, e));

            CompletableFuture<ProductDetailVO> detailFuture =
                    CompletableFuture.allOf(productFuture, stockFuture, priceFuture, promotionFuture)
                            .thenApply(v -> new ProductDetailVO(
                                    productFuture.join(),
                                    stockFuture.join(),
                                    priceFuture.join(),
                                    promotionFuture.join()
                            ))
                            .orTimeout(1500, TimeUnit.MILLISECONDS)
                            .exceptionally(ex -> {
                                throw new RuntimeException("聚合商品详情失败: " + ex.getMessage(), ex);
                            });

            return detailFuture.join();
        }

        private <T> Supplier<T> wrap(Supplier<T> supplier) {
            return () -> {
                try {
                    return supplier.get();
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            };
        }

        private void log(String taskName, Object result, Throwable throwable) {
            if (throwable != null) {
                System.out.println("[ERROR] " + taskName + " failed: " + throwable.getMessage());
            } else {
                System.out.println("[INFO] " + taskName + " success: " + result);
            }
        }
    }

    static class ProductService {
        public Product getProduct(Long productId) {
            sleep(300);
            return new Product(productId, "机械键盘");
        }
    }

    static class StockService {
        public Stock getStock(Long productId) {
            sleep(500);
            return new Stock(productId, 28);
        }
    }

    static class PriceService {
        public Price getPrice(Long productId) {
            sleep(400);
            return new Price(productId, 399.00);
        }
    }

    static class PromotionService {
        private final Random random = new Random();

        public Promotion getPromotion(Long productId) {
            sleep(600);
            if (random.nextBoolean()) {
                throw new RuntimeException("promotion service unavailable");
            }
            return new Promotion("满300减30");
        }
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("thread interrupted", e);
        }
    }

    static class Product {
        private final Long id;
        private final String name;

        public Product(Long id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public String toString() {
            return "{id=" + id + ", name='" + name + "'}";
        }
    }

    static class Stock {
        private final Long productId;
        private final int available;

        public Stock(Long productId, int available) {
            this.productId = productId;
            this.available = available;
        }

        @Override
        public String toString() {
            return "{productId=" + productId + ", available=" + available + "}";
        }
    }

    static class Price {
        private final Long productId;
        private final double amount;

        public Price(Long productId, double amount) {
            this.productId = productId;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "{productId=" + productId + ", amount=" + amount + "}";
        }
    }

    static class Promotion {
        private final String desc;

        public Promotion(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "{desc='" + desc + "'}";
        }
    }

    static class ProductDetailVO {
        private final Product product;
        private final Stock stock;
        private final Price price;
        private final Promotion promotion;

        public ProductDetailVO(Product product, Stock stock, Price price, Promotion promotion) {
            this.product = product;
            this.stock = stock;
            this.price = price;
            this.promotion = promotion;
        }

        @Override
        public String toString() {
            return "ProductDetailVO{" +
                    "product=" + product +
                    ", stock=" + stock +
                    ", price=" + price +
                    ", promotion=" + promotion +
                    '}';
        }
    }
}

这段代码里几个关键点,值得你重点看

1. 使用自定义线程池,而不是默认线程池

CompletableFuture.supplyAsync(task, BIZ_EXECUTOR)

默认的 ForkJoinPool.commonPool() 并不是不能用,但业务系统里我更建议显式传入线程池:

  • 方便隔离不同业务
  • 方便压测和调优
  • 线程数、队列长度、拒绝策略都可控
  • 避免公共线程池被其他异步任务拖垮

2. 用 allOf 做并行汇总

CompletableFuture.allOf(productFuture, stockFuture, priceFuture, promotionFuture)

注意:allOf 本身不直接返回聚合结果,只表示“都完成了”。
所以后面还要再用 join() 把各自结果取出来。

3. 对非核心依赖做降级

营销信息通常不是强一致核心数据,所以:

.exceptionally(ex -> new Promotion("默认无活动"))

这类兜底非常实用。
我的经验是:不要把所有异常都往上抛,而是按“核心依赖 / 非核心依赖”做分层处理。

4. 超时控制不能省

.orTimeout(1500, TimeUnit.MILLISECONDS)

异步并不意味着不会卡住。
如果底层服务持续慢响应,线程池仍然会被占住,调用方也一样会等死。


进一步扩展:有依赖关系时怎么写

上面的四个查询彼此独立,适合并行。
但如果营销规则依赖商品分类,就要写成“先查商品,再查营销”。

import java.util.concurrent.*;

public class ThenComposeExample {

    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(4);

    public static void main(String[] args) {
        CompletableFuture<String> resultFuture =
                CompletableFuture.supplyAsync(() -> getProductCategory(1001L), EXECUTOR)
                        .thenCompose(category ->
                                CompletableFuture.supplyAsync(() -> getPromotionByCategory(category), EXECUTOR)
                        )
                        .exceptionally(ex -> "默认分类活动");

        System.out.println(resultFuture.join());
        EXECUTOR.shutdown();
    }

    static String getProductCategory(Long productId) {
        sleep(200);
        return "keyboard";
    }

    static String getPromotionByCategory(String category) {
        sleep(300);
        return category + " 类目限时折扣";
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

如果你看到“异步任务 A 完成后,再发起异步任务 B”,优先想到 thenCompose


状态变化图:一个 Future 是怎么走完生命周期的

stateDiagram-v2
    [*] --> Created
    Created --> Running: supplyAsync/runAsync
    Running --> Success: 正常完成
    Running --> Failed: 抛出异常
    Running --> Timeout: 超时
    Failed --> Recovered: exceptionally/handle兜底
    Success --> [*]
    Recovered --> [*]
    Timeout --> [*]

这张图的意义在于:
异常不是终点,超时也不是终点,关键是你有没有定义“失败后的业务行为”。


常见坑与排查

这部分很重要,很多线上问题不是不会用 API,而是细节没处理好。

坑 1:误用默认线程池,结果线上抖动严重

现象

  • 某些接口偶发变慢
  • CPU 不高,但请求堆积
  • 异步任务互相影响

原因

默认公共线程池和别的任务共用,且不一定适合你的业务模型。
如果任务里还有阻塞 IO(比如远程调用、数据库查询),问题更明显。

建议

  • 业务异步任务使用独立线程池
  • IO 密集型线程池大小通常可比 CPU 核数更大,但别无脑放大
  • 配合监控:活跃线程数、队列长度、拒绝次数、任务耗时

坑 2:join() / get() 用错位置,异步又写回串行了

例如:

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> queryA(), executor);
String a = f1.join();

CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> queryB(a), executor);
String b = f2.join();

这虽然用了 CompletableFuture,但主线程每步都在等,本质还是串行阻塞。

正确思路

  • 能链式编排就链式编排
  • allOf 汇总就不要中途频繁 join

坑 3:异常被 CompletionException 包了一层,看不懂真实原因

这是最常见的排查痛点之一。

future.join();

如果任务异常,join() 会抛出 CompletionException
真正的业务异常通常在:

ex.getCause()

排查建议

打印日志时不要只打印:

ex.getMessage()

最好完整输出堆栈,并关注根因:

Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
cause.printStackTrace();

坑 4:whenComplete 以为能兜底,实际上它不会吞异常

很多人第一次写会这样理解错:

future.whenComplete((r, e) -> {
    if (e != null) {
        System.out.println("出错了");
    }
});

这只是“看到异常”,并没有改变 Future 的失败状态。
如果你要兜底,应该用:

  • exceptionally
  • handle

坑 5:链路超时了,但底层任务还在跑

orTimeout 只能让 CompletableFuture 在上层感知超时,
不等于底层 IO 调用就真的取消了

比如你里面执行的是 HTTP 请求、数据库请求、RPC 调用,
它们是否真正停止,还取决于底层客户端自己的超时机制。

实战建议

超时要分层设置:

  • CompletableFuture 编排超时
  • HTTP/RPC 客户端超时
  • 数据库连接和查询超时

这三个最好一起配。


常见排查路径

如果某个异步聚合接口突然变慢或报错,我一般按这个顺序查:

1. 看线程池

重点看:

  • 当前线程数
  • 活跃线程数
  • 队列积压
  • 拒绝策略触发次数

2. 看每个子任务耗时

给每个 Future 打日志或埋点:

  • 开始时间
  • 结束时间
  • 是否异常
  • 降级是否生效

3. 看异常链

尤其注意:

  • CompletionException
  • ExecutionException
  • 真正 cause

4. 看是否有隐式阻塞

比如:

  • 异步方法里又调用了同步阻塞接口
  • 主线程过早 join
  • 某些公共锁导致线程等待

安全/性能最佳实践

这一节我尽量给“可以直接拿去执行”的建议。

1. 区分核心依赖和非核心依赖

不是所有失败都该直接让接口报错。

一个简单策略:

  • 核心依赖:商品、价格、库存
    失败就整体失败,或者明确返回错误
  • 非核心依赖:营销、推荐、标签
    失败则降级,返回默认值

这样用户体验和系统稳定性会好很多。


2. 线程池隔离,避免相互拖垮

建议至少按业务域拆分线程池,例如:

  • 商品聚合线程池
  • 推荐计算线程池
  • 消息通知线程池

不要所有异步任务都塞进一个池子。

为什么这也算“安全”最佳实践

因为线程池被打满,本质上是一种资源争抢。
如果没有隔离,一个边缘业务的高峰可能拖垮核心接口。


3. 不要在异步任务里做长时间阻塞操作

CompletableFuture 很适合编排,但它并不能神奇地消灭阻塞。
如果你在线程池里执行的是:

  • 大量同步 HTTP 调用
  • 大量数据库慢查询
  • 大量 Thread.sleep
  • 大量锁等待

那最终还是会把线程耗尽。

建议

  • 优先优化底层依赖时延
  • 为阻塞型任务配置合适线程池
  • 有条件时考虑响应式或事件驱动方案,但别为了“新技术”强上

4. 统一超时、降级、日志规范

我很建议团队里沉淀统一模板,例如:

  • 每个远程依赖都要有超时
  • 每个非核心依赖都要定义默认值
  • 每个 Future 都要打成功/失败日志
  • 聚合接口统一输出 traceId、子任务耗时、异常原因

这样问题一来,排查速度会快很多。


5. 慎用 join(),更慎用循环里 join()

下面这种写法很危险:

for (Long id : ids) {
    CompletableFuture<Result> future = CompletableFuture.supplyAsync(() -> query(id), executor);
    results.add(future.join());
}

这又退化成串行了。

更好的做法

先全部提交,再统一收集:

List<CompletableFuture<Result>> futures = ids.stream()
        .map(id -> CompletableFuture.supplyAsync(() -> query(id), executor))
        .toList();

List<Result> results = futures.stream()
        .map(CompletableFuture::join)
        .toList();

如果你在 JDK 8,没有 toList(),可以用 Collectors.toList()


6. 注意上下文传递问题

在 Web 场景里,异步线程经常拿不到主线程上下文,比如:

  • ThreadLocal
  • traceId
  • 登录态信息
  • 租户信息

如果你依赖这些上下文,一定要显式传递或使用支持上下文传播的方案。
否则很容易出现:

  • 日志 trace 丢失
  • 多租户串数据
  • 审计字段缺失

这是我在实际项目里见过很多次的问题,排查起来很烦。


逐步验证清单

如果你准备把 CompletableFuture 用到生产代码里,我建议按这个清单自测一遍:

  • 所有异步任务是否都指定了自定义线程池
  • 并行任务是否真的并行,没有中途过早 join
  • 核心依赖和非核心依赖是否区分处理
  • 是否配置了超时
  • 是否有异常兜底逻辑
  • 是否记录了子任务耗时和异常日志
  • 线程池参数是否根据压测数据调整过
  • 底层 HTTP/RPC/DB 是否也配置了超时
  • 是否考虑了上下文传递
  • 是否验证过某个子任务失败时整体返回是否符合预期

一个简单的取舍建议:什么时候适合用 CompletableFuture

很适合:

  • 聚合查询接口
  • 多个独立远程调用并行执行
  • 有简单依赖关系的异步链路
  • 需要异常兜底、超时控制的中等复杂业务

不太适合单靠它硬扛的场景:

  • 超复杂 DAG 编排
  • 海量任务调度
  • 强依赖非阻塞 IO 模型
  • 需要跨进程工作流编排

换句话说,CompletableFuture 非常适合“应用层异步编排”,
但它不是万能工作流引擎,也不是性能灵药。


总结

CompletableFuture 真正的价值,不只是“把代码改成异步”,而是让你能更清晰地表达:

  • 哪些任务可以并行
  • 哪些任务有前后依赖
  • 哪些异常该失败,哪些该兜底
  • 哪些调用必须限时结束

如果你只记住几个落地建议,我建议记这 5 条:

  1. 优先并行化无依赖任务,接口延迟通常会立刻改善
  2. 一定使用自定义线程池,别把业务命运交给公共线程池
  3. 核心依赖失败就显式报错,非核心依赖失败就降级
  4. 超时要分层控制:编排层、RPC/HTTP 层、数据库层都要有
  5. 排查问题先看线程池、再看子任务耗时、最后看异常根因

最后说句很实在的话:
CompletableFuture 写得好,代码会很优雅;写得不好,异步链会比同步代码更难维护。
所以别一上来就追求“全链路异步化”,先从一个典型聚合接口开始,把并行、超时、兜底、日志这四件事做好,收益通常就已经很明显了。


分享到:

上一篇
《区块链智能合约安全审计实战:从常见漏洞识别到自动化检测流程搭建》
下一篇
《前端性能实战:基于 Core Web Vitals 的页面加载优化与排查指南》