跳转到内容
123xiao | 无名键客

《Web3 中间件实战:用 The Graph 与 Ethers.js 构建可扩展的链上数据查询服务》

字数: 0 阅读时长: 1 分钟

Web3 中间件实战:用 The Graph 与 Ethers.js 构建可扩展的链上数据查询服务

在很多 Web3 项目里,真正把产品做“顺手”的,不只是合约本身,而是链上数据怎么被稳定、快速、可扩展地读出来

如果你直接拿 ethers.js 去扫链上历史事件,一开始会觉得很直接:连 RPC、读合约、拉事件、拼结果,几百行代码就能跑起来。但数据量一大,问题就来了:

  • 历史区块扫描很慢
  • RPC 频率限制容易触发
  • 分页、过滤、聚合都得自己做
  • 前端每次都直连链,体验不稳定
  • 重组(reorg)和索引一致性处理麻烦

这时候,The Graph 负责“索引”Ethers.js 负责“实时补充与链上校验”,是一个非常实用的组合:
前者擅长把事件数据整理成结构化查询接口,后者擅长做链上实时读取、状态校验和回退兜底。把两者放进一个中间件里,基本就是很多生产系统的常见做法。

这篇文章我会带你做一个完整的实战:构建一个ERC-20 Transfer 查询服务,支持:

  • 用 The Graph 查询历史转账记录
  • 用 Ethers.js 补充最新链上余额
  • 通过 Node.js 暴露统一 API
  • 具备基础的错误处理、性能优化和排查思路

我会尽量按“带你做一遍”的方式来写,而不是只讲概念。


背景与问题

假设你要做一个钱包、资产看板或者链上分析后台,用户会提这样的需求:

  1. 查询某地址最近的转账记录
  2. 查询某代币在某账户上的余额
  3. 支持按区块范围、时间范围、代币地址过滤
  4. 尽可能快,最好前端一打开页面就有结果

如果完全依赖 ethers.js

  • provider.getLogs() 在大范围区块查询时容易慢
  • 不同 RPC 节点对日志返回条数有限制
  • 你需要自己维护事件解析、分页游标、排序逻辑
  • 复杂查询(比如“某地址既是 from 又是 to 的记录”)会很痛苦

如果完全依赖 The Graph:

  • 子图索引有延迟,不一定是最新块
  • 某些动态链上状态(如 balanceOf)最好直接读链
  • 子图 schema 设计不当,后期改动成本大

所以更合理的思路是:

  • 历史数据、可检索数据:交给 The Graph
  • 实时状态、强一致读取:交给 Ethers.js
  • 统一服务出口:用 Node.js 中间层对外提供 API

这层中间件的价值,在于把“索引查询”和“链上直读”隔离开,让前端和业务代码不必知道底层差异。


核心原理

先看整体结构。

flowchart LR
    A[Smart Contract / ERC20] --> B[Blockchain Events]
    B --> C[The Graph Indexer]
    C --> D[GraphQL Query API]
    A --> E[Ethers.js RPC Reader]
    D --> F[Node.js Middleware]
    E --> F
    F --> G[Frontend / Backend Consumer]

这个架构里有三个关键角色:

1. The Graph:做“离线整理后的可查询索引”

The Graph 通过监听链上事件,把事件转换成你定义好的实体,例如:

  • Transfer
  • Account
  • Token

然后暴露 GraphQL API,让你可以像查数据库一样查链上历史数据。

这特别适合:

  • 历史转账记录
  • 排序和分页
  • 复杂过滤
  • 聚合前的数据准备

2. Ethers.js:做“最新状态读取”和“校验”

Ethers.js 直接连 RPC 节点,适合读取:

  • balanceOf
  • symbol
  • decimals
  • totalSupply
  • 最新块高
  • 某一笔交易收据

也适合做兜底逻辑,例如:

  • The Graph 暂时没同步到最新块
  • 需要校验某条记录对应交易是否真的上链
  • 需要读取合约当前实时状态

