Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,12 @@ export class ShapeStream<T extends Row<unknown> = Row>
const schema = this.#schema! // we know that it is not undefined because it is set by `this.#onInitialResponse`
const res = await response.text()
const messages = res || `[]`
const batch = this.#messageParser.parse<Array<Message<T>>>(messages, schema)
// Use async parsing to yield to the main thread periodically,
// preventing UI blocking when processing large responses
const batch = await this.#messageParser.parseAsync<Array<Message<T>>>(
messages,
schema
)

await this.#onMessages(batch)
}
Expand Down Expand Up @@ -1512,10 +1517,11 @@ export class ShapeStream<T extends Row<unknown> = Row>
})

const { metadata, data: rawData } = await response.json()
const data = this.#messageParser.parseSnapshotData<ChangeMessage<T>>(
rawData,
schema
)
// Use async parsing to yield to the main thread periodically,
// preventing UI blocking when processing large snapshots
const data = await this.#messageParser.parseSnapshotDataAsync<
ChangeMessage<T>
>(rawData, schema)

return {
metadata,
Expand Down
1 change: 1 addition & 0 deletions packages/typescript-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ export {
snakeToCamel,
camelToSnake,
} from './column-mapper'
export { yieldToMain } from './yield'
99 changes: 99 additions & 0 deletions packages/typescript-client/src/parser.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ColumnInfo, GetExtensions, Row, Schema, Value } from './types'
import { ParserNullValueError } from './error'
import { yieldToMain, DEFAULT_YIELD_EVERY } from './yield'

type Token = string
type NullableToken = Token | null
Expand Down Expand Up @@ -157,6 +158,104 @@ export class MessageParser<T extends Row<unknown>> {
})
}

/**
* Async version of parse() that yields to the main thread periodically
* to prevent UI blocking when processing large datasets.
*
* This is useful when parsing large shape responses where blocking the
* main thread would cause UI jank or unresponsiveness.
*
* @param messages - JSON string of messages to parse
* @param schema - Schema for type parsing
*/
async parseAsync<Result>(messages: string, schema: Schema): Promise<Result> {
// First, parse the JSON without transformation (this is fast)
const parsed = JSON.parse(messages)

// If it's not an array, just transform it directly (single message)
if (!Array.isArray(parsed)) {
this.transformParsedMessage(parsed, schema)
return parsed as Result
}

// For arrays, transform each message with yielding
for (let i = 0; i < parsed.length; i++) {
this.transformParsedMessage(parsed[i], schema)

// Yield periodically to prevent blocking
if ((i + 1) % DEFAULT_YIELD_EVERY === 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i'd prefer if parseAsync and parseSnapshotDataAsync would take the yield every configuration as an argument instead of relying on a global. We can still have a global configuration but i would pass that global as an argument when calling these functions.

await yieldToMain()
}
}

return parsed as Result
}

/**
* Async version of parseSnapshotData() that yields to the main thread periodically
* to prevent UI blocking when processing large snapshots.
*
* @param messages - Array of already-parsed messages to transform
* @param schema - Schema for type parsing
*/
async parseSnapshotDataAsync<Result>(
messages: Array<unknown>,
schema: Schema
): Promise<Array<Result>> {
const results: Array<Result> = []

for (let i = 0; i < messages.length; i++) {
const msg = messages[i] as Record<string, unknown>

// Transform the value property if it exists
if (msg.value && typeof msg.value === `object` && msg.value !== null) {
msg.value = this.transformMessageValue(msg.value, schema)
}

// Transform the old_value property if it exists
if (
msg.old_value &&
typeof msg.old_value === `object` &&
msg.old_value !== null
) {
msg.old_value = this.transformMessageValue(msg.old_value, schema)
}

results.push(msg as Result)

// Yield periodically to prevent blocking
if ((i + 1) % DEFAULT_YIELD_EVERY === 0) {
await yieldToMain()
}
}

return results
}

