分布式架构中基于 Saga 模式的分布式事务落地实践:从服务拆分到一致性保障
在单体应用里,事务这件事通常不太难:一个数据库连接,一个 BEGIN,出了问题 ROLLBACK。
但服务一旦拆开,订单、库存、支付、积分各自都有自己的库,问题就变了——你会很快发现,原来“提交成功”只是某个服务自己成功了,不代表整个业务成功。
我第一次真正踩到这个坑,是在一个“下单扣库存再扣余额”的流程里:订单服务创建成功了,库存也锁住了,结果支付服务超时。最后用户看到了“支付失败”,但库存还被占着,运营同学第二天就来问:为什么明明没付款,商品卖不出去了?
这篇文章就从这个典型场景出发,讲讲 Saga 模式 在分布式架构里的落地方式:它到底解决什么问题,怎么设计,代码怎么写,最容易踩哪些坑,以及在安全和性能上该怎么做。
背景与问题
服务拆分之后,事务边界断了
典型电商下单流程可以拆成几个服务:
- 订单服务:创建订单、维护状态
- 库存服务:预扣库存、释放库存
- 支付服务:扣款、退款
- 通知服务:发送支付结果、订单结果
在单体里,这些动作可能在一个本地事务里完成。
但拆成微服务后,每个服务有自己的数据库,传统单库事务不再适用。
为什么不直接上 2PC / XA?
很多人一开始会想到“两阶段提交”。理论上它可以做强一致,但在实际业务系统里常常不合适:
- 协调器复杂,依赖重
- 数据库、驱动、中间件兼容性要求高
- 阻塞明显,吞吐差
- 一旦出现网络抖动或参与方长时间不可用,系统会很难受
对于大多数互联网业务来说,真正需要的是:
- 可以接受短时间不一致
- 最终能恢复到正确状态
- 故障时有补偿手段
- 吞吐优先于强一致
这正是 Saga 的适用场景。
方案对比与取舍分析
在开始实现之前,先把边界说清楚。Saga 不是银弹,它适合的是长事务、可补偿、接受最终一致性的业务。
常见方案对比
| 方案 | 一致性 | 性能 | 实现复杂度 | 适用场景 |
|---|---|---|---|---|
| 本地事务 | 强一致 | 高 | 低 | 单体、单库 |
| 2PC/XA | 强一致 | 低到中 | 高 | 核心强一致场景 |
| TCC | 强一致趋近 | 中 | 很高 | 资金、账户、核心资源预留 |
| Saga | 最终一致 | 高 | 中 | 订单、库存、履约等长流程 |
Saga 适合什么,不适合什么
适合:
- 下单、履约、发券、积分等业务流程
- 每一步都有明确补偿动作
- 允许短时间状态不一致
不适合:
- 绝对不能出现中间态的资金总账
- 补偿成本极高或不可逆的操作
- 外部系统不支持幂等和状态查询
一句话总结:
Saga 的本质不是“事务不失败”,而是“失败后系统能回到可接受状态”。
核心原理
Saga 可以理解为:
把一个大事务拆成多个本地事务,每个本地事务成功后进入下一步;如果某一步失败,就按相反顺序执行补偿操作。
一个典型 Saga 链路
以“下单”为例:
- 创建订单
- 预扣库存
- 扣减余额
- 确认订单完成
如果第 3 步失败,则触发补偿:
- 释放库存
- 取消订单
编排式与协同式
Saga 落地通常有两种风格:
1. 编排式 Saga(Orchestration)
由一个统一的 Saga 协调器决定下一步做什么。
优点:
- 流程清晰
- 状态集中
- 排查容易
缺点:
- 协调器可能变成核心依赖
2. 协同式 Saga(Choreography)
各服务通过事件驱动,自己监听上一步结果并决定后续动作。
优点:
- 更松耦合
- 没有中心协调器单点
缺点:
- 流程分散
- 链路排查复杂
- 事件风暴时难维护
对于中级团队或者刚开始治理分布式事务的系统,我通常建议:
先用编排式,把流程走通、补偿做好、监控建全,再考虑演进到协同式。
Saga 执行流程图
flowchart TD
A[用户提交订单] --> B[订单服务: 创建订单 PENDING]
B --> C[库存服务: 预扣库存]
C --> D[支付服务: 扣款]
D --> E[订单服务: 更新为 CONFIRMED]
E --> F[流程成功结束]
D -- 失败 --> G[库存服务: 释放库存]
G --> H[订单服务: 更新为 CANCELLED]
H --> I[流程补偿结束]
C -- 失败 --> H
状态流转设计
Saga 设计里,状态机非常关键。
如果没有明确状态,线上就会出现“到底该不该补偿”“补偿到哪一步了”“重试是否安全”这些非常头疼的问题。
stateDiagram-v2
[*] --> PENDING
PENDING --> STOCK_RESERVED: 库存预扣成功
STOCK_RESERVED --> PAID: 支付成功
PAID --> CONFIRMED: 订单确认成功
STOCK_RESERVED --> CANCELLING: 支付失败
PENDING --> CANCELLING: 库存失败
CANCELLING --> CANCELLED: 补偿完成
PAID --> REFUNDING: 确认失败需回滚
REFUNDING --> CANCELLED: 退款+释放库存完成
核心原理拆解:落地时必须具备的 4 个能力
1. 本地事务
每一步服务内部,必须先保证自己的数据一致。
比如库存服务执行“预扣库存”时,要在一个本地事务里完成:
- 检查可用库存
- 扣减可用库存
- 增加冻结库存
- 记录操作日志
2. 补偿事务
补偿不是简单“反向执行”这么粗糙。
比如:
- 创建订单的补偿:不是物理删除,而是改状态为
CANCELLED - 扣库存的补偿:释放冻结库存,不是盲目加库存
- 扣款的补偿:如果已扣款则退款,如果未扣款则跳过
3. 幂等性
这是 Saga 能不能稳定运行的生命线。
为什么?因为分布式环境里你一定会遇到:
- 请求超时但实际执行成功
- 消息重复投递
- 协调器重试
- 服务实例重启
所以每个动作和补偿动作都必须是幂等的。
同一个 saga_id + step 重复执行,结果必须一致。
4. 可观测性
如果没有统一的 saga_id、步骤状态、失败原因、重试次数,你的 Saga 在生产环境里会像黑盒一样。
出问题时只能“猜”。
时序图:成功与失败路径
sequenceDiagram
participant U as 用户
participant O as Saga协调器
participant S1 as 订单服务
participant S2 as 库存服务
participant S3 as 支付服务
U->>O: 提交订单
O->>S1: createOrder(sagaId, orderId)
S1-->>O: success
O->>S2: reserveStock(sagaId, orderId)
S2-->>O: success
O->>S3: pay(sagaId, orderId)
S3-->>O: failed
O->>S2: releaseStock(sagaId, orderId)
S2-->>O: success
O->>S1: cancelOrder(sagaId, orderId)
S1-->>O: success
O-->>U: 下单失败,已完成补偿
实战代码(可运行)
下面用 Python 做一个可运行的编排式 Saga 示例。
这个示例不依赖第三方框架,重点展示:
- Saga 协调器
- 步骤定义
- 正向动作
- 补偿动作
- 幂等记录
- 失败回滚
你可以直接保存为 saga_demo.py 运行。
from dataclasses import dataclass, field
from typing import Callable, List, Dict, Set
import uuid
class BusinessError(Exception):
pass
@dataclass
class OrderService:
orders: Dict[str, str] = field(default_factory=dict)
executed: Set[str] = field(default_factory=set)
def create_order(self, saga_id: str, order_id: str):
key = f"{saga_id}:create_order"
if key in self.executed:
print("[OrderService] create_order 幂等命中")
return
self.orders[order_id] = "PENDING"
self.executed.add(key)
print(f"[OrderService] 订单已创建: {order_id}, status=PENDING")
def cancel_order(self, saga_id: str, order_id: str):
key = f"{saga_id}:cancel_order"
if key in self.executed:
print("[OrderService] cancel_order 幂等命中")
return
if order_id in self.orders and self.orders[order_id] != "CANCELLED":
self.orders[order_id] = "CANCELLED"
self.executed.add(key)
print(f"[OrderService] 订单已取消: {order_id}")
def confirm_order(self, saga_id: str, order_id: str):
key = f"{saga_id}:confirm_order"
if key in self.executed:
print("[OrderService] confirm_order 幂等命中")
return
if order_id not in self.orders:
raise BusinessError("订单不存在")
self.orders[order_id] = "CONFIRMED"
self.executed.add(key)
print(f"[OrderService] 订单已确认: {order_id}")
@dataclass
class InventoryService:
stock: Dict[str, int] = field(default_factory=lambda: {"apple": 10})
reserved: Dict[str, int] = field(default_factory=dict)
executed: Set[str] = field(default_factory=set)
def reserve_stock(self, saga_id: str, order_id: str, sku: str, count: int):
key = f"{saga_id}:reserve_stock"
if key in self.executed:
print("[InventoryService] reserve_stock 幂等命中")
return
if self.stock.get(sku, 0) < count:
raise BusinessError("库存不足")
self.stock[sku] -= count
self.reserved[order_id] = self.reserved.get(order_id, 0) + count
self.executed.add(key)
print(f"[InventoryService] 已预扣库存: sku={sku}, count={count}")
def release_stock(self, saga_id: str, order_id: str, sku: str):
key = f"{saga_id}:release_stock"
if key in self.executed:
print("[InventoryService] release_stock 幂等命中")
return
count = self.reserved.get(order_id, 0)
if count > 0:
self.stock[sku] = self.stock.get(sku, 0) + count
self.reserved[order_id] = 0
self.executed.add(key)
print(f"[InventoryService] 已释放库存: sku={sku}, count={count}")
@dataclass
class PaymentService:
balances: Dict[str, int] = field(default_factory=lambda: {"u1001": 100})
paid_orders: Set[str] = field(default_factory=set)
executed: Set[str] = field(default_factory=set)
def pay(self, saga_id: str, user_id: str, order_id: str, amount: int):
key = f"{saga_id}:pay"
if key in self.executed:
print("[PaymentService] pay 幂等命中")
return
if self.balances.get(user_id, 0) < amount:
raise BusinessError("余额不足")
self.balances[user_id] -= amount
self.paid_orders.add(order_id)
self.executed.add(key)
print(f"[PaymentService] 支付成功: user={user_id}, amount={amount}")
def refund(self, saga_id: str, user_id: str, order_id: str, amount: int):
key = f"{saga_id}:refund"
if key in self.executed:
print("[PaymentService] refund 幂等命中")
return
if order_id in self.paid_orders:
self.balances[user_id] = self.balances.get(user_id, 0) + amount
self.paid_orders.remove(order_id)
self.executed.add(key)
print(f"[PaymentService] 已退款: user={user_id}, amount={amount}")
@dataclass
class SagaStep:
name: str
action: Callable[[], None]
compensation: Callable[[], None]
class SagaOrchestrator:
def __init__(self):
self.completed_steps: List[SagaStep] = []
def execute(self, steps: List[SagaStep]):
try:
for step in steps:
print(f"\n[Saga] 执行步骤: {step.name}")
step.action()
self.completed_steps.append(step)
print("\n[Saga] 全部步骤执行成功")
except Exception as e:
print(f"\n[Saga] 步骤失败: {e}")
self.compensate()
raise
def compensate(self):
print("\n[Saga] 开始补偿...")
for step in reversed(self.completed_steps):
try:
print(f"[Saga] 补偿步骤: {step.name}")
step.compensation()
except Exception as ce:
print(f"[Saga] 补偿失败(需人工介入): {step.name}, error={ce}")
print("[Saga] 补偿结束")
def run_demo(balance: int):
saga_id = str(uuid.uuid4())
order_id = "ORD-10001"
user_id = "u1001"
sku = "apple"
count = 2
amount = 80
order_service = OrderService()
inventory_service = InventoryService()
payment_service = PaymentService(balances={user_id: balance})
orchestrator = SagaOrchestrator()
steps = [
SagaStep(
name="创建订单",
action=lambda: order_service.create_order(saga_id, order_id),
compensation=lambda: order_service.cancel_order(saga_id, order_id)
),
SagaStep(
name="预扣库存",
action=lambda: inventory_service.reserve_stock(saga_id, order_id, sku, count),
compensation=lambda: inventory_service.release_stock(saga_id, order_id, sku)
),
SagaStep(
name="支付扣款",
action=lambda: payment_service.pay(saga_id, user_id, order_id, amount),
compensation=lambda: payment_service.refund(saga_id, user_id, order_id, amount)
),
SagaStep(
name="确认订单",
action=lambda: order_service.confirm_order(saga_id, order_id),
compensation=lambda: order_service.cancel_order(saga_id, order_id)
),
]
try:
orchestrator.execute(steps)
except Exception:
print("[Demo] Saga 执行失败")
finally:
print("\n[最终状态]")
print("orders =", order_service.orders)
print("stock =", inventory_service.stock)
print("reserved =", inventory_service.reserved)
print("balances =", payment_service.balances)
if __name__ == "__main__":
print("==== 场景1:余额充足,成功 ====")
run_demo(balance=100)
print("\n\n==== 场景2:余额不足,触发补偿 ====")
run_demo(balance=50)
运行效果说明
-
当余额为
100时,流程成功:- 订单创建成功
- 库存预扣成功
- 支付成功
- 订单确认成功
-
当余额为
50时,支付失败:- 已执行的“创建订单”“预扣库存”会触发补偿
- 最终订单被取消,库存恢复
这个示例是内存版,但它已经把 Saga 的几个关键点体现出来了:
- 每一步都有补偿动作
- 补偿逆序执行
- 动作和补偿都带幂等键
- 协调器负责整体推进和失败回滚
生产环境如何真正落地
上面的代码是“原理版”,生产里还要补几个关键构件。
1. Saga 日志表
你至少需要一张表记录事务实例和步骤执行情况。
CREATE TABLE saga_instance (
saga_id VARCHAR(64) PRIMARY KEY,
business_key VARCHAR(128) NOT NULL,
status VARCHAR(32) NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
CREATE TABLE saga_step_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
saga_id VARCHAR(64) NOT NULL,
step_name VARCHAR(64) NOT NULL,
action_status VARCHAR(32) NOT NULL,
compensation_status VARCHAR(32) NOT NULL,
retry_count INT NOT NULL DEFAULT 0,
error_message VARCHAR(512),
updated_at TIMESTAMP NOT NULL,
UNIQUE KEY uk_saga_step (saga_id, step_name)
);
建议状态值至少包括:
INITRUNNINGSUCCESSFAILEDCOMPENSATINGCOMPENSATEDPARTIAL_FAILED
2. 业务去重键
幂等不能只靠“感觉”。
实际系统里,建议使用这些维度之一:
saga_id + step_namebusiness_order_no + action_type- 消息唯一 ID
3. Outbox / 可靠消息
如果你的 Saga 是事件驱动的,那么“本地事务提交了,但消息没发出去”会直接破坏一致性。
这时建议引入 Outbox 模式:
- 业务数据和消息记录写入同一个本地事务
- 后台任务异步把 Outbox 推送到 MQ
- 消费端做幂等
常见坑与排查
这是我觉得最值得说的一部分。很多系统不是不会写 Saga,而是写出来后线上不稳定。
坑 1:把补偿当成“简单回滚”
这是最常见误区。
例如支付服务:
- 正向动作:扣款
- 补偿动作:退款
但退款不等于回滚数据库字段。
因为支付可能已经影响外部渠道、清算流水、账单记录。补偿应该是一笔新的业务动作,而不是试图把历史“抹掉”。
排查建议:
- 补偿动作是否有独立流水号?
- 补偿是否可审计?
- 补偿后状态是否闭环?
坑 2:补偿失败后没有兜底
如果释放库存时又失败了怎么办?
很多系统写到这里就结束了,结果库存长期冻结。
建议做法:
- 补偿失败进入
COMPENSATION_RETRY - 启动定时任务继续重试
- 超过阈值后告警并人工介入
- 提供后台补偿工具
我自己的经验是:
自动重试 + 人工补偿台 几乎是必须的,不要幻想线上不会出极端状态。
坑 3:没有幂等,重试越多越乱
典型现象:
- 重试一次,库存多扣一次
- 重试一次,用户多退一笔钱
- MQ 重复消费,订单状态反复跳变
排查路径:
- 看请求是否带全局唯一 ID
- 看服务端是否落幂等记录
- 看数据库是否有唯一索引约束
- 看消费逻辑是否先检查状态再执行业务
坑 4:状态机不完整
很多项目一开始只定义:
- 成功
- 失败
实际上这远远不够。
你至少要区分:
- 执行中
- 执行成功
- 执行失败
- 补偿中
- 补偿成功
- 补偿失败
- 待人工处理
否则线上一出问题,根本不知道该重试正向还是重试补偿。
坑 5:超时与“假失败”
一个非常真实的线上场景:
- 协调器调用支付服务
- 5 秒没返回,协调器判定失败并开始补偿
- 第 6 秒支付服务其实扣款成功了
于是你会得到一个极其尴尬的状态:
订单取消了,但用户的钱扣了。
排查建议:
- 远程调用失败后,不要立刻把结果当成“业务失败”
- 优先设计“可查询状态”的接口
- 对于未知结果,先进入
UNKNOWN状态,异步对账确认
这件事非常重要:
分布式系统里,超时通常表示“未知”,不等于“失败”。
安全/性能最佳实践
Saga 主要讨论一致性,但安全和性能也不能忽略。
安全最佳实践
1. 补偿接口必须鉴权
补偿接口不是普通业务接口。
如果被错误调用,可能直接释放库存、取消订单、退款。
建议:
- 只允许内网调用
- 做服务间身份认证
- 校验
saga_id、业务主键、签名 - 落审计日志
2. 避免敏感信息在 Saga 日志中裸奔
不要在 saga 日志、消息体中直接存:
- 明文手机号
- 银行卡号
- 身份证号
- 支付凭证原文
日志里保留必要业务键即可。
3. 人工补偿台要有权限分级
运营可查看,不一定可执行。
执行补偿建议至少做到:
- RBAC 权限控制
- 双人审批(高风险操作)
- 操作审计可追溯
性能最佳实践
1. 缩短 Saga 链路
参与服务越多,失败概率越高。
能合并的步骤尽量合并,能本地化的不远程化。
例如:
- “创建订单 + 初始化订单明细”放在一个服务内本地事务完成
- 不要把非常细碎的状态更新拆成多个远程步骤
2. 补偿逻辑要轻量
补偿不是做全量重算。
它应该尽可能只做必要的逆向修正,减少资源占用和锁竞争。
3. 使用异步推进非关键步骤
比如通知短信、站内信、积分赠送,很多时候不必放进主 Saga 链路。
否则会拖慢核心交易路径。
4. 控制重试策略
不要无脑立即重试。建议使用:
- 指数退避
- 最大重试次数
- 熔断与降级
- 死信队列
容量估算与治理建议
Saga 系统除了能跑,还得考虑规模。
一个简单估算思路
假设:
- 峰值下单 QPS:500
- 每笔订单平均 4 个 Saga 步骤
- 每步平均 1 次状态写入
- 失败率 2%,失败补偿平均 2 步
那么每秒大约产生:
- 正向步骤写入:
500 * 4 = 2000 - 补偿写入:
500 * 2% * 2 = 20
总计约 2020 次步骤记录写入/秒。
这意味着:
- saga 日志表要分库分表或按时间归档
- 索引不能过多
- 热点业务键要避免集中冲突
- 监控和查询系统不能直接压主库
我建议优先治理的 3 件事
- 先把状态机建完整
- 所有动作都做幂等
- 建立失败重试和人工补偿闭环
如果这三件事没做好,系统规模越大,事故只会越频繁。
一个更接近真实项目的落地清单
如果你准备在项目里上 Saga,可以按下面这个顺序推进:
第一阶段:跑通最小闭环
- 定义
saga_id - 明确每一步 action / compensation
- 设计状态机
- 完成幂等处理
- 做基础日志记录
第二阶段:补足可靠性
- 引入 Saga 实例表和步骤日志表
- 加重试机制
- 增加超时与未知状态处理
- 接入告警和链路追踪
第三阶段:工程化治理
- 对接 Outbox/MQ
- 增加人工补偿后台
- 做对账任务
- 建立容量评估和归档策略
总结
Saga 模式的价值,不在于把分布式系统变成“看起来像单机事务”,而在于它提供了一套可恢复、可追踪、可扩展的一致性解决方案。
你可以把它记成三句话:
- 把大事务拆成多个本地事务
- 每一步都必须有可执行的补偿
- 用幂等、状态机、日志和重试把最终一致性兜住
如果你正在做订单、库存、支付、履约这类流程,我的建议很直接:
- 优先选择编排式 Saga 起步
- 先收敛业务状态机,再谈框架选型
- 补偿要当成正式业务能力建设,不是异常分支凑合写一下
- 对于“超时”的情况,永远先当成“未知”处理
最后说个很现实的边界条件:
如果你的场景是“总账绝对准确,任何中间态都不可接受”,那 Saga 往往不是最佳答案,应该评估 TCC 或更强一致的方案。
但如果你做的是大多数互联网交易流程,Saga 几乎是绕不过去的一课,而且值得认真做好。