跳转到内容
123xiao | 无名键客

《微服务架构下的分布式事务实战:基于 Saga 模式的设计、补偿与一致性治理》

字数: 0 阅读时长: 1 分钟

微服务架构下的分布式事务实战:基于 Saga 模式的设计、补偿与一致性治理

在单体系统里,事务这件事往往没那么“折磨人”:一个数据库连接、一个本地事务,提交或回滚,大家都懂。

但一旦拆成微服务,情况就变了。下单在订单服务,扣库存在库存服务,扣款在账户服务,发券可能又在营销服务。每个服务都有自己的数据库,这时候你再想靠一个 BEGIN / COMMIT / ROLLBACK 把所有动作绑在一起,基本就不现实了。

我第一次在生产环境里处理这类问题时,最痛的不是“事务失败”,而是“事务半成功”:订单建好了,库存扣了,结果支付失败;或者支付成功了,但消息丢了,订单迟迟没变成已支付。系统表面看还能跑,实际上数据已经开始“轻微腐烂”。

这篇文章我不讲太多抽象概念,重点从架构设计、Saga 原理、可运行代码、补偿策略、一致性治理几个角度,把这件事落到实处。


背景与问题

微服务为什么让事务变难

分布式事务难,不是因为“事务”本身变了,而是因为约束变多了:

  • 服务之间通过 RPC 或消息通信,天然存在网络延迟和失败
  • 每个服务独立存储,没法共享一个数据库事务
  • 调用链一长,任何一个环节都可能超时、重试、重复消费
  • CAP 决定了你不可能同时把一致性、可用性、分区容错都拉满

很多团队一开始会想到 2PC/XA。理论上它能保证强一致,但在微服务里经常会遇到几个现实问题:

  • 协调器成为瓶颈和单点
  • 锁持有时间长,吞吐明显下降
  • 各种中间件、数据库、驱动兼容性复杂
  • 云原生环境下运维成本高,不够“松耦合”

所以,大多数互联网型业务更常见的做法是:接受最终一致性,用业务补偿代替强回滚。而 Saga 就是这类思路里的代表模式。

一个典型业务场景

以“创建订单”为例,常见步骤可能是:

  1. 订单服务创建订单,状态为 PENDING
  2. 库存服务冻结库存
  3. 账户服务扣减余额
  4. 订单服务将状态改为 CONFIRMED

如果第 3 步失败,就需要对第 2 步执行补偿:解冻库存
如果第 4 步失败,但前 3 步已经成功,就不能简单回滚扣款,而是要通过重试、补偿或人工介入把状态修正回来。

这就进入 Saga 的核心。


核心原理

Saga 的基本思想

Saga 可以理解为:把一个长事务拆成多个本地事务,每个本地事务成功后进入下一步;如果中间某一步失败,则按相反顺序执行补偿操作。

比如:

  • 正向动作:

    • 创建订单
    • 冻结库存
    • 扣减账户
    • 确认订单
  • 补偿动作:

    • 取消订单
    • 释放库存
    • 退款/返还余额

注意,Saga 的“补偿”不是数据库层面的自动回滚,而是业务语义上的逆向操作

编排式 vs 协同式

Saga 常见有两种实现方式。

1. 编排式(Orchestration)

由一个“协调者”统一驱动各步骤:

  • 优点:

    • 流程清晰,可观测性好
    • 适合复杂业务链路
    • 失败路径更容易集中治理
  • 缺点:

    • 协调器本身需要高可用设计
    • 业务编排过多时容易变成“流程上帝”

2. 协同式(Choreography)

各服务通过事件自行触发后续步骤:

  • 优点:

    • 解耦程度高
    • 更符合事件驱动架构
  • 缺点:

    • 链路追踪复杂
    • 业务流转容易分散在多个服务中,不利于排障

中级读者如果要落地,我的建议很直接:

  • 链路复杂、需要审计与可视化:优先编排式
  • 流程较短、事件驱动成熟:可考虑协同式

方案对比与取舍分析

方案一致性性能实现复杂度适用场景
本地事务强一致单库单服务
2PC/XA强一致较低传统强事务场景
TCC较强一致很高核心资金、库存预留
Saga最终一致订单、履约、营销等长流程业务
可靠消息最终一致最终一致异步解耦场景

