跳转到内容
123xiao | 无名键客

《分布式架构中一致性与可用性的取舍:基于 Redis、消息队列与幂等设计的高并发订单系统实战》

字数: 0 阅读时长: 1 分钟

背景与问题

做高并发订单系统时,最难的通常不是“把接口写出来”,而是在流量上来之后,系统到底要优先保什么

我第一次接触这类系统时,最容易犯的错就是:既想要强一致、又想要高可用、还想要极致性能。现实是,三者很难同时拉满。尤其在秒杀、促销、抢券、限量库存这类场景里,订单链路横跨:

  • 接入层
  • 库存服务
  • 订单服务
  • 支付服务
  • Redis
  • 消息队列
  • 数据库

一旦链路变长,问题就来了:

  • 用户重复点击,生成重复订单
  • 请求超时,但后端其实已经成功
  • Redis 扣减成功,数据库落库失败
  • 消息发出去了,消费者重复消费
  • 库存“看起来还有”,但实际上已经超卖
  • 某个下游短暂故障,整个下单链路雪崩

所以这篇文章不打算只讲概念,而是从一个高并发订单系统的实战方案出发,拆开看三个关键点:

  1. Redis:扛住瞬时高并发,做热点拦截和快速扣减
  2. 消息队列:削峰填谷,把同步链路改成异步化
  3. 幂等设计:保证“多次请求、重复消息、重试投递”都不会把数据搞乱

这三者组合起来,本质是在回答一个问题:

当分布式系统无法同时完美满足一致性和可用性时,我们如何设计一个“业务可接受”的方案?


方案全景:先看整体链路

先给一个整体架构图,后面所有细节都围绕它展开。

flowchart LR
    U[用户/客户端] --> G[网关/API]
    G --> I[幂等校验层]
    I --> R[Redis 预扣库存]
    R -->|成功| MQ[消息队列]
    R -->|失败| F[返回售罄/重试]
    MQ --> O[订单服务消费者]
    O --> DB[(MySQL 订单库)]
    O --> S[库存服务落库]
    O --> C[补偿/回滚]

这个架构的核心思想很明确:

  • 入口快:先用 Redis 处理热点和快速判定
  • 链路短:前台请求尽量不要同步调用太多服务
  • 异步化:通过消息队列解耦订单创建
  • 最终一致:数据库、库存、订单状态通过异步和补偿达成一致
  • 幂等兜底:任何一步都允许重试,但结果不能重复生效

核心原理

1. 一致性与可用性的取舍,不是口号,是设计约束

在订单系统里,常见的两类方案:

方案 A:强一致优先

典型做法:

  • 下单时同步锁库存
  • 同步创建订单
  • 同步扣余额/冻结支付状态
  • 所有步骤都成功才返回

优点:

  • 用户拿到的结果最“确定”
  • 系统状态相对直观

缺点:

  • 链路很长
  • 任一服务波动都会影响下单成功率
  • 高峰期容易把数据库和下游打满

方案 B:可用性优先,最终一致

典型做法:

  • 入口只做快速校验和预扣减
  • 订单创建异步化
  • 用消息队列削峰
  • 通过补偿、对账、幂等保证最终一致

优点:

  • 峰值吞吐高
  • 系统更抗抖动
  • 单点故障不容易拖垮全链路

缺点:

  • 用户可能看到“处理中”
  • 需要处理消息重复、延迟、补偿、状态回查
  • 研发复杂度更高

对于高并发订单系统,我通常建议优先选后者,但要加一句前提:

业务要接受“短暂不一致”,同时你必须有补偿和对账能力。


2. Redis 为什么适合放在前面

Redis 在这里不是数据库替代品,而是第一道高并发闸门

它适合做三件事:

2.1 库存快速预扣

比如商品库存 1000,先放在 Redis:

stock:sku:1001 = 1000

请求到来时,先在 Redis 原子扣减。好处是:

  • 单机 QPS 高
  • 操作延迟低
  • 能快速挡住无效请求

2.2 限流与重复提交拦截

