跳转到内容
123xiao | 无名键客

《微服务架构下的分布式事务实战:基于 Saga 模式的设计、落地与故障补偿》

字数: 0 阅读时长: 1 分钟

背景与问题

在单体应用里,事务通常靠数据库的 ACID 一把梭:begin -> update -> commit/rollback。但一旦系统拆成微服务,订单、库存、支付、积分分别归不同服务和数据库管理,事情就没那么简单了。

一个很典型的场景:

  • 订单服务创建订单
  • 库存服务扣减库存
  • 支付服务冻结或扣款
  • 积分服务发放奖励积分

如果中间任何一步失败,你就会遇到这些“线上很烦、测试时又不容易完整复现”的问题:

  • 订单成功了,但库存没扣
  • 库存扣了,但支付失败
  • 支付成功了,但订单状态还停留在“处理中”
  • 补偿逻辑重复执行,导致库存被加回两次
  • 消息丢了,Saga 卡在中间态,没人处理

这类问题的根源是:本地事务很好做,跨服务的一致性很难做。

很多团队一开始会想到 2PC/XA,但在微服务场景里常常不合适:

  • 依赖协调器,耦合重
  • 性能开销大
  • 对数据库、中间件支持要求高
  • 长事务容易拖垮系统吞吐

所以在业务可接受“最终一致性”的前提下,Saga 模式是一个更现实的选择。


背景下最常见的故障现象

先别急着讲原理,我更想从排障视角切入。因为大部分人不是“不会写 Saga”,而是“写了之后线上出怪问题”。

典型现象

1. 订单长时间停留在 PENDING

表现:

  • 用户下单后页面一直转圈
  • 后台订单状态没有推进
  • 运营看到“处理中”订单越来越多

根因通常是:

  • 某个下游服务超时
  • 消息投递成功但消费失败
  • Saga 协调器状态丢失
  • 补偿逻辑和正向逻辑互相打架

2. 补偿执行了,但数据还是不一致

表现:

  • 库存已经回滚,但订单没有取消
  • 订单取消了,但支付未退款
  • 一个服务补偿成功,另一个服务补偿失败

根因通常是:

  • 补偿不是幂等的
  • 补偿顺序设计错误
  • 正向事务和补偿事务业务语义不对称

3. 重试后“越修越乱”

表现:

  • 同一 Saga 被处理多次
  • 重复扣库存、重复退款、重复发积分
  • 日志里出现多个相同业务键的操作记录

根因通常是:

  • 没有幂等键
  • 消息至少一次投递带来重复消费
  • 定时补偿任务和人工处理同时执行

核心原理

Saga 的核心思想可以概括成一句话:

把一个长事务拆成多个本地事务,每个本地事务都对应一个补偿动作;如果中间某一步失败,就按相反顺序执行补偿。

Saga 的两种常见实现

1. Choreography(事件编排)

每个服务监听事件并触发下一个动作,没有中心协调器。

优点:

  • 解耦,看起来很优雅
  • 每个服务自治性强

缺点:

  • 业务链一长,事件流转难追踪
  • 排障成本高
  • 变更流程时容易牵一发动全身

2. Orchestration(中心编排)

由一个 Saga Orchestrator 负责驱动各步骤。

优点:

  • 流程清晰
  • 状态集中,便于监控和补偿
  • 更适合复杂业务和 troublehsooting

缺点:

  • 协调器本身需要高可用
  • 流程控制集中,设计不好会变成“上帝服务”

对于中级读者和大多数业务团队,我的建议是:

复杂链路优先选 Orchestration。 不是因为它理论上最完美,而是因为它更容易定位问题、落地监控和做人工干预。

Saga 的状态流转

stateDiagram-v2
    [*] --> Started
    Started --> OrderCreated
    OrderCreated --> InventoryReserved
    InventoryReserved --> PaymentCharged
    PaymentCharged --> Completed

    InventoryReserved --> Compensating: Payment failed
    PaymentCharged --> Compensating: Reward failed
    OrderCreated --> Compensating: Inventory failed

    Compensating --> OrderCancelled
    OrderCancelled --> InventoryReleased
    InventoryReleased --> PaymentRefunded
    PaymentRefunded --> Failed
    Failed --> [*]
    Completed --> [*]

注意:补偿不是数据库意义上的 rollback。 它是一个新的业务动作,比如:

  • “扣库存”的补偿不是数据库回滚,而是“释放预留库存”
  • “支付扣款”的补偿不是回滚 SQL,而是“发起退款”
  • “创建订单”的补偿不是删数据,而是“把订单状态置为 CANCELLED”