/**
* Transform value and old_value properties of a parsed message object.
* Used by parseAsync to transform messages after initial JSON parsing.
*/
private transformParsedMessage(
message: Record<string, unknown>,
schema: Schema
): void {
if (
message.value &&
typeof message.value === `object` &&
message.value !== null
) {
message.value = this.transformMessageValue(message.value, schema)
}
if (
message.old_value &&
typeof message.old_value === `object` &&
message.old_value !== null
) {
message.old_value = this.transformMessageValue(message.old_value, schema)
}
}

/**
* Transform a message value or old_value object by parsing its columns.
*/
Expand Down
42 changes: 42 additions & 0 deletions packages/typescript-client/src/yield.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Utility for yielding control back to the main thread to prevent UI blocking.
*
* This is particularly useful when processing large amounts of data (like parsing
* large shapes) to ensure the UI remains responsive.
*/

/**
* Default number of items to process before yielding to the main thread.
* This value balances responsiveness with overhead from yielding.
*/
export const DEFAULT_YIELD_EVERY = 1000

/**
* Yields control back to the main thread, allowing the browser to handle
* user interactions, rendering, and other tasks.
*
* Uses `scheduler.yield()` if available (Chrome 129+, behind flag in other browsers),
* otherwise falls back to `setTimeout(0)`.
*
* @returns A promise that resolves after yielding to the main thread
*/
export function yieldToMain(): Promise<void> {
// Check if scheduler.yield is available (modern browsers)
// Guard typeof globalThis first to avoid reference errors in exotic environments
if (typeof globalThis !== `undefined`) {
const g = globalThis as GlobalWithScheduler
if (g.scheduler && typeof g.scheduler.yield === `function`) {
return g.scheduler.yield()
}
}

// Fallback to setTimeout(0) which yields to the event loop
return new Promise((resolve) => setTimeout(resolve, 0))
}

// Type definitions for the Scheduler API (available in some modern browsers)
interface GlobalWithScheduler {
scheduler?: {
yield: () => Promise<void>
}
}
112 changes: 111 additions & 1 deletion packages/typescript-client/test/parser.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { describe, expect, it } from 'vitest'
import { describe, expect, it, vi } from 'vitest'
import { MessageParser, defaultParser, pgArrayParser } from '../src/parser'
import * as yieldModule from '../src/yield'