3. Node.js 中间件:做“统一出口”

中间件负责:

  • 封装 GraphQL 查询
  • 封装链上读取
  • 合并与格式化返回
  • 缓存热点数据
  • 对错误做降级处理

这层很关键,因为它让前端不必同时处理 GraphQL 和 RPC 两套逻辑。


前置知识与环境准备

为了能直接跟着做,先准备下面这些东西:

  • Node.js 18+
  • npm 或 pnpm
  • 一个可访问的以太坊 RPC URL
  • 一个可用的 The Graph 子图接口
  • 基础了解:
    • Solidity 事件
    • ERC-20 标准
    • GraphQL 基础查询
    • JavaScript/Node.js 异步编程

本文示例会假设你已经有一个可查询 ERC-20 Transfer 的子图。
如果你自己维护子图,后面我也会给出最小 schema 和 mapping 示例。

初始化项目:

mkdir web3-middleware-demo
cd web3-middleware-demo
npm init -y
npm install express ethers graphql-request dotenv

项目结构先保持简单:

.
├── src
   ├── app.js
   ├── graphClient.js
   ├── chainClient.js
   └── service.js
├── .env
└── package.json

.env 示例:

PORT=3000
RPC_URL=https://mainnet.infura.io/v3/YOUR_KEY
GRAPH_URL=https://api.thegraph.com/subgraphs/name/YOUR_SUBGRAPH
TOKEN_ADDRESS=0x0000000000000000000000000000000000000000

核心原理拆解:一次查询是怎么流动的

这里用一个时序图看得更清楚。

sequenceDiagram
    participant U as User
    participant API as Node Middleware
    participant G as The Graph
    participant R as Ethereum RPC
    participant T as ERC20 Contract

    U->>API: GET /address/:account/transfers
    API->>G: GraphQL 查询历史 Transfer
    G-->>API: 返回结构化历史记录
    API->>R: 读取最新区块 / balanceOf
    R->>T: eth_call
    T-->>R: 返回余额
    R-->>API: 返回链上实时数据
    API-->>U: 合并后的统一响应

这个流程里最重要的设计点有两个:

  1. 历史查询和实时读取职责分离
  2. API 返回统一格式,屏蔽底层来源差异

这样你后续想替换成别的索引器,或者增加缓存层,都不会影响调用方。


一个最小可用的子图设计

如果你还没有子图,先看一个最小模型。我们只关心 ERC-20 的转账事件。

GraphQL Schema

type Transfer @entity(immutable: true) {
  id: ID!
  from: Bytes!
  to: Bytes!
  value: BigInt!
  blockNumber: BigInt!
  blockTimestamp: BigInt!
  transactionHash: Bytes!
}

type Account @entity {
  id: ID!
  sentTransfers: [Transfer!]! @derivedFrom(field: "from")
  receivedTransfers: [Transfer!]! @derivedFrom(field: "to")
}

Subgraph 事件配置示意

specVersion: 0.0.5
schema:
  file: ./schema.graphql

dataSources:
  - kind: ethereum
    name: ERC20
    network: mainnet
    source:
      address: "0xYourTokenAddress"
      abi: ERC20
      startBlock: 18000000
    mapping:
      kind: ethereum/events
      apiVersion: 0.0.7
      language: wasm/assemblyscript
      entities:
        - Transfer
        - Account
      abis:
        - name: ERC20
          file: ./abis/ERC20.json
      eventHandlers:
        - event: Transfer(indexed address,indexed address,uint256)
          handler: handleTransfer
      file: ./src/mapping.ts

Mapping 示例

import { Transfer as TransferEvent } from "../generated/ERC20/ERC20"
import { Transfer, Account } from "../generated/schema"

