微服务架构下的分布式事务实战:基于 Saga 模式实现订单与库存一致性
在单体应用里,订单表和库存表往往在一个数据库里,一个本地事务就能保证“一起成功、一起失败”。但一旦拆成微服务,事情就没那么简单了:订单服务有自己的库,库存服务也有自己的库,中间隔着网络、消息队列、重试机制,任何一个环节出问题,都可能让“订单已创建但库存没扣成”或者“库存扣了但订单没成功”这种问题浮出水面。
这篇文章我不打算只讲概念,而是带你从一个非常典型的场景出发:用户下单时,订单服务与库存服务如何在微服务架构下保持最终一致性。核心方案是 Saga 模式,并且会给出一套可运行的示例代码,帮助你把设计和实现对上号。
背景与问题
假设我们有两个服务:
- 订单服务 Order Service
- 库存服务 Inventory Service
用户下单的流程看起来很直观:
- 创建订单
- 扣减库存
- 扣减成功后把订单标记为已确认
- 扣减失败则取消订单
如果这两个动作不在一个数据库里,就无法直接使用本地事务。很多团队刚开始会尝试以下几种做法:
方案一:同步调用 + try/catch
订单服务先写订单,再调用库存服务扣库存。失败就把订单状态改为取消。
这个方案“看上去能跑”,但问题很多:
- 订单已落库,库存接口超时,调用方不知道库存到底扣没扣
- 重试可能造成重复扣减
- 中间任一步骤宕机,会留下中间状态
- 并发高时,排查起来非常痛苦
方案二:两阶段提交(2PC)
理论上能保证强一致,但在微服务场景里通常不太现实:
- 协调器复杂
- 锁持有时间长,吞吐下降明显
- 服务异构、多数据库、多中间件时实现困难
- 对可用性不友好
为什么 Saga 更适合
Saga 的核心思路是:
把一个大事务拆成多个本地事务,每个本地事务提交后,通过后续步骤继续推进;如果中途失败,则执行已完成步骤对应的补偿操作。
也就是说,我们不追求强一致的“同时成功”,而是接受短时间的不一致,但最终让系统收敛到正确状态。
方案对比与取舍分析
在订单与库存这个场景里,常见方案可以做一个横向比较:
| 方案 | 一致性 | 实现复杂度 | 性能 | 适用场景 |
|---|---|---|---|---|
| 本地事务 | 强一致 | 低 | 高 | 单体或单库 |
| 2PC/XA | 强一致 | 高 | 低~中 | 少量核心强一致场景 |
| TCC | 强一致偏最终 | 很高 | 中 | 资金、账户等关键业务 |
| Saga | 最终一致 | 中 | 高 | 订单、库存、物流等链路 |
为什么这题更推荐 Saga
订单和库存通常具备这些特点:
- 允许短时间中间状态存在
- 业务天然可以补偿,例如“取消订单”“释放库存”
- 服务间解耦要求高
- 对吞吐和可用性要求高于绝对强一致
所以,从工程落地角度看,Saga 往往是更平衡的方案。
核心原理
Saga 有两种主要实现方式:
- 编排式(Orchestration):由一个 Saga 协调器统一驱动流程
- 协同式(Choreography):服务通过事件自行协作
这篇文章采用更容易落地和理解的 编排式 Saga。原因很简单:订单创建、库存冻结/扣减、订单确认/取消,本身就是一个清晰的业务流程,用一个协调器来串起来,排查和补偿逻辑更集中。
一次下单 Saga 的基本流程
flowchart TD
A[用户提交订单] --> B[订单服务 创建PENDING订单]
B --> C[Saga协调器 请求库存预留]
C --> D{库存预留成功?}
D -- 是 --> E[订单服务 确认订单 CONFIRMED]
D -- 否 --> F[订单服务 取消订单 CANCELLED]
E --> G[流程结束]
F --> G
这里我故意用了“库存预留”而不是直接“扣减库存”。这是一个很重要的实践点:
- 预留:先占住库存,防止超卖
- 确认:订单最终成功后,真正消耗
- 释放:订单失败或超时后,把预留库存归还
相比直接扣减,预留机制更适合 Saga,因为它天然支持补偿。
状态机视角
订单和库存都应该是状态驱动,而不是“接口调一下就算完成”。
stateDiagram-v2
[*] --> PENDING
PENDING --> CONFIRMED: 库存预留成功
PENDING --> CANCELLED: 库存预留失败/超时
CONFIRMED --> [*]
CANCELLED --> [*]
库存侧也可以有类似状态:
- available:可用库存
- reserved:已预留库存
- consumed:已消耗库存
一个关键设计:补偿不是回滚
很多人第一次接触 Saga,会把补偿理解成“分布式回滚”。实际上不是。
补偿动作是新的业务操作,例如:
- 创建订单的补偿:取消订单
- 预留库存的补偿:释放库存
它不要求把数据库精确恢复到某个历史快照,而是让业务状态回归正确。
幂等性必须内建
Saga 最大的工程难点之一不是流程本身,而是:
- 网络超时
- 消息重复投递
- 调用方重试
- 协调器重放
所以每个参与方都必须做到:
- 相同 sagaId 的请求重复执行不出错
- 补偿操作重复执行也安全
- 状态迁移不可逆、不可乱跳
架构设计与数据模型
下面给一个简化但实战够用的设计。
服务职责划分
- Order Service
- 创建订单
- 确认订单
- 取消订单
- Inventory Service
- 预留库存
- 释放库存
- 确认消耗库存
- Saga Coordinator
- 驱动流程
- 记录 Saga 执行状态
- 触发补偿
时序图
sequenceDiagram
participant U as User
participant S as Saga Coordinator
participant O as Order Service
participant I as Inventory Service
U->>S: 发起下单(sagaId, productId, qty)
S->>O: createOrder(PENDING)
O-->>S: orderCreated(orderId)
S->>I: reserveInventory(sagaId, productId, qty)
alt 预留成功
I-->>S: reserved
S->>O: confirmOrder(orderId)
O-->>S: confirmed
else 预留失败
I-->>S: failed
S->>O: cancelOrder(orderId)
O-->>S: cancelled
end
S-->>U: 返回最终结果
建议的数据表
订单表
CREATE TABLE orders (
order_id VARCHAR(64) PRIMARY KEY,
saga_id VARCHAR(64) NOT NULL UNIQUE,
product_id VARCHAR(64) NOT NULL,
quantity INT NOT NULL,
status VARCHAR(32) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
库存表
CREATE TABLE inventory (
product_id VARCHAR(64) PRIMARY KEY,
available INT NOT NULL,
reserved INT NOT NULL DEFAULT 0
);
库存预留记录表
CREATE TABLE inventory_reservations (
saga_id VARCHAR(64) PRIMARY KEY,
product_id VARCHAR(64) NOT NULL,
quantity INT NOT NULL,
status VARCHAR(32) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
这个 inventory_reservations 表非常关键,它是库存服务实现幂等和补偿的基础。
实战代码(可运行)
下面我用 Python + Flask 做一个最小可运行示例。它不是生产级框架,但足够把 Saga 的关键动作演示清楚。
你可以把它理解成三个模块:
- 订单服务
- 库存服务
- Saga 协调器
为了方便运行,示例放在一个文件里,用内存数据模拟数据库。重点不是框架,而是状态流转和补偿逻辑。
完整示例代码
from flask import Flask, request, jsonify
from threading import Lock
import uuid
app = Flask(__name__)
# 模拟数据库
orders = {}
inventory = {
"sku-1": {"available": 10, "reserved": 0}
}
reservations = {}
saga_logs = {}
db_lock = Lock()
def generate_id():
return str(uuid.uuid4())
# =========================
# Order Service
# =========================
def create_order(saga_id, product_id, quantity):
with db_lock:
if saga_id in [o["saga_id"] for o in orders.values()]:
# 幂等:相同 saga_id 重复请求,直接返回已有订单
for order_id, order in orders.items():
if order["saga_id"] == saga_id:
return {"order_id": order_id, "status": order["status"]}
order_id = generate_id()
orders[order_id] = {
"order_id": order_id,
"saga_id": saga_id,
"product_id": product_id,
"quantity": quantity,
"status": "PENDING"
}
return {"order_id": order_id, "status": "PENDING"}
def confirm_order(order_id):
with db_lock:
order = orders.get(order_id)
if not order:
raise ValueError("order not found")
if order["status"] == "CONFIRMED":
return {"order_id": order_id, "status": "CONFIRMED"}
if order["status"] == "CANCELLED":
raise ValueError("cannot confirm cancelled order")
order["status"] = "CONFIRMED"
return {"order_id": order_id, "status": "CONFIRMED"}
def cancel_order(order_id):
with db_lock:
order = orders.get(order_id)
if not order:
raise ValueError("order not found")
if order["status"] == "CANCELLED":
return {"order_id": order_id, "status": "CANCELLED"}
if order["status"] == "CONFIRMED":
raise ValueError("cannot cancel confirmed order")
order["status"] = "CANCELLED"
return {"order_id": order_id, "status": "CANCELLED"}
# =========================
# Inventory Service
# =========================
def reserve_inventory(saga_id, product_id, quantity):
with db_lock:
# 幂等:重复预留请求直接返回历史结果
if saga_id in reservations:
return {
"saga_id": saga_id,
"status": reservations[saga_id]["status"]
}
item = inventory.get(product_id)
if not item:
reservations[saga_id] = {
"product_id": product_id,
"quantity": quantity,
"status": "FAILED"
}
return {"saga_id": saga_id, "status": "FAILED", "reason": "product not found"}
if item["available"] < quantity:
reservations[saga_id] = {
"product_id": product_id,
"quantity": quantity,
"status": "FAILED"
}
return {"saga_id": saga_id, "status": "FAILED", "reason": "insufficient stock"}
item["available"] -= quantity
item["reserved"] += quantity
reservations[saga_id] = {
"product_id": product_id,
"quantity": quantity,
"status": "RESERVED"
}
return {"saga_id": saga_id, "status": "RESERVED"}
def release_inventory(saga_id):
with db_lock:
reservation = reservations.get(saga_id)
if not reservation:
return {"saga_id": saga_id, "status": "NOT_FOUND"}
if reservation["status"] == "RELEASED":
return {"saga_id": saga_id, "status": "RELEASED"}
if reservation["status"] == "FAILED":
return {"saga_id": saga_id, "status": "FAILED"}
if reservation["status"] == "CONSUMED":
raise ValueError("cannot release consumed inventory")
product_id = reservation["product_id"]
quantity = reservation["quantity"]
item = inventory[product_id]
item["available"] += quantity
item["reserved"] -= quantity
reservation["status"] = "RELEASED"
return {"saga_id": saga_id, "status": "RELEASED"}
def commit_inventory(saga_id):
with db_lock:
reservation = reservations.get(saga_id)
if not reservation:
raise ValueError("reservation not found")
if reservation["status"] == "CONSUMED":
return {"saga_id": saga_id, "status": "CONSUMED"}
if reservation["status"] != "RESERVED":
raise ValueError(f"cannot commit reservation in status {reservation['status']}")
product_id = reservation["product_id"]
quantity = reservation["quantity"]
item = inventory[product_id]
item["reserved"] -= quantity
reservation["status"] = "CONSUMED"
return {"saga_id": saga_id, "status": "CONSUMED"}
# =========================
# Saga Coordinator
# =========================
def execute_order_saga(product_id, quantity):
saga_id = generate_id()
saga_logs[saga_id] = {"status": "STARTED"}
try:
order_result = create_order(saga_id, product_id, quantity)
order_id = order_result["order_id"]
saga_logs[saga_id]["order_id"] = order_id
saga_logs[saga_id]["status"] = "ORDER_CREATED"
reserve_result = reserve_inventory(saga_id, product_id, quantity)
if reserve_result["status"] != "RESERVED":
cancel_order(order_id)
saga_logs[saga_id]["status"] = "CANCELLED"
return {
"saga_id": saga_id,
"order_id": order_id,
"result": "FAILED",
"reason": reserve_result.get("reason", "reserve failed")
}
saga_logs[saga_id]["status"] = "INVENTORY_RESERVED"
confirm_order(order_id)
commit_inventory(saga_id)
saga_logs[saga_id]["status"] = "COMPLETED"
return {
"saga_id": saga_id,
"order_id": order_id,
"result": "SUCCESS"
}
except Exception as e:
# 补偿逻辑
order_id = saga_logs[saga_id].get("order_id")
try:
release_inventory(saga_id)
except Exception:
pass
if order_id:
try:
cancel_order(order_id)
except Exception:
pass
saga_logs[saga_id]["status"] = "COMPENSATED"
return {
"saga_id": saga_id,
"result": "FAILED",
"reason": str(e)
}
# =========================
# HTTP API
# =========================
@app.route("/checkout", methods=["POST"])
def checkout():
data = request.json
product_id = data["product_id"]
quantity = int(data["quantity"])
result = execute_order_saga(product_id, quantity)
return jsonify(result)
@app.route("/debug/orders", methods=["GET"])
def debug_orders():
return jsonify(orders)
@app.route("/debug/inventory", methods=["GET"])
def debug_inventory():
return jsonify(inventory)
@app.route("/debug/reservations", methods=["GET"])
def debug_reservations():
return jsonify(reservations)
@app.route("/debug/sagas", methods=["GET"])
def debug_sagas():
return jsonify(saga_logs)
if __name__ == "__main__":
app.run(debug=True, port=5000)
如何运行这个示例
安装依赖
pip install flask
启动服务
python app.py
发起一次成功下单
curl -X POST http://127.0.0.1:5000/checkout \
-H "Content-Type: application/json" \
-d '{"product_id":"sku-1","quantity":3}'
预期返回:
{
"order_id": "xxxx",
"result": "SUCCESS",
"saga_id": "xxxx"
}
查看订单和库存状态
curl http://127.0.0.1:5000/debug/orders
curl http://127.0.0.1:5000/debug/inventory
触发库存不足场景
curl -X POST http://127.0.0.1:5000/checkout \
-H "Content-Type: application/json" \
-d '{"product_id":"sku-1","quantity":100}'
这时会发生:
- 订单先创建为
PENDING - 库存预留失败
- 订单被补偿为
CANCELLED
这就是 Saga 的典型行为:不追求一个大事务原子提交,而是通过状态推进和补偿实现最终一致。
从示例走向生产:真正要补上的东西
上面的代码能跑,但离生产可用还差不少。下面这些是你在真实项目里几乎一定要考虑的。
1. 本地事务 + 事件落库要绑定
在真实系统中,订单服务不能只是“改状态”,还要把“订单已创建”“订单已取消”之类的事件可靠发出去。常见做法是 Outbox Pattern:
- 在同一个本地事务里:
- 更新业务表
- 写一条待发送事件到 outbox 表
- 再由异步任务把 outbox 表里的事件投递到 MQ
这样可以避免“数据库提交了,但消息没发出去”的问题。
2. 状态机要显式定义
不要在代码里随便 if else 改状态,建议明确约束:
PENDING -> CONFIRMEDPENDING -> CANCELLED- 禁止
CANCELLED -> CONFIRMED - 禁止
CONFIRMED -> CANCELLED
库存预留记录也是同样道理。
3. 每个动作都要幂等
至少需要一个全局唯一标识,例如:
sagaIdrequestIdreservationId
并在服务侧落库去重。
4. 超时要有兜底补偿
有一种很常见的故障:库存服务其实已经预留成功,但响应丢了,协调器等超时后准备重试。这时如果没有幂等,就可能重复预留;如果没有定时回查,就可能一直卡在中间状态。
生产上建议:
- Saga 状态表记录每一步开始时间
- 定时任务扫描超时实例
- 根据业务状态回查后继续推进或补偿
常见坑与排查
这一节我会重点讲实战里最容易翻车的地方。我自己踩过的坑里,前两个最常见。
坑一:订单取消了,但库存没释放
表现:
- 订单状态是
CANCELLED - 库存的
reserved没减回去 - 一段时间后出现“库存越来越少,但实际卖单不多”
根因通常有几个:
- 补偿逻辑没执行
- 补偿执行失败后没有重试
- 补偿动作不是幂等,第一次失败后后续不敢重试
- Saga 状态表缺失,系统不知道该补偿谁
排查路径:
- 查
saga_logs是否进入COMPENSATED - 查库存预留记录状态是否仍为
RESERVED - 查补偿任务日志是否有异常堆栈
- 查 MQ 消息是否积压或消费失败
止血方案:
- 先做离线对账:扫描
CANCELLED订单关联的库存预留记录 - 对状态仍为
RESERVED的记录执行补偿释放 - 补偿接口必须支持重复执行
坑二:重复扣库存
表现:
- 一笔订单只下了一次,库存却少了两次
- 日志里有超时重试
根因:
- 调用重试没有幂等 key
- 消息重复消费没有去重
- 预留和消费动作混在一个接口里,重入难处理
排查建议:
- 检查是否以
sagaId或reservationId做唯一键 - 检查库存服务是否在数据库层有唯一约束
- 检查消费日志中同一请求是否被处理多次
坑三:补偿顺序错误
如果一个 Saga 涉及多个下游服务,补偿顺序必须与执行顺序相反。比如:
- 创建订单
- 预留库存
- 创建物流单
失败时应该:
- 取消物流单
- 释放库存
- 取消订单
而不是想到哪补到哪。否则会把状态越补越乱。
坑四:把“失败”与“未知”混为一谈
比如库存服务调用超时,很多系统直接当失败处理。但超时只说明你没拿到结果,不代表对方没成功。
这时正确做法通常是:
- 先把 Saga 标成
UNKNOWN - 通过查询接口或事件回查真实状态
- 再决定继续、重试还是补偿
这个细节非常重要,不然很容易出现“对方已成功,我方又执行补偿”的反向事故。
安全/性能最佳实践
Saga 讨论多了,大家容易把注意力都放在一致性上,但安全和性能同样关键。
安全最佳实践
1. 所有内部接口也要做鉴权
不要因为是内网调用就裸奔。至少应该有:
- 服务间认证
- 请求签名或 mTLS
- 基于角色的接口访问控制
特别是像“释放库存”“取消订单”这种补偿接口,如果被误调用,后果很直接。
2. 防重放
对于 Saga 请求,建议携带:
requestId- 时间戳
- 签名
并限制请求有效时间窗口,避免旧请求被重复利用。
3. 审计日志要完整
至少记录:
- sagaId
- orderId
- 参与服务
- 请求参数摘要
- 状态变更前后值
- 操作时间
- 错误原因
这样出了事故,不至于只能靠猜。
性能最佳实践
1. 用预留而不是长事务锁
不要在跨服务调用期间持有数据库锁,这会直接把吞吐拖垮。Saga 的本质就是让每个服务只做自己的短事务。
2. 热点库存要特别处理
如果某个商品是秒杀热点,单纯依赖数据库行更新可能会成为瓶颈。可以结合:
- Redis 预扣
- 异步落库
- 分段库存
- 限流与排队
但注意:引入缓存后,一致性复杂度会更高,必须有对账机制。
3. 补偿任务要分级限速
系统故障恢复时,可能会堆积大量待补偿 Saga。如果你一口气全量重试,下游服务会被打崩。建议:
- 分批扫描
- 指数退避重试
- 按业务优先级处理
- 设置熔断和限流
4. 容量估算别只看成功链路
很多团队压测时只测“正常下单成功”,但真正压垮系统的往往是:
- 超时重试
- 重复消费
- 补偿风暴
- MQ 堆积后的追赶流量
粗略估算时,至少把以下流量算进去:
- 正常请求量
- 重试放大量
- 补偿请求量
- 对账回查量
比如正常每秒 1000 单,如果超时重试系数是 1.3,补偿比例高峰期 10%,那下游承压可能不是 1000 QPS,而是:
1000 + 300 + 100 = 1400 QPS
而这还没算回查任务。
一个更贴近生产的落地建议
如果你正准备在团队里落地 Saga,我建议按下面这个顺序推进,而不是一开始就把架构搞得特别花哨。
第一步:先把状态机和补偿动作定义清楚
文档里明确:
- 哪些是正向动作
- 哪些是补偿动作
- 每个动作的输入输出
- 允许的状态迁移
- 失败后的处理策略
第二步:实现幂等和唯一约束
在数据库层就做掉,而不是只靠代码记忆。
例如:
ALTER TABLE orders ADD CONSTRAINT uk_orders_saga_id UNIQUE (saga_id);
ALTER TABLE inventory_reservations ADD CONSTRAINT uk_reservation_saga_id UNIQUE (saga_id);
第三步:补齐可观测性
至少要有:
- sagaId 全链路日志
- 指标监控:成功率、补偿率、超时率
- 告警:卡住的 Saga 数量、积压时长
第四步:做对账系统
别觉得“有 Saga 就不需要对账”。现实里,任何复杂系统都需要定期兜底校验。
对账的典型规则包括:
CONFIRMED订单必须对应CONSUMED库存预留CANCELLED订单不能对应RESERVED库存预留- 超时未完成的 Saga 必须进入人工或自动回查队列
总结
订单与库存一致性,是微服务分布式事务里最经典也最容易出事故的一题。真正实战时,关键不在于背出 Saga 的定义,而在于把下面几件事做到位:
- 接受最终一致,而不是执着强一致
- 把大事务拆成多个本地事务
- 为每一步准备可重复执行的补偿动作
- 通过幂等、状态机、唯一约束来抵御重试和重复消息
- 用超时回查、对账和监控补上工程上的最后一公里
如果你问我一个最实用的建议,我会说:
在订单库存场景里,优先采用“订单 PENDING + 库存预留 + 成功确认 / 失败释放”的 Saga 结构,不要一上来就做跨服务强一致。
它足够稳,也足够贴近业务现实。
当然,Saga 也有边界条件。像余额扣款、证券交易这类对强一致要求极高的场景,Saga 往往不是首选,这时需要评估 TCC 或更强的一致性方案。技术方案从来不是越重越好,而是要和业务容错边界匹配。
如果你已经在做微服务拆分,订单和库存又不在一个库里,那么这套 Saga 思路,基本就是值得优先落地的一条主线。