跳转到内容
123xiao | 无名键客

《微服务架构下的分布式事务实战:基于 Saga 模式的设计、落地与避坑》

字数: 0 阅读时长: 1 分钟

微服务架构下的分布式事务实战:基于 Saga 模式的设计、落地与避坑

在单体应用里,事务这件事通常没那么让人头疼:一个数据库连接、一个本地事务、commitrollback,世界看起来很简单。

但一旦进入微服务架构,订单、库存、支付、账户这些能力拆成了独立服务,原来“一把梭”的数据库事务就失效了。此时如果还想维持业务一致性,就必须面对一个现实:跨服务调用链上的一致性,本质上是一个分布式问题

这篇文章我想换一个更贴近落地的角度来讲:不是只讲概念,而是围绕“怎么设计、怎么写、怎么排查、哪里最容易翻车”来展开,重点放在 Saga 模式。如果你已经知道 2PC、TCC、消息最终一致性这些词,但落到项目里总感觉“差临门一脚”,那这篇文章会比较适合你。


背景与问题

我们先看一个非常典型的业务流程:下单。

  1. 订单服务创建订单
  2. 库存服务冻结库存
  3. 支付服务扣款
  4. 订单服务改为已支付

在单体里,这几个步骤可能都落在一个数据库事务里。但在微服务中,它们可能分散在不同服务、不同数据库,甚至不同团队维护的系统里。

为什么不能直接用分布式数据库事务?

理论上可以考虑 XA / 2PC,但在大多数互联网业务里,这条路往往越走越窄:

  • 协调器复杂,链路长,性能开销大
  • 参与方必须支持协议,异构系统接入麻烦
  • 一旦某个节点卡住,整体阻塞
  • 对可用性不友好,不符合微服务“自治”理念

所以很多团队最终会接受一个事实:

在微服务架构下,强一致往往不是默认选项,最终一致性才是更常见的工程解法。

而 Saga,就是实现最终一致性的经典模式之一。

一个现实问题:下单失败后怎么“回滚”?

假设流程如下:

  • 订单创建成功
  • 库存冻结成功
  • 支付扣款失败

这时你没法像本地事务一样直接把前面步骤全部数据库回滚掉。因为它们已经分别提交到各自的服务里了。

Saga 的核心思路就是:

  • 把一个长事务拆成多个本地事务
  • 每个本地事务执行成功后继续下一步
  • 如果中间某步失败,则按相反顺序执行补偿操作

比如:

  • 创建订单 → 补偿:取消订单
  • 冻结库存 → 补偿:释放库存
  • 扣款成功 → 补偿:退款

这不是数据库层面的回滚,而是业务层面的反向操作


方案对比与取舍分析

在真正选型前,先把 Saga 放到常见方案里看一下。

方案一致性实现复杂度性能适用场景
2PC / XA强一致较差传统中台、强依赖单库生态
TCC强一致偏业务控制很高中等核心资金、对一致性要求极高
Saga最终一致中等较好订单、库存、流程型业务
本地消息表 + 消费重试最终一致中等异步型流程、事件驱动系统

为什么很多团队最后会选 Saga?

因为它在工程上比较平衡:

  • 不要求所有服务支持分布式事务协议
  • 本地事务边界清晰
  • 对性能和可用性更友好
  • 与微服务自治理念相容

Saga 的边界条件

但 Saga 不是万能药,下面这些场景要谨慎:

  • 资金强一致:比如核心清结算,光靠补偿未必足够
  • 补偿代价极高:例如已经触发外部不可逆动作
  • 长时间悬挂不可接受:比如库存冻结太久会影响销售

我的经验是:Saga 很适合流程性业务,但不适合所有关键业务。


核心原理

Saga 主要有两种组织方式:

  1. 编排式(Orchestration):由一个中心协调器驱动每一步
  2. 协同式(Choreography):各服务通过事件自行协作

对于中级读者、以及大多数业务团队,我更建议先从 编排式 Saga 入手,因为它更直观、更易排查。

