跳转到内容
123xiao | 无名键客

《微服务架构下的分布式事务实战:基于 Saga 模式的设计、落地与故障恢复》

字数: 0 阅读时长: 1 分钟

微服务架构下的分布式事务实战:基于 Saga 模式的设计、落地与故障恢复

在单体应用里,事务这件事通常不难:一个数据库连接,一个 BEGIN,一把梭。
可一旦进入微服务架构,订单、库存、支付、账户都拆成独立服务后,事情马上就变味了:业务要么一起成功,要么一起回滚,但底层已经没有一个统一数据库让你做本地事务了。

我第一次在生产里踩分布式事务坑时,最明显的症状是:订单显示“已创建”,库存也扣了,但支付失败,退款补偿又没执行成功。这类问题不会每天都发生,但一旦发生,客服、运营、财务都会找上门。

这篇文章就从“问题怎么出现”开始,带你把 Saga 模式的核心原理、代码落地、故障恢复和排查路径走一遍。重点不是讲概念,而是讲怎么避免线上翻车,以及翻车后怎么救


背景与问题

为什么微服务里事务变难了

在微服务架构下,一个下单流程往往会跨多个服务:

  • 订单服务:创建订单
  • 库存服务:冻结或扣减库存
  • 支付服务:发起扣款
  • 账户服务:记录资金流水
  • 通知服务:发送短信或站内信

如果用传统 2PC/XA 去做强一致事务,理论上最完整,但实践里常遇到几个问题:

  1. 数据库和中间件支持有限
  2. 协调器引入复杂度
  3. 同步阻塞,吞吐差
  4. 长事务对可用性非常不友好
  5. 云原生环境中跨服务、跨存储很难统一支持

所以大多数互联网业务最后都会转向最终一致性方案,而 Saga 是其中最常见、也最实用的一种。

典型故障现象

我把线上最常见的异常现象总结成 4 类:

  1. 主流程执行成功,但补偿没触发
  2. 补偿触发了,但补偿本身失败
  3. 消息重复投递,导致重复扣库存/重复退款
  4. 状态不一致:订单失败,但库存仍冻结

这些问题本质上都不是“代码不会写”,而是事务边界、状态机设计、幂等和恢复机制没做好


核心原理

Saga 的核心思想很简单:

把一个长事务拆成多个本地事务,每一步成功后继续下一步;如果中间某一步失败,就按相反顺序执行对应的补偿动作。

Saga 两种常见实现方式

1. Choreography(事件编排 / 事件驱动)

每个服务订阅事件,自主决定下一步做什么。

优点:

  • 去中心化
  • 服务自治强

缺点:

  • 流程分散,链路不直观
  • 排查问题难
  • 随着参与方变多,事件风暴严重

2. Orchestration(中心编排)

由一个 Saga 协调器统一驱动流程。

优点:

  • 流程清晰
  • 易排障
  • 更适合复杂业务

缺点:

  • 编排器本身需要高可用
  • 容易变成“流程上帝服务”

在 troubleshooting 场景里,我更推荐先采用中心编排式 Saga,因为它更容易观测、补偿和人工介入。

订单 Saga 的状态流转

下面是一个典型下单 Saga:

flowchart TD
    A[创建订单] --> B[冻结库存]
    B --> C[发起支付]
    C --> D[确认订单]
    C -->|支付失败| E[解冻库存]
    B -->|库存失败| F[取消订单]
    E --> F

补偿不是回滚

这是很多人第一次接触 Saga 时最容易误解的点。

数据库回滚是“撤销未提交结果”,而 Saga 补偿是“再执行一个业务动作,把状态纠正回来”。

比如:

  • 扣库存的补偿不是“数据库回滚”,而是“增加可用库存/解除冻结”
  • 支付的补偿不是“撤销 SQL”,而是“退款”
  • 发券的补偿可能不是“删除券”,而是“标记失效”

也正因为如此,补偿操作必须显式设计,不能指望框架替你自动搞定。


现象复现

先构造一个最常见的问题:
订单创建成功,库存冻结成功,但支付失败,随后补偿库存时因为网络抖动失败,最终留下“失败订单 + 冻结库存”。

