跳转到内容
123xiao | 无名键客

《分布式架构中基于消息队列的最终一致性实现:订单与库存场景的设计与避坑》

字数: 0 阅读时长: 1 分钟

背景与问题

订单服务和库存服务,几乎是分布式系统里最经典的一对“冤家”。

在单体应用里,下单通常就是一段本地事务:

  1. 创建订单
  2. 扣减库存
  3. 提交事务

要么全成功,要么全失败,逻辑非常直观。

但一旦拆成微服务:

  • 订单服务负责订单创建
  • 库存服务负责库存扣减
  • 两边各自有独立数据库

这时问题立刻出现了:跨服务的原子事务很难做,成本也很高。如果硬上 2PC/XA,理论上可以追求强一致,但实际在高并发、电商促销、链路复杂的场景里,往往会碰到这些问题:

  • 性能差,锁资源时间长
  • 可用性下降,协调器成为薄弱点
  • 服务升级、扩缩容、异构存储时很别扭
  • 一旦链路某处抖动,整体吞吐明显下滑

所以很多业务系统会选择一个更现实的方案:基于消息队列实现最终一致性

这里的核心目标不是“绝对同一时刻一致”,而是:

  • 订单最终能反映库存处理结果
  • 库存不会被重复扣减
  • 系统在失败、重试、消息重复、服务抖动的情况下,仍能恢复到正确状态

这篇文章我就围绕一个典型场景来讲:用户下单后,订单服务通过消息驱动库存扣减,最终达成一致。重点不是概念堆砌,而是把设计、代码、坑点和排查思路串起来。


场景建模:订单与库存的一条真实链路

先把问题说具体一点。

假设用户下单购买商品 sku-1001,数量 2。业务流程可能是:

  1. 订单服务校验参数,创建订单,状态为 PENDING
  2. 订单服务发送“订单已创建,待扣库存”消息
  3. 库存服务消费消息,执行库存预扣或实扣
  4. 库存服务处理完成后,回写结果消息
  5. 订单服务根据结果把订单状态改成:
    • CONFIRMED:库存扣减成功
    • FAILED:库存不足或处理失败

这个流程里最重要的不是“消息有没有发出去”,而是:

  • 订单数据库写入和消息发送不能丢一头
  • 库存消费必须幂等
  • 回执消息也必须可重试、可对账
  • 订单状态流转要可恢复,不要卡死在中间态

下面先看整体结构。

flowchart LR
    U[用户下单] --> O[订单服务]
    O --> OD[(订单库)]
    O --> OT[(Outbox事件表)]
    O --> MQ[(消息队列)]
    MQ --> I[库存服务]
    I --> ID[(库存库)]
    I --> MQ2[(结果消息)]
    MQ2 --> O2[订单结果处理]
    O2 --> OD

这张图里有一个关键点:Outbox 事件表。这是很多系统把“最终一致性”做稳的核心。


核心原理

1. 最终一致性不是“放个 MQ 就行”

很多人第一次设计时会写成这样:

  1. 订单服务插入订单
  2. 调用 MQ 发送消息

看起来没问题,但实际上这里有一个经典故障窗口:

  • 订单已写入数据库
  • 还没来得及发消息,服务宕机了

结果就是:订单存在,但库存永远收不到消息

反过来也一样危险:

  • 消息发出去了
  • 订单事务回滚了

结果库存已经扣了,但订单根本不存在。

所以“数据库写入”和“消息发送”必须通过某种机制关联起来。最常见、最好落地的办法,就是 本地事务 + Outbox 表 + 异步投递

2. Outbox 模式

思路很朴素:

  • 在订单服务本地事务里,同时做两件事:
    • 写订单表
    • 写消息事件表(Outbox)
  • 事务提交后,由后台任务扫描 Outbox 表,把未投递事件推送到 MQ
  • 推送成功后,将事件标记为已发送

这样做的好处是:

  • 订单写入和“待发送消息”是同一个本地事务
  • 不依赖分布式事务
  • 即使 MQ 短暂不可用,也能靠后台补发恢复

订单侧状态流转