例如同一个用户在 5 秒内只能提交一次:

order:req:user:2001 = requestId

利用 SET NX EX 就能做轻量拦截。

2.3 热点隔离

秒杀时最贵的资源其实不是 CPU,而是数据库连接和行锁。Redis 能把大量“注定失败”的请求挡在前面,避免数据库成为战场。

但要注意:Redis 预扣并不等于最终成功。真正的订单成功,还得依赖后续消息消费和数据库落库。


3. 消息队列解决了什么

消息队列最重要的价值,不是“异步”这么简单,而是:

  • 削峰:把突发流量摊平
  • 解耦:订单服务与库存、优惠、积分等逻辑解耦
  • 容错:消费者失败可以重试
  • 可扩展:后续增加风控、通知、埋点不改下单主链路

典型流程是:

  1. 接口收到请求
  2. Redis 预扣成功
  3. 投递“创建订单”消息
  4. 消费者异步创建订单并落库
  5. 如果失败,触发补偿:回滚 Redis 预扣库存

这里的关键难点在于:

消息队列天然可能重复投递,所以消费者必须幂等。


4. 幂等设计是这套方案的“保险丝”

很多人把幂等理解成“防止重复点提交”。这只是入口层的一部分。

真正完整的幂等,要覆盖:

  • 接口幂等:用户重复请求不重复创建订单
  • 消息幂等:同一条消息重复消费不重复落库
  • 状态幂等:重复取消、重复支付回调不重复扣减/回滚

常见做法:

4.1 请求幂等

每次提交订单带一个 requestId,服务端保存:

  • 如果第一次处理:写入并执行业务
  • 如果已经处理过:直接返回历史结果

4.2 消息幂等

消费者处理消息前,先检查 messageId 或业务唯一键是否已处理:

  • 已处理:直接 ACK
  • 未处理:执行业务并标记完成

4.3 数据唯一约束

这是最后一道防线。比如数据库中加唯一索引:

  • request_id
  • user_id + sku_id + biz_no(按业务设计)

即使上层漏了,数据库也能防止重复订单落地。


一致性落点:不是“绝对一致”,而是“有边界的最终一致”

很多团队在设计时容易陷入一个误区:只要用了 MQ,就是最终一致。但如果没有补偿,最终一致只是愿望。

一个能真正落地的订单系统,至少要明确三种状态:

  1. 已接受请求,但订单未创建
  2. 订单创建成功
  3. 订单创建失败,库存已回补

可以把它理解成一个状态机:

stateDiagram-v2
    [*] --> REQUEST_ACCEPTED
    REQUEST_ACCEPTED --> ORDER_CREATING: 发送消息成功
    ORDER_CREATING --> ORDER_CREATED: 消费成功
    ORDER_CREATING --> CREATE_FAILED: 消费失败/超限重试
    CREATE_FAILED --> STOCK_ROLLBACK: 回补库存
    STOCK_ROLLBACK --> CLOSED
    ORDER_CREATED --> [*]
    CLOSED --> [*]

这张图想表达的是:

  • 前端“下单成功受理” ≠ 最终订单创建成功
  • 任何异步链路都要定义失败出口
  • 回滚路径必须可观测、可重试

方案对比与取舍分析

1. 纯数据库扣库存

做法

  • select stock
  • 判断库存
  • update stock = stock - 1 where stock > 0

优点

  • 简单
  • 数据一致性更直观

缺点

  • 高并发下数据库压力大
  • 热点商品会产生严重锁竞争
  • 容易成为瓶颈

适合:并发不高、库存热点不明显的业务。


2. Redis 预扣 + MQ 异步落库

做法

  • Redis 原子扣减
  • MQ 异步创建订单
  • 失败后补偿回滚

优点

  • 性能好
  • 扩展性强
  • 抗峰值能力好

缺点

  • 架构复杂
  • 存在短暂不一致
  • 需要幂等、补偿、监控、对账

适合:秒杀、促销、热门商品抢购等高峰流量业务。


3. Redis + Lua + 本地消息表 / 事务消息

