跳转到内容
123xiao | 无名键客

《微服务架构中分布式事务的一致性落地:基于 Saga 模式的设计与实践》

字数: 0 阅读时长: 1 分钟

微服务架构中分布式事务的一致性落地:基于 Saga 模式的设计与实践

在单体时代,事务这件事通常没那么“刺激”:一个数据库连接、一个本地事务,提交或回滚,世界就安静了。
但到了微服务架构里,订单、库存、支付、账户、优惠券都拆成了独立服务,每个服务各管各的库,这时候“下单成功但库存没扣”“库存扣了但支付失败”“退款回来了但订单状态没更新”这类问题,就会非常真实地砸到你脸上。

我自己第一次在生产环境里处理这类问题时,最大的感受不是“理论复杂”,而是:你以为要解决的是事务,实际上要解决的是失败、重试、乱序、补偿和可观测性
这也是本文想讲清楚的重点:在微服务架构下,如何基于 Saga 模式,把“最终一致性”真正做成一套能上线、能排查、能扩容的方案。


背景与问题

为什么传统分布式事务不太适合微服务

常见的强一致分布式事务方案是 2PC(Two-Phase Commit)。它的优点很直接:参与方要么都成功,要么都失败。
但在微服务场景中,它往往带来几个现实问题:

  1. 协调器复杂,链路长
  2. 参与方长时间锁资源
  3. 对网络抖动和超时敏感
  4. 数据库、中间件需要支持 XA 或类似协议
  5. 吞吐低,不适合高并发业务

对于订单、支付、库存这类互联网业务,大多数时候系统更需要的是:

  • 高可用
  • 可恢复
  • 可重试
  • 可补偿
  • 最终一致

这正是 Saga 模式发挥作用的地方。

一个典型业务场景

以“创建订单”为例,通常会涉及以下步骤:

  1. 订单服务创建订单
  2. 库存服务冻结库存
  3. 支付服务扣款
  4. 账户服务累计积分
  5. 全部成功后,订单状态改为已确认

如果第 3 步支付失败,那么前面已经执行成功的步骤就不能“装作没发生过”,必须走补偿逻辑:

  • 释放库存
  • 取消订单

Saga 的核心思想就是:
把一个长事务拆成多个本地事务,每一步成功后继续下一步,一旦某一步失败,就按相反顺序执行补偿操作。


核心原理

Saga 一般有两种实现方式:

  1. 编排式(Orchestration):由一个 Saga 协调器统一驱动流程
  2. 协同式(Choreography):服务之间通过事件相互触发

如果你的团队更关注流程可控、排查方便、状态可视化,我更推荐先从编排式 Saga开始。它虽然会引入一个中心协调器,但整体可维护性通常更高,尤其适合业务链条较长的场景。

Saga 的执行模型

flowchart TD
    A[开始下单] --> B[订单服务: 创建订单]
    B --> C[库存服务: 冻结库存]
    C --> D[支付服务: 扣款]
    D --> E[积分服务: 增加积分]
    E --> F[订单服务: 确认订单]

    D -->|失败| G[补偿: 解冻库存]
    G --> H[补偿: 取消订单]

    C -->|失败| H
    B -->|失败| I[结束并记录失败]
    H --> J[结束: 最终一致]

正向事务与补偿事务

每个 Saga Step 至少包含两部分:

  • Action:正向动作
  • Compensation:补偿动作

例如:

Step正向动作补偿动作
创建订单插入订单,状态为 PENDING将订单改为 CANCELLED
冻结库存扣减可用库存,增加冻结库存释放冻结库存
支付扣款扣减用户余额发起退款
增加积分增加积分扣减积分

注意:补偿不是数据库回滚,而是一个新的业务操作。
这点非常关键,因为很多人一开始设计时会误以为“失败了就还原”。现实里并不总能完全还原,比如外部支付网关已经受理了扣款,只能发起退款,而不是“像没发生过一样”。

状态机视角更容易落地

实际工程里,Saga 最好显式维护状态,而不是靠日志“猜”。一个典型状态流转如下:

stateDiagram-v2
    [*] --> PENDING
    PENDING --> RUNNING
    RUNNING --> COMPLETED
    RUNNING --> COMPENSATING
    COMPENSATING --> COMPENSATED
    COMPENSATING --> FAILED
    COMPLETED --> [*]
    COMPENSATED --> [*]
    FAILED --> [*]