这也是很多团队一开始最容易理解错的地方。

一个下单 Saga 的时序

sequenceDiagram
    participant Client as Client
    participant Saga as Saga Orchestrator
    participant Order as Order Service
    participant Inventory as Inventory Service
    participant Payment as Payment Service

    Client->>Saga: Create Order Request
    Saga->>Order: createOrder()
    Order-->>Saga: orderCreated

    Saga->>Inventory: reserveInventory()
    alt success
        Inventory-->>Saga: inventoryReserved
        Saga->>Payment: charge()
        alt success
            Payment-->>Saga: paymentCharged
            Saga->>Order: markPaid()
            Order-->>Saga: done
        else fail
            Payment-->>Saga: chargeFailed
            Saga->>Inventory: releaseInventory()
            Saga->>Order: cancelOrder()
        end
    else fail
        Inventory-->>Saga: reserveFailed
        Saga->>Order: cancelOrder()
    end

现象复现

下面我用一个可运行的 Python 示例,模拟一个简化版 Saga 编排器。它不依赖消息队列,主要用于理解:

  • 正向事务怎么串起来
  • 异常时怎么补偿
  • 幂等怎么做
  • 排障时看哪些状态

运行效果

  • 成功路径:创建订单 -> 预留库存 -> 扣款成功
  • 失败路径:支付失败 -> 释放库存 -> 取消订单
  • 重复执行补偿时不会二次修改数据

实战代码(可运行)

from dataclasses import dataclass, field
from typing import Dict, List
import uuid


# ========== 模拟数据库 ==========
orders: Dict[str, dict] = {}
inventory: Dict[str, int] = {"sku-1": 10}
payments: Dict[str, dict] = {}
idempotency_log: set = set()


# ========== 通用异常 ==========
class BusinessError(Exception):
    pass


# ========== 服务层 ==========
class OrderService:
    def create_order(self, saga_id: str, order_id: str, sku: str, amount: int):
        key = f"create_order:{saga_id}"
        if key in idempotency_log:
            return orders[order_id]

        orders[order_id] = {
            "order_id": order_id,
            "sku": sku,
            "amount": amount,
            "status": "PENDING"
        }
        idempotency_log.add(key)
        return orders[order_id]

    def mark_paid(self, saga_id: str, order_id: str):
        key = f"mark_paid:{saga_id}"
        if key in idempotency_log:
            return

        if order_id not in orders:
            raise BusinessError("order not found")
        orders[order_id]["status"] = "PAID"
        idempotency_log.add(key)

    def cancel_order(self, saga_id: str, order_id: str):
        key = f"cancel_order:{saga_id}"
        if key in idempotency_log:
            return

        if order_id in orders:
            orders[order_id]["status"] = "CANCELLED"
        idempotency_log.add(key)


class InventoryService:
    def reserve(self, saga_id: str, sku: str, count: int):
        key = f"reserve_inventory:{saga_id}"
        if key in idempotency_log:
            return

        current = inventory.get(sku, 0)
        if current < count:
            raise BusinessError("insufficient inventory")
        inventory[sku] = current - count
        idempotency_log.add(key)

    def release(self, saga_id: str, sku: str, count: int):
        key = f"release_inventory:{saga_id}"
        if key in idempotency_log:
            return

        inventory[sku] = inventory.get(sku, 0) + count
        idempotency_log.add(key)


class PaymentService:
    def charge(self, saga_id: str, order_id: str, amount: int, should_fail: bool = False):
        key = f"charge_payment:{saga_id}"
        if key in idempotency_log:
            return payments.get(order_id)

        if should_fail:
            raise BusinessError("payment failed")

        payments[order_id] = {
            "order_id": order_id,
            "amount": amount,
            "status": "CHARGED"
        }
        idempotency_log.add(key)
        return payments[order_id]

    def refund(self, saga_id: str, order_id: str):
        key = f"refund_payment:{saga_id}"
        if key in idempotency_log:
            return

        if order_id in payments and payments[order_id]["status"] == "CHARGED":
            payments[order_id]["status"] = "REFUNDED"
        idempotency_log.add(key)


# ========== Saga 定义 ==========
@dataclass
class SagaStep:
    name: str
    action: callable
    compensation: callable


@dataclass
class SagaContext:
    saga_id: str
    order_id: str
    sku: str
    amount: int
    executed_steps: List[SagaStep] = field(default_factory=list)


