跳转到内容
123xiao | 无名键客

《分布式架构中基于 Saga 模式的跨服务事务设计与落地实践-113》

字数: 0 阅读时长: 1 分钟

分布式架构中基于 Saga 模式的跨服务事务设计与落地实践

在单体应用里,事务这件事往往是“理所当然”的:一个数据库连接、一段 begin/commit/rollback,事情就结束了。可一旦进入微服务或分布式架构,订单、库存、支付、优惠券、积分分别由不同服务和不同数据库维护,传统本地事务就不够用了。

很多团队在这个阶段会先问一个问题:跨服务事务到底要不要追求强一致?

我自己的经验是,业务系统里绝大多数“交易链路”其实并不需要所有服务在同一时刻绝对一致,而是需要:

  • 最终能收敛到正确状态
  • 中间过程可观测、可重试、可补偿
  • 出问题时能定位、能止血、能人工兜底

这也是 Saga 模式特别适合的地方。


背景与问题

为什么本地事务不够了

假设一个下单流程包含以下步骤:

  1. 订单服务创建订单
  2. 库存服务扣减库存
  3. 支付服务冻结或扣款
  4. 营销服务核销优惠券
  5. 积分服务发放积分

如果这几个动作分散在多个服务、多个数据库中,就会面临几个典型问题:

  • 数据库事务无法跨服务传播
  • 网络调用可能超时、重复、乱序
  • 服务成功与响应成功不是一回事
  • 回滚往往不是技术回滚,而是业务补偿

举个很真实的例子:

  • 订单服务已经创建成功
  • 库存已扣减
  • 支付服务调用超时,但实际上支付已成功
  • 调用方误以为失败,开始回滚库存和订单

这时候如果没有统一的事务编排和幂等机制,系统就会进入非常难缠的“半成功状态”。

传统方案为什么不总适合

跨服务事务常见有几类思路:

方案一致性实现复杂度性能影响适用场景
2PC/XA强一致传统集中式系统、资源支持 XA
TCC较强很高核心资金链路、可显式预留资源
Saga最终一致较小订单、履约、营销等长流程业务
本地消息表/事件驱动最终一致解耦型异步业务

Saga 的核心优势在于:它承认分布式环境中的失败是常态,因此不强行做“跨库原子提交”,而是通过前向执行 + 失败补偿来完成业务闭环。


核心原理

Saga 可以粗略理解为:

把一个长事务拆成多个本地事务,每一步成功后进入下一步;如果中途失败,则按相反顺序执行补偿操作。

两种常见形态

1. 编排式 Saga(Orchestration)

由一个统一的 Saga 协调器负责驱动流程:

  • 调订单服务
  • 调库存服务
  • 调支付服务
  • 失败时统一触发补偿

优点:

  • 流程集中,可观测性强
  • 便于统一重试、超时控制、告警

缺点:

  • 协调器容易变成“流程大脑”
  • 对协调器可用性要求高

2. 协同式 Saga(Choreography)

每个服务通过事件驱动下一步:

  • 订单创建事件 -> 库存扣减
  • 库存扣减事件 -> 支付处理
  • 支付失败事件 -> 触发订单取消和库存回补

优点:

  • 服务解耦
  • 更符合事件驱动架构

缺点:

  • 流程散落在多个服务中
  • 排查链路和全局状态更复杂

对于中级工程团队,如果业务流程比较明确,我通常会优先建议从编排式 Saga 起步,因为落地、排查、治理都更直接。


方案对比与取舍分析

Saga 与 TCC 的取舍

很多人会把 Saga 和 TCC 混在一起。它们都处理分布式事务,但适用点不同。

维度SagaTCC
事务模型本地事务 + 补偿Try/Confirm/Cancel
对业务侵入
资源预留能力
一致性控制最终一致更强
落地成本相对较低较高

如果你的业务是:

  • 电商下单、营销核销、履约处理:Saga 很合适
  • 账户余额、支付扣款、资金冻结:更适合 TCC 或专门账务模型

一句话概括:

Saga 擅长“把事做完再修正”,TCC 擅长“先占住资源再确认”。

Saga 的边界条件

Saga 不是银弹。以下场景要慎用:

  • 补偿动作无法定义,或者补偿成本极高
  • 外部系统不支持幂等和状态查询
  • 某一步是不可逆动作,例如已经向第三方真实出款
  • 业务无法接受较长时间的中间不一致

Saga 事务流程设计

