跳转到内容
123xiao | 无名键客

《微服务架构中的分布式事务实战:基于 Saga 模式的订单系统一致性设计与落地》

字数: 0 阅读时长: 1 分钟

背景与问题

只要系统一拆成微服务,事务这件事就没法再靠单库 BEGIN/COMMIT/ROLLBACK 糊过去了。
订单系统尤其典型:用户下单时,往往要同时做几件事:

  • 订单服务创建订单
  • 库存服务扣减库存
  • 支付服务冻结或扣款
  • 营销服务锁定优惠券
  • 物流服务预创建运单

在单体架构里,这些操作可能都在一个数据库事务里完成;但到了微服务架构中,每个服务都有自己的数据库,天然就变成了跨服务、跨库的一致性问题

很多团队最开始会问:能不能上 2PC/XA?
理论上可以,实践里通常不推荐,原因很现实:

  • 微服务异构严重,XA 支持并不统一
  • 锁资源时间长,吞吐量差
  • 一旦协调者或参与者异常,恢复复杂
  • 对可用性不友好,不适合高并发订单链路

所以,订单这类场景更常用的是最终一致性方案,而 Saga 就是其中最经典、最落地的一种。

这篇文章我会从一个“下单”主链路出发,带你完整走一遍:

  1. Saga 为什么适合订单系统
  2. 编排式 Saga 如何设计状态流转
  3. 怎么写出可运行代码
  4. 出问题时怎么排查
  5. 落地时安全和性能上要注意什么

方案对比与取舍分析

在进入 Saga 之前,先把常见分布式事务方案摆清楚,避免“为了用技术而用技术”。

方案一致性性能实现复杂度适用场景
2PC/XA强一致少量核心金融场景、同构系统
TCC最终一致很高对一致性要求高、业务可预留 Try/Confirm/Cancel
本地消息表 + MQ最终一致异步驱动型流程
Saga最终一致长事务、订单/履约/营销等链路

Saga 的优势在于:

  • 每一步都是本地事务,性能比 XA 更友好
  • 补偿逻辑贴近业务语义,适合订单类长链路
  • 可按服务职责拆分,不强依赖统一事务框架

Saga 的代价也很明确:

  • 不是强一致,存在短暂不一致窗口
  • 补偿逻辑不等于数据库回滚,要认真设计
  • 幂等、重试、乱序、悬挂问题都得自己扛

一句话总结:
如果你的订单系统追求高可用、高吞吐,并且业务允许“短时间最终一致”,Saga 往往是比 XA 更现实的选择。


核心原理

Saga 的思想其实不复杂:
把一个大事务拆成多个本地事务,每个本地事务成功后继续往下走;一旦某一步失败,就按相反顺序执行已经完成步骤的补偿动作

比如下单流程:

  1. 创建订单
  2. 扣减库存
  3. 冻结余额
  4. 确认订单

如果第 3 步冻结余额失败:

  • 补偿第 2 步:恢复库存
  • 补偿第 1 步:取消订单

这里要注意一个关键点:
补偿不是技术回滚,而是业务回滚。

例如:

  • “创建订单”的补偿不是删除数据,而是把状态改成 CANCELLED
  • “扣减库存”的补偿不是回到过去,而是新增一次库存回补
  • “冻结余额”的补偿是解除冻结,而不是硬改流水

这决定了 Saga 设计必须面向业务状态机,而不是只盯着数据库操作。

编排式 vs 协同式

Saga 常见有两种实现方式:

  • 编排式(Orchestration):由一个协调者统一驱动流程
  • 协同式(Choreography):各服务通过事件自发协作

对于订单系统主链路,我更建议先用编排式。原因很简单:

  • 流程一眼能看清
  • 错误处理集中
  • 状态更容易追踪
  • 新团队更容易维护

后面代码示例也采用编排式。

订单 Saga 状态流转

