
Build an Indexer


Use this guide to go from zero to a production-oriented Thru indexer. It starts with the smallest working setup, then covers what changes when the indexer runs as part of a real backend service.

Prerequisites:

  • Node 20+ and TypeScript
  • PostgreSQL
  • Drizzle for schema and migrations

Install the runtime and development dependencies:

```shell
pnpm add @thru/indexer @thru/replay @thru/sdk @thru/programs @bufbuild/protobuf postgres drizzle-orm
pnpm add -D drizzle-kit tsx typescript
```

Step 1: Define A Token Transfer Event Stream


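In src/streams/token-transfers.ts, define an event stream whose filter keeps only events emitted by the token program. Its parse function decodes each event payload with the generated token ABI type and returns one row per transfer, or null for events it should skip.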

```typescript
// src/streams/token-transfers.ts
import { create } from "@bufbuild/protobuf";
import { decodeAddress, encodeAddress, encodeSignature } from "@thru/sdk/helpers";
import { defineEventStream, t } from "@thru/indexer";
import { FilterParamValueSchema, FilterSchema, type Event } from "@thru/replay";
import { TokenEvent } from "./abi/thru/program/token/types";

const TOKEN_PROGRAM = process.env.TOKEN_PROGRAM_ID!;

const tokenTransfers = defineEventStream({
  name: "token-transfers",
  description: "Transfer events emitted by the token program",
  schema: {
    id: t.text().primaryKey(),
    slot: t.bigint().notNull().index(),
    txnSignature: t.text().notNull(),
    source: t.text().notNull().index(),
    dest: t.text().notNull().index(),
    amount: t.bigint().notNull(),
    indexedAt: t.timestamp().notNull().defaultNow(),
  },
  // Only stream events whose emitting program is the token program.
  filterFactory: () => {
    const programBytes = new Uint8Array(decodeAddress(TOKEN_PROGRAM));
    return create(FilterSchema, {
      expression: "event.program.value == params.address",
      params: {
        address: create(FilterParamValueSchema, {
          kind: { case: "bytesValue", value: programBytes },
        }),
      },
    });
  },
  // Return one row per transfer; returning null skips the event.
  parse: (event: Event) => {
    if (!event.payload || event.slot === undefined) return null;
    const envelope = TokenEvent.from_array(event.payload);
    // Token events that are not transfers yield no transfer payload and are skipped.
    const transfer = envelope?.payload()?.asTransfer();
    if (!transfer) return null;
    return {
      id: event.eventId,
      slot: event.slot,
      txnSignature: encodeSignature(event.transactionSignature?.value ?? new Uint8Array()),
      source: encodeAddress(new Uint8Array(transfer.source.get_bytes())),
      dest: encodeAddress(new Uint8Array(transfer.dest.get_bytes())),
      amount: transfer.amount,
      indexedAt: new Date(),
    };
  },
});

// Export the generated Drizzle table so the schema module can include it.
export const tokenTransferEvents = tokenTransfers.table;
export default tokenTransfers;
```

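Token account state comes from an account stream. In src/account-streams/token-accounts.ts, subscribe to accounts owned by the token program and decode the raw account bytes directly with the TokenAccount type from @thru/programs.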

```typescript
// src/account-streams/token-accounts.ts
import { decodeAddress, encodeAddress } from "@thru/sdk/helpers";
import { defineAccountStream, t } from "@thru/indexer";
import { TokenAccount } from "@thru/programs/token";

const TOKEN_PROGRAM = process.env.TOKEN_PROGRAM_ID!;
// Byte length of a token account; accounts of any other size are ignored.
const TOKEN_ACCOUNT_SIZE = 73;

const tokenAccounts = defineAccountStream({
  name: "token-accounts",
  description: "Latest token account balances by address",
  // Only accounts owned by the token program are streamed.
  ownerProgramFactory: () => new Uint8Array(decodeAddress(TOKEN_PROGRAM)),
  expectedSize: TOKEN_ACCOUNT_SIZE,
  schema: {
    address: t.text().primaryKey(),
    mint: t.text().notNull().index(),
    owner: t.text().notNull().index(),
    amount: t.bigint().notNull(),
    isFrozen: t.boolean().notNull(),
    slot: t.bigint().notNull(),
    seq: t.bigint().notNull(),
    updatedAt: t.timestamp().notNull().defaultNow(),
  },
  // One row per account address (the primary key), holding its latest decoded state.
  parse: (account) => {
    if (account.data.length !== TOKEN_ACCOUNT_SIZE) return null;
    const parsed = TokenAccount.from_array(account.data);
    if (!parsed) return null;
    return {
      address: encodeAddress(account.address),
      mint: encodeAddress(new Uint8Array(parsed.mint.get_bytes())),
      owner: encodeAddress(new Uint8Array(parsed.owner.get_bytes())),
      amount: parsed.amount,
      isFrozen: parsed.is_frozen !== 0,
      slot: account.slot,
      seq: account.seq,
      updatedAt: new Date(),
    };
  },
});

export const tokenAccountsTable = tokenAccounts.table;
export default tokenAccounts;
```
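Collect the stream tables, plus the checkpoint table the runtime uses to track progress, in a shared schema module:

```typescript
// src/schema.ts
export { checkpointTable } from "@thru/indexer";
export { tokenAccountsTable } from "./account-streams/token-accounts";
export { tokenTransferEvents } from "./streams/token-transfers";
```

Use those exports as your Drizzle schema input by pointing drizzle-kit at this module: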

```typescript
// drizzle.config.ts
import { defineConfig } from "drizzle-kit";

export default defineConfig({
  schema: "./src/schema.ts",
  out: "./drizzle", // generated SQL migrations
  dialect: "postgresql",
  dbCredentials: {
    url: process.env.DATABASE_URL!,
  },
});
```
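
The indexer and query examples import a shared db client from ./db, which this guide does not show. A minimal sketch, assuming the postgres driver installed above with Drizzle's postgres-js adapter, and assuming Indexer accepts an ordinary Drizzle PostgreSQL database object:

```typescript
// src/db.ts (sketch): shared Drizzle client for the worker and the API.
// Assumes DATABASE_URL points at the same database drizzle-kit migrates.
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
import * as schema from "./schema";

// postgres.js connects lazily: no connection is opened until the first query.
const client = postgres(process.env.DATABASE_URL!);

// Passing the schema is optional; it enables Drizzle's relational query API (db.query).
export const db = drizzle(client, { schema });
```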
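Generate and apply the migrations with pnpm drizzle-kit generate and pnpm drizzle-kit migrate, then start the indexer:

```typescript
import { Indexer } from "@thru/indexer";
import { ChainClient } from "@thru/replay";
import tokenAccounts from "./account-streams/token-accounts";
import { db } from "./db";
import tokenTransfers from "./streams/token-transfers";

const indexer = new Indexer({
  db,
  clientFactory: () => new ChainClient({ baseUrl: process.env.CHAIN_RPC_URL! }),
  eventStreams: [tokenTransfers],
  accountStreams: [tokenAccounts],
  defaultStartSlot: 0n, // used when a stream has no stored checkpoint yet
  safetyMargin: 64, // keep the worker this far behind the live chain tip
  pageSize: 512,
  logLevel: "info",
});

// Top-level await: run this file as an ES module (e.g. "type": "module" in package.json).
await indexer.start();
```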

After this setup:

  • token_transfer_events stores immutable token transfer rows
  • token_accounts stores the latest token account state by address
  • indexer_checkpoints tracks resumable progress per stream

The steps above give you a working indexer. The sections below cover what you need when you move that indexer into a real backend service.

In production, run the indexer worker and the API server as separate processes. This keeps write-heavy backfill work from competing with read traffic, and lets you scale or restart each side independently.

A common layout:

```text
src/
  streams/          # shared stream definitions
  account-streams/  # shared account stream definitions
  schema.ts         # shared Drizzle schema exports
  db.ts             # shared database client
  worker.ts         # indexer process, runs Indexer.start()
  queries/          # app-owned Drizzle query helpers
  api.ts            # app-owned API process, if your backend exposes one
```

The worker owns the Indexer instance and calls indexer.start(). It is the process that writes rows and updates checkpoints, and it should call indexer.stop() when it receives SIGINT or SIGTERM:

```typescript
// src/worker.ts
import { Indexer } from "@thru/indexer";
import { ChainClient } from "@thru/replay";
import tokenAccounts from "./account-streams/token-accounts";
import { db } from "./db";
import tokenTransfers from "./streams/token-transfers";

const indexer = new Indexer({
  db,
  clientFactory: () => new ChainClient({ baseUrl: process.env.CHAIN_RPC_URL! }),
  eventStreams: [tokenTransfers],
  accountStreams: [tokenAccounts],
});

// Stop the runtime on Ctrl+C and on the signal process managers send at shutdown.
process.on("SIGINT", () => indexer.stop());
process.on("SIGTERM", () => indexer.stop());

await indexer.start();
```

The API process should query the Drizzle tables directly and own its response shape.

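```typescript
import { desc, eq, inArray, or } from "drizzle-orm";
import { db } from "./db";
import { tokenAccountsTable, tokenTransferEvents } from "./schema";

// Most recent transfers touching any token account currently owned by `owner`.
export async function getRecentTransfersForOwner(owner: string, limit = 200) {
  const accounts = await db
    .select({ address: tokenAccountsTable.address })
    .from(tokenAccountsTable)
    .where(eq(tokenAccountsTable.owner, owner));
  const accountAddresses = accounts.map((account) => account.address);
  if (accountAddresses.length === 0) return [];

  // Filter in SQL, not in memory: taking the latest rows across all owners first
  // and filtering afterwards would silently drop this owner's older transfers.
  return db
    .select()
    .from(tokenTransferEvents)
    .where(
      or(
        inArray(tokenTransferEvents.source, accountAddresses),
        inArray(tokenTransferEvents.dest, accountAddresses)
      )
    )
    .orderBy(desc(tokenTransferEvents.slot))
    .limit(limit);
}
```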
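
The slot and amount values produced by the parse functions above are JavaScript bigint values, and JSON.stringify throws a TypeError when it meets a bigint. If your rows come back from the database the same way, the API has to convert them before responding. A minimal sketch of a hypothetical toJson helper that renders every bigint as a decimal string (strings rather than numbers, because token amounts can exceed Number.MAX_SAFE_INTEGER):

```typescript
// Hypothetical API helper: serialize values that may contain bigint fields.
// JSON.stringify rejects bigint, so the replacer turns each one into a decimal string.
export function toJson(value: unknown): string {
  return JSON.stringify(value, (_key: string, v: unknown) =>
    typeof v === "bigint" ? v.toString() : v
  );
}
```

In the API process, respond with toJson(await getRecentTransfersForOwner(owner)) instead of passing the rows to JSON.stringify directly.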

The checkpoint table records progress per stream. On restart, the runtime resumes from the last stored checkpoint instead of replaying from the default start slot.
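
The resume rule can be modeled as a small pure function. This is a conceptual sketch, not the @thru/indexer implementation: resolveStartSlot is hypothetical, and it assumes a checkpoint stores the last slot whose rows were fully written, so processing continues at the next slot.

```typescript
// Conceptual sketch (hypothetical, not the @thru/indexer API): choose the slot a
// stream resumes from. Assumes the checkpoint holds the last fully processed slot.
export function resolveStartSlot(
  lastCheckpointSlot: bigint | undefined,
  defaultStartSlot: bigint
): bigint {
  if (lastCheckpointSlot === undefined) {
    // First run for this stream: nothing stored yet, so use the configured default.
    return defaultStartSlot;
  }
  // Restart: continue after committed work instead of replaying from the default.
  return lastCheckpointSlot + 1n;
}
```

For example, resolveStartSlot(undefined, 0n) returns 0n on a first run, and resolveStartSlot(41n, 0n) returns 42n after a restart.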

Choose a conservative (larger) safetyMargin so the worker stays further behind the live tip during the handoff from backfill to live processing; the first example uses 64.
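
A hypothetical model of how a margin bounds the work: given the slot a stream would process next and the latest slot observed on chain, only slots at least safetyMargin below the tip are eligible. nextSafeRange and SlotRange are illustrative names, not part of @thru/indexer, and the sketch assumes the margin is counted in slots.

```typescript
// Illustrative model (not the @thru/indexer API) of staying behind the chain tip.
export interface SlotRange {
  from: bigint; // first slot to process (inclusive)
  to: bigint; // last slot to process (inclusive)
}

// Returns the slots far enough behind the tip to process now, or null when the
// stream has reached the safe boundary and should wait for the tip to advance.
export function nextSafeRange(
  nextSlot: bigint,
  chainTip: bigint,
  safetyMargin: number
): SlotRange | null {
  const safeUpper = chainTip - BigInt(safetyMargin);
  if (nextSlot > safeUpper) return null;
  return { from: nextSlot, to: safeUpper };
}
```

With a margin of 64 and a tip at slot 1000, a stream at slot 100 may process through slot 936, while a stream already at slot 950 waits.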

Enable runtime validation in development when you want parse outputs checked against generated schemas:

```typescript
const indexer = new Indexer({
  db,
  clientFactory: () => new ChainClient({ baseUrl: process.env.CHAIN_RPC_URL! }),
  eventStreams: [tokenTransfers],
  accountStreams: [tokenAccounts],
  // Check parse outputs against the generated schemas everywhere except production.
  validateParse: process.env.NODE_ENV !== "production",
});
```
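
To make the idea concrete, here is a much-simplified, hypothetical version of that kind of check: compare each parsed row with the column kinds declared in a stream schema and report mismatches, such as a JavaScript number returned for a bigint column. ColumnKind, ColumnSpec, and validateRow are illustrative names; this is not how @thru/indexer implements validateParse.

```typescript
// Illustrative only: a simplified stand-in for schema-driven parse validation.
type ColumnKind = "text" | "bigint" | "boolean" | "timestamp";

interface ColumnSpec {
  kind: ColumnKind;
  notNull: boolean;
}

// Runtime check for each declared column kind.
const CHECKS: Record<ColumnKind, (value: unknown) => boolean> = {
  text: (v) => typeof v === "string",
  bigint: (v) => typeof v === "bigint",
  boolean: (v) => typeof v === "boolean",
  timestamp: (v) => v instanceof Date,
};

// Returns human-readable problems; an empty array means the row passed.
export function validateRow(
  schema: Record<string, ColumnSpec>,
  row: Record<string, unknown>
): string[] {
  const problems: string[] = [];
  for (const [column, spec] of Object.entries(schema)) {
    const value = row[column];
    if (value === null || value === undefined) {
      if (spec.notNull) problems.push(`${column}: missing required value`);
      continue;
    }
    if (!CHECKS[spec.kind](value)) {
      problems.push(`${column}: expected ${spec.kind}, got ${typeof value}`);
    }
  }
  // Flag keys returned by parse that the schema does not declare.
  for (const column of Object.keys(row)) {
    if (!(column in schema)) problems.push(`${column}: not in schema`);
  }
  return problems;
}
```

For instance, validating { slot: 1 } against a schema with a non-null bigint slot column reports "slot: expected bigint, got number".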

Because indexed data lands in ordinary Drizzle tables, you can query it with Drizzle or raw SQL through your database layer for analytics, reporting, joins, and aggregates.
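
For example, a report of the largest senders by volume is an ordinary Drizzle aggregate query. This sketch assumes the db and schema modules from this guide and that amount is stored as a PostgreSQL bigint column; topSenders is a hypothetical helper name.

```typescript
import { desc, sql } from "drizzle-orm";
import { db } from "./db";
import { tokenTransferEvents } from "./schema";

// Largest senders by total transferred amount (hypothetical reporting helper).
export async function topSenders(limit = 10) {
  // In PostgreSQL, sum() over a bigint column returns numeric; it is typed as a
  // string here so large totals are not squeezed into a JavaScript number.
  const totalAmount = sql<string>`sum(${tokenTransferEvents.amount})`;
  return db
    .select({
      source: tokenTransferEvents.source,
      transferCount: sql<number>`count(*)::int`,
      totalAmount,
    })
    .from(tokenTransferEvents)
    .groupBy(tokenTransferEvents.source)
    .orderBy(desc(totalAmount))
    .limit(limit);
}
```

The same query can be written as raw SQL against token_transfer_events if your reporting layer does not use Drizzle.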