---
title: Streams
description: Define event streams and account streams for Thru indexing, and
  understand when to use each one.
source_url:
  html: https://thru.org/docs/indexing/streams/
  md: https://thru.org/docs/indexing/streams.md
---

# Streams

Use this page once you understand the indexer runtime and need to design or extend the streams themselves.

## Event Streams Vs Account Streams

| Stream type | Best for | Data model |
| - | - | - |
| Event stream | Immutable logs such as transfers, mints, fills, or lifecycle events | Append-only rows keyed by event identity |
| Account stream | Current on-chain account state such as balances, configuration accounts, or inventory | Current-state rows keyed by account identity |

Use an event stream when the program already emits an event for the fact you want to record, such as a transfer or a fill.

Use an account stream when the important answer is “what is the latest state of this account now?”
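
The two data models can be sketched with plain in-memory maps. This is a minimal illustration, not the `@thru/indexer` implementation: `EventRow`, `AccountRow`, `appendEvent`, and `upsertAccount` are hypothetical names, plain numbers stand in for `bigint` columns, and treating a higher `(slot, seq)` pair as "newer" is an assumption.

```typescript
// Hypothetical row shapes, for illustration only.
type EventRow = { id: string; slot: number; amount: number };
type AccountRow = { address: string; slot: number; seq: number; amount: number };

// Event stream model: append-only rows keyed by event identity.
// Re-delivering an already stored event id is a no-op.
function appendEvent(table: Map<string, EventRow>, row: EventRow): void {
  if (!table.has(row.id)) table.set(row.id, row);
}

// Account stream model: one current-state row per account.
// Assumption: an update replaces the row only if its (slot, seq) is higher.
function upsertAccount(table: Map<string, AccountRow>, row: AccountRow): void {
  const existing = table.get(row.address);
  const isNewer =
    !existing ||
    row.slot > existing.slot ||
    (row.slot === existing.slot && row.seq > existing.seq);
  if (isNewer) table.set(row.address, row);
}

const events = new Map<string, EventRow>();
appendEvent(events, { id: "e1", slot: 10, amount: 5 });
appendEvent(events, { id: "e2", slot: 11, amount: 7 });
appendEvent(events, { id: "e1", slot: 10, amount: 5 }); // replayed event, ignored

const accounts = new Map<string, AccountRow>();
upsertAccount(accounts, { address: "a1", slot: 10, seq: 1, amount: 5 });
upsertAccount(accounts, { address: "a1", slot: 11, seq: 1, amount: 12 });
upsertAccount(accounts, { address: "a1", slot: 10, seq: 2, amount: 9 }); // older slot, ignored
```

After these calls the event table holds one row per distinct event, while the account table holds a single row for `a1` carrying its most recent state.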

## Event Stream Shape

An event stream needs:

- `name`
- `schema`
- `filter` or `filterFactory`
- `parse(event)`

Token transfer example:

```ts
import { create } from "@bufbuild/protobuf";
import { decodeAddress, encodeAddress, encodeSignature } from "@thru/sdk/helpers";
import { defineEventStream, t } from "@thru/indexer";
import { FilterParamValueSchema, FilterSchema } from "@thru/replay";
import { TokenEvent } from "./abi/thru/program/token/types";

const tokenTransfers = defineEventStream({
  name: "token-transfers",
  schema: {
    id: t.text().primaryKey(),
    slot: t.bigint().notNull().index(),
    txnSignature: t.text().notNull(),
    source: t.text().notNull().index(),
    dest: t.text().notNull().index(),
    amount: t.bigint().notNull(),
  },
  // Built lazily so TOKEN_PROGRAM_ID is read when the indexer runs, not at import time.
  filterFactory: () => {
    const programBytes = new Uint8Array(decodeAddress(process.env.TOKEN_PROGRAM_ID!));
    return create(FilterSchema, {
      expression: "event.program.value == params.address",
      params: {
        address: create(FilterParamValueSchema, {
          kind: { case: "bytesValue", value: programBytes },
        }),
      },
    });
  },
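  parse: (event) => {
    // Ignore events that carry no payload or slot.
    if (!event.payload || event.slot === undefined) return null;
    const tokenEvent = TokenEvent.from_array(event.payload);
    // Only transfer events produce rows; other token events are ignored.
    const transfer = tokenEvent?.payload()?.asTransfer();
    if (!transfer) return null;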

    return {
      id: event.eventId,
      slot: event.slot,
      txnSignature: encodeSignature(event.transactionSignature?.value ?? new Uint8Array()),
      source: encodeAddress(new Uint8Array(transfer.source.get_bytes())),
      dest: encodeAddress(new Uint8Array(transfer.dest.get_bytes())),
      amount: transfer.amount,
    };
  },
});
```

## Account Stream Shape

An account stream needs:

- `name`
- `schema`
- `ownerProgram` or `ownerProgramFactory`
- optional `expectedSize` or `dataSizes`
- `parse(account)`

Token account example:

```ts
import { decodeAddress, encodeAddress } from "@thru/sdk/helpers";
import { defineAccountStream, t } from "@thru/indexer";
import { TokenAccount } from "@thru/programs/token";

const tokenAccounts = defineAccountStream({
  name: "token-accounts",
  ownerProgramFactory: () => new Uint8Array(decodeAddress(process.env.TOKEN_PROGRAM_ID!)),
  expectedSize: 73,
  schema: {
    address: t.text().primaryKey(),
    mint: t.text().notNull().index(),
    owner: t.text().notNull().index(),
    amount: t.bigint().notNull(),
    isFrozen: t.boolean().notNull(),
    slot: t.bigint().notNull(),
    seq: t.bigint().notNull(),
  },
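  parse: (account) => {
    // Defensive size check; keep this value in sync with `expectedSize` above.
    if (account.data.length !== 73) return null;
    const parsed = TokenAccount.from_array(account.data);
    // Ignore updates whose data does not decode as a token account.
    if (!parsed) return null;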
    return {
      address: encodeAddress(account.address),
      mint: encodeAddress(new Uint8Array(parsed.mint.get_bytes())),
      owner: encodeAddress(new Uint8Array(parsed.owner.get_bytes())),
      amount: parsed.amount,
      isFrozen: parsed.is_frozen !== 0,
      slot: account.slot,
      seq: account.seq,
    };
  },
});
```
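
The size guard in `parse` above returns `null` for data that does not match the fixed layout. The sketch below shows one way a runtime could treat that result, assuming `null` means "skip the write and leave any stored row untouched". `AccountUpdate`, `StoredRow`, `parseAccount`, and `applyUpdate` are hypothetical names, and the real `@thru/indexer` internals may differ.

```typescript
// Hypothetical shapes, for illustration only.
type AccountUpdate = { address: string; slot: number; data: Uint8Array };
type StoredRow = { address: string; slot: number };

const EXPECTED_SIZE = 73;

// Returns null to signal that the update should be ignored.
function parseAccount(account: AccountUpdate): StoredRow | null {
  if (account.data.length !== EXPECTED_SIZE) return null;
  return { address: account.address, slot: account.slot };
}

// Assumed apply step: a null result skips the write entirely.
// It never removes a row that an earlier update stored.
function applyUpdate(
  table: Map<string, StoredRow>,
  account: AccountUpdate,
): "written" | "skipped" {
  const row = parseAccount(account);
  if (row === null) return "skipped";
  table.set(row.address, row);
  return "written";
}

const table = new Map<string, StoredRow>();

// A well-formed update is stored.
const first = applyUpdate(table, {
  address: "a1",
  slot: 10,
  data: new Uint8Array(EXPECTED_SIZE),
});

// A later update with the wrong size is skipped; the slot-10 row stays in place.
const second = applyUpdate(table, {
  address: "a1",
  slot: 11,
  data: new Uint8Array(10),
});
```

If a stream needs to remove rows, that has to be modeled explicitly, because ignoring an update leaves the previous state visible to readers.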

## Practical Rules

- Use `filterFactory` and `ownerProgramFactory` when values come from the environment or config. The factories defer reading those values until the indexer runs, so migration tooling can still import the schema files safely.
- Use `expectedSize` when the account layout is fixed and size mismatches should be skipped early.
- Return `null` from `parse` when the event or account update should be ignored. Returning `null` skips the update; it does not delete an existing row.
- Export the generated `.table` from every stream so Drizzle can include it in migrations.
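
The factory rule can be illustrated without the indexer packages. The following is a minimal sketch of deferred configuration: `Env`, `requireProgramId`, and `defineStream` are hypothetical stand-ins, and the `env` object replaces `process.env` so the example runs anywhere.

```typescript
// Hypothetical stand-in for process.env.
type Env = Record<string, string | undefined>;

// Throws when the variable is missing, as a strict config reader would.
function requireProgramId(env: Env): string {
  const value = env.TOKEN_PROGRAM_ID;
  if (!value) throw new Error("TOKEN_PROGRAM_ID is not set");
  return value;
}

// Hypothetical definition shape: the program id is wrapped in a factory,
// so nothing is read from the environment while the definition is created.
function defineStream(env: Env) {
  return {
    name: "token-accounts",
    ownerProgramFactory: () => requireProgramId(env),
  };
}

// Migration tooling: creates the definition with no environment configured.
// This succeeds because the factory has not been called.
const forMigrations = defineStream({});

// Runtime: the variable is set, and the factory is called when needed.
const forRuntime = defineStream({ TOKEN_PROGRAM_ID: "example-program-id" });
const ownerProgram = forRuntime.ownerProgramFactory();

// The missing value only surfaces if the factory is actually invoked.
let missingConfigError = "";
try {
  forMigrations.ownerProgramFactory();
} catch (err) {
  missingConfigError = (err as Error).message;
}
```

An eager `ownerProgram: requireProgramId(env)` would instead throw while the definition is being created, which is the failure mode the factory form avoids.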

## Next Steps

- Move to [Running the Indexer](https://thru.org/docs/indexing/running-the-indexer.md) once the stream definitions exist.
- See [Build an Indexer](https://thru.org/docs/indexing/build-an-indexer.md) for a full end-to-end guide including production concerns.
- See [`@thru/indexer`](https://thru.org/docs/sdks/web-packages/indexer.md) and [`@thru/replay`](https://thru.org/docs/sdks/web-packages/replay.md) for package reference details.
