Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 0 additions & 35 deletions engine/artifacts/config-schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ import { rawHttpRequestPropertiesActor } from "./raw-http-request-properties";
import { rawWebSocketActor, rawWebSocketBinaryActor } from "./raw-websocket";
import { requestAccessActor } from "./request-access";
import { rejectConnectionActor } from "./reject-connection";
import {
runWithError,
runWithEarlyExit,
runWithoutHandler,
runWithQueueConsumer,
runWithTicks,
} from "./run";
import { scheduled } from "./scheduled";
import {
sleep,
Expand Down Expand Up @@ -127,5 +134,11 @@ export const registry = setup({
// From large-payloads.ts
largePayloadActor,
largePayloadConnActor,
// From run.ts
runWithTicks,
runWithQueueConsumer,
runWithEarlyExit,
runWithError,
runWithoutHandler,
},
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import { actor } from "rivetkit";
import type { registry } from "./registry";

export const RUN_SLEEP_TIMEOUT = 500;

// Actor that tracks tick counts and respects abort signal
export const runWithTicks = actor({
state: {
tickCount: 0,
lastTickAt: 0,
runStarted: false,
runExited: false,
},
run: async (c) => {
c.state.runStarted = true;
c.log.info("run handler started");

while (!c.abortSignal.aborted) {
c.state.tickCount += 1;
c.state.lastTickAt = Date.now();
c.log.info({ msg: "tick", tickCount: c.state.tickCount });

// Wait 50ms between ticks, or exit early if aborted
await new Promise<void>((resolve) => {
const timeout = setTimeout(resolve, 50);
c.abortSignal.addEventListener(
"abort",
() => {
clearTimeout(timeout);
resolve();
},
{ once: true },
);
});
}

c.state.runExited = true;
c.log.info("run handler exiting gracefully");
},
actions: {
getState: (c) => ({
tickCount: c.state.tickCount,
lastTickAt: c.state.lastTickAt,
runStarted: c.state.runStarted,
runExited: c.state.runExited,
}),
},
options: {
sleepTimeout: RUN_SLEEP_TIMEOUT,
runStopTimeout: 1000,
},
});

// Actor that consumes from a queue in the run handler
export const runWithQueueConsumer = actor({
state: {
messagesReceived: [] as Array<{ name: string; body: unknown }>,
runStarted: false,
},
run: async (c) => {
c.state.runStarted = true;
c.log.info("run handler started, waiting for messages");

while (!c.abortSignal.aborted) {
const message = await c.queue.next("messages", { timeout: 100 });
if (message) {
c.log.info({ msg: "received message", body: message.body });
c.state.messagesReceived.push({
name: message.name,
body: message.body,
});
}
}

c.log.info("run handler exiting gracefully");
},
actions: {
getState: (c) => ({
messagesReceived: c.state.messagesReceived,
runStarted: c.state.runStarted,
}),
sendMessage: async (c, body: unknown) => {
const client = c.client<typeof registry>();
const handle = client.runWithQueueConsumer.getForId(c.actorId);
await handle.queue.messages.send(body);
return true;
},
},
options: {
sleepTimeout: RUN_SLEEP_TIMEOUT,
runStopTimeout: 1000,
},
});

// Actor that exits the run handler after a short delay to test crash behavior
export const runWithEarlyExit = actor({
state: {
runStarted: false,
destroyCalled: false,
},
run: async (c) => {
c.state.runStarted = true;
c.log.info("run handler started, will exit after delay");
// Wait a bit so we can observe the runStarted state before exit
await new Promise((resolve) => setTimeout(resolve, 200));
c.log.info("run handler exiting early");
// Exit without respecting abort signal
},
onDestroy: (c) => {
c.state.destroyCalled = true;
},
actions: {
getState: (c) => ({
runStarted: c.state.runStarted,
destroyCalled: c.state.destroyCalled,
}),
},
options: {
sleepTimeout: RUN_SLEEP_TIMEOUT,
},
});

// Actor that throws an error in the run handler to test crash behavior
export const runWithError = actor({
state: {
runStarted: false,
destroyCalled: false,
},
run: async (c) => {
c.state.runStarted = true;
c.log.info("run handler started, will throw error");
await new Promise((resolve) => setTimeout(resolve, 50));
throw new Error("intentional error in run handler");
},
onDestroy: (c) => {
c.state.destroyCalled = true;
},
actions: {
getState: (c) => ({
runStarted: c.state.runStarted,
destroyCalled: c.state.destroyCalled,
}),
},
options: {
sleepTimeout: RUN_SLEEP_TIMEOUT,
},
});

// Actor without a run handler for comparison
export const runWithoutHandler = actor({
state: {
wakeCount: 0,
},
onWake: (c) => {
c.state.wakeCount += 1;
},
actions: {
getState: (c) => ({
wakeCount: c.state.wakeCount,
}),
},
options: {
sleepTimeout: RUN_SLEEP_TIMEOUT,
},
});
47 changes: 47 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/actor/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import type {
DestroyContext,
DisconnectContext,
RequestContext,
RunContext,
SleepContext,
StateChangeContext,
WakeContext,
Expand Down Expand Up @@ -52,6 +53,7 @@ export const ActorConfigSchema = z
onDestroy: zFunction().optional(),
onWake: zFunction().optional(),
onSleep: zFunction().optional(),
run: zFunction().optional(),
onStateChange: zFunction().optional(),
onBeforeConnect: zFunction().optional(),
onConnect: zFunction().optional(),
Expand Down Expand Up @@ -79,6 +81,8 @@ export const ActorConfigSchema = z
actionTimeout: z.number().positive().default(60_000),
// Max time to wait for waitUntil background promises during shutdown
waitUntilTimeout: z.number().positive().default(15_000),
// Max time to wait for run handler to stop during shutdown
runStopTimeout: z.number().positive().default(15_000),
connectionLivenessTimeout: z.number().positive().default(2500),
connectionLivenessInterval: z.number().positive().default(5000),
noSleep: z.boolean().default(false),
Expand Down Expand Up @@ -319,6 +323,34 @@ interface BaseActorConfig<
>,
) => void | Promise<void>;

/**
* Called after the actor starts up. Does not block actor startup.
*
* Use this for background tasks like:
* - Reading from queues in a loop
* - Tick loops for periodic work
* - Custom workflow logic
*
* The handler receives an abort signal via `c.abortSignal` that fires
* when the actor is stopping. You should use this to gracefully exit.
*
* If this handler exits or throws, the actor will crash and reschedule.
* On shutdown, the actor waits for this handler to complete with a
* configurable timeout (options.runStopTimeout, default 15s).
*
* @returns Void or a Promise. If the promise exits, the actor crashes.
*/
run?: (
c: RunContext<
TState,
TConnParams,
TConnState,
TVars,
TInput,
TDatabase
>,
) => void | Promise<void>;

/**
* Called when the actor's state changes.
*
Expand Down Expand Up @@ -498,6 +530,8 @@ export type ActorConfig<
| "onCreate"
| "onDestroy"
| "onWake"
| "onSleep"
| "run"
| "onStateChange"
| "onBeforeConnect"
| "onConnect"
Expand Down Expand Up @@ -559,6 +593,7 @@ export type ActorConfigInput<
| "onDestroy"
| "onWake"
| "onSleep"
| "run"
| "onStateChange"
| "onBeforeConnect"
| "onConnect"
Expand Down Expand Up @@ -672,6 +707,12 @@ export const DocActorOptionsSchema = z
.describe(
"Max time in ms to wait for waitUntil background promises during shutdown. Default: 15000",
),
runStopTimeout: z
.number()
.optional()
.describe(
"Max time in ms to wait for run handler to stop during shutdown. Default: 15000",
),
connectionLivenessTimeout: z
.number()
.optional()
Expand Down Expand Up @@ -779,6 +820,12 @@ export const DocActorConfigSchema = z
.describe(
"Called when the actor is stopping or sleeping. Use to clean up resources.",
),
run: z
.unknown()
.optional()
.describe(
"Called after actor starts. Does not block startup. Use for background tasks like queue processing or tick loops. If it exits or throws, the actor crashes.",
),
onStateChange: z
.unknown()
.optional()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export { CreateVarsContext, type CreateVarsContextOf } from "./create-vars";
export { DestroyContext, type DestroyContextOf } from "./destroy";
export { DisconnectContext, type DisconnectContextOf } from "./disconnect";
export { RequestContext, type RequestContextOf } from "./request";
export { RunContext, type RunContextOf } from "./run";
export { SleepContext, type SleepContextOf } from "./sleep";
export {
StateChangeContext,
Expand Down
40 changes: 40 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/actor/contexts/run.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import type { AnyDatabaseProvider } from "../database";
import type { ActorDefinition, AnyActorDefinition } from "../definition";
import { ActorContext } from "./base/actor";

/**
* Context for the run lifecycle hook.
*
* This context is passed to the `run` handler which executes after the actor
* starts. It does not block actor startup and is intended for background tasks.
*
* Use `c.abortSignal` to detect when the actor is stopping and gracefully exit.
*/
export class RunContext<
TState,
TConnParams,
TConnState,
TVars,
TInput,
TDatabase extends AnyDatabaseProvider,
> extends ActorContext<
TState,
TConnParams,
TConnState,
TVars,
TInput,
TDatabase
> {}

export type RunContextOf<AD extends AnyActorDefinition> =
AD extends ActorDefinition<
infer S,
infer CP,
infer CS,
infer V,
infer I,
infer DB extends AnyDatabaseProvider,
any
>
? RunContext<S, CP, CS, V, I, DB>
: never;
Loading
Loading