Skip to content

Commit a15cafe

Browse files
devsnekbartlomieju
andauthored
feat(ext/web): transferable {Readable,Writable,Transform}Stream (#31126)
https://streams.spec.whatwg.org/#rs-transfer https://streams.spec.whatwg.org/#ws-transfer https://streams.spec.whatwg.org/#ts-transfer Remaining test failures are due to our `DOMException` not correctly being serializable and can be solved in a followup. ```js // example const INDEX_HTML = Deno.readTextFileSync("./index.html"); const worker = new Worker("./the_algorithm.js", { type: "module" }); Deno.serve(async (req) => { if (req.method === "POST" && req.path === "/the-algorithm") { const { port1, port2 } = new MessageChannel(); worker.postMessage({ stream: req.body, port: port1 }, { transfer: [req.body, port1] }); const res = await new Promise((resolve) => { port1.onmessage = (e) => resolve(e.data); }); return new Response(res); } if (req.path === "/") { return new Response(INDEX_HTML, { "content-type": "text/html" }); } return new Response(null, { status: 404 }); }); ``` --------- Co-authored-by: Bartek Iwańczuk <[email protected]>
1 parent 642f2a4 commit a15cafe

File tree

6 files changed

+380
-16
lines changed

6 files changed

+380
-16
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,6 @@ Untitled*.ipynb
4545
/.ms-playwright
4646

4747
**/.claude/settings.local.json
48+
49+
# pyenv
50+
/.python-version

ext/web/06_streams.js

Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ const {
6060
PromisePrototypeThen,
6161
PromiseReject,
6262
PromiseResolve,
63+
PromiseWithResolvers,
6364
RangeError,
6465
ReflectHas,
6566
SafeFinalizationRegistry,
@@ -5157,6 +5158,8 @@ class ReadableStream {
51575158
/** @type {Deferred<void>} */
51585159
[_isClosedPromise];
51595160

5161+
[core.hostObjectBrand] = "ReadableStream";
5162+
51605163
/**
51615164
* @param {UnderlyingSource<R>=} underlyingSource
51625165
* @param {QueuingStrategy<R>=} strategy
@@ -6164,6 +6167,8 @@ class TransformStream {
61646167
/** @type {WritableStream<I>} */
61656168
[_writable];
61666169

6170+
[core.hostObjectBrand] = "TransformStream";
6171+
61676172
/**
61686173
* @param {Transformer<I, O>} transformer
61696174
* @param {QueuingStrategy<I>} writableStrategy
@@ -6174,6 +6179,10 @@ class TransformStream {
61746179
writableStrategy = { __proto__: null },
61756180
readableStrategy = { __proto__: null },
61766181
) {
6182+
if (transformer === _brand) {
6183+
this[_brand] = _brand;
6184+
return;
6185+
}
61776186
const prefix = "Failed to construct 'TransformStream'";
61786187
if (transformer !== undefined) {
61796188
transformer = webidl.converters.object(transformer, prefix, "Argument 1");
@@ -6374,6 +6383,8 @@ class WritableStream {
63746383
/** @type {Deferred<void>[]} */
63756384
[_writeRequests];
63766385

6386+
[core.hostObjectBrand] = "WritableStream";
6387+
63776388
/**
63786389
* @param {UnderlyingSink<W>=} underlyingSink
63796390
* @param {QueuingStrategy<W>=} strategy
@@ -6740,6 +6751,283 @@ function createProxy(stream) {
67406751
return stream.pipeThrough(new TransformStream());
67416752
}
67426753

6754+
function packAndPostMessage(port, type, value) {
6755+
port.postMessage({ type, value, __proto__: null });
6756+
}
6757+
6758+
function crossRealmTransformSendError(port, error) {
6759+
packAndPostMessage(port, "error", error);
6760+
}
6761+
6762+
function packAndPostMessageHandlingError(port, type, value) {
6763+
try {
6764+
packAndPostMessage(port, type, value);
6765+
} catch (e) {
6766+
crossRealmTransformSendError(port, e);
6767+
throw e;
6768+
}
6769+
}
6770+
6771+
/**
6772+
* @param stream {ReadableStream<any>}
6773+
* @param port {MessagePort}
6774+
*/
6775+
function setUpCrossRealmTransformReadable(stream, port) {
6776+
initializeReadableStream(stream);
6777+
const controller = new ReadableStreamDefaultController(_brand);
6778+
port.addEventListener("message", (event) => {
6779+
if (event.data.type === "chunk") {
6780+
readableStreamDefaultControllerEnqueue(controller, event.data.value);
6781+
} else if (event.data.type === "close") {
6782+
readableStreamDefaultControllerClose(controller);
6783+
port.close();
6784+
} else if (event.data.type === "error") {
6785+
readableStreamDefaultControllerError(controller, event.data.value);
6786+
port.close();
6787+
}
6788+
});
6789+
port.addEventListener("messageerror", (event) => {
6790+
crossRealmTransformSendError(port, event.error);
6791+
readableStreamDefaultControllerError(controller, event.error);
6792+
port.close();
6793+
});
6794+
port.start();
6795+
const startAlgorithm = () => undefined;
6796+
const pullAlgorithm = () => {
6797+
packAndPostMessage(port, "pull", undefined);
6798+
return PromiseResolve(undefined);
6799+
};
6800+
const cancelAlgorithm = (reason) => {
6801+
try {
6802+
packAndPostMessageHandlingError(port, "error", reason);
6803+
} catch (e) {
6804+
return PromiseReject(e);
6805+
} finally {
6806+
port.close();
6807+
}
6808+
return PromiseResolve(undefined);
6809+
};
6810+
const sizeAlgorithm = () => 1;
6811+
setUpReadableStreamDefaultController(
6812+
stream,
6813+
controller,
6814+
startAlgorithm,
6815+
pullAlgorithm,
6816+
cancelAlgorithm,
6817+
0,
6818+
sizeAlgorithm,
6819+
);
6820+
}
6821+
6822+
/**
6823+
* @param stream {WritableStream<any>}
6824+
* @param port {MessagePort}
6825+
*/
6826+
function setUpCrossRealmTransformWritable(stream, port) {
6827+
initializeWritableStream(stream);
6828+
const controller = new WritableStreamDefaultController(_brand);
6829+
let backpressurePromise = PromiseWithResolvers();
6830+
port.addEventListener("message", (event) => {
6831+
if (event.data.type === "pull") {
6832+
if (backpressurePromise) {
6833+
backpressurePromise.resolve();
6834+
backpressurePromise = undefined;
6835+
}
6836+
} else if (event.data.type === "error") {
6837+
writableStreamDefaultControllerErrorIfNeeded(
6838+
controller,
6839+
event.data.value,
6840+
);
6841+
if (backpressurePromise) {
6842+
backpressurePromise.resolve();
6843+
backpressurePromise = undefined;
6844+
}
6845+
}
6846+
});
6847+
port.addEventListener("messageerror", (event) => {
6848+
crossRealmTransformSendError(port, event.error);
6849+
writableStreamDefaultControllerErrorIfNeeded(controller, event.error);
6850+
port.close();
6851+
});
6852+
port.start();
6853+
const startAlgorithm = () => undefined;
6854+
const writeAlgorithm = (chunk) => {
6855+
if (!backpressurePromise) {
6856+
backpressurePromise = PromiseWithResolvers();
6857+
backpressurePromise.resolve();
6858+
}
6859+
return PromisePrototypeThen(backpressurePromise.promise, () => {
6860+
backpressurePromise = PromiseWithResolvers();
6861+
try {
6862+
packAndPostMessageHandlingError(port, "chunk", chunk);
6863+
} catch (e) {
6864+
port.close();
6865+
throw e;
6866+
}
6867+
});
6868+
};
6869+
const closeAlgorithm = () => {
6870+
packAndPostMessage(port, "close", undefined);
6871+
port.close();
6872+
return PromiseResolve(undefined);
6873+
};
6874+
const abortAlgorithm = (reason) => {
6875+
try {
6876+
packAndPostMessageHandlingError(port, "error", reason);
6877+
return PromiseResolve(undefined);
6878+
} catch (error) {
6879+
return PromiseReject(error);
6880+
} finally {
6881+
port.close();
6882+
}
6883+
};
6884+
const sizeAlgorithm = () => 1;
6885+
setUpWritableStreamDefaultController(
6886+
stream,
6887+
controller,
6888+
startAlgorithm,
6889+
writeAlgorithm,
6890+
closeAlgorithm,
6891+
abortAlgorithm,
6892+
1,
6893+
sizeAlgorithm,
6894+
);
6895+
}
6896+
6897+
/**
6898+
* @param value {ReadableStream<any>}
6899+
* @param port {MessagePort}
6900+
*/
6901+
function readableStreamTransferSteps(value, port) {
6902+
if (isReadableStreamLocked(value)) {
6903+
throw new DOMException(
6904+
"Cannot transfer a locked ReadableStream",
6905+
"DataCloneError",
6906+
);
6907+
}
6908+
const writable = new WritableStream(_brand);
6909+
setUpCrossRealmTransformWritable(writable, port);
6910+
const promise = readableStreamPipeTo(value, writable, false, false, false);
6911+
setPromiseIsHandledToTrue(promise);
6912+
}
6913+
6914+
/**
6915+
* @param port {MessagePort}
6916+
* @returns {ReadableStream<any>}
6917+
*/
6918+
function readableStreamTransferReceivingSteps(port) {
6919+
const stream = new ReadableStream(_brand);
6920+
setUpCrossRealmTransformReadable(stream, port);
6921+
return stream;
6922+
}
6923+
6924+
/**
6925+
* @param value {WritableStream<any>}
6926+
* @param port {MessagePort}
6927+
*/
6928+
function writableStreamTransferSteps(value, port) {
6929+
if (isWritableStreamLocked(value)) {
6930+
throw new DOMException(
6931+
"Cannot transfer a locked WritableStream",
6932+
"DataCloneError",
6933+
);
6934+
}
6935+
const readable = new ReadableStream(_brand);
6936+
setUpCrossRealmTransformReadable(readable, port);
6937+
const promise = readableStreamPipeTo(readable, value, false, false, false);
6938+
setPromiseIsHandledToTrue(promise);
6939+
}
6940+
6941+
/**
6942+
* @param port {MessagePort}
6943+
* @returns {WritableStream<any>}
6944+
*/
6945+
function writableStreamTransferReceivingSteps(port) {
6946+
const stream = new WritableStream(_brand);
6947+
setUpCrossRealmTransformWritable(stream, port);
6948+
return stream;
6949+
}
6950+
6951+
/**
6952+
* @param value {TransformStream<any>}
6953+
* @param portR {MessagePort}
6954+
* @param portW {MessagePort}
6955+
*/
6956+
function transformStreamTransferSteps(value, portR, portW) {
6957+
if (isReadableStreamLocked(value.readable)) {
6958+
throw new DOMException(
6959+
"Cannot transfer a locked ReadableStream",
6960+
"DataCloneError",
6961+
);
6962+
}
6963+
if (isWritableStreamLocked(value.writable)) {
6964+
throw new DOMException(
6965+
"Cannot transfer a locked WritableStream",
6966+
"DataCloneError",
6967+
);
6968+
}
6969+
readableStreamTransferSteps(value.readable, portR);
6970+
writableStreamTransferSteps(value.writable, portW);
6971+
}
6972+
6973+
/**
6974+
* @param portR {MessagePort}
6975+
* @param portW {MessagePort}
6976+
* @returns {TransformStream<any>}
6977+
*/
6978+
function transformStreamTransferReceivingSteps(portR, portW) {
6979+
const stream = new TransformStream(_brand);
6980+
stream[_readable] = new ReadableStream(_brand);
6981+
setUpCrossRealmTransformReadable(stream[_readable], portR);
6982+
stream[_writable] = new WritableStream(_brand);
6983+
setUpCrossRealmTransformWritable(stream[_writable], portW);
6984+
return stream;
6985+
}
6986+
6987+
core.registerTransferableResource(
6988+
"ReadableStream",
6989+
(value) => {
6990+
const { port1, port2 } = new MessageChannel();
6991+
readableStreamTransferSteps(value, port1);
6992+
return core.getTransferableResource("MessagePort").send(port2);
6993+
},
6994+
(rid) => {
6995+
const port = core.getTransferableResource("MessagePort").receive(rid);
6996+
return readableStreamTransferReceivingSteps(port);
6997+
},
6998+
);
6999+
7000+
core.registerTransferableResource(
7001+
"WritableStream",
7002+
(value) => {
7003+
const { port1, port2 } = new MessageChannel();
7004+
writableStreamTransferSteps(value, port1);
7005+
return core.getTransferableResource("MessagePort").send(port2);
7006+
},
7007+
(rid) => {
7008+
const port = core.getTransferableResource("MessagePort").receive(rid);
7009+
return writableStreamTransferReceivingSteps(port);
7010+
},
7011+
);
7012+
7013+
core.registerTransferableResource(
7014+
"TransformStream",
7015+
(value) => {
7016+
const { port1: portR1, port2: portR2 } = new MessageChannel();
7017+
const { port1: portW1, port2: portW2 } = new MessageChannel();
7018+
transformStreamTransferSteps(value, portR1, portW1);
7019+
return [
7020+
core.getTransferableResource("MessagePort").send(portR2),
7021+
core.getTransferableResource("MessagePort").send(portW2),
7022+
];
7023+
},
7024+
(rids) => {
7025+
const portR = core.getTransferableResource("MessagePort").receive(rids[0]);
7026+
const portW = core.getTransferableResource("MessagePort").receive(rids[1]);
7027+
return transformStreamTransferReceivingSteps(portR, portW);
7028+
},
7029+
);
7030+
67437031
webidl.converters.ReadableStream = webidl
67447032
.createInterfaceConverter("ReadableStream", ReadableStream.prototype);
67457033
webidl.converters.WritableStream = webidl

ext/web/13_message_port.js

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,13 @@ function deserializeJsMessageData(messageData) {
385385
ArrayPrototypePush(hostObjects, hostObj);
386386
break;
387387
}
388+
case "multiResource": {
389+
const { 0: type, 1: rids } = transferable.data;
390+
const hostObj = core.getTransferableResource(type).receive(rids);
391+
ArrayPrototypePush(transferables, hostObj);
392+
ArrayPrototypePush(hostObjects, hostObj);
393+
break;
394+
}
388395
case "arrayBuffer": {
389396
ArrayPrototypePush(transferredArrayBuffers, transferable.data);
390397
const index = ArrayPrototypePush(transferables, null);
@@ -460,10 +467,17 @@ function serializeJsMessageData(data, transferables) {
460467
if (transferable[core.hostObjectBrand]) {
461468
const type = transferable[core.hostObjectBrand];
462469
const rid = core.getTransferableResource(type).send(transferable);
463-
ArrayPrototypePush(serializedTransferables, {
464-
kind: "resource",
465-
data: [type, rid],
466-
});
470+
if (typeof rid === "number") {
471+
ArrayPrototypePush(serializedTransferables, {
472+
kind: "resource",
473+
data: [type, rid],
474+
});
475+
} else {
476+
ArrayPrototypePush(serializedTransferables, {
477+
kind: "multiResource",
478+
data: [type, rid],
479+
});
480+
}
467481
} else if (isArrayBuffer(transferable)) {
468482
ArrayPrototypePush(serializedTransferables, {
469483
kind: "arrayBuffer",

0 commit comments

Comments
 (0)