stateDiagram-v2
    [*] --> PENDING
    PENDING --> INVENTORY_RESERVED: 库存锁定成功
    INVENTORY_RESERVED --> PAYMENT_FROZEN: 支付冻结成功
    PAYMENT_FROZEN --> CONFIRMED: 订单确认
    INVENTORY_RESERVED --> CANCELLED: 支付失败后补偿
    PAYMENT_FROZEN --> CANCELLED: 确认失败后补偿
    PENDING --> CANCELLED: 创建后直接取消
    CONFIRMED --> [*]
    CANCELLED --> [*]

时序示意

sequenceDiagram
    participant U as 用户
    participant O as 订单服务
    participant S as Saga协调器
    participant I as 库存服务
    participant P as 支付服务

    U->>O: 提交订单
    O->>S: 发起下单Saga
    S->>O: 创建订单
    O-->>S: 成功
    S->>I: 锁定库存
    I-->>S: 成功
    S->>P: 冻结余额
    alt 支付成功
        P-->>S: 成功
        S->>O: 确认订单
        O-->>S: 成功
    else 支付失败
        P-->>S: 失败
        S->>I: 补偿-释放库存
        I-->>S: 成功
        S->>O: 补偿-取消订单
        O-->>S: 成功
    end

订单系统一致性设计

下面给一个足够贴近实战的设计思路。我们先约束几个原则:

1. 一切步骤都要可重试

在分布式环境里,超时不等于失败。
比如支付服务其实已经冻结成功了,但响应超时了。此时协调器如果直接判定失败并补偿,就可能出现“冻结成功 + 又解冻一次”的复杂情况。

所以每个动作都必须支持:

  • 幂等执行
  • 重复查询
  • 补偿重试

2. 业务状态必须显式落库

不要只靠日志推断 Saga 进度。至少要有一张 saga_logorder_saga 表,记录:

  • saga_id
  • order_id
  • 当前步骤
  • 执行状态
  • 补偿状态
  • 最近错误信息
  • 更新时间

这样出问题时,才知道订单卡在哪一步。

3. 先“预留/冻结”,再“确认”

订单场景里,库存和余额都不应该直接永久扣减,而应分成两阶段语义:

  • 锁库存 / 释放库存 / 确认扣减
  • 冻结余额 / 解冻余额 / 确认扣款

这虽然不是标准 TCC,但借鉴了它的思想,会显著降低补偿难度。


实战代码(可运行)

下面我用 Python 做一个最小可运行示例。
它不是生产级框架,但足够把 Saga 的关键逻辑讲透:状态推进、失败补偿、幂等保护。

目录结构

saga_order_demo/
├── app.py
└── requirements.txt

requirements.txt

flask==2.3.3

app.py

from flask import Flask, request, jsonify
from threading import Lock
import uuid

app = Flask(__name__)

# 模拟数据库
db_lock = Lock()

orders = {}
inventory = {
    "sku-1": {"available": 10, "reserved": 0}
}
accounts = {
    "user-1": {"balance": 1000, "frozen": 0}
}
sagas = {}
idempotency_records = set()


def make_response(success=True, message="", data=None):
    return jsonify({
        "success": success,
        "message": message,
        "data": data or {}
    })


def idempotent_key(service, action, business_id):
    return f"{service}:{action}:{business_id}"


# -----------------------------
# 订单服务
# -----------------------------
def create_order(order_id, user_id, sku_id, amount):
    with db_lock:
        key = idempotent_key("order", "create", order_id)
        if key in idempotency_records:
            return True

        orders[order_id] = {
            "order_id": order_id,
            "user_id": user_id,
            "sku_id": sku_id,
            "amount": amount,
            "status": "PENDING"
        }
        idempotency_records.add(key)
        return True


def confirm_order(order_id):
    with db_lock:
        key = idempotent_key("order", "confirm", order_id)
        if key in idempotency_records:
            return True

        order = orders.get(order_id)
        if not order:
            raise Exception("order not found")

        if order["status"] == "CANCELLED":
            raise Exception("order already cancelled")

        order["status"] = "CONFIRMED"
        idempotency_records.add(key)
        return True


