背景与问题
单体时代,我们习惯了一个本地事务包打天下:
BEGIN;
UPDATE account SET balance = balance - 100 WHERE user_id = 1;
INSERT INTO orders(order_id, user_id, amount) VALUES('O1001', 1, 100);
COMMIT;
一旦服务拆分成订单、库存、支付、账户几个微服务,这种舒服的日子基本就结束了。
你会很快遇到这些问题:
- 订单创建成功了,但库存扣减失败
- 库存扣了,支付超时了
- 消息发出去了,但业务库事务回滚了
- 消费端因为重试把同一笔钱扣了两次
- 补偿逻辑写得不完整,数据“看起来都成功”,但实际账对不上
很多团队第一反应是:能不能上分布式事务框架,一把梭?
理论上可以,但真实生产里,2PC/XA 往往有这些现实问题:
- 对数据库和中间件要求高
- 性能抖动明显
- 长事务导致锁持有时间变长
- 跨团队、跨语言、跨存储实现困难
所以在大多数互联网业务里,更常见的落地方式是:
- Saga:把大事务拆成一组本地事务 + 补偿动作
- Outbox:保证“业务数据更新”和“事件投递”尽量原子
- 幂等设计:接受消息重复、超时重试、网络抖动,把结果做对
这篇文章我会从一个常见的下单场景出发,把这三件事串起来讲清楚,并给出一套能跑起来的示例。
一个典型场景:下单链路怎么保证一致性
假设下单流程如下:
- 订单服务创建订单
- 库存服务扣减库存
- 支付服务完成扣款
- 订单服务将状态改为
CONFIRMED
如果中途失败,要做补偿:
- 库存已扣但支付失败,则回补库存
- 订单已创建但最终失败,则订单取消
这不是“所有服务同时成功”的强一致,而是最终一致性。
flowchart LR
A[用户提交订单] --> B[订单服务创建 PENDING 订单]
B --> C[Outbox 写入 OrderCreated 事件]
C --> D[消息投递器发送事件]
D --> E[库存服务扣减库存]
E --> F[支付服务执行扣款]
F --> G[订单服务确认订单]
F -->|失败| H[触发补偿]
H --> I[库存回补]
I --> J[订单取消]
核心原理
1. Saga:把大事务拆成一段段本地事务
Saga 的核心思想很朴素:
每个服务只管自己的本地事务,失败时执行相反方向的补偿动作。
比如下单 Saga:
T1:订单服务创建订单T2:库存服务扣减库存T3:支付服务扣款T4:订单服务确认订单
对应补偿:
C2:库存回补C1:订单取消
注意,补偿不是“数据库回滚”,而是新的业务动作。
这点特别重要。我见过有人把补偿理解成把数据库改回去,结果一旦中间插入了别的业务行为,逻辑就全乱了。
编排式 vs 协同式
Saga 常见两种风格:
- 编排式(Orchestration):由一个协调者控制流程
- 协同式(Choreography):服务靠事件彼此驱动
如果团队刚开始做分布式事务,我一般更建议先用编排式,因为:
- 流程可视化更好
- 排障路径更清晰
- 更适合复杂补偿
协同式在简单流程里很轻,但事件多了以后,容易出现“谁触发了谁,已经没人说得清”的情况。
sequenceDiagram
participant U as 用户
participant O as 订单服务
participant X as Outbox投递器
participant S as 库存服务
participant P as 支付服务
U->>O: 创建订单
O->>O: 本地事务:订单=PENDING + Outbox事件
O-->>U: 返回受理成功
X->>S: 投递 OrderCreated
S->>S: 幂等扣库存
S-->>X: InventoryReserved
X->>P: 投递 InventoryReserved
P->>P: 幂等扣款
alt 支付成功
P-->>X: PaymentSucceeded
X->>O: 更新订单为 CONFIRMED
else 支付失败
P-->>X: PaymentFailed
X->>S: 触发库存补偿
X->>O: 更新订单为 CANCELED
end
2. Outbox:解决“数据库成功了,但消息没发出去”
这是微服务里最常见、也最隐蔽的一类问题。
错误写法通常是:
- 先提交订单
- 再调用 MQ 发送消息
如果在第 2 步宕机,就会发生:
- 订单已经存在
- 但库存服务永远收不到事件
Outbox 模式的做法是:
- 在同一个本地事务里
- 写业务表
- 写 outbox 事件表
- 事务提交后,由后台任务扫描 outbox 表并投递到 MQ
- 投递成功后更新 outbox 状态
这样能保证:
只要业务数据提交成功,事件就一定有机会被发送出去。
Outbox 的关键点
- 事件表和业务表必须在同一个库、同一个事务内
- 投递器必须可重试
- 消息发送可能重复,消费者必须幂等
- 事件要带唯一 ID 和业务键
3. 幂等设计:不要假设消息只来一次
在分布式系统里,“只投递一次”几乎都是理想状态。
现实通常是“至少一次”。
所以消费端一定要实现幂等。常见办法:
基于业务唯一键幂等
例如扣减库存时,使用 order_id + sku_id 作为唯一约束。
基于消息 ID 幂等
每条消息带 event_id,消费时先记录处理结果:
- 没处理过:执行业务逻辑
- 已处理过:直接返回成功
状态机幂等
订单状态从:
PENDING -> CONFIRMEDPENDING -> CANCELED
如果已经是 CONFIRMED,再来一次成功消息,也不能重复变更。
stateDiagram-v2
[*] --> PENDING
PENDING --> CONFIRMED: PaymentSucceeded
PENDING --> CANCELED: PaymentFailed / Timeout
CONFIRMED --> [*]
CANCELED --> [*]
方案对比与取舍分析
在真正落地前,先把几个常见方案摆在一起看,会更清楚边界。
| 方案 | 一致性 | 性能 | 实现复杂度 | 适用场景 |
|---|---|---|---|---|
| 本地事务 | 强一致 | 高 | 低 | 单体、单库 |
| XA/2PC | 强一致 | 中低 | 高 | 金融级少量核心链路 |
| TCC | 强一致倾向 | 中 | 很高 | 业务可预留资源、模型稳定 |
| Saga + Outbox + 幂等 | 最终一致 | 高 | 中 | 大多数互联网业务 |
| 纯消息最终一致 | 最终一致 | 高 | 中低 | 允许短时不一致的异步流程 |
我的经验是:
- 高频主链路:优先 Saga + Outbox + 幂等
- 资金强约束场景:谨慎评估 TCC 或账户账本模型
- 跨团队复杂协作:尽量避免要求所有服务都支持 XA
落地设计:一个可运行的简化实现
下面我们用 Python + SQLite 做一个单机版示例,模拟:
- 订单创建
- Outbox 写事件
- 后台投递
- 库存服务幂等消费
- 支付失败后的补偿
这个示例不是完整生产代码,但逻辑是可以跑通的,便于理解核心机制。
实战代码(可运行)
1. 数据表设计
CREATE TABLE IF NOT EXISTS orders (
order_id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
amount INTEGER NOT NULL,
status TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS inventory (
sku_id TEXT PRIMARY KEY,
stock INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS inventory_reservations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_id TEXT NOT NULL,
sku_id TEXT NOT NULL,
quantity INTEGER NOT NULL,
status TEXT NOT NULL,
UNIQUE(order_id, sku_id)
);
CREATE TABLE IF NOT EXISTS outbox_events (
event_id TEXT PRIMARY KEY,
aggregate_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'NEW',
retry_count INTEGER NOT NULL DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS processed_messages (
consumer_name TEXT NOT NULL,
event_id TEXT NOT NULL,
processed_at DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (consumer_name, event_id)
);
2. Python 示例代码
把下面代码保存为 saga_outbox_demo.py,直接运行即可。
import sqlite3
import json
import uuid
from contextlib import contextmanager
DB = "demo.db"
@contextmanager
def get_conn():
conn = sqlite3.connect(DB)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def init_db():
with get_conn() as conn:
conn.executescript("""
CREATE TABLE IF NOT EXISTS orders (
order_id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
amount INTEGER NOT NULL,
status TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS inventory (
sku_id TEXT PRIMARY KEY,
stock INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS inventory_reservations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_id TEXT NOT NULL,
sku_id TEXT NOT NULL,
quantity INTEGER NOT NULL,
status TEXT NOT NULL,
UNIQUE(order_id, sku_id)
);
CREATE TABLE IF NOT EXISTS outbox_events (
event_id TEXT PRIMARY KEY,
aggregate_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'NEW',
retry_count INTEGER NOT NULL DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS processed_messages (
consumer_name TEXT NOT NULL,
event_id TEXT NOT NULL,
processed_at DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (consumer_name, event_id)
);
""")
conn.execute("INSERT OR IGNORE INTO inventory(sku_id, stock) VALUES(?, ?)", ("SKU-1", 10))
def new_event(aggregate_id, event_type, payload):
return {
"event_id": str(uuid.uuid4()),
"aggregate_id": aggregate_id,
"event_type": event_type,
"payload": json.dumps(payload, ensure_ascii=False)
}
def create_order(order_id, user_id, amount, sku_id, quantity):
event = new_event(order_id, "OrderCreated", {
"order_id": order_id,
"user_id": user_id,
"amount": amount,
"sku_id": sku_id,
"quantity": quantity
})
with get_conn() as conn:
conn.execute(
"INSERT INTO orders(order_id, user_id, amount, status) VALUES (?, ?, ?, ?)",
(order_id, user_id, amount, "PENDING")
)
conn.execute(
"INSERT INTO outbox_events(event_id, aggregate_id, event_type, payload, status) VALUES (?, ?, ?, ?, 'NEW')",
(event["event_id"], event["aggregate_id"], event["event_type"], event["payload"])
)
print(f"[订单服务] 创建订单成功: {order_id}")
def mark_message_processed(conn, consumer_name, event_id):
conn.execute(
"INSERT INTO processed_messages(consumer_name, event_id) VALUES (?, ?)",
(consumer_name, event_id)
)
def is_message_processed(conn, consumer_name, event_id):
row = conn.execute(
"SELECT 1 FROM processed_messages WHERE consumer_name=? AND event_id=?",
(consumer_name, event_id)
).fetchone()
return row is not None
def inventory_consume_order_created(event):
consumer_name = "inventory_service"
payload = json.loads(event["payload"])
order_id = payload["order_id"]
sku_id = payload["sku_id"]
quantity = payload["quantity"]
with get_conn() as conn:
if is_message_processed(conn, consumer_name, event["event_id"]):
print(f"[库存服务] 幂等命中,忽略重复消息: {event['event_id']}")
return
reservation = conn.execute(
"SELECT * FROM inventory_reservations WHERE order_id=? AND sku_id=?",
(order_id, sku_id)
).fetchone()
if reservation:
mark_message_processed(conn, consumer_name, event["event_id"])
print(f"[库存服务] 业务幂等命中,订单已预留库存: {order_id}")
return
stock_row = conn.execute(
"SELECT stock FROM inventory WHERE sku_id=?",
(sku_id,)
).fetchone()
if not stock_row or stock_row["stock"] < quantity:
fail_event = new_event(order_id, "InventoryFailed", {
"order_id": order_id,
"reason": "库存不足"
})
conn.execute(
"INSERT INTO outbox_events(event_id, aggregate_id, event_type, payload, status) VALUES (?, ?, ?, ?, 'NEW')",
(fail_event["event_id"], fail_event["aggregate_id"], fail_event["event_type"], fail_event["payload"])
)
mark_message_processed(conn, consumer_name, event["event_id"])
print(f"[库存服务] 库存不足: {order_id}")
return
conn.execute(
"UPDATE inventory SET stock = stock - ? WHERE sku_id=?",
(quantity, sku_id)
)
conn.execute(
"INSERT INTO inventory_reservations(order_id, sku_id, quantity, status) VALUES (?, ?, ?, ?)",
(order_id, sku_id, quantity, "RESERVED")
)
ok_event = new_event(order_id, "InventoryReserved", {
"order_id": order_id,
"sku_id": sku_id,
"quantity": quantity
})
conn.execute(
"INSERT INTO outbox_events(event_id, aggregate_id, event_type, payload, status) VALUES (?, ?, ?, ?, 'NEW')",
(ok_event["event_id"], ok_event["aggregate_id"], ok_event["event_type"], ok_event["payload"])
)
mark_message_processed(conn, consumer_name, event["event_id"])
print(f"[库存服务] 库存预留成功: {order_id}")
def payment_consume_inventory_reserved(event, should_fail=True):
payload = json.loads(event["payload"])
order_id = payload["order_id"]
with get_conn() as conn:
if should_fail:
fail_event = new_event(order_id, "PaymentFailed", {
"order_id": order_id,
"reason": "支付超时"
})
conn.execute(
"INSERT INTO outbox_events(event_id, aggregate_id, event_type, payload, status) VALUES (?, ?, ?, ?, 'NEW')",
(fail_event["event_id"], fail_event["aggregate_id"], fail_event["event_type"], fail_event["payload"])
)
print(f"[支付服务] 支付失败: {order_id}")
else:
ok_event = new_event(order_id, "PaymentSucceeded", {
"order_id": order_id
})
conn.execute(
"INSERT INTO outbox_events(event_id, aggregate_id, event_type, payload, status) VALUES (?, ?, ?, ?, 'NEW')",
(ok_event["event_id"], ok_event["aggregate_id"], ok_event["event_type"], ok_event["payload"])
)
print(f"[支付服务] 支付成功: {order_id}")
def order_consume_payment_succeeded(event):
payload = json.loads(event["payload"])
order_id = payload["order_id"]
with get_conn() as conn:
conn.execute(
"UPDATE orders SET status='CONFIRMED' WHERE order_id=? AND status='PENDING'",
(order_id,)
)
print(f"[订单服务] 订单确认: {order_id}")
def order_consume_payment_failed(event):
payload = json.loads(event["payload"])
order_id = payload["order_id"]
with get_conn() as conn:
reservation = conn.execute(
"SELECT * FROM inventory_reservations WHERE order_id=? AND status='RESERVED'",
(order_id,)
).fetchone()
if reservation:
compensate_event = new_event(order_id, "ReleaseInventory", {
"order_id": order_id,
"sku_id": reservation["sku_id"],
"quantity": reservation["quantity"]
})
conn.execute(
"INSERT INTO outbox_events(event_id, aggregate_id, event_type, payload, status) VALUES (?, ?, ?, ?, 'NEW')",
(compensate_event["event_id"], compensate_event["aggregate_id"], compensate_event["event_type"], compensate_event["payload"])
)
conn.execute(
"UPDATE orders SET status='CANCELED' WHERE order_id=? AND status='PENDING'",
(order_id,)
)
print(f"[订单服务] 订单取消: {order_id}")
def inventory_consume_release(event):
payload = json.loads(event["payload"])
order_id = payload["order_id"]
sku_id = payload["sku_id"]
quantity = payload["quantity"]
with get_conn() as conn:
reservation = conn.execute(
"SELECT * FROM inventory_reservations WHERE order_id=? AND sku_id=?",
(order_id, sku_id)
).fetchone()
if not reservation:
print(f"[库存服务] 无需回补,预留记录不存在: {order_id}")
return
if reservation["status"] == "RELEASED":
print(f"[库存服务] 幂等命中,库存已回补: {order_id}")
return
conn.execute(
"UPDATE inventory SET stock = stock + ? WHERE sku_id=?",
(quantity, sku_id)
)
conn.execute(
"UPDATE inventory_reservations SET status='RELEASED' WHERE order_id=? AND sku_id=?",
(order_id, sku_id)
)
print(f"[库存服务] 库存回补成功: {order_id}")
def dispatch_event(event):
et = event["event_type"]
if et == "OrderCreated":
inventory_consume_order_created(event)
elif et == "InventoryReserved":
payment_consume_inventory_reserved(event, should_fail=True)
elif et == "PaymentSucceeded":
order_consume_payment_succeeded(event)
elif et == "PaymentFailed":
order_consume_payment_failed(event)
elif et == "ReleaseInventory":
inventory_consume_release(event)
elif et == "InventoryFailed":
print(f"[系统] 订单失败,原因:库存不足, order_id={json.loads(event['payload'])['order_id']}")
else:
print(f"[系统] 未识别事件类型: {et}")
def outbox_publisher_once():
with get_conn() as conn:
events = conn.execute(
"SELECT * FROM outbox_events WHERE status='NEW' ORDER BY created_at LIMIT 20"
).fetchall()
for event in events:
try:
dispatch_event(event)
with get_conn() as conn:
conn.execute(
"UPDATE outbox_events SET status='SENT' WHERE event_id=?",
(event["event_id"],)
)
except Exception as e:
with get_conn() as conn:
conn.execute(
"UPDATE outbox_events SET retry_count = retry_count + 1 WHERE event_id=?",
(event["event_id"],)
)
print(f"[投递器] 处理失败 event_id={event['event_id']}, error={e}")
def print_state():
with get_conn() as conn:
orders = conn.execute("SELECT * FROM orders").fetchall()
inventory = conn.execute("SELECT * FROM inventory").fetchall()
reservations = conn.execute("SELECT * FROM inventory_reservations").fetchall()
print("\n=== 当前状态 ===")
print("orders:")
for r in orders:
print(dict(r))
print("inventory:")
for r in inventory:
print(dict(r))
print("inventory_reservations:")
for r in reservations:
print(dict(r))
print("================\n")
if __name__ == "__main__":
init_db()
create_order("ORD-1001", "U-1", 100, "SKU-1", 2)
for _ in range(5):
outbox_publisher_once()
print_state()
3. 运行结果你会看到什么
上面的代码里,我故意让支付阶段失败:
payment_consume_inventory_reserved(event, should_fail=True)
所以最终效果应该是:
- 订单先创建为
PENDING - 库存预留成功
- 支付失败
- 订单变为
CANCELED - 库存被回补
这就是一个最小可运行的 Saga + Outbox + 幂等的闭环。
关键设计说明
1. 为什么订单先返回成功,而不是等所有步骤完成?
因为跨服务链路可能很长,支付和库存也可能有重试。
如果同步等到底:
- 用户响应时间很难控制
- 调用链耦合严重
- 某个下游慢就把整条链路拖垮
更常见做法是:
- 先返回“受理中”
- 前端轮询或订阅订单状态
- 后台完成 Saga
2. 为什么一定要接受“重复消息”?
因为这些情况永远存在:
- MQ 投递后 ack 丢了
- 消费者处理成功,但回执超时
- 投递器宕机重启重新扫描
- 网络抖动导致发送方误判失败后重发
与其追求理想化的“消息绝不重复”,不如把系统设计成:
重复也没关系。
3. 补偿动作为什么也要幂等?
因为补偿本身也可能重复执行。
比如 ReleaseInventory 收到两次:
- 第一次把库存从 8 加回 10
- 第二次如果再加一次,就变 12 了
所以补偿必须检查当前状态,保证“执行一次和执行十次效果一样”。
常见坑与排查
这一部分我想讲得更接地气一点,因为我自己在项目里真踩过。
坑 1:业务数据提交了,但事件表没写进去
现象
- 订单表有数据
- outbox 没事件
- 下游完全没感知
根因
- 业务写库和 outbox 写库不在同一个事务
- 某些 ORM 配置导致自动提交
排查方法
- 打开 SQL 日志
- 核对是否同一个连接、同一个事务
- 查看失败时数据库中订单和 outbox 的时间戳差异
建议
- 显式开启事务
- 不要把“写订单”和“写 outbox”放在两个仓储层事务里各自提交
坑 2:消费者已经处理成功,但投递状态没更新
现象
- 下游业务已执行
- outbox 仍是
NEW - 后台继续重复发送
根因
- 发送成功和状态更新之间崩溃
- 这其实是 Outbox 的正常设计权衡,不是 bug
正确处理
- 接受重复发送
- 让消费者幂等
如果系统要求“不能重复”,最后通常会把系统复杂度推到很高,收益却不一定值得。
坑 3:补偿顺序写反了
现象
- 先取消订单,再回补库存
- 结果有其他流程看到订单已取消,提前做了售后或退款处理
- 数据链路变得很难解释
建议
补偿要按照依赖关系逆序执行。一般来说:
- 先释放资源
- 再关闭业务单据
坑 4:状态机没有封口
现象
订单既收到 PaymentSucceeded,又迟到一个 PaymentFailed,最后状态来回跳。
建议
状态流转必须加条件:
UPDATE orders
SET status = 'CONFIRMED'
WHERE order_id = ? AND status = 'PENDING';
再比如取消:
UPDATE orders
SET status = 'CANCELED'
WHERE order_id = ? AND status = 'PENDING';
这样迟到消息只会影响 0 行,不会污染最终结果。
坑 5:把 Saga 当成“万能一致性方案”
Saga 很适合订单、库存、履约这类业务。
但如果你处理的是:
- 实时资金账户余额
- 高价值转账
- 强监管账务
就不能只靠“订单状态对了就行”。这类场景往往需要:
- 账户账本
- 冻结/解冻模型
- 更细的对账机制
- 必要时更强一致手段
安全/性能最佳实践
安全方面
1. 事件载荷避免敏感信息裸奔
Outbox 和 MQ 里不要直接放:
- 银行卡号
- 身份证号
- 完整支付凭证
- 用户隐私字段
建议:
- 只传必要业务键
- 敏感字段脱敏
- 必要时加密存储
2. 事件签名与来源校验
跨系统消费时,尤其是多团队共享消息总线,要校验:
- 事件来源
- schema 版本
- 签名或认证信息
否则一条伪造消息就可能触发错误补偿。
3. 补偿接口要防重放
补偿接口经常被忽略,但它同样需要:
- 鉴权
- 幂等令牌
- 调用审计日志
性能方面
1. Outbox 表要按状态和时间建索引
典型索引:
CREATE INDEX idx_outbox_status_created
ON outbox_events(status, created_at);
否则扫描投递会越来越慢。
2. 分批拉取,避免大事务
不要一次扫 10 万条事件。
建议按批次投递,比如:
- 每批 100~500 条
- 控制单次事务大小
- 支持并发 worker
3. 做好归档与清理
Outbox、processed_messages 这种表增长非常快。
建议:
- 保留最近 3~30 天热数据
- 历史归档到冷库
- 定期删除已完成事件
4. 消费者处理要短小
如果消费逻辑太重:
- 不要长时间占用消费线程
- 复杂计算拆到异步任务
- 防止消息积压引发雪崩
容量估算思路
架构文章不能只讲概念,最好顺手把量级感带上。
假设:
- 日订单量:100 万
- 每单平均产生 4 个事件
- 单事件记录平均 1 KB
那么每天 outbox 原始写入量大约:
100 万 * 4 * 1 KB = 约 4 GB/天
再加上索引、重试记录、processed_messages,真实存储往往会更高。
这意味着你需要提前考虑:
- 事件表分区或分表
- 冷热分层
- 消费延迟监控
- 积压峰值时的扩容能力
如果你的系统峰值 TPS 高于平时 5~10 倍,投递器和消费者的并发模型也必须按峰值设计,而不是按平均值设计。
监控与可观测性建议
这块很容易被忽略,但没有它,排障非常痛苦。
建议至少监控这些指标:
- Outbox
NEW状态事件堆积数 - 事件平均投递延迟
- 事件重试次数分布
- 各类 Saga 成功率/补偿率
- 订单从
PENDING到终态的耗时 - 幂等命中次数
- 死信消息数量
日志中建议统一打印:
trace_idorder_idevent_idevent_type- 当前状态
- 重试次数
这样你查一笔订单时,才能把整条链路串起来。
一套比较稳妥的落地建议
如果你准备在真实项目里上这套方案,我建议按下面顺序推进:
-
先定义状态机
- 哪些状态是终态
- 哪些事件允许触发流转
- 迟到消息怎么处理
-
再定义事件模型
- 事件名
- 唯一 ID
- schema 版本
- 业务键
-
实现 Outbox
- 业务表与事件表同事务
- 投递器支持重试、告警、归档
-
实现消费者幂等
- 消息级幂等
- 业务级幂等
- 补偿级幂等
-
最后再补监控和对账
- 别等出事后才补
其中最容易被低估的是对账。
因为即使你设计得很好,生产环境里还是会有:
- 人工改数
- 历史脏数据
- 中间件异常
- 版本升级带来的兼容问题
所以建议保留离线对账任务,定期核对:
- 订单状态
- 库存预留状态
- 支付状态
- 补偿完成情况
总结
服务拆分后,事务一致性不再是“一个事务提交”的问题,而是一条业务链如何在失败、重试、重复、延迟下仍然收敛到正确结果。
这篇文章的核心结论可以浓缩成三句话:
- Saga 负责把跨服务事务拆成可补偿的业务步骤
- Outbox 负责保证业务提交后事件不会凭空丢失
- 幂等设计 负责把重复消息、重复补偿、迟到事件变成可接受现实
如果你让我给一个最实用的落地建议,那就是:
- 不要追求“绝不失败”
- 也不要迷信“消息只会来一次”
- 要把系统设计成:失败可补偿、消息可重放、结果可收敛
边界条件也要说清楚:
- 这套方案适合大多数订单、库存、履约类业务
- 对资金强一致、监管严苛场景,要引入账本、冻结模型甚至更强事务机制
- 如果团队缺少状态机、事件建模、监控能力,先小范围试点,不要一上来全链路改造
最后一句更接近工程实践的话:
分布式一致性不是“选一个模式”就结束,而是模式 + 状态机 + 幂等 + 监控 + 对账一起落地,系统才真正站得住。