export function handleTransfer(event: TransferEvent): void {
  let transfer = new Transfer(
    event.transaction.hash.toHexString() + "-" + event.logIndex.toString()
  )

  transfer.from = event.params.from
  transfer.to = event.params.to
  transfer.value = event.params.value
  transfer.blockNumber = event.block.number
  transfer.blockTimestamp = event.block.timestamp
  transfer.transactionHash = event.transaction.hash
  transfer.save()

  let fromAccount = Account.load(event.params.from.toHexString())
  if (fromAccount == null) {
    fromAccount = new Account(event.params.from.toHexString())
    fromAccount.save()
  }

  let toAccount = Account.load(event.params.to.toHexString())
  if (toAccount == null) {
    toAccount = new Account(event.params.to.toHexString())
    toAccount.save()
  }
}

上面这个模型非常朴素,但已经足够支撑很多中间件场景。
生产里如果要做多 token、多链、价格关联、用户画像,schema 会更复杂,但思路一样。


实战代码(可运行)

下面开始搭建 Node.js 中间件。目标很明确:

  • 提供 GET /address/:account/transfers
  • 返回某地址的历史转账 + 实时余额
  • 数据源来自 The Graph 和 Ethers.js

1)封装 Graph 查询客户端

src/graphClient.js

const { GraphQLClient, gql } = require("graphql-request");

const graphClient = new GraphQLClient(process.env.GRAPH_URL);

async function queryTransfersByAccount(account, first = 10, skip = 0) {
  const normalized = account.toLowerCase();

  const query = gql`
    query GetTransfers($account: Bytes!, $first: Int!, $skip: Int!) {
      sent: transfers(
        where: { from: $account }
        first: $first
        skip: $skip
        orderBy: blockTimestamp
        orderDirection: desc
      ) {
        id
        from
        to
        value
        blockNumber
        blockTimestamp
        transactionHash
      }
      received: transfers(
        where: { to: $account }
        first: $first
        skip: $skip
        orderBy: blockTimestamp
        orderDirection: desc
      ) {
        id
        from
        to
        value
        blockNumber
        blockTimestamp
        transactionHash
      }
    }
  `;

  return graphClient.request(query, {
    account: normalized,
    first,
    skip,
  });
}

module.exports = {
  queryTransfersByAccount,
};

这里用了两个查询字段:

  • sent
  • received

这样我们就能把“作为发送方”和“作为接收方”的记录一起拿回来,再在服务层统一排序。


2)封装 Ethers.js 链上读取

src/chainClient.js

const { ethers } = require("ethers");

const provider = new ethers.JsonRpcProvider(process.env.RPC_URL);

const erc20Abi = [
  "function balanceOf(address owner) view returns (uint256)",
  "function symbol() view returns (string)",
  "function decimals() view returns (uint8)",
];

function getTokenContract() {
  return new ethers.Contract(process.env.TOKEN_ADDRESS, erc20Abi, provider);
}

async function getTokenBalance(account) {
  const contract = getTokenContract();

  const [balance, symbol, decimals, blockNumber] = await Promise.all([
    contract.balanceOf(account),
    contract.symbol(),
    contract.decimals(),
    provider.getBlockNumber(),
  ]);

  return {
    raw: balance.toString(),
    formatted: ethers.formatUnits(balance, decimals),
    symbol,
    decimals,
    blockNumber,
  };
}

module.exports = {
  getTokenBalance,
  provider,
};

这里我用了 Promise.all 并发读取,这个小优化很值,尤其是你要同时读多个链上字段的时候。


3)服务层:合并、排序、格式化

src/service.js

const { queryTransfersByAccount } = require("./graphClient");
const { getTokenBalance } = require("./chainClient");

function normalizeTransfer(item, direction) {
  return {
    id: item.id,
    direction,
    from: item.from,
    to: item.to,
    value: item.value,
    blockNumber: Number(item.blockNumber),
    blockTimestamp: Number(item.blockTimestamp),
    transactionHash: item.transactionHash,
  };
}