def cancel_order(order_id):
    with db_lock:
        key = idempotent_key("order", "cancel", order_id)
        if key in idempotency_records:
            return True

        order = orders.get(order_id)
        if not order:
            return True  # 补偿幂等:订单不存在也视为成功

        if order["status"] == "CONFIRMED":
            raise Exception("confirmed order cannot be cancelled directly")

        order["status"] = "CANCELLED"
        idempotency_records.add(key)
        return True


# -----------------------------
# 库存服务
# -----------------------------
def reserve_inventory(order_id, sku_id, qty):
    with db_lock:
        key = idempotent_key("inventory", "reserve", order_id)
        if key in idempotency_records:
            return True

        sku = inventory.get(sku_id)
        if not sku:
            raise Exception("sku not found")

        if sku["available"] < qty:
            raise Exception("inventory not enough")

        sku["available"] -= qty
        sku["reserved"] += qty
        idempotency_records.add(key)
        return True


def release_inventory(order_id, sku_id, qty):
    with db_lock:
        key = idempotent_key("inventory", "release", order_id)
        if key in idempotency_records:
            return True

        sku = inventory.get(sku_id)
        if not sku:
            return True

        if sku["reserved"] >= qty:
            sku["reserved"] -= qty
            sku["available"] += qty

        idempotency_records.add(key)
        return True


# -----------------------------
# 支付服务
# -----------------------------
def freeze_balance(order_id, user_id, amount, fail=False):
    with db_lock:
        key = idempotent_key("payment", "freeze", order_id)
        if key in idempotency_records:
            return True

        account = accounts.get(user_id)
        if not account:
            raise Exception("account not found")

        if fail:
            raise Exception("mock payment failure")

        if account["balance"] < amount:
            raise Exception("balance not enough")

        account["balance"] -= amount
        account["frozen"] += amount
        idempotency_records.add(key)
        return True


def unfreeze_balance(order_id, user_id, amount):
    with db_lock:
        key = idempotent_key("payment", "unfreeze", order_id)
        if key in idempotency_records:
            return True

        account = accounts.get(user_id)
        if not account:
            return True

        if account["frozen"] >= amount:
            account["frozen"] -= amount
            account["balance"] += amount

        idempotency_records.add(key)
        return True


# -----------------------------
# Saga 协调器
# -----------------------------
def update_saga(saga_id, data):
    with db_lock:
        sagas.setdefault(saga_id, {})
        sagas[saga_id].update(data)


def execute_order_saga(user_id, sku_id, qty, amount, mock_payment_fail=False):
    order_id = str(uuid.uuid4())
    saga_id = str(uuid.uuid4())

    update_saga(saga_id, {
        "saga_id": saga_id,
        "order_id": order_id,
        "status": "RUNNING",
        "step": "INIT",
        "error": ""
    })

    completed_steps = []

    try:
        create_order(order_id, user_id, sku_id, amount)
        completed_steps.append("create_order")
        update_saga(saga_id, {"step": "ORDER_CREATED"})

        reserve_inventory(order_id, sku_id, qty)
        completed_steps.append("reserve_inventory")
        update_saga(saga_id, {"step": "INVENTORY_RESERVED"})

        freeze_balance(order_id, user_id, amount, fail=mock_payment_fail)
        completed_steps.append("freeze_balance")
        update_saga(saga_id, {"step": "PAYMENT_FROZEN"})

        confirm_order(order_id)
        completed_steps.append("confirm_order")
        update_saga(saga_id, {"step": "COMPLETED", "status": "SUCCESS"})

        return {
            "saga_id": saga_id,
            "order_id": order_id,
            "status": "SUCCESS"
        }

    except Exception as e:
        update_saga(saga_id, {
            "status": "COMPENSATING",
            "error": str(e)
        })

        # 逆序补偿
        if "freeze_balance" in completed_steps:
            unfreeze_balance(order_id, user_id, amount)

        if "reserve_inventory" in completed_steps:
            release_inventory(order_id, sku_id, qty)

        if "create_order" in completed_steps:
            cancel_order(order_id)

        update_saga(saga_id, {
            "status": "FAILED",
            "step": "COMPENSATED"
        })

        return {
            "saga_id": saga_id,
            "order_id": order_id,
            "status": "FAILED",
            "error": str(e)
        }