什么时候 Saga 不合适

Saga 不是万能钥匙,以下场景要谨慎:

  • 强实时资金一致性要求极高的场景
  • 补偿动作难以定义,或者补偿代价远高于失败代价
  • 某些动作天然不可逆,比如外部供应商已经发货、短信已发送、第三方已结算

这类情况通常要改流程:先冻结、后确认;先预占、后提交,而不是事后强行补偿。


一张图看懂 Saga 链路

flowchart LR
    A[订单服务 创建订单 PENDING] --> B[库存服务 冻结库存]
    B --> C[账户服务 扣减余额]
    C --> D[订单服务 确认订单 CONFIRMED]

    C -.失败.-> E[补偿: 释放库存]
    E -.-> F[补偿: 取消订单]

    B -.失败.-> F

核心流程时序图

sequenceDiagram
    participant Client as 客户端
    participant Orchestrator as Saga协调器
    participant Order as 订单服务
    participant Inventory as 库存服务
    participant Account as 账户服务

    Client->>Orchestrator: 创建订单请求
    Orchestrator->>Order: createOrder()
    Order-->>Orchestrator: orderId

    Orchestrator->>Inventory: reserveStock(orderId, product, qty)
    Inventory-->>Orchestrator: success

    Orchestrator->>Account: debit(orderId, userId, amount)
    alt 扣款成功
        Account-->>Orchestrator: success
        Orchestrator->>Order: confirmOrder(orderId)
        Order-->>Orchestrator: success
        Orchestrator-->>Client: 下单成功
    else 扣款失败
        Account-->>Orchestrator: fail
        Orchestrator->>Inventory: releaseStock(orderId, product, qty)
        Orchestrator->>Order: cancelOrder(orderId)
        Orchestrator-->>Client: 下单失败
    end

设计要点:补偿不是“反向执行”那么简单

很多人刚接触 Saga 时,会自然地认为:正向做什么,反向就撤销什么。真实系统里没这么简单。

1. 补偿必须幂等

补偿请求可能因为超时被重试多次。
如果“释放库存”执行两遍,库存就会被多加一次;如果“退款”执行两遍,就是真金白银的事故。

所以补偿接口必须做到:

  • 同一个 sagaId + step 只生效一次
  • 已完成的补偿再次调用时,直接返回成功
  • 落库时有唯一约束或状态机控制

2. 正向动作也要幂等

不要以为只有补偿要幂等。实际线上更常见的是:

  • 调用超时,但对方其实已经成功
  • 消息重复投递
  • 协调器故障恢复后重复推进步骤

因此每个本地事务都要支持“重复请求不重复执行”。

3. 空补偿与悬挂问题

这是面试和生产里都常见的经典坑。

  • 空补偿:补偿请求先到了,但正向事务根本没执行
  • 悬挂:补偿完成后,迟到的正向请求又执行了

解决思路通常是引入事务日志表,记录每一步状态:

  • INIT
  • DONE
  • COMPENSATED

正向执行前先检查是否已补偿;补偿执行前检查正向是否已完成。


状态机设计建议

stateDiagram-v2
    [*] --> INIT
    INIT --> ORDER_CREATED
    ORDER_CREATED --> STOCK_RESERVED
    STOCK_RESERVED --> ACCOUNT_DEBITED
    ACCOUNT_DEBITED --> CONFIRMED

    ORDER_CREATED --> CANCELLED: 库存失败
    STOCK_RESERVED --> COMPENSATING: 扣款失败
    COMPENSATING --> CANCELLED: 释放库存/取消订单完成

    CONFIRMED --> [*]
    CANCELLED --> [*]

状态机的意义不只是“好看”,而是方便你做:

  • 重试恢复
  • 死信回放
  • 人工介入
  • 对账修复

实战代码(可运行)

下面我用一个简化版 Python Flask 示例模拟编排式 Saga。
为了方便直接运行,我把“订单、库存、账户”都写在一个进程里,用内存数据结构模拟数据库。真实项目中,你可以把这些函数拆成独立服务,再把 HTTP 调用替换成 RPC 或消息。