async function getAddressDashboard(account, first = 10, skip = 0) {
  const [graphData, balanceData] = await Promise.all([
    queryTransfersByAccount(account, first, skip),
    getTokenBalance(account),
  ]);

  const transfers = [
    ...graphData.sent.map((item) => normalizeTransfer(item, "sent")),
    ...graphData.received.map((item) => normalizeTransfer(item, "received")),
  ].sort((a, b) => b.blockTimestamp - a.blockTimestamp);

  return {
    account,
    balance: balanceData,
    transfers,
    meta: {
      source: {
        history: "the-graph",
        balance: "ethers-rpc",
      },
      count: transfers.length,
    },
  };
}

module.exports = {
  getAddressDashboard,
};

这一步就是中间件最核心的价值:把不同来源的数据揉成一个业务友好的对象


4)暴露 API

src/app.js

require("dotenv").config();
const express = require("express");
const { ethers } = require("ethers");
const { getAddressDashboard } = require("./service");

const app = express();
const port = process.env.PORT || 3000;

app.get("/health", (req, res) => {
  res.json({ ok: true });
});

app.get("/address/:account/transfers", async (req, res) => {
  try {
    const { account } = req.params;
    const first = Number(req.query.first || 10);
    const skip = Number(req.query.skip || 0);

    if (!ethers.isAddress(account)) {
      return res.status(400).json({ error: "invalid address" });
    }

    const data = await getAddressDashboard(account, first, skip);
    res.json(data);
  } catch (error) {
    console.error("API error:", error);
    res.status(500).json({
      error: "internal server error",
      message: error.message,
    });
  }
});

app.listen(port, () => {
  console.log(`Server running at http://localhost:${port}`);
});

运行:

node src/app.js

测试:

curl "http://localhost:3000/address/0x0000000000000000000000000000000000000000/transfers?first=5&skip=0"

一次结果应该长什么样

一个典型返回可能是这样:

{
  "account": "0x0000000000000000000000000000000000000000",
  "balance": {
    "raw": "123450000000000000000",
    "formatted": "123.45",
    "symbol": "TKN",
    "decimals": 18,
    "blockNumber": 20500001
  },
  "transfers": [
    {
      "id": "0xabc-1",
      "direction": "received",
      "from": "0x111",
      "to": "0x0000000000000000000000000000000000000000",
      "value": "1000000000000000000",
      "blockNumber": 20499990,
      "blockTimestamp": 1720000000,
      "transactionHash": "0xabc"
    }
  ],
  "meta": {
    "source": {
      "history": "the-graph",
      "balance": "ethers-rpc"
    },
    "count": 1
  }
}

这类格式前端一般很好接,因为:

  • transfers 已经是统一列表
  • direction 已经标注
  • balance 已经格式化
  • meta 明确说明了数据来源

逐步验证清单

如果你是边学边做,建议按这个顺序验证,不要一上来就怀疑“是不是 The Graph 有问题”。

第一步:验证 RPC 通不通

单独测试 ethers.js

const { ethers } = require("ethers");

async function main() {
  const provider = new ethers.JsonRpcProvider(process.env.RPC_URL);
  const block = await provider.getBlockNumber();
  console.log("latest block:", block);
}

main().catch(console.error);

如果这一步都不通,先别看子图。


第二步:验证 GraphQL 查询能返回

直接写一个最小测试:

require("dotenv").config();
const { GraphQLClient, gql } = require("graphql-request");

async function main() {
  const client = new GraphQLClient(process.env.GRAPH_URL);

  const query = gql`
    query {
      transfers(first: 1) {
        id
        from
        to
        value
      }
    }
  `;

  const data = await client.request(query);
  console.log(JSON.stringify(data, null, 2));
}

main().catch(console.error);

如果这里报错,优先检查:

  • GRAPH_URL 是否正确
  • schema 里实体名是不是 transfers
  • where 参数里的字段类型是否匹配

第三步:验证合并逻辑

只测 service 层:

require("dotenv").config();
const { getAddressDashboard } = require("./src/service");

async function main() {
  const account = "0x0000000000000000000000000000000000000000";
  const data = await getAddressDashboard(account, 5, 0);
  console.log(JSON.stringify(data, null, 2));
}

main().catch(console.error);

常见坑与排查

