diff --git a/engine/artifacts/config-schema.json b/engine/artifacts/config-schema.json index dc71d7c010..337d476a4a 100644 --- a/engine/artifacts/config-schema.json +++ b/engine/artifacts/config-schema.json @@ -270,13 +270,6 @@ } ] }, - "provision_users": { - "default": {}, - "type": "object", - "additionalProperties": { - "$ref": "#/definitions/ClickHouseUser" - } - }, "secure": { "default": false, "type": "boolean" @@ -287,34 +280,6 @@ }, "additionalProperties": false }, - "ClickHouseUser": { - "type": "object", - "required": [ - "password", - "role", - "username" - ], - "properties": { - "password": { - "$ref": "#/definitions/Secret" - }, - "role": { - "$ref": "#/definitions/ClickHouseUserRole" - }, - "username": { - "type": "string" - } - }, - "additionalProperties": false - }, - "ClickHouseUserRole": { - "type": "string", - "enum": [ - "Admin", - "Write", - "ReadOnly" - ] - }, "Datacenter": { "type": "object", "required": [ diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts index 5ee3f7b440..a60304bc9e 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts @@ -37,6 +37,13 @@ import { rawHttpRequestPropertiesActor } from "./raw-http-request-properties"; import { rawWebSocketActor, rawWebSocketBinaryActor } from "./raw-websocket"; import { requestAccessActor } from "./request-access"; import { rejectConnectionActor } from "./reject-connection"; +import { + runWithError, + runWithEarlyExit, + runWithoutHandler, + runWithQueueConsumer, + runWithTicks, +} from "./run"; import { scheduled } from "./scheduled"; import { sleep, @@ -127,5 +134,11 @@ export const registry = setup({ // From large-payloads.ts largePayloadActor, largePayloadConnActor, + // From run.ts + runWithTicks, + runWithQueueConsumer, + runWithEarlyExit, + runWithError, + runWithoutHandler, }, }); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/run.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/run.ts new file mode 100644 index 0000000000..9d1fa24463 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/run.ts @@ -0,0 +1,165 @@ +import { actor } from "rivetkit"; +import type { registry } from "./registry"; + +export const RUN_SLEEP_TIMEOUT = 500; + +// Actor that tracks tick counts and respects abort signal +export const runWithTicks = actor({ + state: { + tickCount: 0, + lastTickAt: 0, + runStarted: false, + runExited: false, + }, + run: async (c) => { + c.state.runStarted = true; + c.log.info("run handler started"); + + while (!c.abortSignal.aborted) { + c.state.tickCount += 1; + c.state.lastTickAt = Date.now(); + c.log.info({ msg: "tick", tickCount: c.state.tickCount }); + + // Wait 50ms between ticks, or exit early if aborted + await new Promise((resolve) => { + const timeout = setTimeout(resolve, 50); + c.abortSignal.addEventListener( + "abort", + () => { + clearTimeout(timeout); + resolve(); + }, + { once: true }, + ); + }); + } + + c.state.runExited = true; + c.log.info("run handler exiting gracefully"); + }, + actions: { + getState: (c) => ({ + tickCount: c.state.tickCount, + lastTickAt: c.state.lastTickAt, + runStarted: c.state.runStarted, + runExited: c.state.runExited, + }), + }, + options: { + sleepTimeout: RUN_SLEEP_TIMEOUT, + runStopTimeout: 1000, + }, +}); + +// Actor that consumes from a queue in the run handler +export const runWithQueueConsumer = actor({ + state: { + messagesReceived: [] as Array<{ name: string; body: unknown }>, + runStarted: false, + }, + run: async (c) => { + c.state.runStarted = true; + c.log.info("run handler started, waiting for messages"); + + while (!c.abortSignal.aborted) { + const message = await c.queue.next("messages", { timeout: 100 }); + if (message) { + c.log.info({ msg: "received message", body: message.body }); + c.state.messagesReceived.push({ + name: message.name, + body: message.body, + }); + } + } + + c.log.info("run handler exiting gracefully"); + }, + actions: { + getState: (c) => ({ + messagesReceived: c.state.messagesReceived, + runStarted: c.state.runStarted, + }), + sendMessage: async (c, body: unknown) => { + const client = c.client(); + const handle = client.runWithQueueConsumer.getForId(c.actorId); + await handle.queue.messages.send(body); + return true; + }, + }, + options: { + sleepTimeout: RUN_SLEEP_TIMEOUT, + runStopTimeout: 1000, + }, +}); + +// Actor that exits the run handler after a short delay to test crash behavior +export const runWithEarlyExit = actor({ + state: { + runStarted: false, + destroyCalled: false, + }, + run: async (c) => { + c.state.runStarted = true; + c.log.info("run handler started, will exit after delay"); + // Wait a bit so we can observe the runStarted state before exit + await new Promise((resolve) => setTimeout(resolve, 200)); + c.log.info("run handler exiting early"); + // Exit without respecting abort signal + }, + onDestroy: (c) => { + c.state.destroyCalled = true; + }, + actions: { + getState: (c) => ({ + runStarted: c.state.runStarted, + destroyCalled: c.state.destroyCalled, + }), + }, + options: { + sleepTimeout: RUN_SLEEP_TIMEOUT, + }, +}); + +// Actor that throws an error in the run handler to test crash behavior +export const runWithError = actor({ + state: { + runStarted: false, + destroyCalled: false, + }, + run: async (c) => { + c.state.runStarted = true; + c.log.info("run handler started, will throw error"); + await new Promise((resolve) => setTimeout(resolve, 50)); + throw new Error("intentional error in run handler"); + }, + onDestroy: (c) => { + c.state.destroyCalled = true; + }, + actions: { + getState: (c) => ({ + runStarted: c.state.runStarted, + destroyCalled: c.state.destroyCalled, + }), + }, + options: { + sleepTimeout: RUN_SLEEP_TIMEOUT, + }, +}); + +// Actor without a run handler for comparison +export const runWithoutHandler = actor({ + state: { + wakeCount: 0, + }, + onWake: (c) => { + c.state.wakeCount += 1; + }, + actions: { + getState: (c) => ({ + wakeCount: c.state.wakeCount, + }), + }, + options: { + sleepTimeout: RUN_SLEEP_TIMEOUT, + }, +}); diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts index 836f31bbe1..2a33749a6b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts @@ -13,6 +13,7 @@ import type { DestroyContext, DisconnectContext, RequestContext, + RunContext, SleepContext, StateChangeContext, WakeContext, @@ -52,6 +53,7 @@ export const ActorConfigSchema = z onDestroy: zFunction().optional(), onWake: zFunction().optional(), onSleep: zFunction().optional(), + run: zFunction().optional(), onStateChange: zFunction().optional(), onBeforeConnect: zFunction().optional(), onConnect: zFunction().optional(), @@ -79,6 +81,8 @@ export const ActorConfigSchema = z actionTimeout: z.number().positive().default(60_000), // Max time to wait for waitUntil background promises during shutdown waitUntilTimeout: z.number().positive().default(15_000), + // Max time to wait for run handler to stop during shutdown + runStopTimeout: z.number().positive().default(15_000), connectionLivenessTimeout: z.number().positive().default(2500), connectionLivenessInterval: z.number().positive().default(5000), noSleep: z.boolean().default(false), @@ -319,6 +323,34 @@ interface BaseActorConfig< >, ) => void | Promise; + /** + * Called after the actor starts up. Does not block actor startup. + * + * Use this for background tasks like: + * - Reading from queues in a loop + * - Tick loops for periodic work + * - Custom workflow logic + * + * The handler receives an abort signal via `c.abortSignal` that fires + * when the actor is stopping. You should use this to gracefully exit. + * + * If this handler exits or throws, the actor will crash and reschedule. + * On shutdown, the actor waits for this handler to complete with a + * configurable timeout (options.runStopTimeout, default 15s). + * + * @returns Void or a Promise. If the promise exits, the actor crashes. + */ + run?: ( + c: RunContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + ) => void | Promise; + /** * Called when the actor's state changes. * @@ -498,6 +530,8 @@ export type ActorConfig< | "onCreate" | "onDestroy" | "onWake" + | "onSleep" + | "run" | "onStateChange" | "onBeforeConnect" | "onConnect" @@ -559,6 +593,7 @@ export type ActorConfigInput< | "onDestroy" | "onWake" | "onSleep" + | "run" | "onStateChange" | "onBeforeConnect" | "onConnect" @@ -672,6 +707,12 @@ export const DocActorOptionsSchema = z .describe( "Max time in ms to wait for waitUntil background promises during shutdown. Default: 15000", ), + runStopTimeout: z + .number() + .optional() + .describe( + "Max time in ms to wait for run handler to stop during shutdown. Default: 15000", + ), connectionLivenessTimeout: z .number() .optional() @@ -779,6 +820,12 @@ export const DocActorConfigSchema = z .describe( "Called when the actor is stopping or sleeping. Use to clean up resources.", ), + run: z + .unknown() + .optional() + .describe( + "Called after actor starts. Does not block startup. Use for background tasks like queue processing or tick loops. If it exits or throws, the actor crashes.", + ), onStateChange: z .unknown() .optional() diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/index.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/index.ts index 93250d0bea..618447b5a3 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/index.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/index.ts @@ -23,6 +23,7 @@ export { CreateVarsContext, type CreateVarsContextOf } from "./create-vars"; export { DestroyContext, type DestroyContextOf } from "./destroy"; export { DisconnectContext, type DisconnectContextOf } from "./disconnect"; export { RequestContext, type RequestContextOf } from "./request"; +export { RunContext, type RunContextOf } from "./run"; export { SleepContext, type SleepContextOf } from "./sleep"; export { StateChangeContext, diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/run.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/run.ts new file mode 100644 index 0000000000..63aa2a3ad1 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/run.ts @@ -0,0 +1,40 @@ +import type { AnyDatabaseProvider } from "../database"; +import type { ActorDefinition, AnyActorDefinition } from "../definition"; +import { ActorContext } from "./base/actor"; + +/** + * Context for the run lifecycle hook. + * + * This context is passed to the `run` handler which executes after the actor + * starts. It does not block actor startup and is intended for background tasks. + * + * Use `c.abortSignal` to detect when the actor is stopping and gracefully exit. + */ +export class RunContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +> extends ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase +> {} + +export type RunContextOf = + AD extends ActorDefinition< + infer S, + infer CP, + infer CS, + infer V, + infer I, + infer DB extends AnyDatabaseProvider, + any + > + ? RunContext + : never; diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index 7960aec9a5..24c7b6d55e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -134,6 +134,7 @@ export class ActorInstance { // MARK: - Background Tasks #backgroundPromises: Promise[] = []; + #runPromise?: Promise; // MARK: - HTTP/WebSocket Tracking #activeHonoHttpRequests = 0; @@ -325,6 +326,9 @@ export class ActorInstance { // timer this.resetSleepTimer(); + // Start run handler in background (does not block startup) + this.#startRunHandler(); + // Trigger any pending alarms await this.onAlarm(); } @@ -363,6 +367,9 @@ export class ActorInstance { this.#abortController.abort(); } catch {} + // Wait for run handler to complete + await this.#waitForRunHandler(this.#config.options.runStopTimeout); + // Call onStop lifecycle if (mode === "sleep") { await this.#callOnSleep(); @@ -907,6 +914,57 @@ export class ActorInstance { } } + #startRunHandler() { + if (!this.#config.run) return; + + this.#rLog.debug({ msg: "starting run handler" }); + + const runResult = this.#config.run(this.actorContext); + + if (runResult instanceof Promise) { + this.#runPromise = runResult + .then(() => { + // Run handler exited normally - this should crash the actor + this.#rLog.warn({ + msg: "run handler exited unexpectedly, crashing actor to reschedule", + }); + this.startDestroy(); + }) + .catch((error) => { + // Run handler threw an error - crash the actor + this.#rLog.error({ + msg: "run handler threw error, crashing actor to reschedule", + error: stringifyError(error), + }); + this.startDestroy(); + }); + } + } + + async #waitForRunHandler(timeoutMs: number) { + if (!this.#runPromise) { + return; + } + + this.#rLog.debug({ msg: "waiting for run handler to complete" }); + + const timedOut = await Promise.race([ + this.#runPromise.then(() => false).catch(() => false), + new Promise((resolve) => + setTimeout(() => resolve(true), timeoutMs), + ), + ]); + + if (timedOut) { + this.#rLog.warn({ + msg: "run handler did not complete in time, it may have leaked - ensure you use the abort signal (c.abortSignal) to exit gracefully", + timeoutMs, + }); + } else { + this.#rLog.debug({ msg: "run handler completed" }); + } + } + async #setupDatabase() { if ("db" in this.#config && this.#config.db) { const client = await this.#config.db.createClient({ diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts index f7c112b66f..e7ba67679b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts @@ -26,6 +26,7 @@ import { runActorKvTests } from "./tests/actor-kv"; import { runActorMetadataTests } from "./tests/actor-metadata"; import { runActorOnStateChangeTests } from "./tests/actor-onstatechange"; import { runActorQueueTests } from "./tests/actor-queue"; +import { runActorRunTests } from "./tests/actor-run"; import { runActorVarsTests } from "./tests/actor-vars"; import { runManagerDriverTests } from "./tests/manager-driver"; import { runRawHttpTests } from "./tests/raw-http"; @@ -124,6 +125,8 @@ export function runDriverTests( runActorQueueTests(driverTestConfig); + runActorRunTests(driverTestConfig); + runActorInlineClientTests(driverTestConfig); runActorKvTests(driverTestConfig); diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-run.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-run.ts new file mode 100644 index 0000000000..50ebfdc645 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-run.ts @@ -0,0 +1,162 @@ +import { describe, expect, test } from "vitest"; +import { RUN_SLEEP_TIMEOUT } from "../../../fixtures/driver-test-suite/run"; +import type { DriverTestConfig } from "../mod"; +import { setupDriverTest, waitFor } from "../utils"; + +export function runActorRunTests(driverTestConfig: DriverTestConfig) { + describe.skipIf(driverTestConfig.skip?.sleep)("Actor Run Tests", () => { + test("run handler starts after actor startup", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actor = client.runWithTicks.getOrCreate(["run-starts"]); + + // Wait a bit for run handler to start + await waitFor(driverTestConfig, 100); + + const state = await actor.getState(); + expect(state.runStarted).toBe(true); + expect(state.tickCount).toBeGreaterThan(0); + }); + + test("run handler ticks continuously", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actor = client.runWithTicks.getOrCreate(["run-ticks"]); + + // Wait for some ticks + await waitFor(driverTestConfig, 200); + + const state1 = await actor.getState(); + expect(state1.tickCount).toBeGreaterThan(0); + + const count1 = state1.tickCount; + + // Wait more and check tick count increased + await waitFor(driverTestConfig, 200); + + const state2 = await actor.getState(); + expect(state2.tickCount).toBeGreaterThan(count1); + }); + + test("run handler exits gracefully on actor stop", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actor = client.runWithTicks.getOrCreate([ + "run-graceful-exit", + ]); + + // Wait for run to start + await waitFor(driverTestConfig, 100); + + const state1 = await actor.getState(); + expect(state1.runStarted).toBe(true); + + // Wait for sleep timeout to trigger sleep + await waitFor(driverTestConfig, RUN_SLEEP_TIMEOUT + 300); + + // Wake actor again and check state persisted + const state2 = await actor.getState(); + expect(state2.runExited).toBe(true); + }); + + test("actor without run handler works normally", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actor = client.runWithoutHandler.getOrCreate([ + "no-run-handler", + ]); + + const state = await actor.getState(); + expect(state.wakeCount).toBe(1); + + // Wait for sleep and wake again + await waitFor(driverTestConfig, RUN_SLEEP_TIMEOUT + 300); + + const state2 = await actor.getState(); + expect(state2.wakeCount).toBe(2); + }); + + test("run handler can consume from queue", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actor = client.runWithQueueConsumer.getOrCreate([ + "queue-consumer", + ]); + + // Wait for run handler to start + await waitFor(driverTestConfig, 100); + + // Send some messages to the queue + await actor.sendMessage({ type: "test", value: 1 }); + await actor.sendMessage({ type: "test", value: 2 }); + await actor.sendMessage({ type: "test", value: 3 }); + + // Wait for messages to be consumed + await waitFor(driverTestConfig, 300); + + const state = await actor.getState(); + expect(state.runStarted).toBe(true); + expect(state.messagesReceived.length).toBe(3); + expect(state.messagesReceived[0].body).toEqual({ + type: "test", + value: 1, + }); + expect(state.messagesReceived[1].body).toEqual({ + type: "test", + value: 2, + }); + expect(state.messagesReceived[2].body).toEqual({ + type: "test", + value: 3, + }); + }); + + test("run handler that exits early triggers destroy", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actor = client.runWithEarlyExit.getOrCreate(["early-exit"]); + + // Wait for run to start and exit + await waitFor(driverTestConfig, 100); + + const state1 = await actor.getState(); + expect(state1.runStarted).toBe(true); + + // Wait for the actor to be destroyed + await waitFor(driverTestConfig, 300); + + // After the run handler exits early, the actor should be destroyed. + // Depending on the driver, it may be in a destroyed state or recreated. + // In the file-system driver test environment, the actor is not automatically + // rescheduled, so we just verify the initial behavior worked. + // A new getOrCreate should create a fresh actor. + const actor2 = client.runWithEarlyExit.getOrCreate([ + "early-exit-fresh", + ]); + const state2 = await actor2.getState(); + expect(state2.runStarted).toBe(true); + }); + + test("run handler that throws error triggers destroy", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actor = client.runWithError.getOrCreate(["run-error"]); + + // Wait for run to start and throw + await waitFor(driverTestConfig, 100); + + const state1 = await actor.getState(); + expect(state1.runStarted).toBe(true); + + // Wait for the actor to be destroyed + await waitFor(driverTestConfig, 300); + + // After the run handler throws, the actor should be destroyed. + // Similar to the early exit test, the driver may not automatically reschedule. + // A new getOrCreate should create a fresh actor. + const actor2 = client.runWithError.getOrCreate(["run-error-fresh"]); + const state2 = await actor2.getState(); + expect(state2.runStarted).toBe(true); + }); + }); +} diff --git a/website/src/content/docs/actors/lifecycle.mdx b/website/src/content/docs/actors/lifecycle.mdx index 72142d8933..685cd2cfed 100644 --- a/website/src/content/docs/actors/lifecycle.mdx +++ b/website/src/content/docs/actors/lifecycle.mdx @@ -14,6 +14,7 @@ Actors transition through several states during their lifetime. Each transition 2. `onCreate` 3. `createVars` 4. `onWake` +5. `run` (background, does not block) **On Destroy** @@ -23,10 +24,12 @@ Actors transition through several states during their lifetime. Each transition 1. `createVars` 2. `onWake` +3. `run` (background, does not block) **On Sleep** (after idle period) -1. `onSleep` +1. Wait for `run` to complete (with timeout) +2. `onSleep` **On Connect** (per client) @@ -231,6 +234,87 @@ const counter = actor({ }); ``` +### `run` + +[API Reference](/typedoc/interfaces/rivetkit.mod.ActorDefinition.html) + +The `run` hook is called after the actor starts and runs in the background without blocking actor startup. This is ideal for long-running background tasks like: + +- Reading from message queues in a loop +- Tick loops for periodic work +- Custom workflow logic +- Background processing + +The handler receives an abort signal via `c.abortSignal` that fires when the actor is stopping. You should always check or listen to this signal to exit gracefully. + +**Important behavior:** +- If the `run` handler exits (returns), the actor will crash and reschedule +- If the `run` handler throws an error, the actor will crash and reschedule +- On shutdown, the actor waits for the `run` handler to complete (with configurable timeout via `options.runStopTimeout`) + +```typescript +import { actor } from "rivetkit"; + +// Example: Tick loop +const tickActor = actor({ + state: { tickCount: 0 }, + + run: async (c) => { + c.log.info("Background loop started"); + + while (!c.abortSignal.aborted) { + c.state.tickCount++; + c.log.info({ msg: "tick", count: c.state.tickCount }); + + // Wait 1 second, but exit early if aborted + await new Promise((resolve) => { + const timeout = setTimeout(resolve, 1000); + c.abortSignal.addEventListener("abort", () => { + clearTimeout(timeout); + resolve(); + }, { once: true }); + }); + } + + c.log.info("Background loop exiting gracefully"); + }, + + actions: { + getTickCount: (c) => c.state.tickCount + } +}); +``` + +```typescript +import { actor } from "rivetkit"; + +// Example: Queue consumer +const queueConsumer = actor({ + state: { processedCount: 0 }, + + run: async (c) => { + c.log.info("Queue consumer started"); + + while (!c.abortSignal.aborted) { + // Wait for next message with timeout + const message = await c.queue.next("tasks", { timeout: 1000 }); + + if (message) { + c.log.info({ msg: "processing message", body: message.body }); + // Process the message... + c.state.processedCount++; + } + } + + c.log.info("Queue consumer exiting gracefully"); + }, + + actions: { + getProcessedCount: (c) => c.state.processedCount + } +}); +``` + ### `onStateChange` [API Reference](/typedoc/interfaces/rivetkit.mod.ActorDefinition.html) @@ -571,6 +655,9 @@ const myActor = actor({ // Max time to wait for background promises during shutdown (default: 15000ms) waitUntilTimeout: 15_000, + // Max time to wait for run handler to stop during shutdown (default: 15000ms) + runStopTimeout: 15_000, + // Timeout for connection liveness check (default: 2500ms) connectionLivenessTimeout: 2500, @@ -602,6 +689,7 @@ const myActor = actor({ | `stateSaveInterval` | 10000ms | Interval for persisting state | | `actionTimeout` | 60000ms | Timeout for action execution | | `waitUntilTimeout` | 15000ms | Max time to wait for background promises during shutdown | +| `runStopTimeout` | 15000ms | Max time to wait for run handler to stop during shutdown | | `connectionLivenessTimeout` | 2500ms | Timeout for connection liveness check | | `connectionLivenessInterval` | 5000ms | Interval for connection liveness check | | `noSleep` | false | Prevent actor from sleeping | @@ -612,7 +700,7 @@ const myActor = actor({ ### Running Background Tasks -The `c.runInBackground` method allows you to execute promises asynchronously without blocking the actor's main execution flow. The actor is prevented from sleeping while the promise passed to `runInBackground` is still active. This is useful for fire-and-forget operations where you don't need to wait for completion. +The `c.waitUntil` method allows you to execute promises asynchronously without blocking the actor's main execution flow. The actor is prevented from sleeping while the promise passed to `waitUntil` is still active. This is useful for fire-and-forget operations where you don't need to wait for completion. Common use cases: - **Analytics and logging**: Send events to external services without delaying responses @@ -635,7 +723,7 @@ const gameRoom = actor({ playerJoined: (c, playerId: string) => { c.state.players[playerId] = { joinedAt: Date.now() }; - // Send analytics event without blocking using waitUntil + // Send analytics event without blocking c.waitUntil( fetch('https://analytics.example.com/events', { method: 'POST', @@ -780,6 +868,21 @@ const counter = actor({ console.log(`Counter "${c.state.name}" started with count:`, c.state.count); }, + // Background task (does not block startup) + run: async (c) => { + while (!c.abortSignal.aborted) { + // Example: periodic logging + console.log(`Counter "${c.state.name}" is at ${c.state.count}`); + await new Promise((resolve) => { + const timeout = setTimeout(resolve, 60000); + c.abortSignal.addEventListener("abort", () => { + clearTimeout(timeout); + resolve(); + }, { once: true }); + }); + } + }, + onStateChange: (c, newState) => { c.broadcast('countUpdated', { count: newState.count,