运行方式

安装依赖:

pip install flask

保存为 app.py,执行:

python app.py

然后请求:

curl -X POST http://127.0.0.1:5000/create-order \
  -H "Content-Type: application/json" \
  -d '{"order_id":"o1001","user_id":"u1","product_id":"p1","qty":2,"amount":80}'

完整代码

from flask import Flask, request, jsonify
from threading import Lock

app = Flask(__name__)

db_lock = Lock()

# 模拟数据库
orders = {}
inventory = {"p1": 10}
accounts = {"u1": 100}

# 事务日志:用于幂等、补偿和排查
saga_logs = {}
step_logs = set()
compensate_logs = set()


def step_key(saga_id, step_name):
    return f"{saga_id}:{step_name}"


def mark_step_done(saga_id, step_name):
    step_logs.add(step_key(saga_id, step_name))


def is_step_done(saga_id, step_name):
    return step_key(saga_id, step_name) in step_logs


def mark_compensated(saga_id, step_name):
    compensate_logs.add(step_key(saga_id, step_name))


def is_compensated(saga_id, step_name):
    return step_key(saga_id, step_name) in compensate_logs


# ========== 订单服务 ==========
def create_order(saga_id, order_id, user_id, product_id, qty, amount):
    with db_lock:
        if is_compensated(saga_id, "create_order"):
            return False, "order step already compensated"

        if is_step_done(saga_id, "create_order"):
            return True, "idempotent success"

        orders[order_id] = {
            "order_id": order_id,
            "user_id": user_id,
            "product_id": product_id,
            "qty": qty,
            "amount": amount,
            "status": "PENDING"
        }
        mark_step_done(saga_id, "create_order")
        return True, "order created"


def confirm_order(saga_id, order_id):
    with db_lock:
        if is_step_done(saga_id, "confirm_order"):
            return True, "idempotent success"

        order = orders.get(order_id)
        if not order:
            return False, "order not found"
        if order["status"] == "CANCELLED":
            return False, "order already cancelled"

        order["status"] = "CONFIRMED"
        mark_step_done(saga_id, "confirm_order")
        return True, "order confirmed"


def cancel_order(saga_id, order_id):
    with db_lock:
        if is_compensated(saga_id, "create_order"):
            return True, "idempotent compensated"

        order = orders.get(order_id)
        if not order:
            mark_compensated(saga_id, "create_order")
            return True, "empty compensation"

        order["status"] = "CANCELLED"
        mark_compensated(saga_id, "create_order")
        return True, "order cancelled"


# ========== 库存服务 ==========
def reserve_stock(saga_id, product_id, qty):
    with db_lock:
        if is_compensated(saga_id, "reserve_stock"):
            return False, "stock step already compensated"

        if is_step_done(saga_id, "reserve_stock"):
            return True, "idempotent success"

        current = inventory.get(product_id, 0)
        if current < qty:
            return False, "not enough stock"

        inventory[product_id] = current - qty
        mark_step_done(saga_id, "reserve_stock")
        return True, "stock reserved"


def release_stock(saga_id, product_id, qty):
    with db_lock:
        if is_compensated(saga_id, "reserve_stock"):
            return True, "idempotent compensated"

        if not is_step_done(saga_id, "reserve_stock"):
            mark_compensated(saga_id, "reserve_stock")
            return True, "empty compensation"

        inventory[product_id] = inventory.get(product_id, 0) + qty
        mark_compensated(saga_id, "reserve_stock")
        return True, "stock released"


# ========== 账户服务 ==========
def debit_account(saga_id, user_id, amount):
    with db_lock:
        if is_compensated(saga_id, "debit_account"):
            return False, "account step already compensated"

        if is_step_done(saga_id, "debit_account"):
            return True, "idempotent success"

        current = accounts.get(user_id, 0)
        if current < amount:
            return False, "not enough balance"

        accounts[user_id] = current - amount
        mark_step_done(saga_id, "debit_account")
        return True, "account debited"