建议至少记录:

  • saga_id
  • biz_id
  • current_step
  • status
  • payload
  • retry_count
  • last_error
  • created_at
  • updated_at

编排式与协同式的取舍

维度编排式 Saga协同式 Saga
流程可见性
耦合方式集中协调事件驱动
排查难度较低较高
扩展灵活性
适合场景核心主交易链路事件扩散型业务

如果你在做电商、交易、清结算主流程,优先考虑编排式。
如果是营销、通知、画像、风控侧链路,协同式也很合适。


方案对比与架构取舍

在正式写代码前,先把常见方案边界讲清楚。

1. 本地事务

适用条件:

  • 单服务
  • 单数据库
  • 没有跨库跨服务写操作

优点:简单、稳定。
缺点:一旦服务拆分,就不够用了。

2. 2PC / XA

适用条件:

  • 强一致要求极高
  • 参与方少
  • 资源锁定可接受
  • 基础设施支持 XA

优点:理论一致性强。
缺点:吞吐和可用性代价高,不适合大多数互联网业务。

3. TCC

适用条件:

  • 业务动作天然可拆为 Try / Confirm / Cancel
  • 团队能接受更高的开发复杂度

优点:一致性比 Saga 更强,控制力更细。
缺点:侵入性强,每个服务都要实现三套接口。

4. Saga

适用条件:

  • 允许最终一致
  • 可设计补偿动作
  • 链路长、跨服务较多

优点:更贴合微服务现实,性能和可用性更均衡。
缺点:补偿设计复杂,对幂等和状态管理要求高。

一个经验判断是:

  • 能本地事务解决,就别上分布式事务
  • 必须跨服务写,就优先思考 Saga 是否足够
  • 只有在业务确认/取消语义非常明确时,再考虑 TCC

参考架构设计

下面给一个典型的编排式 Saga 架构:

sequenceDiagram
    participant Client as 客户端
    participant Orchestrator as Saga协调器
    participant Order as 订单服务
    participant Inventory as 库存服务
    participant Payment as 支付服务
    participant Point as 积分服务

    Client->>Orchestrator: 提交下单请求
    Orchestrator->>Order: 创建订单
    Order-->>Orchestrator: 成功
    Orchestrator->>Inventory: 冻结库存
    Inventory-->>Orchestrator: 成功
    Orchestrator->>Payment: 扣款
    Payment-->>Orchestrator: 失败
    Orchestrator->>Inventory: 补偿-解冻库存
    Inventory-->>Orchestrator: 成功
    Orchestrator->>Order: 补偿-取消订单
    Order-->>Orchestrator: 成功
    Orchestrator-->>Client: 返回失败结果

这个架构中,协调器至少承担三件事:

  1. 按顺序驱动每一步
  2. 持久化 Saga 状态
  3. 在失败时执行补偿和重试

实战代码(可运行)

下面我用 Python 写一个可运行的极简 Saga 编排器示例
它不依赖消息队列,也不接数据库,重点是把核心机制讲透:步骤执行、失败补偿、幂等思路和状态记录

你可以直接保存为 saga_demo.py 运行。

from dataclasses import dataclass, field
from typing import Callable, List, Dict, Any
import uuid
import time


class SagaStepError(Exception):
    pass


@dataclass
class SagaStep:
    name: str
    action: Callable[[Dict[str, Any]], None]
    compensation: Callable[[Dict[str, Any]], None]


@dataclass
class SagaContext:
    saga_id: str
    biz_id: str
    data: Dict[str, Any] = field(default_factory=dict)
    executed_steps: List[str] = field(default_factory=list)
    logs: List[str] = field(default_factory=list)
    status: str = "PENDING"

    def log(self, message: str):
        ts = time.strftime("%Y-%m-%d %H:%M:%S")
        line = f"[{ts}] [saga={self.saga_id}] {message}"
        self.logs.append(line)
        print(line)


class SagaOrchestrator:
    def __init__(self, steps: List[SagaStep]):
        self.steps = steps

    def execute(self, context: SagaContext):
        context.status = "RUNNING"
        context.log(f"开始执行 Saga, biz_id={context.biz_id}")

        try:
            for step in self.steps:
                context.log(f"执行步骤: {step.name}")
                step.action(context.data)
                context.executed_steps.append(step.name)
                context.log(f"步骤成功: {step.name}")

            context.status = "COMPLETED"
            context.log("Saga 执行完成")
            return True

        except Exception as e:
            context.status = "COMPENSATING"
            context.log(f"步骤失败: {str(e)},开始补偿")

            for step in reversed(self.steps):
                if step.name in context.executed_steps:
                    try:
                        context.log(f"执行补偿: {step.name}")
                        step.compensation(context.data)
                        context.log(f"补偿成功: {step.name}")
                    except Exception as ce:
                        context.status = "FAILED"
                        context.log(f"补偿失败: {step.name}, error={str(ce)}")
                        return False

            context.status = "COMPENSATED"
            context.log("补偿完成,Saga 最终一致")
            return False


