跳转到内容
123xiao | 无名键客

《Java 中基于 CompletableFuture 的异步编排实战:并行任务、超时控制与异常处理》

字数: 0 阅读时长: 1 分钟

Java 中基于 CompletableFuture 的异步编排实战:并行任务、超时控制与异常处理

在 Java 后端开发里,很多业务接口本质上都不是“算得慢”,而是“等得多”:等数据库、等缓存、等第三方服务、等消息结果。

如果这些等待是串行的,接口耗时就会被一层层叠加;如果处理不好异常和超时,异步代码还会变成“看起来很高级,排查起来很痛苦”的事故现场。CompletableFuture 正好就是解决这类问题的常用工具。

这篇文章我会从一个真实感比较强的聚合查询场景切入,带你一步步完成:

  • 并行执行多个任务
  • 设置超时与降级
  • 收拢异常,避免线程悄悄失败
  • 写出可运行、可维护、可排查的异步编排代码

背景与问题

假设我们要实现一个“用户主页聚合接口”,需要同时获取:

  1. 用户基本信息
  2. 用户订单摘要
  3. 用户优惠券信息

如果串行执行,伪代码大概是这样:

UserProfile profile = userService.getProfile(userId);
OrderSummary orders = orderService.getOrderSummary(userId);
CouponInfo coupons = couponService.getCoupons(userId);
return assemble(profile, orders, coupons);

问题很直接:

  • 每个远程调用都要几十到几百毫秒
  • 总耗时接近三者之和
  • 任意一个服务慢或失败,整体接口就会被拖住

这时候,异步编排的目标就不是“炫技”,而是三个字:更稳、更快


前置知识与环境准备

