跳转到内容
123xiao | 无名键客

《分布式架构下的幂等性设计实战:从接口重试到消息消费防重的完整方案》

字数: 0 阅读时长: 1 分钟

背景与问题

只要系统一上分布式,幂等性几乎就躲不过去。

我见过很多线上问题,表面上看是“请求重复了”,实际上背后可能是:

  • 网关超时,客户端自动重试
  • MQ 消费者处理成功了,但 ack 丢了,导致消息再次投递
  • 上游服务降级后恢复,补偿任务把同一笔业务又推了一遍
  • 定时任务没有加锁,多实例同时执行
  • 用户手抖连点两次“提交订单”

这些问题的共同点是:同一个业务动作,可能被执行多次
而我们真正想要的是:执行一次和执行多次,结果保持一致

这就是幂等性。

很多人第一次做幂等时,会直接在接口里加个“去重表”,能跑,但很快会遇到更多问题:

  • 幂等键怎么设计?
  • 只防接口重复,消息重复怎么办?
  • 数据库唯一索引、Redis、状态机,分别适合什么场景?
  • 幂等是“绝对不重复执行”,还是“重复执行但结果一致”?
  • 高并发下,如何避免“查了没有,再插入时报重复”这种竞态?

这篇文章我会从两个最常见、也最容易出事故的场景展开:

  1. 同步接口重试的幂等控制
  2. 异步消息消费的防重设计

目标不是讲概念,而是给出一套能落地、能运行、能排查的完整方案。


先统一一个认知:幂等不等于“只能执行一次”

这是幂等设计里最容易混淆的地方。

  • 唯一执行一次:更接近“去重”或“精确一次”,实现成本高
  • 幂等:允许收到多次请求,但最终结果不变

举个例子:

  • 创建订单:如果重复提交,应该只生成一个订单
  • 扣减积分:同一业务请求不能重复扣
  • 更新用户昵称:同样的修改执行多次,结果天然一致

所以幂等设计要先分类型:

场景是否天然幂等常见做法
覆盖更新,如 set status=PAID状态校验
累加操作,如余额扣减、库存扣减幂等键 + 唯一约束 + 事务
创建资源,如订单、支付单业务唯一键
MQ 消费落库消息去重表 / 消费记录表

一句话总结:
天然幂等的,尽量用状态机保证;非天然幂等的,用唯一标识把“同一次业务操作”钉住。


核心原理

幂等设计真正落地,通常离不开下面三个核心点:

  1. 唯一标识同一个业务请求
  2. 将“处理过”持久化下来
  3. 在并发下保证判断与执行的一致性

1. 幂等键:先回答“什么叫同一件事”

幂等的起点不是 Redis,也不是数据库,而是幂等键设计

常见的幂等键来源:

  • 客户端生成的 requestId
  • 业务唯一号,如 orderNopaymentNo
  • 消息中间件的 messageId
  • 业务组合键,如 userId + couponId + activityId

设计原则:

  • 稳定:重试时必须相同
  • 唯一:不同业务动作不能冲突
  • 可追踪:日志、链路里能查到

比如创建支付单时,幂等键可以是:

idem:createPay:user123:bizOrder789

而不是随机 UUID。
随机 UUID 适合“每次请求唯一”,不适合“同一业务重试仍然相同”。


2. 幂等状态持久化:不能只靠内存判断

单机服务里你可以放内存 Map,但分布式环境不行。
服务重启、实例扩容、流量切换后,内存就失效了。

常见持久化手段:

  • 数据库唯一索引
  • Redis SetNX / Lua
  • 去重表 / 消费记录表
  • 状态机表

它们各自的定位并不一样:

方案优点缺点适用场景
数据库唯一索引强一致、简单可靠吞吐受限下单、支付、账户类核心业务
Redis SetNX高性能、低延迟需要处理过期与一致性接口防抖、短期幂等控制
去重表可追踪、好排查表膨胀、需归档MQ 消费、异步任务
状态机业务语义清晰设计成本较高订单、支付、审批流