def refund_account(saga_id, user_id, amount):
    with db_lock:
        if is_compensated(saga_id, "debit_account"):
            return True, "idempotent compensated"

        if not is_step_done(saga_id, "debit_account"):
            mark_compensated(saga_id, "debit_account")
            return True, "empty compensation"

        accounts[user_id] = accounts.get(user_id, 0) + amount
        mark_compensated(saga_id, "debit_account")
        return True, "account refunded"


# ========== Saga 协调器 ==========
def execute_saga(order_id, user_id, product_id, qty, amount):
    saga_id = f"saga-{order_id}"
    saga_logs[saga_id] = {"status": "RUNNING", "steps": []}

    ok, msg = create_order(saga_id, order_id, user_id, product_id, qty, amount)
    saga_logs[saga_id]["steps"].append({"step": "create_order", "ok": ok, "msg": msg})
    if not ok:
        saga_logs[saga_id]["status"] = "FAILED"
        return False, saga_id, "create order failed"

    ok, msg = reserve_stock(saga_id, product_id, qty)
    saga_logs[saga_id]["steps"].append({"step": "reserve_stock", "ok": ok, "msg": msg})
    if not ok:
        cancel_order(saga_id, order_id)
        saga_logs[saga_id]["status"] = "COMPENSATED"
        return False, saga_id, "reserve stock failed"

    ok, msg = debit_account(saga_id, user_id, amount)
    saga_logs[saga_id]["steps"].append({"step": "debit_account", "ok": ok, "msg": msg})
    if not ok:
        release_stock(saga_id, product_id, qty)
        cancel_order(saga_id, order_id)
        saga_logs[saga_id]["status"] = "COMPENSATED"
        return False, saga_id, "debit account failed"

    ok, msg = confirm_order(saga_id, order_id)
    saga_logs[saga_id]["steps"].append({"step": "confirm_order", "ok": ok, "msg": msg})
    if not ok:
        # 这里不直接退款,而是标记异常,交给重试/人工修复
        saga_logs[saga_id]["status"] = "UNKNOWN"
        return False, saga_id, "confirm order failed after debit"

    saga_logs[saga_id]["status"] = "SUCCESS"
    return True, saga_id, "success"


@app.route("/create-order", methods=["POST"])
def create_order_api():
    data = request.json
    ok, saga_id, msg = execute_saga(
        data["order_id"],
        data["user_id"],
        data["product_id"],
        int(data["qty"]),
        int(data["amount"])
    )
    return jsonify({
        "success": ok,
        "saga_id": saga_id,
        "message": msg,
        "orders": orders,
        "inventory": inventory,
        "accounts": accounts
    })


@app.route("/saga/<saga_id>", methods=["GET"])
def query_saga(saga_id):
    return jsonify(saga_logs.get(saga_id, {}))


if __name__ == "__main__":
    app.run(debug=True)

这段代码里,真正值得学的点是什么

上面代码虽然简化,但有几个设计点是实战里非常关键的。

1. 订单先建成 PENDING

不要一上来就把订单写成“已成功”。
正确姿势通常是:

  • 初始状态:PENDING
  • 全部步骤完成后:CONFIRMED
  • 中途失败:CANCELLEDFAILED

这样做的好处是:

  • 前端和运营看得懂状态
  • 后台可以继续修复
  • 对账时更容易定位“卡在哪一步”

2. 扣款后确认订单失败,不应草率全补偿

这是很多人容易写错的地方。
如果账户已经扣了,订单确认接口只是因为网络抖动失败,你立刻触发退款,可能会造成“双向震荡”:实际订单确认成功了,但你又退款了。

更稳妥的做法是:

  • 标记 Saga 为 UNKNOWN
  • 进入重试队列
  • 基于业务主键查询下游真实状态
  • 必要时人工介入

也就是说:不是所有失败都应该立即补偿

3. 空补偿处理了“补偿先到”的情况

release_stock() 里先判断正向步骤是否完成,就是为了避免补偿请求先到时出错。
这在消息乱序、超时重试场景里非常常见。


常见坑与排查

这一节我尽量写得“像线上排障”,因为真正折磨人的地方都在这。

坑 1:补偿成功了,但主流程状态没更新

现象:

  • 库存已经释放
  • 订单也取消了
  • 但 Saga 记录还是 RUNNING

