跳转到内容
123xiao | 无名键客

《分布式架构中基于 Kafka 与 Outbox 模式实现最终一致性的实战指南》

字数: 0 阅读时长: 1 分钟

背景与问题

在分布式系统里,“业务库写成功了,但消息没发出去”,几乎是所有团队早晚都会遇到的问题。

一个很典型的场景:

  • 订单服务创建订单
  • 需要通知库存服务扣减库存
  • 还要通知积分服务发放积分
  • 还可能通知风控、履约、搜索索引等下游系统

如果你在代码里这么写:

  1. 先更新数据库
  2. 再发送 Kafka 消息

看起来很自然,但这里有个致命窗口期:

  • 数据库事务已经提交成功
  • Kafka 消息却因为网络抖动、Broker 不可用、进程崩溃而发送失败

结果就是:本地状态已变更,外部世界却不知道
这就是分布式一致性里最让人头疼的一类问题。

很多人第一反应是上 2PC/XA,但实际业务里往往会发现:

  • 中间件支持有限
  • 性能开销高
  • 耦合度太强
  • 运维复杂度不低
  • 一旦链路长,问题更难排

所以在互联网业务系统里,更常见的落地方式不是强一致,而是:

接受短暂不一致,通过可靠事件投递和幂等消费,达成最终一致性。

Outbox 模式 + Kafka,就是这条路线中非常经典、也非常实用的一种组合。


为什么选 Outbox,而不是“数据库提交后直接发 Kafka”

先把核心矛盾讲透:

直接发消息的问题

假设你的订单创建逻辑如下:

@Transactional
public void createOrder(CreateOrderCommand cmd) {
    orderRepository.save(order);
    kafkaTemplate.send("order-created", event);
}

看起来是在一个方法里完成的,但其实不是一个原子操作:

  • 数据库事务的提交,由数据库保证
  • Kafka 消息发送,由 Kafka 保证
  • 它们不是同一个事务边界

于是会有几种失败场景:

  1. DB 成功,Kafka 失败
  2. Kafka 发出去了,但事务最终回滚
  3. 应用在两步之间崩溃
  4. 重试导致消息重复

这些都不是“少见角落”,而是线上迟早发生的现实。

Outbox 模式的核心思路

Outbox 的做法很朴素:

  • 不直接把消息发到 Kafka
  • 而是在本地事务里,把业务数据和“待发送事件”一起写入数据库
  • 之后由独立的投递器,把 Outbox 表中的事件转发到 Kafka
  • 转发成功后,再把事件标记为已发送

这样,至少能保证一件关键的事:

只要业务数据提交成功,对应事件一定会被持久化到本地数据库,不会凭空丢失。

这一步非常重要,因为它把“业务变更”和“事件产生”绑定到了一个本地事务中。


核心原理

一句话理解

Outbox 模式本质上是:

把跨系统一致性问题,先降维成单库事务问题,再通过异步投递把事件可靠送出去。

整体流程

flowchart LR
    A[业务请求] --> B[应用服务]
    B --> C[本地事务: 写业务表]
    C --> D[本地事务: 写 Outbox 表]
    D --> E[事务提交]
    E --> F[Outbox Relay/Poller]
    F --> G[发送到 Kafka Topic]
    G --> H[消费者处理]
    F --> I[标记 Outbox 已发送]

这里有两个关键点:

1. 业务数据与 Outbox 事件同事务提交

例如创建订单时,事务里同时写两张表:

  • orders
  • outbox_events

如果事务回滚,两张表都回滚;如果事务提交,两张表都提交。

这样就不会出现“订单有了,但事件没记录”的情况。

2. 投递 Kafka 与业务事务解耦

投递 Kafka 的逻辑由单独组件负责,可以是:

  • 应用内定时轮询(poller)
  • CDC(如 Debezium)监听 Outbox 表变更
  • 独立 Relay 服务

对于大多数团队来说,先用 Poller 落地最快,后续再演进到 CDC。


方案对比与取舍分析

在架构选型时,我建议不要一上来就把 Outbox 神化。它很好,但也有边界。

常见方案对比

方案一致性实现复杂度性能适用场景
本地事务后直接发 Kafka允许少量丢消息的非关键场景
2PC/XA低~中强一致且基础设施支持完善
本地消息表 / Outbox最终一致互联网业务主流方案
事务消息(如部分 MQ 提供)最终一致/较强依赖特定消息中间件能力