这是更稳的一类增强方案。适合对可靠性要求更高的团队。

适用场景

  • 担心“Redis 扣减成功但消息没发出去”
  • 需要更严格的消息一致性保障

代价

  • 系统更复杂
  • 开发和运维成本更高

我的经验是:先别一上来就堆最复杂方案。如果业务刚起步,先把幂等、补偿、监控做扎实,往往比盲目上事务消息更有价值。


容量估算:别只看 QPS,还要看失败洪峰

做订单系统容量评估时,常被忽略的是“异常时的回压”。

举个简化例子:

  • 峰值请求:20,000 QPS
  • Redis 预扣成功率:10%
  • 实际入 MQ:2,000 QPS
  • 订单消费者单实例处理能力:200 QPS
  • 那么至少需要 10 个消费者实例

但这还不够,因为你还要考虑:

  • 消费重试导致的放大
  • 下游数据库抖动时的积压
  • 回补库存任务的峰值
  • 监控和日志本身的 IO 消耗

所以实际建议是:

  • 消费能力至少按峰值的 2~3 倍冗余
  • Redis、MQ、DB 都要测压
  • 用压测验证积压恢复时间,而不只是接口 RT

实战代码(可运行)

下面用 Python 做一个精简版示例,模拟:

  • Redis 预扣库存
  • 请求幂等
  • MQ 入队
  • 消费者幂等落库
  • 失败补偿

为了方便直接运行,这里用内存结构模拟 Redis、MQ 和数据库逻辑。虽然是简化版,但核心流程和实际系统是一致的。

import threading
import queue
import time
import uuid
import random
import sqlite3

class FakeRedis:
    def __init__(self):
        self.data = {}
        self.lock = threading.Lock()

    def setnx_with_expire(self, key, value, expire_seconds):
        with self.lock:
            if key in self.data:
                return False
            self.data[key] = {"value": value, "expire_at": time.time() + expire_seconds}
            return True

    def get(self, key):
        with self.lock:
            item = self.data.get(key)
            if not item:
                return None
            if item["expire_at"] < time.time():
                del self.data[key]
                return None
            return item["value"]

    def decr_stock(self, key):
        with self.lock:
            stock = self.data.get(key, {"value": 0}).get("value", 0)
            if stock <= 0:
                return -1
            stock -= 1
            self.data[key] = {"value": stock, "expire_at": time.time() + 3600}
            return stock

    def incr_stock(self, key):
        with self.lock:
            stock = self.data.get(key, {"value": 0}).get("value", 0)
            stock += 1
            self.data[key] = {"value": stock, "expire_at": time.time() + 3600}
            return stock

    def set_stock(self, key, stock):
        with self.lock:
            self.data[key] = {"value": stock, "expire_at": time.time() + 3600}