stateDiagram-v2
    [*] --> PENDING
    PENDING --> CONFIRMED: 库存扣减成功
    PENDING --> FAILED: 库存不足/处理失败
    PENDING --> CANCELLED: 超时未完成取消
    FAILED --> [*]
    CONFIRMED --> [*]
    CANCELLED --> [*]

注意这里我特意保留了 PENDING 中间态。因为在分布式系统里,中间态不是问题,不可恢复的中间态才是问题

3. 消费端幂等

MQ 的现实世界里,你必须默认下面这些事都会发生:

  • 消息重复投递
  • 消费者处理到一半崩溃
  • 消费成功了,但 ack 丢了
  • 网络抖动导致生产方重复发送

所以库存服务不能靠“消息只来一次”活着,而必须靠幂等活着。

常见做法:

  • 给每个业务事件一个唯一 event_id
  • 库存服务落库前先检查 event_id 是否处理过
  • 已处理则直接返回成功,不再重复扣减

4. 结果回传与补偿

仅有“订单发消息给库存”还不够,订单服务还得知道处理结果。通常有两种方式:

  • 同步查询库存结果:实现简单,但耦合高
  • 库存服务回发结果消息:更解耦,更适合异步架构

我更推荐第二种。因为最终一致性的思维就是:不要试图在一次 RPC 里完成所有事情,而是靠事件驱动完成收敛


方案对比与取舍分析

在订单库存场景里,常见方案其实有三种。

方案一:同步 RPC + 本地重试

流程:

  1. 订单创建
  2. 同步调用库存服务扣减
  3. 成功则提交订单,失败则回滚

优点:

  • 链路直观
  • 排查简单

缺点:

  • 强依赖库存服务可用性
  • 高峰期容易级联失败
  • 一旦拆库,事务边界不好处理

适用场景:

  • 并发不高
  • 服务少
  • 对实时性要求极强

方案二:分布式事务(2PC/XA)

优点:

  • 追求强一致
  • 业务代码看起来干净

缺点:

  • 吞吐差
  • 对数据库、驱动、中间件要求高
  • 运维复杂,故障面大

适用场景:

  • 核心金融类、低吞吐、对强一致要求极高的局部链路

方案三:MQ 最终一致性

优点:

  • 解耦
  • 吞吐高
  • 容错和恢复能力更强

缺点:

  • 业务状态变复杂
  • 需要幂等、补偿、对账机制
  • 排查比单体事务更考验工程能力

适用场景:

  • 电商、零售、营销、履约等高并发业务

如果你问我在订单库存这种问题上更推荐哪个,我的答案通常是:大多数互联网交易链路,优先考虑 MQ 最终一致性。但前提是团队能把基础设施和可观测性做好。


实战代码(可运行)

下面我用 Python 做一个可运行的简化示例。它不依赖真实 MQ,而是用内存队列模拟消息中间件,重点展示几个核心点:

  • 订单创建与 Outbox 落库
  • Outbox 事件投递
  • 库存消费幂等
  • 结果回传
  • 订单最终状态收敛

这是一个教学示例,真实生产环境应替换为 MySQL/PostgreSQL + Kafka/RabbitMQ/RocketMQ 等。

代码示例

import sqlite3
import json
import queue
import threading
import time
import uuid
from contextlib import contextmanager

DB_FILE = "demo_order_inventory.db"

order_to_stock_mq = queue.Queue()
stock_to_order_mq = queue.Queue()


def init_db():
    conn = sqlite3.connect(DB_FILE)
    cur = conn.cursor()

    cur.execute("""
    CREATE TABLE IF NOT EXISTS orders (
        order_id TEXT PRIMARY KEY,
        sku_id TEXT NOT NULL,
        quantity INTEGER NOT NULL,
        status TEXT NOT NULL,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS outbox_events (
        event_id TEXT PRIMARY KEY,
        topic TEXT NOT NULL,
        payload TEXT NOT NULL,
        status TEXT NOT NULL,
        retry_count INTEGER DEFAULT 0,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS inventory (
        sku_id TEXT PRIMARY KEY,
        available INTEGER NOT NULL
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS inventory_txn (
        event_id TEXT PRIMARY KEY,
        order_id TEXT NOT NULL,
        sku_id TEXT NOT NULL,
        quantity INTEGER NOT NULL,
        result TEXT NOT NULL,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS order_result_events (
        event_id TEXT PRIMARY KEY,
        order_id TEXT NOT NULL,
        result TEXT NOT NULL,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
    """)

    cur.execute("INSERT OR IGNORE INTO inventory(sku_id, available) VALUES('sku-1001', 10)")

    conn.commit()
    conn.close()