为什么 Kafka + Outbox 很常见

因为它在三者之间取得了平衡:

  • 一致性足够强:不会无声丢事件
  • 技术栈通用:数据库 + Kafka 就能做
  • 演进空间大:先轮询,后 CDC
  • 对业务侵入可控:主要增加事件建模和幂等处理

它的代价也要接受

Outbox 不是白来的,它会带来:

  • 额外表设计
  • 投递延迟
  • 消息重复的可能性
  • 消费者幂等复杂度
  • Outbox 清理和分区管理成本

所以它不是“所有系统都必须上”,而是适合:

  • 核心业务事件
  • 需要可靠异步传播
  • 能接受秒级或百毫秒级最终一致
  • 有能力做幂等消费

数据模型设计

下面给一个比较实用的 Outbox 表设计。

业务表

CREATE TABLE orders (
    id BIGSERIAL PRIMARY KEY,
    order_no VARCHAR(64) NOT NULL UNIQUE,
    user_id BIGINT NOT NULL,
    amount NUMERIC(12,2) NOT NULL,
    status VARCHAR(32) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

Outbox 表

CREATE TABLE outbox_events (
    id BIGSERIAL PRIMARY KEY,
    event_id VARCHAR(64) NOT NULL UNIQUE,
    aggregate_type VARCHAR(64) NOT NULL,
    aggregate_id VARCHAR(64) NOT NULL,
    event_type VARCHAR(128) NOT NULL,
    payload JSONB NOT NULL,
    headers JSONB,
    status VARCHAR(16) NOT NULL DEFAULT 'NEW',
    retry_count INT NOT NULL DEFAULT 0,
    next_retry_at TIMESTAMP NOT NULL DEFAULT NOW(),
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    sent_at TIMESTAMP NULL
);

CREATE INDEX idx_outbox_status_next_retry
ON outbox_events(status, next_retry_at, id);

字段建议

  • event_id:全局唯一事件 ID,用于去重
  • aggregate_type / aggregate_id:方便按业务实体追踪
  • event_type:如 OrderCreated
  • payload:完整事件载荷
  • statusNEW / SENDING / SENT / FAILED
  • retry_count:失败重试次数
  • next_retry_at:退避重试时间
  • sent_at:发送成功时间

这里我特别建议保留 event_id。后面做消费者幂等、问题排查、链路追踪时,真的很好用。


事件时序图

sequenceDiagram
    participant Client as 客户端
    participant App as 订单服务
    participant DB as 数据库
    participant Relay as Outbox投递器
    participant Kafka as Kafka
    participant Consumer as 下游消费者

    Client->>App: 创建订单
    App->>DB: 事务内写 orders
    App->>DB: 事务内写 outbox_events
    DB-->>App: 提交成功
    App-->>Client: 返回成功

    loop 定时轮询
        Relay->>DB: 查询待发送事件
        DB-->>Relay: 返回事件列表
        Relay->>Kafka: 发送事件
        Kafka-->>Relay: ack
        Relay->>DB: 标记 SENT
    end

    Kafka->>Consumer: 推送/拉取消息
    Consumer->>Consumer: 幂等处理

实战代码(可运行)

下面用 Python + PostgreSQL + Kafka 做一个最小可运行示例。
我会尽量写得贴近真实项目,但不引入太多框架,方便你直接看懂。

依赖安装

pip install sqlalchemy psycopg2-binary kafka-python

建表 SQL

CREATE TABLE IF NOT EXISTS orders (
    id BIGSERIAL PRIMARY KEY,
    order_no VARCHAR(64) NOT NULL UNIQUE,
    user_id BIGINT NOT NULL,
    amount NUMERIC(12,2) NOT NULL,
    status VARCHAR(32) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS outbox_events (
    id BIGSERIAL PRIMARY KEY,
    event_id VARCHAR(64) NOT NULL UNIQUE,
    aggregate_type VARCHAR(64) NOT NULL,
    aggregate_id VARCHAR(64) NOT NULL,
    event_type VARCHAR(128) NOT NULL,
    payload JSONB NOT NULL,
    headers JSONB,
    status VARCHAR(16) NOT NULL DEFAULT 'NEW',
    retry_count INT NOT NULL DEFAULT 0,
    next_retry_at TIMESTAMP NOT NULL DEFAULT NOW(),
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    sent_at TIMESTAMP NULL
);

CREATE INDEX IF NOT EXISTS idx_outbox_status_next_retry
ON outbox_events(status, next_retry_at, id);

1)业务写入:订单与 Outbox 同事务提交

# app.py
import json
import uuid
from decimal import Decimal
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+psycopg2://postgres:postgres@localhost:5432/testdb"

engine = create_engine(DATABASE_URL, future=True)
SessionLocal = sessionmaker(bind=engine, future=True)

def create_order(order_no: str, user_id: int, amount: Decimal):
    event_id = str(uuid.uuid4())
    payload = {
        "eventId": event_id,
        "eventType": "OrderCreated",
        "orderNo": order_no,
        "userId": user_id,
        "amount": str(amount),
    }

    with SessionLocal.begin() as session:
        session.execute(
            text("""
                INSERT INTO orders(order_no, user_id, amount, status)
                VALUES (:order_no, :user_id, :amount, :status)
            """),
            {
                "order_no": order_no,
                "user_id": user_id,
                "amount": amount,
                "status": "CREATED"
            }
        )

        session.execute(
            text("""
                INSERT INTO outbox_events(
                    event_id, aggregate_type, aggregate_id, event_type,
                    payload, headers, status, retry_count, next_retry_at
                ) VALUES (
                    :event_id, :aggregate_type, :aggregate_id, :event_type,
                    CAST(:payload AS JSONB), CAST(:headers AS JSONB),
                    :status, 0, NOW()
                )
            """),
            {
                "event_id": event_id,
                "aggregate_type": "Order",
                "aggregate_id": order_no,
                "event_type": "OrderCreated",
                "payload": json.dumps(payload),
                "headers": json.dumps({"source": "order-service"}),
                "status": "NEW",
            }
        )

    print(f"order created, order_no={order_no}, event_id={event_id}")

if __name__ == "__main__":
    create_order("ORD-10001", 1001, Decimal("99.90"))

这段代码的关键点只有一个:

订单写入和 Outbox 写入共用同一个数据库事务。

只要 with SessionLocal.begin() 成功提交,业务状态和待发送事件就同时落库。


2)Outbox Relay:轮询并发送 Kafka

