diff --git a/docs/HyperIndex/Advanced/effect-api.md b/docs/HyperIndex/Advanced/effect-api.md
index 563e33ac..8f68a440 100644
--- a/docs/HyperIndex/Advanced/effect-api.md
+++ b/docs/HyperIndex/Advanced/effect-api.md
@@ -51,9 +51,10 @@ The first argument is an options object that describes the effect:
- `name` (required) - the name of the effect used for debugging and logging
- `input` (required) - the input type of the effect
-- `output` (required) - the output type of the effect
+- `output` (required) - the output type of the effect (not required for `unorderedAfterCommit` and `orderedAfterCommit` modes which return `void`)
- `rateLimit` (required) - the maximum calls allowed per timeframe, or `false` to disable
- `cache` (optional) - save effect results in the database to prevent duplicate calls (Starting from `envio@2.26.0`)
+- `mode` (optional) - the execution intent of the effect. Defaults to `speculative`. See [Execution Modes](#execution-modes).
The second argument is a function that will be called with the effect's input.
@@ -64,10 +65,15 @@ After defining an effect, you can use `context.effect` to call it from your hand
The `context.effect` function accepts an effect as the first argument and the effect's input as the second argument:
```typescript
-ERC20.Transfer.handler(async ({ event, context }) => {
- const metadata = await context.effect(getMetadata, event.params.from);
- // Process the event with the metadata
-});
+import { indexer } from "envio";
+
+indexer.onEvent(
+ { contract: "ERC20", event: "Transfer" },
+ async ({ event, context }) => {
+ const metadata = await context.effect(getMetadata, event.params.from);
+ // Process the event with the metadata
+ },
+);
```
### Reading On-Chain State (eth_call)
@@ -213,6 +219,68 @@ export const getBalance = createEffect(
Watch the following video to learn more about createEffect and other updates introduced in [v2.32.0](https://github.com/enviodev/hyperindex/releases/tag/v2.32.0).
+### Execution Modes
+
+The `mode` option controls **when** the effect runs, **how** it's ordered relative to other effect calls in the batch, and what `context.effect` returns. One constructor, one call verb — behavior shifts on `mode`.
+
+| Mode | Input | Order | Timing | Returns | Use case |
+| --- | --- | --- | --- | --- | --- |
+| `speculative` *(default)* | maybe-wrong | unordered | preload (parallel) | value | RPC reads, fetch, IPFS — the default behavior |
+| `unordered` | correct | unordered | inline (parallel within batch) | value | parallel reads/writes with verified input where order doesn't matter |
+| `ordered` | correct | in-order | inline (sequential pass) | value | low-latency writes where order matters (e.g. depends on prior entity writes in the same batch) |
+| `unorderedAfterCommit` | correct | unordered | after DB commit | `void` | high-throughput sends, fire-and-forget webhooks, partitioned Kafka |
+| `orderedAfterCommit` | correct | in-order | after DB commit | `void` | sends where order matters (Telegram, ordered streams) |
+
+The two `*AfterCommit` modes are the right choice for **outbound messaging** — Redis, Kafka, RabbitMQ, SNS/SQS, Telegram, Discord, Slack, webhooks. They fire only after the batch's database transaction commits, so a downstream consumer never sees a message for a state that didn't actually persist. They also return `void`, so you don't need to `await` the call from your handler — the runtime takes care of dispatching after commit.
+
+```typescript
+import { createEffect, indexer, S } from "envio";
+
+const notifyLargeSwap = createEffect(
+ {
+ name: "notifyLargeSwap",
+ input: { usd: S.bigint, blockNumber: S.number },
+ mode: "orderedAfterCommit",
+ },
+ async ({ input }) => {
+ const text = `Large swap: $${input.usd.toLocaleString()} in block ${input.blockNumber}`;
+ await fetch(`https://api.telegram.org/bot${process.env.ENVIO_TG_BOT_TOKEN}/sendMessage`, {
+ method: "POST",
+ body: JSON.stringify({
+ chat_id: process.env.ENVIO_TG_CHAT_ID,
+ text,
+ }),
+ });
+ }
+);
+
+indexer.onEvent(
+ { contract: "Pool", event: "Swap" },
+ async ({ event, context }) => {
+ const usd = event.params.amount;
+ if (usd > 1_000_000n) {
+ context.effect(notifyLargeSwap, {
+ usd,
+ blockNumber: event.block.number,
+ });
+ // no await — orderedAfterCommit returns void; runtime fires it after the batch's DB commit
+ }
+ },
+);
+```
+
+#### When to pick which mode
+
+- **`speculative`** — your call is idempotent, the input may not be final (preloading runs handlers ahead of confirmed state), and you only need a value back. RPC `eth_call`s, `fetch` of IPFS metadata, and any read where retries are cheap belong here.
+- **`unordered`** — you've already verified the input (e.g. derived it from a committed entity) and the work is parallel-safe. Use this for batched reads/writes that don't depend on each other.
+- **`ordered`** — the effect's correctness depends on something written earlier in the same batch. The runtime will run these calls sequentially in handler order.
+- **`unorderedAfterCommit`** — fire-and-forget sends where ordering doesn't matter: partitioned Kafka topics, Redis Streams keyed by tx hash, generic webhooks. Highest throughput, never sends for state that wasn't committed.
+- **`orderedAfterCommit`** — sends to a single ordered destination: a Telegram chat, a Slack channel, a single Kafka partition. The runtime preserves handler order across the batch.
+
+For outbound messaging, `*AfterCommit` is the safe default. If you've measured commit latency as the bottleneck and your downstream consumer is idempotent, you can also use the inline `unordered` / `ordered` modes for **lower latency** sends — same parallel/sequential semantics, but they fire before the DB commit and return a value. The tradeoff is weaker delivery: a failed batch can still produce a delivered message, so retries on the next run are duplicates rather than first-time sends.
+
+This is what makes the Effect API a good fit for **streams and chat bots** — you describe what to send and where, the runtime handles batching, ordering, and delivery semantics. See the [Streams](/docs/HyperIndex/streams) and [Chat Bots](/docs/HyperIndex/chatbots) guides for end-to-end examples per provider.
+
### Sending Notifications (Webhooks)
You can use the Effect API to send push notifications or webhook calls when specific events occur. This is useful for alerting systems, Discord/Slack bots, or triggering downstream workflows.
@@ -227,26 +295,18 @@ export const sendWebhook = createEffect(
event: S.string,
data: S.string,
},
- output: S.boolean,
rateLimit: {
calls: 10,
per: "second",
},
- // Don't cache webhook calls - we want them to fire every time
- cache: false,
+ mode: "unorderedAfterCommit",
},
async ({ input, context }) => {
- try {
- await fetch("https://your-webhook-url.com/notify", {
- method: "POST",
- headers: { "Content-Type": "application/json" },
- body: JSON.stringify({ event: input.event, data: input.data }),
- });
- return true;
- } catch (error) {
- context.log.error(`Webhook failed: ${error}`);
- return false;
- }
+ await fetch("https://your-webhook-url.com/notify", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({ event: input.event, data: input.data }),
+ });
}
);
```
@@ -254,21 +314,25 @@ export const sendWebhook = createEffect(
Then call it from your handler:
```typescript
-MyContract.LargeTransfer.handler(async ({ event, context }) => {
- await context.effect(sendWebhook, {
- event: "large_transfer",
- data: JSON.stringify({
- from: event.params.from,
- to: event.params.to,
- amount: event.params.value.toString(),
- }),
- });
-});
+import { indexer } from "envio";
+
+indexer.onEvent(
+ { contract: "MyContract", event: "LargeTransfer" },
+ async ({ event, context }) => {
+ context.effect(sendWebhook, {
+ event: "large_transfer",
+ data: JSON.stringify({
+ from: event.params.from,
+ to: event.params.to,
+ amount: event.params.value.toString(),
+ }),
+ });
+ // no await — `unorderedAfterCommit` returns void and dispatches after the DB commit
+ },
+);
```
-:::warning
-Webhook effects will fire on every indexer re-run unless you set `cache: true`. If you cache them, the webhook will only fire once per unique input. Consider which behavior is appropriate for your use case.
-:::
+Because the effect runs **after** the batch's DB commit, the webhook will only fire for state that actually persisted — a reorg or a failed batch never produces a phantom notification. For send-only effects you typically don't need `cache: true`; the mode itself prevents duplicate sends within a successful run.
### Migrate from Experimental
diff --git a/docs/HyperIndex/Chatbots/discord.md b/docs/HyperIndex/Chatbots/discord.md
new file mode 100644
index 00000000..576320c5
--- /dev/null
+++ b/docs/HyperIndex/Chatbots/discord.md
@@ -0,0 +1,116 @@
+---
+id: chatbots-discord
+title: Discord
+sidebar_label: Discord
+slug: /chatbots/discord
+description: Send Discord messages from HyperIndex handlers using the Effect API.
+---
+
+Post messages to a Discord channel using either an [incoming webhook](https://support.discord.com/hc/en-us/articles/228383668) (simplest) or a bot token. Both work with raw `fetch`.
+
+### Channel setup
+
+1. Channel **Settings → Integrations → Webhooks → New Webhook**.
+2. Copy the webhook URL.
+3. Set `ENVIO_DISCORD_WEBHOOK_URL` in your `.env`.
+
+### Define the effect
+
+Webhook URL is baked into the effect. The embed is built inside the effect body — `input` is just raw event data.
+
+```typescript title="src/effects/discord.ts"
+import { createEffect, S } from "envio";
+
+const formatUnits = (value: bigint, decimals = 18) => {
+ const base = 10n ** BigInt(decimals);
+ const whole = value / base;
+ const frac = value % base;
+ if (frac === 0n) return whole.toString();
+ return `${whole}.${frac.toString().padStart(decimals, "0").replace(/0+$/, "")}`;
+};
+
+export const notifyTransfer = createEffect(
+ {
+ name: "notifyTransfer",
+ input: {
+ from: S.string,
+ to: S.string,
+ value: S.bigint,
+ contract: S.string,
+ txHash: S.string,
+ },
+ rateLimit: { calls: 5, per: "second" }, // Discord webhook limit
+ mode: "orderedAfterCommit",
+ },
+ async ({ input, context }) => {
+ const res = await fetch(process.env.ENVIO_DISCORD_WEBHOOK_URL!, {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({
+ content: "**New RETH Transfer Event**",
+ embeds: [
+ {
+ title: `${formatUnits(input.value)} RETH transferred`,
+ url: `https://etherscan.io/tx/${input.txHash}`,
+ fields: [
+ { name: "From", value: input.from, inline: true },
+ { name: "To", value: input.to, inline: true },
+ { name: "Contract", value: input.contract },
+ ],
+ },
+ ],
+ allowed_mentions: { parse: [] },
+ }),
+ });
+ if (!res.ok) {
+ context.log.error(`Discord failed: ${res.status} ${await res.text()}`);
+ throw new Error(`Discord ${res.status}`);
+ }
+ }
+);
+```
+
+### Call it from a handler
+
+The rindexer template…
+
+```yaml
+chat:
+ discord:
+ - messages:
+ - event_name: Transfer
+ filter_expression: "value >= 10 && value <= 2000000000000000000"
+ template_inline: |
+ *New RETH Transfer Event*
+ from: {{from}}
+ to: {{to}}
+ amount: {{format_value(value, 18)}}
+ contract: {{transaction_information.address}}
+ [etherscan](https://etherscan.io/tx/{{transaction_information.transaction_hash}})
+```
+
+…becomes:
+
+```typescript title="src/handlers/RocketPoolETH.ts"
+import { indexer } from "envio";
+import { notifyTransfer } from "../effects/discord";
+
+indexer.onEvent(
+ { contract: "RocketPoolETH", event: "Transfer" },
+ async ({ event, context }) => {
+ const { from, to, value } = event.params;
+
+ if (value < 10n || value > 2_000_000_000_000_000_000n) return;
+
+ context.effect(notifyTransfer, {
+ from,
+ to,
+ value,
+ contract: event.srcAddress,
+ txHash: event.transaction.hash,
+ });
+ },
+);
+```
+
+Discord embeds give you richer formatting than plain text — fields, colors, thumbnails — without any extra dependency. If you'd rather keep things simple, replace `embeds` with a markdown `content` string. For lower latency, switch the effect to `mode: "ordered"` (or `"unordered"` if order doesn't matter) — Discord is rate-limited so the gain is small, but it skips the wait for the DB commit.
diff --git a/docs/HyperIndex/Chatbots/index.md b/docs/HyperIndex/Chatbots/index.md
new file mode 100644
index 00000000..6ece3d63
--- /dev/null
+++ b/docs/HyperIndex/Chatbots/index.md
@@ -0,0 +1,46 @@
+---
+id: chatbots
+title: Chat Bots
+sidebar_label: Overview
+slug: /chatbots
+description: Send on-chain notifications from HyperIndex to Telegram, Discord, Slack, Twilio, PagerDuty, or OpsGenie.
+---
+
+Chat bots in HyperIndex are just [Effects](/docs/HyperIndex/effect-api) running in `orderedAfterCommit` mode that call a chat platform's API. There is no separate templating language and no separate config file — you build your message string with template literals and decide when to send it with normal `if` statements.
+
+### Picking a mode
+
+Chat platforms display messages in arrival order to humans, so you almost always want **ordered** delivery. The runtime preserves the order of `context.effect(...)` calls across the entire batch, then dispatches them after the DB commit.
+
+| Mode | When to use |
+| --- | --- |
+| `orderedAfterCommit` *(default choice for chat bots)* | Single ordered destination (Telegram chat, Slack channel). Won't message users about state that was rolled back. |
+| `unorderedAfterCommit` | Independent alerts where order doesn't matter (one alert per swap, fan-out to multiple channels). Higher throughput. |
+| `ordered` *(lower latency)* | Need the message out as fast as possible and your operators tolerate the rare duplicate on a failed batch. Inline, sequential, returns a value. |
+| `unordered` *(lower latency)* | Same speed/safety tradeoff as `ordered`, but parallel dispatch. Use when alerts are fully independent. |
+
+Use the after-commit modes by default. Switch to `ordered` / `unordered` only when you measure commit latency as the bottleneck — chat platforms are rate-limited anyway, so the gain is usually small.
+
+### Comparison with rindexer
+
+| rindexer | HyperIndex |
+| --- | --- |
+| `chat:` block in YAML, one entry per provider | A `createEffect({ mode: "orderedAfterCommit" })` per destination |
+| `filter_expression: "value >= 10 && from = '0x…'"` | A regular TypeScript `if` |
+| `template_inline` Mustache template | Template literal (full TypeScript expressions inside `${...}`) |
+| 10-block max range hard-coded | Use the rate-limit option to tune throughput |
+
+### Supported platforms
+
+- [Telegram](/docs/HyperIndex/chatbots/telegram)
+- [Discord](/docs/HyperIndex/chatbots/discord)
+- [Slack](/docs/HyperIndex/chatbots/slack)
+- [Twilio (SMS)](/docs/HyperIndex/chatbots/twilio)
+- [PagerDuty](/docs/HyperIndex/chatbots/pagerduty)
+- [OpsGenie](/docs/HyperIndex/chatbots/opsgenie)
+
+### Pattern
+
+Each effect bakes in its own static config (token, channel, rate limit, message template) and accepts only the per-event values that vary in `input`. That keeps call sites tiny, makes deduplication effective, and means switching destinations later is a one-line change.
+
+The example pages below all use a small `formatUnits` helper inside the effect body to render bigints — equivalent to rindexer's `{{format_value(value, 18)}}`.
diff --git a/docs/HyperIndex/Chatbots/opsgenie.md b/docs/HyperIndex/Chatbots/opsgenie.md
new file mode 100644
index 00000000..5d4c4af3
--- /dev/null
+++ b/docs/HyperIndex/Chatbots/opsgenie.md
@@ -0,0 +1,125 @@
+---
+id: chatbots-opsgenie
+title: OpsGenie
+sidebar_label: OpsGenie
+slug: /chatbots/opsgenie
+description: Create OpsGenie alerts from HyperIndex handlers using the Effect API.
+---
+
+Create OpsGenie alerts using the [Alert API](https://docs.opsgenie.com/docs/alert-api). One JSON `POST` — no SDK required.
+
+### Integration setup
+
+1. OpsGenie → **Settings → Integrations → API**, then copy the API key.
+2. Set `ENVIO_OPSGENIE_API_KEY` in your `.env`.
+3. Use `https://api.opsgenie.com` (US) or `https://api.eu.opsgenie.com` (EU) as the host.
+
+### Define the effect
+
+API key, priority, and the alert template are baked into the effect. `input` is just the raw event data.
+
+```typescript title="src/effects/opsgenie.ts"
+import { createEffect, S } from "envio";
+
+const formatUnits = (value: bigint, decimals = 18) => {
+ const base = 10n ** BigInt(decimals);
+ const whole = value / base;
+ const frac = value % base;
+ if (frac === 0n) return whole.toString();
+ return `${whole}.${frac.toString().padStart(decimals, "0").replace(/0+$/, "")}`;
+};
+
+export const whaleAlert = createEffect(
+ {
+ name: "whaleAlert",
+ input: {
+ from: S.string,
+ to: S.string,
+ value: S.bigint,
+ txHash: S.string,
+ chainId: S.number,
+ blockNumber: S.number,
+ },
+ rateLimit: { calls: 100, per: "minute" },
+ mode: "orderedAfterCommit",
+ },
+ async ({ input, context }) => {
+ const res = await fetch("https://api.opsgenie.com/v2/alerts", {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ Authorization: `GenieKey ${process.env.ENVIO_OPSGENIE_API_KEY}`,
+ },
+ body: JSON.stringify({
+ message: `Whale moved ${formatUnits(input.value)} RETH`,
+ priority: "P1",
+ // alias deduplicates alerts in OpsGenie — keep it stable per logical event.
+ alias: input.txHash,
+ description: [
+ `from: ${input.from}`,
+ `to: ${input.to}`,
+ `amount: ${formatUnits(input.value, 18)}`,
+ `etherscan: https://etherscan.io/tx/${input.txHash}`,
+ ].join("\n"),
+ details: {
+ chainId: input.chainId,
+ block: input.blockNumber,
+ },
+ source: "hyperindex",
+ }),
+ });
+ if (!res.ok) {
+ context.log.error(`OpsGenie failed: ${res.status} ${await res.text()}`);
+ throw new Error(`OpsGenie ${res.status}`);
+ }
+ }
+);
+```
+
+### Call it from a handler
+
+The rindexer config…
+
+```yaml
+chat:
+ opsgenie:
+ - api_key: ${ENVIO_OPSGENIE_API_KEY}
+ priority: P1
+ messages:
+ - event_name: Transfer
+ filter_expression: "from = '0x0338ce5020c447f7e668dc2ef778025ce3982662' && value >= 10"
+ template_inline: |
+ New RETH Transfer Event
+ from: {{from}}
+ to: {{to}}
+ amount: {{format_value(value, 18)}}
+ etherscan: https://etherscan.io/tx/{{transaction_information.transaction_hash}}
+```
+
+…becomes:
+
+```typescript title="src/handlers/RocketPoolETH.ts"
+import { indexer } from "envio";
+import { whaleAlert } from "../effects/opsgenie";
+
+const WHALE = "0x0338ce5020c447f7e668dc2ef778025ce3982662";
+
+indexer.onEvent(
+ { contract: "RocketPoolETH", event: "Transfer" },
+ async ({ event, context }) => {
+ const { from, to, value } = event.params;
+ if (from.toLowerCase() !== WHALE || value < 10n) return;
+
+ context.effect(whaleAlert, {
+ from,
+ to,
+ value,
+ txHash: event.transaction.hash,
+ chainId: context.chain.id,
+ blockNumber: event.block.number,
+ });
+ },
+);
+```
+
+For time-sensitive alerts, switch the effect to `mode: "ordered"` so the alert is created inline within the batch; the `alias` still protects you from duplicates if the batch later fails.
diff --git a/docs/HyperIndex/Chatbots/pagerduty.md b/docs/HyperIndex/Chatbots/pagerduty.md
new file mode 100644
index 00000000..fc795837
--- /dev/null
+++ b/docs/HyperIndex/Chatbots/pagerduty.md
@@ -0,0 +1,120 @@
+---
+id: chatbots-pagerduty
+title: PagerDuty
+sidebar_label: PagerDuty
+slug: /chatbots/pagerduty
+description: Trigger PagerDuty incidents from HyperIndex handlers using the Effect API.
+---
+
+Trigger PagerDuty incidents using the [Events API v2](https://developer.pagerduty.com/docs/ZG9jOjExMDI5NTgw-events-api-v2-overview). No SDK needed — it's a single JSON `POST`.
+
+### Integration setup
+
+1. PagerDuty → **Services → Service Directory → your service → Integrations → Add → Events API v2**.
+2. Copy the **Integration Key** (a.k.a. routing key).
+3. Set `ENVIO_PAGERDUTY_ROUTING_KEY` in your `.env`.
+
+### Define the effect
+
+Routing key, severity, and the alert template are baked into the effect — `input` is just the raw event data.
+
+```typescript title="src/effects/pagerduty.ts"
+import { createEffect, S } from "envio";
+
+const formatUnits = (value: bigint, decimals = 18) => {
+ const base = 10n ** BigInt(decimals);
+ const whole = value / base;
+ const frac = value % base;
+ if (frac === 0n) return whole.toString();
+ return `${whole}.${frac.toString().padStart(decimals, "0").replace(/0+$/, "")}`;
+};
+
+export const whaleAlert = createEffect(
+ {
+ name: "whaleAlert",
+ input: {
+ from: S.string,
+ to: S.string,
+ value: S.bigint,
+ contract: S.string,
+ txHash: S.string,
+ },
+ rateLimit: { calls: 60, per: "minute" },
+ mode: "orderedAfterCommit",
+ },
+ async ({ input, context }) => {
+ const res = await fetch("https://events.pagerduty.com/v2/enqueue", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({
+ routing_key: process.env.ENVIO_PAGERDUTY_ROUTING_KEY,
+ event_action: "trigger",
+ // Stable dedup_key prevents one alert per event from spamming PagerDuty if you re-run.
+ dedup_key: input.txHash,
+ payload: {
+ summary: `Whale moved ${formatUnits(input.value)} RETH`,
+ source: input.contract,
+ severity: "critical",
+ custom_details: {
+ from: input.from,
+ to: input.to,
+ value: input.value.toString(),
+ link: `https://etherscan.io/tx/${input.txHash}`,
+ },
+ },
+ }),
+ });
+ if (!res.ok) {
+ context.log.error(`PagerDuty failed: ${res.status} ${await res.text()}`);
+ throw new Error(`PagerDuty ${res.status}`);
+ }
+ }
+);
+```
+
+### Call it from a handler
+
+The rindexer config…
+
+```yaml
+chat:
+ pagerduty:
+ - routing_key: ${ENVIO_PAGERDUTY_ROUTING_KEY}
+ severity: critical
+ messages:
+ - event_name: Transfer
+ filter_expression: "from = '0x0338ce5020c447f7e668dc2ef778025ce3982662' && value >= 10"
+ template_inline: |
+ New RETH Transfer Event
+ from: {{from}}
+ to: {{to}}
+ amount: {{format_value(value, 18)}}
+ link: https://etherscan.io/tx/{{transaction_information.transaction_hash}}
+```
+
+…becomes:
+
+```typescript title="src/handlers/RocketPoolETH.ts"
+import { indexer } from "envio";
+import { whaleAlert } from "../effects/pagerduty";
+
+const WHALE = "0x0338ce5020c447f7e668dc2ef778025ce3982662";
+
+indexer.onEvent(
+ { contract: "RocketPoolETH", event: "Transfer" },
+ async ({ event, context }) => {
+ const { from, to, value } = event.params;
+ if (from.toLowerCase() !== WHALE || value < 10n) return;
+
+ context.effect(whaleAlert, {
+ from,
+ to,
+ value,
+ contract: event.srcAddress,
+ txHash: event.transaction.hash,
+ });
+ },
+);
+```
+
+The `dedup_key` baked into the effect (the tx hash) guarantees at-most-one incident per transaction even across reruns. For incidents that need to fire ASAP, switch the effect to `mode: "ordered"` so the trigger runs inline within the batch — the dedup key still protects you from duplicates if the batch later fails and re-runs.
diff --git a/docs/HyperIndex/Chatbots/slack.md b/docs/HyperIndex/Chatbots/slack.md
new file mode 100644
index 00000000..45367874
--- /dev/null
+++ b/docs/HyperIndex/Chatbots/slack.md
@@ -0,0 +1,113 @@
+---
+id: chatbots-slack
+title: Slack
+sidebar_label: Slack
+slug: /chatbots/slack
+description: Send Slack messages from HyperIndex handlers using the Effect API.
+---
+
+Post to a Slack channel using either an [incoming webhook](https://api.slack.com/messaging/webhooks) (no SDK needed) or the [Web API](https://api.slack.com/web) with `@slack/web-api`. The webhook flow is shown below.
+
+### Channel setup
+
+1. Create a Slack app and enable **Incoming Webhooks**.
+2. Add a new webhook for the target channel and copy the URL.
+3. Set `ENVIO_SLACK_WEBHOOK_URL` in your `.env`.
+
+### Define the effect
+
+Webhook URL is baked in. The message string is built inside the effect — `input` is just raw event data.
+
+```typescript title="src/effects/slack.ts"
+import { createEffect, S } from "envio";
+
+const formatUnits = (value: bigint, decimals = 18) => {
+ const base = 10n ** BigInt(decimals);
+ const whole = value / base;
+ const frac = value % base;
+ if (frac === 0n) return whole.toString();
+ return `${whole}.${frac.toString().padStart(decimals, "0").replace(/0+$/, "")}`;
+};
+
+export const notifyTransfer = createEffect(
+ {
+ name: "notifyTransfer",
+ input: {
+ from: S.string,
+ to: S.string,
+ value: S.bigint,
+ contract: S.string,
+ txHash: S.string,
+ },
+ rateLimit: { calls: 1, per: "second" }, // Slack: ~1 msg/sec/channel
+ mode: "orderedAfterCommit",
+ },
+ async ({ input, context }) => {
+ const text = [
+ `*New RETH Transfer Event*`,
+ `from: ${input.from}`,
+ `to: ${input.to}`,
+ `amount: ${formatUnits(input.value, 18)}`,
+ `RETH contract: ${input.contract}`,
+ ``,
+ ].join("\n");
+
+ const res = await fetch(process.env.ENVIO_SLACK_WEBHOOK_URL!, {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({ text }),
+ });
+ if (!res.ok) {
+ context.log.error(`Slack failed: ${res.status} ${await res.text()}`);
+ throw new Error(`Slack ${res.status}`);
+ }
+ }
+);
+```
+
+### Call it from a handler
+
+The rindexer config…
+
+```yaml
+chat:
+ slack:
+ - bot_token: ${SLACK_BOT_TOKEN}
+ channel: "#RethTransferEvents"
+ messages:
+ - event_name: Transfer
+ filter_expression: "value >= 10 && value <= 2000000000000000000"
+ template_inline: |
+ *New RETH Transfer Event*
+ from: {{from}}
+ to: {{to}}
+ amount: {{format_value(value, 18)}}
+ RETH contract: {{transaction_information.address}}
+
+```
+
+…becomes:
+
+```typescript title="src/handlers/RocketPoolETH.ts"
+import { indexer } from "envio";
+import { notifyTransfer } from "../effects/slack";
+
+indexer.onEvent(
+ { contract: "RocketPoolETH", event: "Transfer" },
+ async ({ event, context }) => {
+ const { from, to, value } = event.params;
+
+ if (value < 10n || value > 2_000_000_000_000_000_000n) return;
+
+ context.effect(notifyTransfer, {
+ from,
+ to,
+ value,
+ contract: event.srcAddress,
+ txHash: event.transaction.hash,
+ });
+ },
+);
+```
+
+For richer layouts (buttons, headers, dividers) build a [Block Kit](https://api.slack.com/block-kit) array inside the effect body and add it to the request as `blocks`. If you need lower latency, switch the effect to `mode: "ordered"` — the message is posted inline within the batch instead of after the DB commit.
diff --git a/docs/HyperIndex/Chatbots/telegram.md b/docs/HyperIndex/Chatbots/telegram.md
new file mode 100644
index 00000000..8d1c9268
--- /dev/null
+++ b/docs/HyperIndex/Chatbots/telegram.md
@@ -0,0 +1,163 @@
+---
+id: chatbots-telegram
+title: Telegram
+sidebar_label: Telegram
+slug: /chatbots/telegram
+description: Send Telegram messages from HyperIndex handlers using the Effect API.
+---
+
+Send Telegram messages from your handlers via the [Bot API](https://core.telegram.org/bots/api). No SDK required — just `fetch`.
+
+### Bot setup
+
+1. Open [@BotFather](https://t.me/BotFather) in Telegram, run `/newbot`, and copy the bot token.
+2. Add the bot to a chat (or DM it) and grab the chat ID. The simplest way is to send any message and visit `https://api.telegram.org/bot/getUpdates` — `chat.id` is in the JSON.
+3. Set `ENVIO_TELEGRAM_BOT_TOKEN` and `ENVIO_TELEGRAM_CHAT_ID` in your `.env`.
+
+### Define the effect
+
+Bot token and chat ID are static — bake them in. The handler hands the effect raw values; the message string is built inside the effect body.
+
+```typescript title="src/effects/telegram.ts"
+import { createEffect, S } from "envio";
+
+const ENDPOINT = `https://api.telegram.org/bot${process.env.ENVIO_TELEGRAM_BOT_TOKEN}/sendMessage`;
+const CHAT_ID = process.env.ENVIO_TELEGRAM_CHAT_ID!;
+
+export const notifyTransfer = createEffect(
+ {
+ name: "notifyTransfer",
+ input: {
+ from: S.string,
+ to: S.string,
+ value: S.bigint,
+ contract: S.string,
+ txHash: S.string,
+ },
+ rateLimit: { calls: 25, per: "second" }, // Telegram limits ~30 msg/sec globally
+ mode: "orderedAfterCommit", // human-readable feed → keep order
+ },
+ async ({ input, context }) => {
+ const text = [
+ `*New RETH Transfer Event*`,
+ `from: ${input.from}`,
+ `to: ${input.to}`,
+ `amount: ${formatUnits(input.value, 18)}`,
+ `RETH contract: ${input.contract}`,
+ `[etherscan](https://etherscan.io/tx/${input.txHash})`,
+ ].join("\n");
+
+ const res = await fetch(ENDPOINT, {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({
+ chat_id: CHAT_ID,
+ text,
+ parse_mode: "Markdown",
+ disable_web_page_preview: true,
+ }),
+ });
+ if (!res.ok) {
+ context.log.error(`Telegram failed: ${res.status} ${await res.text()}`);
+ throw new Error(`Telegram ${res.status}`);
+ }
+ }
+);
+
+const formatUnits = (value: bigint, decimals = 18) => {
+ const base = 10n ** BigInt(decimals);
+ const whole = value / base;
+ const frac = value % base;
+ if (frac === 0n) return whole.toString();
+ return `${whole}.${frac.toString().padStart(decimals, "0").replace(/0+$/, "")}`;
+};
+```
+
+### Call it from a handler
+
+The rindexer config…
+
+```yaml
+chat:
+ telegram:
+ - bot_token: ${ENVIO_TELEGRAM_BOT_TOKEN}
+ chat_id: -4223616270
+ messages:
+ - event_name: Transfer
+ filter_expression: "value >= 10 && value <= 2000000000000000000"
+ template_inline: |
+ *New RETH Transfer Event*
+ from: {{from}}
+ to: {{to}}
+ amount: {{format_value(value, 18)}}
+ RETH contract: {{transaction_information.address}}
+ [etherscan](https://etherscan.io/tx/{{transaction_information.transaction_hash}})
+```
+
+…becomes a regular handler with a template literal:
+
+```typescript title="src/handlers/RocketPoolETH.ts"
+import { indexer } from "envio";
+import { notifyTransfer } from "../effects/telegram";
+
+indexer.onEvent(
+ { contract: "RocketPoolETH", event: "Transfer" },
+ async ({ event, context }) => {
+ const { from, to, value } = event.params;
+
+ if (value < 10n || value > 2_000_000_000_000_000_000n) return;
+
+ context.effect(notifyTransfer, {
+ from,
+ to,
+ value,
+ contract: event.srcAddress,
+ txHash: event.transaction.hash,
+ });
+ },
+);
+```
+
+### Lower latency
+
+If a delay between the on-chain event and the Telegram message is a problem (e.g. you're driving an alerting bot), switch the effect to `mode: "ordered"` — the runtime fires it inline within the batch instead of waiting for the DB commit. The tradeoff: a failed batch may still produce a delivered message, so retries on the next run are duplicates.
+
+### Multiple chats / multiple alerts
+
+If you need to route to different chat IDs, define one effect per destination — each bakes in its own chat ID and message template — and pick the right one in the handler. A small factory keeps this DRY:
+
+```typescript title="src/effects/telegram.ts"
+const sendTo = (chatId: string, label: string) =>
+ createEffect(
+ {
+ name: `telegram:${label}`,
+ input: { value: S.bigint },
+ rateLimit: { calls: 25, per: "second" },
+ mode: "orderedAfterCommit",
+ },
+ async ({ input }) => {
+ await fetch(ENDPOINT, {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({
+ chat_id: chatId,
+ text: `${label}: ${formatUnits(input.value)} RETH`,
+ parse_mode: "Markdown",
+ }),
+ });
+ },
+ );
+
+export const whaleAlert = sendTo(process.env.ENVIO_TELEGRAM_WHALE_CHAT_ID!, "🐋 Whale");
+export const notableAlert = sendTo(process.env.ENVIO_TELEGRAM_NOTABLE_CHAT_ID!, "Heads up");
+```
+
+```typescript title="src/handlers/RocketPoolETH.ts"
+if (value >= 1_000_000_000_000_000_000_000n) {
+ context.effect(whaleAlert, { value });
+} else if (value >= 100_000_000_000_000_000_000n) {
+ context.effect(notableAlert, { value });
+}
+```
+
+Keeping each destination in its own effect avoids passing `chatId` through `input` (which would defeat dedup and clutter the call site).
diff --git a/docs/HyperIndex/Chatbots/twilio.md b/docs/HyperIndex/Chatbots/twilio.md
new file mode 100644
index 00000000..9ffe6ded
--- /dev/null
+++ b/docs/HyperIndex/Chatbots/twilio.md
@@ -0,0 +1,132 @@
+---
+id: chatbots-twilio
+title: Twilio (SMS)
+sidebar_label: Twilio (SMS)
+slug: /chatbots/twilio
+description: Send SMS notifications from HyperIndex handlers using the Effect API and Twilio.
+---
+
+Send SMS messages via [Twilio's Programmable Messaging API](https://www.twilio.com/docs/sms). You can use the official SDK or hit the REST endpoint directly with `fetch`.
+
+### Account setup
+
+From the Twilio Console grab:
+
+- `ENVIO_TWILIO_ACCOUNT_SID`
+- `ENVIO_TWILIO_AUTH_TOKEN`
+- `ENVIO_TWILIO_FROM_NUMBER` (in E.164 format, e.g. `+15551234567`)
+
+### Installation (SDK)
+
+```bash
+pnpm add twilio
+```
+
+### Configure the client
+
+```typescript title="src/clients/twilio.ts"
+import twilio from "twilio";
+
+export const twilioClient = twilio(
+ process.env.ENVIO_TWILIO_ACCOUNT_SID!,
+ process.env.ENVIO_TWILIO_AUTH_TOKEN!
+);
+```
+
+### Define the effect
+
+From/to numbers are baked in. The handler hands the effect raw values; the SMS body is built inside.
+
+```typescript title="src/effects/twilio.ts"
+import { createEffect, S } from "envio";
+import { twilioClient } from "../clients/twilio";
+
+const FROM = process.env.ENVIO_TWILIO_FROM_NUMBER!;
+const TO = process.env.ENVIO_TWILIO_TO_NUMBER!;
+
+const formatUnits = (value: bigint, decimals = 18) => {
+ const base = 10n ** BigInt(decimals);
+ const whole = value / base;
+ const frac = value % base;
+ if (frac === 0n) return whole.toString();
+ return `${whole}.${frac.toString().padStart(decimals, "0").replace(/0+$/, "")}`;
+};
+
+export const notifyTransfer = createEffect(
+ {
+ name: "notifyTransfer",
+ input: { value: S.bigint },
+ rateLimit: { calls: 1, per: "second" }, // Twilio default trial = 1 msg/sec
+ mode: "orderedAfterCommit",
+ },
+ async ({ input }) => {
+ await twilioClient.messages.create({
+ from: FROM,
+ to: TO,
+ body: `Transfer detected: ${formatUnits(input.value)} RETH`,
+ });
+ }
+);
+```
+
+### Raw `fetch` alternative
+
+If you'd rather skip the SDK:
+
+```typescript
+const auth = Buffer.from(
+ `${process.env.ENVIO_TWILIO_ACCOUNT_SID}:${process.env.ENVIO_TWILIO_AUTH_TOKEN}`
+).toString("base64");
+
+await fetch(
+ `https://api.twilio.com/2010-04-01/Accounts/${process.env.ENVIO_TWILIO_ACCOUNT_SID}/Messages.json`,
+ {
+ method: "POST",
+ headers: {
+ Authorization: `Basic ${auth}`,
+ "Content-Type": "application/x-www-form-urlencoded",
+ },
+ body: new URLSearchParams({
+ From: process.env.ENVIO_TWILIO_FROM_NUMBER!,
+ To: input.to,
+ Body: input.body,
+ }),
+ }
+);
+```
+
+### Call it from a handler
+
+The rindexer config…
+
+```yaml
+chat:
+ twilio:
+ - account_sid: ${ENVIO_TWILIO_ACCOUNT_SID}
+ auth_token: ${ENVIO_TWILIO_AUTH_TOKEN}
+ from_number: ${ENVIO_TWILIO_FROM_NUMBER}
+ to_number: ${ENVIO_TWILIO_TO_NUMBER}
+ messages:
+ - event_name: Transfer
+ filter_expression: "value >= 10"
+ template_inline: "Transfer detected: {{value}} tokens"
+```
+
+…becomes:
+
+```typescript title="src/handlers/RocketPoolETH.ts"
+import { indexer } from "envio";
+import { notifyTransfer } from "../effects/twilio";
+
+indexer.onEvent(
+ { contract: "RocketPoolETH", event: "Transfer" },
+ async ({ event, context }) => {
+ const { value } = event.params;
+ if (value < 10n) return;
+
+ context.effect(notifyTransfer, { value });
+ },
+);
+```
+
+SMS is rate-limited and expensive — keep your conditions strict. For time-sensitive alerts, switch the effect to `mode: "ordered"` so the SMS is dispatched inline rather than after the DB commit.
diff --git a/docs/HyperIndex/Streams/cloudflare-queues.md b/docs/HyperIndex/Streams/cloudflare-queues.md
new file mode 100644
index 00000000..95679148
--- /dev/null
+++ b/docs/HyperIndex/Streams/cloudflare-queues.md
@@ -0,0 +1,99 @@
+---
+id: streams-cloudflare-queues
+title: Cloudflare Queues
+sidebar_label: Cloudflare Queues
+slug: /streams/cloudflare-queues
+description: Stream decoded events from HyperIndex to Cloudflare Queues using the Effect API.
+---
+
+Cloudflare Queues exposes a [REST API](https://developers.cloudflare.com/api/operations/queue-publish-message) for sending messages, so no SDK is required — just `fetch`.
+
+### Installation
+
+Nothing to install.
+
+### Define the effect
+
+Account ID, API token, and queue ID are static — bake them in. `input` is just event data.
+
+```typescript title="src/effects/cloudflare.ts"
+import { createEffect, S } from "envio";
+
+const QUEUE_ID = "blockchain-transfers";
+const ENDPOINT = `https://api.cloudflare.com/client/v4/accounts/${process.env.ENVIO_CLOUDFLARE_ACCOUNT_ID}/queues/${QUEUE_ID}/messages`;
+
+export const sendTransfer = createEffect(
+ {
+ name: "sendTransfer",
+ input: {
+ from: S.string,
+ to: S.string,
+ value: S.bigint,
+ txHash: S.string,
+ chainId: S.number,
+ },
+ rateLimit: { calls: 100, per: "second" },
+ mode: "unorderedAfterCommit",
+ },
+ async ({ input, context }) => {
+ const res = await fetch(ENDPOINT, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ Authorization: `Bearer ${process.env.ENVIO_CLOUDFLARE_API_TOKEN}`,
+ },
+ body: JSON.stringify({
+ body: { ...input, value: input.value.toString() },
+ content_type: "json",
+ }),
+ });
+ if (!res.ok) {
+ context.log.error(`CF Queues failed: ${res.status} ${await res.text()}`);
+ throw new Error(`CF Queues ${res.status}`);
+ }
+ }
+);
+```
+
+### Call it from a handler
+
+The rindexer config…
+
+```yaml
+streams:
+ cloudflare_queues:
+ queues:
+ - queue_id: blockchain-transfers
+ events:
+ - event_name: Transfer
+ conditions:
+ - "value": ">=2000000000000000000"
+```
+
+…becomes:
+
+```typescript title="src/handlers/RocketPoolETH.ts"
+import { indexer } from "envio";
+import { sendTransfer } from "../effects/cloudflare";
+
+const MIN = 2_000_000_000_000_000_000n;
+
+indexer.onEvent(
+ { contract: "RocketPoolETH", event: "Transfer" },
+ async ({ event, context }) => {
+ const { from, to, value } = event.params;
+
+ if (value >= MIN) {
+ context.effect(sendTransfer, {
+ from,
+ to,
+ value,
+ txHash: event.transaction.hash,
+ chainId: context.chain.id,
+ });
+ }
+ },
+);
+```
+
+Use `orderedAfterCommit` if your queue consumer relies on per-batch ordering. For lower latency at the cost of commit-safe delivery, switch to `mode: "unordered"` (or `"ordered"`) — the message is sent inline within the batch.
diff --git a/docs/HyperIndex/Streams/index.md b/docs/HyperIndex/Streams/index.md
new file mode 100644
index 00000000..77887a49
--- /dev/null
+++ b/docs/HyperIndex/Streams/index.md
@@ -0,0 +1,47 @@
+---
+id: streams
+title: Streams
+sidebar_label: Overview
+slug: /streams
+description: Stream decoded blockchain events from HyperIndex to Webhooks, Kafka, RabbitMQ, SNS/SQS, Redis Streams, or Cloudflare Queues.
+---
+
+HyperIndex can publish decoded events to any external system from inside your handlers. Streams are not a separate subsystem — they're [Effects](/docs/HyperIndex/effect-api) running in `unorderedAfterCommit` or `orderedAfterCommit` mode, which means:
+
+- The send fires **after** the batch's database transaction commits, so a reorg or a failed batch never produces a phantom message.
+- You write **normal handler code** to decide what to send and when. There is no separate YAML for filters or routing — you call `context.effect(...)` inside an `if` block, just like any other branching logic.
+- You can install and configure any client SDK you want (`kafkajs`, `ioredis`, `amqplib`, `@aws-sdk/client-sns`, …) or just use raw `fetch`. The Effect API only cares about the function body.
+
+### Comparison with rindexer
+
+| rindexer | HyperIndex |
+| --- | --- |
+| YAML `streams:` block per contract | A `createEffect({ mode: "unorderedAfterCommit" \| "orderedAfterCommit" })` definition |
+| `conditions:` mini-language with `>=`, `&&`, `\|\|` | Plain TypeScript `if (...)` in your handler |
+| `template_inline` Mustache-style templates | Template literals / any string-building code you want |
+| Per-provider client baked into the binary | Use any npm package, or raw `fetch` |
+| Fires per event, no DB-commit guarantee | Fires after DB commit, so streams reflect persisted state |
+
+### Choosing a mode
+
+Pick by latency vs. delivery semantics:
+
+- **`unorderedAfterCommit`** — fire-and-forget, parallel dispatch, **after** the DB commits. Best for partitioned destinations (Kafka with a partition key, Redis Streams keyed by tx hash, generic webhooks). Highest throughput; messages only fire for state that actually persisted.
+- **`orderedAfterCommit`** — preserves the order in which `context.effect(...)` was called across the batch, dispatched **after** the DB commits. Use for single-stream destinations like a Telegram chat, a Slack channel, or a single Kafka partition.
+- **`unordered`** *(lower latency)* — same as `unorderedAfterCommit` but fires **inline within the batch**, in parallel, returning a value. Skip this if your stream must never see a message for a state that was rolled back; pick it if you need the fastest possible push (no wait for DB commit) and your downstream tolerates duplicates on retry.
+- **`ordered`** *(lower latency)* — same as `orderedAfterCommit` but fires inline, sequentially, returning a value. Useful when a handler needs the response (e.g. a stream-side ID) before continuing, and you're willing to accept that a failed batch may still produce a delivered message.
+
+Rule of thumb: **start with `*AfterCommit`**. Drop down to `unordered` / `ordered` only when you've measured commit latency as the bottleneck and your consumer is idempotent.
+
+### Supported tools
+
+Each tool has a dedicated page with installation, configuration, and a full handler example:
+
+- [Webhooks](/docs/HyperIndex/streams/webhooks)
+- [Kafka](/docs/HyperIndex/streams/kafka)
+- [RabbitMQ](/docs/HyperIndex/streams/rabbitmq)
+- [AWS SNS / SQS](/docs/HyperIndex/streams/sns-sqs)
+- [Redis Streams](/docs/HyperIndex/streams/redis)
+- [Cloudflare Queues](/docs/HyperIndex/streams/cloudflare-queues)
+
+Need something else? Anything reachable from Node — gRPC, NATS, Postgres LISTEN/NOTIFY, GCP Pub/Sub, a custom HTTP service — works the same way: install the client, call it from inside an effect.
diff --git a/docs/HyperIndex/Streams/kafka.md b/docs/HyperIndex/Streams/kafka.md
new file mode 100644
index 00000000..fc29485f
--- /dev/null
+++ b/docs/HyperIndex/Streams/kafka.md
@@ -0,0 +1,141 @@
+---
+id: streams-kafka
+title: Kafka
+sidebar_label: Kafka
+slug: /streams/kafka
+description: Stream decoded events from HyperIndex to Apache Kafka using the Effect API.
+---
+
+Publish decoded events to one or more Kafka topics. Use a partition key to fan out across partitions (with `unorderedAfterCommit`) or pin the topic to a single partition for strict ordering (with `orderedAfterCommit`).
+
+### Installation
+
+```bash
+pnpm add kafkajs
+```
+
+### Configure the client
+
+```typescript title="src/clients/kafka.ts"
+import { Kafka, Partitioners } from "kafkajs";
+
+export const kafka = new Kafka({
+ clientId: "envio-indexer",
+ brokers: (process.env.ENVIO_KAFKA_BROKERS ?? "").split(","),
+ ssl: true,
+ sasl: {
+ mechanism: "plain",
+ username: process.env.ENVIO_KAFKA_USERNAME!,
+ password: process.env.ENVIO_KAFKA_PASSWORD!,
+ },
+});
+
+export const producer = kafka.producer({
+ createPartitioner: Partitioners.DefaultPartitioner,
+ allowAutoTopicCreation: false,
+});
+
+let connected: Promise | undefined;
+export const ensureConnected = () => (connected ??= producer.connect());
+```
+
+### Define the effect
+
+Topic name is baked into the effect — `input` carries only the partition key and the per-event payload. The effect serialises to JSON internally; the handler passes raw values.
+
+```typescript title="src/effects/kafka.ts"
+import { createEffect, S } from "envio";
+import { ensureConnected, producer } from "../clients/kafka";
+
+const TOPIC = "rocketpool.transfers";
+
+export const publishTransfer = createEffect(
+ {
+ name: "publishTransfer",
+ input: {
+ key: S.string, // partition key
+ from: S.string,
+ to: S.string,
+ value: S.bigint,
+ txHash: S.string,
+ blockNumber: S.number,
+ chainId: S.number,
+ },
+ rateLimit: { calls: 200, per: "second" },
+ // unordered = parallel dispatch, partition order preserved by `key`
+ mode: "unorderedAfterCommit",
+ },
+ async ({ input }) => {
+ await ensureConnected();
+ const { key, value, ...rest } = input;
+ await producer.send({
+ topic: TOPIC,
+ acks: -1, // "all"
+ messages: [
+ {
+ key,
+ value: JSON.stringify({ ...rest, value: value.toString() }),
+ },
+ ],
+ });
+ }
+);
+```
+
+### Call it from a handler
+
+The rindexer config…
+
+```yaml
+streams:
+ kafka:
+ topics:
+ - topic: test-topic
+ key: my-routing-key
+ events:
+ - event_name: Transfer
+ conditions:
+ - "value": ">=2000000000000000000"
+ - "from": "0x0338ce5020c447f7e668dc2ef778025ce3982662"
+```
+
+…becomes:
+
+```typescript title="src/handlers/RocketPoolETH.ts"
+import { indexer } from "envio";
+import { publishTransfer } from "../effects/kafka";
+
+const WHALE = "0x0338ce5020c447f7e668dc2ef778025ce3982662";
+const MIN = 2_000_000_000_000_000_000n;
+
+indexer.onEvent(
+ { contract: "RocketPoolETH", event: "Transfer" },
+ async ({ event, context }) => {
+ const { from, to, value } = event.params;
+
+ if (from.toLowerCase() === WHALE && value >= MIN) {
+ context.effect(publishTransfer, {
+ key: from, // partition by sender so per-sender order is preserved
+ from,
+ to,
+ value,
+ txHash: event.transaction.hash,
+ blockNumber: event.block.number,
+ chainId: context.chain.id,
+ });
+ }
+ },
+);
+```
+
+### Mode picker
+
+| You need | Mode |
+| --- | --- |
+| High throughput, partition by key | `unorderedAfterCommit` |
+| Strict global order across the whole topic | `orderedAfterCommit` (and a single-partition topic) |
+| Lowest latency, idempotent consumer | `unordered` (parallel, inline) or `ordered` (sequential, inline) — fires before the DB commit |
+
+### Raw `fetch` alternative
+
+If you'd rather not pull in `kafkajs`, point the effect at a Kafka REST proxy (e.g. Confluent REST Proxy) and POST JSON instead.
diff --git a/docs/HyperIndex/Streams/rabbitmq.md b/docs/HyperIndex/Streams/rabbitmq.md
new file mode 100644
index 00000000..ec6d35c6
--- /dev/null
+++ b/docs/HyperIndex/Streams/rabbitmq.md
@@ -0,0 +1,113 @@
+---
+id: streams-rabbitmq
+title: RabbitMQ
+sidebar_label: RabbitMQ
+slug: /streams/rabbitmq
+description: Stream decoded events from HyperIndex to RabbitMQ exchanges using the Effect API.
+---
+
+Publish decoded events to a RabbitMQ exchange — `direct`, `topic`, or `fanout` — using `amqplib`.
+
+### Installation
+
+```bash
+pnpm add amqplib
+pnpm add -D @types/amqplib
+```
+
+### Configure the client
+
+```typescript title="src/clients/rabbitmq.ts"
+import amqp from "amqplib";
+
+let chanPromise: Promise | undefined;
+
+export const getChannel = () =>
+ (chanPromise ??= (async () => {
+ const conn = await amqp.connect(process.env.ENVIO_RABBITMQ_URL!);
+ const ch = await conn.createChannel();
+ await ch.assertExchange("transfer", "direct", { durable: true });
+ return ch;
+ })());
+```
+
+### Define the effect
+
+Exchange and routing key are static config — bake them in. `input` is just the per-event values.
+
+```typescript title="src/effects/rabbitmq.ts"
+import { createEffect, S } from "envio";
+import { getChannel } from "../clients/rabbitmq";
+
+const EXCHANGE = "transfer";
+const ROUTING_KEY = "my-routing-key";
+
+export const publishTransfer = createEffect(
+ {
+ name: "publishTransfer",
+ input: {
+ from: S.string,
+ to: S.string,
+ value: S.bigint,
+ txHash: S.string,
+ },
+ rateLimit: { calls: 200, per: "second" },
+ mode: "unorderedAfterCommit",
+ },
+ async ({ input }) => {
+ const ch = await getChannel();
+ const body = JSON.stringify({ ...input, value: input.value.toString() });
+ ch.publish(EXCHANGE, ROUTING_KEY, Buffer.from(body), {
+ contentType: "application/json",
+ persistent: true,
+ });
+ }
+);
+```
+
+### Call it from a handler
+
+The rindexer config…
+
+```yaml
+streams:
+ rabbitmq:
+ exchanges:
+ - exchange: transfer
+ exchange_type: direct
+ routing_key: my-routing-key
+ events:
+ - event_name: Transfer
+ conditions:
+ - "value": ">=2000000000000000000 && value <=4000000000000000000"
+ - "from": "0x0338ce5020c447f7e668dc2ef778025ce3982662"
+```
+
+…becomes:
+
+```typescript title="src/handlers/RocketPoolETH.ts"
+import { indexer } from "envio";
+import { publishTransfer } from "../effects/rabbitmq";
+
+const WHALE = "0x0338ce5020c447f7e668dc2ef778025ce3982662";
+const MIN = 2_000_000_000_000_000_000n;
+const MAX = 4_000_000_000_000_000_000n;
+
+indexer.onEvent(
+ { contract: "RocketPoolETH", event: "Transfer" },
+ async ({ event, context }) => {
+ const { from, to, value } = event.params;
+
+ if (from.toLowerCase() === WHALE && value >= MIN && value <= MAX) {
+ context.effect(publishTransfer, {
+ from,
+ to,
+ value,
+ txHash: event.transaction.hash,
+ });
+ }
+ },
+);
+```
+
+Pick `orderedAfterCommit` if your consumer needs strict per-batch ordering. For `fanout` exchanges, leave `routingKey` as an empty string. If your consumer is idempotent and you want lower latency, switch the effect to `mode: "unordered"` (or `"ordered"`) to publish inline before the DB commit.
diff --git a/docs/HyperIndex/Streams/redis.md b/docs/HyperIndex/Streams/redis.md
new file mode 100644
index 00000000..2e65c426
--- /dev/null
+++ b/docs/HyperIndex/Streams/redis.md
@@ -0,0 +1,99 @@
+---
+id: streams-redis
+title: Redis Streams
+sidebar_label: Redis Streams
+slug: /streams/redis
+description: Stream decoded events from HyperIndex to Redis Streams using the Effect API.
+---
+
+Push decoded events into a Redis Stream with `XADD`. Pair with consumer groups on the read side for at-least-once processing.
+
+### Installation
+
+```bash
+pnpm add ioredis
+```
+
+### Configure the client
+
+```typescript title="src/clients/redis.ts"
+import Redis from "ioredis";
+
+export const redis = new Redis(process.env.ENVIO_REDIS_CONNECTION_URI!);
+```
+
+### Define the effect
+
+Stream name is static — bake it in. `input` is just event data.
+
+```typescript title="src/effects/redis.ts"
+import { createEffect, S } from "envio";
+import { redis } from "../clients/redis";
+
+const STREAM = "ethereum_rocketpool_transfer_stream";
+
+export const xaddTransfer = createEffect(
+ {
+ name: "xaddTransfer",
+ input: {
+ from: S.string,
+ to: S.string,
+ value: S.bigint,
+ txHash: S.string,
+ blockNumber: S.number,
+ },
+ rateLimit: { calls: 500, per: "second" },
+ mode: "unorderedAfterCommit",
+ },
+ async ({ input }) => {
+ const data = JSON.stringify({ ...input, value: input.value.toString() });
+ await redis.xadd(STREAM, "*", "data", data);
+ }
+);
+```
+
+### Call it from a handler
+
+The rindexer config…
+
+```yaml
+streams:
+ redis:
+ streams:
+ - stream_name: "ethereum_rocketpool_transfer_stream"
+ events:
+ - event_name: Transfer
+ conditions:
+ - "value": ">=2000000000000000000 && value <=4000000000000000000"
+ - "from": "0x0338ce5020c447f7e668dc2ef778025ce3982662"
+```
+
+…becomes:
+
+```typescript title="src/handlers/RocketPoolETH.ts"
+import { indexer } from "envio";
+import { xaddTransfer } from "../effects/redis";
+
+const WHALE = "0x0338ce5020c447f7e668dc2ef778025ce3982662";
+const MIN = 2_000_000_000_000_000_000n;
+const MAX = 4_000_000_000_000_000_000n;
+
+indexer.onEvent(
+ { contract: "RocketPoolETH", event: "Transfer" },
+ async ({ event, context }) => {
+ const { from, to, value } = event.params;
+
+ if (from.toLowerCase() === WHALE && value >= MIN && value <= MAX) {
+ context.effect(xaddTransfer, {
+ from,
+ to,
+ value,
+ txHash: event.transaction.hash,
+ blockNumber: event.block.number,
+ });
+ }
+ },
+);
+```
+
+Redis Streams already give you total ordering per stream (the `*` ID is monotonically increasing), so `unorderedAfterCommit` is fine here — concurrent `XADD`s within a batch will simply interleave by arrival time. Use `orderedAfterCommit` only if downstream consumers expect strict handler-order across the batch. For lowest latency, switch to `mode: "unordered"` so the `XADD` runs inline within the batch — fastest, but a failed batch can leave entries in the stream that no longer reflect committed state.
diff --git a/docs/HyperIndex/Streams/sns-sqs.md b/docs/HyperIndex/Streams/sns-sqs.md
new file mode 100644
index 00000000..8437ce3e
--- /dev/null
+++ b/docs/HyperIndex/Streams/sns-sqs.md
@@ -0,0 +1,144 @@
+---
+id: streams-sns-sqs
+title: AWS SNS / SQS
+sidebar_label: AWS SNS / SQS
+slug: /streams/sns-sqs
+description: Stream decoded events from HyperIndex to AWS SNS topics or SQS queues using the Effect API.
+---
+
+Publish to an SNS topic (pub/sub fan-out) or directly to an SQS queue. Both use the official AWS SDK.
+
+### Installation
+
+```bash
+# SNS
+pnpm add @aws-sdk/client-sns
+# SQS
+pnpm add @aws-sdk/client-sqs
+```
+
+### Configure the client
+
+```typescript title="src/clients/aws.ts"
+import { SNSClient } from "@aws-sdk/client-sns";
+import { SQSClient } from "@aws-sdk/client-sqs";
+
+const region = process.env.ENVIO_AWS_REGION ?? "us-east-1";
+const credentials = {
+ accessKeyId: process.env.ENVIO_AWS_ACCESS_KEY_ID!,
+ secretAccessKey: process.env.ENVIO_AWS_SECRET_ACCESS_KEY!,
+ ...(process.env.ENVIO_AWS_SESSION_TOKEN
+ ? { sessionToken: process.env.ENVIO_AWS_SESSION_TOKEN }
+ : {}),
+};
+
+export const sns = new SNSClient({ region, credentials });
+export const sqs = new SQSClient({ region, credentials });
+```
+
+### Define the effects
+
+Topic ARN / queue URL are static — bake them in. `input` carries only event data.
+
+```typescript title="src/effects/sns.ts"
+import { PublishCommand } from "@aws-sdk/client-sns";
+import { SendMessageCommand } from "@aws-sdk/client-sqs";
+import { createEffect, S } from "envio";
+import { sns, sqs } from "../clients/aws";
+
+const TOPIC_ARN = process.env.ENVIO_SNS_TOPIC_ARN!;
+
+export const publishTransferSns = createEffect(
+ {
+ name: "publishTransferSns",
+ input: {
+ from: S.string,
+ to: S.string,
+ value: S.bigint,
+ txHash: S.string,
+ },
+ rateLimit: { calls: 100, per: "second" },
+ mode: "unorderedAfterCommit",
+ },
+ async ({ input }) => {
+ await sns.send(
+ new PublishCommand({
+ TopicArn: TOPIC_ARN,
+ Message: JSON.stringify({ ...input, value: input.value.toString() }),
+ })
+ );
+ }
+);
+
+const QUEUE_URL = process.env.ENVIO_SQS_QUEUE_URL!;
+
+export const publishTransferSqs = createEffect(
+ {
+ name: "publishTransferSqs",
+ input: {
+ from: S.string,
+ to: S.string,
+ value: S.bigint,
+ txHash: S.string,
+ groupId: S.optional(S.string), // for FIFO queues
+ },
+ rateLimit: { calls: 100, per: "second" },
+ mode: "unorderedAfterCommit",
+ },
+ async ({ input }) => {
+ const { groupId, value, ...rest } = input;
+ await sqs.send(
+ new SendMessageCommand({
+ QueueUrl: QUEUE_URL,
+ MessageBody: JSON.stringify({ ...rest, value: value.toString() }),
+ MessageGroupId: groupId,
+ })
+ );
+ }
+);
+```
+
+### Call it from a handler
+
+The rindexer config…
+
+```yaml
+streams:
+ sns:
+ topics:
+ - topic_arn: "arn:aws:sns:us-east-1:664643779377:test"
+ events:
+ - event_name: Transfer
+ conditions:
+ - "value": ">=2000000000000000000 && value <=4000000000000000000"
+ - "from": "0x0338ce5020c447f7e668dc2ef778025ce3982662"
+```
+
+…becomes:
+
+```typescript title="src/handlers/RocketPoolETH.ts"
+import { indexer } from "envio";
+import { publishTransferSns } from "../effects/sns";
+
+const WHALE = "0x0338ce5020c447f7e668dc2ef778025ce3982662";
+const MIN = 2_000_000_000_000_000_000n;
+const MAX = 4_000_000_000_000_000_000n;
+
+indexer.onEvent(
+ { contract: "RocketPoolETH", event: "Transfer" },
+ async ({ event, context }) => {
+ const { from, to, value } = event.params;
+
+ if (from.toLowerCase() === WHALE && value >= MIN && value <= MAX) {
+ context.effect(publishTransferSns, {
+ from,
+ to,
+ value,
+ txHash: event.transaction.hash,
+ });
+ }
+ },
+);
+```
+
+For SQS FIFO queues, switch the effect to `orderedAfterCommit` and set a `groupId` (e.g. the contract address) — the runtime preserves handler order, and SQS preserves order per `MessageGroupId`. If your consumer is idempotent and you want the message out before the DB commit, use `mode: "unordered"` or `"ordered"` for lower latency.
diff --git a/docs/HyperIndex/Streams/webhooks.md b/docs/HyperIndex/Streams/webhooks.md
new file mode 100644
index 00000000..cb6e3ba3
--- /dev/null
+++ b/docs/HyperIndex/Streams/webhooks.md
@@ -0,0 +1,105 @@
+---
+id: streams-webhooks
+title: Webhooks
+sidebar_label: Webhooks
+slug: /streams/webhooks
+description: Stream decoded events from HyperIndex to a webhook endpoint using the Effect API.
+---
+
+Send decoded events to any HTTP endpoint when on-chain conditions are met. No extra dependency required — `fetch` is built into Node.
+
+### Installation
+
+Nothing to install. Use the global `fetch`.
+
+### Define the effect
+
+URL, shared secret, and the `eventName` constant are baked into the effect body — only the values that vary per call go in `input`.
+
+```typescript title="src/effects/webhook.ts"
+import { createEffect, S } from "envio";
+
+export const notifyTransfer = createEffect(
+ {
+ name: "notifyTransfer",
+ input: {
+ from: S.string,
+ to: S.string,
+ value: S.bigint,
+ txHash: S.string,
+ blockNumber: S.number,
+ chainId: S.number,
+ },
+ rateLimit: { calls: 25, per: "second" },
+ mode: "unorderedAfterCommit",
+ },
+ async ({ input, context }) => {
+ const res = await fetch(process.env.ENVIO_WEBHOOK_URL!, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ "x-envio-shared-secret": process.env.ENVIO_WEBHOOK_SHARED_SECRET ?? "",
+ },
+ body: JSON.stringify({
+ event: "RocketPoolTransfer",
+ ...input,
+ value: input.value.toString(),
+ }),
+ });
+ if (!res.ok) {
+ context.log.error(`Webhook failed: ${res.status} ${res.statusText}`);
+ throw new Error(`Webhook ${res.status}`);
+ }
+ }
+);
+```
+
+### Call it from a handler
+
+The rindexer YAML below…
+
+```yaml
+events:
+ - event_name: Transfer
+ conditions:
+ - "value": ">=2000000000000000000 && value <=4000000000000000000"
+ - "from": "0x0338ce5020c447f7e668dc2ef778025ce3982662"
+```
+
+…becomes plain TypeScript:
+
+```typescript title="src/handlers/RocketPoolETH.ts"
+import { indexer } from "envio";
+import { notifyTransfer } from "../effects/webhook";
+
+const WHALE = "0x0338ce5020c447f7e668dc2ef778025ce3982662";
+const MIN = 2_000_000_000_000_000_000n;
+const MAX = 4_000_000_000_000_000_000n;
+
+indexer.onEvent(
+ { contract: "RocketPoolETH", event: "Transfer" },
+ async ({ event, context }) => {
+ const { from, to, value } = event.params;
+
+ if (from.toLowerCase() === WHALE && value >= MIN && value <= MAX) {
+ context.effect(notifyTransfer, {
+ from,
+ to,
+ value,
+ txHash: event.transaction.hash,
+ blockNumber: event.block.number,
+ chainId: context.chain.id,
+ });
+ }
+ },
+);
+```
+
+Because the effect is `unorderedAfterCommit`, the handler returns immediately — the runtime dispatches the webhook in parallel after the batch's DB commit. You only see a delivered request for state that actually persisted.
+
+### Tips
+
+- Switch to `mode: "orderedAfterCommit"` if your downstream consumer requires per-batch ordering.
+- Need lower latency than `*AfterCommit`? Use `mode: "unordered"` (or `"ordered"`) to fire inline within the batch — same parallel/sequential semantics, but the call returns a value and runs before the DB commit. Trades commit-safe delivery for speed; pick it only if your endpoint is idempotent.
+- Set `cache: true` on the effect if you also want to skip duplicates across full indexer reruns. By default the effect fires every run.
+- Use the rate-limit option to stay under your endpoint's throughput cap.
diff --git a/docusaurus.config.js b/docusaurus.config.js
index a1da3c31..1c6f6f3e 100644
--- a/docusaurus.config.js
+++ b/docusaurus.config.js
@@ -621,7 +621,7 @@ This file contains links to documentation sections following the llmstxt.org sta
id: "HyperIndexV2",
path: "docs/HyperIndexV2",
routeBasePath: "docs/v2/HyperIndex",
- sidebarPath: require.resolve("./sidebarsHyperIndex.js"),
+ sidebarPath: require.resolve("./sidebarsHyperIndexV2.js"),
editUrl: "https://github.com/enviodev/docs/edit/main/",
showLastUpdateAuthor: false,
showLastUpdateTime: false,
diff --git a/sidebarsHyperIndex.js b/sidebarsHyperIndex.js
index a9d8a546..b58fbaa7 100644
--- a/sidebarsHyperIndex.js
+++ b/sidebarsHyperIndex.js
@@ -28,11 +28,10 @@ if (process.env.DOCS_FOR_LLM === "true") {
const networksSection = {
type: "category",
label: "Supported Networks",
- link: {
- type: "doc",
- id: "supported-networks/index",
- },
- items: filteredNetworks,
+ items: [
+ { type: "doc", id: "supported-networks/index", label: "Overview" },
+ ...filteredNetworks,
+ ],
};
module.exports = {
@@ -79,6 +78,32 @@ module.exports = {
// "Examples/example-ens",
],
},
+ {
+ type: "category",
+ label: "Streams",
+ items: [
+ { type: "doc", id: "Streams/streams", label: "Overview" },
+ "Streams/streams-webhooks",
+ "Streams/streams-kafka",
+ "Streams/streams-rabbitmq",
+ "Streams/streams-sns-sqs",
+ "Streams/streams-redis",
+ "Streams/streams-cloudflare-queues",
+ ],
+ },
+ {
+ type: "category",
+ label: "Chat Bots",
+ items: [
+ { type: "doc", id: "Chatbots/chatbots", label: "Overview" },
+ "Chatbots/chatbots-telegram",
+ "Chatbots/chatbots-discord",
+ "Chatbots/chatbots-slack",
+ "Chatbots/chatbots-twilio",
+ "Chatbots/chatbots-pagerduty",
+ "Chatbots/chatbots-opsgenie",
+ ],
+ },
{
type: "category",
label: "Envio Cloud",
diff --git a/sidebarsHyperIndexV2.js b/sidebarsHyperIndexV2.js
new file mode 100644
index 00000000..7cc3b954
--- /dev/null
+++ b/sidebarsHyperIndexV2.js
@@ -0,0 +1,17 @@
+// Sidebar for HyperIndexV2. Reuses the V3 sidebar but strips categories that
+// only exist in V3 (e.g. Streams, Chat Bots).
+const v3Sidebar = require("./sidebarsHyperIndex.js");
+
+const V3_ONLY_LABELS = new Set(["Streams", "Chat Bots"]);
+
+const stripV3Only = (items) =>
+ items.filter((item) => {
+ if (typeof item === "object" && item !== null && item.type === "category") {
+ return !V3_ONLY_LABELS.has(item.label);
+ }
+ return true;
+ });
+
+module.exports = {
+ someSidebar: stripV3Only(v3Sidebar.someSidebar),
+};