describe(`Default parser`, () => {
it(`should parse integers`, () => {
Expand Down Expand Up @@ -268,3 +269,112 @@ describe(`Message parser`, () => {
).toEqual([{ value: { a: [1, 2, null, 4, 5] } }])
})
})

describe(`Async Message parser`, () => {
const parser = new MessageParser()

it(`should parse messages asynchronously with same results as sync`, async () => {
const messages = `[ { "value": { "a": "123" } }, { "value": { "a": "456" } } ]`
const schema = { a: { type: `int4` } }

const syncResult = parser.parse(messages, schema)
const asyncResult = await parser.parseAsync(messages, schema)

expect(asyncResult).toEqual(syncResult)
})

it(`should parse a single message (non-array)`, async () => {
const message = `{ "value": { "a": "123" } }`
const schema = { a: { type: `int4` } }

const result = await parser.parseAsync(message, schema)
expect(result).toEqual({ value: { a: 123 } })
})

it(`should parse snapshot data asynchronously with same results as sync`, async () => {
const messages = [
{ value: { a: `123` } },
{ value: { a: `456` } },
{ value: { a: `789` } },
]
const schema = { a: { type: `int4` } }

const syncResult = parser.parseSnapshotData(
structuredClone(messages),
schema
)
const asyncResult = await parser.parseSnapshotDataAsync(
structuredClone(messages),
schema
)

expect(asyncResult).toEqual(syncResult)
})

it(`should yield periodically during async snapshot parsing`, async () => {
const yieldSpy = vi.spyOn(yieldModule, `yieldToMain`).mockResolvedValue()

// Create messages that exceed the yield threshold (DEFAULT_YIELD_EVERY = 1000)
const messageCount = 2500
const messages = Array.from({ length: messageCount }, (_, i) => ({
value: { a: `${i}` },
}))
const schema = { a: { type: `int4` } }

await parser.parseSnapshotDataAsync(messages, schema)

// Should yield twice (after 1000 and 2000 messages)
expect(yieldSpy).toHaveBeenCalledTimes(2)

yieldSpy.mockRestore()
})

it(`should yield periodically during parseAsync`, async () => {
const yieldSpy = vi.spyOn(yieldModule, `yieldToMain`).mockResolvedValue()

// Create a JSON string with many messages that exceed the yield threshold
const messageCount = 2500
const messagesArray = Array.from({ length: messageCount }, (_, i) => ({
value: { a: `${i}` },
}))
const messagesJson = JSON.stringify(messagesArray)
const schema = { a: { type: `int4` } }

await parser.parseAsync(messagesJson, schema)

// Should yield twice (after 1000 and 2000 messages)
expect(yieldSpy).toHaveBeenCalledTimes(2)

yieldSpy.mockRestore()
})

it(`should not yield for small datasets`, async () => {
const yieldSpy = vi.spyOn(yieldModule, `yieldToMain`).mockResolvedValue()

const messages = [{ value: { a: `123` } }, { value: { a: `456` } }]
const schema = { a: { type: `int4` } }

await parser.parseSnapshotDataAsync(messages, schema)

// Should not yield for just 2 messages
expect(yieldSpy).not.toHaveBeenCalled()

yieldSpy.mockRestore()
})

it(`should handle old_value in async parsing`, async () => {
const messages = `[ { "value": { "a": "123" }, "old_value": { "a": "100" } } ]`
const schema = { a: { type: `int4` } }

const result = await parser.parseAsync(messages, schema)
expect(result).toEqual([{ value: { a: 123 }, old_value: { a: 100 } }])
})

it(`should handle null values in async parsing`, async () => {
const messages = [{ value: { a: null } }]
const schema = { a: { type: `int4` } }

const result = await parser.parseSnapshotDataAsync(messages, schema)
expect(result).toEqual([{ value: { a: null } }])
})
})
80 changes: 80 additions & 0 deletions packages/typescript-client/test/yield.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { describe, expect, it, vi, afterEach } from 'vitest'
import { yieldToMain } from '../src/yield'

describe(`yieldToMain`, () => {
afterEach(() => {
vi.restoreAllMocks()
})

it(`should return a promise that resolves`, async () => {
const result = yieldToMain()
expect(result).toBeInstanceOf(Promise)
await expect(result).resolves.toBeUndefined()
})

it(`should use scheduler.yield when available`, async () => {
const mockYield = vi.fn().mockResolvedValue(undefined)
type GlobalWithScheduler = { scheduler?: { yield?: () => Promise<void> } }
const g = globalThis as unknown as GlobalWithScheduler
const originalScheduler = g.scheduler

// Mock scheduler.yield
g.scheduler = { yield: mockYield }

await yieldToMain()
expect(mockYield).toHaveBeenCalled()

// Restore
if (originalScheduler) {
g.scheduler = originalScheduler
} else {
delete g.scheduler
}
})

it(`should fall back to setTimeout when scheduler.yield is not available`, async () => {
type GlobalWithScheduler = { scheduler?: { yield?: () => Promise<void> } }
const g = globalThis as unknown as GlobalWithScheduler
const originalScheduler = g.scheduler

// Remove scheduler
delete g.scheduler

// Track that setTimeout is used
const setTimeoutSpy = vi.spyOn(globalThis, `setTimeout`)

await yieldToMain()

expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), 0)

// Restore
if (originalScheduler) {
g.scheduler = originalScheduler
}
setTimeoutSpy.mockRestore()
})

it(`should yield control to allow other tasks to run`, async () => {
// This test verifies that yieldToMain actually yields - we can't
// guarantee exact ordering of setTimeout callbacks, but we can verify
// that other tasks get a chance to run during the yield
const taskRan = { value: false }

const yieldingTask = (async () => {
await yieldToMain()
return taskRan.value
})()

// Queue a task that should have a chance to run during the yield
setTimeout(() => {
taskRan.value = true
}, 0)

// Wait for both the yield and the setTimeout
await new Promise((resolve) => setTimeout(resolve, 10))
await yieldingTask

// The setTimeout should have had a chance to run
expect(taskRan.value).toBe(true)
})
})
Loading