1. 编排式 Saga 流程图

flowchart TD
    A[开始下单] --> B[订单服务: 创建订单 PENDING]
    B --> C[库存服务: 冻结库存]
    C --> D[支付服务: 扣款]
    D --> E[订单服务: 更新订单 PAID]

    C -->|失败| C1[补偿: 取消订单]
    D -->|失败| D1[补偿: 释放库存]
    D1 --> D2[补偿: 取消订单]

这里要抓住一个重点:

Saga 的事务边界不是“一次性提交”,而是“每一步本地提交 + 失败补偿”。

2. Saga 状态机

如果你只把 Saga 当成“调用几个接口然后异常时反着调回来”,项目很快就会乱。更稳妥的做法,是把它显式建模成状态机。

stateDiagram-v2
    [*] --> Pending
    Pending --> OrderCreated: 创建订单成功
    OrderCreated --> InventoryReserved: 冻结库存成功
    InventoryReserved --> PaymentCompleted: 扣款成功
    PaymentCompleted --> Done: 更新订单完成

    OrderCreated --> Compensating: 冻结库存失败
    InventoryReserved --> Compensating: 扣款失败
    Compensating --> Cancelled: 补偿完成
    Compensating --> CompensationFailed: 补偿失败

Saga 设计里的几个关键概念

1)正向动作

就是正常业务步骤,比如:

  • 创建订单
  • 冻结库存
  • 扣减余额

2)补偿动作

在后续步骤失败时,用来抵消之前动作的业务操作,比如:

  • 取消订单
  • 解冻库存
  • 退款

注意,补偿不一定等于完全回到原始状态。例如支付退款可能存在渠道延迟,这就是 Saga 与本地事务最大的不同。

3)幂等性

这个我建议你务必重视。因为分布式环境里:

  • 调用可能超时但其实成功了
  • 消息可能重复投递
  • 协调器重试可能导致重复补偿

所以每个正向动作和补偿动作都必须支持幂等。否则一次重试,可能把库存多扣一次、退款多退一次。

4)可观测性

Saga 没有可观测性几乎不可维护。你至少要能看到:

  • 当前 Saga 实例到哪一步了
  • 哪一步失败了
  • 补偿是否完成
  • 是否正在重试
  • 是否进入人工介入队列

实战设计:一个最小可运行的 Saga 示例

为了把重点放在机制上,下面我用 Python 写一个可直接运行的简化版示例。它模拟三个服务:

  • OrderService
  • InventoryService
  • PaymentService

再由一个 SagaOrchestrator 负责驱动流程和补偿。

目标场景

用户提交订单后:

  1. 创建订单
  2. 冻结库存
  3. 扣减账户余额
  4. 任一步失败,就执行补偿

代码示例

from dataclasses import dataclass
from typing import Dict, Set


class BusinessError(Exception):
    pass


@dataclass
class Order:
    order_id: str
    user_id: str
    product_id: str
    amount: int
    status: str


class OrderService:
    def __init__(self):
        self.orders: Dict[str, Order] = {}
        self.processed_create: Set[str] = set()
        self.processed_cancel: Set[str] = set()
        self.processed_paid: Set[str] = set()

    def create_order(self, request_id: str, order_id: str, user_id: str, product_id: str, amount: int):
        if request_id in self.processed_create:
            return self.orders[order_id]

        if order_id in self.orders:
            raise BusinessError(f"order {order_id} already exists")

        order = Order(order_id, user_id, product_id, amount, "PENDING")
        self.orders[order_id] = order
        self.processed_create.add(request_id)
        print(f"[OrderService] create order success: {order_id}")
        return order

    def mark_paid(self, request_id: str, order_id: str):
        if request_id in self.processed_paid:
            return

        order = self.orders.get(order_id)
        if not order:
            raise BusinessError(f"order {order_id} not found")

        if order.status == "CANCELLED":
            raise BusinessError(f"order {order_id} already cancelled")

        order.status = "PAID"
        self.processed_paid.add(request_id)
        print(f"[OrderService] mark paid success: {order_id}")

    def cancel_order(self, request_id: str, order_id: str):
        if request_id in self.processed_cancel:
            return

        order = self.orders.get(order_id)
        if not order:
            return

        if order.status != "PAID":
            order.status = "CANCELLED"
            print(f"[OrderService] cancel order success: {order_id}")

        self.processed_cancel.add(request_id)