@contextmanager
def db_conn():
    conn = sqlite3.connect(DB_FILE)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def create_order(order_id, sku_id, quantity):
    event_id = str(uuid.uuid4())
    payload = {
        "event_id": event_id,
        "order_id": order_id,
        "sku_id": sku_id,
        "quantity": quantity
    }

    with db_conn() as conn:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO orders(order_id, sku_id, quantity, status) VALUES (?, ?, ?, ?)",
            (order_id, sku_id, quantity, "PENDING")
        )
        cur.execute(
            "INSERT INTO outbox_events(event_id, topic, payload, status) VALUES (?, ?, ?, ?)",
            (event_id, "order_created", json.dumps(payload), "NEW")
        )

    print(f"[订单服务] 订单创建成功 order_id={order_id}, status=PENDING")


def outbox_dispatcher():
    while True:
        with db_conn() as conn:
            cur = conn.cursor()
            cur.execute("""
                SELECT event_id, topic, payload, retry_count
                FROM outbox_events
                WHERE status = 'NEW'
                ORDER BY created_at
                LIMIT 10
            """)
            rows = cur.fetchall()

            for event_id, topic, payload, retry_count in rows:
                try:
                    order_to_stock_mq.put(json.loads(payload), timeout=1)
                    cur.execute(
                        "UPDATE outbox_events SET status = 'SENT' WHERE event_id = ?",
                        (event_id,)
                    )
                    print(f"[Outbox] 事件已投递 event_id={event_id}")
                except Exception:
                    cur.execute(
                        "UPDATE outbox_events SET retry_count = ? WHERE event_id = ?",
                        (retry_count + 1, event_id)
                    )

        time.sleep(1)


def inventory_consumer():
    while True:
        try:
            msg = order_to_stock_mq.get(timeout=1)
        except queue.Empty:
            continue

        event_id = msg["event_id"]
        order_id = msg["order_id"]
        sku_id = msg["sku_id"]
        quantity = msg["quantity"]

        with db_conn() as conn:
            cur = conn.cursor()

            # 幂等校验:同一个 event_id 处理过就直接跳过
            cur.execute("SELECT result FROM inventory_txn WHERE event_id = ?", (event_id,))
            existed = cur.fetchone()
            if existed:
                print(f"[库存服务] 重复消息,跳过 event_id={event_id}")
                continue

            cur.execute("SELECT available FROM inventory WHERE sku_id = ?", (sku_id,))
            row = cur.fetchone()
            if not row:
                result = "FAILED"
            else:
                available = row[0]
                if available >= quantity:
                    cur.execute(
                        "UPDATE inventory SET available = available - ? WHERE sku_id = ?",
                        (quantity, sku_id)
                    )
                    result = "SUCCESS"
                else:
                    result = "FAILED"

            cur.execute("""
                INSERT INTO inventory_txn(event_id, order_id, sku_id, quantity, result)
                VALUES (?, ?, ?, ?, ?)
            """, (event_id, order_id, sku_id, quantity, result))

        stock_to_order_mq.put({
            "event_id": str(uuid.uuid4()),
            "order_id": order_id,
            "source_event_id": event_id,
            "result": result
        })

        print(f"[库存服务] 处理完成 order_id={order_id}, result={result}")