# relay.py
import json
import time
from datetime import datetime, timedelta
from kafka import KafkaProducer
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+psycopg2://postgres:postgres@localhost:5432/testdb"
KAFKA_BOOTSTRAP_SERVERS = ["localhost:9092"]
TOPIC = "order-events"

engine = create_engine(DATABASE_URL, future=True)
SessionLocal = sessionmaker(bind=engine, future=True)

producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    key_serializer=lambda v: v.encode("utf-8"),
    acks="all",
    retries=3,
)

def fetch_and_lock_batch(session, batch_size=10):
    rows = session.execute(
        text("""
            WITH cte AS (
                SELECT id
                FROM outbox_events
                WHERE status IN ('NEW', 'FAILED')
                  AND next_retry_at <= NOW()
                ORDER BY id
                FOR UPDATE SKIP LOCKED
                LIMIT :batch_size
            )
            UPDATE outbox_events o
            SET status = 'SENDING'
            FROM cte
            WHERE o.id = cte.id
            RETURNING o.id, o.event_id, o.event_type, o.aggregate_id, o.payload, o.retry_count
        """),
        {"batch_size": batch_size}
    ).mappings().all()
    return rows

def mark_sent(session, row_id):
    session.execute(
        text("""
            UPDATE outbox_events
            SET status = 'SENT', sent_at = NOW()
            WHERE id = :id
        """),
        {"id": row_id}
    )

def mark_failed(session, row_id, retry_count):
    delay_seconds = min(60, 2 ** min(retry_count + 1, 6))
    next_retry_at = datetime.utcnow() + timedelta(seconds=delay_seconds)
    session.execute(
        text("""
            UPDATE outbox_events
            SET status = 'FAILED',
                retry_count = retry_count + 1,
                next_retry_at = :next_retry_at
            WHERE id = :id
        """),
        {"id": row_id, "next_retry_at": next_retry_at}
    )

