跳转到内容
123xiao | 无名键客

《分布式架构下的幂等性设计与落地:从消息消费到接口重试的实战指南》

字数: 0 阅读时长: 1 分钟

背景与问题

只要系统一旦进入分布式,重复请求、重复投递、重复执行几乎就不是“会不会发生”的问题,而是“什么时候发生”。

最常见的几个场景:

  • 消息队列消费者处理成功了,但 ACK 丢了,消息被重复投递
  • 用户点击两次“提交订单”,网关层重试了两次
  • 下游接口超时,调用方发起重试,但第一次其实已经成功
  • 定时任务因为主备切换,被重复调度
  • 服务重启后,某些中间状态未及时持久化,导致补偿逻辑重复执行

这些问题背后,本质上都指向同一个关键词:幂等性

很多团队对幂等性的理解停留在“加个唯一索引”或者“查一下数据库有没有处理过”。这不算错,但远远不够。因为在真实系统里,幂等性不是一个点状技巧,而是一整套设计:业务语义定义、唯一标识生成、状态机约束、存储介质选择、并发控制、失败恢复和监控排查

我自己踩过一个挺典型的坑:消费者收到支付成功消息后更新订单状态,同时给用户发券。订单更新靠数据库状态控制住了,但发券接口没做幂等,结果重复消费时用户券包里多了一张券。也就是说,局部幂等不等于链路幂等

所以这篇文章不只讲概念,而是从两个最常见场景入手:

  1. 消息消费幂等
  2. 接口重试幂等

并把它们串成一套可落地的架构实践。


核心原理

什么是幂等性

幂等性的定义很简单:

对同一个操作执行一次和执行多次,产生的业务结果应当一致。

这里有两个关键词容易被忽略:

  • 同一个操作:必须能识别“这是同一次业务意图”
  • 业务结果一致:不是要求每次技术返回值完全一样,而是最终业务状态一致

比如:

  • “设置订单状态为已支付”通常可以做成幂等
  • “余额增加 100 元”天然不是幂等,除非你把它改写成“处理交易号 T123 对账户入账一次”

所以幂等不是“代码风格”,而是业务语义改写能力

幂等性的三层实现思路

我通常把幂等设计拆成三层:

1. 业务标识层:谁代表“同一次操作”

常见标识:

  • 客户端生成请求号 requestId
  • 支付流水号 paymentId
  • 订单号 orderNo
  • 消息唯一 ID messageId
  • 业务去重键,如 userId + couponBatchId + sourceBizId

如果这一层没定义清楚,后面技术手段都会变成“猜测重复”。

2. 状态控制层:重复执行时如何保持结果一致

常用方式:

  • 唯一索引
  • 去重表
  • 乐观锁 / 状态机流转
  • Redis SETNX
  • 条件更新 update ... where status = 'INIT'
  • Outbox / Inbox 模式

3. 结果返回层:已经处理过时怎么响应

这层经常被忽略。比如接口重试时,第二次请求命中幂等记录后:

  • 直接返回“已处理”
  • 返回第一次处理结果
  • 返回处理中状态,让调用方稍后查询

如果不定义清楚,前端和调用方会很难接入。


为什么“至少一次投递”一定要求幂等

绝大多数消息队列在工程实践里,可靠消费往往更容易做到的是 At-Least-Once(至少一次),而不是 Exactly-Once(精确一次)。

原因很现实:

  • 消费成功但 ACK 失败
  • Broker 重试投递
  • 消费端处理超时被认为失败
  • 网络分区导致状态不一致

所以从架构角度,不要把“消息只会来一次”当作前提条件。正确思路是:

  • 接受消息可能重复
  • 通过消费端幂等让重复消费无害化

下面这张图可以概括这个链路。

flowchart TD
    A[生产者发送消息] --> B[MQ 持久化]
    B --> C[消费者拉取消息]
    C --> D[执行业务逻辑]
    D --> E[ACK 成功]
    D --> F[ACK 丢失/超时]
    F --> G[MQ 重复投递]
    G --> C
    C --> H[幂等校验]
    H --> I[已处理则跳过]
    H --> J[未处理则继续执行]

