---
title: Build an Indexer
description: Build a production-shaped Thru backend indexer with Postgres,
  resumable checkpoints, Drizzle tables, custom queries, and live validation.
source_url:
  html: https://thru.org/docs/indexing/build-an-indexer/
  md: https://thru.org/docs/indexing/build-an-indexer.md
---

# Build an Indexer

Use this guide to go from zero to a production-oriented Thru indexer. It starts with the smallest working setup, then walks through the concerns you need to address before running the indexer as a real backend service.

## Prerequisites

- Node 20+ and TypeScript
- PostgreSQL
- Drizzle for schema and migrations

## Install

```bash
pnpm add @thru/indexer @thru/replay @thru/sdk @thru/programs @bufbuild/protobuf postgres drizzle-orm
pnpm add -D drizzle-kit tsx typescript
```

## Step 1: Define A Token Transfer Event Stream

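Use the token ABI type to decode the token program event payload, then return one row per matching transfer. The `filterFactory` restricts the stream to events emitted by the token program, and `parse` returns `null` for any event that is not a transfer.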

```ts
import { create } from "@bufbuild/protobuf";
import { decodeAddress, encodeAddress, encodeSignature } from "@thru/sdk/helpers";
import { defineEventStream, t } from "@thru/indexer";
import { FilterParamValueSchema, FilterSchema, type Event } from "@thru/replay";
import { TokenEvent } from "./abi/thru/program/token/types";

const TOKEN_PROGRAM = process.env.TOKEN_PROGRAM_ID!;

const tokenTransfers = defineEventStream({
  name: "token-transfers",
  description: "Transfer events emitted by the token program",

  schema: {
    id: t.text().primaryKey(),
    slot: t.bigint().notNull().index(),
    txnSignature: t.text().notNull(),
    source: t.text().notNull().index(),
    dest: t.text().notNull().index(),
    amount: t.bigint().notNull(),
    indexedAt: t.timestamp().notNull().defaultNow(),
  },

  filterFactory: () => {
    const programBytes = new Uint8Array(decodeAddress(TOKEN_PROGRAM));
    return create(FilterSchema, {
      expression: "event.program.value == params.address",
      params: {
        address: create(FilterParamValueSchema, {
          kind: { case: "bytesValue", value: programBytes },
        }),
      },
    });
  },

  parse: (event: Event) => {
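    // Skip events that carry no payload or no slot.
    if (!event.payload || event.slot === undefined) return null;

    // Decode the token event envelope and keep only transfer events.
    const envelope = TokenEvent.from_array(event.payload);
    const transfer = envelope?.payload()?.asTransfer();
    if (!transfer) return null;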

    return {
      id: event.eventId,
      slot: event.slot,
      txnSignature: encodeSignature(event.transactionSignature?.value ?? new Uint8Array()),
      source: encodeAddress(new Uint8Array(transfer.source.get_bytes())),
      dest: encodeAddress(new Uint8Array(transfer.dest.get_bytes())),
      amount: transfer.amount,
      indexedAt: new Date(),
    };
  },
});

export const tokenTransferEvents = tokenTransfers.table;
export default tokenTransfers;
```
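
The `parse` function above returns a row for a matching transfer and `null` for everything else. One way to unit-test that skip logic without chain data is to inject the decoding step. The sketch below is a dependency-free illustration: `RawEvent`, `Transfer`, and `makeTransferParser` are hypothetical names invented for this example and are not part of `@thru/indexer` or `@thru/replay`.

```typescript
// Hypothetical shapes for illustration only; the real Event type comes from @thru/replay.
type RawEvent = { eventId: string; slot?: bigint; payload?: Uint8Array };
type Transfer = { source: string; dest: string; amount: bigint };
type TransferRow = Transfer & { id: string; slot: bigint };

// Injecting the decoder keeps the skip logic testable without real chain data.
function makeTransferParser(
  decode: (payload: Uint8Array) => Transfer | null,
): (event: RawEvent) => TransferRow | null {
  return (event) => {
    // Same guards as the stream above: no payload or no slot means no row.
    if (!event.payload || event.slot === undefined) return null;
    const transfer = decode(event.payload);
    if (!transfer) return null; // not a transfer event
    return { id: event.eventId, slot: event.slot, ...transfer };
  };
}

// Stub decoder for tests: a first byte of 1 stands in for "this is a transfer".
const parseTransfer = makeTransferParser((payload) =>
  payload[0] === 1 ? { source: "alice", dest: "bob", amount: 5n } : null,
);
```

In a real test you would pass a decoder built on the generated `TokenEvent` type instead of the stub.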

## Step 2: Define A Token Account Stream

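For token account state inside the indexer runtime, decode the raw account bytes directly with the token program account type. The length check in `parse` skips any account whose data is not exactly `TOKEN_ACCOUNT_SIZE` bytes.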

```ts
import { decodeAddress, encodeAddress } from "@thru/sdk/helpers";
import { defineAccountStream, t } from "@thru/indexer";
import { TokenAccount } from "@thru/programs/token";

const TOKEN_PROGRAM = process.env.TOKEN_PROGRAM_ID!;
const TOKEN_ACCOUNT_SIZE = 73;

const tokenAccounts = defineAccountStream({
  name: "token-accounts",
  description: "Latest token account balances by address",

  ownerProgramFactory: () => new Uint8Array(decodeAddress(TOKEN_PROGRAM)),
  expectedSize: TOKEN_ACCOUNT_SIZE,

  schema: {
    address: t.text().primaryKey(),
    mint: t.text().notNull().index(),
    owner: t.text().notNull().index(),
    amount: t.bigint().notNull(),
    isFrozen: t.boolean().notNull(),
    slot: t.bigint().notNull(),
    seq: t.bigint().notNull(),
    updatedAt: t.timestamp().notNull().defaultNow(),
  },

  parse: (account) => {
    if (account.data.length !== TOKEN_ACCOUNT_SIZE) return null;

    const parsed = TokenAccount.from_array(account.data);
    if (!parsed) return null;

    return {
      address: encodeAddress(account.address),
      mint: encodeAddress(new Uint8Array(parsed.mint.get_bytes())),
      owner: encodeAddress(new Uint8Array(parsed.owner.get_bytes())),
      amount: parsed.amount,
      isFrozen: parsed.is_frozen !== 0,
      slot: account.slot,
      seq: account.seq,
      updatedAt: new Date(),
    };
  },
});

export const tokenAccountsTable = tokenAccounts.table;
export default tokenAccounts;
```
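
To make "decode the raw account bytes" concrete, here is a hand-rolled decoder under an assumed layout. The layout (32-byte mint, 32-byte owner, little-endian `u64` amount, 1-byte frozen flag) is an assumption chosen only because it adds up to the 73-byte size used above; the source does not confirm the field order or endianness. Prefer the generated `TokenAccount` type in real code.

```typescript
// ASSUMED layout, not confirmed by the source:
// mint[32] | owner[32] | amount (u64, little-endian) | is_frozen (u8) = 73 bytes
const MINT_OFFSET = 0;
const OWNER_OFFSET = 32;
const AMOUNT_OFFSET = 64;
const FROZEN_OFFSET = 72;
const ACCOUNT_SIZE = 73;

type DecodedTokenAccount = {
  mint: Uint8Array;
  owner: Uint8Array;
  amount: bigint;
  isFrozen: boolean;
};

// Hypothetical helper for illustration; returns null when the size does not match.
function decodeTokenAccountSketch(data: Uint8Array): DecodedTokenAccount | null {
  if (data.length !== ACCOUNT_SIZE) return null;
  // Respect byteOffset so the view is correct for subarrays of a larger buffer.
  const view = new DataView(data.buffer, data.byteOffset, data.byteLength);
  return {
    mint: data.slice(MINT_OFFSET, OWNER_OFFSET),
    owner: data.slice(OWNER_OFFSET, AMOUNT_OFFSET),
    amount: view.getBigUint64(AMOUNT_OFFSET, true),
    isFrozen: data[FROZEN_OFFSET] !== 0,
  };
}
```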

## Step 3: Export The Tables For Drizzle

```ts
export { checkpointTable } from "@thru/indexer";
export { tokenAccountsTable } from "./account-streams/token-accounts";
export { tokenTransferEvents } from "./streams/token-transfers";
```

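Put those re-exports in `src/schema.ts` and point the drizzle-kit config at that file, so generated migrations cover both stream tables and the checkpoint table.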

```ts
import { defineConfig } from "drizzle-kit";

export default defineConfig({
  schema: "./src/schema.ts",
  out: "./drizzle",
  dialect: "postgresql",
  dbCredentials: {
    url: process.env.DATABASE_URL!,
  },
});
```

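## Step 4: Create The Indexer Runtime

Pass the database client, a chain client factory, and both streams to `Indexer`, then call `start()`. The `./db` import is the shared Drizzle client for your Postgres database.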

```ts
import { Indexer } from "@thru/indexer";
import { ChainClient } from "@thru/replay";
import tokenAccounts from "./account-streams/token-accounts";
import { db } from "./db";
import tokenTransfers from "./streams/token-transfers";

const indexer = new Indexer({
  db,
  clientFactory: () => new ChainClient({ baseUrl: process.env.CHAIN_RPC_URL! }),
  eventStreams: [tokenTransfers],
  accountStreams: [tokenAccounts],
  defaultStartSlot: 0n,
  safetyMargin: 64,
  pageSize: 512,
  logLevel: "info",
});

await indexer.start();
```
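
The snippets in this guide read `DATABASE_URL`, `CHAIN_RPC_URL`, and `TOKEN_PROGRAM_ID` with non-null assertions, so a missing variable only surfaces later as a confusing runtime error. A small fail-fast check at startup can help. `loadIndexerEnv` is a hypothetical helper written for this guide, not part of any Thru package.

```typescript
// Hypothetical helper; the variable names match the ones read via process.env in this guide.
type IndexerEnv = {
  DATABASE_URL: string;
  CHAIN_RPC_URL: string;
  TOKEN_PROGRAM_ID: string;
};

function loadIndexerEnv(env: Record<string, string | undefined>): IndexerEnv {
  const required = ["DATABASE_URL", "CHAIN_RPC_URL", "TOKEN_PROGRAM_ID"] as const;
  // Treat unset and empty-string values the same way.
  const missing = required.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(", ")}`);
  }
  return {
    DATABASE_URL: env.DATABASE_URL as string,
    CHAIN_RPC_URL: env.CHAIN_RPC_URL as string,
    TOKEN_PROGRAM_ID: env.TOKEN_PROGRAM_ID as string,
  };
}
```

Calling `loadIndexerEnv(process.env)` at the top of the worker entry point reports every missing variable in one error before the indexer starts.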

## What You Get

After this setup:

- `token_transfer_events` stores immutable token transfer rows
- `token_accounts` stores the latest token account state by address
- `indexer_checkpoints` tracks resumable progress per stream
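
The account stream stores `slot` and `seq` with each row. If you ever merge account snapshots yourself, for example in a cache in front of the API, a comparison like the one below can decide which version is newer. It assumes a higher slot wins and `seq` breaks ties within a slot; the source does not spell out how the runtime orders updates, and `isNewerAccountVersion` is a hypothetical helper.

```typescript
type AccountVersion = { slot: bigint; seq: bigint };

// Returns true when `candidate` should replace `current`.
// ASSUMPTION: a higher slot wins, and seq breaks ties within the same slot.
function isNewerAccountVersion(
  candidate: AccountVersion,
  current: AccountVersion | undefined,
): boolean {
  if (!current) return true; // nothing stored yet
  if (candidate.slot !== current.slot) return candidate.slot > current.slot;
  return candidate.seq > current.seq;
}
```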

## Production Concerns

The steps above give you a working indexer. The sections below cover what you need when you move that indexer into a real backend service.

### Separate Worker And API Processes

In production, run the indexer worker and the API server as separate processes. This keeps write-heavy backfill work from competing with read traffic, and lets you scale or restart each side independently.

A common layout:

```text
src/
  streams/           # shared stream definitions
  account-streams/   # shared account stream definitions
  schema.ts          # shared Drizzle schema exports
  db.ts              # shared database client
  worker.ts          # indexer process, runs Indexer.start()
  queries/           # app-owned Drizzle query helpers
  api.ts             # app-owned API process, if your backend exposes one
```

The worker owns the `Indexer` instance and calls `indexer.start()`. It writes rows and updates checkpoints.

```ts
import { Indexer } from "@thru/indexer";
import { ChainClient } from "@thru/replay";
import tokenAccounts from "./account-streams/token-accounts";
import { db } from "./db";
import tokenTransfers from "./streams/token-transfers";

const indexer = new Indexer({
  db,
  clientFactory: () => new ChainClient({ baseUrl: process.env.CHAIN_RPC_URL! }),
  eventStreams: [tokenTransfers],
  accountStreams: [tokenAccounts],
});

process.on("SIGINT", () => indexer.stop());
process.on("SIGTERM", () => indexer.stop());

await indexer.start();
```
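
Process managers often deliver more than one signal during shutdown, so `stop()` can end up being called twice. The sketch below wraps a stop function so it runs at most once and logs failures. `stopOnce` is a hypothetical helper, and the return type of `indexer.stop()` is not documented here, so the wrapper accepts both sync and async functions.

```typescript
// Wraps a stop function so that repeated signals trigger it only once.
function stopOnce(stop: () => unknown): () => void {
  let requested = false;
  const report = (err: unknown) => console.error("indexer stop failed", err);
  return () => {
    if (requested) return; // ignore repeated signals
    requested = true;
    try {
      // Promise.resolve handles both plain return values and promises.
      Promise.resolve(stop()).catch(report);
    } catch (err) {
      report(err); // stop threw synchronously
    }
  };
}
```

In the worker you could create `const shutdown = stopOnce(() => indexer.stop())` and register `shutdown` for both `SIGINT` and `SIGTERM`.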

The API process should query the Drizzle tables directly and own its response shape.

```ts
import { desc, eq, inArray, or } from "drizzle-orm";
import { db } from "./db";
import { tokenAccountsTable, tokenTransferEvents } from "./schema";

export async function getRecentTransfersForOwner(owner: string) {
  // Resolve the token accounts this owner controls.
  const accounts = await db
    .select({ address: tokenAccountsTable.address })
    .from(tokenAccountsTable)
    .where(eq(tokenAccountsTable.owner, owner));

  const addresses = accounts.map((account) => account.address);
  // No token accounts means no transfers; this also avoids an empty IN list.
  if (addresses.length === 0) return [];

  // Filter in SQL so the limit applies to this owner's transfers,
  // not to the most recent transfers across all accounts.
  return db
    .select()
    .from(tokenTransferEvents)
    .where(
      or(
        inArray(tokenTransferEvents.source, addresses),
        inArray(tokenTransferEvents.dest, addresses)
      )
    )
    .orderBy(desc(tokenTransferEvents.slot))
    .limit(200);
}
```
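
Owning the response shape includes deciding how to serialize large integers. If the `slot` and `amount` columns come back as JavaScript `bigint` values (an assumption about how the tables are mapped), `JSON.stringify` throws a `TypeError` on them. A minimal sketch that emits them as decimal strings, with `toJson` as a hypothetical helper:

```typescript
// JSON.stringify throws a TypeError on bigint values, so convert them to strings.
function toJson(value: unknown): string {
  return JSON.stringify(value, (_key, v) => (typeof v === "bigint" ? v.toString() : v));
}

// Hypothetical row shaped like a transfer from this guide.
const exampleRow = {
  id: "evt-1",
  slot: 1234n,
  source: "source-address",
  dest: "dest-address",
  amount: 9007199254740993n, // larger than Number.MAX_SAFE_INTEGER
};

const body = toJson({ transfers: [exampleRow] });
```

Strings keep full precision for amounts above `Number.MAX_SAFE_INTEGER`; clients can convert them back with `BigInt(value)`.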

### Resumability

The checkpoint table records progress per stream. On restart, the runtime resumes from the last stored checkpoint instead of replaying from the default start slot.
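
As a mental model, the resume decision can be sketched as a lookup with a fallback. This is not the library's code: `resolveStartSlot` and the `Map` of checkpoints are hypothetical, and resuming at "last processed slot + 1" is an assumption about what the checkpoint stores.

```typescript
// Hypothetical model: stream name -> last fully processed slot.
type Checkpoints = Map<string, bigint>;

function resolveStartSlot(
  checkpoints: Checkpoints,
  streamName: string,
  defaultStartSlot: bigint,
): bigint {
  const last = checkpoints.get(streamName);
  // ASSUMPTION: the checkpoint is the last processed slot, so resume at the next one.
  return last === undefined ? defaultStartSlot : last + 1n;
}

const checkpoints: Checkpoints = new Map([["token-transfers", 41_999n]]);
const transfersStart = resolveStartSlot(checkpoints, "token-transfers", 0n);
const accountsStart = resolveStartSlot(checkpoints, "token-accounts", 0n);
```

Because progress is tracked per stream, a newly added stream starts from `defaultStartSlot` while existing streams continue from their own checkpoints.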

Set a conservative `safetyMargin` (Step 4 uses `64`) so the worker stays behind the live tip while it hands off from backfill to live streaming.
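
The effect of the margin can be sketched as simple arithmetic. The helper below assumes `safetyMargin` is measured in slots, which the source does not state explicitly; `backfillUpperBound` is a hypothetical name.

```typescript
// ASSUMPTION: safetyMargin is a number of slots to stay behind the chain tip.
function backfillUpperBound(tipSlot: bigint, safetyMargin: number): bigint {
  const bound = tipSlot - BigInt(safetyMargin);
  // Clamp at zero for very young chains or oversized margins.
  return bound > 0n ? bound : 0n;
}

const upperBound = backfillUpperBound(1_000n, 64);
```

With a tip at slot 1000 and a margin of 64, backfill under this model would stop at slot 936 and leave the newest slots to the live phase.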

### Runtime Validation

Enable runtime validation in development when you want parse outputs checked against generated schemas:

```ts
const indexer = new Indexer({
  db,
  clientFactory: () => new ChainClient({ baseUrl: process.env.CHAIN_RPC_URL! }),
  eventStreams: [tokenTransfers],
  accountStreams: [tokenAccounts],
  validateParse: process.env.NODE_ENV !== "production",
});
```
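
To illustrate the kind of mistake this catches, the sketch below checks a parsed row against a simplified column description. It is a stand-in for the idea, not the schemas `@thru/indexer` generates: `ColumnSpec`, `validateRow`, and `transferSchema` are hypothetical.

```typescript
type ColumnKind = "text" | "bigint" | "boolean" | "timestamp";
type ColumnSpec = { kind: ColumnKind; notNull: boolean };

// Returns a list of human-readable problems; an empty list means the row is valid.
function validateRow(
  schema: Record<string, ColumnSpec>,
  row: Record<string, unknown>,
): string[] {
  const errors: string[] = [];
  for (const [name, spec] of Object.entries(schema)) {
    const value = row[name];
    if (value === null || value === undefined) {
      if (spec.notNull) errors.push(`${name}: required`);
      continue;
    }
    const ok =
      (spec.kind === "text" && typeof value === "string") ||
      (spec.kind === "bigint" && typeof value === "bigint") ||
      (spec.kind === "boolean" && typeof value === "boolean") ||
      (spec.kind === "timestamp" && value instanceof Date);
    if (!ok) errors.push(`${name}: expected ${spec.kind}`);
  }
  return errors;
}

// Simplified description of two columns from the transfer stream.
const transferSchema: Record<string, ColumnSpec> = {
  id: { kind: "text", notNull: true },
  amount: { kind: "bigint", notNull: true },
};
```

A typical catch is a `number` returned where a `bigint` column is declared, which would otherwise fail or lose precision at insert time.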

### Custom SQL

Because indexed data lands in ordinary Drizzle tables, you can query it with Drizzle or raw SQL through your database layer for analytics, reporting, joins, and aggregates.
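
As an example of an aggregate, the sketch below builds a parameterized query for the largest receivers since a given slot. It assumes the physical table is `token_transfer_events` (as listed under "What You Get") and that the columns are named `dest`, `amount`, and `slot` in the database; check your generated migration before relying on those names. `topReceiversQuery` is a hypothetical helper that only builds the query, so you can run it through whichever client your database layer exposes.

```typescript
type SqlQuery = { text: string; values: unknown[] };

// Builds the query text and parameters; it does not execute anything.
function topReceiversQuery(minSlot: bigint, limit: number): SqlQuery {
  if (!Number.isInteger(limit) || limit <= 0) {
    throw new RangeError("limit must be a positive integer");
  }
  return {
    text: [
      "SELECT dest, SUM(amount) AS total_received, COUNT(*) AS transfer_count",
      "FROM token_transfer_events", // ASSUMED table and column names
      "WHERE slot >= $1",
      "GROUP BY dest",
      "ORDER BY total_received DESC",
      "LIMIT $2",
    ].join("\n"),
    // Send the bigint as a decimal string so any client can bind it.
    values: [minSlot.toString(), limit],
  };
}

const query = topReceiversQuery(100n, 10);
```

Keeping values in `$1` and `$2` placeholders rather than interpolating them into the text avoids SQL injection when the inputs come from API requests.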

## Related Pages

- [Streams](https://thru.org/docs/indexing/streams.md)
- [Running the Indexer](https://thru.org/docs/indexing/running-the-indexer.md)
- [Querying Indexed Data](https://thru.org/docs/indexing/querying-indexed-data.md)
- [`@thru/indexer`](https://thru.org/docs/sdks/web-packages/indexer.md)
- [`@thru/replay`](https://thru.org/docs/sdks/web-packages/replay.md)