下面以“订单创建”流程为例:

  • 创建订单
  • 扣减库存
  • 扣减余额
  • 任一步失败,执行补偿:
    • 退款/解冻余额
    • 回补库存
    • 取消订单

流程图

flowchart TD
    A[用户提交订单] --> B[创建订单 PENDING]
    B --> C[扣减库存]
    C --> D[扣减余额]
    D --> E[订单确认 CONFIRMED]

    C -.失败.-> X1[取消订单]
    D -.失败.-> X2[回补库存]
    X2 --> X1

时序图

sequenceDiagram
    participant Client as 客户端
    participant Saga as Saga协调器
    participant Order as 订单服务
    participant Inventory as 库存服务
    participant Payment as 支付服务

    Client->>Saga: 提交创建订单
    Saga->>Order: createPendingOrder()
    Order-->>Saga: orderId
    Saga->>Inventory: deduct(orderId, sku, count)
    Inventory-->>Saga: success
    Saga->>Payment: debit(orderId, userId, amount)
    alt 支付成功
        Payment-->>Saga: success
        Saga->>Order: confirm(orderId)
        Order-->>Saga: success
        Saga-->>Client: 下单成功
    else 支付失败
        Payment-->>Saga: failed
        Saga->>Inventory: compensateDeduct(orderId)
        Inventory-->>Saga: success
        Saga->>Order: cancel(orderId)
        Order-->>Saga: success
        Saga-->>Client: 下单失败
    end

状态机建议

状态一定要设计清楚,不然补偿和重试很容易打架。

stateDiagram-v2
    [*] --> PENDING
    PENDING --> INVENTORY_DEDUCTED
    INVENTORY_DEDUCTED --> PAYMENT_DONE
    PAYMENT_DONE --> CONFIRMED
    PENDING --> CANCELED
    INVENTORY_DEDUCTED --> COMPENSATING
    COMPENSATING --> CANCELED
    COMPENSATING --> COMPENSATION_FAILED

实战代码(可运行)

下面我用 Python 做一个可运行的最小 Saga 编排示例。它不是生产级框架,但足够把核心逻辑讲透:

  • 每个服务维护自己的本地状态
  • Saga 协调器负责编排和补偿
  • 所有操作都做了简单幂等
  • 可以模拟失败

你可以直接保存为 saga_demo.py 运行。

from dataclasses import dataclass, field
from typing import Dict, Set
import uuid


@dataclass
class Order:
    order_id: str
    user_id: str
    sku: str
    count: int
    amount: int
    status: str = "PENDING"


class OrderService:
    def __init__(self):
        self.orders: Dict[str, Order] = {}
        self.created_ops: Set[str] = set()
        self.confirmed_ops: Set[str] = set()
        self.canceled_ops: Set[str] = set()

    def create_pending_order(self, op_id: str, user_id: str, sku: str, count: int, amount: int) -> str:
        if op_id in self.created_ops:
            # 幂等:重复请求返回原订单
            for oid, order in self.orders.items():
                if order.user_id == user_id and order.sku == sku and order.count == count and order.amount == amount:
                    return oid

        order_id = str(uuid.uuid4())
        self.orders[order_id] = Order(
            order_id=order_id,
            user_id=user_id,
            sku=sku,
            count=count,
            amount=amount,
            status="PENDING"
        )
        self.created_ops.add(op_id)
        print(f"[Order] create pending order: {order_id}")
        return order_id

    def confirm_order(self, op_id: str, order_id: str):
        if op_id in self.confirmed_ops:
            print(f"[Order] confirm idempotent: {order_id}")
            return

        order = self.orders[order_id]
        if order.status == "CANCELED":
            raise Exception("cannot confirm canceled order")
        order.status = "CONFIRMED"
        self.confirmed_ops.add(op_id)
        print(f"[Order] confirmed: {order_id}")

    def cancel_order(self, op_id: str, order_id: str):
        if op_id in self.canceled_ops:
            print(f"[Order] cancel idempotent: {order_id}")
            return

        order = self.orders[order_id]
        if order.status == "CONFIRMED":
            raise Exception("cannot cancel confirmed order")
        order.status = "CANCELED"
        self.canceled_ops.add(op_id)
        print(f"[Order] canceled: {order_id}")