def order_result_consumer():
    while True:
        try:
            msg = stock_to_order_mq.get(timeout=1)
        except queue.Empty:
            continue

        result_event_id = msg["event_id"]
        order_id = msg["order_id"]
        result = msg["result"]

        with db_conn() as conn:
            cur = conn.cursor()

            # 回执消息也要幂等
            cur.execute("SELECT 1 FROM order_result_events WHERE event_id = ?", (result_event_id,))
            if cur.fetchone():
                continue

            cur.execute(
                "INSERT INTO order_result_events(event_id, order_id, result) VALUES (?, ?, ?)",
                (result_event_id, order_id, result)
            )

            new_status = "CONFIRMED" if result == "SUCCESS" else "FAILED"
            cur.execute(
                "UPDATE orders SET status = ? WHERE order_id = ? AND status = 'PENDING'",
                (new_status, order_id)
            )

        print(f"[订单服务] 订单状态更新 order_id={order_id}, status={new_status}")


def print_state():
    with db_conn() as conn:
        cur = conn.cursor()

        cur.execute("SELECT order_id, sku_id, quantity, status FROM orders ORDER BY created_at")
        orders = cur.fetchall()

        cur.execute("SELECT sku_id, available FROM inventory")
        inventory = cur.fetchall()

        print("\n===== 当前系统状态 =====")
        print("orders:")
        for row in orders:
            print(row)
        print("inventory:")
        for row in inventory:
            print(row)
        print("=======================\n")


if __name__ == "__main__":
    init_db()

    threads = [
        threading.Thread(target=outbox_dispatcher, daemon=True),
        threading.Thread(target=inventory_consumer, daemon=True),
        threading.Thread(target=order_result_consumer, daemon=True),
    ]

    for t in threads:
        t.start()

    create_order("order-1001", "sku-1001", 2)
    create_order("order-1002", "sku-1001", 5)
    create_order("order-1003", "sku-1001", 6)

    time.sleep(5)
    print_state()

运行方式

python demo.py

预期现象

初始库存是 10:

  • order-1001 扣 2,成功
  • order-1002 扣 5,成功
  • order-1003 扣 6,失败,因为只剩 3

这段代码虽然是简化版,但把几个关键设计都带出来了:

  • 创建订单和写 Outbox 在一个本地事务里
  • Outbox 异步投递
  • 库存消费按 event_id 幂等
  • 结果消息回流到订单服务
  • 订单从 PENDING 收敛到最终状态

时序图:一笔订单如何走完整条链路

sequenceDiagram
    participant User as 用户
    participant Order as 订单服务
    participant DB as 订单DB/Outbox
    participant MQ as 消息队列
    participant Stock as 库存服务
    participant InvDB as 库存DB

    User->>Order: 提交订单
    Order->>DB: 本地事务写订单(PENDING)+Outbox事件
    DB-->>Order: 提交成功
    Order-->>User: 下单受理成功

    Order->>MQ: Outbox异步投递订单事件
    MQ->>Stock: 投递扣库存消息
    Stock->>InvDB: 幂等检查并扣减库存
    InvDB-->>Stock: 成功/失败
    Stock->>MQ: 发送库存处理结果
    MQ->>Order: 投递结果消息
    Order->>DB: 更新订单状态(CONFIRMED/FAILED)

常见坑与排查

这一部分非常关键。很多团队不是不会画架构图,而是上线后扛不住异常流量和边界故障。下面这些坑,我基本都见过,有些我自己也踩过。

1. 订单落库成功,但消息没发出去

现象

  • 订单状态一直是 PENDING
  • 库存服务完全没收到消息
  • 业务方反馈“订单卡住了”

根因

通常是没有 Outbox,或者 Outbox 扫描任务停了。

排查路径

  1. 查订单表是否存在该订单
  2. 查 Outbox 表是否存在对应事件
  3. 看事件状态:
    • NEW:说明没发出去
    • SENT:说明发过了,去 MQ 查消费情况
  4. 检查 Outbox 投递线程/任务是否异常退出
  5. 看 MQ 连接、权限、broker 状态

建议

  • Outbox 扫描任务必须有监控
  • NEW 状态积压要报警
  • 单个事件重试次数超过阈值要人工介入

2. 消息重复消费,库存被扣多次