本文示例基于:

  • JDK 9+(因为会演示 orTimeout / completeOnTimeout
  • 标准库 java.util.concurrent.CompletableFuture
  • 示例代码可直接放在一个 main 方法里运行

如果你用的是 JDK 8,也能做同样的事情,只是超时控制需要手动封装,文中后面会提到替代思路。


核心原理

1. CompletableFuture 解决的是什么问题

CompletableFuture 同时具备两层能力:

  • Future:代表一个“未来会完成”的结果
  • CompletionStage:支持把多个异步阶段串起来做编排

它特别适合下面几类场景:

  • 并行拉取多个独立数据源
  • 某个任务成功后继续做下游处理
  • 给异步链路加统一异常兜底
  • 在整体结果返回前完成聚合

2. 常见编排方式怎么选

supplyAsync

有返回值的异步任务。

CompletableFuture<UserProfile> future = CompletableFuture.supplyAsync(() -> userService.getProfile(userId), executor);

runAsync

无返回值任务,比如异步记录日志、发送通知。

thenApply

对上一步结果做同步转换。

thenCompose

把“异步套异步”摊平,避免 CompletableFuture<CompletableFuture<T>>

thenCombine

合并两个独立任务的结果。

allOf

等待一组任务都完成。

exceptionally / handle / whenComplete

做异常兜底、记录日志、统一收尾。


3. 一张图看懂基本流程

flowchart LR
    A[接收请求] --> B[并行查询用户信息]
    A --> C[并行查询订单摘要]
    A --> D[并行查询优惠券]
    B --> E[结果聚合]
    C --> E
    D --> E
    E --> F[返回响应]

4. 异常与超时在链路中的位置

很多同学第一次用 CompletableFuture,更关注“怎么并行”,却忽略了“谁来收异常、谁来做超时兜底”。实际上这两件事才决定代码是否能上生产。

sequenceDiagram
    participant Client as 调用方
    participant CF as CompletableFuture
    participant S1 as 用户服务
    participant S2 as 订单服务
    participant S3 as 优惠券服务

    Client->>CF: 发起聚合请求
    CF->>S1: 异步调用
    CF->>S2: 异步调用
    CF->>S3: 异步调用
    S1-->>CF: 返回结果
    S2-->>CF: 超时/异常
    S3-->>CF: 返回结果
    CF->>CF: 异常处理 + 降级
    CF-->>Client: 返回聚合结果

实战代码(可运行)

下面我们用一个完整示例,把并行任务、超时控制、异常处理放在一起。

1. 示例目标

实现一个 loadUserDashboard 方法:

  • 并行获取用户信息、订单摘要、优惠券
  • 订单服务如果超时,返回默认值
  • 优惠券服务如果抛异常,记录日志并降级
  • 最终拼装成一个统一结果

2. 完整代码

import java.time.LocalTime;
import java.util.concurrent.*;

public class CompletableFutureDemo {

    private static final ExecutorService BIZ_EXECUTOR = new ThreadPoolExecutor(
            4,
            8,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            r -> {
                Thread t = new Thread(r);
                t.setName("biz-exec-" + t.getId());
                return t;
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public static void main(String[] args) {
        try {
            Dashboard dashboard = loadUserDashboard(1001L);
            log("最终结果: " + dashboard);
        } finally {
            BIZ_EXECUTOR.shutdown();
        }
    }

    public static Dashboard loadUserDashboard(Long userId) {
        CompletableFuture<UserProfile> profileFuture =
                CompletableFuture.supplyAsync(() -> getUserProfile(userId), BIZ_EXECUTOR)
                        .orTimeout(800, TimeUnit.MILLISECONDS)
                        .exceptionally(ex -> {
                            log("用户信息查询失败: " + ex.getMessage());
                            return UserProfile.defaultProfile(userId);
                        });

        CompletableFuture<OrderSummary> orderFuture =
                CompletableFuture.supplyAsync(() -> getOrderSummary(userId), BIZ_EXECUTOR)
                        .completeOnTimeout(OrderSummary.empty(userId), 500, TimeUnit.MILLISECONDS)
                        .exceptionally(ex -> {
                            log("订单摘要查询失败: " + ex.getMessage());
                            return OrderSummary.empty(userId);
                        });

        CompletableFuture<CouponInfo> couponFuture =
                CompletableFuture.supplyAsync(() -> getCouponInfo(userId), BIZ_EXECUTOR)
                        .orTimeout(700, TimeUnit.MILLISECONDS)
                        .handle((result, ex) -> {
                            if (ex != null) {
                                log("优惠券查询失败,降级为空列表: " + ex.getMessage());
                                return CouponInfo.empty(userId);
                            }
                            return result;
                        });

        CompletableFuture<Dashboard> dashboardFuture =
                CompletableFuture.allOf(profileFuture, orderFuture, couponFuture)
                        .thenApply(v -> new Dashboard(
                                profileFuture.join(),
                                orderFuture.join(),
                                couponFuture.join()
                        ))
                        .whenComplete((result, ex) -> {
                            if (ex == null) {
                                log("聚合完成");
                            } else {
                                log("聚合失败: " + ex.getMessage());
                            }
                        });

        return dashboardFuture.join();
    }

    private static UserProfile getUserProfile(Long userId) {
        sleep(300);
        log("用户服务返回");
        return new UserProfile(userId, "Alice");
    }

    private static OrderSummary getOrderSummary(Long userId) {
        sleep(900);
        log("订单服务返回");
        return new OrderSummary(userId, 12);
    }

    private static CouponInfo getCouponInfo(Long userId) {
        sleep(200);
        log("优惠券服务开始处理");
        throw new RuntimeException("优惠券服务不可用");
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("线程被中断", e);
        }
    }

    private static void log(String msg) {
        System.out.println(LocalTime.now() + " [" + Thread.currentThread().getName() + "] " + msg);
    }

    static class Dashboard {
        private final UserProfile profile;
        private final OrderSummary orderSummary;
        private final CouponInfo couponInfo;

        public Dashboard(UserProfile profile, OrderSummary orderSummary, CouponInfo couponInfo) {
            this.profile = profile;
            this.orderSummary = orderSummary;
            this.couponInfo = couponInfo;
        }

        @Override
        public String toString() {
            return "Dashboard{" +
                    "profile=" + profile +
                    ", orderSummary=" + orderSummary +
                    ", couponInfo=" + couponInfo +
                    '}';
        }
    }

    static class UserProfile {
        private final Long userId;
        private final String name;

        public UserProfile(Long userId, String name) {
            this.userId = userId;
            this.name = name;
        }

        public static UserProfile defaultProfile(Long userId) {
            return new UserProfile(userId, "UNKNOWN");
        }

        @Override
        public String toString() {
            return "UserProfile{" +
                    "userId=" + userId +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

    static class OrderSummary {
        private final Long userId;
        private final int orderCount;

        public OrderSummary(Long userId, int orderCount) {
            this.userId = userId;
            this.orderCount = orderCount;
        }

        public static OrderSummary empty(Long userId) {
            return new OrderSummary(userId, 0);
        }

        @Override
        public String toString() {
            return "OrderSummary{" +
                    "userId=" + userId +
                    ", orderCount=" + orderCount +
                    '}';
        }
    }

    static class CouponInfo {
        private final Long userId;
        private final int availableCoupons;

        public CouponInfo(Long userId, int availableCoupons) {
            this.userId = userId;
            this.availableCoupons = availableCoupons;
        }

        public static CouponInfo empty(Long userId) {
            return new CouponInfo(userId, 0);
        }

        @Override
        public String toString() {
            return "CouponInfo{" +
                    "userId=" + userId +
                    ", availableCoupons=" + availableCoupons +
                    '}';
        }
    }
}

3. 这段代码里最关键的几个点

并行执行

三段 supplyAsync(...) 是独立启动的,不会相互等待。

订单超时降级

.completeOnTimeout(OrderSummary.empty(userId), 500, TimeUnit.MILLISECONDS)

意思是:如果 500ms 内没拿到结果,就直接用默认值完成。

这很适合“非核心字段”,比如推荐位、营销信息、统计摘要。

优惠券异常兜底

.handle((result, ex) -> {
    if (ex != null) {
        return CouponInfo.empty(userId);
    }
    return result;
})

handle 不管成功失败都会执行,因此很适合统一收口。

聚合阶段使用 allOf

CompletableFuture.allOf(profileFuture, orderFuture, couponFuture)

它本身只表示“都完成了”,不直接返回聚合结果,所以后面要用 join() 把每个 future 的结果取出来。


逐步验证清单

如果你准备把这套写法放进项目,我建议按下面顺序自己跑一遍:

  1. 三个任务都成功,确认是并行执行
  2. 让其中一个任务抛异常,确认是否能正常降级
  3. 让其中一个任务超时,确认响应时间是否被控制住
  4. 把线程池大小调小,看是否出现排队
  5. 观察日志里是否能定位到底哪个阶段慢、哪个阶段失败

这是我自己比较常用的一套验证方式,能尽早发现“代码能跑,但线上不稳”的问题。


核心原理再深入一点:thenApplythenComposethenCombine

这三个方法很容易混。

thenApply:同步转换结果

CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> "alice")
                .thenApply(String::toUpperCase);

输入一个值,输出另一个值,但这一步本身不是新的异步任务。


thenCompose:串联异步任务

比如先查用户,再根据用户等级查推荐:

CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> "VIP")
                .thenCompose(level ->
                        CompletableFuture.supplyAsync(() -> "recommend for " + level)
                );

如果你用 thenApply 返回另一个 CompletableFuture,就会得到嵌套结构,不好处理;thenCompose 就是专门拿来“拆平”的。


thenCombine:合并两个独立异步结果

CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> "B");

CompletableFuture<String> combined = a.thenCombine(b, (x, y) -> x + y);

当两个任务互不依赖,但结果要一起使用时,thenCombine 很顺手。


一张图理解三者区别

flowchart TD
    A[上游结果] --> B[thenApply: 同步转换]
    A --> C[thenCompose: 继续发起异步任务]
    D[任务1结果] --> E[thenCombine: 合并两个结果]
    F[任务2结果] --> E

常见坑与排查

这一节非常重要。我见过不少项目用了 CompletableFuture,结果问题不是出在 API 不会用,而是出在细节上。

坑 1:默认线程池用得太随意

如果你没显式传 executor,很多异步任务会落到 ForkJoinPool.commonPool()

这在 demo 里没问题,在生产环境里常常有风险:

  • 任务类型混杂
  • 有阻塞 I/O
  • 线程数不受控
  • 某一类任务把公共线程池占满

建议:业务异步任务用独立线程池,不要全靠默认池。


坑 2:allOf 完成了,不代表你拿到了业务结果

很多人第一次写会这样:

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();

然后以为结果已经有了。其实 allOf 只保证“都结束”,具体值还得自己取:

Result r1 = f1.join();
Result r2 = f2.join();
Result r3 = f3.join();

坑 3:join()get() 混着用,异常处理变乱

  • get() 抛的是受检异常:InterruptedException, ExecutionException
  • join() 抛的是运行时异常:CompletionException

在链路里我更常用 join(),代码更干净;但前提是你要知道异常被包了一层。

排查时建议看根因:

try {
    future.join();
} catch (CompletionException e) {
    Throwable cause = e.getCause();
    cause.printStackTrace();
}

坑 4:超时只是 Future 超时,不一定能中断底层任务

这是一个非常容易误解的点。

orTimeout / completeOnTimeout 控制的是 CompletableFuture 的完成状态,不一定能真正取消底层正在执行的远程调用。

也就是说:

  • 调用方可能已经返回默认值了
  • 但底层线程可能还在跑
  • 如果底层是阻塞 I/O,没有超时配置,资源仍然会继续占着

所以,应用层 Future 超时RPC/HTTP/数据库客户端超时 必须一起配。


坑 5:异常被吞掉,日志里什么都没有

比如:

CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("boom");
});

