分布式架构中基于消息队列与幂等设计的订单系统最终一致性实战
订单系统是很多业务系统里最“不能出错”的一环:用户下单了,库存要扣、优惠券要核销、支付状态要更新、物流要建单、积分可能还要发放。单体应用里这些动作通常写在一个事务里,成功就一起成功,失败就一起回滚;但到了分布式架构里,这种做法基本走不通。
这篇文章我想从订单创建与支付确认这个常见链路切入,讲一套更贴近生产落地的方案:用消息队列解耦跨服务调用,用幂等设计兜住重复投递和重试,把“绝对实时一致”换成“可控的最终一致性”。
这不是纯理论文章,我会给出一套能跑起来的示例代码,并穿插一些我在项目里踩过的坑。
背景与问题
先看一个典型订单链路:
- 用户提交订单
- 订单服务生成订单
- 库存服务扣减库存
- 营销服务锁定优惠券
- 支付成功后更新订单状态
- 积分服务发积分
- 通知服务发短信/站内信
如果这些步骤都用同步 RPC 串起来,会出现几个很现实的问题:
- 强依赖太多:一个下游超时,整个下单链路就卡住
- 事务无法跨服务天然传播
- 部分成功、部分失败时很难处理
- 高峰期链路被拖长,用户体验差
- 重试导致重复扣库存、重复发券、重复发积分
很多团队一开始会想上 2PC、TCC、Saga 等分布式事务方案。但对中多数业务团队来说,真正长期稳定的,往往是:
核心链路保证本地事务,跨服务通过消息驱动,配合幂等、重试、补偿和对账实现最终一致性。
一个具体故障场景
假设订单创建成功后,需要通知库存服务扣库存。
- 订单服务本地事务提交成功
- 消息准备发送给 MQ
- 就在这个瞬间,网络闪断
- 订单库里有订单,但库存没收到事件
这时候系统就出现了“半成功”状态。
再比如:
- MQ 已经把消息投递给库存服务
- 库存服务处理成功了
- 但消费确认 ACK 丢了
- MQ 认为没处理成功,于是又投递一次
如果没有幂等,库存会被扣两次。
所以,最终一致性的关键不是“消息队列一上就万事大吉”,而是这三个问题必须一起解决:
- 消息不能丢
- 消息重复也不能出错
- 失败后能重试、能补偿、能排查
方案全貌:订单最终一致性的落地思路
这里我建议把整个问题拆成两层:
- 订单服务内部:本地事务 + Outbox 事件表
- 服务之间:MQ 异步投递 + 消费端幂等
总体架构图
flowchart LR
U[用户] --> O[订单服务]
O --> DB[(订单库)]
O --> OUTBOX[(Outbox事件表)]
OUTBOX --> P[消息投递器]
P --> MQ[(消息队列)]
MQ --> I[库存服务]
MQ --> C[优惠券服务]
MQ --> N[通知服务]
I --> IDB[(库存库)]
C --> CDB[(营销库)]
N --> NDB[(通知库)]
核心动作拆解
订单创建时,不直接“先发 MQ 再写数据库”,而是:
- 开启本地事务
- 写订单表
- 写 Outbox 事件表
- 提交事务
- 由独立投递器扫描 Outbox,把事件投递到 MQ
- 消费者收到消息后做幂等处理
- 成功则记录消费成功;失败则重试/进入死信队列
这种模式的关键价值在于:
- 订单数据和待发送消息在一个本地事务里提交
- 即使 MQ 暂时不可用,消息也不会丢,因为事件还在数据库里
- 投递器可以异步补发
- 消费端幂等可以抵御重复消息
核心原理
这一节讲清楚为什么这套方案可行。
1. 最终一致性,不等于放弃一致性
很多人第一次接触最终一致性,会误以为是“允许数据错一会儿”。这话不算错,但不完整。
更准确地说:
- 核心事实必须落地:例如订单已创建、支付已成功
- 派生动作允许异步完成:例如发积分、发通知
- 中间短暂不一致是可接受的
- 最终必须收敛到正确状态
也就是说,最终一致性不是“随缘一致”,而是要靠消息、重试、幂等、补偿、对账把状态收回来。
2. Outbox 模式解决“本地事务与发消息的原子性”
最常见错误写法:
写订单成功 -> 发 MQ 成功/失败
问题是数据库和 MQ 不是一个事务资源,两个动作之间天然有缝。
Outbox 的做法是把“待发消息”先写进数据库表:
orders:业务事实outbox_events:要发出去的领域事件
这两个写入在同一个数据库事务里完成,因此要么都成功,要么都失败。
Outbox 状态流转
stateDiagram-v2
[*] --> NEW
NEW --> PUBLISHED: 投递成功
NEW --> RETRYING: 投递失败
RETRYING --> PUBLISHED: 重试成功
RETRYING --> DEAD: 超过重试阈值
PUBLISHED --> [*]
DEAD --> [*]
3. 幂等是消费端的生命线
在 MQ 语义下,尤其是 At-Least-Once 投递模型里,重复消息非常正常:
- 生产端重发
- Broker 重投
- 消费端超时
- ACK 丢失
- 业务方主动补偿重放
所以要默认一个事实:
任何消息都可能被消费多次。
幂等常见做法有三类:
方案 A:业务唯一键约束
例如扣库存记录表里,用 (order_id, sku_id) 做唯一索引。
插入重复记录时数据库拒绝第二次写入。
优点:
- 简单直接
- 强依赖数据库约束,可靠
缺点:
- 只适用于“可映射为唯一业务动作”的场景
方案 B:消费去重表
为每条消息记录一个 message_id 或 biz_key,消费前先查是否处理过。
优点:
- 通用性强
- 易审计
缺点:
- 要考虑并发下“先查后插”竞态问题,通常需要唯一索引兜底
方案 C:状态机幂等
比如订单状态只能从:
CREATED -> PAIDPAID -> SHIPPED
如果收到重复的“支付成功”消息,而订单已经是 PAID,那就直接忽略。
优点:
- 非常符合业务语义
缺点:
- 需要设计清晰的状态迁移规则
在生产上,我通常建议三者组合:
唯一索引兜底 + 消费记录表审计 + 状态机校验业务正确性。
4. 重试、死信、补偿缺一不可
最终一致性不是只靠“多试几次”。
要分清三种故障:
- 瞬时故障:网络抖动、下游短暂超时
适合自动重试 - 业务故障:库存不足、订单已取消
不应无限重试,应进入业务补偿 - 程序缺陷:代码 bug、脏数据格式错误
应进入死信队列,等待人工介入
消息处理时序图
sequenceDiagram
participant OS as 订单服务
participant DB as 订单库
participant OB as Outbox
participant PUB as 投递器
participant MQ as 消息队列
participant IS as 库存服务
participant IDC as 去重记录
OS->>DB: 写订单
OS->>OB: 写Outbox事件
OS->>DB: 提交本地事务
PUB->>OB: 扫描未投递事件
PUB->>MQ: 发送消息
MQ->>IS: 投递扣库存事件
IS->>IDC: 插入message_id(唯一约束)
alt 首次消费
IS->>IS: 扣减库存
IS-->>MQ: ACK
else 重复消费
IS->>IS: 忽略重复请求
IS-->>MQ: ACK
end
方案对比与取舍分析
订单场景里,常见有三类思路。
1. 同步 RPC + 本地事务
优点:
- 简单,直观
- 调试成本低
缺点:
- 跨服务链路长
- 吞吐差
- 下游故障会放大到主链路
适用:
- 小系统、低并发、依赖少
2. 强一致分布式事务
优点:
- 业务逻辑看起来最“整齐”
缺点:
- 落地复杂
- 对参与方侵入高
- 高可用和性能代价大
适用:
- 金融级、强一致要求极高的核心账务系统
3. MQ + 幂等 + 补偿的最终一致性
优点:
- 解耦好
- 抗峰值能力强
- 吞吐高
- 工程落地性强
缺点:
- 系统复杂度上升
- 调试需要可观测性支撑
- 需要接受短时不一致
适用:
- 订单、库存、营销、通知等典型互联网业务链路
如果让我给建议:
- 钱款记账、余额扣减:优先审慎设计,必要时上更严格事务模型
- 订单派生动作、库存锁定、优惠券核销、积分通知:最终一致性是更务实的选择
表设计建议
下面用 SQLite 演示,但字段设计对 MySQL/PostgreSQL 也适用。
订单表
CREATE TABLE IF NOT EXISTS orders (
order_id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
amount INTEGER NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
Outbox 事件表
CREATE TABLE IF NOT EXISTS outbox_events (
event_id TEXT PRIMARY KEY,
aggregate_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
status TEXT NOT NULL,
retry_count INTEGER NOT NULL DEFAULT 0,
next_retry_at TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_outbox_status_next_retry
ON outbox_events(status, next_retry_at);
库存表与消费去重表
CREATE TABLE IF NOT EXISTS inventory (
sku_id TEXT PRIMARY KEY,
available INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS processed_messages (
message_id TEXT PRIMARY KEY,
consumer TEXT NOT NULL,
processed_at TEXT NOT NULL
);
实战代码(可运行)
下面给出一个可直接运行的 Python 示例。
它模拟了这几个环节:
- 创建订单
- 在本地事务中写订单和 Outbox
- 投递器扫描 Outbox 并把消息发到内存队列
- 库存消费者处理消息
- 重复投递时通过幂等表避免重复扣减
你可以直接保存为 order_eventual_consistency.py 运行。
import sqlite3
import json
import uuid
import queue
from datetime import datetime
def now():
return datetime.utcnow().isoformat()
class DB:
def __init__(self, path=":memory:"):
self.conn = sqlite3.connect(path)
self.conn.isolation_level = None
self.init_schema()
def init_schema(self):
cur = self.conn.cursor()
cur.executescript("""
CREATE TABLE IF NOT EXISTS orders (
order_id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
amount INTEGER NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS outbox_events (
event_id TEXT PRIMARY KEY,
aggregate_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
status TEXT NOT NULL,
retry_count INTEGER NOT NULL DEFAULT 0,
next_retry_at TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_outbox_status_next_retry
ON outbox_events(status, next_retry_at);
CREATE TABLE IF NOT EXISTS inventory (
sku_id TEXT PRIMARY KEY,
available INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS processed_messages (
message_id TEXT PRIMARY KEY,
consumer TEXT NOT NULL,
processed_at TEXT NOT NULL
);
""")
self.conn.commit()
def begin(self):
self.conn.execute("BEGIN")
def commit(self):
self.conn.commit()
def rollback(self):
self.conn.rollback()
class OrderService:
def __init__(self, db: DB):
self.db = db
def create_order(self, user_id: str, amount: int, sku_id: str, qty: int):
order_id = str(uuid.uuid4())
event_id = str(uuid.uuid4())
payload = {
"message_id": event_id,
"order_id": order_id,
"sku_id": sku_id,
"qty": qty
}
try:
self.db.begin()
self.db.conn.execute("""
INSERT INTO orders(order_id, user_id, amount, status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (order_id, user_id, amount, "CREATED", now(), now()))
self.db.conn.execute("""
INSERT INTO outbox_events(event_id, aggregate_id, event_type, payload, status, retry_count, next_retry_at, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
event_id,
order_id,
"OrderCreated",
json.dumps(payload),
"NEW",
0,
now(),
now(),
now()
))
self.db.commit()
print(f"[OrderService] 订单创建成功 order_id={order_id}")
return order_id
except Exception:
self.db.rollback()
raise
class OutboxPublisher:
def __init__(self, db: DB, mq: queue.Queue):
self.db = db
self.mq = mq
def publish_pending_events(self):
cur = self.db.conn.cursor()
rows = cur.execute("""
SELECT event_id, payload, retry_count
FROM outbox_events
WHERE status IN ('NEW', 'RETRYING')
ORDER BY created_at
""").fetchall()
for event_id, payload, retry_count in rows:
try:
# 模拟投递到 MQ
self.mq.put(json.loads(payload))
self.db.conn.execute("""
UPDATE outbox_events
SET status = ?, updated_at = ?
WHERE event_id = ?
""", ("PUBLISHED", now(), event_id))
self.db.conn.commit()
print(f"[Publisher] 事件投递成功 event_id={event_id}")
except Exception as e:
self.db.conn.execute("""
UPDATE outbox_events
SET status = ?, retry_count = ?, updated_at = ?
WHERE event_id = ?
""", ("RETRYING", retry_count + 1, now(), event_id))
self.db.conn.commit()
print(f"[Publisher] 投递失败 event_id={event_id}, err={e}")
class InventoryConsumer:
def __init__(self, db: DB, mq: queue.Queue, consumer_name="inventory-consumer"):
self.db = db
self.mq = mq
self.consumer_name = consumer_name
def handle_once(self):
if self.mq.empty():
return False
message = self.mq.get()
message_id = message["message_id"]
sku_id = message["sku_id"]
qty = message["qty"]
try:
self.db.begin()
# 幂等去重:依赖 processed_messages.message_id 主键唯一
self.db.conn.execute("""
INSERT INTO processed_messages(message_id, consumer, processed_at)
VALUES (?, ?, ?)
""", (message_id, self.consumer_name, now()))
row = self.db.conn.execute("""
SELECT available FROM inventory WHERE sku_id = ?
""", (sku_id,)).fetchone()
if row is None:
raise Exception(f"SKU 不存在 sku_id={sku_id}")
available = row[0]
if available < qty:
raise Exception(f"库存不足 sku_id={sku_id}, available={available}, need={qty}")
self.db.conn.execute("""
UPDATE inventory
SET available = available - ?
WHERE sku_id = ?
""", (qty, sku_id))
self.db.commit()
print(f"[InventoryConsumer] 扣库存成功 sku_id={sku_id}, qty={qty}, message_id={message_id}")
return True
except sqlite3.IntegrityError:
# 重复消费,认为成功
self.db.rollback()
print(f"[InventoryConsumer] 检测到重复消息,已忽略 message_id={message_id}")
return True
except Exception as e:
self.db.rollback()
print(f"[InventoryConsumer] 消费失败 message_id={message_id}, err={e}")
return False
def seed_inventory(db: DB):
db.conn.execute("DELETE FROM inventory")
db.conn.execute("INSERT INTO inventory(sku_id, available) VALUES (?, ?)", ("SKU-1", 10))
db.conn.commit()
def print_state(db: DB):
print("\n=== orders ===")
for row in db.conn.execute("SELECT * FROM orders"):
print(row)
print("\n=== outbox_events ===")
for row in db.conn.execute("SELECT event_id, aggregate_id, event_type, status, retry_count FROM outbox_events"):
print(row)
print("\n=== inventory ===")
for row in db.conn.execute("SELECT * FROM inventory"):
print(row)
print("\n=== processed_messages ===")
for row in db.conn.execute("SELECT * FROM processed_messages"):
print(row)
print()
if __name__ == "__main__":
db = DB()
mq = queue.Queue()
seed_inventory(db)
order_service = OrderService(db)
publisher = OutboxPublisher(db, mq)
consumer = InventoryConsumer(db, mq)
order_id = order_service.create_order(
user_id="U1001",
amount=199,
sku_id="SKU-1",
qty=2
)
publisher.publish_pending_events()
# 模拟重复投递:把同一条消息再塞一次
outbox_payload = db.conn.execute("""
SELECT payload FROM outbox_events LIMIT 1
""").fetchone()[0]
mq.put(json.loads(outbox_payload))
consumer.handle_once()
consumer.handle_once()
print_state(db)
运行效果说明
正常情况下你会看到:
- 订单成功创建
- Outbox 事件成功投递
- 第一次消费扣库存成功
- 第二次重复消息被识别并忽略
- 库存只减少一次
这就是幂等生效的直接证据。
实战落地要点
上面的代码只是最小可运行示例,真正落地时还需要补上几个关键能力。
1. 订单状态不要“一步到位”
一个常见误区是:用户下单后,订单状态立刻写成“成功”。
更合理的状态设计通常是:
CREATED:订单已创建,待处理后续动作STOCK_RESERVED:库存锁定成功PAID:支付成功FULFILLING:履约中DONE:完成CANCELED:取消
这样做有两个好处:
- 状态机清晰,便于补偿
- 能识别重复事件和逆序事件
2. 区分“锁库存”和“扣库存”
在高并发下单场景里,我更推荐:
- 下单时:锁库存
- 支付成功后:正式扣减
- 超时未支付:释放锁库存
否则会遇到一个实际问题:用户下单了但没支付,如果你直接扣真实库存,库存利用率会被拖垮。
3. 事件必须带业务键
消息体里至少要包含:
message_id:消息唯一 IDorder_id:业务主键event_typeoccurred_at- 关键业务字段快照
不要只发“某某 ID,请你自己查库”。
这会导致:
- 消费端依赖生产端数据库状态
- 重放历史消息时结果不稳定
- 排查困难
容量估算与性能思考
架构设计不能只讲“能不能跑”,还要考虑“跑多大”。
1. Outbox 表会不会越来越大?
会,所以必须治理。
常见策略:
PUBLISHED状态保留 3~7 天后归档- 按时间分表或分区
- 投递扫描只查近一段时间 + 指定状态
- 建立
(status, next_retry_at)索引
如果你的订单量很大,Outbox 不做清理,很快会拖慢扫描任务。
2. 消费去重表会不会膨胀?
也会。
建议:
- 按消费者维度分表或带分区键
- 只保留一定时间窗口内的去重记录,例如 7~30 天
- 对超长链路重放需求,转交审计系统或冷存储
3. 吞吐瓶颈在哪?
一般有三个热点:
- Outbox 扫描频率过高
- 消费端幂等表写入冲突
- 库存热点 SKU 更新行锁竞争
优化思路:
- 扫描按批次分页
- 多实例分片投递
- 热点库存采用分段库存或 Redis 预扣 + DB 最终校准
- 消费端批量 ACK、批量处理(视 MQ 能力而定)
常见坑与排查
这一部分我尽量写得实战一点,因为真正折腾人的,往往不是原理,而是那些“明明看起来没问题,线上就是偶发错”的场景。
坑 1:先发消息,再提交数据库事务
现象:
- 下游消费到了消息
- 但查订单主表查不到数据
原因:
- 消息发出去了,但本地事务回滚了
排查:
- 看消息发送时间和订单表写入时间
- 查订单事务日志是否回滚
- 检查是否把发消息放在事务提交前
建议:
- 严格采用 Outbox,别图省事
坑 2:幂等只做内存缓存
现象:
- 单实例测试正常
- 多实例部署后重复扣库存
原因:
- 本地内存去重无法跨实例共享
- 服务重启后去重信息丢失
排查:
- 检查幂等是否依赖本地
set/map - 检查消费者是否横向扩容
- 看重复消息是否落在不同实例上
建议:
- 幂等状态必须落持久化存储,数据库唯一键最稳
坑 3:消费失败无限重试
现象:
- 某条坏消息持续堆积
- 队列消费延迟越来越大
原因:
- 没区分技术失败和业务失败
排查:
- 看异常类型是否固定
- 看是否库存不足却还在重试
- 检查死信队列是否启用
建议:
- 可重试错误才重试
- 业务性失败直接标记并补偿
- 超过阈值进死信队列
坑 4:消息顺序被误认为天然有保障
现象:
- 先收到订单取消,再收到支付成功
- 状态更新混乱
原因:
- 多分区、多消费者并发下,跨键顺序通常不可靠
排查:
- 看消息是否使用同一个分区键
- 查是否存在并发消费
- 检查状态流转是否允许逆序到达
建议:
- 不要过度依赖全局顺序
- 以业务状态机兜底
- 关键事件使用
order_id作为分区键
坑 5:对账能力缺失
现象:
- 用户投诉订单异常,但系统日志看起来都“成功了”
原因:
- 可能中间某一步悄悄失败,没有最终收敛
排查:
- 对比订单表、库存表、营销表状态
- 查 Outbox 是否有长期
RETRYING/DEAD - 查消费去重表、死信队列、补偿任务执行记录
建议:
- 做日常对账任务
- 建立异常订单清单
- 用自动补偿修复大多数问题
安全/性能最佳实践
这部分经常被忽略,但生产环境里非常关键。
安全实践
1. 消息体避免携带敏感信息
不要把这些字段直接塞进 MQ:
- 完整身份证号
- 银行卡号
- 明文手机号
- 支付凭证明文
建议:
- 传业务 ID,不传敏感原文
- 必要字段脱敏
- 消息链路加密与权限隔离
2. 防止消息伪造与重放
建议做法:
- 生产者身份认证
- Topic 访问 ACL
- 消息签名或至少做来源校验
- 消费端校验事件版本与来源系统
3. 死信队列要有权限控制
死信消息通常含有完整业务上下文,排查很方便,但也更敏感。
不要让所有人都能随便查看和重放。
性能实践
1. 消费端先落幂等记录,再执行业务
原因很简单:
- 如果先扣库存,再写去重记录
- 中途崩了
- 重试后还是会再次扣库存
顺序必须是:
- 写幂等记录(依赖唯一约束)
- 执行业务更新
- 一起提交事务
2. 限制重试退避策略
别无脑立刻重试。建议指数退避:
- 第 1 次:5 秒
- 第 2 次:30 秒
- 第 3 次:2 分钟
- 第 4 次:10 分钟
这样能避免下游故障时雪崩放大。
3. 建监控,不要裸跑
至少监控这些指标:
- Outbox 未投递数量
- Outbox 重试次数分布
- MQ 积压长度
- 消费成功率/失败率
- 死信队列数量
- 订单状态长时间停留在中间态的数量
4. 对库存热点做特殊治理
如果某个爆款 SKU 特别热,数据库那一行会成为锁竞争热点。
这时候单靠“MQ + 幂等”还不够,需要叠加:
- 库存分桶
- 预扣模型
- 限流
- 异步排队
一个更完整的生产落地清单
如果你准备把这套方案真正用到项目里,我建议至少检查下面这些项:
- 订单写入与 Outbox 写入在同一事务
- Outbox 有状态字段、重试次数、下次重试时间
- 投递器支持批量扫描、失败重试、告警
- 消费端幂等依赖持久化唯一约束
- 业务状态机定义清晰,拒绝非法状态跳转
- 区分可重试异常与不可重试异常
- 死信队列可观测、可回放、可审计
- 有对账任务扫描长时间未收敛订单
- 有人工介入后台处理异常单
- 敏感数据不直接透传消息链路
总结
在分布式订单系统里,最终一致性不是某个中间件的能力,而是一整套工程设计习惯。
如果只上消息队列,不做幂等,系统迟早会在重复消费上出问题;
如果只做幂等,不解决“数据库提交了但消息没发出去”,系统又会在消息丢失上翻车。
真正可落地的组合通常是:
- 本地事务保证核心事实
- Outbox 保证消息可恢复
- MQ 负责异步解耦和削峰
- 消费端幂等抵御重复投递
- 重试、死信、补偿、对账负责最终收敛
最后给几个可执行建议:
- 先把核心链路做扎实:订单、库存、支付状态先稳定,再扩展积分、通知等派生动作
- 幂等优先靠数据库唯一约束兜底:这是最不容易“想当然失效”的方案
- 不要迷信消息顺序:状态机比顺序假设更可靠
- 对账一定要做:它不是补丁,而是最终一致性的组成部分
- 接受边界条件:最终一致性适合大多数订单协同场景,但不等于所有账务都该这么做
如果你当前的订单系统还在用一串同步调用硬顶高峰,或者经常出现“偶发重复扣库存、偶发漏通知、偶发状态不一致”,那这套方案非常值得你逐步引入。先从 Outbox + 消费幂等 两件事做起,通常就能把系统稳定性拉高一个层级。