def relay_once():
    with SessionLocal.begin() as session:
        rows = fetch_and_lock_batch(session, batch_size=10)

    if not rows:
        return 0

    processed = 0
    for row in rows:
        payload = row["payload"]
        if isinstance(payload, str):
            payload = json.loads(payload)

        try:
            future = producer.send(
                TOPIC,
                key=row["aggregate_id"],
                value=payload,
                headers=[("event_id", row["event_id"].encode("utf-8"))]
            )
            future.get(timeout=10)

            with SessionLocal.begin() as session:
                mark_sent(session, row["id"])

            print(f"[SENT] id={row['id']} event_id={row['event_id']}")
            processed += 1
        except Exception as e:
            with SessionLocal.begin() as session:
                mark_failed(session, row["id"], row["retry_count"])
            print(f"[FAILED] id={row['id']} err={e}")

    return processed

if __name__ == "__main__":
    while True:
        count = relay_once()
        if count == 0:
            time.sleep(2)

这段代码解决了什么问题

  1. 批量拉取待发送事件
  2. FOR UPDATE SKIP LOCKED 避免多实例重复抢同一批记录
  3. Kafka 发送成功后再标记 SENT
  4. 失败则增加 retry_count,并指数退避重试

这已经是一个能在很多中小规模系统里跑起来的版本了。


3)Kafka 消费者:幂等处理

最终一致性方案里,重复消息不是异常,而是常态
因此消费者必须幂等。

先建一张消费去重表:

CREATE TABLE IF NOT EXISTS processed_events (
    event_id VARCHAR(64) PRIMARY KEY,
    consumer_group VARCHAR(128) NOT NULL,
    processed_at TIMESTAMP NOT NULL DEFAULT NOW()
);

消费者代码如下:

# consumer.py
import json
from kafka import KafkaConsumer
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+psycopg2://postgres:postgres@localhost:5432/testdb"
TOPIC = "order-events"
GROUP_ID = "inventory-service"

engine = create_engine(DATABASE_URL, future=True)
SessionLocal = sessionmaker(bind=engine, future=True)

consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=["localhost:9092"],
    group_id=GROUP_ID,
    auto_offset_reset="earliest",
    enable_auto_commit=False,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

def process_business(payload):
    print(f"process business: {payload}")

def already_processed(session, event_id):
    row = session.execute(
        text("""
            SELECT 1 FROM processed_events
            WHERE event_id = :event_id AND consumer_group = :consumer_group
        """),
        {"event_id": event_id, "consumer_group": GROUP_ID}
    ).first()
    return row is not None

def mark_processed(session, event_id):
    session.execute(
        text("""
            INSERT INTO processed_events(event_id, consumer_group)
            VALUES (:event_id, :consumer_group)
        """),
        {"event_id": event_id, "consumer_group": GROUP_ID}
    )

if __name__ == "__main__":
    for msg in consumer:
        payload = msg.value
        event_id = payload["eventId"]

        try:
            with SessionLocal.begin() as session:
                if already_processed(session, event_id):
                    print(f"skip duplicated event: {event_id}")
                else:
                    process_business(payload)
                    mark_processed(session, event_id)

            consumer.commit()
        except Exception as e:
            print(f"consume failed: {e}")

幂等的关键原则

  • 业务处理和幂等标记最好放在一个本地事务里
  • 只有事务成功,才提交 Kafka offset
  • 这样即使消费者重启,也最多重复投递,不会漏处理

状态流转图

stateDiagram-v2
    [*] --> NEW
    NEW --> SENDING: Relay 抢占
    SENDING --> SENT: Kafka ack 成功
    SENDING --> FAILED: 发送失败
    FAILED --> SENDING: 到达重试时间
    SENT --> [*]

常见坑与排查

这一部分我想讲得更接地气一些。很多文章只讲“正确做法”,但真正花时间的是排查线上问题。

坑 1:Outbox 已写入,但一直发不出去

常见原因

  • Relay 没启动
  • SQL 条件写错,筛不到 NEW/FAILED
  • next_retry_at 被更新到未来很久
  • Kafka Broker 不可达
  • 事务未提交,读不到数据

排查 SQL

SELECT id, event_id, status, retry_count, next_retry_at, created_at
FROM outbox_events
ORDER BY id DESC
LIMIT 20;

重点看:

  • 是否大量堆积在 NEW
  • 是否都卡在 FAILED
  • next_retry_at 是否异常大
  • 是否出现长时间 SENDING

如果很多记录一直是 SENDING,通常说明:

  • Relay 抢到任务后崩了
  • 但没有“超时回收”机制

建议做法

定期把长时间 SENDING 的记录回退到 FAILED