如果你既不 join/get,也不加异常处理,这个异常很可能不会以你预期的方式暴露出来。

建议

  • 每条重要链路都要有 exceptionally / handle / whenComplete
  • 日志要带请求 ID、用户 ID、阶段名
  • 不要让关键任务“悄悄失败”

一个实用的排查思路

当异步聚合接口变慢时,我一般按这个顺序查:

  1. 线程池是否打满
  2. 队列是否堆积
  3. 是不是某个下游服务频繁超时
  4. 异常是否被统一记录
  5. 降级逻辑是否真的生效
  6. 底层 HTTP/RPC 客户端有没有设置连接和读超时

如果只看接口总耗时,不拆阶段,往往很难定位。


安全/性能最佳实践

1. 为不同类型任务隔离线程池

不要把数据库查询、HTTP 调用、CPU 密集计算全扔进一个线程池。

常见建议:

  • I/O 密集型:线程数可适度大一些
  • CPU 密集型:线程数接近 CPU 核数
  • 核心链路与非核心链路分池

这样一个推荐服务变慢,不至于把订单主链路拖死。


2. 线程池参数不要只会“无脑加大”

很多事故不是线程太少,而是线程太多导致:

  • 上下文切换严重
  • 下游被打爆
  • 队列堆积更隐蔽
  • GC 压力增大