class OrderSystem:
    def __init__(self):
        self.redis = FakeRedis()
        self.mq = queue.Queue()
        self.db = sqlite3.connect(":memory:", check_same_thread=False)
        self.db.execute("""
        CREATE TABLE orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            request_id TEXT UNIQUE,
            user_id INTEGER,
            sku_id INTEGER,
            status TEXT
        )
        """)
        self.db.execute("""
        CREATE TABLE consumed_messages (
            message_id TEXT PRIMARY KEY,
            consumed_at TEXT
        )
        """)
        self.db.commit()
        self.db_lock = threading.Lock()

    def init_stock(self, sku_id, stock):
        self.redis.set_stock(f"stock:sku:{sku_id}", stock)

    def place_order(self, user_id, sku_id, request_id):
        idem_key = f"order:req:{request_id}"
        ok = self.redis.setnx_with_expire(idem_key, "1", 60)
        if not ok:
            return {"success": False, "msg": "重复请求"}

        stock_key = f"stock:sku:{sku_id}"
        left = self.redis.decr_stock(stock_key)
        if left < 0:
            return {"success": False, "msg": "库存不足"}

        message = {
            "message_id": str(uuid.uuid4()),
            "request_id": request_id,
            "user_id": user_id,
            "sku_id": sku_id
        }
        self.mq.put(message)
        return {"success": True, "msg": "请求已受理", "message": message}

    def consume_order(self, fail_rate=0.2):
        while not self.mq.empty():
            message = self.mq.get()
            message_id = message["message_id"]
            request_id = message["request_id"]
            user_id = message["user_id"]
            sku_id = message["sku_id"]

            with self.db_lock:
                cur = self.db.cursor()
                cur.execute("SELECT 1 FROM consumed_messages WHERE message_id = ?", (message_id,))
                if cur.fetchone():
                    print(f"[consumer] duplicate message ignored: {message_id}")
                    continue

                try:
                    if random.random() < fail_rate:
                        raise Exception("模拟订单落库失败")

                    cur.execute("""
                    INSERT INTO orders (request_id, user_id, sku_id, status)
                    VALUES (?, ?, ?, ?)
                    """, (request_id, user_id, sku_id, "CREATED"))

                    cur.execute("""
                    INSERT INTO consumed_messages (message_id, consumed_at)
                    VALUES (?, datetime('now'))
                    """, (message_id,))
                    self.db.commit()
                    print(f"[consumer] order created: request_id={request_id}")

                except sqlite3.IntegrityError:
                    self.db.commit()
                    print(f"[consumer] idempotent hit: request_id={request_id}")
                except Exception as e:
                    self.db.rollback()
                    self.redis.incr_stock(f"stock:sku:{sku_id}")
                    print(f"[consumer] failed, rollback stock. request_id={request_id}, err={e}")

    def print_orders(self):
        with self.db_lock:
            cur = self.db.cursor()
            cur.execute("SELECT id, request_id, user_id, sku_id, status FROM orders")
            rows = cur.fetchall()
            for row in rows:
                print("[db order]", row)

if __name__ == "__main__":
    system = OrderSystem()
    system.init_stock(1001, 5)

    requests = [
        ("req-1", 1),
        ("req-2", 2),
        ("req-3", 3),
        ("req-4", 4),
        ("req-5", 5),
        ("req-1", 1),  # 重复请求
        ("req-6", 6),  # 可能库存不足
    ]

    for request_id, user_id in requests:
        result = system.place_order(user_id=user_id, sku_id=1001, request_id=request_id)
        print("[place_order]", request_id, result)

    system.consume_order(fail_rate=0.3)
    system.print_orders()

这段代码体现了什么

入口层幂等

ok = self.redis.setnx_with_expire(idem_key, "1", 60)

同一个 request_id 在一定时间窗口内只能受理一次。

Redis 预扣库存

left = self.redis.decr_stock(stock_key)

如果库存不足,直接失败,不进入数据库。

MQ 异步化

self.mq.put(message)

这里用内存队列模拟 MQ,真实场景可以换成 Kafka、RabbitMQ、RocketMQ。

消费者幂等

cur.execute("SELECT 1 FROM consumed_messages WHERE message_id = ?", (message_id,))

同一消息只处理一次。

数据库唯一约束兜底

request_id TEXT UNIQUE

即使消息重复、上层幂等失效,也不会重复建单。

失败补偿

self.redis.incr_stock(f"stock:sku:{sku_id}")

订单落库失败时,把 Redis 预扣库存回补。


订单时序:把关键竞态看清楚

很多 bug 其实不是代码写错,而是时序没想清楚。下面这张时序图能帮助你定位关键点。

sequenceDiagram
    participant C as Client
    participant A as API
    participant R as Redis
    participant Q as MQ
    participant O as OrderConsumer
    participant D as DB

    C->>A: 提交订单(requestId)
    A->>R: 幂等校验 SET NX
    R-->>A: OK
    A->>R: 预扣库存 DECR
    R-->>A: 成功
    A->>Q: 发送创建订单消息
    Q-->>A: 已接收
    A-->>C: 请求已受理

    Q->>O: 投递消息
    O->>D: 插入订单(requestId唯一)
    D-->>O: 成功
    O-->>Q: ACK

    Note over O,D: 若落库失败,则回补Redis库存并记录告警