class OrderSagaOrchestrator:
    def __init__(self):
        self.order_service = OrderService()
        self.inventory_service = InventoryService()
        self.payment_service = PaymentService()

    def execute(self, sku: str, amount: int, payment_should_fail: bool = False):
        saga_id = str(uuid.uuid4())
        order_id = str(uuid.uuid4())
        ctx = SagaContext(saga_id=saga_id, order_id=order_id, sku=sku, amount=amount)

        steps = [
            SagaStep(
                "create_order",
                lambda: self.order_service.create_order(ctx.saga_id, ctx.order_id, ctx.sku, ctx.amount),
                lambda: self.order_service.cancel_order(ctx.saga_id, ctx.order_id),
            ),
            SagaStep(
                "reserve_inventory",
                lambda: self.inventory_service.reserve(ctx.saga_id, ctx.sku, 1),
                lambda: self.inventory_service.release(ctx.saga_id, ctx.sku, 1),
            ),
            SagaStep(
                "charge_payment",
                lambda: self.payment_service.charge(ctx.saga_id, ctx.order_id, ctx.amount, payment_should_fail),
                lambda: self.payment_service.refund(ctx.saga_id, ctx.order_id),
            ),
            SagaStep(
                "mark_paid",
                lambda: self.order_service.mark_paid(ctx.saga_id, ctx.order_id),
                lambda: self.order_service.cancel_order(ctx.saga_id, ctx.order_id),
            ),
        ]

        try:
            for step in steps:
                print(f"[ACTION] {step.name}")
                step.action()
                ctx.executed_steps.append(step)
            print(f"[SUCCESS] saga_id={ctx.saga_id}, order_id={ctx.order_id}")
            return ctx
        except Exception as e:
            print(f"[ERROR] {e}, start compensation")
            for step in reversed(ctx.executed_steps):
                try:
                    print(f"[COMPENSATE] {step.name}")
                    step.compensation()
                except Exception as ce:
                    print(f"[COMPENSATION ERROR] step={step.name}, error={ce}")
            return ctx


if __name__ == "__main__":
    orchestrator = OrderSagaOrchestrator()

    print("=== CASE 1: success ===")
    ctx1 = orchestrator.execute("sku-1", 100, payment_should_fail=False)
    print("orders =", orders)
    print("inventory =", inventory)
    print("payments =", payments)

    print("\n=== CASE 2: payment fail ===")
    ctx2 = orchestrator.execute("sku-1", 200, payment_should_fail=True)
    print("orders =", orders)
    print("inventory =", inventory)
    print("payments =", payments)

运行说明

保存为 saga_demo.py,然后执行:

python saga_demo.py

你应该关注什么

这个示例虽然简化,但已经包含了几个很关键的落地点:

  1. 每一步都有补偿动作
  2. 补偿按逆序执行
  3. saga_id 做幂等控制
  4. 订单不是删除,而是改状态
  5. 支付失败不会影响已完成步骤的“物理回滚”,只能做业务补偿

设计落地:数据库与消息表怎么配合

在真实项目里,光靠内存对象肯定不够。通常我会建议至少落这几张表:

  • saga_instance:Saga 实例主表
  • saga_step:每一步执行状态
  • outbox_message:本地消息表
  • idempotency_record:幂等记录表

一个简化的表结构示意

classDiagram
    class saga_instance {
      +varchar saga_id
      +varchar business_key
      +varchar status
      +datetime created_at
      +datetime updated_at
    }

    class saga_step {
      +bigint id
      +varchar saga_id
      +varchar step_name
      +varchar step_status
      +int retry_count
      +text error_message
      +datetime updated_at
    }

    class outbox_message {
      +bigint id
      +varchar aggregate_id
      +varchar topic
      +text payload
      +varchar status
      +datetime next_retry_time
    }

    class idempotency_record {
      +bigint id
      +varchar idempotency_key
      +varchar biz_type
      +datetime created_at
    }

    saga_instance --> saga_step : contains

推荐的本地事务写法

以“订单创建 + 写出站消息”为例,建议放在一个本地事务里:

BEGIN;

INSERT INTO orders(order_id, user_id, status, amount)
VALUES ('o1001', 'u1', 'PENDING', 100);

INSERT INTO outbox_message(aggregate_id, topic, payload, status, next_retry_time)
VALUES (
  'o1001',
  'order.created',
  '{"orderId":"o1001","amount":100}',
  'NEW',
  NOW()
);

COMMIT;

然后由独立投递程序扫描 outbox_message 表发送消息。这样可以避免:

  • 订单写成功但消息没发出去
  • 消息发出去了但订单事务回滚了

这个模式就是常说的 Transactional Outbox,Saga 项目里非常常见。


定位路径:线上问题怎么查