class InventoryService:
    def __init__(self):
        self.stock = {"SKU-1": 10}
        self.deducted_ops: Set[str] = set()
        self.compensated_ops: Set[str] = set()

    def deduct(self, op_id: str, sku: str, count: int):
        if op_id in self.deducted_ops:
            print(f"[Inventory] deduct idempotent: {sku}, {count}")
            return

        remain = self.stock.get(sku, 0)
        if remain < count:
            raise Exception("insufficient stock")
        self.stock[sku] = remain - count
        self.deducted_ops.add(op_id)
        print(f"[Inventory] deducted: {sku}, count={count}, remain={self.stock[sku]}")

    def compensate_deduct(self, op_id: str, sku: str, count: int):
        if op_id in self.compensated_ops:
            print(f"[Inventory] compensate idempotent: {sku}, {count}")
            return

        self.stock[sku] = self.stock.get(sku, 0) + count
        self.compensated_ops.add(op_id)
        print(f"[Inventory] compensated: {sku}, count={count}, remain={self.stock[sku]}")


class PaymentService:
    def __init__(self):
        self.balance = {"U-1": 100}
        self.debited_ops: Set[str] = set()
        self.refunded_ops: Set[str] = set()

    def debit(self, op_id: str, user_id: str, amount: int, fail=False):
        if op_id in self.debited_ops:
            print(f"[Payment] debit idempotent: {user_id}, {amount}")
            return

        if fail:
            raise Exception("simulated payment failure")

        remain = self.balance.get(user_id, 0)
        if remain < amount:
            raise Exception("insufficient balance")
        self.balance[user_id] = remain - amount
        self.debited_ops.add(op_id)
        print(f"[Payment] debited: {user_id}, amount={amount}, remain={self.balance[user_id]}")

    def refund(self, op_id: str, user_id: str, amount: int):
        if op_id in self.refunded_ops:
            print(f"[Payment] refund idempotent: {user_id}, {amount}")
            return

        self.balance[user_id] = self.balance.get(user_id, 0) + amount
        self.refunded_ops.add(op_id)
        print(f"[Payment] refunded: {user_id}, amount={amount}, remain={self.balance[user_id]}")


class SagaCoordinator:
    def __init__(self, order_service: OrderService, inventory_service: InventoryService, payment_service: PaymentService):
        self.order_service = order_service
        self.inventory_service = inventory_service
        self.payment_service = payment_service

    def create_order(self, user_id: str, sku: str, count: int, amount: int, payment_fail=False):
        saga_id = str(uuid.uuid4())
        print(f"\n=== start saga: {saga_id} ===")

        order_id = None
        inventory_done = False
        payment_done = False

        try:
            order_id = self.order_service.create_pending_order(
                op_id=f"{saga_id}:create_order",
                user_id=user_id,
                sku=sku,
                count=count,
                amount=amount
            )

            self.inventory_service.deduct(
                op_id=f"{saga_id}:deduct_inventory",
                sku=sku,
                count=count
            )
            inventory_done = True

            self.payment_service.debit(
                op_id=f"{saga_id}:debit_payment",
                user_id=user_id,
                amount=amount,
                fail=payment_fail
            )
            payment_done = True

            self.order_service.confirm_order(
                op_id=f"{saga_id}:confirm_order",
                order_id=order_id
            )

            print(f"=== saga success: {saga_id} ===")
            return order_id

        except Exception as e:
            print(f"[Saga] failed: {e}, begin compensation")

            if payment_done:
                self.payment_service.refund(
                    op_id=f"{saga_id}:refund_payment",
                    user_id=user_id,
                    amount=amount
                )

            if inventory_done:
                self.inventory_service.compensate_deduct(
                    op_id=f"{saga_id}:compensate_inventory",
                    sku=sku,
                    count=count
                )

            if order_id:
                self.order_service.cancel_order(
                    op_id=f"{saga_id}:cancel_order",
                    order_id=order_id
                )

            print(f"=== saga compensated: {saga_id} ===")
            return None


if __name__ == "__main__":
    order_service = OrderService()
    inventory_service = InventoryService()
    payment_service = PaymentService()

    saga = SagaCoordinator(order_service, inventory_service, payment_service)

    print(">>> case1: success")
    oid1 = saga.create_order(user_id="U-1", sku="SKU-1", count=2, amount=30, payment_fail=False)

    print("\n>>> case2: payment fail")
    oid2 = saga.create_order(user_id="U-1", sku="SKU-1", count=3, amount=20, payment_fail=True)

    print("\n>>> final state")
    print("order ids:", oid1, oid2)
    print("orders:", order_service.orders)
    print("stock:", inventory_service.stock)
    print("balance:", payment_service.balance)