看这个链路时,你要特别盯住两个风险点:

  1. Redis 扣成功,MQ 发送失败
  2. MQ 消费到了,DB 落库失败

这两个点都需要补偿策略,否则库存和订单就会不一致。


常见坑与排查

1. Redis 扣减成功,但消息没发出去

这是很典型的“中间态丢失”问题。

现象

  • Redis 库存减少了
  • 数据库没有订单
  • MQ 里也没有消息
  • 用户侧看到“处理中”但一直没有结果

原因

  • 网络抖动
  • MQ broker 短暂不可用
  • 发送端没有重试或没有确认机制

排查方法

  • 查请求日志:是否执行到发送 MQ
  • 查消息发送确认日志:是否拿到 broker ack
  • 查 Redis 库存变更日志:是否发生预扣
  • 对账:预扣量与订单量是否匹配

建议方案

  • 本地消息表事务消息
  • 至少保证发送失败时有明确回滚或补偿
  • 给“长时间处理中”的订单做定时扫描

2. 消息重复消费,生成重复订单

现象

  • 一个请求生成多个订单
  • 库存被多次扣减
  • 用户投诉“我只点了一次”

常见原因

  • 消费者超时未 ACK,MQ 重新投递
  • 消费者重启,未提交位点
  • 没有业务幂等设计

排查方法

  • 看消息 message_id 是否重复
  • 查订单表中相同 request_id 是否有多条
  • 看消费者异常和重平衡日志

解决方法

  • 消费端做幂等检查
  • 订单表加唯一约束
  • 不要依赖“MQ 不会重复投递”这种假设

3. 库存回补了,但用户已经支付

这是我见过比较麻烦的一类问题,通常发生在订单状态流转复杂时。

现象

  • 消费创建订单失败,系统回补库存
  • 后续支付回调又到了
  • 系统出现“已支付但无有效订单”或“库存重复售卖”

本质原因

订单、库存、支付不是一个本地事务,必须通过状态机控制。

建议

  • 支付前订单必须存在且状态合法
  • 支付回调也要幂等
  • 对异常支付建立人工或自动补单流程
  • 不要让“库存回补逻辑”和“订单关闭逻辑”脱节

4. Redis 和数据库库存长期不一致

现象

  • Redis 显示没货,但数据库还有库存
  • 或 Redis 还有库存,数据库已经卖完

常见原因

  • 回补失败
  • 手工改库后未同步 Redis
  • 缓存重建策略不完整
  • 多个服务都在改库存,口径不统一

排查建议

  • 先明确“谁是最终库存源”
  • 做日常库存对账任务
  • 所有库存变更都必须留审计日志
  • 不允许多个系统绕开统一库存服务直接改数据

安全/性能最佳实践

1. 不要把 Redis 当最终真相

Redis 适合做高性能前置层,但最终订单结果和账务结果仍应落数据库

建议明确:

  • Redis:高并发拦截、预扣、限流、临时态
  • MySQL:订单事实、状态流转、审计依据

如果把 Redis 当最终来源,后面做对账、审计、补单会非常痛苦。


2. Lua 脚本保证 Redis 侧原子性

如果你的逻辑不是简单的 DECR,比如还要同时判断用户是否重复参与、校验活动状态,那就尽量合并到 Lua 脚本里。

示例:

local stockKey = KEYS[1]
local userKey = KEYS[2]
local userId = ARGV[1]

if redis.call("SISMEMBER", userKey, userId) == 1 then
    return -2
end

local stock = tonumber(redis.call("GET", stockKey) or "0")
if stock <= 0 then
    return -1
end

redis.call("DECR", stockKey)
redis.call("SADD", userKey, userId)
return 1

这样可以避免:

  • 先查再扣导致竞态
  • 重复用户绕过校验
  • 多步骤操作不原子

3. 消费者要“快失败、可重试、可观测”

消费者千万不要写成一个黑盒。至少要做到:

  • 失败原因可区分:网络、数据库、参数、业务冲突
  • 重试次数有限制
  • 超过阈值进入死信队列
  • 关键字段可追踪:requestIdmessageIduserIdskuId