class InventoryService:
    def __init__(self):
        self.stock: Dict[str, int] = {}
        self.reserved: Dict[str, int] = {}
        self.processed_reserve: Set[str] = set()
        self.processed_release: Set[str] = set()

    def add_stock(self, product_id: str, count: int):
        self.stock[product_id] = self.stock.get(product_id, 0) + count

    def reserve(self, request_id: str, order_id: str, product_id: str, count: int):
        if request_id in self.processed_reserve:
            return

        available = self.stock.get(product_id, 0)
        if available < count:
            raise BusinessError(f"insufficient stock for {product_id}")

        self.stock[product_id] -= count
        self.reserved[order_id] = self.reserved.get(order_id, 0) + count
        self.processed_reserve.add(request_id)
        print(f"[InventoryService] reserve success: order={order_id}, count={count}")

    def release(self, request_id: str, order_id: str, product_id: str):
        if request_id in self.processed_release:
            return

        count = self.reserved.get(order_id, 0)
        if count > 0:
            self.stock[product_id] = self.stock.get(product_id, 0) + count
            self.reserved[order_id] = 0
            print(f"[InventoryService] release success: order={order_id}, count={count}")

        self.processed_release.add(request_id)


class PaymentService:
    def __init__(self):
        self.balance: Dict[str, int] = {}
        self.paid_orders: Set[str] = set()
        self.refunded_orders: Set[str] = set()
        self.processed_pay: Set[str] = set()
        self.processed_refund: Set[str] = set()

    def add_balance(self, user_id: str, amount: int):
        self.balance[user_id] = self.balance.get(user_id, 0) + amount

    def pay(self, request_id: str, order_id: str, user_id: str, amount: int, fail=False):
        if request_id in self.processed_pay:
            return

        if fail:
            raise BusinessError("payment gateway timeout")

        if self.balance.get(user_id, 0) < amount:
            raise BusinessError("insufficient balance")

        self.balance[user_id] -= amount
        self.paid_orders.add(order_id)
        self.processed_pay.add(request_id)
        print(f"[PaymentService] pay success: order={order_id}, amount={amount}")

    def refund(self, request_id: str, order_id: str, user_id: str, amount: int):
        if request_id in self.processed_refund:
            return

        if order_id in self.paid_orders and order_id not in self.refunded_orders:
            self.balance[user_id] = self.balance.get(user_id, 0) + amount
            self.refunded_orders.add(order_id)
            print(f"[PaymentService] refund success: order={order_id}, amount={amount}")

        self.processed_refund.add(request_id)


class SagaOrchestrator:
    def __init__(self, order_service: OrderService, inventory_service: InventoryService, payment_service: PaymentService):
        self.order_service = order_service
        self.inventory_service = inventory_service
        self.payment_service = payment_service

    def create_order_saga(self, saga_id: str, order_id: str, user_id: str, product_id: str, amount: int, payment_fail=False):
        completed_steps = []

        try:
            self.order_service.create_order(f"{saga_id}:create_order", order_id, user_id, product_id, amount)
            completed_steps.append("create_order")

            self.inventory_service.reserve(f"{saga_id}:reserve_inventory", order_id, product_id, 1)
            completed_steps.append("reserve_inventory")

            self.payment_service.pay(f"{saga_id}:pay", order_id, user_id, amount, fail=payment_fail)
            completed_steps.append("pay")

            self.order_service.mark_paid(f"{saga_id}:mark_paid", order_id)
            completed_steps.append("mark_paid")

            print(f"[Saga] success: {saga_id}")

        except Exception as e:
            print(f"[Saga] failed: {saga_id}, reason={e}")
            self.compensate(saga_id, order_id, user_id, product_id, amount, completed_steps)

    def compensate(self, saga_id: str, order_id: str, user_id: str, product_id: str, amount: int, completed_steps):
        for step in reversed(completed_steps):
            try:
                if step == "pay":
                    self.payment_service.refund(f"{saga_id}:refund", order_id, user_id, amount)
                elif step == "reserve_inventory":
                    self.inventory_service.release(f"{saga_id}:release_inventory", order_id, product_id)
                elif step == "create_order":
                    self.order_service.cancel_order(f"{saga_id}:cancel_order", order_id)
            except Exception as e:
                print(f"[Saga] compensation failed at step={step}, reason={e}")

        print(f"[Saga] compensation done: {saga_id}")