现象

  • 同一订单只下了一次
  • 库存却减少了两次甚至更多

根因

消费端没有做幂等,或者幂等只做在内存里,服务重启后失效。

正确做法

  • 以业务唯一键或事件唯一键做幂等
  • 幂等记录必须持久化到数据库或可靠存储
  • 扣减库存和写幂等记录应在同一事务里完成

SQL 思路示例

CREATE TABLE inventory_txn (
  event_id VARCHAR(64) PRIMARY KEY,
  order_id VARCHAR(64) NOT NULL,
  sku_id VARCHAR(64) NOT NULL,
  quantity INT NOT NULL,
  result VARCHAR(16) NOT NULL,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

如果 event_id 已存在,说明处理过了。

3. 库存扣减成功,但订单状态没更新

现象

  • 库存已经少了
  • 订单还停留在 PENDING

根因

一般是结果回传链路出了问题:

  • 库存服务没发结果消息
  • 结果消息丢失
  • 订单服务消费失败
  • 订单状态更新 SQL 条件写错

排查路径

  1. 查库存事务表,确认是否扣减成功
  2. 查结果消息是否已投递
  3. 查订单结果消费日志
  4. 查订单状态更新记录
  5. 看是否存在死信队列积压

补救思路

  • 做对账任务:库存成功但订单未确认的记录自动修复
  • PENDING 超时订单定期巡检
  • 允许人工补单/回滚的运营入口

4. 超卖问题

这类问题特别常见,尤其在秒杀场景。

错误写法

先查库存,再更新库存:

SELECT available FROM inventory WHERE sku_id = 'sku-1001';
UPDATE inventory SET available = available - 1 WHERE sku_id = 'sku-1001';

高并发下两个线程可能同时读到同一个库存值。

更稳妥的写法

用条件更新保证原子扣减:

UPDATE inventory
SET available = available - 1
WHERE sku_id = 'sku-1001'
  AND available >= 1;

然后检查受影响行数:

  • 1:扣减成功
  • 0:库存不足

这比“先查再扣”稳得多。

5. 消息顺序依赖导致状态错乱

比如你有:

  • 订单创建消息
  • 订单取消消息

如果取消消息先到,创建消息后到,消费者就可能把一个已取消订单又改回处理中。

建议

  • 尽量避免强依赖全局顺序
  • 使用状态机约束,非法状态转换直接拒绝
  • 同一业务实体尽量路由到同一分区/队列

例如:

  • PENDING -> CONFIRMED 合法
  • CANCELLED -> CONFIRMED 不合法,应拒绝

6. 把“重试”当成“补偿”

重试和补偿不是一回事。

  • 重试:同一动作再执行一次,希望成功
  • 补偿:执行反向动作,把系统拉回可接受状态

比如库存扣减成功但订单创建失败:

  • 重试订单创建:叫重试
  • 把库存加回去:叫补偿

很多系统的问题在于,什么都靠重试,最后重试把错误放大了。


安全/性能最佳实践

最终一致性不是只讲业务逻辑,安全和性能也必须一起考虑。

1. 消息体不要裸奔,做签名或最少字段校验

如果 MQ 是跨网络、跨集群,建议至少做到:

  • 消息体字段白名单校验
  • 关键字段非空校验
  • 业务签名或 HMAC 校验
  • 敏感信息脱敏,不要把用户隐私直接塞进消息

一个简单的消息结构可以像这样:

{
  "event_id": "a1b2c3",
  "event_type": "ORDER_CREATED",
  "timestamp": 1710000000,
  "order_id": "order-1001",
  "sku_id": "sku-1001",
  "quantity": 2,
  "sign": "xxxxxx"
}

2. 队列长度、消费延迟、死信数必须监控

我比较建议最少监控这几类指标:

  • Outbox 待发送数量
  • MQ topic/queue 积压量
  • 消费成功率、失败率、重试次数
  • 死信队列数量
  • 订单 PENDING 超时数量
  • 库存扣减成功但订单未确认数量

如果这些指标你看不到,系统出问题时基本只能靠猜。

3. 幂等表要控制膨胀

幂等记录如果一直不清理,表会越来越大,影响查询性能。

建议:

  • 按天/按月分表
  • event_id 建唯一索引
  • 设置归档策略
  • 保留期根据业务回放窗口决定,比如 7 天、30 天、90 天

4. 批量拉取 Outbox,但不要无限扫表

Outbox 调度器常见优化方式:

  • 按状态索引拉取 NEW 事件
  • 每次限制批量条数
  • 多实例并发时用分片或“抢占式更新”避免重复发送
  • 已发送记录定期归档

例如:

SELECT event_id, topic, payload
FROM outbox_events
WHERE status = 'NEW'
ORDER BY created_at
LIMIT 100;

一定要配索引,否则数据量一大,扫表会拖垮数据库。

5. 对账机制不是可选项

只要是最终一致性,就要接受一个现实:理论上总会有极少数记录因为各种边界故障没及时收敛

所以必须有对账任务,例如:

  • 订单 PENDING 超过 10 分钟,检查库存事务结果
  • 库存已扣减但订单未确认,自动补状态
  • 订单已取消但库存未释放,自动发起释放流程

这其实是“系统自愈能力”的一部分,不只是运维脚本。

6. 容量估算别忽略峰值

在架构设计里,我建议至少算三件事:

订单峰值消息量

如果大促峰值是 5000 单/s,且每单至少产生两条消息:

  • 订单创建事件
  • 库存处理结果事件

那么基础消息吞吐至少是:

5000 * 2 = 10000 msg/s

如果加上重试、取消、回滚、补偿消息,实际预留最好乘以 2~3 倍。

库存热点 SKU 压力

热点商品会导致单个 SKU 成为写热点。你需要考虑:

  • 分片库存
  • 预扣库存
  • Redis 原子扣减 + DB 异步落账
  • 限流与排队

PENDING 中间态容量

如果库存服务平均处理延迟是 200ms,峰值 5000 单/s,那么理论上的在途订单量约为:

5000 * 0.2 = 1000

如果发生消息堆积,PENDING 会迅速放大,所以订单表和查询接口都要考虑中间态压力。


一个更稳的落地建议

如果你准备在真实项目里做,我建议按下面的最小闭环来落地:

  1. 订单服务

    • 本地事务写订单 + Outbox
    • 订单初始状态 PENDING
  2. 消息投递层

    • Outbox 扫描补发
    • 失败重试
    • 超阈值告警
  3. 库存服务

    • 原子扣减库存
    • 幂等表防重复
    • 处理结果写本地事务
  4. 结果回流

    • 库存结果事件回传订单服务
    • 订单状态机控制合法迁移
  5. 兜底机制

    • 死信队列
    • 对账任务
    • 人工补偿入口

这五步做全了,系统才算真正具备“最终一致性”的工程化能力。只做前两步,基本只是“用了 MQ”;做完整套,才算“设计了可靠异步架构”。


总结

基于消息队列实现订单与库存的最终一致性,本质上是在做三件事:

  1. 把跨服务事务拆成可恢复的本地事务
  2. 用消息驱动状态收敛,而不是强求一步到位
  3. 用幂等、补偿、对账去对抗分布式环境的不确定性

如果你只记住几个最重要的结论,我建议记这几条:

  • 不要直接“写库后发消息”,优先用 Outbox
  • 消费端一定要 幂等
  • 库存扣减要用 原子条件更新
  • 订单状态必须设计成 可恢复状态机
  • 对账和补偿 不是锦上添花,而是标配
  • 最终一致性适合高并发业务,但前提是你能接受短暂中间态

最后说一句很实在的话:
最终一致性不是降低要求,而是换一种更适合分布式系统的达成方式。

它不像本地事务那样“看上去简单”,但只要设计得当,面对真实世界的高并发、抖动、失败和重试,它反而更可靠、更有韧性。


分享到:

上一篇
《Web逆向实战:中级开发者如何定位并复现前端签名参数生成逻辑》
下一篇
《分布式架构中基于一致性哈希与服务注册发现的灰度发布实战设计与落地》