复现链路

sequenceDiagram
    participant U as 用户
    participant S as Saga协调器
    participant O as 订单服务
    participant I as 库存服务
    participant P as 支付服务

    U->>S: 提交订单
    S->>O: 创建订单
    O-->>S: 成功
    S->>I: 冻结库存
    I-->>S: 成功
    S->>P: 发起支付
    P-->>S: 失败
    S->>I: 解冻库存(补偿)
    I--xS: 超时/失败
    S->>O: 标记订单失败
    O-->>S: 成功

这时候如果没有重试和补偿任务表,库存就会长期冻结。


核心设计:一套能恢复的 Saga

如果你只记住一句话,我建议记这句:

Saga 设计的重点不是“成功路径”,而是“失败后还能不能恢复”。

设计要点

1. 每一步都要有明确状态

不要只记录“成功/失败”,至少要有:

  • PENDING
  • SUCCESS
  • FAILED
  • COMPENSATING
  • COMPENSATED

2. 每个本地事务必须幂等

因为重试一定会发生。
只要有消息投递、网络抖动、超时重放,就一定会遇到重复请求。

3. 正向动作和补偿动作都要持久化

不要只靠内存状态。服务一重启,链路上下文就丢了。

4. 补偿顺序通常与执行顺序相反

先扣库存再扣款,补偿时往往先退款再解冻库存。

5. 必须允许人工介入

有些补偿本质上不是技术问题,而是业务边界问题。
例如支付网关超时后,平台不知道用户是否真的被扣款,这时只能进入“待人工核对”。


实战代码(可运行)

下面用 Python 写一个可运行的简化版 Saga 编排器。
它不依赖外部中间件,重点展示:

  • Saga 步骤定义
  • 正向执行
  • 失败补偿
  • 幂等控制
  • 故障恢复思路

你可以直接保存为 saga_demo.py 运行。

from dataclasses import dataclass, field
from typing import Callable, List, Dict


class SagaStepError(Exception):
    pass


@dataclass
class SagaStep:
    name: str
    action: Callable[[Dict], None]
    compensate: Callable[[Dict], None]
    status: str = "PENDING"


@dataclass
class SagaExecution:
    saga_id: str
    context: Dict = field(default_factory=dict)
    steps: List[SagaStep] = field(default_factory=list)
    status: str = "PENDING"


class IdempotencyStore:
    def __init__(self):
        self.done = set()

    def already_done(self, key: str) -> bool:
        return key in self.done

    def mark_done(self, key: str):
        self.done.add(key)


idempotency_store = IdempotencyStore()


# ---- 模拟服务 ----
inventory_db = {"item_1": {"available": 10, "frozen": 0}}
order_db = {}
payment_db = {}


def create_order(ctx: Dict):
    key = f"{ctx['saga_id']}:create_order"
    if idempotency_store.already_done(key):
        print("[订单] 幂等跳过 create_order")
        return

    order_id = ctx["order_id"]
    order_db[order_id] = {"status": "CREATED", "amount": ctx["amount"]}
    idempotency_store.mark_done(key)
    print(f"[订单] 创建成功: {order_id}")


def cancel_order(ctx: Dict):
    order_id = ctx["order_id"]
    if order_id in order_db:
        order_db[order_id]["status"] = "CANCELLED"
    print(f"[订单] 已取消: {order_id}")


def reserve_inventory(ctx: Dict):
    key = f"{ctx['saga_id']}:reserve_inventory"
    if idempotency_store.already_done(key):
        print("[库存] 幂等跳过 reserve_inventory")
        return

    item_id = ctx["item_id"]
    qty = ctx["qty"]
    item = inventory_db[item_id]

    if item["available"] < qty:
        raise SagaStepError("库存不足")

    item["available"] -= qty
    item["frozen"] += qty
    idempotency_store.mark_done(key)
    print(f"[库存] 冻结成功: {qty}")


def release_inventory(ctx: Dict):
    item_id = ctx["item_id"]
    qty = ctx["qty"]
    item = inventory_db[item_id]

    if item["frozen"] >= qty:
        item["frozen"] -= qty
        item["available"] += qty
    print(f"[库存] 已解冻: {qty}")