接口重试中的幂等,不只是“防重复提交”

接口重试通常有三类来源:

  1. 用户重复点击
  2. 网关 / SDK 自动重试
  3. 服务调用方因为超时重试

这里最大的问题是:调用方超时,不代表服务端没成功

例如:

  • 服务端已经落库成功
  • 但响应包在网络中丢了
  • 调用方超时后再次发起请求

如果接口没有幂等保护,就会重复创建资源、重复扣款、重复发券。

所以接口幂等的关键,不是“挡住脏请求”,而是让同一业务请求在不可靠网络下仍然只生效一次


方案对比与取舍分析

不同场景的幂等方案,侧重点不一样。下面是一个简化对比。

方案适用场景优点缺点
数据库唯一索引创建型操作、去重键明确简单可靠依赖 DB,热点冲突明显
去重表消息消费、接口请求去重语义清晰,便于排查需要维护记录清理
Redis SETNX高并发短时去重性能高需处理过期、持久化和误删
状态机条件更新状态流转类业务与业务强绑定,最稳设计复杂度较高
请求号 + 结果缓存接口幂等返回结果调用体验好结果存储和过期策略要设计
Inbox/Outbox跨服务一致性链路完整实现成本高

我的建议很直接:

  • 核心资金、库存、订单:优先数据库状态机 + 唯一约束
  • 消息消费:优先去重表 / Inbox
  • 高频弱一致请求:可以 Redis 辅助,但不要只靠 Redis
  • 需要返回首次结果的接口:幂等记录里保存响应摘要

消息消费幂等设计

一个稳妥的消费模型

消息消费场景里,一个比较常见的落地套路是:

  1. 提取消息唯一键 message_id
  2. 插入消费记录表,利用唯一约束去重
  3. 执行业务逻辑
  4. 更新消费状态为成功
  5. 如果执行异常,不 ACK,让消息重试
  6. 如果发现已处理,则直接 ACK 并跳过

消费记录表可以设计成这样:

CREATE TABLE message_consume_record (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    message_id VARCHAR(64) NOT NULL,
    consumer_group_name VARCHAR(64) NOT NULL,
    status VARCHAR(16) NOT NULL,
    payload TEXT,
    result_text TEXT,
    created_at DATETIME NOT NULL,
    updated_at DATETIME NOT NULL,
    UNIQUE KEY uk_message_consumer (message_id, consumer_group_name)
);

其中:

  • message_id + consumer_group_name 联合唯一,防止同一消费者组重复处理
  • status 可以取值:PROCESSINGSUCCESSFAILED

状态流转建议

stateDiagram-v2
    [*] --> PROCESSING
    PROCESSING --> SUCCESS: 业务执行成功
    PROCESSING --> FAILED: 业务执行失败
    FAILED --> PROCESSING: MQ 重试/人工补偿
    SUCCESS --> SUCCESS: 重复消息直接跳过

这里有个细节很重要:
不要只记录“处理过没”,最好记录“处理中/成功/失败”状态。

否则遇到消费者执行到一半宕机,就会出现两难:

  • 你认为“处理过了”,后续不再重试,可能丢业务
  • 你认为“没处理过”,后续重复执行,可能造成副作用

接口重试幂等设计

推荐链路

一个比较通用的接口幂等链路如下:

  1. 调用方生成 Idempotency-Key
  2. 服务端先检查这个 key 是否存在
  3. 不存在则抢占处理权并记录为 PROCESSING
  4. 执行业务逻辑
  5. 成功后保存最终结果并更新为 SUCCESS
  6. 重复请求到来时:
    • 若状态 SUCCESS,直接返回第一次结果
    • 若状态 PROCESSING,返回“处理中”或引导查询
    • 若状态 FAILED,按策略决定是否允许重试