一个很实用的建议是:

所有日志都带统一链路 ID,不然线上排查会非常痛苦。


4. 限流、熔断、降级要提前设计

高并发订单系统不是“代码没报错”就算成功,系统要能在压力过大时优雅退化。

建议最少有这些策略:

  • 网关限流:防止恶意刷单
  • 用户维度限频:防止重复点击和脚本刷接口
  • 商品维度限流:热点 SKU 保护
  • 下游熔断:支付或库存服务抖动时及时止损
  • 降级返回:返回“排队中”而不是把线程全部打满

5. 数据库层要有最终兜底

无论上层设计多漂亮,数据库层都要留最后一道防线:

  • 唯一索引防重复订单
  • 状态机字段防非法流转
  • 必要时使用乐观锁版本号
  • 所有关键更新写成条件更新

例如:

UPDATE orders
SET status = 'PAID'
WHERE order_id = 10001 AND status = 'CREATED';

而不是无条件更新。这样重复支付回调也不会把状态搞乱。


6. 对账系统不是可选项

只要你接受最终一致,就一定要有对账。

建议至少做两类对账:

  • 订单-库存对账:预扣数、下单数、回补数是否平衡
  • 订单-支付对账:订单状态和支付流水是否一致

很多线上“诡异问题”,最终不是靠接口修好的,而是靠对账把脏数据捞出来的。


一个更稳的落地建议

如果你现在要做一个中型电商或活动订单系统,我会建议分阶段建设,而不是一次做满。

第一阶段:先把最小闭环跑通

  • Redis 预扣库存
  • MQ 异步建单
  • 请求幂等
  • 消费幂等
  • 数据库唯一索引
  • 失败回补库存

这套已经能解决绝大多数高并发基础问题。

第二阶段:补齐可靠性

  • 本地消息表或事务消息
  • 死信队列
  • 延迟消息取消未支付订单
  • 定时对账和补偿任务
  • 统一链路日志和监控告警

第三阶段:做精细化治理

  • 热点商品隔离集群
  • 动态限流
  • 用户画像风控
  • 机房容灾和多活策略
  • 更细粒度的库存分段与预热

不要小看“分阶段”这件事。很多系统不是死于方案不先进,而是死于第一版就太复杂,结果没法稳定落地。


总结

高并发订单系统里,一致性与可用性的取舍,本质上不是选边站,而是要回答这三个问题:

  1. 哪些地方必须强一致?

    • 例如订单唯一性、支付结果落账、状态合法流转
  2. 哪些地方可以接受短暂不一致?

    • 例如用户先看到“处理中”,订单稍后创建成功
  3. 如果出错,怎么补回来?

    • 回补库存、重试消息、死信处理、人工补单、对账修复

落到工程实践上,一个务实可行的方案通常是:

  • Redis 扛住前端高并发和热点库存
  • 消息队列 削峰解耦,把同步链路改成异步
  • 幂等设计 兜住重复请求、重复消费和重试
  • 补偿与对账 兑现最终一致

最后给几个可执行建议,适合直接带回去做系统设计评审:

  • 不要追求“全链路绝对强一致”,先定义业务可接受边界
  • 入口层一定做请求幂等,消费端一定做消息幂等
  • 数据库唯一索引必须有,它不是可选项
  • Redis 预扣后,一定要有失败补偿路径
  • 没有监控、告警、对账的最终一致,基本等于没有一致性

如果你的业务是秒杀、抢购、促销活动,这套思路非常适合;但如果是银行转账、核心账务,就不能简单照搬,因为那类场景对强一致要求更高,设计边界完全不同。

真正成熟的分布式系统,不是没有问题,而是问题出现时,系统知道如何有序地失败,并且能恢复回来。这也是我理解的“实战可用”的架构。


分享到:

上一篇
《Node.js 中基于 BullMQ 与 Redis 构建高可靠异步任务队列的实战指南》
下一篇
《集群架构实战:基于 Kubernetes 的高可用控制面与工作节点故障自愈设计》