Build an Indexer
Use this guide to go from zero to a production-oriented Thru indexer. It starts with the smallest working setup, then covers the concerns you need to handle when you run it as a real backend service.
Prerequisites
- Node 20+ and TypeScript
- PostgreSQL
- Drizzle for schema and migrations
Install
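```sh
pnpm add @thru/indexer @thru/replay @thru/sdk @thru/programs @bufbuild/protobuf postgres drizzle-orm
pnpm add -D drizzle-kit tsx typescript
```
The worker code uses top-level `await`, so run the examples as ES modules (for example, set `"type": "module"` in `package.json`).
Step 1: Define A Token Transfer Event Stream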
Use the token ABI type to decode the token program's event payload. The filter matches events emitted by the token program. For each match, `parse` returns one row if the event is a transfer and `null` otherwise.
```ts
// src/streams/token-transfers.ts
import { create } from "@bufbuild/protobuf";
import { decodeAddress, encodeAddress, encodeSignature } from "@thru/sdk/helpers";
import { defineEventStream, t } from "@thru/indexer";
import { FilterParamValueSchema, FilterSchema, type Event } from "@thru/replay";
import { TokenEvent } from "./abi/thru/program/token/types";

const TOKEN_PROGRAM = process.env.TOKEN_PROGRAM_ID!;

const tokenTransfers = defineEventStream({
  name: "token-transfers",
  description: "Transfer events emitted by the token program",

  schema: {
    id: t.text().primaryKey(),
    slot: t.bigint().notNull().index(),
    txnSignature: t.text().notNull(),
    source: t.text().notNull().index(),
    dest: t.text().notNull().index(),
    amount: t.bigint().notNull(),
    indexedAt: t.timestamp().notNull().defaultNow(),
  },

  // Match only events whose emitting program is the token program.
  filterFactory: () => {
    const programBytes = new Uint8Array(decodeAddress(TOKEN_PROGRAM));
    return create(FilterSchema, {
      expression: "event.program.value == params.address",
      params: {
        address: create(FilterParamValueSchema, {
          kind: { case: "bytesValue", value: programBytes },
        }),
      },
    });
  },

  // Decode the payload; return null for anything that is not a transfer.
  parse: (event: Event) => {
    if (!event.payload || event.slot === undefined) return null;

    const envelope = TokenEvent.from_array(event.payload);
    const transfer = envelope?.payload()?.asTransfer();
    if (!transfer) return null;

    // Skip events without a transaction signature instead of encoding an empty one.
    const signature = event.transactionSignature?.value;
    if (!signature) return null;

    return {
      id: event.eventId,
      slot: event.slot,
      txnSignature: encodeSignature(signature),
      source: encodeAddress(new Uint8Array(transfer.source.get_bytes())),
      dest: encodeAddress(new Uint8Array(transfer.dest.get_bytes())),
      amount: transfer.amount,
      indexedAt: new Date(),
    };
  },
});

export const tokenTransferEvents = tokenTransfers.table;
export default tokenTransfers;
```
Step 2: Define A Token Account Stream
For token account state inside the indexer runtime, decode the raw account bytes directly with the token program account type.
```ts
// src/account-streams/token-accounts.ts
import { decodeAddress, encodeAddress } from "@thru/sdk/helpers";
import { defineAccountStream, t } from "@thru/indexer";
import { TokenAccount } from "@thru/programs/token";

const TOKEN_PROGRAM = process.env.TOKEN_PROGRAM_ID!;
const TOKEN_ACCOUNT_SIZE = 73;

const tokenAccounts = defineAccountStream({
  name: "token-accounts",
  description: "Latest token account balances by address",

  // Match accounts owned by the token program whose data is TOKEN_ACCOUNT_SIZE bytes.
  ownerProgramFactory: () => new Uint8Array(decodeAddress(TOKEN_PROGRAM)),
  expectedSize: TOKEN_ACCOUNT_SIZE,

  schema: {
    address: t.text().primaryKey(),
    mint: t.text().notNull().index(),
    owner: t.text().notNull().index(),
    amount: t.bigint().notNull(),
    isFrozen: t.boolean().notNull(),
    slot: t.bigint().notNull(),
    seq: t.bigint().notNull(),
    updatedAt: t.timestamp().notNull().defaultNow(),
  },

  parse: (account) => {
    // Defensive size check before decoding the fixed-size layout.
    if (account.data.length !== TOKEN_ACCOUNT_SIZE) return null;

    const parsed = TokenAccount.from_array(account.data);
    if (!parsed) return null;

    return {
      address: encodeAddress(account.address),
      mint: encodeAddress(new Uint8Array(parsed.mint.get_bytes())),
      owner: encodeAddress(new Uint8Array(parsed.owner.get_bytes())),
      amount: parsed.amount,
      isFrozen: parsed.is_frozen !== 0,
      slot: account.slot,
      seq: account.seq,
      updatedAt: new Date(),
    };
  },
});

export const tokenAccountsTable = tokenAccounts.table;
export default tokenAccounts;
```
Step 3: Export The Tables For Drizzle
Re-export the stream tables, together with the runtime's checkpoint table, from a single schema module:
```ts
// src/schema.ts
export { checkpointTable } from "@thru/indexer";
export { tokenAccountsTable } from "./account-streams/token-accounts";
export { tokenTransferEvents } from "./streams/token-transfers";
```
Use those exports as your Drizzle schema input:
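```ts
// drizzle.config.ts
import { defineConfig } from "drizzle-kit";

export default defineConfig({
  schema: "./src/schema.ts",
  out: "./drizzle",
  dialect: "postgresql",
  dbCredentials: {
    url: process.env.DATABASE_URL!,
  },
});
```
Then generate and apply migrations (for example, with `pnpm drizzle-kit generate` and `pnpm drizzle-kit migrate`) so the tables exist before the indexer starts.
Step 4: Create The Indexer Runtime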
Wire the streams into an `Indexer`. The `db` import is your shared database client module.
```ts
import { Indexer } from "@thru/indexer";
import { ChainClient } from "@thru/replay";
import tokenAccounts from "./account-streams/token-accounts";
import { db } from "./db";
import tokenTransfers from "./streams/token-transfers";

const indexer = new Indexer({
  db,
  clientFactory: () => new ChainClient({ baseUrl: process.env.CHAIN_RPC_URL! }),
  eventStreams: [tokenTransfers],
  accountStreams: [tokenAccounts],
  defaultStartSlot: 0n, // used only when a stream has no stored checkpoint yet
  safetyMargin: 64, // keeps the worker behind the live tip; see Resumability
  pageSize: 512,
  logLevel: "info",
});

await indexer.start();
```
What You Get
After this setup:

- `token_transfer_events` stores immutable token transfer rows
- `token_accounts` stores the latest token account state by address
- `indexer_checkpoints` tracks resumable progress per stream
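The event table and the account table have different write semantics. Transfer rows are immutable and keyed by the event `id`, while account rows hold only the latest state per `address`. The sketch below models that difference with in-memory `Map`s. It is a conceptual illustration, not how `@thru/indexer` writes to PostgreSQL. The trimmed row types and the `applyTransfer`, `isNewer`, and `applyAccount` helpers are hypothetical. Ordering account updates by `(slot, seq)` is an assumption about what "latest" means.

```typescript
// Conceptual model of the two table kinds; NOT the @thru/indexer internals.
// Row shapes are trimmed-down, hypothetical versions of the Step 1 and Step 2 schemas.
type TransferRow = { id: string; slot: bigint; source: string; dest: string; amount: bigint };
type AccountRow = { address: string; owner: string; amount: bigint; slot: bigint; seq: bigint };

// Event table: immutable rows keyed by id. Re-applying the same event is a no-op.
function applyTransfer(table: Map<string, TransferRow>, row: TransferRow): boolean {
  if (table.has(row.id)) return false; // already indexed; keep the original row
  table.set(row.id, row);
  return true;
}

// Assumed ordering for "latest": higher slot wins, then higher seq within a slot.
function isNewer(a: AccountRow, b: AccountRow): boolean {
  return a.slot !== b.slot ? a.slot > b.slot : a.seq > b.seq;
}

// Account table: one row per address, replaced only by a newer observation.
function applyAccount(table: Map<string, AccountRow>, row: AccountRow): boolean {
  const current = table.get(row.address);
  if (current && !isNewer(row, current)) return false; // stale or duplicate update
  table.set(row.address, row);
  return true;
}

// Replaying a transfer leaves one row; an older account update cannot overwrite a newer one.
const transfers = new Map<string, TransferRow>();
const t1: TransferRow = { id: "evt-1", slot: 10n, source: "A", dest: "B", amount: 5n };
applyTransfer(transfers, t1);
applyTransfer(transfers, t1);

const accounts = new Map<string, AccountRow>();
applyAccount(accounts, { address: "A", owner: "O", amount: 100n, slot: 12n, seq: 3n });
applyAccount(accounts, { address: "A", owner: "O", amount: 90n, slot: 11n, seq: 7n });

console.log(transfers.size, accounts.get("A")?.amount); // 1 100n
```

In SQL terms, this roughly corresponds to an insert that ignores primary-key conflicts for events and a conditional upsert for accounts.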
Production Concerns
The steps above give you a working indexer. The sections below cover what you need when you move that indexer into a real backend service.
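The examples read `TOKEN_PROGRAM_ID`, `CHAIN_RPC_URL`, and `DATABASE_URL` with non-null assertions (`process.env.X!`). Those assertions only silence the type checker. A missing variable is still `undefined` at runtime and surfaces later as a less obvious error. In a service, it usually helps to validate configuration once at startup. Here is a minimal sketch in which `loadConfig` and the `IndexerEnv` shape are hypothetical helpers, not part of `@thru/indexer`:

```typescript
// Hypothetical startup config loader; not part of @thru/indexer.
type IndexerEnv = {
  tokenProgramId: string;
  chainRpcUrl: string;
  databaseUrl: string;
};

type EnvSource = Record<string, string | undefined>;

const REQUIRED = ["TOKEN_PROGRAM_ID", "CHAIN_RPC_URL", "DATABASE_URL"] as const;

// Collect every missing (or empty) variable so one failed boot reports all of them.
function loadConfig(env: EnvSource): IndexerEnv {
  const missing = REQUIRED.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(", ")}`);
  }
  // The assertions below are safe: every name was checked above.
  return {
    tokenProgramId: env["TOKEN_PROGRAM_ID"]!,
    chainRpcUrl: env["CHAIN_RPC_URL"]!,
    databaseUrl: env["DATABASE_URL"]!,
  };
}

// Usage in a worker or API entrypoint: const config = loadConfig(process.env);
```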
Separate Worker And API Processes
In production, run the indexer worker and the API server as separate processes. This keeps write-heavy backfill work from competing with read traffic, and lets you scale or restart each side independently.
A common layout:
```text
src/
  streams/           # shared stream definitions
  account-streams/   # shared account stream definitions
  schema.ts          # shared Drizzle schema exports
  db.ts              # shared database client
  worker.ts          # indexer process, runs Indexer.start()
  queries/           # app-owned Drizzle query helpers
  api.ts             # app-owned API process, if your backend exposes one
```
The worker owns the `Indexer` instance and calls `indexer.start()`. It writes rows and updates checkpoints.
```ts
// src/worker.ts
import { Indexer } from "@thru/indexer";
import { ChainClient } from "@thru/replay";
import tokenAccounts from "./account-streams/token-accounts";
import { db } from "./db";
import tokenTransfers from "./streams/token-transfers";

const indexer = new Indexer({
  db,
  clientFactory: () => new ChainClient({ baseUrl: process.env.CHAIN_RPC_URL! }),
  eventStreams: [tokenTransfers],
  accountStreams: [tokenAccounts],
});

// Stop cleanly on Ctrl-C (SIGINT) and on orchestrator shutdown (SIGTERM).
process.on("SIGINT", () => indexer.stop());
process.on("SIGTERM", () => indexer.stop());

await indexer.start();
```
The API process should query the Drizzle tables directly and own its response shape.
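Owning the response shape includes deciding how numeric columns reach clients. Assume `slot` and `amount` come back from the database as JavaScript `bigint` values; check your driver and column mode to confirm this. In that case, `JSON.stringify` throws a `TypeError` on those fields. Below is a minimal sketch of a hypothetical `toJson` helper that writes `bigint` values as decimal strings. This avoids the precision loss that converting to `number` would cause above `Number.MAX_SAFE_INTEGER`.

```typescript
// Hypothetical API helper: JSON.stringify throws on bigint, so convert in a replacer.
// Decimal strings keep full precision; Number(x) would round values above 2^53 - 1.
function toJson(value: unknown): string {
  return JSON.stringify(value, (_key, v) => (typeof v === "bigint" ? v.toString() : v));
}

const row = { id: "evt-1", slot: 123n, amount: 9007199254740993n };
console.log(toJson(row)); // {"id":"evt-1","slot":"123","amount":"9007199254740993"}
```

With a serializer like this at the HTTP boundary, query helpers can return plain rows and leave wire formatting to the API layer.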
```ts
import { desc, eq, inArray, or } from "drizzle-orm";
import { db } from "./db";
import { tokenAccountsTable, tokenTransferEvents } from "./schema";

export async function getRecentTransfersForOwner(owner: string) {
  // Token accounts currently held by this owner.
  const accounts = await db
    .select({ address: tokenAccountsTable.address })
    .from(tokenAccountsTable)
    .where(eq(tokenAccountsTable.owner, owner));

  const accountAddresses = accounts.map((account) => account.address);
  if (accountAddresses.length === 0) return [];

  // Filter in SQL so the limit applies to this owner's transfers,
  // not to the 200 most recent transfers across all accounts.
  return db
    .select()
    .from(tokenTransferEvents)
    .where(
      or(
        inArray(tokenTransferEvents.source, accountAddresses),
        inArray(tokenTransferEvents.dest, accountAddresses),
      ),
    )
    .orderBy(desc(tokenTransferEvents.slot))
    .limit(200);
}
```
Resumability
The checkpoint table records progress per stream. On restart, the runtime resumes from the last stored checkpoint instead of replaying from the default start slot.
Use a conservative `safetyMargin` so the worker stays behind the live tip during the backfill-to-live handoff.
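To make the two settings concrete, here is a simplified model of how a resume point and a safety margin could combine to bound the next range of work. It illustrates the idea under stated assumptions and is not the runtime's algorithm. It assumes a checkpoint stores the last fully processed slot and that `safetyMargin` is measured in slots. `planNextRange` is hypothetical.

```typescript
// Simplified, hypothetical model of resume + safety margin; not @thru/indexer internals.
// Assumptions: a checkpoint holds the last fully processed slot; safetyMargin is in slots.
type SlotRange = { fromSlot: bigint; toSlot: bigint };

function planNextRange(
  lastCheckpoint: bigint | null, // null on first run (no checkpoint row yet)
  defaultStartSlot: bigint,
  chainTip: bigint,
  safetyMargin: bigint,
): SlotRange | null {
  // Resume right after the checkpoint; only a fresh stream starts at defaultStartSlot.
  const fromSlot = lastCheckpoint === null ? defaultStartSlot : lastCheckpoint + 1n;
  // Never plan past tip - safetyMargin, so the worker stays behind the live tip.
  const toSlot = chainTip - safetyMargin;
  return fromSlot <= toSlot ? { fromSlot, toSlot } : null; // null: caught up, wait
}

// Tip at slot 1_000 with a margin of 64: fresh start, restart, and caught up.
console.log(planNextRange(null, 0n, 1_000n, 64n)); // { fromSlot: 0n, toSlot: 936n }
console.log(planNextRange(900n, 0n, 1_000n, 64n)); // { fromSlot: 901n, toSlot: 936n }
console.log(planNextRange(936n, 0n, 1_000n, 64n)); // null
```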
Runtime Validation
Enable runtime validation in development when you want parse outputs checked against generated schemas:
```ts
const indexer = new Indexer({
  db,
  clientFactory: () => new ChainClient({ baseUrl: process.env.CHAIN_RPC_URL! }),
  eventStreams: [tokenTransfers],
  accountStreams: [tokenAccounts],
  validateParse: process.env.NODE_ENV !== "production",
});
```
Custom SQL
Because indexed data lands in ordinary Drizzle tables, you can query it with Drizzle or raw SQL through your database layer for analytics, reporting, joins, and aggregates.