Skip to content

Commit 2c75012

Browse files
committed
fix(rivetkit): remove top-level promise errors
1 parent e554d7e commit 2c75012

10 files changed

Lines changed: 64 additions & 50 deletions

File tree

rivetkit-typescript/packages/cloudflare-workers/src/actor-driver.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import type {
1313
ManagerDriver,
1414
} from "rivetkit/driver-helpers";
1515
import { promiseWithResolvers } from "rivetkit/utils";
16+
import { logger } from "./log";
1617
import { kvDelete, kvGet, kvListPrefix, kvPut } from "./actor-kv";
1718
import { GLOBAL_KV_KEYS } from "./global-kv";
1819
import { getCloudflareAmbientEnv } from "./handler";
@@ -142,7 +143,7 @@ export class CloudflareActorsActorDriver implements ActorDriver {
142143
// Create new actor state if it doesn't exist
143144
if (!actorState) {
144145
actorState = new ActorGlobalState();
145-
actorState.actorPromise = promiseWithResolvers();
146+
actorState.actorPromise = promiseWithResolvers((reason) => logger().warn({ msg: "unhandled actor promise rejection", reason }));
146147
this.#globalState.setActorState(doState.ctx, actorState);
147148
} else if (actorState.actorPromise) {
148149
// Another request is already loading this actor, wait for it

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/sleep.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ export const sleepWithLongRpc = actor({
5252
},
5353
longRunningRpc: async (c) => {
5454
c.log.info("starting long running rpc");
55-
c.vars.longRunningResolve = promiseWithResolvers();
55+
c.vars.longRunningResolve = promiseWithResolvers((reason) => c.log.warn({ msg: "unhandled long running rpc rejection", reason }));
5656
c.broadcast("waiting");
5757
await c.vars.longRunningResolve.promise;
5858
c.log.info("finished long running rpc");

rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
QUEUE_METADATA_VERSIONED,
77
} from "@/schemas/actor-persist/versioned";
88
import { promiseWithResolvers } from "@/utils";
9+
import { loggerWithoutContext } from "@/actor/log";
910
import type { AnyDatabaseProvider } from "../database";
1011
import type { ActorDriver } from "../driver";
1112
import * as errors from "../errors";
@@ -260,7 +261,7 @@ export class QueueManager<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
260261
}
261262

262263
const { promise, resolve, reject } =
263-
promiseWithResolvers<QueueMessage[]>();
264+
promiseWithResolvers<QueueMessage[]>((reason) => loggerWithoutContext().warn({ msg: "unhandled queue message waiter rejection", reason }));
264265
const waiterId = crypto.randomUUID();
265266
const waiter: QueueWaiter = {
266267
id: waiterId,
@@ -318,7 +319,7 @@ export class QueueManager<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
318319
timeout?: number,
319320
): Promise<QueueCompletionResult> {
320321
const { promise, resolve, reject } =
321-
promiseWithResolvers<QueueCompletionResult>();
322+
promiseWithResolvers<QueueCompletionResult>((reason) => loggerWithoutContext().warn({ msg: "unhandled queue completion waiter rejection", reason }));
322323
const waiterId = crypto.randomUUID();
323324

324325
const waiter: QueueCompletionWaiter = {

rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
CONN_VERSIONED,
88
} from "@/schemas/actor-persist/versioned";
99
import { promiseWithResolvers, SinglePromiseQueue } from "@/utils";
10+
import { loggerWithoutContext } from "@/actor/log";
1011
import { type AnyConn, CONN_STATE_MANAGER_SYMBOL } from "../conn/mod";
1112
import { convertConnToBarePersistedConn } from "../conn/persisted";
1213
import type { ActorDriver } from "../driver";
@@ -210,7 +211,7 @@ export class StateManager<S, CP, CS, I> {
210211
} else {
211212
// Create promise for waiting
212213
if (!this.#onPersistSavedPromise) {
213-
this.#onPersistSavedPromise = promiseWithResolvers();
214+
this.#onPersistSavedPromise = promiseWithResolvers((reason) => loggerWithoutContext().warn({ msg: "unhandled persist saved promise rejection", reason }));
214215
}
215216

216217
// Save throttled

rivetkit-typescript/packages/rivetkit/src/actor/router-websocket-endpoints.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ export async function routeWebSocket(
8686
});
8787

8888
// Promise used to wait for the websocket close in `disconnect`
89-
const closePromiseResolvers = promiseWithResolvers<void>();
89+
const closePromiseResolvers = promiseWithResolvers<void>((reason) => loggerWithoutContext().warn({ msg: "unhandled websocket close promise rejection", reason }));
9090

9191
// Strip query parameters from requestPath for routing purposes.
9292
// This handles paths like "/websocket?query=value" which should route

rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ export class ActorConnRaw {
228228
const { promise, resolve, reject } = promiseWithResolvers<{
229229
id: bigint;
230230
output: unknown;
231-
}>();
231+
}>((reason) => logger().warn({ msg: "unhandled action promise rejection", reason }));
232232
this.#actionsInFlight.set(actionId, {
233233
name: opts.name,
234234
resolve,
@@ -359,7 +359,7 @@ export class ActorConnRaw {
359359
// Create promise for open
360360
if (this.#onOpenPromise)
361361
throw new Error("#onOpenPromise already defined");
362-
this.#onOpenPromise = promiseWithResolvers();
362+
this.#onOpenPromise = promiseWithResolvers((reason) => logger().warn({ msg: "unhandled open promise rejection", reason }));
363363

364364
await this.#connectWebSocket();
365365

@@ -1189,7 +1189,7 @@ export class ActorConnRaw {
11891189
ws.readyState !== 2 /* CLOSING */ &&
11901190
ws.readyState !== 3 /* CLOSED */
11911191
) {
1192-
const { promise, resolve } = promiseWithResolvers();
1192+
const { promise, resolve } = promiseWithResolvers((reason) => logger().warn({ msg: "unhandled websocket close promise rejection", reason }));
11931193
ws.addEventListener("close", () => resolve(undefined));
11941194
ws.close(1000, "Disposed");
11951195
await promise;

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 44 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ export class EngineActorDriver implements ActorDriver {
8282
#actors: Map<string, ActorHandler> = new Map();
8383
#actorRouter: ActorRouter;
8484

85-
#runnerStarted: PromiseWithResolvers<undefined> = promiseWithResolvers();
86-
#runnerStopped: PromiseWithResolvers<undefined> = promiseWithResolvers();
85+
#runnerStarted: PromiseWithResolvers<undefined> = promiseWithResolvers((reason) => logger().warn({ msg: "unhandled runner started promise rejection", reason }));
86+
#runnerStopped: PromiseWithResolvers<undefined> = promiseWithResolvers((reason) => logger().warn({ msg: "unhandled runner stopped promise rejection", reason }));
8787
#isRunnerStopped: boolean = false;
8888

8989
// HACK: Track actor stop intent locally since the runner protocol doesn't
@@ -426,7 +426,7 @@ export class EngineActorDriver implements ActorDriver {
426426
// async operations to avoid race conditions where multiple calls might try to
427427
// create the same handler simultaneously.
428428
handler = {
429-
actorStartPromise: promiseWithResolvers(),
429+
actorStartPromise: promiseWithResolvers((reason) => logger().warn({ msg: "unhandled actor start promise rejection", reason })),
430430
};
431431
this.#actors.set(actorId, handler);
432432
}
@@ -435,40 +435,50 @@ export class EngineActorDriver implements ActorDriver {
435435
invariant(actorConfig.key, "actor should have a key");
436436
const key = deserializeActorKey(actorConfig.key);
437437

438-
// Initialize storage
439-
const [persistDataBuffer] = await this.#runner.kvGet(actorId, [
440-
KEYS.PERSIST_DATA,
441-
]);
442-
if (persistDataBuffer === null) {
443-
const initialKvState = getInitialActorKvState(input);
444-
await this.#runner.kvPut(actorId, initialKvState);
445-
logger().debug({
446-
msg: "initialized persist data for new actor",
447-
actorId,
448-
});
449-
} else {
450-
logger().debug({
451-
msg: "found existing persist data for actor",
452-
actorId,
453-
dataSize: persistDataBuffer.byteLength,
454-
});
455-
}
438+
try {
439+
// Initialize storage
440+
const [persistDataBuffer] = await this.#runner.kvGet(actorId, [
441+
KEYS.PERSIST_DATA,
442+
]);
443+
if (persistDataBuffer === null) {
444+
const initialKvState = getInitialActorKvState(input);
445+
await this.#runner.kvPut(actorId, initialKvState);
446+
logger().debug({
447+
msg: "initialized persist data for new actor",
448+
actorId,
449+
});
450+
} else {
451+
logger().debug({
452+
msg: "found existing persist data for actor",
453+
actorId,
454+
dataSize: persistDataBuffer.byteLength,
455+
});
456+
}
456457

457-
// Create actor instance
458-
const definition = lookupInRegistry(this.#config, actorConfig.name);
459-
handler.actor = await definition.instantiate();
458+
// Create actor instance
459+
const definition = lookupInRegistry(this.#config, actorConfig.name);
460+
handler.actor = await definition.instantiate();
460461

461-
// Start actor
462-
await handler.actor.start(
463-
this,
464-
this.#inlineClient,
465-
actorId,
466-
name,
467-
key,
468-
"unknown", // TODO: Add regions
469-
);
462+
// Start actor
463+
await handler.actor.start(
464+
this,
465+
this.#inlineClient,
466+
actorId,
467+
name,
468+
key,
469+
"unknown", // TODO: Add regions
470+
);
470471

471-
logger().debug({ msg: "runner actor started", actorId, name, key });
472+
logger().debug({ msg: "runner actor started", actorId, name, key });
473+
} catch (innerError) {
474+
const error = new Error(
475+
`Failed to start actor ${actorId}: ${innerError}`,
476+
{ cause: innerError },
477+
);
478+
handler.actorStartPromise?.reject(error);
479+
handler.actorStartPromise = undefined;
480+
throw error;
481+
}
472482
}
473483

474484
async #runnerOnActorStop(

rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ export class FileSystemGlobalState {
434434
return;
435435
}
436436
actor.lifecycleState = ActorLifecycleState.STARTING_SLEEP;
437-
actor.stopPromise = promiseWithResolvers();
437+
actor.stopPromise = promiseWithResolvers((reason) => logger().warn({ msg: "unhandled actor sleep stop promise rejection", reason }));
438438

439439
// Wait for actor to fully start before stopping it to avoid race conditions
440440
if (actor.loadPromise) await actor.loadPromise.catch();
@@ -486,7 +486,7 @@ export class FileSystemGlobalState {
486486
return;
487487
}
488488
actor.lifecycleState = ActorLifecycleState.STARTING_DESTROY;
489-
actor.stopPromise = promiseWithResolvers();
489+
actor.stopPromise = promiseWithResolvers((reason) => logger().warn({ msg: "unhandled actor destroy stop promise rejection", reason }));
490490

491491
// Wait for actor to fully start before stopping it to avoid race conditions
492492
if (actor.loadPromise) await actor.loadPromise.catch();
@@ -658,7 +658,7 @@ export class FileSystemGlobalState {
658658
invariant(entry, "actor entry does not exist");
659659

660660
const previousWrite = entry.pendingWriteResolver;
661-
const currentWrite = promiseWithResolvers<void>();
661+
const currentWrite = promiseWithResolvers<void>((reason) => logger().warn({ msg: "unhandled kv write promise rejection", reason }));
662662
entry.pendingWriteResolver = currentWrite;
663663

664664
if (previousWrite) {
@@ -895,7 +895,7 @@ export class FileSystemGlobalState {
895895
}
896896

897897
// Create start promise
898-
entry.startPromise = promiseWithResolvers();
898+
entry.startPromise = promiseWithResolvers((reason) => logger().warn({ msg: "unhandled actor start promise rejection", reason }));
899899

900900
try {
901901
// Create actor

rivetkit-typescript/packages/rivetkit/src/manager/gateway.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ export async function createTestWebSocketProxy(
425425
promise: clientToProxyWsPromise,
426426
resolve: clientToProxyWsResolve,
427427
reject: clientToProxyWsReject,
428-
} = promiseWithResolvers<WSContext>();
428+
} = promiseWithResolvers<WSContext>((reason) => logger().warn({ msg: "unhandled client websocket promise rejection", reason }));
429429
try {
430430
// Resolve the client WebSocket promise
431431
logger().debug({ msg: "awaiting client websocket promise" });

rivetkit-typescript/packages/rivetkit/src/utils.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ export type LongTimeoutHandle = { abort: () => void };
9494
*
9595
* This is specifically for Cloudflare Workers. Their implementation of Promise.withResolvers does not work correctly.
9696
*/
97-
export function promiseWithResolvers<T>(): {
97+
export function promiseWithResolvers<T>(onReject: (reason?: any) => void): {
9898
promise: Promise<T>;
9999
resolve: (value: T | PromiseLike<T>) => void;
100100
reject: (reason?: any) => void;
@@ -105,6 +105,7 @@ export function promiseWithResolvers<T>(): {
105105
resolve = res;
106106
reject = rej;
107107
});
108+
promise.catch(onReject);
108109
return { promise, resolve, reject };
109110
}
110111

@@ -155,7 +156,7 @@ export class SinglePromiseQueue {
155156

156157
// Ensure a shared resolver exists for all callers in this cycle
157158
if (!this.#pending) {
158-
this.#pending = promiseWithResolvers<void>();
159+
this.#pending = promiseWithResolvers<void>((reason) => logger().warn({ msg: "unhandled single promise queue rejection", reason }));
159160
}
160161

161162
const waitForThisCycle = this.#pending.promise;

0 commit comments

Comments
 (0)