建议至少明确这几个参数的含义:

  • corePoolSize
  • maximumPoolSize
  • queueCapacity
  • RejectedExecutionHandler

并结合压测数据调,不要拍脑袋。


3. 给每个下游设置边界时间

如果一个聚合接口总 SLA 是 800ms,你不能给每个下游都配 800ms 超时,那样整体肯定收不住。

更合理的思路是:

  • 用户信息:300ms
  • 订单摘要:200ms
  • 优惠券:150ms
  • 预留聚合与序列化开销

要有整体预算意识,而不是每个服务“各超各的”。


4. 区分核心数据和可降级数据

不是所有失败都该一视同仁。

例如:

  • 用户实名认证状态:核心,不能乱降级
  • 推荐商品:可降级
  • 营销角标:可直接返回空

如果核心字段失败却被你悄悄吞掉,业务可能比接口报错更危险。


5. 日志和监控要围绕“阶段”来做

我很建议给异步编排加这种维度:

  • 阶段名:profile/order/coupon
  • 开始时间、结束时间
  • 是否超时
  • 是否降级
  • 异常类型
  • 线程池活跃线程数、队列长度

这样你在监控平台上会很容易看出:到底是线程池问题,还是某个下游慢。


6. 注意上下文传递

在线上项目里,你可能还需要传递这些上下文:

  • TraceId
  • 用户身份
  • 租户信息
  • MDC 日志上下文

CompletableFuture 切线程后,这些上下文默认不一定自动带过去。如果项目依赖链路追踪或审计日志,这一点一定要补齐。


一个更贴近生产的编排建议

如果你要做的是接口聚合,我通常建议按下面这个原则组织:

  1. 先划分核心与非核心任务
  2. 核心任务失败直接失败或明确报错
  3. 非核心任务允许超时降级
  4. 每个任务独立记录耗时与异常
  5. 统一在聚合层收口,不要到处 scattered 地 try-catch

这会让代码结构清晰很多。异步编排最怕的不是 API 多,而是逻辑散。


什么时候不适合用 CompletableFuture

虽然它很好用,但也不是所有场景都适合。

不太适合的情况

  • 链路极其复杂,分支和回滚很多
  • 需要强事务一致性
  • 大量流式处理、背压控制
  • 团队对异步调试经验不足

这时候可以考虑:

  • 更清晰的任务编排框架
  • 响应式方案
  • 消息队列解耦
  • 显式工作流引擎

CompletableFuture 更适合中等复杂度、单机内编排、请求内聚合这类问题。


总结

CompletableFuture 的价值,不只是“把几个任务并行起来”,而是帮我们把异步流程写成可组合、可控制、可降级的结构。

这篇文章你可以直接带走的几个实用结论是:

  • 并行任务:独立任务用 supplyAsync + allOf
  • 结果组合:简单场景用 thenCombine,复杂聚合用 allOfjoin
  • 超时控制:优先用 orTimeout / completeOnTimeout
  • 异常处理:链路里至少放一个 handleexceptionallywhenComplete
  • 线程池:生产环境尽量显式配置,不要依赖默认公共线程池
  • 边界意识:Future 超时不等于底层调用真的停了,RPC/HTTP 客户端超时也要配

如果你现在正准备把一个串行聚合接口改造成异步版,我建议你按这个顺序落地:

  1. 先并行化独立调用
  2. 再补超时
  3. 再补异常与降级
  4. 最后加监控与线程池治理

一步一步来,比一上来写一大串链式调用更稳。
我自己踩坑后最大的感受就是:异步代码写得快不难,写得稳才值钱。


分享到:

上一篇
《区块链智能合约安全审计实战:从常见漏洞识别到自动化检测流程搭建》
下一篇
《区块链节点数据索引与查询优化实战:面向中级开发者的架构设计与性能调优》