跳转到内容
123xiao | 无名键客

《Java 中基于 CompletableFuture 的异步编排实战:并行调用、超时控制与异常处理优化》

字数: 0 阅读时长: 1 分钟

Java 中基于 CompletableFuture 的异步编排实战:并行调用、超时控制与异常处理优化

在很多 Java 服务里,真正拖慢接口响应时间的,往往不是某一段复杂计算,而是多个远程调用串行执行:查用户、查订单、查库存、查营销信息……每一步都不算慢,但堆起来就慢了。

这篇文章我想从**“一个接口怎么从串行改成异步编排”**这个角度,带你把 CompletableFuture 用到实战里。重点放在三件事:

  • 并行调用:把可同时执行的任务一起跑
  • 超时控制:别让慢依赖拖垮主流程
  • 异常处理优化:失败时能降级、能定位、能收口

如果你会基本的 Java 并发,这篇内容应该能直接上手。


背景与问题

先看一个典型需求:组装“用户首页”数据。

接口要聚合这些来源:

  1. 用户基本信息
  2. 订单统计
  3. 优惠券信息

如果写成串行,大概像这样:

public HomePageVO getHomePage(Long userId) {
    UserInfo userInfo = userService.getUserInfo(userId);
    OrderSummary orderSummary = orderService.getOrderSummary(userId);
    CouponInfo couponInfo = couponService.getCouponInfo(userId);

    return new HomePageVO(userInfo, orderSummary, couponInfo);
}

问题很明显:

  • 三个调用彼此独立,却被串行执行
  • 任意一个依赖慢,整体都慢
  • 任意一个依赖报错,整个接口直接失败
  • 超时、重试、降级逻辑散落在业务代码里

假设三个接口耗时分别是:

  • 用户服务:80ms
  • 订单服务:120ms
  • 优惠券服务:100ms

串行总耗时接近 300ms+;并行后理想耗时接近 120ms+少量调度开销

这就是异步编排最直接的价值。


前置知识与环境准备

你需要知道什么

建议你先熟悉这些概念:

  • ExecutorService
  • 线程池基础参数
  • Lambda 表达式
  • FutureCompletableFuture 的区别
  • Java 8+ 语法

运行环境