def pay(ctx: Dict):
    key = f"{ctx['saga_id']}:pay"
    if idempotency_store.already_done(key):
        print("[支付] 幂等跳过 pay")
        return

    # 模拟支付失败
    if ctx.get("force_pay_fail", False):
        raise SagaStepError("支付失败:银行返回余额不足")

    payment_db[ctx["order_id"]] = {"status": "PAID", "amount": ctx["amount"]}
    idempotency_store.mark_done(key)
    print("[支付] 支付成功")


def refund(ctx: Dict):
    order_id = ctx["order_id"]
    if order_id in payment_db:
        payment_db[order_id]["status"] = "REFUNDED"
    print("[支付] 已退款")


def confirm_order(ctx: Dict):
    order_id = ctx["order_id"]
    order_db[order_id]["status"] = "CONFIRMED"
    print(f"[订单] 已确认: {order_id}")


def unconfirm_order(ctx: Dict):
    order_id = ctx["order_id"]
    if order_id in order_db:
        order_db[order_id]["status"] = "CREATED"
    print(f"[订单] 确认回退: {order_id}")


class SagaOrchestrator:
    def execute(self, execution: SagaExecution):
        execution.status = "RUNNING"
        completed_steps = []

        try:
            for step in execution.steps:
                print(f"\n>>> 执行步骤: {step.name}")
                step.action(execution.context)
                step.status = "SUCCESS"
                completed_steps.append(step)

            execution.status = "SUCCESS"
            print("\nSaga 执行成功")
        except Exception as e:
            print(f"\n!!! Saga 执行失败: {e}")
            execution.status = "COMPENSATING"

            for step in reversed(completed_steps):
                try:
                    print(f"<<< 补偿步骤: {step.name}")
                    step.compensate(execution.context)
                    step.status = "COMPENSATED"
                except Exception as ce:
                    step.status = "COMPENSATION_FAILED"
                    print(f"xxx 补偿失败: {step.name}, error={ce}")

            execution.status = "FAILED"
            print("Saga 已结束,状态 FAILED")


if __name__ == "__main__":
    ctx = {
        "saga_id": "saga_1001",
        "order_id": "order_1001",
        "item_id": "item_1",
        "qty": 2,
        "amount": 100,
        "force_pay_fail": True
    }

    saga = SagaExecution(
        saga_id=ctx["saga_id"],
        context=ctx,
        steps=[
            SagaStep("创建订单", create_order, cancel_order),
            SagaStep("冻结库存", reserve_inventory, release_inventory),
            SagaStep("发起支付", pay, refund),
            SagaStep("确认订单", confirm_order, unconfirm_order),
        ]
    )

    orchestrator = SagaOrchestrator()
    orchestrator.execute(saga)

    print("\n--- 最终数据 ---")
    print("order_db =", order_db)
    print("inventory_db =", inventory_db)
    print("payment_db =", payment_db)
    print("saga_status =", saga.status)

运行结果预期

force_pay_fail=True 时,大致会看到:

  • 订单创建成功
  • 库存冻结成功
  • 支付失败
  • 触发补偿
  • 库存解冻
  • 订单取消

这就是一个最小可运行的 Saga 骨架。


进一步落地:数据库表怎么设计

线上系统不能只靠内存对象,一般至少要落这几张表。

1. Saga 实例表

