背景与问题
只要系统拆成了微服务,分布式事务几乎迟早会找上门。
一个最典型的场景是下单:
- 订单服务创建订单
- 库存服务扣减库存
- 支付服务冻结或扣款
- 营销服务发放优惠
- 物流服务创建配送单
如果这几个动作都在一个数据库里,用本地事务很好办;但一旦它们分散在不同服务、不同数据库,传统单机事务就失效了。很多团队最开始会想到 2PC/XA,但真正落地时常常会遇到几个现实问题:
- 数据库、中间件支持不统一
- 锁持有时间长,吞吐量明显下降
- 协调器故障会放大系统脆弱性
- 云原生环境下跨服务强一致成本过高
所以,在微服务里,我们通常不会强行追求“所有节点同时提交”。更常见、也更务实的做法是:
- 业务流程层面用 Saga 拆解大事务
- 服务间通信用可靠消息实现最终一致性
- 通过幂等、补偿、重试、对账把系统拉回正确状态
这篇文章不讲概念堆砌,而是从架构设计到代码实现,带你把这套方案走一遍。
为什么 Saga + 消息最终一致性更适合微服务
先说结论:Saga 适合长流程业务事务,消息最终一致性适合跨服务状态传播,两者结合,是很多中大型微服务系统里性价比很高的一种方案。
典型业务链路
以“订单创建”为例:
- 订单服务:创建订单,状态
PENDING - 库存服务:锁定库存
- 支付服务:创建支付单
- 积分服务:冻结积分
- 如果任何一步失败,触发前面步骤的补偿动作
这里的关键不是“绝不失败”,而是:
- 每一步都能独立提交本地事务
- 失败后能通过补偿把业务回滚到可接受状态
- 即使消息重复、延迟、乱序,也不会把数据搞乱
核心原理
1. Saga 模式:把大事务拆成一串本地事务
Saga 的核心思想很简单:
一个跨服务的大事务,拆成多个本地事务,每个本地事务成功后发布下一步指令;如果中间失败,则按相反顺序执行补偿操作。
Saga 常见有两种实现方式:
编排式 Saga(Orchestration)
由一个“流程协调者”统一决定下一步做什么。
优点:
- 流程清晰,适合复杂业务
- 容易观察整体状态
- 补偿逻辑集中管理
缺点:
- 协调器容易变复杂
- 需要处理状态机持久化
协同式 Saga(Choreography)
每个服务通过事件驱动下一步,谁收到事件谁决定是否继续。
优点:
- 去中心化,扩展灵活
- 服务间解耦较强
缺点:
- 链路分散,排查困难
- 业务流程容易“隐形化”
在实战里,我更建议:
- 流程简单、团队成熟度一般:优先编排式
- 流程较短、事件驱动明显:可考虑协同式
2. 消息最终一致性:允许短暂不一致,但最终收敛
消息最终一致性的核心在于:
- 本地事务成功
- 可靠地把“业务事件”写出来
- 消费方反复重试直到处理成功
- 使用幂等避免重复处理副作用
这里最经典的做法是 Outbox Pattern(事务消息表):
- 在本地事务中,同时写业务数据和消息表
- 后台任务把消息表中的未发送消息投递到 MQ
- 消费方收到消息后执行业务逻辑
- 处理成功后记录消费状态
这样即使服务在事务提交后瞬间宕机,消息也不会丢。
整体架构设计
下面先看一个推荐的落地架构。
flowchart LR
A[客户端提交订单] --> B[订单服务]
B --> C[(订单库)]
B --> D[(Outbox消息表)]
D --> E[消息投递器]
E --> F[MQ]
F --> G[库存服务]
F --> H[支付服务]
F --> I[积分服务]
G --> J[(库存库)]
H --> K[(支付库)]
I --> L[(积分库)]
G --> F
H --> F
I --> F
这个图里有两个非常关键的点:
- 订单服务不直接跨库开事务
- 事件发布依赖 Outbox,而不是“先写库再发 MQ”这种不可靠操作
Saga 状态流转
stateDiagram-v2
[*] --> PENDING
PENDING --> INVENTORY_LOCKED: 库存锁定成功
INVENTORY_LOCKED --> PAYMENT_CREATED: 支付单创建成功
PAYMENT_CREATED --> COMPLETED: 全部成功
INVENTORY_LOCKED --> COMPENSATING: 支付失败
PAYMENT_CREATED --> COMPENSATING: 后续失败
COMPENSATING --> CANCELLED: 补偿完成
COMPENSATING --> COMPENSATION_FAILED: 补偿异常
COMPENSATION_FAILED --> COMPENSATING: 重试补偿
这里建议你不要只维护“成功/失败”两态,而是明确建模:
PENDINGINVENTORY_LOCKEDPAYMENT_CREATEDCOMPLETEDCOMPENSATINGCANCELLEDCOMPENSATION_FAILED
状态足够清晰,排障时真的会轻松很多。
方案对比与取舍分析
1. 2PC/XA vs Saga
| 方案 | 一致性 | 性能 | 实现复杂度 | 适用场景 |
|---|---|---|---|---|
| 2PC/XA | 强一致 | 较低 | 高 | 少量核心场景、老系统 |
| TCC | 强一致偏业务化 | 中 | 很高 | 对一致性要求极高 |
| Saga | 最终一致 | 高 | 中 | 长事务、微服务业务流程 |
| 可靠消息最终一致性 | 最终一致 | 高 | 中 | 异步驱动、事件传播 |
2. 什么时候别用 Saga
Saga 不是银弹。以下场景要谨慎:
- 证券撮合、账户实时扣款 等对强一致要求极高
- 补偿动作不具备业务可逆性
- 流程高度耦合且失败成本巨大
- 团队没有统一的事件规范、幂等机制、监控体系
换句话说,Saga 很适合电商、履约、营销、订单类场景,但不一定适合所有资金核心链路。
核心设计要点
1. 业务 ID 与幂等键
每一条业务操作都要带上:
order_idsaga_idmessage_ididempotency_key
我踩过的一个坑是:只按 order_id 做幂等,结果同一订单内不同步骤互相覆盖,最后查出来一团乱。更稳妥的方式是:
业务主键 + 步骤名 + 动作类型
例如:
order-1001:reserve_inventory:forwardorder-1001:reserve_inventory:compensate
2. 补偿不是数据库 rollback
很多人一开始会误以为“补偿 = 回滚”。其实不是。
比如库存锁定成功后,支付失败,补偿动作不是回滚 SQL,而是发起一笔新的业务操作:
- 解锁库存
- 取消支付单
- 回退积分冻结
它们都应当是显式的业务接口。
3. 重试要有边界
重试不是无限打。建议配置:
- 指数退避
- 最大重试次数
- 死信队列
- 人工干预入口
不然失败消息会在系统里永远打转,CPU、日志、告警一起爆。
实战代码(可运行)
下面给一个简化但可运行的示例,使用 Python + Flask + SQLite 来模拟:
- 订单服务
- Outbox 消息表
- 消息投递
- 库存服务消费
- 幂等处理
这个示例重点演示模式,不依赖真实 MQ。我们用数据库表模拟消息队列,方便本地跑起来理解全流程。
目录结构
.
├── app.py
└── requirements.txt
requirements.txt
flask==3.0.0
app.py
import json
import sqlite3
import threading
import time
import uuid
from contextlib import contextmanager
from flask import Flask, jsonify, request
app = Flask(__name__)
DB = "demo.db"
@contextmanager
def get_conn():
conn = sqlite3.connect(DB)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def init_db():
with get_conn() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS orders (
id TEXT PRIMARY KEY,
status TEXT NOT NULL,
amount INTEGER NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS inventory (
product_id TEXT PRIMARY KEY,
stock INTEGER NOT NULL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS outbox (
id TEXT PRIMARY KEY,
topic TEXT NOT NULL,
payload TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'NEW',
retry_count INTEGER NOT NULL DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS inbox_processed (
idempotency_key TEXT PRIMARY KEY,
processed_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS inventory_reservations (
order_id TEXT PRIMARY KEY,
product_id TEXT NOT NULL,
quantity INTEGER NOT NULL,
status TEXT NOT NULL
)
""")
cur = conn.execute("SELECT COUNT(*) AS c FROM inventory")
if cur.fetchone()["c"] == 0:
conn.execute(
"INSERT INTO inventory(product_id, stock) VALUES (?, ?)",
("sku-1", 10)
)
def create_order_with_outbox(order_id, amount, product_id, quantity):
saga_id = str(uuid.uuid4())
message_id = str(uuid.uuid4())
idempotency_key = f"{order_id}:reserve_inventory:forward"
event = {
"message_id": message_id,
"saga_id": saga_id,
"order_id": order_id,
"product_id": product_id,
"quantity": quantity,
"idempotency_key": idempotency_key
}
with get_conn() as conn:
conn.execute(
"INSERT INTO orders(id, status, amount) VALUES (?, ?, ?)",
(order_id, "PENDING", amount)
)
conn.execute(
"INSERT INTO outbox(id, topic, payload, status) VALUES (?, ?, ?, ?)",
(message_id, "ReserveInventory", json.dumps(event), "NEW")
)
def publish_outbox():
while True:
try:
with get_conn() as conn:
rows = conn.execute(
"SELECT id, topic, payload, retry_count FROM outbox WHERE status = 'NEW' ORDER BY created_at LIMIT 10"
).fetchall()
for row in rows:
payload = json.loads(row["payload"])
try:
if row["topic"] == "ReserveInventory":
handle_reserve_inventory(payload)
conn.execute(
"UPDATE outbox SET status = 'SENT' WHERE id = ?",
(row["id"],)
)
except Exception as e:
conn.execute(
"UPDATE outbox SET retry_count = retry_count + 1 WHERE id = ?",
(row["id"],)
)
print(f"[publisher] publish failed: {e}, message_id={row['id']}")
except Exception as e:
print(f"[publisher] loop error: {e}")
time.sleep(2)
def handle_reserve_inventory(event):
order_id = event["order_id"]
product_id = event["product_id"]
quantity = event["quantity"]
idempotency_key = event["idempotency_key"]
with get_conn() as conn:
processed = conn.execute(
"SELECT 1 FROM inbox_processed WHERE idempotency_key = ?",
(idempotency_key,)
).fetchone()
if processed:
print(f"[inventory] duplicate message ignored: {idempotency_key}")
return
stock_row = conn.execute(
"SELECT stock FROM inventory WHERE product_id = ?",
(product_id,)
).fetchone()
if not stock_row:
raise Exception("product not found")
if stock_row["stock"] < quantity:
conn.execute(
"UPDATE orders SET status = 'CANCELLED' WHERE id = ?",
(order_id,)
)
conn.execute(
"INSERT INTO inbox_processed(idempotency_key) VALUES (?)",
(idempotency_key,)
)
print(f"[inventory] insufficient stock, order cancelled: {order_id}")
return
conn.execute(
"UPDATE inventory SET stock = stock - ? WHERE product_id = ?",
(quantity, product_id)
)
conn.execute(
"INSERT OR REPLACE INTO inventory_reservations(order_id, product_id, quantity, status) VALUES (?, ?, ?, ?)",
(order_id, product_id, quantity, "RESERVED")
)
conn.execute(
"UPDATE orders SET status = 'INVENTORY_LOCKED' WHERE id = ?",
(order_id,)
)
conn.execute(
"INSERT INTO inbox_processed(idempotency_key) VALUES (?)",
(idempotency_key,)
)
print(f"[inventory] reserved stock for order: {order_id}")
@app.post("/orders")
def create_order():
body = request.json or {}
order_id = body.get("order_id") or str(uuid.uuid4())
amount = int(body.get("amount", 100))
product_id = body.get("product_id", "sku-1")
quantity = int(body.get("quantity", 1))
try:
create_order_with_outbox(order_id, amount, product_id, quantity)
return jsonify({"success": True, "order_id": order_id}), 201
except sqlite3.IntegrityError:
return jsonify({"success": False, "message": "duplicate order_id"}), 409
@app.get("/orders/<order_id>")
def get_order(order_id):
with get_conn() as conn:
row = conn.execute(
"SELECT * FROM orders WHERE id = ?",
(order_id,)
).fetchone()
if not row:
return jsonify({"message": "not found"}), 404
return jsonify(dict(row))
@app.get("/inventory/<product_id>")
def get_inventory(product_id):
with get_conn() as conn:
row = conn.execute(
"SELECT * FROM inventory WHERE product_id = ?",
(product_id,)
).fetchone()
if not row:
return jsonify({"message": "not found"}), 404
return jsonify(dict(row))
if __name__ == "__main__":
init_db()
t = threading.Thread(target=publish_outbox, daemon=True)
t.start()
app.run(debug=True)
运行方式
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
python app.py
测试创建订单
curl -X POST http://127.0.0.1:5000/orders \
-H "Content-Type: application/json" \
-d '{
"order_id": "order-1001",
"amount": 100,
"product_id": "sku-1",
"quantity": 2
}'
查询订单状态:
curl http://127.0.0.1:5000/orders/order-1001
查询库存:
curl http://127.0.0.1:5000/inventory/sku-1
这个示例体现了什么
虽然代码不长,但已经体现了落地的关键点:
- 订单写入和 Outbox 写入同一个本地事务
- 消息由后台任务异步投递
- 消费端通过
inbox_processed做幂等 - 库存不足时执行业务取消,而不是数据库层回滚跨服务状态
Saga 时序示意
再看一张完整一点的时序图,会更容易理解“成功路径”和“补偿路径”。
sequenceDiagram
participant C as Client
participant O as 订单服务
participant X as Outbox投递器
participant M as MQ
participant I as 库存服务
participant P as 支付服务
C->>O: 创建订单
O->>O: 本地事务写订单+Outbox
X->>M: 投递 ReserveInventory
M->>I: 消费库存锁定事件
I->>I: 本地事务锁定库存
I->>M: 发布 InventoryReserved
M->>P: 消费支付创建事件
P->>P: 创建支付单失败
P->>M: 发布 PaymentFailed
M->>O: 通知 Saga 补偿
O->>M: 发布 ReleaseInventory
M->>I: 消费释放库存事件
I->>I: 解锁库存
O->>O: 更新订单为 CANCELLED
常见坑与排查
这部分我建议认真看,很多项目不是败在“不会设计”,而是败在这些细节没兜住。
1. 先发 MQ,再提交数据库
这是非常常见的错误顺序。
如果你这样做:
- 发 MQ 成功
- 本地数据库提交失败
那消费者已经收到事件,但业务数据根本不存在,直接产生脏读和幻读式问题。
正确做法
- 用 Outbox:业务数据和消息记录同事务提交
- 异步投递 MQ
2. 消息重复消费导致库存多扣
消息系统默认至少一次投递,重复消费是常态,不是异常。
排查方法
看消费日志里是否存在:
- 同一个
message_id - 同一个
idempotency_key - 同一个业务主键被多次更新
正确做法
- 消费端一定做幂等
- 幂等记录必须落库,不要只放内存
- 幂等判断和业务处理尽量在一个本地事务里完成
3. 补偿消息比正向消息先到
在高并发、网络抖动、重试并存时,消息乱序并不罕见。
表现
- 先收到
ReleaseInventory - 后收到
ReserveInventory
如果没做状态校验,就会出现库存状态错乱。
处理建议
- 每个步骤都建状态机
- 对非法状态迁移直接拒绝
- 补偿操作要求可幂等、可重入
例如:
- 只有
RESERVED状态才能RELEASED - 已经
RELEASED再次释放时直接成功返回
4. 补偿失败后无人接手
这是线上系统里最危险的隐性故障之一。表面看订单只是“处理中”,实际上永远卡住了。
建议做法
- 补偿任务持久化
- 设置最大重试次数
- 超阈值进入人工处理队列
- 给运营或技术支持提供修复入口
5. 链路追踪缺失,定位全靠猜
当订单、库存、支付分散在不同服务里,如果没有统一 Trace ID,排查会非常痛苦。
至少要统一这些字段
trace_idsaga_idorder_idmessage_id
日志、监控、审计表里都应该能查到。
安全/性能最佳实践
安全方面
1. 消息内容最小化
不要把敏感数据原样塞进消息体,比如:
- 用户完整身份证号
- 银行卡号
- 明文手机号
消息中尽量只放:
- 业务 ID
- 资源 ID
- 必要状态
- 可脱敏字段
2. 接口验签与权限控制
补偿接口、状态推进接口,不能谁都能调。建议:
- 服务间使用 mTLS 或网关鉴权
- 内部事件带签名或可信来源校验
- 运维修复接口做 RBAC 权限控制
3. 审计留痕
对于涉及资金、库存、权益的事务流转,最好保留:
- 操作时间
- 操作者/系统来源
- 前后状态
- 请求参数摘要
这样在事故复盘时非常有价值。
性能方面
1. Outbox 表要控制膨胀
Outbox 是高频写表,如果不清理,很快就会变成热点。
建议:
- 按时间分表或分区
- 发送成功的数据归档
- 建立
(status, created_at)索引
示例 SQL:
CREATE INDEX idx_outbox_status_created_at
ON outbox(status, created_at);
2. 批量拉取、批量发送
投递器不要一条条查、一条条发。可以:
- 每次拉 100 条
- 批量提交 MQ
- 控制并发线程数
这样吞吐量会明显提升。
3. 消费逻辑要短事务
消费者本地事务里只做必要的状态变更,不要顺手查一堆远程接口。否则:
- 锁时间变长
- 重试成本变高
- 雪崩时恢复更慢
4. 容量估算要提前做
一个粗略估算公式:
每日消息量 = 日订单量 × 平均事务步骤数 × (1 + 重试率 + 补偿率)
比如:
- 日订单量 100 万
- 平均 4 个步骤
- 重试率 5%
- 补偿率 2%
那么消息量大约是:
100万 × 4 × (1 + 0.05 + 0.02) = 428万
这会直接影响:
- MQ 吞吐选型
- 消费者实例数
- Outbox 清理策略
- 日志与监控成本
落地建议:从“小闭环”开始
如果你所在团队还没有成熟的分布式事务治理体系,我不建议一上来就做一个超级通用的 Saga 平台。更现实的路径是:
第一步:先把 Outbox + 幂等打牢
先确保:
- 业务数据和事件消息同事务落库
- 消费端有稳定幂等能力
- 有失败重试和死信处理
这一步完成后,很多“消息丢失、重复消费、状态不一致”问题就能大幅下降。
第二步:对一个核心链路引入 Saga
建议选择:
- 业务价值高
- 步骤数 3~5 个
- 补偿动作清晰
- 能接受短暂最终一致
比如:
- 下单 -> 锁库存 -> 创建支付单
- 会员开通 -> 权益发放 -> 通知 CRM
第三步:补齐监控与人工修复
技术方案能否真正上线,不在于 happy path 多漂亮,而在于异常时能不能接住。至少要有:
- Saga 状态查询页
- 失败消息重试入口
- 补偿失败告警
- 业务对账任务
一个实用的排查清单
当你遇到“订单状态不对”时,可以按这个顺序查:
- 订单主记录状态是否正确
- Outbox 是否已生成消息
- 消息是否投递成功
- 消费者是否收到消息
- 幂等表是否已记录
- 下游本地事务是否提交
- 是否触发补偿
- 补偿是否重试失败
- 是否进入死信或人工队列
- 对账任务是否已经修复
这个顺序很重要,能避免一上来就在 Kafka、RocketMQ、数据库日志里到处乱翻。
总结
在微服务架构里,分布式事务的关键不是追求“绝对同步提交”,而是建立一套 可恢复、可观测、可补偿 的一致性机制。
如果把这篇文章压缩成几条最有执行价值的建议,我会给这几条:
- 优先考虑 Saga + 消息最终一致性,而不是盲目上 XA
- 消息发布一定用 Outbox,不要“写库后顺手发 MQ”
- 消费端幂等是底线,不是优化项
- 补偿动作必须显式设计,并且可重试、可审计
- 状态机、Trace ID、死信队列、人工修复入口必须配套
最后也要提醒一个边界条件:
Saga 解决的是大多数业务系统里的“最终一致性问题”,不是所有场景都能接受它。 对资金强一致、法律合规要求极高的链路,仍然要结合 TCC、账户系统记账模型,甚至局部保留单体强事务设计。
如果你的业务属于订单、库存、履约、营销这类典型微服务链路,那么从 Outbox + 幂等 + 编排式 Saga 开始,通常是一条足够稳、也足够现实的落地路径。