UPDATE outbox_events
SET status = 'FAILED',
    next_retry_at = NOW(),
    retry_count = retry_count + 1
WHERE status = 'SENDING'
  AND created_at < NOW() - INTERVAL '10 minutes';

更严谨一点,应该增加 updated_at 字段,用它判断超时,而不是 created_at


坑 2:消息重复消费

这几乎一定会发生,不要抱侥幸心理。

发生原因

  • Relay 发成功了,但标记 SENT 前崩溃
  • Kafka ack 已收到,但本地更新状态失败
  • 消费者处理成功了,但 offset 未提交
  • 消费者 rebalance 导致重复拉取

正确心态

不要试图完全消灭重复,应该:

让系统具备“重复也不出错”的能力。

排查方法

  1. event_id 查 Outbox
  2. 查 Kafka 消费日志
  3. 查消费者幂等表 processed_events
  4. 核对业务状态是否被重复修改

如果业务本身不方便天然幂等,比如“账户加钱”,那就必须:

  • 引入业务流水号
  • 用唯一约束确保一次事件只入账一次

坑 3:顺序性被打乱

Kafka 只在同一个 partition 内保证顺序。
如果你希望同一订单、同一用户的事件有序,就要设计好消息 key。

实战建议

  • 订单维度顺序:key = orderNo
  • 用户维度顺序:key = userId
  • 不要随机 key,否则顺序性基本没法谈

但这里有个现实取舍:

  • key 太集中,容易产生热点分区
  • key 太分散,跨实体就无法保证顺序

所以顺序一定要按最小业务一致性单元来定义,而不是贪心要求“全局有序”。


坑 4:Outbox 表越积越大

这个坑非常常见。刚开始只有几百条,感觉没事;半年后可能就几千万。

影响

  • 轮询变慢
  • 索引膨胀
  • vacuum 压力增大
  • 备份恢复更慢

建议做法

  1. 已发送数据定期归档或删除
  2. 按时间分区
  3. 只扫描活跃分区
  4. status + next_retry_at + id 建复合索引

比如定期删除 7 天前已发送数据:

DELETE FROM outbox_events
WHERE status = 'SENT'
  AND sent_at < NOW() - INTERVAL '7 days';

如果数据量很大,建议改成分区表 + 分区级清理,比大表 delete 更稳。


坑 5:消费者幂等表本身成了瓶颈

processed_events 如果写得很猛,也会变大。

解决思路

  • 只对关键事件做持久化幂等
  • 用业务唯一约束替代单独幂等表
  • 对幂等表做 TTL/归档
  • Redis 只能做辅助,不能替代强幂等持久化

我踩过一个坑:团队一开始用 Redis 做消费去重,结果 Redis key 过期后,补消费任务又把老消息重新处理了一遍。
所以只要场景涉及资金、库存、权益,最终判定还是要落到数据库唯一约束或持久化记录上


安全/性能最佳实践

Outbox 不是只要能跑就行,真正上线还要考虑安全和性能。

1. 安全:消息内容最小化

不要把整个业务对象原封不动塞进 Kafka。

建议

  • 只传下游需要的字段
  • 避免敏感信息直接出现在 payload
  • 必要时做字段脱敏或加密
  • headers 中不要泄露内部凭据

例如:

  • 不要把用户完整身份证号、手机号、支付信息直接广播
  • 可以改为用户 ID + 授权读取方式

2. 性能:控制轮询批次与频率

轮询不是越快越好,过快会给数据库带来额外压力。

常见参数建议

  • 批次大小:100 ~ 1000,按实例规模调整
  • 空轮询休眠:500ms ~ 2s
  • 重试退避:指数退避,避免打爆 Kafka
  • 多实例并发:结合 FOR UPDATE SKIP LOCKED

如果你的事件量已经很大,比如每秒几万条,那么单纯轮询数据库通常会吃力,这时建议演进到:

  • Debezium CDC 监听 Outbox 表
  • Kafka Connect 接入
  • 降低应用层轮询压力

3. Kafka 生产端配置要保守一点

关键业务事件我建议:

  • acks=all
  • 开启重试
  • 合理设置 delivery.timeout.ms
  • 根据吞吐需求决定是否启用幂等生产者

如果你使用 Java 客户端,通常还会建议:

  • enable.idempotence=true
  • max.in.flight.requests.per.connection 合理配置
  • retries 足够大