3. 并发下的关键:判断和执行业务不能脱节

经典错误写法:

  1. 先查有没有处理过
  2. 没有就执行业务
  3. 再写入处理记录

高并发下,两个请求可能同时查到“没有处理过”,然后都执行成功。
这就是典型竞态。

正确思路通常是:

  • 先抢占处理资格,再执行业务
  • 或者 依赖数据库唯一约束,让第二个请求直接失败
  • 或者 把幂等记录写入和业务更新放进一个事务里

方案对比与取舍分析

如果只给一个“万能方案”,大概率会误导。
幂等设计必须看业务代价、吞吐和一致性要求。

方案一:数据库唯一约束型

适合:

  • 订单创建
  • 支付落单
  • 账户流水
  • 优惠券领取

核心思想是:
把“同一业务动作只能成功一次”交给数据库保证。

优点:

  • 最稳
  • 容易审计
  • 对金融类场景友好

缺点:

  • 高并发热点下 DB 压力大
  • 需要良好的索引设计

方案二:Redis 抢占型

适合:

  • 接口重试控制
  • 防止用户连点
  • 对一致性要求没那么极致,但要求高吞吐

核心思想是:

  • 请求来了先 SET NX
  • 抢到锁/标记的人执行
  • 其他请求直接返回处理中或历史结果

优点:

  • 减少数据库压力

缺点:

  • 需要处理 TTL
  • Redis 成功但 DB 失败时,需要补偿
  • 不能替代最终一致性存储

方案三:消息消费去重表型

适合:

  • MQ 至少一次投递模型
  • 异步补偿任务
  • 批处理任务

核心思想是:

  • 每条消息都带 messageId 或业务唯一号
  • 消费前先插入“消费记录”
  • 插入成功才真正处理
  • 已存在则直接跳过

这是很多团队最实用的一套方案,因为好解释、好排查。


方案四:状态机型

适合:

  • 订单状态流转
  • 支付状态变更
  • 审批流程

例如订单只能:

INIT -> PAID -> SHIPPED -> FINISHED

重复收到“支付成功”通知时,只要当前状态已经是 PAID 或之后,就直接认为本次操作幂等成功。

这种方式的好处是,幂等和业务语义天然融合,不会只停留在技术层“做个去重表”。


整体架构建议

在真实系统里,我更推荐“分层防线”,而不是押宝一个点。

flowchart TD
    A[客户端请求] --> B[网关/应用层幂等键校验]
    B --> C[Redis短期防重]
    C --> D[服务层业务校验]
    D --> E[数据库唯一约束/状态机事务]
    E --> F[写出事件或消息]
    F --> G[MQ消费者]
    G --> H[消费记录表防重]
    H --> I[下游业务处理]

这张图的核心意思是:

  • 入口拦一层:减少重复流量
  • 核心落库再拦一层:保证最终正确
  • 异步消费再拦一层:避免消息重复副作用

实战代码(可运行)

下面用一个简化但可运行的 Python 示例,演示两类核心场景:

  1. 接口创建订单的幂等控制
  2. 消息消费防重

为了方便直接运行,我用 Flask + SQLite 演示。
SQLite 虽然不是生产级分布式数据库,但足够说明思路。


一、接口幂等:创建订单

设计思路

  • 客户端传 Idempotency-Key
  • 服务端把这个 key 和订单结果绑定
  • 数据库里对 idempotency_key 建唯一索引
  • 重复请求直接返回第一次结果

数据结构

classDiagram
    class orders {
      +id INTEGER
      +user_id TEXT
      +biz_order_no TEXT
      +amount INTEGER
      +status TEXT
      +idempotency_key TEXT
      +created_at TEXT
    }

    class message_consume_log {
      +id INTEGER
      +message_id TEXT
      +consumer_group TEXT
      +status TEXT
      +created_at TEXT
    }

