Streams
Use this page when the indexer runtime is clear but you need to design or extend the actual streams.
Event Streams Vs Account Streams
Section titled “Event Streams Vs Account Streams”| Stream type | Best for | Data model |
|---|---|---|
| Event stream | Immutable logs such as transfers, mints, fills, or lifecycle events | Append-only rows keyed by event identity |
| Account stream | Current on-chain account state such as balances, configuration accounts, or inventory | Current-state rows keyed by account identity |
Use an event stream when the chain emits the thing you want directly.
Use an account stream when the important answer is “what is the latest state of this account now?”
Event Stream Shape
Section titled “Event Stream Shape”An event stream needs:
nameschemafilterorfilterFactoryparse(event)
Token transfer example:
import { create } from "@bufbuild/protobuf";import { decodeAddress, encodeAddress, encodeSignature } from "@thru/sdk/helpers";import { defineEventStream, t } from "@thru/indexer";import { FilterParamValueSchema, FilterSchema } from "@thru/replay";import { TokenEvent } from "./abi/thru/program/token/types";
const tokenTransfers = defineEventStream({ name: "token-transfers", schema: { id: t.text().primaryKey(), slot: t.bigint().notNull().index(), txnSignature: t.text().notNull(), source: t.text().notNull().index(), dest: t.text().notNull().index(), amount: t.bigint().notNull(), }, filterFactory: () => { const programBytes = new Uint8Array(decodeAddress(process.env.TOKEN_PROGRAM_ID!)); return create(FilterSchema, { expression: "event.program.value == params.address", params: { address: create(FilterParamValueSchema, { kind: { case: "bytesValue", value: programBytes }, }), }, }); }, parse: (event) => { if (!event.payload || event.slot === undefined) return null; const tokenEvent = TokenEvent.from_array(event.payload); const transfer = tokenEvent?.payload()?.asTransfer(); if (!transfer) return null;
return { id: event.eventId, slot: event.slot, txnSignature: encodeSignature(event.transactionSignature?.value ?? new Uint8Array()), source: encodeAddress(new Uint8Array(transfer.source.get_bytes())), dest: encodeAddress(new Uint8Array(transfer.dest.get_bytes())), amount: transfer.amount, }; },});Account Stream Shape
Section titled “Account Stream Shape”An account stream needs:
nameschemaownerProgramorownerProgramFactory- optional
expectedSizeordataSizes parse(account)
Token account example:
import { decodeAddress, encodeAddress } from "@thru/sdk/helpers";import { defineAccountStream, t } from "@thru/indexer";import { TokenAccount } from "@thru/programs/token";
const tokenAccounts = defineAccountStream({ name: "token-accounts", ownerProgramFactory: () => new Uint8Array(decodeAddress(process.env.TOKEN_PROGRAM_ID!)), expectedSize: 73, schema: { address: t.text().primaryKey(), mint: t.text().notNull().index(), owner: t.text().notNull().index(), amount: t.bigint().notNull(), isFrozen: t.boolean().notNull(), slot: t.bigint().notNull(), seq: t.bigint().notNull(), }, parse: (account) => { if (account.data.length !== 73) return null; const parsed = TokenAccount.from_array(account.data); if (!parsed) return null; return { address: encodeAddress(account.address), mint: encodeAddress(new Uint8Array(parsed.mint.get_bytes())), owner: encodeAddress(new Uint8Array(parsed.owner.get_bytes())), amount: parsed.amount, isFrozen: parsed.is_frozen !== 0, slot: account.slot, seq: account.seq, }; },});Practical Rules
Section titled “Practical Rules”- Use
filterFactoryandownerProgramFactorywhen values come from environment or config so migration tooling can still import the schema files safely. - Use
expectedSizewhen the account layout is fixed and size mismatches should be skipped early. - Return
nullfromparsewhen the event or account update should be ignored. It does not delete an existing row. - Export the generated
.tablefrom every stream so Drizzle can include it in migrations.
Next Steps
Section titled “Next Steps”- Move to Running the Indexer once the stream definitions exist.
- See Build an Indexer for a full end-to-end guide including production concerns.
- See
@thru/indexerand@thru/replayfor package reference details.