Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ interface QueueCompletionWaiter {
timeoutHandle?: ReturnType<typeof setTimeout>;
}

interface MessageListener {
nameSet: Set<string>;
resolve: () => void;
reject: (error: Error) => void;
actorAbortCleanup?: () => void;
signal?: AbortSignal;
signalAbortCleanup?: () => void;
}

const DEFAULT_METADATA: QueueMetadata = {
nextId: 1n,
size: 0,
Expand All @@ -75,6 +84,7 @@ export class QueueManager<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
#pendingWarningHandle: ReturnType<typeof setTimeout> | undefined;
#redeliveryTimeout: ReturnType<typeof setTimeout> | undefined;
#redeliveryAt: number | undefined;
#messageListeners = new Set<MessageListener>();

constructor(
actor: ActorInstance<S, CP, CS, V, I, DB>,
Expand Down Expand Up @@ -197,6 +207,7 @@ export class QueueManager<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
if (!options.deferWaiters) {
await this.#maybeResolveWaiters();
}
this.#notifyMessageListeners(name);

return message;
}
Expand Down Expand Up @@ -352,6 +363,59 @@ export class QueueManager<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
await this.#maybeResolveWaiters();
}

/** Waits for messages with any of the specified names to appear in the queue. */
async waitForNames(
names: string[],
abortSignal?: AbortSignal,
): Promise<void> {
const nameSet = new Set(names);
const existing = await this.#loadQueueMessages();
if (existing.some((message) => nameSet.has(message.name))) {
return;
}

return await new Promise<void>((resolve, reject) => {
const listener: MessageListener = {
nameSet,
resolve: () => {
this.#removeMessageListener(listener);
resolve();
},
reject: (error) => {
this.#removeMessageListener(listener);
reject(error);
},
};

const actorAbortSignal = this.#actor.abortSignal;
const onActorAbort = () =>
listener.reject(new errors.ActorAborted());
if (actorAbortSignal.aborted) {
onActorAbort();
return;
}
actorAbortSignal.addEventListener("abort", onActorAbort, {
once: true,
});
listener.actorAbortCleanup = () =>
actorAbortSignal.removeEventListener("abort", onActorAbort);

if (abortSignal) {
const onAbort = () =>
listener.reject(new errors.ActorAborted());
if (abortSignal.aborted) {
onAbort();
return;
}
abortSignal.addEventListener("abort", onAbort, { once: true });
listener.signalAbortCleanup = () =>
abortSignal.removeEventListener("abort", onAbort);
}
Comment on lines +403 to +413
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory leak: When abortSignal is already aborted, the code calls onAbort() and returns early (lines 406-408). However, the actor abort signal listener was already registered (line 397) but never gets cleaned up because the listener hasn't been added to #messageListeners yet (line 415). The #removeMessageListener method only calls cleanup callbacks for listeners in the set.

Fix: Manually call cleanup before early return:

if (abortSignal.aborted) {
  listener.actorAbortCleanup?.();
  onAbort();
  return;
}
Suggested change
if (abortSignal) {
const onAbort = () =>
listener.reject(new errors.ActorAborted());
if (abortSignal.aborted) {
onAbort();
return;
}
abortSignal.addEventListener("abort", onAbort, { once: true });
listener.signalAbortCleanup = () =>
abortSignal.removeEventListener("abort", onAbort);
}
if (abortSignal) {
const onAbort = () =>
listener.reject(new errors.ActorAborted());
if (abortSignal.aborted) {
listener.actorAbortCleanup?.();
onAbort();
return;
}
abortSignal.addEventListener("abort", onAbort, { once: true });
listener.signalAbortCleanup = () =>
abortSignal.removeEventListener("abort", onAbort);
}

Spotted by Graphite Agent

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.


this.#messageListeners.add(listener);
});
}

/** Returns all messages currently in the queue without removing them. */
async getMessages(): Promise<QueueMessage[]> {
return await this.#loadQueueMessages();
Expand Down Expand Up @@ -473,6 +537,26 @@ export class QueueManager<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
return decoded;
}

#removeMessageListener(listener: MessageListener): void {
if (this.#messageListeners.delete(listener)) {
listener.actorAbortCleanup?.();
listener.signalAbortCleanup?.();
}
}

#notifyMessageListeners(name: string): void {
if (this.#messageListeners.size === 0) {
return;
}
for (const listener of [...this.#messageListeners]) {
if (!listener.nameSet.has(name)) {
continue;
}
this.#removeMessageListener(listener);
listener.resolve();
}
}

async #removeMessages(
messages: QueueMessage[],
options: { resolveWaiters: boolean },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ export class ActorQueue<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {

/** Sends a message to the specified queue. */
async send(name: string, body: unknown): Promise<QueueMessage> {
return await this.#queueManager.enqueue(name, body);
const message = await this.#queueManager.enqueue(name, body);
return this.#toQueueMessage(message, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ export function runActorQueueTests(driverTestConfig: DriverTestConfig) {
await waitFor(driverTestConfig, 60);
const result = await resultPromise;

expect(result.status).toBe("timedOut");
expect(result?.status).toBe("timedOut");
});

test("complete throws when wait is false", async (c) => {
Expand Down
1 change: 0 additions & 1 deletion rivetkit-typescript/packages/workflow-engine/AGENTS.md

This file was deleted.

9 changes: 9 additions & 0 deletions rivetkit-typescript/packages/workflow-engine/AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Workflow Engine Notes

## Dirty State Requirements

- History entries must set `entry.dirty = true` whenever the entry is created or mutated. `flush()` persists dirty entries and clears the flag.
- Entry metadata must set `metadata.dirty = true` whenever metadata fields change. `flush()` persists dirty metadata and clears the flag.
- Name registry writes are tracked by `storage.flushedNameCount`. New names must be registered with `registerName()` before flushing.
- Workflow state/output/error are tracked via `storage.flushedState`, `storage.flushedOutput`, and `storage.flushedError`. Update the fields and call `flush()`; it will write if the value changed.
- `flush()` does not clear workflow output/error keys when values are unset. If you need to clear them, explicitly `driver.delete(buildWorkflowOutputKey())` or `driver.delete(buildWorkflowErrorKey())`.
9 changes: 0 additions & 9 deletions rivetkit-typescript/packages/workflow-engine/agents.md

This file was deleted.

Loading