# -----------------------------
# HTTP 接口
# -----------------------------
@app.route("/orders", methods=["POST"])
def create_order_api():
    body = request.json or {}
    user_id = body.get("user_id", "user-1")
    sku_id = body.get("sku_id", "sku-1")
    qty = int(body.get("qty", 1))
    amount = int(body.get("amount", 100))
    mock_payment_fail = bool(body.get("mock_payment_fail", False))

    result = execute_order_saga(user_id, sku_id, qty, amount, mock_payment_fail)
    return make_response(True, "ok", result)


@app.route("/state", methods=["GET"])
def state_api():
    return make_response(True, "ok", {
        "orders": orders,
        "inventory": inventory,
        "accounts": accounts,
        "sagas": sagas
    })


if __name__ == "__main__":
    app.run(debug=True)

启动方式

pip install -r requirements.txt
python app.py

成功下单测试

curl -X POST http://127.0.0.1:5000/orders \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "user-1",
    "sku_id": "sku-1",
    "qty": 1,
    "amount": 100
  }'

模拟支付失败触发补偿

curl -X POST http://127.0.0.1:5000/orders \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "user-1",
    "sku_id": "sku-1",
    "qty": 1,
    "amount": 100,
    "mock_payment_fail": true
  }'

查看系统状态

curl http://127.0.0.1:5000/state

代码设计说明

上面的代码重点不在“框架多高级”,而在这些实战原则。

1. 每一步都有独立本地事务语义

例如:

  • create_order
  • reserve_inventory
  • freeze_balance

这些动作彼此独立,符合 Saga 对“可拆分本地事务”的要求。

2. 补偿按逆序执行

补偿顺序必须和执行顺序相反。
因为后置动作往往依赖前置动作,逆序补偿更符合依赖关系。

3. 用幂等键防止重复执行

例如:

idempotent_key("payment", "freeze", order_id)

这意味着同一个订单的冻结动作,即使由于超时或重试被重复调用,也不会重复扣减余额。

4. Saga 过程被显式记录

sagas 在示例中用内存模拟,生产环境应该落库。
这一点特别重要,因为线上排查问题时,最怕的是:

  • 日志很多
  • 但不知道订单最终卡在哪个步骤

5. 补偿成功不代表业务绝对无痕

这也是很多人第一次接触 Saga 最容易误解的地方。
Saga 的目标是把系统带回可接受的一致状态,而不是像单机事务那样“什么都没发生过”。


容量估算与架构落地建议

订单系统做 Saga,除了逻辑正确,还要考虑规模问题。

一次下单会放大多少请求?

假设一笔订单主链路有 4 个步骤:

  1. 创建订单
  2. 锁库存
  3. 冻结支付
  4. 确认订单

正常成功时,大约是 4 次服务调用。
如果失败并触发补偿,可能会再增加 2~3 次调用。

也就是说:

  • 成功路径:1 笔订单 ≈ 4 次远程调用
  • 失败路径:1 笔订单 ≈ 6~7 次远程调用

如果峰值 QPS 为 2000,链路调用量可能轻松上万 QPS。
这会直接影响:

  • 服务线程池配置
  • 连接池大小
  • MQ 分区数(如果异步化)
  • Saga 状态表写入压力

推荐的生产架构

flowchart LR
    A[API网关] --> B[订单服务]
    B --> C[Saga协调器]
    C --> D[库存服务]
    C --> E[支付服务]
    C --> F[营销服务]
    C --> G[(Saga状态表)]
    D --> H[(库存库)]
    E --> I[(账户库)]
    F --> J[(优惠券库)]
    B --> K[(订单库)]

比较稳妥的落地方式是:

  • 主链路同步编排
    • 订单创建、库存锁定、余额冻结放在同步主链路中
  • 次级动作异步解耦
    • 发短信、推送、积分发放等通过 MQ 异步处理
  • 状态集中管理
    • 用 Saga 状态表或工作流引擎记录流转