sequenceDiagram
    participant C as Client
    participant S as Service
    participant R as Idempotency Store
    participant DB as Business DB

    C->>S: POST /orders (Idempotency-Key: k1)
    S->>R: 写入 k1=PROCESSING
    R-->>S: 成功
    S->>DB: 创建订单
    DB-->>S: 成功
    S->>R: 更新 k1=SUCCESS + result
    S-->>C: 返回订单结果

    C->>S: 重试相同请求 (k1)
    S->>R: 查询 k1
    R-->>S: SUCCESS + result
    S-->>C: 直接返回首次结果

Key 的设计原则

幂等 key 最好满足:

  • 由调用方生成
  • 同一业务意图固定不变
  • 不同业务请求绝不能复用
  • 尽量不依赖不稳定字段

例如创建订单时,可以这样生成:

  • userId + clientOrderToken
  • 或客户端直接生成 UUID 作为 Idempotency-Key

但不要这样:

  • 仅用时间戳
  • 仅用用户 ID
  • 用会变化的请求体签名但没做字段归一化

实战代码(可运行)

下面用 Python + SQLite 演示两个核心场景:

  1. 消息消费幂等
  2. 接口重试幂等

示例为了便于本地运行,使用 SQLite。生产环境里,你可以替换为 MySQL/PostgreSQL + Redis。

1)消息消费幂等示例

import sqlite3
import json
import time
from datetime import datetime

DB_FILE = "idem_demo.db"

def now():
    return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

def init_db():
    conn = sqlite3.connect(DB_FILE)
    cur = conn.cursor()

    cur.execute("""
    CREATE TABLE IF NOT EXISTS message_consume_record (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        message_id TEXT NOT NULL,
        consumer_group_name TEXT NOT NULL,
        status TEXT NOT NULL,
        payload TEXT,
        result_text TEXT,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        UNIQUE(message_id, consumer_group_name)
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS coupon_account (
        user_id INTEGER NOT NULL,
        coupon_code TEXT NOT NULL,
        created_at TEXT NOT NULL
    )
    """)

    conn.commit()
    conn.close()

def grant_coupon(conn, user_id, coupon_code):
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO coupon_account(user_id, coupon_code, created_at) VALUES (?, ?, ?)",
        (user_id, coupon_code, now())
    )

