Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-requestsnapshot-publish-ordering.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/client': patch
---

Fix `requestSnapshot()` so it resolves only after the injected snapshot batch has been delivered to subscribers, including async and reentrant subscriber paths.
34 changes: 27 additions & 7 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,12 @@ export class ShapeStream<T extends Row<unknown> = Row>
#tickPromiseResolver?: () => void
#tickPromiseRejecter?: (reason?: unknown) => void
#messageChain = Promise.resolve<void[]>([]) // promise chain for incoming messages
// Tracks when subscriber callbacks are actively being delivered from
// #messageChain. requestSnapshot can inject a nested batch from inside a
// subscriber; in that reentrant case #publish uses this as an intentional
// escape hatch to deliver the nested snapshot batch immediately rather than
// queueing it behind the subscriber that is awaiting it.
#isPublishing = false
#snapshotTracker = new SnapshotTracker()
#pauseLock: PauseLock
#currentFetchUrl?: URL // Current fetch URL for computing shape key
Expand Down Expand Up @@ -1719,11 +1725,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
}

async #publish(messages: Message<T>[]): Promise<void[]> {
// We process messages asynchronously
// but SSE's `onmessage` handler is synchronous.
// We use a promise chain to ensure that the handlers
// execute sequentially in the order the messages were received.
this.#messageChain = this.#messageChain.then(() =>
const deliver = () =>
Promise.all(
Array.from(this.#subscribers.values()).map(async ([callback, __]) => {
try {
Expand All @@ -1735,7 +1737,25 @@ export class ShapeStream<T extends Row<unknown> = Row>
}
})
)
)

// We process messages asynchronously but SSE's `onmessage` handler is
// synchronous. Use a promise chain to ensure handlers execute sequentially
// in the order messages were received. If a subscriber reentrantly requests
// a snapshot, deliver that nested batch immediately instead of appending it
// behind the currently-running subscriber callback, which would deadlock
// when requestSnapshot awaits publication.
if (this.#isPublishing) {
return deliver()
}

this.#messageChain = this.#messageChain.then(async () => {
this.#isPublishing = true
try {
return await deliver()
} finally {
this.#isPublishing = false
}
})

return this.#messageChain
}
Expand Down Expand Up @@ -1901,7 +1921,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
metadata,
new Set(data.map((message) => message.key))
)
this.#onMessages(dataWithEndBoundary, false)
await this.#onMessages(dataWithEndBoundary, false)

// On cold start the stream's offset is still at "now". Advance it
// to the snapshot's position so no updates are missed in between.
Expand Down
9 changes: 5 additions & 4 deletions packages/typescript-client/test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1694,10 +1694,11 @@ describe.for(fetchAndSse)(
limit: 100,
})

// Wait until shape reflects the snapshot
await vi.waitFor(() => {
expect(shape.currentRows.length).toBe(data.length)
})
// requestSnapshot must not resolve until subscriber callbacks for the
// injected snapshot batch have completed. Callers such as TanStack DB's
// on-demand loadSubset rely on this to make immediate reads after await
// consistent.
expect(shape.currentRows.length).toBe(data.length)

// Compare keys in stream vs returned snapshot data
const returnedKeys = new Set(data.map((m) => m.key))
Expand Down
180 changes: 180 additions & 0 deletions packages/typescript-client/test/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import {
ShapeStream,
isChangeMessage,
isControlMessage,
Message,
Row,
_resetHttpWarningForTesting,
Expand All @@ -24,6 +25,185 @@ describe(`ShapeStream`, () => {

afterEach(() => aborter.abort())

it(`requestSnapshot waits for snapshot messages to be published to subscribers before resolving`, async () => {
const snapshotRow = {
key: `test-1`,
value: { id: `1` },
headers: {
operation: `insert`,
relation: [`public`, `test`],
},
offset: `0_0`,
}

const fetchMock = vi.fn(() =>
Promise.resolve(
new Response(
JSON.stringify({
metadata: {
snapshot_mark: 1,
xmin: `1`,
xmax: `2`,
xip_list: [],
database_lsn: `0`,
},
data: [snapshotRow],
}),
{
status: 200,
headers: {
'content-type': `application/json`,
'electric-handle': `handle-1`,
'electric-offset': `0_0`,
'electric-schema': `{"id":{"type":"text"}}`,
},
}
)
)
)

const stream = new ShapeStream({
url: shapeUrl,
params: { table: `test` },
log: `changes_only`,
signal: aborter.signal,
fetchClient: fetchMock,
subscribe: false,
})

let releaseSubscriber!: () => void
const subscriberFinished = new Promise<void>((resolve) => {
releaseSubscriber = resolve
})
let snapshotRequestResolved = false
let publishedMessages: Message<Row>[] = []

stream.subscribe(async (messages) => {
if (messages.some(isChangeMessage)) {
publishedMessages = messages
await subscriberFinished
}
})

const snapshotRequest = stream.requestSnapshot({ limit: 1 }).then(() => {
snapshotRequestResolved = true
})

await resolveInMacrotask(undefined)
expect(snapshotRequestResolved).toBe(false)

releaseSubscriber()
await snapshotRequest

expect(publishedMessages.some(isChangeMessage)).toBe(true)
expect(
publishedMessages.some(
(message) =>
isControlMessage(message) &&
message.headers.control === `snapshot-end`
)
).toBe(true)
expect(
publishedMessages.some(
(message) =>
isControlMessage(message) && message.headers.control === `subset-end`
)
).toBe(true)
})

it(`requestSnapshot can be awaited reentrantly from a subscriber`, async () => {
const streamRow = {
key: `stream-1`,
value: { id: `1` },
headers: {
operation: `insert`,
relation: [`public`, `test`],
},
offset: `0_0`,
}
const snapshotRow = {
key: `snapshot-1`,
value: { id: `2` },
headers: {
operation: `insert`,
relation: [`public`, `test`],
},
offset: `0_1`,
}

let requestCount = 0
const fetchMock = vi.fn(() => {
requestCount++

if (requestCount === 1) {
return Promise.resolve(
new Response(
JSON.stringify([
streamRow,
{ headers: { control: `up-to-date` }, offset: `0_0` },
]),
{
status: 200,
headers: {
'content-type': `application/json`,
'electric-handle': `handle-1`,
'electric-offset': `0_0`,
'electric-schema': `{"id":{"type":"text"}}`,
},
}
)
)
}

return Promise.resolve(
new Response(
JSON.stringify({
metadata: {
snapshot_mark: 1,
xmin: `1`,
xmax: `2`,
xip_list: [],
database_lsn: `0`,
},
data: [snapshotRow],
}),
{
status: 200,
headers: {
'content-type': `application/json`,
'electric-handle': `handle-1`,
'electric-offset': `0_1`,
'electric-schema': `{"id":{"type":"text"}}`,
},
}
)
)
})

const stream = new ShapeStream({
url: shapeUrl,
params: { table: `test` },
log: `changes_only`,
signal: aborter.signal,
fetchClient: fetchMock,
subscribe: false,
})

let requestedSnapshot = false
let reentrantSnapshotResolved = false
stream.subscribe(async (messages) => {
if (requestedSnapshot || !messages.some(isChangeMessage)) return

requestedSnapshot = true
await stream.requestSnapshot({ limit: 1 })
reentrantSnapshotResolved = true
})

await vi.waitFor(() => {
expect(reentrantSnapshotResolved).toBe(true)
})
})

it(`should attach specified headers to requests`, async () => {
const eventTarget = new EventTarget()
const requestArgs: Array<RequestInit | undefined> = []
Expand Down
Loading