如果团队规模较大,后续也可以引入:

  • Temporal / Cadence
  • Seata Saga
  • 自研轻量工作流引擎

但别一开始就上太重。
很多订单系统真正缺的不是“工作流平台”,而是明确的状态模型和补偿边界


常见坑与排查

这一部分我想讲得接地气一点,因为 Saga 的坑基本都不是“语法错了”,而是线上才会疼。

坑 1:把补偿当成数据库回滚

现象:

  • 订单创建后失败,直接物理删除订单
  • 支付扣减后失败,直接回写余额
  • 库存扣减后失败,硬改库存数量

问题:

这样会破坏审计链路,账务和库存流水会很难对齐。

正确做法:

  • 订单改状态,不删除
  • 支付走冻结/解冻、扣款流水
  • 库存走预留/释放、确认扣减

坑 2:没有幂等,重试一次就脏数据

现象:

  • 库存重复扣减
  • 余额重复冻结
  • 补偿重复执行导致负库存或负冻结金额

排查方式:

先查同一个 order_id 是否存在多次相同步骤调用:

SELECT order_id, step_name, COUNT(*)
FROM saga_log
GROUP BY order_id, step_name
HAVING COUNT(*) > 1;

建议:

  • 每个动作必须有幂等键
  • 幂等记录与业务事务尽量同库提交
  • 补偿动作也必须幂等

坑 3:超时误判,实际成功却被当失败

这是我实际项目里踩过的坑。
支付服务 300ms 已经完成冻结,但网络抖动导致调用方 1s 超时,于是协调器开始补偿。结果就是:

  • 支付已冻结
  • 订单已取消
  • 又执行了解冻
  • 状态非常绕

排查重点:

  • 先看下游服务实际业务日志,而不是只看调用方超时日志
  • 检查 traceId / orderId 是否能串联跨服务日志
  • 区分“调用超时”和“业务失败”

改进建议:

  • 超时后优先查状态,再决定是否补偿
  • 关键步骤提供查询接口:queryByBusinessId(orderId)
  • 不要只靠 HTTP 返回码判断业务结果

坑 4:补偿失败后没人管

现象:

  • 订单取消了,但库存没释放
  • 库存释放了,但支付冻结没解开
  • Saga 状态停在 FAILED,没人继续修

建议做法:

增加一个补偿重试任务

  • 扫描状态为 COMPENSATINGFAILED_RETRYABLE 的记录
  • 根据最近错误信息和步骤进行重试
  • 超过阈值进入人工介入队列

示例 SQL:

SELECT saga_id, order_id, step, error_msg
FROM saga_transaction
WHERE status IN ('COMPENSATING', 'FAILED_RETRYABLE')
  AND next_retry_time <= NOW()
ORDER BY updated_at ASC
LIMIT 100;

坑 5:把所有动作都串成同步调用

现象:

  • 下单 RT 飙升
  • 链路很长,一个服务抖动整个下单全受影响
  • 高峰期线程池被占满

建议:

核心链路只保留必须同步确认的动作:

  • 订单创建
  • 库存锁定
  • 支付冻结

可异步的动作尽量异步:

  • 发送通知
  • 发放积分
  • 记录行为日志
  • 推荐刷新

安全/性能最佳实践

安全最佳实践

1. 接口幂等令牌不能省

下单接口本身也要幂等。
用户重复点击、客户端重试、网关重放,都可能造成重复创建 Saga。

常见做法:

  • 前端提交唯一请求号
  • 网关传递 Idempotency-Key
  • 服务端按用户 + 业务键做去重

2. 补偿接口要做权限隔离

补偿不是普通业务接口,最好不要暴露成任何人都能调用的开放 API。建议:

  • 仅内网访问
  • 服务间鉴权
  • 白名单或 mTLS
  • 操作审计日志完整保留

3. 敏感字段不要散落日志

订单链路常常带有:

  • 用户标识
  • 支付流水号
  • 收货信息
  • 优惠券码