后果:

  • 定时任务重复补偿
  • 监控误报
  • 运营误以为交易还在处理中

排查重点:

  1. 协调器是否在补偿后落状态
  2. 状态更新和补偿记录是否原子
  3. 是否发生补偿成功但日志写失败

建议:

  • 补偿结果必须持久化
  • Saga 流程表和步骤表分开存
  • RUNNING 超时单据做扫描修复

坑 2:重试导致重复扣款/重复释放

现象:

  • 某一步接口超时
  • 调用方重试
  • 服务端没有幂等控制
  • 最终账户多扣一次,或者库存多返一次

排查重点:

  • 是否有全局唯一 sagaId / requestId
  • 下游服务是否用业务主键做唯一约束
  • 幂等校验是在业务前还是业务后

建议:

  • 所有正向与补偿接口都接收 sagaId
  • 在本地事务中先写幂等记录,再执行业务
  • 返回“已处理成功”而不是报重复错误

坑 3:消息投递成功,但本地事务没提交

这是“本地事务 + MQ”里最经典的问题。

现象:

  • 订单表没有记录
  • 但库存服务收到事件并执行了冻结

原因通常是:

  • 业务数据和消息发送不在同一个原子单元
  • 先发消息,后写数据库;中间宕机了

解决方法:

  • 使用 Outbox Pattern(事务消息外盒模式)
  • 本地事务中同时写业务表和 outbox 表
  • 再由异步投递器把 outbox 事件发往 MQ

坑 4:补偿本身失败

这个坑往往被低估。很多团队只设计“主事务失败怎么办”,没设计“补偿失败怎么办”。

现象:

  • 扣款失败后准备释放库存
  • 释放库存接口也超时或报错
  • 结果事务卡在一半

建议:

  • 补偿也要有重试策略
  • 补偿失败进入死信队列
  • 为关键补偿动作提供后台手工重放入口

一个更稳的工程化落地方式

实际项目中,我更推荐把 Saga 的数据结构拆成两张核心表。

Saga 主表

CREATE TABLE saga_instance (
    saga_id VARCHAR(64) PRIMARY KEY,
    business_key VARCHAR(64) NOT NULL,
    status VARCHAR(32) NOT NULL,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);

Saga 步骤表

CREATE TABLE saga_step (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    saga_id VARCHAR(64) NOT NULL,
    step_name VARCHAR(64) NOT NULL,
    step_status VARCHAR(32) NOT NULL,
    retry_count INT NOT NULL DEFAULT 0,
    payload TEXT,
    updated_at TIMESTAMP NOT NULL,
    UNIQUE KEY uk_saga_step (saga_id, step_name)
);

这样做的价值很明显:

  • 每一步可追踪
  • 重试次数可控
  • 支持超时扫描
  • 方便做可视化页面

安全/性能最佳实践

分布式事务经常只谈一致性,但线上真正稳定,还得把安全和性能一起考虑。

安全最佳实践

1. 补偿接口不能裸奔

补偿接口如果被误调用,后果会很严重。
比如 /refund/release-stock 这种接口,必须做:

  • 服务间身份认证
  • 请求签名或 mTLS
  • 权限白名单
  • 操作审计日志

2. 防止伪造请求与重复请求

建议每次 Saga 请求带上:

  • sagaId
  • requestId
  • timestamp
  • signature

这样不仅能防重放,也方便跨服务追踪。

3. 敏感字段脱敏

Saga 日志里很容易记录原始请求。
如果里面带了手机号、身份证、支付信息,一旦日志平台权限没控好,就是合规问题。

建议:

  • 日志只保留必要字段
  • 对敏感数据脱敏或加密
  • trace 中避免打印完整支付参数

性能最佳实践

1. 避免长链路串行阻塞

不是所有步骤都必须串行。
比如“发优惠券”“发通知”这类非核心动作,最好从主 Saga 链路中拆出去,做异步最终处理。

2. 设置合理的超时与重试

我见过不少系统因为默认超时太长,导致一个失败单据卡几十秒,线程池被拖垮。

建议:

  • 同步调用超时要短
  • 重试次数有限制
  • 采用指数退避
  • 对下游失败做熔断和降级

3. 对高频步骤做容量估算