运行结果会看到什么

  • 第一笔订单成功:订单确认、库存减少、余额减少
  • 第二笔订单支付失败:库存回补、订单取消
  • 最终各服务状态能收敛

这就是 Saga 的最核心目标:不是每一步都不失败,而是失败后系统仍可恢复到业务可接受状态


落地时的关键设计点

代码只是骨架,真正上线时,下面这些设计点更重要。

1. 全局事务 ID

每个 Saga 必须有唯一 sagaId,并在所有服务间透传:

  • 用于日志串联
  • 用于幂等键
  • 用于重试与补偿定位

常见做法:

  • traceId 负责链路追踪
  • sagaId 负责业务事务
  • 子步骤再带 stepId/opId

2. 补偿不是回滚

很多人刚接触 Saga 时,脑子里还是数据库回滚思维。实际上补偿更接近:

  • 扣库存 -> 回补库存
  • 扣余额 -> 退款
  • 创建订单 -> 取消订单

注意,这些动作不一定能把数据恢复到“原封不动”的旧值,只是恢复到业务上合理的新状态

3. 幂等必须内建

我踩过一个坑:补偿任务超时后重试,结果库存被回补了两次,最后库存凭空增加。

所以每个动作都要支持幂等:

  • 正向动作幂等
  • 补偿动作幂等
  • 查询状态幂等
  • 消息投递重复可接受

常见实现:

  • 唯一业务键 + 去重表
  • 操作流水表
  • 状态机防重入
  • 乐观锁 / 唯一索引

4. 状态机优先于布尔值

不要把流程状态拆成一堆布尔字段,例如:

  • inventoryDeducted
  • paymentDone
  • couponUsed

这种设计后期非常容易出现组合爆炸。更稳妥的方式是:

  • 明确主状态
  • 明确补偿状态
  • 明确终态
  • 每次状态变更都有事件和时间戳

常见坑与排查

Saga 最难的往往不是“写出来”,而是“线上出了问题能不能搞清楚”。

坑 1:调用超时,但下游其实成功了

现象

上游收到超时,触发补偿;下游晚一点又返回成功,导致状态冲突。

根因

  • 网络超时不等于业务失败
  • 没有状态查询接口
  • 没有“处理中”语义

排查建议

先看三个维度:

  1. 请求日志:是否真正发出
  2. 下游处理日志:是否落库成功
  3. 状态表:最终状态是否已推进

解决方法

  • 每个关键步骤提供 queryStatus(opId) 接口
  • 对超时步骤先查状态,再决定重试还是补偿
  • 引入“处理中/未知”中间状态,避免过早补偿

坑 2:补偿失败,事务悬挂

现象

支付失败后库存补偿也失败,订单一直卡在“补偿中”。

根因

  • 补偿链路没设计重试
  • 补偿服务自身依赖了更多外部系统
  • 补偿动作不是天然可逆

排查建议

重点看:

  • 补偿任务表是否有重试次数
  • 补偿失败是否进入死信或人工队列
  • 依赖服务是否存在雪崩

解决方法

  • 补偿任务持久化
  • 指数退避重试
  • 超阈值转人工处理
  • 提前定义“兜底状态”

例如:

  • COMPENSATING
  • COMPENSATION_FAILED
  • MANUAL_REVIEW

坑 3:消息重复消费导致多次执行

现象

库存被扣了两次,或者退款了两次。

根因

  • 消息至少一次投递
  • 消费端缺乏幂等

排查建议

  • 检查消息唯一键是否稳定
  • 检查消费记录表是否按业务主键去重
  • 看是否因为消费者重平衡导致重复消费

解决方法

  • 消费前落幂等记录
  • 业务表加唯一约束
  • “先判重,再执行业务,再提交消费位点”

坑 4:补偿顺序不对

现象

先取消订单,再尝试退款,导致退款逻辑因为订单状态非法而失败。

解决方法

补偿顺序通常遵循逆序原则

  1. 最后执行的正向动作,最先补偿
  2. 尽量避免补偿动作依赖已被销毁的上下文

安全/性能最佳实践

Saga 讨论里大家容易只盯着一致性,其实安全和性能一样重要。

安全最佳实践

1. 补偿接口要鉴权

补偿接口本质上可以“撤销业务动作”,如果暴露不当,风险很大。

建议:

  • 仅允许内网访问
  • 使用服务间身份认证
  • sagaId/opId 做签名校验或权限校验
  • 记录完整审计日志

2. 防止伪造重试