这一部分我尽量写得接地气一点,因为这些问题真的是最容易把时间耗光的地方。

坑 1:The Graph 查不到数据,但链上明明有交易

常见原因

  • 子图的 startBlock 设得太晚
  • 合约地址配置错了
  • 事件签名写错
  • 子图还没同步到目标区块
  • 查询的地址大小写或格式不一致

排查方式

先查子图最新同步块高。不同部署环境方式略有不同,但核心思路是确认:

  • 当前索引到了哪个区块
  • 目标交易发生在哪个区块

如果目标交易区块 > 子图当前同步区块,那不是没数据,是还没同步到


坑 2:GraphQL 查询字段类型不匹配

比如 Bytes!BigInt! 这些类型,如果你用错变量类型,会直接报错。

例子

错误写法:

query GetTransfers($account: String!) {
  transfers(where: { from: $account }) {
    id
  }
}

正确写法:

query GetTransfers($account: Bytes!) {
  transfers(where: { from: $account }) {
    id
  }
}

经验建议

我自己一般会把 GraphQL 查询先在 Playground 里跑通,再搬到代码里,不然变量和 schema 对不上时很烦。


坑 3:Ethers v5 和 v6 API 混用

这几年很多示例代码还停留在 v5,但新项目往往装的是 v6。
两者在不少细节上不一样,比如:

  • ethers.providers.JsonRpcProvider vs ethers.JsonRpcProvider
  • utils.formatUnits vs ethers.formatUnits

如果你复制示例时没注意版本,很容易报 undefined

建议

确认本地版本:

npm list ethers

然后统一按一个版本写,不要混搭。


坑 4:分页逻辑不稳定

The Graph 常见分页方式是 first + skip,在数据量很大时,skip 越大越慢,且数据变动时可能出现重复或漏项。

建议

中小规模可以先用 skip
如果你的数据量非常大,建议改成基于游标或基于字段范围分页,比如按:

  • blockTimestamp_lt
  • id_gt

来做增量翻页。


坑 5:历史记录和实时余额对不上

这很正常,不一定是 bug。

可能原因

  • The Graph 还没同步到最新块
  • 余额是实时读链,转账列表是历史索引
  • 链发生了短暂 reorg
  • 合约存在 mint/burn 等非普通 transfer 逻辑

处理建议

在 API 返回里明确标注:

  • 历史数据来源
  • 实时余额来源
  • 对应块高

这样前端和调用方就知道这个“不一致”是可解释的,而不是黑盒错误。


安全/性能最佳实践

到这里功能已经能跑了,但如果想真正拿去做生产中间件,下面这些点非常关键。

1. 不要让前端直接拼 GraphQL 查询

很多人为了省事,前端直连 The Graph。能不能做?能。
但一旦查询复杂、需要鉴权、需要合并链上数据,最好还是走中间件。

原因很简单:

  • 避免暴露底层接口细节
  • 方便做缓存、限流、审计
  • 方便热修复查询逻辑
  • 统一错误格式

2. 对热点读取做缓存

例如:

  • symbol
  • decimals
  • 某地址首页数据
  • 最近区块范围内的热门代币转账

最简单可以先做进程内缓存,生产里再接 Redis。

缓存策略建议:

  • 代币元信息:长 TTL
  • 余额:短 TTL,5~15 秒
  • 历史转账:按查询参数缓存 15~60 秒

3. 控制查询规模

不要让调用方随意传一个 first=10000
这是很典型的中间件自杀式接口设计。

建议限制:

  • first 最大 100 或 200
  • skip 最大某个阈值
  • 地址参数必须校验
  • 可选加 API Key 和 rate limit

4. 明确处理 reorg 与最终一致性

链上系统不是传统数据库,没有“刚写完就永远稳定”的保证。
尤其是新区块附近的数据,可能受重组影响。

实践建议

  • 历史查询尽量以“已确认块”为准
  • 对最新几个块的数据做“软展示”
  • 返回 indexedBlocklatestRpcBlock
  • 对关键业务增加二次校验