日志里不要直接打印完整敏感信息,至少脱敏处理。


性能最佳实践

1. 本地事务越短越好

每个 Saga 步骤虽然是本地事务,但如果本地事务里做了太多事,比如:

  • 复杂 SQL
  • 远程调用
  • 大对象序列化

就会拉长持锁时间,吞吐量很快掉下来。

原则是:
本地事务内只做必要的数据更新,不做额外重活。

2. 关键状态表要有索引

如果你有 saga_transaction 表,至少要考虑这些索引:

CREATE TABLE saga_transaction (
    saga_id VARCHAR(64) PRIMARY KEY,
    order_id VARCHAR(64) NOT NULL,
    status VARCHAR(32) NOT NULL,
    step VARCHAR(64) NOT NULL,
    error_msg VARCHAR(255),
    next_retry_time DATETIME,
    updated_at DATETIME NOT NULL
);

CREATE INDEX idx_order_id ON saga_transaction(order_id);
CREATE INDEX idx_status_retry_time ON saga_transaction(status, next_retry_time);
CREATE INDEX idx_updated_at ON saga_transaction(updated_at);

3. 给补偿任务设置退避重试

失败就立刻重试,往往会把故障放大。
建议使用指数退避:

  • 第 1 次:10 秒
  • 第 2 次:30 秒
  • 第 3 次:1 分钟
  • 第 4 次:5 分钟

并限制最大重试次数。

4. 指标要可观测

至少监控这些指标:

  • Saga 成功率
  • Saga 平均耗时 / P95
  • 每一步成功率
  • 补偿触发率
  • 补偿成功率
  • 卡单数
  • 重试队列积压数

如果没有这些指标,系统“看起来能跑”,但一到大促就很危险。


一套更稳妥的落地清单

如果你准备在订单系统里真正上 Saga,我建议按下面顺序推进:

  1. 先定义订单状态机
    • PENDING / RESERVED / FROZEN / CONFIRMED / CANCELLED
  2. 再定义每个步骤的补偿语义
    • 不是技术回滚,而是业务补偿
  3. 确保每个动作可幂等
    • 执行幂等、补偿幂等、查询幂等
  4. 建设 Saga 状态表
    • 不要只靠日志
  5. 补偿任务和人工介入机制必须先有
    • 不然失败单子会越积越多
  6. 最后再考虑框架化
    • 先跑通一条链路,再抽象成平台能力

这是一个很现实的经验:
不要一开始就追求“最优雅的分布式事务框架”,先保证订单失败时能找得到、补得回来、人工兜得住。


总结

Saga 适合解决微服务订单系统中的长事务一致性问题,它的核心不是“模拟强一致”,而是:

  • 把大事务拆成多个本地事务
  • 失败时执行业务补偿
  • 通过状态机、幂等、重试、可观测性实现最终一致

落地时最关键的几点,我建议你记住:

  1. 补偿要有业务语义,不要等同数据库回滚
  2. 每个步骤和补偿步骤都必须幂等
  3. 超时后先查状态,再决定是否补偿
  4. Saga 过程必须显式记录,支持重试和人工接管
  5. 主链路做减法,只同步处理必须同步的动作

如果你的订单系统具备以下特征:

  • 服务已拆分
  • 下单链路涉及库存、支付、营销等多个服务
  • 接受短暂不一致,但必须最终收敛
  • 对吞吐和可用性要求高

那么 Saga 基本就是一条值得优先考虑的路。

当然,它不是银弹。
对于“资金一分不能错、且无法接受短暂不一致”的绝对强一致场景,Saga 仍然有边界。
但对绝大多数互联网订单系统来说,只要状态模型设计清楚、补偿策略完善,Saga 往往是工程上最平衡、最能落地的方案。


分享到:

上一篇
《从 Prompt 到 Pipeline:中级开发者实战构建可迭代优化的 AI 应用工作流》
下一篇
《从抓包到还原签名:中级开发者实战 Web 逆向中的前端加密参数分析与自动化复现》