可运行代码

from flask import Flask, request, jsonify
import sqlite3
from datetime import datetime
import uuid

app = Flask(__name__)
DB_FILE = "demo_idempotent.db"


def get_conn():
    conn = sqlite3.connect(DB_FILE)
    conn.row_factory = sqlite3.Row
    return conn


def init_db():
    conn = get_conn()
    cur = conn.cursor()

    cur.execute("""
    CREATE TABLE IF NOT EXISTS orders (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id TEXT NOT NULL,
        biz_order_no TEXT NOT NULL,
        amount INTEGER NOT NULL,
        status TEXT NOT NULL,
        idempotency_key TEXT NOT NULL UNIQUE,
        created_at TEXT NOT NULL
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS message_consume_log (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        message_id TEXT NOT NULL,
        consumer_group TEXT NOT NULL,
        status TEXT NOT NULL,
        created_at TEXT NOT NULL,
        UNIQUE(message_id, consumer_group)
    )
    """)

    conn.commit()
    conn.close()


@app.route("/orders", methods=["POST"])
def create_order():
    idem_key = request.headers.get("Idempotency-Key")
    if not idem_key:
        return jsonify({"error": "missing Idempotency-Key"}), 400

    body = request.get_json() or {}
    user_id = body.get("user_id")
    biz_order_no = body.get("biz_order_no")
    amount = body.get("amount")

    if not user_id or not biz_order_no or amount is None:
        return jsonify({"error": "invalid params"}), 400

    conn = get_conn()
    cur = conn.cursor()

    try:
        cur.execute("""
            INSERT INTO orders(user_id, biz_order_no, amount, status, idempotency_key, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            user_id,
            biz_order_no,
            amount,
            "CREATED",
            idem_key,
            datetime.utcnow().isoformat()
        ))
        conn.commit()

        order_id = cur.lastrowid
        return jsonify({
            "code": 0,
            "message": "created",
            "data": {
                "order_id": order_id,
                "biz_order_no": biz_order_no,
                "status": "CREATED",
                "idempotency_key": idem_key
            }
        }), 201

    except sqlite3.IntegrityError:
        # 唯一索引冲突,说明是重复请求,直接返回第一次的结果
        cur.execute("""
            SELECT id, user_id, biz_order_no, amount, status, idempotency_key, created_at
            FROM orders
            WHERE idempotency_key = ?
        """, (idem_key,))
        row = cur.fetchone()

        if row:
            return jsonify({
                "code": 0,
                "message": "idempotent replay",
                "data": {
                    "order_id": row["id"],
                    "biz_order_no": row["biz_order_no"],
                    "status": row["status"],
                    "idempotency_key": row["idempotency_key"]
                }
            }), 200
        else:
            return jsonify({"error": "idempotent conflict but order not found"}), 500
    finally:
        conn.close()


def consume_message(message_id, consumer_group, biz_order_no):
    conn = get_conn()
    cur = conn.cursor()
    try:
        # 先写消费记录,抢占处理资格
        cur.execute("""
            INSERT INTO message_consume_log(message_id, consumer_group, status, created_at)
            VALUES (?, ?, ?, ?)
        """, (
            message_id,
            consumer_group,
            "CONSUMED",
            datetime.utcnow().isoformat()
        ))
        conn.commit()

        # 模拟业务处理
        print(f"[BUSINESS] process message={message_id}, order={biz_order_no}")
        return {"code": 0, "message": "processed"}

    except sqlite3.IntegrityError:
        # 重复消费,直接跳过
        return {"code": 0, "message": "duplicate message ignored"}
    finally:
        conn.close()


@app.route("/simulate-consume", methods=["POST"])
def simulate_consume():
    body = request.get_json() or {}
    message_id = body.get("message_id") or str(uuid.uuid4())
    consumer_group = body.get("consumer_group") or "order-service"
    biz_order_no = body.get("biz_order_no") or "BIZ-DEMO-001"

    result = consume_message(message_id, consumer_group, biz_order_no)
    return jsonify(result)


if __name__ == "__main__":
    init_db()
    app.run(debug=True, port=5000)

二、运行方式

安装依赖:

pip install flask

启动服务:

python app.py

三、验证接口幂等

第一次请求:

curl -X POST http://127.0.0.1:5000/orders \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: order-create-u1001-biz9001" \
  -d '{
    "user_id": "u1001",
    "biz_order_no": "biz9001",
    "amount": 199
  }'

再次发送同样请求:

curl -X POST http://127.0.0.1:5000/orders \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: order-create-u1001-biz9001" \
  -d '{
    "user_id": "u1001",
    "biz_order_no": "biz9001",
    "amount": 199
  }'

第二次不会重复创建,而是返回第一次结果。


四、验证消息消费防重

第一次消费:

curl -X POST http://127.0.0.1:5000/simulate-consume \
  -H "Content-Type: application/json" \
  -d '{
    "message_id": "msg-10001",
    "consumer_group": "order-service",
    "biz_order_no": "biz9001"
  }'

重复消费同一消息:

curl -X POST http://127.0.0.1:5000/simulate-consume \
  -H "Content-Type: application/json" \
  -d '{
    "message_id": "msg-10001",
    "consumer_group": "order-service",
    "biz_order_no": "biz9001"
  }'

第二次会被识别为重复消息并忽略。


消息消费的正确时序

很多 MQ 重复消费问题,不是“不知道要防重”,而是防重时序放错了

sequenceDiagram
    participant MQ as 消息队列
    participant C as Consumer
    participant DB as 去重表/业务库

    MQ->>C: 投递消息(messageId=msg-1)
    C->>DB: 插入消费记录(msg-1, groupA)
    alt 插入成功
        C->>DB: 执行业务更新
        C-->>MQ: ack
    else 已存在
        C-->>MQ: ack并跳过
    end

这里有一个关键问题:
消费记录插入成功后,业务处理失败怎么办?

这是很多实现里最危险的点。

更稳妥的方式

把“写消费记录”和“业务更新”放到同一个本地事务里。
至少要满足:

  • 两者都成功
  • 或两者都失败回滚

伪代码如下:

def consume_with_transaction(conn, message_id, consumer_group, order_no):
    cur = conn.cursor()
    try:
        conn.execute("BEGIN")

        cur.execute("""
            INSERT INTO message_consume_log(message_id, consumer_group, status, created_at)
            VALUES (?, ?, ?, ?)
        """, (message_id, consumer_group, "CONSUMED", datetime.utcnow().isoformat()))

        cur.execute("""
            UPDATE orders
            SET status = 'PAID'
            WHERE biz_order_no = ? AND status = 'CREATED'
        """, (order_no,))

        conn.commit()
        return True
    except Exception:
        conn.rollback()
        raise

如果你的业务跨多个资源,比如“既要改 DB,又要调外部支付系统”,那就不是简单本地事务能解决的了,这时候要结合:

  • 本地消息表
  • Outbox Pattern
  • Saga / 补偿事务

常见坑与排查

这部分我想讲得实战一点,因为很多幂等问题不是设计不会,而是线上一出事不知道怎么查。

坑一:幂等键设计错了,导致“该重复的不重复,不该重复的重复了”

典型错误

  • 每次请求都生成新的 UUID,当作幂等键
  • 使用用户 ID 作为幂等键,导致用户所有请求互相冲突
  • 同一个业务动作在不同服务里幂等键语义不一致

排查方法

重点查这几个字段:

SELECT id, biz_order_no, idempotency_key, created_at
FROM orders
ORDER BY id DESC;

看现象:

  • 同一业务单号对应多个幂等键:说明上游重试时键变了
  • 多个不同业务单号对应同一幂等键:说明键设计冲突了

建议

幂等键优先使用:

业务动作类型 + 业务主键

例如:

create-order:biz9001
pay-order:biz9001

而不是裸 biz9001,否则不同动作可能串线。


坑二:接口返回超时,但业务其实成功了

这是线上特别常见的一种“假失败”。

流程大致是:

  1. 服务已经落库成功
  2. 结果返回时网络超时
  3. 客户端以为失败,于是重试
  4. 如果没有幂等,就创建第二笔

排查方法

查应用日志与数据库时间线:

  • 请求进入时间
  • DB 提交时间
  • HTTP 返回时间
  • 客户端重试时间

如果 DB 已经有记录,而客户端说失败,十有八九就是“成功但响应丢了”。

建议

  • 重试前先按幂等键查询历史结果
  • 对外接口尽量支持“按请求号查结果”

坑三:Redis 做了防重,但数据库还是插了两条

这个坑我以前真踩过。

原因通常是:

  • Redis 只做了短期锁
  • 锁过期后重试又进来了
  • 最终数据库层没有唯一约束兜底

结论

Redis 不是最终事实来源。

Redis 更适合:

  • 削峰
  • 限制短时间重复点击
  • 降低 DB 压力

但核心业务的最终幂等,还是要靠:

  • DB 唯一索引
  • 事务
  • 状态机

坑四:消息去重表先插成功,业务失败,导致“假消费成功”

这是 MQ 场景最危险的一类。

症状

  • 去重表里看到消息已消费
  • 业务表没有更新
  • MQ 也不再重投了

原因

消费记录和业务操作不在一个事务里。
或者先 ack,再执行业务。

排查路径

先查消费记录:

SELECT *
FROM message_consume_log
WHERE message_id = 'msg-10001' AND consumer_group = 'order-service';

再查业务记录:

SELECT *
FROM orders
WHERE biz_order_no = 'biz9001';

如果前者有,后者没有,就说明消费时序出问题了。


坑五:状态机没收口,重复回调把状态“回退”了

比如支付回调:

  • 第一次通知把订单改成 PAID
  • 第二次脏数据回调又把状态改回 PROCESSING

这不是简单的重复执行,而是非法状态跃迁

建议

所有关键状态变更都要带条件:

UPDATE orders
SET status = 'PAID'
WHERE biz_order_no = ? AND status = 'CREATED';

而不是:

UPDATE orders
SET status = 'PAID'
WHERE biz_order_no = ?;

第一种是状态机思路,第二种容易把脏数据写进去。


安全/性能最佳实践

幂等不只是正确性问题,还关系到性能和可运维性。

1. 幂等键不要直接信任客户端原始输入

如果接口完全由客户端自定义幂等键,可能带来两个问题:

  • 恶意构造超长 key,拖垮存储或日志
  • 使用可预测 key 影响其他请求

建议:

  • 限制长度,比如 64 或 128 字符
  • 只允许白名单字符
  • 服务端拼接业务前缀
  • 敏感场景可结合用户身份一起校验

例如:

idem:{userId}:{bizType}:{bizId}

2. 去重记录要设置生命周期和归档策略

消息去重表如果不清理,几年后一定变成大表。

建议按业务特性设置保留时间:

  • 支付、账务:保留更久,甚至长期归档
  • 普通通知、营销消息:保留 7~30 天

可以按月分表,或者定期归档。

示例 SQL:

DELETE FROM message_consume_log
WHERE created_at < datetime('now', '-30 day');

生产环境里更建议归档,不要直接大批量删除。


3. 热点幂等键要关注锁竞争

比如秒杀场景下,同一个商品或同一个活动键会很热。
如果所有流量都打到同一个 Redis key 或同一条 DB 唯一索引上,性能会抖。

优化思路:

  • 区分“用户级幂等”和“活动级限流”
  • 把幂等和库存控制拆开
  • 热点数据前移到缓存,但数据库保底

4. 幂等结果最好可回放、可查询

最理想的接口表现不是简单返回:

{"error": "duplicate request"}

而是能返回第一次成功时的业务结果。
这样调用方体验更好,也能减少反复查询。

建议在幂等记录中保存:

  • 幂等键
  • 请求摘要
  • 响应结果
  • 处理状态
  • 创建时间

如果结果体较大,可以只保存关键业务主键,再引导调用方查详情。


5. 区分“处理中”和“已完成”

很多系统只记录“有没有处理过”,不记录处理中状态。
于是遇到慢请求时,第二次重试无法判断:

  • 是第一次还在执行?
  • 还是第一次已经失败了?

更实用的状态设计是:

  • PROCESSING
  • SUCCESS
  • FAILED
stateDiagram-v2
    [*] --> PROCESSING
    PROCESSING --> SUCCESS
    PROCESSING --> FAILED
    FAILED --> PROCESSING: 重试
    SUCCESS --> [*]

这样你在网关层、服务层、补偿任务里都更容易处理。


容量估算思路

架构文章里,很多人会讲方案,但不讲量上来以后怎么办。这里给一个简单估算框架。

假设:

  • 每天 1000 万次接口请求
  • 其中 2% 会重试
  • MQ 每天 5000 万条消息
  • 去重记录保留 15 天

那么:

  • 接口幂等记录量:约 1000 万/天
  • 消息去重记录量:约 5000 万/天
  • 15 天保留总量:约 9 亿记录

这时就不能再把所有去重记录都堆在单表里了。一般要考虑:

  • 按天或按月分表
  • 根据消费者组拆分
  • 冷热数据分层
  • 索引控制,只保留必要字段
  • 批量归档而不是直接删除

如果是核心交易系统,建议一开始就把“记录保留周期”和“查询路径”设计好,不然越往后补越痛苦。


落地建议:一套够用的分层方案

如果你现在要从零搭一套幂等能力,我建议这样做:

同步接口

  • 客户端或网关传 Idempotency-Key
  • 应用层做参数校验
  • Redis 做短期防重,可选
  • 数据库唯一索引做最终兜底
  • 重复请求返回第一次结果

异步消息

  • 所有消息带 messageId
  • 消费者按 (messageId, consumerGroup) 做唯一约束
  • 消费记录与业务更新放同一事务
  • 成功后再 ack
  • 去重表定期归档

状态流转类业务

  • 明确状态机
  • 所有更新都带前置状态条件
  • 非法状态跃迁要报警

总结

幂等性设计不是某一个中间件的功能,而是一整套“防重复副作用”的工程化方法。

真正实用的结论,我建议记住这几条:

  1. 先定义“同一件事”是什么,再谈幂等键
  2. Redis 可以提速,但数据库唯一约束才是最终兜底
  3. MQ 防重不能只写去重表,必须考虑事务边界
  4. 状态机比单纯 if 判断更稳,特别适合订单和支付
  5. 重复请求最好返回历史结果,而不是简单报错
  6. 去重记录会膨胀,归档策略要提前设计

如果你的系统是中等复杂度,我会推荐一个相对稳妥的组合:

  • 接口层:Idempotency-Key + Redis 短期防重
  • 核心落库:数据库唯一索引 + 事务
  • 异步链路:消费记录表 + 本地事务
  • 关键业务:状态机收口

最后说个边界条件:
幂等性并不能解决所有重复问题,它解决的是“同一业务操作重复到达”的一致性;
如果你的业务本身没有唯一语义,或者上下游都拿不出稳定标识,那幂等就只能做到“尽量降低风险”,很难做到真正可靠。

所以,做幂等设计时,别只盯着代码,要把业务唯一标识、事务边界、状态流转、重试策略一起看。这样方案才能经得住线上流量和故障场景。


分享到:

上一篇
《Web3 中级实战:基于智能合约与钱包登录构建一套可落地的链上会员积分系统》
下一篇
《Spring Boot 中级实战:基于 Spring Cache + Redis 构建高并发场景下的多级缓存与一致性方案》