本文示例基于:

  • JDK 9+ 更方便(因为有 orTimeout / completeOnTimeout
  • JDK 8 也能做,后面会补一个兼容思路

核心原理

CompletableFuture 的好处,不只是“异步执行”,而是它支持声明式编排

  • supplyAsync:异步执行有返回值任务
  • runAsync:异步执行无返回值任务
  • thenApply:转换结果
  • thenCompose:串联异步任务
  • thenCombine:合并两个独立结果
  • allOf:等待多个任务全部完成
  • anyOf:等待任一任务完成
  • exceptionally / handle / whenComplete:异常处理与收尾
  • orTimeout / completeOnTimeout:超时控制

一张图先看整体

flowchart LR
    A[请求进入] --> B[并行发起用户服务]
    A --> C[并行发起订单服务]
    A --> D[并行发起优惠券服务]
    B --> E[汇总结果]
    C --> E
    D --> E
    E --> F[返回首页数据]

串行依赖和并行依赖的区别

  • 并行依赖:彼此无前后关系,越早一起发起越好
  • 串行依赖:后一步依赖前一步结果,比如先查用户等级,再决定查哪类权益

很多人第一次用 CompletableFuture,容易把所有步骤都写成链式调用,结果本来能并行的任务被不小心串行了。这个坑我自己也踩过。


一个最小认知模型

你可以把 CompletableFuture 理解成两层:

  1. 任务层:任务在哪个线程池执行
  2. 编排层:多个任务之间怎么组合、异常怎么传播、超时怎么控制

光把任务丢到线程池里不难,难的是把编排关系写清楚。

编排关系示意图

sequenceDiagram
    participant Client as 调用方
    participant CF1 as 用户任务
    participant CF2 as 订单任务
    participant CF3 as 优惠券任务
    participant Agg as 聚合逻辑

    Client->>CF1: supplyAsync()
    Client->>CF2: supplyAsync()
    Client->>CF3: supplyAsync()
    CF1-->>Agg: UserInfo
    CF2-->>Agg: OrderSummary
    CF3-->>Agg: CouponInfo
    Agg-->>Client: HomePageVO

实战代码(可运行)

下面我给一个完整可运行示例。为了方便演示,我用 sleep 模拟远程调用。

示例目标

实现一个首页聚合接口,要求:

  • 三个依赖并行调用
  • 订单服务超时后返回默认值
  • 优惠券服务异常时降级
  • 最终统一汇总
  • 使用自定义线程池,避免误用公共线程池

完整代码

import java.time.LocalTime;
import java.util.concurrent.*;
import java.util.function.Supplier;

public class CompletableFutureDemo {

    private static final ExecutorService BIZ_POOL = new ThreadPoolExecutor(
            8,
            16,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(200),
            new ThreadFactory() {
                private int index = 1;

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "cf-biz-" + index++);
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public static void main(String[] args) {
        try {
            HomePageVO result = getHomePage(1001L);
            log("最终结果:" + result);
        } finally {
            BIZ_POOL.shutdown();
        }
    }

    public static HomePageVO getHomePage(Long userId) {
        CompletableFuture<UserInfo> userFuture = CompletableFuture
                .supplyAsync(() -> getUserInfo(userId), BIZ_POOL)
                .whenComplete((r, ex) -> {
                    if (ex == null) {
                        log("用户信息查询完成");
                    } else {
                        log("用户信息查询失败: " + ex.getMessage());
                    }
                });

        CompletableFuture<OrderSummary> orderFuture = CompletableFuture
                .supplyAsync(() -> getOrderSummary(userId), BIZ_POOL)
                .completeOnTimeout(OrderSummary.defaultValue(), 150, TimeUnit.MILLISECONDS)
                .exceptionally(ex -> {
                    log("订单服务异常,降级返回默认值: " + ex.getMessage());
                    return OrderSummary.defaultValue();
                });

        CompletableFuture<CouponInfo> couponFuture = CompletableFuture
                .supplyAsync(() -> getCouponInfo(userId), BIZ_POOL)
                .exceptionally(ex -> {
                    log("优惠券服务异常,降级返回空优惠券: " + ex.getMessage());
                    return CouponInfo.empty();
                });

        CompletableFuture<Void> allDone = CompletableFuture.allOf(userFuture, orderFuture, couponFuture);

        return allDone.thenApply(v -> {
            UserInfo userInfo = userFuture.join();
            OrderSummary orderSummary = orderFuture.join();
            CouponInfo couponInfo = couponFuture.join();
            return new HomePageVO(userInfo, orderSummary, couponInfo);
        }).join();
    }

    private static UserInfo getUserInfo(Long userId) {
        sleep(80);
        return new UserInfo(userId, "Alice");
    }

    private static OrderSummary getOrderSummary(Long userId) {
        sleep(200); // 故意模拟超时
        return new OrderSummary(12, 999.50);
    }

    private static CouponInfo getCouponInfo(Long userId) {
        sleep(60);
        throw new RuntimeException("coupon service unavailable");
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("thread interrupted", e);
        }
    }

    private static void log(String msg) {
        System.out.println(LocalTime.now() + " [" + Thread.currentThread().getName() + "] " + msg);
    }

    static class HomePageVO {
        private final UserInfo userInfo;
        private final OrderSummary orderSummary;
        private final CouponInfo couponInfo;

        public HomePageVO(UserInfo userInfo, OrderSummary orderSummary, CouponInfo couponInfo) {
            this.userInfo = userInfo;
            this.orderSummary = orderSummary;
            this.couponInfo = couponInfo;
        }

        @Override
        public String toString() {
            return "HomePageVO{" +
                    "userInfo=" + userInfo +
                    ", orderSummary=" + orderSummary +
                    ", couponInfo=" + couponInfo +
                    '}';
        }
    }

    static class UserInfo {
        private final Long userId;
        private final String name;

        public UserInfo(Long userId, String name) {
            this.userId = userId;
            this.name = name;
        }

        @Override
        public String toString() {
            return "UserInfo{" +
                    "userId=" + userId +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

    static class OrderSummary {
        private final int orderCount;
        private final double totalAmount;

        public OrderSummary(int orderCount, double totalAmount) {
            this.orderCount = orderCount;
            this.totalAmount = totalAmount;
        }

        public static OrderSummary defaultValue() {
            return new OrderSummary(0, 0.0);
        }

        @Override
        public String toString() {
            return "OrderSummary{" +
                    "orderCount=" + orderCount +
                    ", totalAmount=" + totalAmount +
                    '}';
        }
    }

    static class CouponInfo {
        private final int availableCount;

        public CouponInfo(int availableCount) {
            this.availableCount = availableCount;
        }

        public static CouponInfo empty() {
            return new CouponInfo(0);
        }

        @Override
        public String toString() {
            return "CouponInfo{" +
                    "availableCount=" + availableCount +
                    '}';
        }
    }
}

代码拆解:一步步看懂

1. 用 supplyAsync 发起并行任务

CompletableFuture<UserInfo> userFuture =
        CompletableFuture.supplyAsync(() -> getUserInfo(userId), BIZ_POOL);

这一步只是“提交任务”,不会阻塞当前线程。

如果你连续写三个 supplyAsync,它们会尽快被线程池调度执行,因此能并行。


2. 使用 completeOnTimeout 做超时兜底

CompletableFuture<OrderSummary> orderFuture = CompletableFuture
        .supplyAsync(() -> getOrderSummary(userId), BIZ_POOL)
        .completeOnTimeout(OrderSummary.defaultValue(), 150, TimeUnit.MILLISECONDS);

意思是:

  • 如果 150ms 内没完成
  • 就直接用默认值补上这个 Future 的结果

这在“非核心字段可接受降级”的场景特别好用。

它和 orTimeout 的区别

  • orTimeout:超时后让任务异常完成
  • completeOnTimeout:超时后让任务正常完成,但结果是默认值

如果你的业务希望“超时算失败,统一走异常逻辑”,用 orTimeout; 如果你希望“超时自动降级,不影响主流程”,用 completeOnTimeout


3. 使用 exceptionally 兜底异常

.exceptionally(ex -> {
    log("优惠券服务异常,降级返回空优惠券: " + ex.getMessage());
    return CouponInfo.empty();
});

这里很实用,但要注意一个点:

  • exceptionally吞掉异常并转换为正常结果
  • 后续链路拿到的是“降级后的值”,而不是异常

这意味着如果你还想记录原始异常,一定要在这里打日志,或者更早用 whenComplete 观测。


4. 用 allOf 等待全部完成,再 join

CompletableFuture<Void> allDone = CompletableFuture.allOf(userFuture, orderFuture, couponFuture);

return allDone.thenApply(v -> {
    UserInfo userInfo = userFuture.join();
    OrderSummary orderSummary = orderFuture.join();
    CouponInfo couponInfo = couponFuture.join();
    return new HomePageVO(userInfo, orderSummary, couponInfo);
}).join();

这段写法非常常见,也比较稳妥:

  • allOf 等待全部 Future 完成
  • 再从各个 Future 中取结果

为什么这里用 join() 而不是 get()

  • join() 不需要处理受检异常,代码更干净
  • 它抛的是 CompletionException
  • 如果你在业务层统一封装异常,join() 常常更顺手

再进一层:串联依赖怎么写

并不是所有场景都适合 allOf。有些任务是有依赖关系的。

比如:

  1. 先查用户信息
  2. 根据用户等级决定权益查询策略
  3. 再查权益信息

这种场景应该用 thenCompose,而不是把第二步也提前并行发出去。

CompletableFuture<UserInfo> userFuture =
        CompletableFuture.supplyAsync(() -> getUserInfo(1001L), BIZ_POOL);

CompletableFuture<String> benefitFuture = userFuture.thenCompose(user ->
        CompletableFuture.supplyAsync(() -> "权益信息: level=" + user.name, BIZ_POOL)
);

依赖编排示意

flowchart TD
    A[查询用户信息] --> B{根据用户结果判断}
    B --> C[查询权益信息]
    C --> D[组装返回值]

异常处理:不是会写就够了,要知道传播规则

这是 CompletableFuture 最容易绕的部分。

常用方法怎么选

exceptionally

只在异常时执行,返回一个兜底值。

future.exceptionally(ex -> defaultValue);

适合:

  • 降级
  • 给默认值
  • 不想中断主流程

handle

不管成功还是失败都会执行,可以统一转换结果。

future.handle((result, ex) -> {
    if (ex != null) {
        return defaultValue;
    }
    return result;
});

适合:

  • 成功失败统一映射
  • 想把异常结果也转成某种领域对象

whenComplete

更像“观察者”,适合记录日志、打指标,但不改变结果

future.whenComplete((result, ex) -> {
    if (ex != null) {
        log("任务失败");
    }
});

适合:

  • 埋点
  • 日志
  • tracing

我个人的习惯是:

  • 记录日志whenComplete
  • 业务兜底exceptionallyhandle

这样职责比较清晰。


常见坑与排查

这一部分很重要,很多线上问题不是不会写,而是“写得像对的,但跑起来不对”。

坑 1:默认线程池用得太随意

如果你写:

CompletableFuture.supplyAsync(() -> doSomething());

没有传线程池时,默认使用 ForkJoinPool.commonPool()

问题在于:

  • 这个公共线程池会被全局共享
  • 一旦任务里有阻塞操作(比如 HTTP、数据库、RPC)
  • 很容易把公共线程池拖慢

建议

  • 业务异步编排,尽量传入专用线程池
  • CPU 密集和 IO 密集任务尽量拆开线程池

坑 2:明明写了异步,结果还是串行

比如这样:

UserInfo user = CompletableFuture.supplyAsync(() -> getUserInfo(userId), BIZ_POOL).join();
OrderSummary order = CompletableFuture.supplyAsync(() -> getOrderSummary(userId), BIZ_POOL).join();
CouponInfo coupon = CompletableFuture.supplyAsync(() -> getCouponInfo(userId), BIZ_POOL).join();

看着用了异步,其实每次都马上 join(),本质上还是串行。

正确思路

先都发起,再统一等待:

CompletableFuture<UserInfo> f1 = CompletableFuture.supplyAsync(() -> getUserInfo(userId), BIZ_POOL);
CompletableFuture<OrderSummary> f2 = CompletableFuture.supplyAsync(() -> getOrderSummary(userId), BIZ_POOL);
CompletableFuture<CouponInfo> f3 = CompletableFuture.supplyAsync(() -> getCouponInfo(userId), BIZ_POOL);

CompletableFuture.allOf(f1, f2, f3).join();

坑 3:异常被包装,看不清真实原因

join() 抛的是 CompletionExceptionget() 抛的是 ExecutionException。真实异常通常藏在 getCause() 里。

排查方式

try {
    future.join();
} catch (CompletionException ex) {
    Throwable cause = ex.getCause();
    cause.printStackTrace();
}

线上排障时,不要只打印外层异常信息,不然你很可能只看到一句:

java.util.concurrent.CompletionException

这几乎没法定位。


坑 4:超时只“超了 Future”,底层任务可能还在跑

这是很多人忽略的一点。

completeOnTimeout / orTimeout 控制的是 CompletableFuture 的完成状态,不等于底层 IO 调用真的被取消了。

也就是说:

  • 你的主流程可能已经返回默认值了
  • 但底层 HTTP/RPC 线程可能还在继续执行

这意味着什么

真正要止损,还要结合:

  • HTTP 客户端超时
  • RPC 框架超时
  • 数据库查询超时
  • 线程中断响应

只靠 CompletableFuture 不足以彻底“停止慢调用”。


坑 5:线程池队列太大,问题被隐藏

很多项目喜欢把队列设成超大,比如几千几万。表面上不报错,实际上:

  • 请求堆积在线程池里
  • 延迟越来越高
  • 最后变成“慢性雪崩”

建议

线程池参数要结合业务压测:

  • 核心线程数
  • 最大线程数
  • 队列大小
  • 拒绝策略

对聚合接口来说,宁可更早暴露容量问题,也不要无限排队。


安全/性能最佳实践

这里我把最实用的建议收成一组,你可以直接拿去对照项目。

1. 为异步编排准备专用线程池

不要默认混用。

ExecutorService ioPool = new ThreadPoolExecutor(
        16, 32, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(500),
        new ThreadPoolExecutor.CallerRunsPolicy()
);

如果你的任务主要是远程调用、网络等待,线程数通常可以比 CPU 核数大一些,但不要拍脑袋定值,最好压测。


2. 区分核心结果和可降级结果

不是所有字段都值得“死等”。

比如首页接口里:

  • 用户身份、登录态:核心字段
  • 优惠券角标、推荐内容:可降级字段

可降级字段建议:

  • 更短超时
  • 失败兜底默认值
  • 单独打监控

核心字段建议:

  • 明确失败策略
  • 不要无脑吞异常

3. 为每个异步任务打上名字和日志

工程里最怕“有 Future,没观测”。

你至少应该能知道:

  • 哪个子任务开始了
  • 耗时多少
  • 是超时还是业务异常
  • 降级是否发生
  • 降级比例是否上升

一个简单包装思路:

public static <T> Supplier<T> timed(String taskName, Supplier<T> supplier) {
    return () -> {
        long start = System.currentTimeMillis();
        try {
            return supplier.get();
        } finally {
            long cost = System.currentTimeMillis() - start;
            System.out.println(taskName + " cost=" + cost + "ms");
        }
    };
}

使用方式:

CompletableFuture<UserInfo> future =
        CompletableFuture.supplyAsync(
                timed("queryUserInfo", () -> getUserInfo(1001L)),
                BIZ_POOL
        );

4. 尽量避免在异步链里做阻塞嵌套

例如在一个 supplyAsync 里面又去 join 另一个 Future,这很容易形成:

  • 线程池占满
  • 线程互相等待
  • 吞吐下降

尽量让编排写在 Future 链外层,而不是在任务内部互等。


5. 结合限流、熔断、超时一起看

CompletableFuture 解决的是编排问题,不是全部的稳定性问题。

如果下游经常抖动,还应配合:

  • 限流
  • 熔断
  • 隔离
  • 重试(谨慎)
  • 舱壁模式

尤其是重试,别直接在异步聚合层无脑重试,不然可能把下游放大打挂。


6. 对线程上下文传播保持警惕

在异步线程里,下面这些上下文可能会丢失:

  • ThreadLocal
  • MDC 日志上下文
  • 链路追踪上下文
  • 用户登录上下文

如果项目依赖这些信息,需要:

  • 手动透传
  • 使用支持上下文传播的封装线程池
  • 或引入统一的任务装饰器

这个问题在本地测试时不明显,但线上日志一旦串不起来,排查会非常痛苦。


逐步验证清单

如果你准备在项目里落地,我建议按这个顺序验证。

第一步:先验证并行是否真的生效

方法很简单:

  • 打开始时间
  • 发起多个 Future
  • 最后统计总耗时

如果总耗时仍接近串行总和,说明你很可能哪里提前 join 了。


第二步:验证超时降级是否符合预期

重点看两个问题:

  • 主流程是否能按预期快速返回默认值
  • 超时后的底层调用是否还在消耗资源

第二点经常被忽略。


第三步:验证异常传播路径

建议分别模拟:

  • 单个依赖抛异常
  • 多个依赖同时异常
  • 核心任务异常、非核心任务异常

确认:

  • 哪些异常被吞掉
  • 哪些异常会向上抛出
  • 日志中是否能定位真实原因

第四步:压测线程池

压测时重点观察:

  • 活跃线程数
  • 队列长度
  • 平均耗时 / TP99
  • 拒绝次数
  • 降级次数

如果线程池队列一直积压,说明线程池大小或下游能力有问题,不是把队列再调大就能解决的。


JDK 8 怎么办?

如果你还在 JDK 8,orTimeoutcompleteOnTimeout 不可用。

常见做法有两种:

  1. 使用 ScheduledExecutorService 自己实现超时补偿
  2. 借助成熟工具库封装超时 Future

一个简化思路如下:

import java.util.concurrent.*;

public class Jdk8TimeoutDemo {

    private static final ScheduledExecutorService SCHEDULER =
            Executors.newScheduledThreadPool(1);

    public static <T> CompletableFuture<T> completeOnTimeout(
            CompletableFuture<T> future, T defaultValue, long timeout, TimeUnit unit) {

        SCHEDULER.schedule(() -> future.complete(defaultValue), timeout, unit);
        return future;
    }
}

注意,这只是演示思路。生产里要考虑:

  • 定时任务线程池资源
  • 原 Future 已完成时的竞争
  • 底层任务取消
  • 指标埋点

一个更贴近工程的落地建议

如果你的接口里异步任务越来越多,我不建议把所有 CompletableFuture 逻辑都堆在 Controller 或 Service 主函数里。

更好的做法是分层:

  • 任务层:每个依赖调用一个独立方法
  • 编排层:负责并行、串行、超时、异常兜底
  • 聚合层:只做结果组装

例如:

public CompletableFuture<UserInfo> queryUser(Long userId) { ... }

public CompletableFuture<OrderSummary> queryOrder(Long userId) { ... }

public CompletableFuture<CouponInfo> queryCoupon(Long userId) { ... }

public HomePageVO aggregate(Long userId) { ... }

这样做的好处:

  • 更容易单测
  • 更容易替换降级策略
  • 更容易查哪个依赖出了问题

总结

CompletableFuture 真正强的地方,不是“能异步”,而是它让你把任务之间的关系写清楚:

  • 能并行的就并行,降低总耗时
  • 可降级的加超时和默认值,避免拖垮主流程
  • 异常统一收口,既能定位也能兜底
  • 配合专用线程池和观测手段,才能真正在线上可用

最后给几个我认为最实用的可执行建议:

  1. 先发起,再统一等待,不要边创建边 join
  2. 一定使用自定义线程池,别把业务阻塞任务丢进公共池
  3. 区分核心链路和可降级链路,超时策略不要一刀切
  4. 异常日志要打印真实 cause,别只看 CompletionException
  5. 别把超时当取消,底层调用的超时仍需单独配置

如果你刚开始落地,可以先从一个简单聚合接口改造起:把 3 个独立远程调用并行化,再加上 1 个超时降级和 1 个异常兜底。先跑通,再逐步抽象。这个节奏通常最稳。


分享到:

上一篇
《Web逆向实战:基于浏览器开发者工具定位并还原前端加密请求参数的完整方法-85》
下一篇
《Java 中基于 CompletableFuture 与线程池的异步任务编排实战与性能优化》