CREATE TABLE saga_instance (
    saga_id VARCHAR(64) PRIMARY KEY,
    business_key VARCHAR(64) NOT NULL,
    saga_type VARCHAR(64) NOT NULL,
    status VARCHAR(32) NOT NULL,
    context_json TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

2. Saga 步骤表

CREATE TABLE saga_step (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    saga_id VARCHAR(64) NOT NULL,
    step_name VARCHAR(64) NOT NULL,
    step_order INT NOT NULL,
    status VARCHAR(32) NOT NULL,
    retry_count INT NOT NULL DEFAULT 0,
    last_error TEXT,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_saga_step (saga_id, step_name)
);

3. 幂等记录表

CREATE TABLE idempotency_record (
    idem_key VARCHAR(128) PRIMARY KEY,
    status VARCHAR(32) NOT NULL,
    response_json TEXT,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

4. 补偿任务表

CREATE TABLE compensation_task (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    saga_id VARCHAR(64) NOT NULL,
    step_name VARCHAR(64) NOT NULL,
    status VARCHAR(32) NOT NULL,
    next_retry_time TIMESTAMP NOT NULL,
    retry_count INT NOT NULL DEFAULT 0,
    last_error TEXT,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

故障恢复机制怎么做

Saga 真正能不能上线,关键看恢复机制。
我建议至少做这三层。

第一层:同步重试

对于明确的瞬时错误,比如:

  • 网络超时
  • RPC 连接断开
  • 短暂限流

可以做有限次同步重试,比如 3 次指数退避。

第二层:异步补偿任务

如果补偿动作当下失败,不能就地放弃。
应该写入补偿任务表,由后台任务持续拉起执行。

stateDiagram-v2
    [*] --> PENDING
    PENDING --> RUNNING
    RUNNING --> SUCCESS
    RUNNING --> FAILED
    FAILED --> RETRYING
    RETRYING --> SUCCESS
    RETRYING --> MANUAL_INTERVENTION
    MANUAL_INTERVENTION --> SUCCESS

第三层:人工介入

这些情况通常需要人工处理:

  • 支付网关返回未知状态
  • 补偿多次失败,且影响资金或库存
  • 下游系统数据已被人工修改
  • 补偿动作存在业务副作用,不宜自动多次重放

一个经验原则是:

只要涉及钱,恢复机制里就必须留“人工核对”出口。


定位路径:线上出问题时先看什么

真出问题时,排查顺序比“知道多少理论”更重要。
下面给一个我自己比较常用的定位路径。

1. 先确认业务主键

先拿到:

  • orderId
  • sagaId
  • requestId
  • traceId

没有统一业务主键,排查会像大海捞针。

2. 看 Saga 实例状态

优先回答两个问题:

  • 是卡在正向流程,还是卡在补偿流程?
  • 哪一步最后成功,哪一步第一次失败?

3. 对比服务本地状态

按参与方逐个看:

  • 订单状态是否已取消
  • 库存是否仍冻结
  • 支付是否已扣款
  • 补偿任务是否存在

4. 查幂等记录和消息消费记录

排查是否有:

  • 重复执行
  • 消息未消费
  • 消费成功但 ACK 丢失
  • 消费失败后未重试

5. 最后再怀疑代码 bug

很多分布式事务问题,根源不是业务逻辑写错,而是:

  • 超时阈值过短
  • 状态写入时机不对
  • 幂等键不稳定
  • 补偿任务没有兜底扫描

常见坑与排查

下面这些坑,几乎每个 Saga 项目都会遇到。

坑 1:把“超时”当成“失败”

现象: 调用支付超时,协调器开始补偿;但几秒后支付平台其实扣款成功了。
后果: 订单被取消,但钱已经扣了。

排查方式:

  • 看下游接口语义:超时到底是“未执行”还是“结果未知”
  • 是否支持查询接口
  • 是否在补偿前做过结果确认

建议:

  • 对“结果未知”引入中间态 UNKNOWN
  • 先查询结果,再决定补偿
  • 对支付类操作优先采用“查询确认 + 延迟补偿”

坑 2:补偿操作不幂等

现象: 同一条补偿任务被执行两次,库存加回了两次。
后果: 库存被冲高,造成超卖风险。

排查方式:

  • 检查补偿接口是否按 sagaId + stepName 去重
  • 看补偿日志里是否有重复请求
  • 看消息是否发生重复投递

建议:

  • 正向和补偿都要幂等
  • 幂等键必须稳定且可重建
  • 幂等记录落库,不要只放缓存

坑 3:状态更新与消息发送不一致

现象: 本地事务提交了,但事件没发出去;或事件发出去了,但本地状态没落库。
后果: 流程“断链”。

建议:

  • 使用 Outbox Pattern
  • 先在本地事务里写业务数据和 outbox 事件
  • 再由异步投递器把事件发到消息队列

示意流程:

flowchart LR
    A[业务事务提交] --> B[写业务表]
    A --> C[写Outbox表]
    C --> D[异步投递器扫描]
    D --> E[消息队列]
    E --> F[下游服务消费]

坑 4:补偿顺序设计错了

现象: 支付失败后先取消订单,再解冻库存,但取消订单动作触发了额外清理流程,导致后续补偿拿不到上下文。
建议:

  • 明确依赖顺序
  • 先补偿外部副作用强的动作,再处理内部状态
  • 一般遵循“后执行先补偿”

坑 5:把 Saga 用在不适合的场景

Saga 很适合:

  • 订单
  • 库存
  • 账户余额变更
  • 跨服务审批流

但不太适合:

  • 强一致金融记账核心链路
  • 高频低价值、无需事务一致性的流程
  • 无法定义补偿动作的不可逆操作

边界要清楚:
如果业务根本不能接受短暂不一致,Saga 不是银弹。


安全/性能最佳实践

这一节经常被忽略,但线上稳定性往往就栽在这里。

安全方面

1. 补偿接口必须鉴权

很多团队把补偿接口当内部接口,结果网关放开了,存在被误调用风险。
建议:

  • 内网鉴权
  • 服务间 mTLS 或签名校验
  • 对敏感补偿接口做 RBAC 控制

2. 避免在上下文里放敏感明文

Saga 上下文常会被持久化,别直接存:

  • 银行卡号
  • 身份证号
  • 完整手机号
  • 支付令牌明文

应存脱敏值或引用 ID。

3. 人工介入要留审计日志

人工重试、强制完成、强制取消,都必须有:

  • 操作人
  • 操作时间
  • 原因说明
  • 前后状态

性能方面

1. Saga 步骤尽量短小

长时间步骤会拖高失败概率,也会拖长恢复窗口。
能拆就拆,但不要拆到碎片化失控。

2. 重试要有退避和上限

错误重试不是越快越好。
建议:

  • 指数退避
  • 最大重试次数
  • 熔断后转人工

3. 日志必须结构化

至少打印:

  • traceId
  • sagaId
  • stepName
  • action/compensate
  • status
  • errorCode

这样查问题才快。

4. 补偿扫描任务要可水平扩展

不要把补偿恢复写成单机定时器。
建议:

  • 任务分片
  • 基于数据库悲观锁/乐观锁抢占
  • 避免重复执行

一份可执行的止血方案

如果你现在系统里已经有 Saga,但经常出现卡单、冻结不释放,我建议先别急着大改架构,先做这几件事止血:

短期止血

  1. 给所有步骤补齐 sagaId
  2. 给正向和补偿接口补幂等
  3. 增加补偿任务表
  4. 增加失败任务扫描器
  5. 对支付类超时引入“未知状态”
  6. 做一个人工处理后台页

中期治理

  1. 引入统一编排器
  2. 统一状态模型
  3. 落地 Outbox
  4. 接入链路追踪和告警
  5. 建立补偿失败 SLA

长期优化

  1. 识别真正需要事务的流程
  2. 缩小事务边界
  3. 对资金类场景引入更强对账机制
  4. 建立演练机制,定期做故障注入

总结

Saga 模式不是“让分布式事务变简单”,而是把问题从“数据库层回滚”转移到“业务层补偿与恢复”。
所以它真正考验的,不是会不会写编排代码,而是:

  • 你有没有定义清楚每一步的状态
  • 补偿动作是不是可执行、可幂等
  • 出现未知结果时能不能查、能不能恢复
  • 系统重启、消息重复、网络超时后还能不能收敛到一致

如果你问我一个最实用的落地建议,我会说:

先别追求“最优雅的 Saga 设计”,先保证“失败后一定能补回来”。

对于中级工程师来说,真正把 Saga 用好,通常分三步:

  1. 先把正向流程跑通
  2. 再把补偿流程做全
  3. 最后把恢复、重试、人工介入补齐

做到这一步,Saga 才不只是 PPT 上的分布式事务方案,而是能在生产环境里扛住故障的工程实践。


分享到:

上一篇
《分布式架构中基于一致性哈希与服务治理的灰度发布实战指南》
下一篇
《微服务架构中服务拆分与接口治理的实战指南:从边界划分到版本演进》