def consume_message(message_id, consumer_group, payload):
    conn = sqlite3.connect(DB_FILE)
    cur = conn.cursor()

    try:
        cur.execute("""
        INSERT INTO message_consume_record(
            message_id, consumer_group_name, status, payload, result_text, created_at, updated_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            message_id,
            consumer_group,
            "PROCESSING",
            json.dumps(payload, ensure_ascii=False),
            "",
            now(),
            now()
        ))
        conn.commit()
    except sqlite3.IntegrityError:
        cur.execute("""
        SELECT status, result_text FROM message_consume_record
        WHERE message_id = ? AND consumer_group_name = ?
        """, (message_id, consumer_group))
        row = cur.fetchone()
        conn.close()
        print(f"[SKIP] duplicate message: {message_id}, status={row[0]}, result={row[1]}")
        return

    try:
        user_id = payload["user_id"]
        coupon_code = payload["coupon_code"]

        grant_coupon(conn, user_id, coupon_code)

        cur.execute("""
        UPDATE message_consume_record
        SET status = ?, result_text = ?, updated_at = ?
        WHERE message_id = ? AND consumer_group_name = ?
        """, ("SUCCESS", "coupon granted", now(), message_id, consumer_group))

        conn.commit()
        print(f"[OK] message consumed: {message_id}")
    except Exception as e:
        cur.execute("""
        UPDATE message_consume_record
        SET status = ?, result_text = ?, updated_at = ?
        WHERE message_id = ? AND consumer_group_name = ?
        """, ("FAILED", str(e), now(), message_id, consumer_group))
        conn.commit()
        print(f"[ERR] message failed: {message_id}, err={e}")
        raise
    finally:
        conn.close()

if __name__ == "__main__":
    init_db()

    msg = {
        "user_id": 1001,
        "coupon_code": "NEW_USER_50"
    }

    consume_message("msg-001", "coupon-service", msg)
    consume_message("msg-001", "coupon-service", msg)  # 重复消费

运行效果

第一次执行会真正发券;第二次执行会因为唯一约束命中而跳过。

这个例子虽然简单,但核心思想已经完整:

  • 先登记,再执行业务
  • 唯一约束挡住重复
  • 状态记录便于恢复和排查

2)接口重试幂等示例

import sqlite3
import json
from datetime import datetime

DB_FILE = "idem_demo.db"

def now():
    return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

def init_api_db():
    conn = sqlite3.connect(DB_FILE)
    cur = conn.cursor()

    cur.execute("""
    CREATE TABLE IF NOT EXISTS api_idempotency_record (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        idem_key TEXT NOT NULL UNIQUE,
        status TEXT NOT NULL,
        request_body TEXT NOT NULL,
        response_body TEXT,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS biz_order (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        order_no TEXT NOT NULL UNIQUE,
        user_id INTEGER NOT NULL,
        amount INTEGER NOT NULL,
        created_at TEXT NOT NULL
    )
    """)

    conn.commit()
    conn.close()

def create_order(idem_key, user_id, amount):
    conn = sqlite3.connect(DB_FILE)
    cur = conn.cursor()

    request_body = json.dumps({"user_id": user_id, "amount": amount}, ensure_ascii=False)

    try:
        cur.execute("""
        INSERT INTO api_idempotency_record(
            idem_key, status, request_body, response_body, created_at, updated_at
        ) VALUES (?, ?, ?, ?, ?, ?)
        """, (idem_key, "PROCESSING", request_body, "", now(), now()))
        conn.commit()
    except sqlite3.IntegrityError:
        cur.execute("""
        SELECT status, response_body, request_body
        FROM api_idempotency_record WHERE idem_key = ?
        """, (idem_key,))
        row = cur.fetchone()
        conn.close()

        status, response_body, old_request_body = row
        if old_request_body != request_body:
            return {"code": 409, "msg": "幂等键已被其他请求复用"}
        if status == "SUCCESS":
            return {"code": 200, "msg": "ok(duplicate)", "data": json.loads(response_body)}
        return {"code": 202, "msg": "处理中,请稍后重试"}

    try:
        order_no = f"ORD-{int(datetime.utcnow().timestamp())}-{user_id}"
        cur.execute("""
        INSERT INTO biz_order(order_no, user_id, amount, created_at)
        VALUES (?, ?, ?, ?)
        """, (order_no, user_id, amount, now()))

        response = {"order_no": order_no, "user_id": user_id, "amount": amount}

        cur.execute("""
        UPDATE api_idempotency_record
        SET status = ?, response_body = ?, updated_at = ?
        WHERE idem_key = ?
        """, ("SUCCESS", json.dumps(response, ensure_ascii=False), now(), idem_key))

        conn.commit()
        return {"code": 200, "msg": "ok", "data": response}
    except Exception as e:
        cur.execute("""
        UPDATE api_idempotency_record
        SET status = ?, response_body = ?, updated_at = ?
        WHERE idem_key = ?
        """, ("FAILED", json.dumps({"error": str(e)}, ensure_ascii=False), now(), idem_key))
        conn.commit()
        return {"code": 500, "msg": str(e)}
    finally:
        conn.close()

if __name__ == "__main__":
    init_api_db()

    print(create_order("req-abc-001", 1001, 299))
    print(create_order("req-abc-001", 1001, 299))  # 相同幂等键重试
    print(create_order("req-abc-001", 1001, 399))  # 相同幂等键但不同请求体

这个示例体现了三个关键点

  1. 幂等键唯一
  2. 同一幂等键必须绑定同一请求体
  3. 重复请求返回首次结果,而不是重复执行业务

这第三点非常重要。很多实现虽然防止了重复落库,但第二次请求只返回一句“重复请求”,对调用方并不友好。
更实用的做法是:存下首次处理结果,重试时直接回放


容量估算与存储选择

幂等记录不是免费的,特别是在高并发系统里,要提前估算量级。

一个简单估算方法

假设:

  • 接口峰值 QPS:2000
  • 其中 20% 需要幂等保护
  • 幂等记录保留 3 天

那么记录量大约是:

2000 * 20% * 86400 * 3 = 103,680,000

也就是 1 亿级别。

这个量级下,几个建议很实际:

  • 不要把长周期幂等数据全部压在单表
  • 按日期分表或冷热分层
  • 只保留必要字段,避免大对象响应全量存储
  • 对成功记录设置过期清理策略
  • 对关键业务保留审计摘要,不一定保留完整请求包

Redis 还是数据库?

很多人第一反应是 Redis。我的经验是:

  • Redis 适合做前置快速拦截
  • 数据库适合做最终可信记录

如果只是 SETNX 一下,业务刚执行完 Redis key 过期了,或者 Redis 故障丢数据,重复执行还是会发生。所以更稳妥的组合通常是:

  • Redis:抗高并发、快速判断
  • DB:落最终事实、支持审计与恢复

常见坑与排查

1. 只在应用内存里做去重

这在单机场景可能有点用,但在分布式部署下基本不可靠:

  • 实例之间内存不共享
  • 重启后数据丢失
  • 扩缩容时无法保证一致

排查方式:

  • 检查重复请求是否命中不同实例
  • 核对幂等记录是否落到共享存储
  • 查看重启时段是否与重复问题吻合

2. 业务成功了,但幂等状态没更新

这是很经典的“半成功”问题:

  • 订单已创建
  • 但幂等记录仍是 PROCESSING
  • 调用方重试后系统误以为还没完成

典型原因:

  • 业务表和幂等表不在同一个事务
  • 先执行业务再写幂等记录
  • 更新幂等结果时发生超时或锁等待

建议:

  • 尽量把业务结果写入与幂等状态更新放在同一事务
  • 至少保证“先写幂等占位,再执行业务”
  • 对长时间 PROCESSING 的记录做巡检和补偿

3. 同一个幂等键被不同请求复用

这类问题经常来自客户端实现不规范,比如把用户 ID 当幂等键,导致不同订单共享同一个 key。

表现:

  • 用户第二次下单总是返回第一次订单结果
  • 或直接报“重复请求”

排查方式:

  • 对比幂等记录中的请求体摘要
  • 记录 idem_key 与请求 hash 的映射
  • 对不一致请求直接返回 409

4. 只保护主操作,没保护副作用

这是我前面提到的“发券坑”。

例如:

  • 订单表更新是幂等的
  • 但通知、发券、积分、库存冻结没有统一幂等控制

那么主流程看起来没问题,副作用却可能被执行多次。

建议:

  • 把链路拆成多个可幂等子步骤
  • 每个对外副作用都带唯一业务号
  • 不要假设下游一定支持精确一次

5. 把“查询接口”误当成天然安全

查询通常是幂等的,但如果查询里埋了副作用,比如:

  • 查询时顺手修复状态
  • 查询时刷新某些标记
  • 查询时触发缓存回填并写库

那它就不再是纯幂等读操作了。

排查方向:

  • 审核查询链路里是否有写操作
  • 核查 ORM 自动 flush、懒加载触发更新等隐式行为

安全/性能最佳实践

安全实践

1. 幂等键要有边界,不要无限复用

幂等键如果长期有效,会带来两个问题:

  • 存储膨胀
  • 被恶意复用导致业务阻塞

建议:

  • 为幂等记录设置 TTL 或归档周期
  • 对外暴露的 key 要有长度、字符集校验
  • 对热点 key 做访问频控

2. 不要把敏感数据直接存入幂等记录

有些团队为了“方便排查”,把完整请求体、完整响应体一股脑存库,里面可能包含:

  • 身份证号
  • 手机号
  • 地址
  • 支付信息

建议:

  • 只保存必要字段和摘要
  • 敏感字段脱敏或加密
  • 日志与幂等表权限隔离

3. 防重不等于防刷

幂等性只解决“同一次请求重复执行”的问题,不等于:

  • 防攻击
  • 防恶意刷接口
  • 防撞库

所以仍然需要配合:

  • 限流
  • 鉴权
  • 签名校验
  • 风控规则

性能实践

1. 唯一索引字段尽量短

如果幂等键过长,唯一索引会放大写入成本。
建议:

  • 原始请求号保留
  • 同时生成固定长度摘要作为索引键,例如 SHA-256 截断

2. 热点场景先本地/缓存过滤,再落库

在高并发秒杀或短时突发流量里,可以采用:

  • 网关层请求号校验
  • Redis SETNX
  • 业务库最终唯一约束兜底

也就是说,前面挡一层性能,后面守一层正确性

3. 清理策略要提前设计

幂等表一旦不清理,很容易变成大表,带来:

  • 索引膨胀
  • 查询变慢
  • 归档困难

建议:

  • 成功记录保留短周期
  • 失败记录保留更久,便于排查
  • 定期归档冷数据

一套更完整的架构建议

如果你的系统已经有一定复杂度,我建议把幂等设计纳入统一治理,而不是每个服务各写一套。

可以考虑抽象出一个统一组件,至少包含:

  • 幂等键校验
  • 状态存储
  • 请求体摘要比对
  • 首次结果回放
  • 超时处理中记录扫描
  • 监控指标与告警

一个简化的架构视图如下:

flowchart LR
    A[客户端/调用方] --> B[网关]
    B --> C[业务服务]
    C --> D[幂等组件]
    D --> E[(Redis)]
    D --> F[(MySQL)]
    C --> G[(业务库)]
    C --> H[MQ]
    H --> I[消费者]
    I --> J[消费去重表]

可以重点监控这些指标:

  • 幂等命中率
  • 重复消息比例
  • PROCESSING 超时数量
  • 同 key 不同请求体冲突数
  • 唯一索引冲突频次
  • 重试成功率

这些指标往往比单纯看错误日志更有用。因为很多幂等问题,不会直接报错,而是默默地产生业务重复。


总结

幂等性这件事,说简单也简单,说复杂也复杂。
简单在于核心原则很少:

  1. 定义“同一次业务操作”的唯一标识
  2. 用可靠存储记录处理状态
  3. 让重复请求返回一致结果,而不是重复执行副作用

复杂在于,它不是只加一行去重代码就结束了,而是要覆盖:

  • 消息重复投递
  • 接口超时重试
  • 并发竞争
  • 半成功状态
  • 下游副作用重复
  • 数据清理与监控治理

如果你准备在项目里落地,我建议按下面这个顺序做,最不容易出错:

  1. 先梳理业务语义:哪些操作必须幂等
  2. 给每类操作定义唯一业务键
  3. 优先用数据库唯一约束或状态机兜底
  4. 高并发场景再叠加 Redis 做前置拦截
  5. 保存首次处理结果,优化重试体验
  6. 补上超时处理中巡检和告警
  7. 逐步把副作用链路也纳入幂等设计

最后给一个边界判断:
如果一个操作本质上是“累计型”的,比如“积分 +10”、“余额 +100”,那它天然不是幂等操作。要想实现幂等,必须把它改造成“某笔交易只入账一次”的模型。不要试图拿技术手段硬抹平错误的业务语义。

这通常也是分布式幂等设计里,最值得花时间想清楚的一步。


分享到:

上一篇
《自动化测试中的接口回归体系设计:从用例分层、数据构造到 CI 持续校验实战》
下一篇
《Java 中使用 CompletableFuture 构建高并发异步任务编排的实战指南》