5. 批量化与并发控制

如果一个页面要查 100 个地址余额,不要简单 Promise.all(100个RPC) 就结束了。
这样很可能把自己的 RPC 打爆。

更好的做法

  • 做并发池
  • 批量聚合请求
  • 热点地址缓存
  • 尽可能减少重复 symbol/decimals 读取

6. 错误降级

一个成熟的中间件,不应该因为 The Graph 暂时超时,就整个接口直接不可用。

建议的降级策略

  • The Graph 超时:返回余额 + 空历史记录
  • RPC 超时:返回历史记录 + 标记余额不可用
  • 两者都失败:返回统一错误码和可观测日志

可以把可用性优先级设计成这样:

stateDiagram-v2
    [*] --> FullData
    FullData --> HistoryOnly: RPC timeout
    FullData --> BalanceOnly: Graph timeout
    HistoryOnly --> PartialResponse
    BalanceOnly --> PartialResponse
    PartialResponse --> [*]

这类降级设计非常实用,尤其在公共 RPC 或第三方索引服务波动时。


进阶扩展:从 Demo 到可扩展服务

如果你要把这个方案继续做大,可以往这几个方向演进。

多链支持

抽象一个 chainId 参数:

  • 不同链使用不同 RPC URL
  • 不同链使用不同子图 endpoint
  • 中间件统一输出格式

这样一个 API 就能支持 Ethereum、Arbitrum、Polygon 等网络。


多合约支持

如果不是只查一个 token,而是多个 token,可以把 TOKEN_ADDRESS 从环境变量变成接口参数,但一定要加白名单,不要允许用户任意打未知合约。


丰富查询能力

你可以继续扩展:

  • 按时间范围查询
  • 按 token 查询
  • 按方向过滤 sent/received
  • 返回交易状态与 gas 信息
  • 聚合每日转账量

这些都适合放在中间件里,而不是让前端自己拼。


一个更稳妥的接口设计建议

真实项目里,我更推荐把接口拆成两个:

  1. GET /address/:account/transfers
  2. GET /address/:account/balance

然后再加一个聚合接口:

  1. GET /address/:account/dashboard

原因是:

  • 单一接口更容易缓存
  • 排查问题时更容易定位到底是 Graph 挂了还是 RPC 挂了
  • 聚合接口可以复用底层服务函数

也就是说,聚合是面向产品的,拆分是面向工程的


总结

如果把这篇文章压缩成一句话,那就是:

The Graph 负责把链上历史事件“整理成可查数据”,Ethers.js 负责把链上实时状态“直接读出来”,Node.js 中间件负责把两者变成一个稳定、可扩展的服务接口。

本文我们做了这些事:

  • 明确了为什么不能只靠 RPC 扫链
  • 梳理了 The Graph + Ethers.js 的职责边界
  • 给出了一个最小可用的子图模型
  • 实现了可运行的 Node.js 查询中间件
  • 讨论了分页、同步延迟、版本差异、reorg 等常见坑
  • 给出了缓存、限流、降级、并发控制等生产建议

如果你准备把它落到项目里,我建议按下面的优先级推进:

  1. 先做最小可用版本:历史转账 + 实时余额
  2. 再补工程能力:缓存、限流、日志
  3. 最后做扩展性:多链、多 token、复杂筛选

边界条件也要记住:

  • 如果你追求绝对最新状态,不能只依赖 The Graph
  • 如果你需要复杂历史检索,不要只靠 Ethers.js 扫链
  • 如果你要给前端稳定体验,最好一定有中间件层

这套方案并不神奇,但非常务实。很多时候,系统能不能稳定跑,不在于用了多新的技术,而在于你有没有把“索引”和“实时读取”这两件事分清楚。


分享到:

上一篇
《从 Prompt 到 Workflow:面向中级开发者的 AI Agent 实战设计与落地指南》
下一篇
《区块链中间件实战:基于事件索引与智能合约日志构建高可用链上数据服务》