如果重试任务由消息系统或任务系统触发,要避免外部伪造请求反复打补偿接口。

可做法:

  • 请求带内部 token
  • 操作表校验状态流转合法性
  • 相同 opId 只允许执行到特定终态

3. 敏感数据最小化透传

Saga 上下文中不要传完整敏感信息,例如:

  • 银行卡号
  • 身份证号
  • 明文手机号

建议只传:

  • 业务 ID
  • 脱敏后的展示字段
  • 下游查询所需最小信息

性能最佳实践

1. 缩短本地事务时间

本地事务中不要做这些事:

  • 远程调用
  • 慢 SQL
  • 大批量扫描
  • 大对象序列化

原则是:

本地事务只做本地可靠落库,远程动作交给事务编排或消息机制。

2. 控制 Saga 上下文大小

我见过有人把整个订单 JSON 透传给每一步服务,最后消息体越来越大,链路延迟明显上升。

建议上下文只保留:

  • sagaId
  • orderId
  • userId
  • 必要金额/数量
  • 当前步骤和版本号

3. 重试要有限流和退避

补偿失败后疯狂重试,会把下游压垮,最终从“局部错误”变成“系统事故”。

建议:

  • 指数退避
  • 最大重试次数
  • 熔断与降级
  • 死信队列 + 人工介入

4. 监控关键指标

至少监控这些指标:

  • Saga 成功率
  • 平均完成时长
  • 补偿率
  • 补偿失败率
  • 长时间未结束事务数
  • 每步骤超时分布
  • 重试次数分布

容量估算与治理建议

架构文章不能只停留在“模式正确”,还要考虑跑起来后的规模问题。

一个粗略估算方法

假设:

  • 下单峰值 QPS:500
  • 每个 Saga 平均 4 个步骤
  • 每步平均 1 次消息投递
  • 失败补偿比例:2%

那么:

  • 每秒正向步骤请求数约:500 × 4 = 2000
  • 每秒补偿额外请求数约:500 × 4 × 2% = 40
  • 如果每步都记录事件日志,日志写入约:2000 ~ 3000+/s

这意味着你至少要评估:

  • 事务日志表写入能力
  • 消息队列吞吐
  • 补偿任务堆积上限
  • 监控系统基数增长

治理建议

  • Saga 状态表按时间分区或分表
  • 历史完成事务归档
  • 长事务单独监控
  • 补偿失败工单化
  • 对热点业务链路做压测演练

一套更贴近生产的落地建议

如果你准备在真实系统中上 Saga,我建议按下面顺序推进,而不是一下子搞很重的框架。

第一步:先统一模型

明确每个步骤的:

  • 正向动作
  • 补偿动作
  • 幂等键
  • 超时阈值
  • 可查询状态
  • 最终终态

第二步:先做编排式

先有一个简单可靠的协调器,哪怕是应用内模块,也比“每个服务自己猜流程”更可控。

第三步:补齐可观测性

至少做到:

  • sagaId 全链路日志
  • 状态变化事件
  • 补偿告警
  • 超时任务扫描

第四步:准备人工兜底

请务必接受现实:总会有自动补偿处理不了的情况。

所以生产系统中要有:

  • 补偿失败列表
  • 人工重试入口
  • 人工终止/标记完成
  • 操作审计记录

这部分经常被忽略,但真的很关键。


总结

Saga 模式适合解决分布式架构中大量跨服务、长流程、可补偿的事务问题。它不追求跨服务的瞬时强一致,而是通过本地事务、状态推进、失败补偿、幂等重试来实现最终一致。

如果你只记住几条落地建议,我建议是这几条:

  1. 优先选择编排式 Saga 起步,流程更清晰,排查更直观
  2. 每一步都要幂等,包括正向、补偿、重试、查询
  3. 状态机要先设计好,不要靠布尔字段拼流程
  4. 超时不等于失败,先查状态再决定补偿
  5. 补偿失败要可重试、可告警、可人工接管
  6. 明确边界:资金强一致场景,不要勉强用 Saga

最后一句比较务实的话:
Saga 真正的难点不是“怎么把流程串起来”,而是“失败之后系统还能不能优雅地收回来”。
只要你围绕这个目标设计,方案通常就不会跑偏。


分享到:

上一篇
《Spring Boot 中基于 Spring Cache + Redis 的多级缓存实战:缓存穿透、击穿与雪崩治理方案》
下一篇
《Docker 多阶段构建与镜像瘦身实战:从构建提速到生产安全交付》