这一节是 troubleshooting 文章里最重要的部分。很多线上问题不是“不会修”,而是“查不到链路”。

建议的排查顺序

第一步:先查 Saga 实例状态

先明确这个 Saga 目前在哪一步:

SELECT saga_id, business_key, status, updated_at
FROM saga_instance
WHERE business_key = 'ORDER:o1001';

重点看:

  • 是否处于 RUNNING
  • 是否已经进入 COMPENSATING
  • 是否长时间没更新

第二步:查步骤表,找到卡点

SELECT step_name, step_status, retry_count, error_message, updated_at
FROM saga_step
WHERE saga_id = 'your-saga-id'
ORDER BY id;

如果看到:

  • create_order = SUCCESS
  • reserve_inventory = SUCCESS
  • charge_payment = FAILED

那基本就锁定是支付步骤的问题。

第三步:查消息投递和消费记录

如果用了 MQ,一定要查:

  • 生产端有没有写出站消息
  • 投递程序有没有发送成功
  • 消费端有没有消费
  • 消费失败后有没有重试
  • 重试是否打到死信队列

第四步:查幂等表与业务表是否匹配

这是我踩过的坑之一:幂等记录写成功了,但业务动作实际没成功。 这样一来,后续重试会被误判为“已处理”。

你应该同时核对:

  • idempotency_record
  • 订单/库存/支付业务表
  • 下游调用返回日志

第五步:查补偿是否执行过、是否执行完整

有时候问题不是“失败了没补偿”,而是“补偿只做了一半”。

建议重点核对:

  • 补偿开始时间
  • 已执行的补偿步骤
  • 补偿失败原因
  • 是否有人工干预记录

常见坑与排查

坑 1:把 Saga 当成分布式回滚

现象

设计时以为失败就能“像数据库一样全部回滚”。

本质问题

Saga 是最终一致性,不是强一致性。 有些动作天然不可逆,比如:

  • 已发出的短信
  • 已调用的外部物流系统
  • 已经发生的优惠券核销

建议

  • 先区分哪些动作可补偿,哪些只能“冲正”
  • 对不可逆动作放到 Saga 后置阶段,或者异步化
  • 对外部系统设计“撤销接口”时,务必验证语义

坑 2:补偿接口不幂等

现象

一次支付失败,补偿任务跑了两次,库存被加回两次。

根因

消息至少一次投递、任务重试、人工触发,这三者都可能导致重复补偿。

解决方案

  • 每个动作和补偿动作都要有唯一幂等键
  • 幂等键建议包含:saga_id + step_name + action_type
  • 幂等判断必须和业务更新尽量放在同一本地事务中

示例:

INSERT INTO idempotency_record(idempotency_key, biz_type, created_at)
VALUES ('refund:step3:saga-123', 'PAYMENT_REFUND', NOW());

如果插入失败(唯一键冲突),说明已处理过。


坑 3:正向成功了,但回写状态失败

现象

支付已经成功,但 mark_paid 超时失败,导致 Saga 被判定失败并开始补偿。

这种情况很危险

因为业务真实世界里,钱可能已经扣了,但系统状态还没更新。

排查思路

  • 查支付流水是否成功
  • 查订单状态回写日志
  • 查补偿是否误触发退款

止血方案

  • 对“关键外部动作”先以事实为准,比如支付流水
  • 允许 Saga 进入 UNKNOWNREVIEW_REQUIRED 状态
  • 不要立刻全自动补偿,先做人审或二次对账

这是一个非常实战的建议:
不确定,就不要自动补。 尤其涉及资金时,宁可进入待核查,也不要盲目退款。


坑 4:补偿顺序设计错了

现象

先取消订单,再退款失败,导致用户已扣款但订单已关闭。

原则

补偿顺序通常是正向步骤的逆序,但要结合业务语义调整。

比如:

  1. 创建订单
  2. 锁库存
  3. 扣款
  4. 发券

失败时不一定简单逆序就够,可能需要:

  1. 撤销发券
  2. 退款
  3. 释放库存
  4. 关闭订单

建议

  • 用“资金优先”“库存优先”“权益优先”明确补偿等级
  • 给每一步定义业务后果,而不是只看技术顺序

坑 5:只记日志,不记状态机

现象

线上看起来日志很多,但就是不知道流程走到哪一步了。

根因

没有正式的 Saga 状态模型,只靠日志字符串排查。

建议

至少记录:

  • Saga 实例状态
  • 当前步骤
  • 每步重试次数
  • 最近错误码
  • 最近错误消息
  • 最后更新时间

日志是辅助,状态机才是排障基线