以库存冻结为例,估算至少要看:

  • 峰值 QPS
  • 平均重试次数
  • 补偿比例
  • 热点商品集中度

一个简单估算方式:

实际写请求量 ≈ 订单QPS × (正向步骤数 + 平均补偿步骤数 + 平均重试次数)

如果订单峰值是 2000 QPS,平均每单 3 个步骤,失败补偿平均 0.5 步,重试 0.3 次,那么底层事务处理量大致就是:

2000 × (3 + 0.5 + 0.3) = 7600 次操作/秒

这还是平均值,热点时段往往更高。

4. 用异步恢复代替阻塞等待

对于“结果不确定”的场景,不要让接口一直等。
更好的方式是:

  • 先返回“处理中”
  • 后台定时任务扫描
  • 通过事件或回调通知最终结果

这能显著提升整体吞吐和用户体验。


一致性治理:别只盯着“事务”,要盯“全链路可恢复”

做微服务分布式事务,真正成熟的团队关注的不只是某个 Saga 成没成功,而是系统是否具备持续纠偏能力

我通常会把一致性治理分成四层:

1. 预防层

目标是尽量少出错:

  • 接口幂等
  • 唯一业务键
  • 明确状态机
  • 超时、熔断、限流

2. 检测层

目标是尽快发现“不一致”:

  • Saga 超时监控
  • 补偿失败告警
  • 状态卡住告警
  • 正反向金额、库存对账

3. 恢复层

目标是自动修复:

  • 重试队列
  • 死信队列回放
  • 定时补偿任务
  • 状态对齐任务

4. 兜底层

目标是人工可干预:

  • 管理后台查看 Saga 全链路
  • 支持单步重放
  • 支持人工确认/关闭/补偿
  • 全量审计日志留痕

如果只做了前两层,没有恢复和兜底,系统表面自动化,实际一出问题就只能查库改表,非常危险。


实战建议:从“能跑”到“可治理”的升级路线

如果你的团队现在还没有成熟的分布式事务框架,我建议按下面路线迭代,而不是一步到位搞得很复杂。

第一阶段:先把业务流程拆清楚

先回答三个问题:

  1. 哪些步骤是正向动作?
  2. 每个步骤的补偿动作是什么?
  3. 哪些失败需要立即补偿,哪些应该先重试确认?

第二阶段:补上幂等和状态机

至少保证:

  • 每个步骤可重复调用
  • 每个补偿可重复调用
  • 主流程状态可追踪
  • 异常状态不会“凭空消失”

第三阶段:加入事务日志和超时扫描

做到:

  • Saga 主表 + 步骤表
  • 超时任务自动扫描
  • 失败进入重试或死信

第四阶段:做可观测性和人工兜底

至少要有:

  • traceId / sagaId 全链路串联
  • 指标监控
  • 告警
  • 可视化后台

这是我比较认可的落地顺序:先保证正确,再保证可恢复,最后再做优雅。


总结

Saga 的核心,不是“把分布式事务做成像本地事务一样”,而是接受分布式世界的现实:

  • 网络会失败
  • 请求会重复
  • 消息会乱序
  • 状态会短暂不一致

真正靠谱的做法是:

  1. 本地事务 + 状态机拆分长流程
  2. 补偿机制处理业务逆向修正
  3. 幂等、日志、重试、对账保障最终一致
  4. 监控与人工兜底确保系统可治理

如果你只记住一句话,我建议记这句:

在微服务里,分布式事务不是“绝不出错”,而是“出错后仍能有序恢复”。

对于订单、库存、账户这类典型场景,Saga 是非常实用的方案;但遇到高价值资金强一致场景,仍要结合 TCC、冻结模型、账务对账等更严格的机制,不要硬套。

把 Saga 用好,关键不在“选了这个模式”,而在于你有没有把补偿语义、幂等约束、异常治理、人工兜底真正设计进去。只有这样,它才不是 PPT 上的架构图,而是能扛住线上故障的工程方案。


分享到:

上一篇
《大模型推理性能优化实战:从 KV Cache、量化到批处理调度的工程落地指南》
下一篇
《从单体到集群:面向中级工程师的高可用服务拆分与流量治理实战》