# ---- 模拟服务状态 ----
db = {
    "orders": {},
    "inventory": {"item_1": {"available": 10, "frozen": 0}},
    "accounts": {"user_1": {"balance": 1000, "points": 0}},
}


# ---- 订单服务 ----
def create_order(data):
    order_id = data["order_id"]
    if order_id in db["orders"]:
        return  # 幂等
    db["orders"][order_id] = {
        "user_id": data["user_id"],
        "item_id": data["item_id"],
        "amount": data["amount"],
        "status": "PENDING"
    }

def cancel_order(data):
    order_id = data["order_id"]
    order = db["orders"].get(order_id)
    if order:
        order["status"] = "CANCELLED"


# ---- 库存服务 ----
def reserve_inventory(data):
    item_id = data["item_id"]
    qty = data["qty"]
    inv = db["inventory"][item_id]
    if inv["available"] < qty:
        raise SagaStepError("库存不足")
    inv["available"] -= qty
    inv["frozen"] += qty

def release_inventory(data):
    item_id = data["item_id"]
    qty = data["qty"]
    inv = db["inventory"][item_id]
    inv["available"] += qty
    inv["frozen"] -= qty


# ---- 支付服务 ----
def charge_payment(data):
    user_id = data["user_id"]
    amount = data["amount"]
    account = db["accounts"][user_id]
    if account["balance"] < amount:
        raise SagaStepError("余额不足")
    # 这里模拟第三步失败,可改成 False 触发不同场景
    if data.get("simulate_payment_failure", False):
        raise SagaStepError("支付网关超时")
    account["balance"] -= amount

def refund_payment(data):
    user_id = data["user_id"]
    amount = data["amount"]
    db["accounts"][user_id]["balance"] += amount


# ---- 积分服务 ----
def add_points(data):
    user_id = data["user_id"]
    db["accounts"][user_id]["points"] += 10

def deduct_points(data):
    user_id = data["user_id"]
    db["accounts"][user_id]["points"] -= 10


def main():
    steps = [
        SagaStep("create_order", create_order, cancel_order),
        SagaStep("reserve_inventory", reserve_inventory, release_inventory),
        SagaStep("charge_payment", charge_payment, refund_payment),
        SagaStep("add_points", add_points, deduct_points),
    ]

    orchestrator = SagaOrchestrator(steps)

    context = SagaContext(
        saga_id=str(uuid.uuid4()),
        biz_id="biz-order-1001",
        data={
            "order_id": "order_1001",
            "user_id": "user_1",
            "item_id": "item_1",
            "qty": 2,
            "amount": 200,
            "simulate_payment_failure": True
        }
    )

    result = orchestrator.execute(context)

    print("\n=== 执行结果 ===")
    print("success =", result)
    print("status =", context.status)

    print("\n=== 数据状态 ===")
    print(db)


if __name__ == "__main__":
    main()

运行效果说明

simulate_payment_failure=True 时:

  • 订单创建成功
  • 库存冻结成功
  • 支付失败
  • 触发补偿:释放库存、取消订单

这就是 Saga 的基本落地逻辑。

进一步工程化时怎么改

上面的代码是“机制演示版”,真正上线通常还要加:

  1. Saga 状态表
  2. Step 执行日志表
  3. 重试队列
  4. 死信队列
  5. 幂等键
  6. 超时扫描任务
  7. 告警与链路追踪

例如 Saga 状态表可以这么设计:

CREATE TABLE saga_instance (
  saga_id VARCHAR(64) PRIMARY KEY,
  biz_id VARCHAR(64) NOT NULL,
  status VARCHAR(32) NOT NULL,
  current_step VARCHAR(64),
  payload TEXT,
  retry_count INT DEFAULT 0,
  last_error TEXT,
  created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE saga_step_log (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  saga_id VARCHAR(64) NOT NULL,
  step_name VARCHAR(64) NOT NULL,
  action_status VARCHAR(32) NOT NULL,
  compensation_status VARCHAR(32),
  error_message TEXT,
  created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

如果你用 Java / Spring Cloud,通常会把协调器单独做成一个应用,配合:

  • MySQL / PostgreSQL 存状态
  • Kafka / RocketMQ 传事件
  • Redis 做幂等标记和限流
  • OpenTelemetry 做链路追踪

落地设计要点

1. 每一步都必须幂等

这是 Saga 成败的第一原则。
因为网络超时后,你无法确定“对方到底是没收到,还是收到了但响应丢了”。这时最常见的策略就是重试,所以幂等必不可少。

比如“冻结库存”不能每重试一次就多冻一次。常见做法:

  • 使用 biz_id + step_name 作为幂等键
  • 服务侧先检查是否已处理
  • 已处理则直接返回成功

示意流程:

flowchart LR
    A[收到步骤请求] --> B{幂等键是否已处理}
    B -- 是 --> C[直接返回成功]
    B -- 否 --> D[执行本地事务]
    D --> E[记录幂等结果]
    E --> F[返回成功]

2. 补偿动作也必须幂等

很多团队只关注正向动作幂等,补偿却没做。结果一旦协调器重试补偿,就会出现:

  • 多次退款
  • 多次解冻库存
  • 多次取消订单

所以补偿接口同样要满足“重复调用结果一致”。

3. 顺序执行不等于顺序到达

只要你引入消息队列或异步机制,就要接受乱序可能性。
因此不能只依赖“消息来了就执行”,而要依赖状态机判断当前是否允许执行该步骤

例如:

  • 当前状态是 PAYMENT_FAILED
  • 却收到了“积分增加成功”的回调

这时不能无脑写入,而要做状态校验并拒绝非法流转。

4. 外部系统必须视为不可靠

支付网关、短信服务、第三方仓储系统,都可能出现:

  • 请求超时
  • 结果未知
  • 回调重复
  • 回调延迟

工程上要设计成“接受不确定性”,而不是假设“失败就是失败,成功就是成功”。


常见坑与排查

这一部分我尽量写得接地气一些,因为真正麻烦的地方,往往都在这里。

坑 1:本地事务提交了,但消息没发出去

这是经典问题。比如订单服务本地事务提交成功了,但通知 Saga 协调器或消息队列时失败,系统就进入“半成功”状态。

解决思路

使用 Transactional Outbox 模式:

  1. 本地事务里同时写业务表和 outbox 表
  2. 后台任务异步扫描 outbox 并投递消息
  3. 投递成功后更新 outbox 状态

这样可以避免“业务提交了,消息丢了”。

CREATE TABLE outbox_event (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  aggregate_id VARCHAR(64) NOT NULL,
  event_type VARCHAR(64) NOT NULL,
  payload TEXT NOT NULL,
  status VARCHAR(32) NOT NULL DEFAULT 'NEW',
  created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

坑 2:补偿失败导致事务“卡死”

例如:

  • 支付失败后需要退款
  • 退款接口又超时
  • 协调器卡在补偿中

排查路径

  1. saga_instance.status
  2. saga_step_log
  3. 查补偿接口的重试次数
  4. 看下游服务是否实际已处理但响应丢失
  5. 检查幂等表是否已记录成功

止血方案

  • 对补偿设置重试上限
  • 超过上限后转人工处理队列
  • 补偿失败必须告警
  • 后台提供手工补偿/跳过补偿能力,但要加审计

坑 3:库存被扣两次

常见原因:

  • 接口超时后调用方重试
  • 服务端没做幂等
  • MQ 重投导致重复消费

排查建议

重点核查:

  • 请求唯一键是否一致
  • 幂等表是否生效
  • 是否出现消费者 rebalance 或重复投递
  • 是否把“收到请求”误当成“执行成功”

坑 4:状态更新与业务事实不一致

比如订单显示“已取消”,但库存没释放;或者账户已退款,但订单仍显示“支付中”。

这类问题本质上是状态机设计不完整跨服务事实对账缺失

建议做法

  • 所有关键状态流转必须落库
  • 引入定时对账任务
  • 对账发现不一致时,触发自动修复或人工审核

坑 5:把 Saga 用在不适合的场景

不是所有一致性问题都该用 Saga。
比如银行核心记账、证券交易撮合这类对强一致要求极高的场景,Saga 往往不是首选。

一个简单判断标准:

  • 如果业务可以接受“几秒到几分钟内最终一致”,Saga 合适
  • 如果业务不能接受短暂不一致,优先考虑更强的一致性方案

安全/性能最佳实践

Saga 很容易被讨论成“事务话题”,但上线后真正影响系统稳定性的,往往是安全和性能细节。

安全最佳实践

1. 接口鉴权不能省

协调器调用各个服务时,必须做服务间认证,比如:

  • mTLS
  • JWT
  • 网关签名
  • 服务白名单

不要因为是“内网服务”就默认可信。

2. 补偿接口要有权限隔离

补偿动作通常更敏感,例如退款、释放库存、取消订单。
建议:

  • 补偿接口单独鉴权
  • 限定只有协调器可调用
  • 记录审计日志

3. 关键字段防篡改

例如:

  • amount
  • order_id
  • user_id
  • saga_id

如果这些字段在跨服务调用中被篡改,后果非常严重。
实践中建议使用:

  • 请求签名
  • 不可变事件载荷
  • 审计日志

性能最佳实践

1. 缩短单步执行时间

每个 Step 都应尽量短小,只做本地事务和必要操作。
不要在步骤里夹带:

  • 大批量扫描
  • 慢 SQL
  • 超长外部调用链

否则整个 Saga 时延会被拖长。

2. 控制补偿风暴

当下游服务雪崩时,大量 Saga 会进入补偿流程,形成“正向失败 + 补偿打爆下游”的双重压力。

建议加:

  • 限流
  • 熔断
  • 舱壁隔离
  • 指数退避重试

3. 做容量估算

一个简单估算模型:

  • 每天订单量:1000 万
  • 每单平均 4 个步骤
  • 每步 1 条状态记录 + 1 条日志
  • 平均每单 8~10 次写操作

那么仅 Saga 元数据写入量就可能达到近亿级/天。
所以在设计时要考虑:

  • 状态表分库分表
  • 日志冷热分离
  • 历史实例归档
  • Trace 与业务日志解耦

4. 区分同步返回与异步完成

不要强迫用户请求一直阻塞到整个 Saga 完成。
更推荐的方式是:

  • 前端拿到“受理成功 + 处理中”
  • 后端异步推进 Saga
  • 用户通过轮询或消息订阅查询最终状态

这样吞吐和用户体验都会更稳定。


一套更实用的落地建议

如果你准备在团队内真正推动 Saga,我建议按下面这个顺序做,而不是一上来就搞“大而全平台”:

第一步:选一个链路短、失败可补偿的场景试点

比如:

  • 下单 + 冻结库存 + 扣余额
  • 开通会员 + 发优惠券

先避免接入太多第三方系统。

第二步:先做编排式

先把这些基础能力做出来:

  • 状态机
  • 幂等
  • 补偿
  • 重试
  • 日志
  • 告警

这几项比“平台感”更重要。

第三步:引入 Outbox 和异步事件

等基本流程跑稳后,再引入:

  • Outbox
  • MQ
  • 回调
  • 对账修复

第四步:再考虑平台化

当多个业务线都要用时,再抽象成:

  • Saga DSL
  • 通用协调器
  • 控制台
  • 可视化编排
  • 手工干预平台

否则很容易在第一阶段就过度设计。


总结

微服务里的分布式事务,真正难的不是“知道 Saga 是什么”,而是把它做成一套能失败、能恢复、能重试、能排查的工程体系。

你可以把本文的关键点记成六句话:

  1. Saga 适合最终一致,不适合所有强一致场景
  2. 每个步骤都要有正向动作和补偿动作
  3. 正向动作必须幂等,补偿动作也必须幂等
  4. 状态机要显式落库,不能靠日志猜执行到哪一步
  5. Outbox、重试、对账、告警是上线必备,不是附加项
  6. 先从编排式和单条核心链路做起,别一开始就平台化过度

如果你现在正在设计一条跨订单、库存、支付的主交易链路,我的建议很明确:

  • 能本地事务解决,就别分布式化
  • 必须跨服务写,就优先评估 Saga
  • 如果补偿语义天然存在,Saga 往往是最均衡的选择
  • 一旦落地,先把幂等、状态机和可观测性做扎实

因为最后决定系统是否可靠的,往往不是“是否用了 Saga”,而是:
你是否认真对待了失败。


分享到:

上一篇
《Spring Boot 中基于 Spring Cache 与 Redis 的多级缓存实战:一致性、穿透与热点 Key 处理》
下一篇
《大模型应用落地实战:基于 RAG 构建企业知识库问答系统的关键技术与优化路径》