止血方案:出故障时先保证不继续扩大

当你发现 Saga 已经出现大面积卡单或重复补偿,不要第一反应就是“重试全部任务”。先止血。

推荐止血步骤

1. 暂停自动重试

避免重复扣款、重复退款、重复释放库存。

2. 关闭入口流量或降级部分流程

比如先停止“积分发放”“优惠券发放”这类非核心步骤,把主链路保住。

3. 按业务类型分类处理

  • 资金类:优先人工核账
  • 库存类:优先对账恢复
  • 权益类:允许延迟补发

4. 导出异常 Saga 清单

按照下面维度筛:

  • 运行超时超过 10 分钟
  • 重试次数超过阈值
  • 处于 COMPENSATING 超过阈值
  • 关键步骤状态不一致

5. 人工补偿必须留痕

人工执行退款、回补库存、改订单状态时,一定要记录:

  • 操作人
  • 时间
  • 原因
  • 关联 saga_id
  • 操作结果

否则后面二次排查会非常痛苦。


安全/性能最佳实践

安全方面

1. 补偿接口要做权限隔离

补偿接口本质上具备“修改业务结果”的能力,不能暴露成普通开放接口。

建议:

  • 仅允许内部服务调用
  • 使用服务间鉴权
  • 对人工补偿接口启用 RBAC 和审计日志

2. 敏感字段不要直接打日志

支付单号、用户手机号、卡号片段等信息,不要在 Saga 日志里明文输出。

3. 防止重放攻击或误调用

对于回调和补偿接口:

  • 校验签名
  • 校验时间戳
  • 校验请求唯一流水号

性能方面

1. Saga 步骤尽量短小

不要在单步里做大量慢操作,否则会拉长整个事务链路。

2. 超时要分级设置

不要全链路统一 30 秒。建议:

  • 库存预留:短超时
  • 支付扣款:中等超时
  • 权益发放:可异步

3. 重试要指数退避

避免故障时重试风暴把下游彻底打死。

伪代码示例:

def next_retry_delay(retry_count: int) -> int:
    base = 2
    max_delay = 60
    return min(base ** retry_count, max_delay)

4. 补偿任务和主流程隔离资源池

这个细节非常重要。否则一旦主流程异常堆积,补偿线程也抢不到资源,系统会越来越乱。

5. 做好对账任务

最终一致性不是“祈祷一致”,而是要靠对账兜底。

至少要有:

  • 订单与支付对账
  • 订单与库存对账
  • Saga 状态与业务事实对账

一个更实用的落地建议:先从 3 步 Saga 开始

如果你们团队第一次落地 Saga,我不建议一上来就做 7~10 个步骤的大流程。我的经验是:

先挑一个最典型、边界最清楚的链路,比如:

  1. 创建订单
  2. 锁库存
  3. 扣款

把下面这些基础设施补齐:

  • Saga 实例表
  • 步骤状态表
  • 幂等机制
  • Outbox 消息投递
  • 超时重试
  • 人工干预后台
  • 对账任务

等这套机制稳定了,再逐步把积分、优惠券、履约通知这些非核心步骤接进来。

否则很容易出现“功能看起来完整,出事时没人知道怎么救火”的情况。


总结

Saga 不是银弹,但在微服务架构里,它是处理分布式事务非常实用的一种方式,特别适合:

  • 接受最终一致性
  • 链路较长
  • 服务边界清晰
  • 需要高可用和可扩展性

如果你是从 troubleshooting 的角度来落地,我建议抓住 4 个关键词:

  1. 状态机:别只打日志,要有明确 Saga 状态
  2. 幂等:正向动作和补偿动作都必须幂等
  3. Outbox:业务更新和消息发送要解耦但保持一致
  4. 对账与人工兜底:自动化不是万能,尤其是资金场景

最后给几个可以直接执行的建议:

  • 复杂业务优先用 Orchestration
  • 所有步骤都定义清楚“成功条件”和“补偿语义”
  • 不确定状态不要盲目自动补偿,先标记待核查
  • 补偿接口必须幂等、可审计、可重试
  • 先做小链路试点,再推广到全局

如果你已经在线上踩过 Saga 的坑,你会发现一个朴素但很重要的事实:

分布式事务真正难的,不是“让它跑起来”,而是“出故障时还能把它稳稳收回来”。


分享到:

上一篇
《从原型到生产:中级开发者构建企业级 AI 问答系统的检索增强生成(RAG)实战路径》
下一篇
《Spring Boot 中基于 JWT 与 Spring Security 的前后端分离认证鉴权实战指南》