这不能解决 Outbox 的所有问题,但能减少 Broker 侧写入失败和重复风险。


4. 消费端必须按“至少一次”设计

Kafka 很多时候现实上就是 at-least-once 语义。
因此消费端要默认自己会收到重复消息。

消费端建议

  • 先执行业务事务,再提交 offset
  • 幂等校验与业务更新同事务
  • 对失败消息进入重试或死信队列
  • 对不可恢复错误进行人工告警

5. 做好可观测性

如果没有监控,Outbox 方案会变成“默默出问题”。

必备指标

  • Outbox NEW 数量
  • Outbox FAILED 数量
  • 最老未发送事件的堆积时长
  • Relay 发送成功率/失败率
  • Kafka produce latency
  • 消费延迟
  • 死信数量

必备日志字段

  • event_id
  • aggregate_id
  • event_type
  • Kafka topic / partition / offset
  • retry_count
  • trace_id

我个人很建议把 event_id 贯穿:

  • 业务日志
  • Outbox 表
  • Kafka headers
  • 消费日志

这样问题追起来会轻松很多。


容量估算思路

架构设计如果只讲模式,不讲容量,很容易到上线时翻车。

简单估算方法

假设:

  • 每秒创建订单 2000 单
  • 每个订单产生 1 条核心事件
  • Outbox 平均每条记录 1KB
  • 保留 7 天已发送记录

那么:

  • 每秒新增 Outbox:2000 条
  • 每天约:2000 * 86400 ≈ 1.728 亿条
  • 数据量约:1.728 亿 KB ≈ 165 GB/天(粗估,不含索引)

这时候你就会发现:

Outbox 如果不清理、不分区,数据库很快就顶不住。

所以在中高吞吐场景,至少要考虑:

  • 表分区
  • 快速归档
  • CDC 替代高频轮询
  • Kafka topic 分区数规划
  • 消费组扩容能力

一个务实建议

如果你们现在还在日均几十万到几百万事件量级,先把:

  • 同事务写 Outbox
  • Relay 重试
  • 消费幂等
  • 监控告警

这四件事做好,收益就已经很大。
不要一开始就把 CDC、事件平台、复杂编排全部上齐,容易把项目做重。


落地建议:从简单版本开始演进

如果你准备在现有系统引入 Outbox,我建议按这个节奏来:

第 1 阶段:最小可用版本

  • 本地事务写业务表 + Outbox 表
  • 定时轮询发送 Kafka
  • 消费端幂等
  • 基础监控

适合先验证业务正确性。

第 2 阶段:增强可靠性

  • 增加 SENDING 超时回收
  • 指数退避重试
  • 死信处理
  • 完善告警与追踪

适合正式上线。

第 3 阶段:高吞吐优化

  • Outbox 分区表
  • Relay 多实例并发
  • CDC 替代轮询
  • 事件治理平台化

适合订单、支付、库存等核心链路扩大规模后演进。


总结

在分布式架构里,“写库 + 发消息”不是天然原子操作
如果直接在业务代码里提交数据库后发送 Kafka,线上迟早会遇到不一致问题。

Outbox 模式的价值在于:

  • 用本地事务保证“业务变更”和“事件产生”同时落地
  • 用异步 Relay 把事件可靠投递到 Kafka
  • 用消费端幂等承接重复消息
  • 最终实现可控、可追踪、可恢复的最终一致性

你可以把它记成一句话:

先把事件安全地存下来,再慢慢可靠地发出去。

最后给几个可执行建议:

  1. 关键业务事件优先上 Outbox
  2. 业务写库与 Outbox 入库必须同事务
  3. 不要追求零重复,要追求幂等
  4. event_id 贯穿全链路
  5. 尽早规划 Outbox 清理、分区和监控
  6. 中低流量先轮询,高流量再考虑 CDC

边界条件也要清楚:

  • 如果你要求严格实时强一致,Outbox 不是银弹
  • 如果你完全不能接受重复处理,必须从业务模型上做唯一约束
  • 如果吞吐极高而数据库较弱,单纯轮询 Outbox 会成为瓶颈

但对于大多数互联网业务系统来说,Kafka + Outbox 仍然是实现最终一致性的高性价比方案
它不花哨,却足够务实;不完美,却非常能打。


分享到:

下一篇
《Web逆向实战:中级开发者如何定位并复现前端签名算法实现接口自动化调用》