diff --git a/.changeset/fix-requestsnapshot-publish-ordering.md b/.changeset/fix-requestsnapshot-publish-ordering.md new file mode 100644 index 0000000000..90e0082d17 --- /dev/null +++ b/.changeset/fix-requestsnapshot-publish-ordering.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/client': patch +--- + +Fix `requestSnapshot()` so it resolves only after the injected snapshot batch has been delivered to subscribers, including async and reentrant subscriber paths. diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 58212292dd..541dfd202f 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -609,6 +609,12 @@ export class ShapeStream = Row> #tickPromiseResolver?: () => void #tickPromiseRejecter?: (reason?: unknown) => void #messageChain = Promise.resolve([]) // promise chain for incoming messages + // Tracks when subscriber callbacks are actively being delivered from + // #messageChain. requestSnapshot can inject a nested batch from inside a + // subscriber; in that reentrant case #publish uses this as an intentional + // escape hatch to deliver the nested snapshot batch immediately rather than + // queueing it behind the subscriber that is awaiting it. + #isPublishing = false #snapshotTracker = new SnapshotTracker() #pauseLock: PauseLock #currentFetchUrl?: URL // Current fetch URL for computing shape key @@ -1719,11 +1725,7 @@ export class ShapeStream = Row> } async #publish(messages: Message[]): Promise { - // We process messages asynchronously - // but SSE's `onmessage` handler is synchronous. - // We use a promise chain to ensure that the handlers - // execute sequentially in the order the messages were received. - this.#messageChain = this.#messageChain.then(() => + const deliver = () => Promise.all( Array.from(this.#subscribers.values()).map(async ([callback, __]) => { try { @@ -1735,7 +1737,25 @@ export class ShapeStream = Row> } }) ) - ) + + // We process messages asynchronously but SSE's `onmessage` handler is + // synchronous. Use a promise chain to ensure handlers execute sequentially + // in the order messages were received. If a subscriber reentrantly requests + // a snapshot, deliver that nested batch immediately instead of appending it + // behind the currently-running subscriber callback, which would deadlock + // when requestSnapshot awaits publication. + if (this.#isPublishing) { + return deliver() + } + + this.#messageChain = this.#messageChain.then(async () => { + this.#isPublishing = true + try { + return await deliver() + } finally { + this.#isPublishing = false + } + }) return this.#messageChain } @@ -1901,7 +1921,7 @@ export class ShapeStream = Row> metadata, new Set(data.map((message) => message.key)) ) - this.#onMessages(dataWithEndBoundary, false) + await this.#onMessages(dataWithEndBoundary, false) // On cold start the stream's offset is still at "now". Advance it // to the snapshot's position so no updates are missed in between. diff --git a/packages/typescript-client/test/client.test.ts b/packages/typescript-client/test/client.test.ts index f4babae414..94830fc02f 100644 --- a/packages/typescript-client/test/client.test.ts +++ b/packages/typescript-client/test/client.test.ts @@ -1694,10 +1694,11 @@ describe.for(fetchAndSse)( limit: 100, }) - // Wait until shape reflects the snapshot - await vi.waitFor(() => { - expect(shape.currentRows.length).toBe(data.length) - }) + // requestSnapshot must not resolve until subscriber callbacks for the + // injected snapshot batch have completed. Callers such as TanStack DB's + // on-demand loadSubset rely on this to make immediate reads after await + // consistent. + expect(shape.currentRows.length).toBe(data.length) // Compare keys in stream vs returned snapshot data const returnedKeys = new Set(data.map((m) => m.key)) diff --git a/packages/typescript-client/test/stream.test.ts b/packages/typescript-client/test/stream.test.ts index f249a513ab..b1bf11a842 100644 --- a/packages/typescript-client/test/stream.test.ts +++ b/packages/typescript-client/test/stream.test.ts @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { ShapeStream, isChangeMessage, + isControlMessage, Message, Row, _resetHttpWarningForTesting, @@ -24,6 +25,185 @@ describe(`ShapeStream`, () => { afterEach(() => aborter.abort()) + it(`requestSnapshot waits for snapshot messages to be published to subscribers before resolving`, async () => { + const snapshotRow = { + key: `test-1`, + value: { id: `1` }, + headers: { + operation: `insert`, + relation: [`public`, `test`], + }, + offset: `0_0`, + } + + const fetchMock = vi.fn(() => + Promise.resolve( + new Response( + JSON.stringify({ + metadata: { + snapshot_mark: 1, + xmin: `1`, + xmax: `2`, + xip_list: [], + database_lsn: `0`, + }, + data: [snapshotRow], + }), + { + status: 200, + headers: { + 'content-type': `application/json`, + 'electric-handle': `handle-1`, + 'electric-offset': `0_0`, + 'electric-schema': `{"id":{"type":"text"}}`, + }, + } + ) + ) + ) + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `test` }, + log: `changes_only`, + signal: aborter.signal, + fetchClient: fetchMock, + subscribe: false, + }) + + let releaseSubscriber!: () => void + const subscriberFinished = new Promise((resolve) => { + releaseSubscriber = resolve + }) + let snapshotRequestResolved = false + let publishedMessages: Message[] = [] + + stream.subscribe(async (messages) => { + if (messages.some(isChangeMessage)) { + publishedMessages = messages + await subscriberFinished + } + }) + + const snapshotRequest = stream.requestSnapshot({ limit: 1 }).then(() => { + snapshotRequestResolved = true + }) + + await resolveInMacrotask(undefined) + expect(snapshotRequestResolved).toBe(false) + + releaseSubscriber() + await snapshotRequest + + expect(publishedMessages.some(isChangeMessage)).toBe(true) + expect( + publishedMessages.some( + (message) => + isControlMessage(message) && + message.headers.control === `snapshot-end` + ) + ).toBe(true) + expect( + publishedMessages.some( + (message) => + isControlMessage(message) && message.headers.control === `subset-end` + ) + ).toBe(true) + }) + + it(`requestSnapshot can be awaited reentrantly from a subscriber`, async () => { + const streamRow = { + key: `stream-1`, + value: { id: `1` }, + headers: { + operation: `insert`, + relation: [`public`, `test`], + }, + offset: `0_0`, + } + const snapshotRow = { + key: `snapshot-1`, + value: { id: `2` }, + headers: { + operation: `insert`, + relation: [`public`, `test`], + }, + offset: `0_1`, + } + + let requestCount = 0 + const fetchMock = vi.fn(() => { + requestCount++ + + if (requestCount === 1) { + return Promise.resolve( + new Response( + JSON.stringify([ + streamRow, + { headers: { control: `up-to-date` }, offset: `0_0` }, + ]), + { + status: 200, + headers: { + 'content-type': `application/json`, + 'electric-handle': `handle-1`, + 'electric-offset': `0_0`, + 'electric-schema': `{"id":{"type":"text"}}`, + }, + } + ) + ) + } + + return Promise.resolve( + new Response( + JSON.stringify({ + metadata: { + snapshot_mark: 1, + xmin: `1`, + xmax: `2`, + xip_list: [], + database_lsn: `0`, + }, + data: [snapshotRow], + }), + { + status: 200, + headers: { + 'content-type': `application/json`, + 'electric-handle': `handle-1`, + 'electric-offset': `0_1`, + 'electric-schema': `{"id":{"type":"text"}}`, + }, + } + ) + ) + }) + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `test` }, + log: `changes_only`, + signal: aborter.signal, + fetchClient: fetchMock, + subscribe: false, + }) + + let requestedSnapshot = false + let reentrantSnapshotResolved = false + stream.subscribe(async (messages) => { + if (requestedSnapshot || !messages.some(isChangeMessage)) return + + requestedSnapshot = true + await stream.requestSnapshot({ limit: 1 }) + reentrantSnapshotResolved = true + }) + + await vi.waitFor(() => { + expect(reentrantSnapshotResolved).toBe(true) + }) + }) + it(`should attach specified headers to requests`, async () => { const eventTarget = new EventTarget() const requestArgs: Array = []