if __name__ == "__main__":
    order_service = OrderService()
    inventory_service = InventoryService()
    payment_service = PaymentService()

    inventory_service.add_stock("SKU-1", 10)
    payment_service.add_balance("U1001", 100)

    orchestrator = SagaOrchestrator(order_service, inventory_service, payment_service)

    print("=== case 1: success ===")
    orchestrator.create_order_saga(
        saga_id="SAGA-001",
        order_id="O-001",
        user_id="U1001",
        product_id="SKU-1",
        amount=30,
        payment_fail=False
    )

    print("\n=== case 2: payment fail, compensation ===")
    orchestrator.create_order_saga(
        saga_id="SAGA-002",
        order_id="O-002",
        user_id="U1001",
        product_id="SKU-1",
        amount=30,
        payment_fail=True
    )

    print("\n=== final state ===")
    print("orders:", order_service.orders)
    print("stock:", inventory_service.stock)
    print("reserved:", inventory_service.reserved)
    print("balance:", payment_service.balance)

如何运行

python saga_demo.py

你会看到什么

  • 第一笔订单成功完成
  • 第二笔订单在支付阶段失败
  • 系统自动释放库存、取消订单

这个示例虽然简化了很多工程细节,但已经包含了 Saga 落地最关键的几个点:

  • 本地事务拆分
  • 补偿逻辑
  • 逆序补偿
  • 幂等控制
  • 最终一致性

时序图:成功与失败补偿

有些同学看到代码还是会觉得流程不够立体,我们再用时序图梳理一下。

sequenceDiagram
    participant C as Client
    participant S as SagaOrchestrator
    participant O as OrderService
    participant I as InventoryService
    participant P as PaymentService

    C->>S: 创建订单请求
    S->>O: create_order
    O-->>S: success
    S->>I: reserve_inventory
    I-->>S: success
    S->>P: pay
    alt 支付成功
        P-->>S: success
        S->>O: mark_paid
        O-->>S: success
        S-->>C: success
    else 支付失败
        P-->>S: fail
        S->>I: release_inventory
        I-->>S: success
        S->>O: cancel_order
        O-->>S: success
        S-->>C: failed but compensated
    end

落地时真正要设计的东西

上面的示例能跑,但要进生产,还差几个关键设计。

1. Saga 日志表 / 状态表

我强烈建议不要只靠内存记录流程状态。你需要一张持久化表,至少记录:

  • saga_id
  • biz_id(如 order_id)
  • state
  • current_step
  • retry_count
  • last_error
  • updated_at

示意 SQL:

CREATE TABLE saga_instance (
    saga_id VARCHAR(64) PRIMARY KEY,
    biz_id VARCHAR(64) NOT NULL,
    state VARCHAR(32) NOT NULL,
    current_step VARCHAR(64),
    retry_count INT DEFAULT 0,
    last_error TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

步骤日志表也建议有:

CREATE TABLE saga_step_log (
    id BIGSERIAL PRIMARY KEY,
    saga_id VARCHAR(64) NOT NULL,
    step_name VARCHAR(64) NOT NULL,
    action_type VARCHAR(16) NOT NULL, -- forward / compensate
    step_status VARCHAR(32) NOT NULL, -- success / failed / retrying
    request_id VARCHAR(128) NOT NULL,
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

这两张表会在你线上排查时救命。真的,不夸张。

2. 业务状态不要偷懒

订单状态至少不要只有“已创建”和“已支付”,建议显式区分:

  • PENDING
  • RESERVED
  • PAYING
  • PAID
  • CANCELLING
  • CANCELLED
  • COMPENSATION_FAILED

状态越清楚,排查越轻松。很多线上事故,本质不是业务失败,而是状态表达不完整

3. 超时与重试策略

分布式系统里,“失败”不止一种:

  • 真失败
  • 假失败(超时,但远端其实成功)
  • 暂时失败(网络抖动)
  • 永久失败(库存不足)

所以不能一刀切。

建议按错误类型分层处理:

  • 可重试错误:超时、连接失败、限流
  • 不可重试错误:余额不足、库存不足、参数非法
  • 待确认错误:远端未知结果,需要查询状态接口

常见坑与排查

这一节我尽量讲得“像线上真实会遇到的事”。

1. 补偿接口不幂等

现象

  • 订单已经取消,重试时又取消一次
  • 库存已经释放,又重复加回
  • 已退款订单再次退款

根因

补偿接口往往容易被当成“失败兜底逻辑”,没有像正向接口那样严肃设计。

解决建议

  • 每一步动作都带唯一 request_id
  • 为正向与补偿分别做幂等记录
  • 补偿逻辑以“是否已补偿”为准,而不是“我猜你没补偿过”

2. 空补偿与悬挂问题

这是 Saga/TCC 里都常见的问题。

空补偿

补偿请求先到了,但正向请求其实没成功执行。

例如:

  • 协调器以为库存冻结成功了,触发释放库存
  • 实际上冻结请求根本没执行成功

如果补偿接口直接“加库存”,就会出错。

悬挂

补偿已经执行完了,但迟到的正向请求又来了。

解决建议

  • 每个步骤都记录事务状态
  • 补偿前先校验正向操作是否真的生效
  • 对已取消、已补偿状态禁止再执行正向动作

简单说,不要把补偿接口写成“盲回滚”


3. 只做了重试,没做状态查询

现象

调用支付接口超时后,系统不断重试扣款。

根因

网络超时不代表业务没成功。远端可能已经扣款成功,但响应没回来。

解决建议

优先采用“三段式”策略:

  1. 发起请求
  2. 超时后查询远端状态
  3. 确认未成功再重试

很多重复扣款事故,都是因为把“超时”直接等同于“失败”。


4. 补偿失败后没有兜底

现象

支付失败后,库存释放也失败,Saga 卡在半路。

根因

只设计了“理想中的补偿路径”,没设计“补偿失败怎么办”。

解决建议

  • 补偿失败进入 COMPENSATION_FAILED
  • 放入重试队列
  • 设置告警
  • 支持人工干预工具
  • 保留操作审计日志

Saga 的关键不是不失败,而是失败后还能收敛。


5. 业务动作不可逆,补偿根本补不回来

比如你已经调用了第三方短信、邮件、物流下发、券码核销等外部动作,这些往往无法真正撤销。

解决建议

  • 不可逆动作尽量放在 Saga 末尾
  • 或者先做“预留/草稿”,最终成功再确认
  • 对外部系统要定义“撤销/冲正/关闭”语义,而不是假设它能像数据库一样回滚

这一点我踩过坑:当时某个通知服务在支付前就发了“支付成功短信”,后面订单补偿回滚了,用户先高兴后投诉,体验极差。


安全/性能最佳实践

Saga 不只是事务问题,也牵涉安全、可用性和吞吐。

1. 安全最佳实践

接口鉴权与签名

Saga 步骤接口通常是内部调用,但“内部接口”不等于“可以裸奔”。

建议至少具备:

  • 服务间鉴权
  • 请求签名或 token 校验
  • 防重放机制
  • 请求链路追踪 ID

最小化敏感数据传递

不要在 Saga 上下文里到处传用户完整隐私数据、支付卡信息等。只传:

  • 业务主键
  • 必要金额
  • 状态标识
  • 脱敏信息

审计日志

对以下动作保留审计:

  • 扣款
  • 退款
  • 手工补偿
  • 状态强制变更

特别是人工介入场景,必须可追踪“谁在什么时间做了什么”。


2. 性能最佳实践

缩短 Saga 生命周期

Saga 持续时间越长,不确定性越大。

建议:

  • 把强依赖步骤尽量前置
  • 把通知类、报表类动作异步化
  • 减少长时间人工确认步骤夹在中间

降低同步调用深度

不要让一个 Saga 一口气串十几个同步 RPC。调用链越长,失败面越大。

经验上可以考虑:

  • 核心链路 3~5 步以内
  • 非关键动作改事件异步消费
  • 对外部依赖设置熔断与隔离

控制重试风暴

重试是必要的,但要有限度。

建议策略:

  • 指数退避
  • 最大重试次数
  • 死信队列
  • 错误分类重试

否则某个下游抖动时,重试流量会把它彻底打垮。


3. 容量估算思路

架构设计不能只讲模式,不讲容量。

假设:

  • 峰值下单 3000 TPS
  • 每个下单 Saga 包含 4 个正向步骤
  • 平均 2% 进入补偿
  • 每个补偿平均 2 步

那么每秒调用量大致为:

正向调用 = 3000 × 4 = 12000 次/秒
补偿调用 = 3000 × 2% × 2 = 120 次/秒
总调用约 = 12120 次/秒

再叠加:

  • 重试流量
  • 状态查询流量
  • 日志写入
  • 告警与监控采样

你会发现,Saga 协调器和相关基础设施也需要容量规划,不能只盯业务服务本身。


一套更稳的生产落地建议

如果你准备把 Saga 真正用进项目,我建议按下面顺序推进:

  1. 先选编排式 Saga
    • 方便统一管理和排查
  2. 先做 1 条核心链路
    • 如下单-库存-支付
  3. 所有步骤必须幂等
    • 正向和补偿都一样
  4. 建立状态表和步骤日志
    • 不然线上定位会非常痛苦
  5. 区分错误类型
    • 重试、查询、补偿不要混成一锅
  6. 把人工介入设计进去
    • 不要幻想系统能 100% 自动闭环
  7. 先压测失败场景
    • 不仅测成功链路,还要测超时、重复、半成功

很多团队的问题不是“不懂 Saga”,而是只实现了 happy path。真正难的是失败路径。


总结

Saga 的价值,不在于“模拟出一个跨服务大事务”,而在于:

  • 接受微服务环境中的分布式现实
  • 用多个本地事务 + 补偿机制来实现最终一致性
  • 在一致性、性能、可用性之间做工程化平衡

如果你只记住三件事,我建议是这三条:

  1. 补偿不是数据库回滚,而是业务反向动作
  2. 幂等、状态机、日志,是 Saga 落地三件套
  3. Saga 适合流程型最终一致业务,不适合所有强一致场景

最后给一个很务实的建议:
如果你的业务刚开始做微服务,不要一上来就把 Saga 设计得特别“宏大”。先围绕一条高价值链路做扎实,把失败、补偿、重试、排查工具都跑顺,再逐步推广。分布式事务这件事,最怕的不是复杂,而是“看起来已经实现了,其实并不可靠”。

只要把边界想清楚、状态建清楚、失败路径走通,Saga 依然是微服务架构下非常值得落地的一种分布式事务方案。


分享到:

上一篇
《Java开发踩坑实战:排查并修复线程池误用导致的接口超时与内存飙升问题-456》
下一篇
《Java 中线程池参数调优与任务拒绝策略实战:从业务压测到生产配置落地》