Web3 中间件实战:用 The Graph + Ethers.js 构建可扩展的链上数据查询与事件监听服务
在 Web3 应用里,很多团队一开始都把“读链上数据”和“监听链上事件”想得比较简单:前端直接调 RPC,后端直接订阅合约事件,能跑起来就行。
但业务一旦变复杂,问题会很快冒出来:
- 要按条件筛选历史事件,RPC 查得又慢又贵
- 要做分页、聚合、排行榜,直接扫链很痛苦
- WebSocket 监听偶发断连,漏事件不好补
- 重组(reorg)和确认数处理不好,业务数据会抖动
- 多个服务都自己连链,调用逻辑分散,维护成本高
这时候,中间件层就很有价值:
The Graph 负责“索引和查询历史数据”,Ethers.js 负责“实时链交互与事件监听”,两者结合,能搭出一套比较稳的链上数据服务。
这篇文章我会带你从一个实用角度出发,做一个可运行的教程:
- 用 The Graph 建立 ERC-20 转账事件的索引
- 用 Ethers.js 做实时监听
- 用 Node.js/Express 封装成统一查询接口
- 补上生产环境常见的坑、排查方式和性能/安全建议
背景与问题
先明确一下这个方案要解决什么。
假设你在做一个钱包、积分系统或者交易分析后台,需要支持:
- 查询某个地址最近的转账记录
- 查询某个 Token 的最近事件
- 实时感知新的 Transfer 事件
- 给上层应用输出统一的 REST/JSON 接口
- 在监听掉线后,能从历史区块补数据
如果你全靠 RPC:
eth_getLogs能查历史日志,但区块范围一大就容易超时- 复杂过滤和分页不友好
- 排序、聚合、字段关联都得自己做
- 多次重复查询同一批链上事件,成本高
而 The Graph 擅长把链上事件转成结构化实体,支持 GraphQL 查询;Ethers.js 则很适合做:
- 合约交互
- 实时订阅
- 区块/事件补偿扫描
- 与多 RPC 提供商集成
所以比较自然的分工是:
- 历史数据与复杂查询:The Graph
- 实时监听与链交互:Ethers.js
- 统一业务出口:Node.js 中间件
前置知识与环境准备
建议你具备以下基础:
- 知道以太坊事件(event/log)是什么
- 用过 Node.js
- 对 ABI、合约地址、区块确认数有基本概念
- 会一点 GraphQL 会更轻松
环境
本文示例使用:
- Node.js 18+
- npm 9+
- Ethers.js v6
- Express
- The Graph(Graph Node 或 Hosted/Studio 风格接口均可)
- 一个支持 ERC-20
Transfer事件的合约
安装依赖
mkdir web3-middleware-demo
cd web3-middleware-demo
npm init -y
npm install express ethers graphql-request dotenv
项目结构可以先这样:
web3-middleware-demo/
├─ src/
│ ├─ app.js
│ ├─ config.js
│ ├─ graphClient.js
│ ├─ listener.js
│ ├─ sync.js
│ └─ abi/
│ └─ erc20.json
├─ .env
└─ package.json
核心原理
先看整体架构。
flowchart LR
A[区块链网络] --> B[Ethers.js 监听器]
A --> C[The Graph 索引器]
C --> D[GraphQL 查询层]
B --> E[事件缓冲/补偿同步]
D --> F[Node.js 中间件 API]
E --> F
F --> G[前端/业务服务]
这个架构的关键点是:查询链路和实时链路分开,但在中间件层汇合。
1. The Graph 解决“可查询的历史数据”
The Graph 的工作方式本质上是:
- 订阅指定合约事件
- 在 mapping 里把事件转成实体
- 存进索引数据库
- 通过 GraphQL 提供查询
对于“按地址查最近 20 笔转账”这种需求,它比原始 RPC 好用得多。
2. Ethers.js 解决“实时事件与链交互”
Ethers.js 可以:
- 通过 WebSocket Provider 实时监听事件
- 用 JSON-RPC 回查区块、交易、receipt
- 在掉线后按区块范围补扫日志
所以它很适合做“增量事件消费器”。
3. 中间件负责“统一出口 + 一致性控制”
中间件层通常负责:
- 合并 Graph 查询结果与实时缓存
- 给前端统一 REST/GraphQL 接口
- 做幂等、确认数控制、重试
- 在 The Graph 还没索引到最新块时,用实时数据补平“时间差”
这点很重要:
The Graph 不一定永远是最新块;实时监听也不适合做复杂历史查询。两者结合才完整。
用 The Graph 建立 ERC-20 Transfer 索引
这一部分我会用“最小可理解”的方式说明子图结构。你可以部署到自己的 Graph Node,也可以迁移到 Studio 风格环境。
1)subgraph.yaml
specVersion: 0.0.5
description: ERC20 Transfer Indexer
repository: https://example.com/erc20-transfer-indexer
schema:
file: ./schema.graphql
dataSources:
- kind: ethereum
name: ERC20Token
network: mainnet
source:
address: "0xYourTokenAddressHere"
abi: ERC20
startBlock: 18000000
mapping:
kind: ethereum/events
apiVersion: 0.0.7
language: wasm/assemblyscript
entities:
- TransferEvent
- Account
abis:
- name: ERC20
file: ./abis/ERC20.json
eventHandlers:
- event: Transfer(indexed address,indexed address,uint256)
handler: handleTransfer
file: ./src/mapping.ts
2)schema.graphql
type Account @entity {
id: ID!
sentTransfers: [TransferEvent!]! @derivedFrom(field: "from")
receivedTransfers: [TransferEvent!]! @derivedFrom(field: "to")
}
type TransferEvent @entity {
id: ID!
txHash: Bytes!
logIndex: BigInt!
blockNumber: BigInt!
timestamp: BigInt!
from: Account!
to: Account!
value: BigInt!
}
3)mapping.ts
import { Transfer } from "../generated/ERC20Token/ERC20"
import { Account, TransferEvent } from "../generated/schema"
function getOrCreateAccount(address: string): Account {
let account = Account.load(address)
if (account == null) {
account = new Account(address)
account.save()
}
return account as Account
}
export function handleTransfer(event: Transfer): void {
let fromId = event.params.from.toHexString()
let toId = event.params.to.toHexString()
getOrCreateAccount(fromId)
getOrCreateAccount(toId)
let entity = new TransferEvent(
event.transaction.hash.toHexString() + "-" + event.logIndex.toString()
)
entity.txHash = event.transaction.hash
entity.logIndex = event.logIndex
entity.blockNumber = event.block.number
entity.timestamp = event.block.timestamp
entity.from = fromId
entity.to = toId
entity.value = event.params.value
entity.save()
}
4)一个常用 GraphQL 查询
按地址查询最近转入转出记录:
query GetTransfers($account: String!, $first: Int!) {
sent: transferEvents(
first: $first
orderBy: timestamp
orderDirection: desc
where: { from: $account }
) {
id
txHash
blockNumber
timestamp
value
from { id }
to { id }
}
received: transferEvents(
first: $first
orderBy: timestamp
orderDirection: desc
where: { to: $account }
) {
id
txHash
blockNumber
timestamp
value
from { id }
to { id }
}
}
实战代码:构建统一查询与监听服务
下面开始写中间件代码。目标是实现两个能力:
GET /transfers/:address:从 The Graph 查询历史数据- 后台实时监听
Transfer,并在接口里补充最近尚未被索引的事件
第一步:配置文件
.env
PORT=3000
RPC_HTTP_URL=https://your-rpc-http-endpoint
RPC_WS_URL=wss://your-rpc-ws-endpoint
GRAPH_ENDPOINT=https://your-subgraph-endpoint
TOKEN_ADDRESS=0xYourTokenAddressHere
CONFIRMATIONS=6
src/config.js
import dotenv from "dotenv";
dotenv.config();
export const config = {
port: Number(process.env.PORT || 3000),
rpcHttpUrl: process.env.RPC_HTTP_URL,
rpcWsUrl: process.env.RPC_WS_URL,
graphEndpoint: process.env.GRAPH_ENDPOINT,
tokenAddress: process.env.TOKEN_ADDRESS,
confirmations: Number(process.env.CONFIRMATIONS || 6),
};
if (!config.rpcHttpUrl || !config.rpcWsUrl || !config.graphEndpoint || !config.tokenAddress) {
throw new Error("Missing required env configuration");
}
第二步:准备 ABI
src/abi/erc20.json
[
{
"anonymous": false,
"inputs": [
{ "indexed": true, "internalType": "address", "name": "from", "type": "address" },
{ "indexed": true, "internalType": "address", "name": "to", "type": "address" },
{ "indexed": false, "internalType": "uint256", "name": "value", "type": "uint256" }
],
"name": "Transfer",
"type": "event"
}
]
第三步:封装 The Graph 查询客户端
src/graphClient.js
import { GraphQLClient, gql } from "graphql-request";
import { config } from "./config.js";
const client = new GraphQLClient(config.graphEndpoint);
const GET_TRANSFERS = gql`
query GetTransfers($account: String!, $first: Int!) {
sent: transferEvents(
first: $first
orderBy: timestamp
orderDirection: desc
where: { from: $account }
) {
id
txHash
blockNumber
timestamp
value
from { id }
to { id }
}
received: transferEvents(
first: $first
orderBy: timestamp
orderDirection: desc
where: { to: $account }
) {
id
txHash
blockNumber
timestamp
value
from { id }
to { id }
}
}
`;
export async function getTransfersByAddress(address, first = 10) {
const account = address.toLowerCase();
const data = await client.request(GET_TRANSFERS, {
account,
first,
});
return data;
}
第四步:用 Ethers.js 做实时监听
这里我们维护一个内存缓冲区,存最近监听到的事件。
在实际生产里,你更可能把它写入 Redis、PostgreSQL 或消息队列。
src/listener.js
import { ethers } from "ethers";
import { config } from "./config.js";
import erc20Abi from "./abi/erc20.json" assert { type: "json" };
const wsProvider = new ethers.WebSocketProvider(config.rpcWsUrl);
const httpProvider = new ethers.JsonRpcProvider(config.rpcHttpUrl);
const contract = new ethers.Contract(config.tokenAddress, erc20Abi, wsProvider);
const recentEvents = [];
const MAX_BUFFER_SIZE = 500;
function normalizeEvent(log) {
return {
id: `${log.transactionHash}-${log.index}`,
txHash: log.transactionHash,
blockNumber: log.blockNumber,
timestamp: null,
from: log.args.from.toLowerCase(),
to: log.args.to.toLowerCase(),
value: log.args.value.toString(),
source: "realtime",
confirmed: false,
};
}
async function enrichTimestamp(event) {
const block = await httpProvider.getBlock(event.blockNumber);
return {
...event,
timestamp: block ? Number(block.timestamp) : null,
};
}
function pushRecentEvent(event) {
recentEvents.unshift(event);
if (recentEvents.length > MAX_BUFFER_SIZE) {
recentEvents.pop();
}
}
export function getRecentEventsByAddress(address) {
const target = address.toLowerCase();
return recentEvents.filter(
(e) => e.from === target || e.to === target
);
}
export function startListener() {
contract.on("Transfer", async (from, to, value, event) => {
try {
const normalized = normalizeEvent({
transactionHash: event.log.transactionHash,
index: event.log.index,
blockNumber: event.log.blockNumber,
args: { from, to, value },
});
const enriched = await enrichTimestamp(normalized);
pushRecentEvent(enriched);
console.log("[Transfer]", enriched);
} catch (err) {
console.error("Listener event handling error:", err);
}
});
wsProvider.websocket.on("close", (code) => {
console.error("WebSocket closed:", code);
});
wsProvider.websocket.on("error", (err) => {
console.error("WebSocket error:", err);
});
}
我第一次这么写监听器时,最容易忽略的是:监听到事件不代表它已经“最终确认”。
所以这里先把它标成confirmed: false,后面再补确认逻辑。
第五步:增加历史补偿扫描
如果 WebSocket 断了,或者服务重启,单靠 contract.on 是会漏事件的。
所以我们需要一个“补扫器”。
思路是:
- 记录上次成功处理到的区块
- 定时从
lastProcessedBlock + 1扫到latest - confirmations - 用
queryFilter或getLogs拉日志 - 做幂等去重后写入缓冲/存储
src/sync.js
import { ethers } from "ethers";
import { config } from "./config.js";
import erc20Abi from "./abi/erc20.json" assert { type: "json" };
const provider = new ethers.JsonRpcProvider(config.rpcHttpUrl);
const contract = new ethers.Contract(config.tokenAddress, erc20Abi, provider);
let lastProcessedBlock = 0;
const seenEventIds = new Set();
const syncedEvents = [];
const MAX_SYNCED_EVENTS = 1000;
function pushSyncedEvent(event) {
if (seenEventIds.has(event.id)) return;
seenEventIds.add(event.id);
syncedEvents.unshift(event);
if (syncedEvents.length > MAX_SYNCED_EVENTS) {
const removed = syncedEvents.pop();
if (removed) seenEventIds.delete(removed.id);
}
}
export function getSyncedEventsByAddress(address) {
const target = address.toLowerCase();
return syncedEvents.filter((e) => e.from === target || e.to === target);
}
export async function initSyncStartBlock() {
const latest = await provider.getBlockNumber();
lastProcessedBlock = Math.max(0, latest - config.confirmations - 100);
console.log("Initial sync start block:", lastProcessedBlock);
}
export async function syncMissedEvents() {
const latest = await provider.getBlockNumber();
const safeBlock = latest - config.confirmations;
if (safeBlock <= lastProcessedBlock) return;
const fromBlock = lastProcessedBlock + 1;
const toBlock = safeBlock;
console.log(`Syncing logs from ${fromBlock} to ${toBlock}`);
const logs = await contract.queryFilter("Transfer", fromBlock, toBlock);
for (const log of logs) {
const block = await provider.getBlock(log.blockNumber);
const event = {
id: `${log.transactionHash}-${log.index}`,
txHash: log.transactionHash,
blockNumber: log.blockNumber,
timestamp: block ? Number(block.timestamp) : null,
from: log.args.from.toLowerCase(),
to: log.args.to.toLowerCase(),
value: log.args.value.toString(),
source: "sync",
confirmed: true,
};
pushSyncedEvent(event);
}
lastProcessedBlock = toBlock;
}
第六步:统一 API 出口
现在把历史查询和实时/补偿事件合并。
src/app.js
import express from "express";
import { ethers } from "ethers";
import { config } from "./config.js";
import { getTransfersByAddress } from "./graphClient.js";
import { startListener, getRecentEventsByAddress } from "./listener.js";
import { initSyncStartBlock, syncMissedEvents, getSyncedEventsByAddress } from "./sync.js";
const app = express();
app.use(express.json());
function uniqueById(items) {
const map = new Map();
for (const item of items) {
map.set(item.id, item);
}
return Array.from(map.values());
}
function sortByTimestampDesc(items) {
return items.sort((a, b) => (b.timestamp || 0) - (a.timestamp || 0));
}
app.get("/health", async (req, res) => {
res.json({ ok: true });
});
app.get("/transfers/:address", async (req, res) => {
try {
const address = req.params.address;
if (!ethers.isAddress(address)) {
return res.status(400).json({ error: "Invalid address" });
}
const graphData = await getTransfersByAddress(address, 20);
const graphItems = [
...graphData.sent.map((x) => ({
id: x.id,
txHash: x.txHash,
blockNumber: Number(x.blockNumber),
timestamp: Number(x.timestamp),
from: x.from.id,
to: x.to.id,
value: x.value,
source: "graph",
confirmed: true,
})),
...graphData.received.map((x) => ({
id: x.id,
txHash: x.txHash,
blockNumber: Number(x.blockNumber),
timestamp: Number(x.timestamp),
from: x.from.id,
to: x.to.id,
value: x.value,
source: "graph",
confirmed: true,
})),
];
const realtimeItems = getRecentEventsByAddress(address);
const syncedItems = getSyncedEventsByAddress(address);
const merged = uniqueById([
...realtimeItems,
...syncedItems,
...graphItems,
]);
const result = sortByTimestampDesc(merged).slice(0, 30);
res.json({
address: address.toLowerCase(),
total: result.length,
items: result,
});
} catch (err) {
console.error(err);
res.status(500).json({
error: "Internal server error",
message: err.message,
});
}
});
async function main() {
await initSyncStartBlock();
startListener();
setInterval(async () => {
try {
await syncMissedEvents();
} catch (err) {
console.error("syncMissedEvents failed:", err);
}
}, 15000);
app.listen(config.port, () => {
console.log(`Server running at http://localhost:${config.port}`);
});
}
main().catch((err) => {
console.error("Application start failed:", err);
process.exit(1);
});
运行与验证
1)启动服务
如果你用 ES Module,记得在 package.json 里加:
{
"type": "module",
"scripts": {
"start": "node src/app.js"
}
}
然后运行:
npm start
2)访问健康检查
curl http://localhost:3000/health
3)查询某地址转账记录
curl http://localhost:3000/transfers/0xYourAddressHere
返回示例:
{
"address": "0xyouraddresshere",
"total": 12,
"items": [
{
"id": "0xabc-12",
"txHash": "0xabc",
"blockNumber": 18345678,
"timestamp": 1700000000,
"from": "0x111",
"to": "0xyouraddresshere",
"value": "1000000000000000000",
"source": "graph",
"confirmed": true
}
]
}
数据流与一致性关系图
你可以把整个过程理解成两条数据流:
sequenceDiagram
participant Chain as Blockchain
participant Graph as The Graph
participant Listener as Ethers Listener
participant Sync as Backfill Sync
participant API as Middleware API
Chain->>Graph: 产生 Transfer 日志
Graph->>API: 提供历史索引查询
Chain->>Listener: WebSocket 推送最新事件
Listener->>API: 写入实时缓冲
Chain->>Sync: HTTP RPC 按区块补扫
Sync->>API: 补齐漏掉的已确认事件
API->>API: 去重、排序、统一输出
这张图里最核心的一点是:
Graph 提供“好查”,Listener 提供“够快”,Sync 提供“补漏”。
逐步验证清单
如果你想确认这一套不是“代码看起来能跑,实际跑不稳”,建议按这个顺序验证:
验证 1:The Graph 索引是否正确
检查:
- 子图是否成功同步
TransferEvent数量是否增长- GraphQL 查询按地址是否能返回数据
id是否唯一(推荐txHash-logIndex)
验证 2:监听器是否收到新事件
检查:
- WebSocket 是否连接成功
- 新转账发生时,控制台是否打印
[Transfer] - 同一笔事件是否只进入一次缓冲区
验证 3:补偿扫描是否生效
手动停掉服务 1 分钟,再重启,检查:
- 断线期间发生的事件是否能通过
syncMissedEvents找回 confirmed字段是否正确lastProcessedBlock是否推进
验证 4:统一接口结果是否合理
重点看:
- Graph 和实时数据是否发生重复
- 排序是否按时间倒序
- 某些实时事件虽然 Graph 还没索引到,但接口已经能看见
常见坑与排查
这一部分非常重要,很多问题都不是“代码语法错”,而是链上数据系统天然复杂。
1. The Graph 查不到最新事件
现象:
- 链上已经发生转账
- Ethers.js 监听到了
- GraphQL 里还查不到
原因:
- 子图索引有延迟
- 节点同步还没追到该块
- 映射报错导致某段区块卡住
排查:
- 查看 subgraph 同步高度
- 检查 mapping 日志
- 对比链上最新块与索引块高度差
建议:
- 不要把 The Graph 当成“实时真相源”
- 最新数据用 Ethers.js 实时链路补齐
2. WebSocket 断连后漏事件
现象:
- 服务看起来还活着
- 但某段时间没有收到任何事件
- 事后发现漏了多笔链上日志
原因:
- provider 断连没有自动恢复
- 中间网络抖动
- 基础设施供应商主动断开闲置连接
排查:
- 打印
close/error日志 - 增加心跳和重连状态监控
- 检查监听器是否真的重新订阅成功
建议:
- 监听只负责“尽快收到”
- 不要用监听代替补偿扫描
- 补扫才是抗丢失的关键
3. 出现重复事件
现象:
- 同一笔转账在接口中出现两次甚至三次
原因:
- Graph、实时监听、补扫都拿到同一笔事件
- 去重键不稳定
- 重组导致事件先后出现不同状态
建议:
- 用
txHash + logIndex作为事件唯一 ID - 聚合层统一去重
- 给事件增加
source和confirmed字段,方便诊断
4. reorg 导致数据回滚或抖动
现象:
- 某事件刚出现,过一会儿又没了
- 块号、交易状态前后不一致
原因:
- 链发生短重组
- 你把未确认区块的数据直接当成最终结果了
建议:
- 业务层区分:
- 未确认事件
- 已确认事件
- 对资金类、记账类业务,至少等待若干确认数
- 对前端展示可先显示“pending/confirming”状态
5. queryFilter 扫大区间太慢
现象:
- 补偿扫描一次扫几百万块,直接超时
原因:
- 单次扫描区间过大
- RPC 服务商对日志查询有限流
建议:
- 按块范围分片扫描,比如每 2000~10000 块一段
- 热门合约要做好节流与断点续扫
- 必要时自建归档节点或专用索引服务
性能最佳实践
这一套服务跑到生产后,性能瓶颈通常不在“Node.js 算得慢”,而在RPC、索引延迟、数据合并策略。
1. 查询和监听分离部署
不要把所有逻辑塞到一个进程里。
推荐拆分为:
api-service:提供 HTTP 查询接口listener-service:实时监听并写缓存/队列sync-service:负责历史补偿和确认状态推进
这样做的好处:
- API 压力不会影响监听稳定性
- 监听挂了可以单独重启
- 补扫可以独立调度和水平扩展
2. 用 Redis 或数据库替代内存缓冲
本文为了易跑通用了内存数组,但生产里建议:
- Redis:适合最近事件缓存、去重键、游标
- PostgreSQL:适合查询、审计、业务对账
- Kafka/RabbitMQ:适合事件分发
3. 为 Graph 查询加缓存
像“某地址最近 20 笔转账”这种接口,很多前端会频繁刷新。
建议:
- 短 TTL 缓存 5~15 秒
- 对热门地址做缓存预热
- Graph endpoint 加限流和重试
4. 分页设计要稳定
不要只提供 page=1&page=2 这种传统分页。
链上数据持续增长,偏移分页很容易抖。
更稳妥的是:
- 按
timestamp + id - 或按
blockNumber + logIndex - 使用 cursor 分页
安全最佳实践
Web3 中间件除了性能,安全也很容易被忽略。
1. 不要信任外部输入的地址和查询参数
必须校验:
- 地址是否合法
first/limit是否超出上限- 块范围是否过大
例如接口参数建议限制为:
limit <= 100- 历史扫描区间单次不超过某个阈值
2. 不要把未确认事件直接驱动关键业务
比如:
- 资产入账
- 发积分
- 解锁权限
这些动作如果基于未确认事件,遇到 reorg 很麻烦。
建议做法:
- 监听到事件后先记为
pending - 达到确认数后再变成
confirmed - 关键业务只消费
confirmed
3. RPC 提供商要做多活
我个人很建议至少准备两个 RPC:
- 主 RPC
- 备用 RPC
因为链上服务最怕“单点基础设施故障”。
可进一步做:
- 读请求负载均衡
- 监听主链路 + 补偿备用链路
- 超时自动切换 provider
4. 防止日志风暴拖垮服务
热门合约在极端情况下会产生大量事件。
要避免:
- 每条事件都同步写数据库
- 每次都单独查一次区块时间戳
- 大量 console 输出阻塞
建议:
- 批量写入
- 做区块信息缓存
- 结构化日志 + 限流
状态管理建议图
如果你准备把它做成生产级服务,可以参考下面这种状态流转:
stateDiagram-v2
[*] --> Detected
Detected --> Pending: 监听到新事件
Pending --> Confirmed: 达到确认数
Pending --> Dropped: reorg 后事件消失
Confirmed --> Indexed: The Graph 可查询
Indexed --> [*]
Dropped --> [*]
这个模型的意义在于:
你不要假设一条链上事件从出现到稳定是一步到位的。
它往往要经历“检测到 -> 等确认 -> 被索引”的过程。
边界条件与方案取舍
这套方案很好用,但也不是万能的。
适合的场景
- 钱包、浏览器、后台管理系统
- 资产流水、事件看板、行为分析
- 需要“历史可查 + 最新可见”的应用
不太适合的场景
- 高频交易撮合核心路径
- 毫秒级实时风控
- 需要复杂多合约强一致关联计算、且完全依赖单一子图的系统
什么时候要升级架构
当你出现这些情况时,可以考虑进一步升级:
- 单合约监听扩展到几十上百个合约
- 事件量达到每分钟数十万
- 要支持多链统一索引
- 需要审计级可追溯数据存储
这时通常会演进到:
- The Graph + 自建 ETL
- 监听器 + MQ + 流式处理
- 冷热数据分层存储
- 多链统一事件规范化模型
总结
我们这篇文章搭了一套很实用的 Web3 中间件骨架:
- The Graph:负责历史索引和结构化查询
- Ethers.js:负责实时监听和链上补偿扫描
- Node.js 中间件:负责统一 API、去重、排序和确认状态整合
如果你只记住三个最关键的建议,我会建议是这三个:
- 不要只靠 RPC 查历史,也不要只靠 WebSocket 做实时
- 不要把 The Graph 当成实时源,要用监听链路补齐最新数据
- 不要忽略补偿扫描和确认数,它们决定这套系统稳不稳
最后给一个可执行落地建议:
- 如果你现在还在“前端直连 RPC 查转账记录”,先把历史查询迁到 The Graph
- 如果你已经有监听器,但经常漏事件,优先补上按区块范围的 backfill
- 如果你准备上生产,把内存缓冲换成 Redis/PostgreSQL,并记录游标、确认状态和去重键
这样做下来,你的链上数据服务通常就不再只是“能跑”,而是开始接近“可维护、可扩展、可上线”。