From da5b0ee5520ae02ce6193555e168ff11f3118dcc Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sat, 13 Jun 2026 09:05:02 +0200 Subject: [PATCH 1/2] stream: reduce allocations on WHATWG streams hot paths Pure-JavaScript optimizations to lib/internal/webstreams/* that reduce per-chunk and per-construction allocations on hot paths without observable behavior change. Per-chunk: reuse promise-reaction closures per controller, add buffered fast path for async iterator, specialize callback wrappers by arity, and share immutable nil records for writable stream resets. Per-construction: use queueMicrotask for non-object start results, materialize reader/writer .closed and .ready records lazily, and remove dead allocations. Assisted-by: Claude Fable 5 Signed-off-by: Matteo Collina --- benchmark/webstreams/creation.js | 11 + lib/internal/webstreams/readablestream.js | 292 +++++++++++---- lib/internal/webstreams/transformstream.js | 9 +- lib/internal/webstreams/util.js | 57 ++- lib/internal/webstreams/writablestream.js | 412 ++++++++++----------- 5 files changed, 484 insertions(+), 297 deletions(-) diff --git a/benchmark/webstreams/creation.js b/benchmark/webstreams/creation.js index 2c3e1d273f6d4c..749ea960638bd0 100644 --- a/benchmark/webstreams/creation.js +++ b/benchmark/webstreams/creation.js @@ -21,6 +21,11 @@ const bench = common.createBenchmark(main, { 'ReadableStream.tee', ], +}, { + // Each case collects garbage right before bench.start() so that the + // timed window measures the work under test rather than leftover + // GC work from the setup phase. + flags: ['--expose-gc'], }); let readableStream; @@ -33,6 +38,7 @@ let teeResult; function main({ n, kind }) { switch (kind) { case 'ReadableStream': + globalThis.gc(); bench.start(); for (let i = 0; i < n; ++i) readableStream = new ReadableStream(); @@ -42,6 +48,7 @@ function main({ n, kind }) { assert.ok(readableStream); break; case 'WritableStream': + globalThis.gc(); bench.start(); for (let i = 0; i < n; ++i) writableStream = new WritableStream(); @@ -51,6 +58,7 @@ function main({ n, kind }) { assert.ok(writableStream); break; case 'TransformStream': + globalThis.gc(); bench.start(); for (let i = 0; i < n; ++i) transformStream = new TransformStream(); @@ -62,6 +70,7 @@ function main({ n, kind }) { case 'ReadableStreamDefaultReader': { const readers = Array.from({ length: n }, () => new ReadableStream()); + globalThis.gc(); bench.start(); for (let i = 0; i < n; ++i) readableStreamDefaultReader = new ReadableStreamDefaultReader(readers[i]); @@ -74,6 +83,7 @@ function main({ n, kind }) { case 'ReadableStreamBYOBReader': { const readers = Array.from({ length: n }, () => new ReadableStream({ type: 'bytes' })); + globalThis.gc(); bench.start(); for (let i = 0; i < n; ++i) readableStreamBYOBReader = new ReadableStreamBYOBReader(readers[i]); @@ -86,6 +96,7 @@ function main({ n, kind }) { case 'ReadableStream.tee': { const streams = Array.from({ length: n }, () => new ReadableStream()); + globalThis.gc(); bench.start(); for (let i = 0; i < n; ++i) teeResult = streams[i].tee(); diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 8ae3fff11abbf1..573f0c30e47146 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -104,8 +104,9 @@ const { canCopyArrayBuffer, cloneAsUint8Array, copyArrayBuffer, - createPromiseCallback, + createPromiseCallback1, customInspect, + defaultSizeAlgorithm, dequeueValue, enqueueValueWithSize, extractHighWaterMark, @@ -118,7 +119,9 @@ const { nonOpCancel, nonOpPull, nonOpStart, + rejectedHandledRecord, resetQueue, + resolvedRecord, setPromiseHandled, } = require('internal/webstreams/util'); @@ -135,6 +138,8 @@ const { writableStreamDefaultWriterCloseWithErrorPropagation, writableStreamDefaultWriterRelease, writableStreamDefaultWriterWrite, + writerClosedPromise, + writerReadyPromise, } = require('internal/webstreams/writablestream'); const { Buffer } = require('buffer'); @@ -251,9 +256,6 @@ class ReadableStream { validateObject(strategy, 'strategy', kValidateObjectAllowObjectsAndNull); this[kState] = createReadableStreamState(); - this[kIsClosedPromise] = PromiseWithResolvers(); - this[kControllerErrorFunction] = () => {}; - // The spec requires handling of the strategy first // here. Specifically, if getting the size and // highWaterMark from the strategy fail, that has @@ -294,6 +296,36 @@ class ReadableStream { return this[kState].state === 'readable'; } + [kControllerErrorFunction](error) { + // Used by the internal stream interop (addAbortSignal). Historically + // only default controllers were wired here; byte stream controllers + // keep the previous no-op behavior. + const controller = this[kState].controller; + if (isReadableStreamDefaultController(controller)) + controller.error(error); + } + + // Used by the internal stream interop (end-of-stream). Materialized + // lazily since its settlement is derivable from the stream state; the + // settle sites in readableStreamClose/Error only touch the cache. + get [kIsClosedPromise]() { + let cache = this[kState].closedPromise; + if (cache === undefined) { + switch (this[kState].state) { + case 'readable': + cache = PromiseWithResolvers(); + break; + case 'closed': + cache = resolvedRecord(); + break; + default: + cache = rejectedHandledRecord(this[kState].storedError); + } + this[kState].closedPromise = cache; + } + return cache; + } + /** * @readonly * @type {boolean} @@ -535,9 +567,38 @@ class ReadableStream { state.current = PromiseResolve(); started = true; } - state.current = state.current !== undefined ? - PromisePrototypeThen(state.current, nextSteps, nextSteps) : - nextSteps(); + if (state.current !== undefined) { + state.current = + PromisePrototypeThen(state.current, nextSteps, nextSteps); + return state.current; + } + // No read is in flight. Mirror the buffered fast path of + // ReadableStreamDefaultReader.read(): when data is already queued + // in a default controller, resolve immediately without allocating + // a read request. The result settles synchronously, so leaving + // state.current undefined matches the state the slow path reaches + // once its read request callbacks have settled. + const stream = reader[kState].stream; + if (!state.done && stream !== undefined) { + const controller = stream[kState].controller; + if (stream[kState].state === 'readable' && + isReadableStreamDefaultController(controller) && + controller[kState].queue.length > 0) { + stream[kState].disturbed = true; + const chunk = dequeueValue(controller); + + if (controller[kState].closeRequested && + !controller[kState].queue.length) { + readableStreamDefaultControllerClearAlgorithms(controller); + readableStreamClose(stream); + } else { + readableStreamDefaultControllerCallPullIfNeeded(controller); + } + + return PromiseResolve({ done: false, value: chunk }); + } + } + state.current = nextSteps(); return state.current; }, @@ -611,7 +672,7 @@ class ReadableStream { // lingering promise not being properly resolved. // https://github.com/nodejs/node/issues/51486 new transfer.CrossRealmTransformReadableSource(port, true), - 0, () => 1); + 0, defaultSizeAlgorithm); } } @@ -641,8 +702,6 @@ function InternalTransferredReadableStream() { markTransferMode(this, false, true); this[kType] = 'ReadableStream'; this[kState] = createReadableStreamState(); - - this[kIsClosedPromise] = PromiseWithResolvers(); } ObjectSetPrototypeOf(InternalTransferredReadableStream.prototype, ReadableStream.prototype); @@ -833,13 +892,10 @@ class ReadableStreamDefaultReader { if (!isReadableStream(stream)) throw new ERR_INVALID_ARG_TYPE('stream', 'ReadableStream', stream); this[kState] = { - readRequests: [], + // All fields are unconditionally assigned during setup. + readRequests: undefined, stream: undefined, - close: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, + close: undefined, }; setupReadableStreamDefaultReader(this, stream); } @@ -903,7 +959,7 @@ class ReadableStreamDefaultReader { get closed() { if (!isReadableStreamDefaultReader(this)) return PromiseReject(new ERR_INVALID_THIS('ReadableStreamDefaultReader')); - return this[kState].close.promise; + return readerClosedPromise(this).promise; } /** @@ -924,7 +980,7 @@ class ReadableStreamDefaultReader { return customInspect(depth, options, this[kType], { stream: this[kState].stream, readRequests: this[kState].readRequests.length, - close: this[kState].close.promise, + close: readerClosedPromise(this).promise, }); } } @@ -947,13 +1003,10 @@ class ReadableStreamBYOBReader { if (!isReadableStream(stream)) throw new ERR_INVALID_ARG_TYPE('stream', 'ReadableStream', stream); this[kState] = { + // All fields are unconditionally assigned during setup. stream: undefined, - readIntoRequests: [], - close: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, + readIntoRequests: undefined, + close: undefined, }; setupReadableStreamBYOBReader(this, stream); } @@ -1043,7 +1096,7 @@ class ReadableStreamBYOBReader { get closed() { if (!isReadableStreamBYOBReader(this)) return PromiseReject(new ERR_INVALID_THIS('ReadableStreamBYOBReader')); - return this[kState].close.promise; + return readerClosedPromise(this).promise; } /** @@ -1064,7 +1117,7 @@ class ReadableStreamBYOBReader { return customInspect(depth, options, this[kType], { stream: this[kState].stream, readIntoRequests: this[kState].readIntoRequests.length, - close: this[kState].close.promise, + close: readerClosedPromise(this).promise, }); } } @@ -1272,7 +1325,6 @@ function InternalReadableStream(start, pull, cancel, highWaterMark, size) { markTransferMode(this, false, true); this[kType] = 'ReadableStream'; this[kState] = createReadableStreamState(); - this[kIsClosedPromise] = PromiseWithResolvers(); const controller = new ReadableStreamDefaultController(kSkipThrow); setupReadableStreamDefaultController( this, @@ -1287,7 +1339,7 @@ function InternalReadableStream(start, pull, cancel, highWaterMark, size) { ObjectSetPrototypeOf(InternalReadableStream.prototype, ReadableStream.prototype); ObjectSetPrototypeOf(InternalReadableStream, ReadableStream); -function createReadableStream(start, pull, cancel, highWaterMark = 1, size = () => 1) { +function createReadableStream(start, pull, cancel, highWaterMark = 1, size = defaultSizeAlgorithm) { const stream = new InternalReadableStream(start, pull, cancel, highWaterMark, size); // For spec compliance the InternalReadableStream must be a ReadableStream @@ -1300,7 +1352,6 @@ function InternalReadableByteStream(start, pull, cancel) { markTransferMode(this, false, true); this[kType] = 'ReadableStream'; this[kState] = createReadableStreamState(); - this[kIsClosedPromise] = PromiseWithResolvers(); const controller = new ReadableByteStreamController(kSkipThrow); setupReadableByteStreamController( this, @@ -1341,6 +1392,7 @@ const isReadableStreamBYOBReader = function createReadableStreamState() { return { __proto__: null, + closedPromise: undefined, disturbed: false, reader: undefined, state: 'readable', @@ -1549,7 +1601,7 @@ function readableStreamPipeTo( if (shuttingDown) return true; if (dest[kState].backpressure) { - await writer[kState].ready.promise; + await writerReadyPromise(writer).promise; if (shuttingDown) return true; } @@ -1573,7 +1625,7 @@ function readableStreamPipeTo( } // Write the chunk - we're already in a separate microtask from enqueue - // because we awaited writer[kState].ready.promise above. + // because we awaited the writer ready promise above. state.currentWrite = writableStreamDefaultWriterWrite(writer, chunk); markPromiseAsHandled(state.currentWrite); @@ -1623,7 +1675,7 @@ function readableStreamPipeTo( setPromiseHandled(run()); - watchErrored(source, reader[kState].close.promise, (error) => { + watchErrored(source, readerClosedPromise(reader).promise, (error) => { if (!preventAbort) { return shutdownWithAnAction( () => writableStreamAbort(dest, error), @@ -1633,7 +1685,7 @@ function readableStreamPipeTo( shutdown(true, error); }); - watchErrored(dest, writer[kState].close.promise, (error) => { + watchErrored(dest, writerClosedPromise(writer).promise, (error) => { if (!preventCancel) { return shutdownWithAnAction( () => readableStreamCancel(source, error), @@ -1643,7 +1695,7 @@ function readableStreamPipeTo( shutdown(true, error); }); - watchClosed(source, reader[kState].close.promise, () => { + watchClosed(source, readerClosedPromise(reader).promise, () => { if (!preventClose) { return shutdownWithAnAction( () => writableStreamDefaultWriterCloseWithErrorPropagation(writer)); @@ -1782,7 +1834,7 @@ function readableStreamDefaultTee(stream, cloneForBranch2) { createReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm); PromisePrototypeThen( - reader[kState].close.promise, + readerClosedPromise(reader).promise, undefined, (error) => { readableStreamDefaultControllerError(branch1[kState].controller, error); @@ -1812,7 +1864,7 @@ function readableByteStreamTee(stream) { function forwardReaderError(thisReader) { PromisePrototypeThen( - thisReader[kState].close.promise, + readerClosedPromise(thisReader).promise, undefined, (error) => { if (thisReader !== reader) { @@ -2117,7 +2169,7 @@ function readableStreamCancel(stream, reason) { function readableStreamClose(stream) { assert(stream[kState].state === 'readable'); stream[kState].state = 'closed'; - stream[kIsClosedPromise].resolve(); + stream[kState].closedPromise?.resolve?.(); const { reader, } = stream[kState]; @@ -2125,7 +2177,7 @@ function readableStreamClose(stream) { if (reader === undefined) return; - reader[kState].close.resolve(); + reader[kState].close?.resolve?.(); if (readableStreamHasDefaultReader(stream)) { for (let n = 0; n < reader[kState].readRequests.length; n++) @@ -2138,8 +2190,11 @@ function readableStreamError(stream, error) { assert(stream[kState].state === 'readable'); stream[kState].state = 'errored'; stream[kState].storedError = error; - setPromiseHandled(stream[kIsClosedPromise].promise); - stream[kIsClosedPromise].reject(error); + const closedPromiseCache = stream[kState].closedPromise; + if (closedPromiseCache !== undefined) { + setPromiseHandled(closedPromiseCache.promise); + closedPromiseCache.reject?.(error); + } const { reader, @@ -2148,8 +2203,11 @@ function readableStreamError(stream, error) { if (reader === undefined) return; - setPromiseHandled(reader[kState].close.promise); - reader[kState].close.reject(error); + const closeCache = reader[kState].close; + if (closeCache !== undefined) { + setPromiseHandled(closeCache.promise); + closeCache.reject?.(error); + } if (readableStreamHasDefaultReader(stream)) { for (let n = 0; n < reader[kState].readRequests.length; n++) @@ -2253,26 +2311,53 @@ function readableStreamReaderGenericCancel(reader, reason) { function readableStreamReaderGenericInitialize(reader, stream) { reader[kState].stream = stream; stream[kState].reader = reader; - switch (stream[kState].state) { - case 'readable': - reader[kState].close = PromiseWithResolvers(); - break; - case 'closed': - reader[kState].close = { - promise: PromiseResolve(), + // The reader's closed promise (reader[kState].close) is materialized + // lazily by readerClosedPromise(); its settlement is fully derivable + // from the reader/stream state, so nothing is allocated here. +} + +// Materializes the reader's [[closedPromise]] record on first +// observation. While unobserved, its settlement is derivable: a +// detached reader was released (rejected with the shared released +// error); otherwise it follows the stream state. The settle sites +// (readableStreamClose/Error, readableStreamReaderGenericRelease) +// only touch the record when it has been materialized. +function readerClosedPromise(reader) { + let close = reader[kState].close; + if (close === undefined) { + const stream = reader[kState].stream; + if (stream === undefined) { + close = { + promise: PromiseReject(lazyReadableReleasedError()), resolve: undefined, reject: undefined, }; - break; - case 'errored': - reader[kState].close = { - promise: PromiseReject(stream[kState].storedError), - resolve: undefined, - reject: undefined, - }; - setPromiseHandled(reader[kState].close.promise); - break; + setPromiseHandled(close.promise); + } else { + switch (stream[kState].state) { + case 'readable': + close = PromiseWithResolvers(); + break; + case 'closed': + close = { + promise: PromiseResolve(), + resolve: undefined, + reject: undefined, + }; + break; + case 'errored': + close = { + promise: PromiseReject(stream[kState].storedError), + resolve: undefined, + reject: undefined, + }; + setPromiseHandled(close.promise); + break; + } + } + reader[kState].close = close; } + return close; } function readableStreamDefaultReaderRelease(reader) { @@ -2312,17 +2397,17 @@ function readableStreamReaderGenericRelease(reader) { assert(stream !== undefined); assert(stream[kState].reader === reader); - const releasedStateError = lazyReadableReleasedError(); + const closeCache = reader[kState].close; if (stream[kState].state === 'readable') { - reader[kState].close.reject?.(releasedStateError); + if (closeCache !== undefined) { + closeCache.reject?.(lazyReadableReleasedError()); + setPromiseHandled(closeCache.promise); + } } else { - reader[kState].close = { - promise: PromiseReject(releasedStateError), - resolve: undefined, - reject: undefined, - }; + // The spec replaces [[closedPromise]] here. Dropping the cache makes + // the next observation derive the released rejection. + reader[kState].close = undefined; } - setPromiseHandled(reader[kState].close.promise); stream[kState].controller[kRelease](); @@ -2475,16 +2560,24 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) { } assert(!controller[kState].pullAgain); controller[kState].pulling = true; - PromisePrototypeThen( - controller[kState].pullAlgorithm(controller), - () => { + if (controller[kState].pullFulfilled === undefined) { + // The pull reaction closures only capture the controller, so they are + // created once on the first pull and reused for every subsequent pull + // instead of allocating two fresh closures per chunk. + controller[kState].pullFulfilled = () => { controller[kState].pulling = false; if (controller[kState].pullAgain) { controller[kState].pullAgain = false; readableStreamDefaultControllerCallPullIfNeeded(controller); } - }, - (error) => readableStreamDefaultControllerError(controller, error)); + }; + controller[kState].pullRejected = + (error) => readableStreamDefaultControllerError(controller, error); + } + PromisePrototypeThen( + controller[kState].pullAlgorithm(controller), + controller[kState].pullFulfilled, + controller[kState].pullRejected); } function readableStreamDefaultControllerClearAlgorithms(controller) { @@ -2547,6 +2640,8 @@ function setupReadableStreamDefaultController( pullAgain: false, pullAlgorithm, pulling: false, + pullFulfilled: undefined, + pullRejected: undefined, queue: [], queueTotalSize: 0, started: false, @@ -2554,10 +2649,24 @@ function setupReadableStreamDefaultController( stream, }; stream[kState].controller = controller; - stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller); const startResult = startAlgorithm(); + if (startResult === null || + (typeof startResult !== 'object' && typeof startResult !== 'function')) { + // Non-thenable start result: fulfillment is guaranteed and no .then + // lookup on the result is observable, so run the post-start step + // directly at the exact microtask position the promise reaction + // would have had, skipping two promise allocations. + queueMicrotask(() => { + controller[kState].started = true; + assert(!controller[kState].pulling); + assert(!controller[kState].pullAgain); + readableStreamDefaultControllerCallPullIfNeeded(controller); + }); + return; + } + PromisePrototypeThen( new Promise((r) => r(startResult)), () => { @@ -2582,10 +2691,10 @@ function setupReadableStreamDefaultControllerFromSource( FunctionPrototypeBind(start, source, controller) : nonOpStart; const pullAlgorithm = pull ? - createPromiseCallback('source.pull', pull, source) : + createPromiseCallback1('source.pull', pull, source) : nonOpPull; const cancelAlgorithm = cancel ? - createPromiseCallback('source.cancel', cancel, source) : + createPromiseCallback1('source.cancel', cancel, source) : nonOpCancel; setupReadableStreamDefaultController( @@ -3236,16 +3345,23 @@ function readableByteStreamControllerCallPullIfNeeded(controller) { } assert(!controller[kState].pullAgain); controller[kState].pulling = true; - PromisePrototypeThen( - controller[kState].pullAlgorithm(controller), - () => { + if (controller[kState].pullFulfilled === undefined) { + // See readableStreamDefaultControllerCallPullIfNeeded: created once, + // reused for every pull. + controller[kState].pullFulfilled = () => { controller[kState].pulling = false; if (controller[kState].pullAgain) { controller[kState].pullAgain = false; readableByteStreamControllerCallPullIfNeeded(controller); } - }, - (error) => readableByteStreamControllerError(controller, error)); + }; + controller[kState].pullRejected = + (error) => readableByteStreamControllerError(controller, error); + } + PromisePrototypeThen( + controller[kState].pullAlgorithm(controller), + controller[kState].pullFulfilled, + controller[kState].pullRejected); } function readableByteStreamControllerError(controller, error) { @@ -3367,6 +3483,8 @@ function setupReadableByteStreamController( closeRequested: false, pullAgain: false, pulling: false, + pullFulfilled: undefined, + pullRejected: undefined, started: false, stream, queue: [], @@ -3381,6 +3499,18 @@ function setupReadableByteStreamController( const startResult = startAlgorithm(); + if (startResult === null || + (typeof startResult !== 'object' && typeof startResult !== 'function')) { + // See setupReadableStreamDefaultController. + queueMicrotask(() => { + controller[kState].started = true; + assert(!controller[kState].pulling); + assert(!controller[kState].pullAgain); + readableByteStreamControllerCallPullIfNeeded(controller); + }); + return; + } + PromisePrototypeThen( new Promise((r) => r(startResult)), () => { @@ -3405,10 +3535,10 @@ function setupReadableByteStreamControllerFromSource( FunctionPrototypeBind(start, source, controller) : nonOpStart; const pullAlgorithm = pull ? - createPromiseCallback('source.pull', pull, source, controller) : + createPromiseCallback1('source.pull', pull, source) : nonOpPull; const cancelAlgorithm = cancel ? - createPromiseCallback('source.cancel', cancel, source) : + createPromiseCallback1('source.cancel', cancel, source) : nonOpCancel; if (autoAllocateChunkSize === 0) { diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index 96d48b438d479c..9b54db000cf70c 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -43,7 +43,8 @@ const { } = require('internal/worker/js_transferable'); const { - createPromiseCallback, + createPromiseCallback1, + createPromiseCallback2, customInspect, extractHighWaterMark, extractSizeAlgorithm, @@ -462,13 +463,13 @@ function setupTransformStreamDefaultControllerFromTransformer( const flush = transformer?.flush; const cancel = transformer?.cancel; const transformAlgorithm = transform ? - createPromiseCallback('transformer.transform', transform, transformer) : + createPromiseCallback2('transformer.transform', transform, transformer) : defaultTransformAlgorithm; const flushAlgorithm = flush ? - createPromiseCallback('transformer.flush', flush, transformer) : + createPromiseCallback1('transformer.flush', flush, transformer) : nonOpFlush; const cancelAlgorithm = cancel ? - createPromiseCallback('transformer.cancel', cancel, transformer) : + createPromiseCallback1('transformer.cancel', cancel, transformer) : nonOpCancel; setupTransformStreamDefaultController( diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index 808b0b069e57f7..fd8eb9e26a00b1 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -7,10 +7,12 @@ const { ArrayPrototypePush, ArrayPrototypeShift, AsyncIteratorPrototype, + FunctionPrototypeCall, MathMax, NumberIsNaN, PromisePrototypeThen, - ReflectApply, + PromiseReject, + PromiseResolve, ReflectGet, Symbol, Uint8Array, @@ -69,8 +71,12 @@ function extractHighWaterMark(value, defaultHWM) { return coercedValue; } +// The default size algorithm is never exposed to user code, so a single +// shared function avoids one closure allocation per stream. +const defaultSizeAlgorithm = () => 1; + function extractSizeAlgorithm(size) { - if (size === undefined) return () => 1; + if (size === undefined) return defaultSizeAlgorithm; validateFunction(size, 'strategy.size'); return size; } @@ -169,9 +175,25 @@ function enqueueValueWithSize(controller, value, size) { controller[kState].queueTotalSize += size; } -function createPromiseCallback(name, fn, thisArg) { +// Arity-specialized variants of the promise-callback wrapper. The generic +// rest-parameter + ReflectApply form allocated an arguments array on every +// invocation; these run on per-chunk hot paths (pull/write/transform), so +// each known call-site arity gets its own wrapper. The exact number of +// arguments passed through to the user callback is observable and must be +// preserved. +function createPromiseCallback0(name, fn, thisArg) { + validateFunction(fn, name); + return async () => FunctionPrototypeCall(fn, thisArg); +} + +function createPromiseCallback1(name, fn, thisArg) { + validateFunction(fn, name); + return async (arg) => FunctionPrototypeCall(fn, thisArg, arg); +} + +function createPromiseCallback2(name, fn, thisArg) { validateFunction(fn, name); - return async (...args) => ReflectApply(fn, thisArg, args); + return async (arg1, arg2) => FunctionPrototypeCall(fn, thisArg, arg1, arg2); } function isPromisePending(promise) { @@ -180,6 +202,26 @@ function isPromisePending(promise) { return details?.[0] === kPending; } +// Shared shapes for lazily-materialized { promise, resolve, reject } +// records whose settlement is already known. +function resolvedRecord() { + return { + promise: PromiseResolve(), + resolve: undefined, + reject: undefined, + }; +} + +function rejectedHandledRecord(error) { + const record = { + promise: PromiseReject(error), + resolve: undefined, + reject: undefined, + }; + setPromiseHandled(record.promise); + return record; +} + function setPromiseHandled(promise) { // Alternatively, we could use the native API // MarkAsHandled, but this avoids the extra boundary cross @@ -213,8 +255,11 @@ module.exports = { canCopyArrayBuffer, cloneAsUint8Array, copyArrayBuffer, - createPromiseCallback, + createPromiseCallback0, + createPromiseCallback1, + createPromiseCallback2, customInspect, + defaultSizeAlgorithm, dequeueValue, enqueueValueWithSize, extractHighWaterMark, @@ -231,6 +276,8 @@ module.exports = { nonOpStart, nonOpWrite, peekQueueValue, + rejectedHandledRecord, resetQueue, + resolvedRecord, setPromiseHandled, }; diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 8f62a199d5b301..b8334decb759c8 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -55,8 +55,11 @@ const { } = require('internal/worker/js_transferable'); const { - createPromiseCallback, + createPromiseCallback0, + createPromiseCallback1, + createPromiseCallback2, customInspect, + defaultSizeAlgorithm, dequeueValue, enqueueValueWithSize, extractHighWaterMark, @@ -71,7 +74,9 @@ const { nonOpStart, nonOpWrite, peekQueueValue, + rejectedHandledRecord, resetQueue, + resolvedRecord, setPromiseHandled, } = require('internal/webstreams/util'); @@ -86,6 +91,10 @@ const { AbortController, } = require('internal/abort_controller'); +const { + queueMicrotask, +} = require('internal/process/task_queues'); + const assert = require('internal/assert'); const kAbort = Symbol('kAbort'); @@ -93,6 +102,23 @@ const kCloseSentinel = Symbol('kCloseSentinel'); const kError = Symbol('kError'); const kSkipThrow = Symbol('kSkipThrow'); +// Shared sentinels for the "no pending request" state records. These +// records are only ever replaced wholesale and never mutated in place, +// so single shared instances are safe and avoid an allocation on every +// state reset (one per write on the hot path). +const kNilRequest = { + __proto__: null, + promise: undefined, + resolve: undefined, + reject: undefined, +}; +const kNilPendingAbortRequest = { + __proto__: null, + abort: kNilRequest, + reason: undefined, + wasAlreadyErroring: false, +}; + let releasedError; function lazyWritableReleasedError() { @@ -165,9 +191,6 @@ class WritableStream { this[kState] = createWritableStreamState(); - this[kIsClosedPromise] = PromiseWithResolvers(); - this[kControllerErrorFunction] = () => {}; - const size = extractSizeAlgorithm(strategy?.size); const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1); @@ -186,6 +209,32 @@ class WritableStream { return this[kState].state === 'writable'; } + [kControllerErrorFunction](error) { + // Used by the internal stream interop (addAbortSignal). + this[kState].controller.error(error); + } + + // Used by the internal stream interop (end-of-stream). Materialized + // lazily since its settlement is derivable from the stream state. + get [kIsClosedPromise]() { + let cache = this[kState].closedPromise; + if (cache === undefined) { + switch (this[kState].state) { + case 'writable': + case 'erroring': + cache = PromiseWithResolvers(); + break; + case 'closed': + cache = resolvedRecord(); + break; + default: + cache = rejectedHandledRecord(this[kState].storedError); + } + this[kState].closedPromise = cache; + } + return cache; + } + /** * @readonly * @type {boolean} @@ -290,7 +339,7 @@ class WritableStream { // https://github.com/nodejs/node/issues/51486 new transfer.CrossRealmTransformWritableSink(port, true), 1, - () => 1); + defaultSizeAlgorithm); } } @@ -307,8 +356,6 @@ function InternalTransferredWritableStream() { markTransferMode(this, false, true); this[kType] = 'WritableStream'; this[kState] = createWritableStreamState(); - - this[kIsClosedPromise] = PromiseWithResolvers(); } ObjectSetPrototypeOf(InternalTransferredWritableStream.prototype, WritableStream.prototype); @@ -335,16 +382,9 @@ class WritableStreamDefaultWriter { throw new ERR_INVALID_ARG_TYPE('stream', 'WritableStream', stream); this[kState] = { stream: undefined, - close: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - ready: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, + // Lazily materialized by writerClosedPromise()/writerReadyPromise(). + close: undefined, + ready: undefined, }; setupWritableStreamDefaultWriter(this, stream); } @@ -356,7 +396,7 @@ class WritableStreamDefaultWriter { get closed() { if (!isWritableStreamDefaultWriter(this)) return PromiseReject(new ERR_INVALID_THIS('WritableStreamDefaultWriter')); - return this[kState].close.promise; + return writerClosedPromise(this).promise; } /** @@ -380,7 +420,7 @@ class WritableStreamDefaultWriter { get ready() { if (!isWritableStreamDefaultWriter(this)) return PromiseReject(new ERR_INVALID_THIS('WritableStreamDefaultWriter')); - return this[kState].ready.promise; + return writerReadyPromise(this).promise; } /** @@ -449,8 +489,8 @@ class WritableStreamDefaultWriter { [kInspect](depth, options) { return customInspect(depth, options, this[kType], { stream: this[kState].stream, - close: this[kState].close.promise, - ready: this[kState].ready.promise, + close: writerClosedPromise(this).promise, + ready: writerReadyPromise(this).promise, desiredSize: this.desiredSize, }); } @@ -524,7 +564,6 @@ function InternalWritableStream(start, write, close, abort, highWaterMark, size) markTransferMode(this, false, true); this[kType] = 'WritableStream'; this[kState] = createWritableStreamState(); - this[kIsClosedPromise] = PromiseWithResolvers(); const controller = new WritableStreamDefaultController(kSkipThrow); setupWritableStreamDefaultController( @@ -542,7 +581,7 @@ function InternalWritableStream(start, write, close, abort, highWaterMark, size) ObjectSetPrototypeOf(InternalWritableStream.prototype, WritableStream.prototype); ObjectSetPrototypeOf(InternalWritableStream, WritableStream); -function createWritableStream(start, write, close, abort, highWaterMark = 1, size = () => 1) { +function createWritableStream(start, write, close, abort, highWaterMark = 1, size = defaultSizeAlgorithm) { const stream = new InternalWritableStream(start, write, close, abort, highWaterMark, size); // For spec compliance the InternalWritableStream must be a WritableStream @@ -560,36 +599,11 @@ const isWritableStreamDefaultController = function createWritableStreamState() { return { __proto__: null, - close: PromiseWithResolvers(), - closeRequest: { - __proto__: null, - promise: undefined, - resolve: undefined, - reject: undefined, - }, - inFlightWriteRequest: { - __proto__: null, - promise: undefined, - resolve: undefined, - reject: undefined, - }, - inFlightCloseRequest: { - __proto__: null, - promise: undefined, - resolve: undefined, - reject: undefined, - }, - pendingAbortRequest: { - __proto__: null, - abort: { - __proto__: null, - promise: undefined, - resolve: undefined, - reject: undefined, - }, - reason: undefined, - wasAlreadyErroring: false, - }, + closedPromise: undefined, + closeRequest: kNilRequest, + inFlightWriteRequest: kNilRequest, + inFlightCloseRequest: kNilRequest, + pendingAbortRequest: kNilPendingAbortRequest, backpressure: false, controller: undefined, state: 'writable', @@ -615,59 +629,68 @@ function setupWritableStreamDefaultWriter(writer, stream) { throw new ERR_INVALID_STATE.TypeError('WritableStream is locked'); writer[kState].stream = stream; stream[kState].writer = writer; - switch (stream[kState].state) { - case 'writable': - if (!writableStreamCloseQueuedOrInFlight(stream) && - stream[kState].backpressure) { - writer[kState].ready = PromiseWithResolvers(); - } else { - writer[kState].ready = { - promise: PromiseResolve(), - resolve: undefined, - reject: undefined, - }; + // The writer's [[readyPromise]] and [[closedPromise]] are materialized + // lazily by writerReadyPromise()/writerClosedPromise(); their settlement + // is fully derivable from the writer/stream state, so nothing is + // allocated here. +} + +// Materializes the writer's [[closedPromise]] record on first +// observation. While unobserved, its settlement is derivable: a detached +// writer was released (rejected with the shared released error); +// otherwise it follows the stream state. The settle sites only touch the +// record when it has been materialized. +function writerClosedPromise(writer) { + let close = writer[kState].close; + if (close === undefined) { + const stream = writer[kState].stream; + if (stream === undefined) { + close = rejectedHandledRecord(lazyWritableReleasedError()); + } else { + switch (stream[kState].state) { + case 'writable': + case 'erroring': + close = PromiseWithResolvers(); + break; + case 'closed': + close = resolvedRecord(); + break; + default: + close = rejectedHandledRecord(stream[kState].storedError); } - setClosedPromiseToNewPromise(); - break; - case 'erroring': - writer[kState].ready = { - promise: PromiseReject(stream[kState].storedError), - resolve: undefined, - reject: undefined, - }; - setPromiseHandled(writer[kState].ready.promise); - setClosedPromiseToNewPromise(); - break; - case 'closed': - writer[kState].ready = { - promise: PromiseResolve(), - resolve: undefined, - reject: undefined, - }; - writer[kState].close = { - promise: PromiseResolve(), - resolve: undefined, - reject: undefined, - }; - break; - default: - writer[kState].ready = { - promise: PromiseReject(stream[kState].storedError), - resolve: undefined, - reject: undefined, - }; - writer[kState].close = { - promise: PromiseReject(stream[kState].storedError), - resolve: undefined, - reject: undefined, - }; - setPromiseHandled(writer[kState].ready.promise); - setPromiseHandled(writer[kState].close.promise); + } + writer[kState].close = close; } + return close; +} - function setClosedPromiseToNewPromise() { - writer[kState].close = PromiseWithResolvers(); +// Same as writerClosedPromise(), for the writer's [[readyPromise]]. The +// pending epoch is derivable too: a writable stream with backpressure and +// no queued close has a pending ready promise. +function writerReadyPromise(writer) { + let ready = writer[kState].ready; + if (ready === undefined) { + const stream = writer[kState].stream; + if (stream === undefined) { + ready = rejectedHandledRecord(lazyWritableReleasedError()); + } else { + switch (stream[kState].state) { + case 'writable': + ready = !writableStreamCloseQueuedOrInFlight(stream) && + stream[kState].backpressure ? + PromiseWithResolvers() : + resolvedRecord(); + break; + case 'closed': + ready = resolvedRecord(); + break; + default: + ready = rejectedHandledRecord(stream[kState].storedError); + } + } + writer[kState].ready = ready; } + return ready; } function writableStreamAbort(stream, reason) { @@ -721,7 +744,7 @@ function writableStreamClose(stream) { stream[kState].closeRequest = PromiseWithResolvers(); const { promise } = stream[kState].closeRequest; if (writer !== undefined && backpressure && state === 'writable') - writer[kState].ready.resolve?.(); + writer[kState].ready?.resolve?.(); writableStreamDefaultControllerClose(controller); return promise; } @@ -734,9 +757,11 @@ function writableStreamUpdateBackpressure(stream, backpressure) { } = stream[kState]; if (writer !== undefined && stream[kState].backpressure !== backpressure) { if (backpressure) { - writer[kState].ready = PromiseWithResolvers(); + // The spec replaces [[readyPromise]] with a fresh pending promise; + // dropping the cache lets the next observation derive it. + writer[kState].ready = undefined; } else { - writer[kState].ready.resolve?.(); + writer[kState].ready?.resolve?.(); } } stream[kState].backpressure = backpressure; @@ -766,22 +791,24 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { if (stream[kState].closeRequest.promise !== undefined) { assert(stream[kState].inFlightCloseRequest.promise === undefined); stream[kState].closeRequest.reject?.(stream[kState].storedError); - stream[kState].closeRequest = { - promise: undefined, - reject: undefined, - resolve: undefined, - }; + stream[kState].closeRequest = kNilRequest; } - setPromiseHandled(stream[kIsClosedPromise].promise); - stream[kIsClosedPromise].reject(stream[kState]?.storedError); + const closedPromiseCache = stream[kState].closedPromise; + if (closedPromiseCache !== undefined) { + setPromiseHandled(closedPromiseCache.promise); + closedPromiseCache.reject?.(stream[kState].storedError); + } const { writer, } = stream[kState]; if (writer !== undefined) { - setPromiseHandled(writer[kState].close.promise); - writer[kState].close.reject?.(stream[kState].storedError); + const closeCache = writer[kState].close; + if (closeCache !== undefined) { + setPromiseHandled(closeCache.promise); + closeCache.reject?.(stream[kState].storedError); + } } } @@ -796,11 +823,7 @@ function writableStreamMarkCloseRequestInFlight(stream) { assert(stream[kState].inFlightWriteRequest.promise === undefined); assert(stream[kState].closeRequest.promise !== undefined); stream[kState].inFlightCloseRequest = stream[kState].closeRequest; - stream[kState].closeRequest = { - promise: undefined, - resolve: undefined, - reject: undefined, - }; + stream[kState].closeRequest = kNilRequest; } function writableStreamHasOperationMarkedInFlight(stream) { @@ -818,11 +841,7 @@ function writableStreamHasOperationMarkedInFlight(stream) { function writableStreamFinishInFlightWriteWithError(stream, error) { assert(stream[kState].inFlightWriteRequest.promise !== undefined); stream[kState].inFlightWriteRequest.reject?.(error); - stream[kState].inFlightWriteRequest = { - promise: undefined, - resolve: undefined, - reject: undefined, - }; + stream[kState].inFlightWriteRequest = kNilRequest; assert(stream[kState].state === 'writable' || stream[kState].state === 'erroring'); writableStreamDealWithRejection(stream, error); @@ -831,34 +850,18 @@ function writableStreamFinishInFlightWriteWithError(stream, error) { function writableStreamFinishInFlightWrite(stream) { assert(stream[kState].inFlightWriteRequest.promise !== undefined); stream[kState].inFlightWriteRequest.resolve?.(); - stream[kState].inFlightWriteRequest = { - promise: undefined, - resolve: undefined, - reject: undefined, - }; + stream[kState].inFlightWriteRequest = kNilRequest; } function writableStreamFinishInFlightCloseWithError(stream, error) { assert(stream[kState].inFlightCloseRequest.promise !== undefined); stream[kState].inFlightCloseRequest.reject?.(error); - stream[kState].inFlightCloseRequest = { - promise: undefined, - resolve: undefined, - reject: undefined, - }; + stream[kState].inFlightCloseRequest = kNilRequest; assert(stream[kState].state === 'writable' || stream[kState].state === 'erroring'); if (stream[kState].pendingAbortRequest.abort.promise !== undefined) { stream[kState].pendingAbortRequest.abort.reject?.(error); - stream[kState].pendingAbortRequest = { - abort: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - reason: undefined, - wasAlreadyErroring: false, - }; + stream[kState].pendingAbortRequest = kNilPendingAbortRequest; } writableStreamDealWithRejection(stream, error); } @@ -866,30 +869,18 @@ function writableStreamFinishInFlightCloseWithError(stream, error) { function writableStreamFinishInFlightClose(stream) { assert(stream[kState].inFlightCloseRequest.promise !== undefined); stream[kState].inFlightCloseRequest.resolve?.(); - stream[kState].inFlightCloseRequest = { - promise: undefined, - resolve: undefined, - reject: undefined, - }; + stream[kState].inFlightCloseRequest = kNilRequest; if (stream[kState].state === 'erroring') { stream[kState].storedError = undefined; if (stream[kState].pendingAbortRequest.abort.promise !== undefined) { stream[kState].pendingAbortRequest.abort.resolve?.(); - stream[kState].pendingAbortRequest = { - abort: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - reason: undefined, - wasAlreadyErroring: false, - }; + stream[kState].pendingAbortRequest = kNilPendingAbortRequest; } } stream[kState].state = 'closed'; if (stream[kState].writer !== undefined) - stream[kState].writer[kState].close.resolve?.(); - stream[kIsClosedPromise].resolve?.(); + stream[kState].writer[kState].close?.resolve?.(); + stream[kState].closedPromise?.resolve?.(); assert(stream[kState].pendingAbortRequest.abort.promise === undefined); assert(stream[kState].storedError === undefined); } @@ -910,15 +901,7 @@ function writableStreamFinishErroring(stream) { } const abortRequest = stream[kState].pendingAbortRequest; - stream[kState].pendingAbortRequest = { - abort: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - reason: undefined, - wasAlreadyErroring: false, - }; + stream[kState].pendingAbortRequest = kNilPendingAbortRequest; if (abortRequest.wasAlreadyErroring) { abortRequest.abort.reject?.(storedError); writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); @@ -960,19 +943,11 @@ function writableStreamCloseQueuedOrInFlight(stream) { function writableStreamAddWriteRequest(stream) { assert(isWritableStreamLocked(stream)); assert(stream[kState].state === 'writable'); - const { - promise, - resolve, - reject, - } = PromiseWithResolvers(); - ArrayPrototypePush( - stream[kState].writeRequests, - { - promise, - resolve, - reject, - }); - return promise; + // PromiseWithResolvers() already returns a { promise, resolve, reject } + // record, so push it as-is instead of rebuilding an identical object. + const writeRequest = PromiseWithResolvers(); + ArrayPrototypePush(stream[kState].writeRequests, writeRequest); + return writeRequest.promise; } function writableStreamDefaultWriterWrite(writer, chunk) { @@ -1042,29 +1017,27 @@ function writableStreamDefaultWriterGetDesiredSize(writer) { } function writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error) { - if (isPromisePending(writer[kState].ready.promise)) { - writer[kState].ready.reject?.(error); + const ready = writer[kState].ready; + if (ready !== undefined && isPromisePending(ready.promise)) { + ready.reject?.(error); + setPromiseHandled(ready.promise); } else { - writer[kState].ready = { - promise: PromiseReject(error), - resolve: undefined, - reject: undefined, - }; + // The spec replaces [[readyPromise]] with a promise rejected with the + // same error the post-release/post-erroring state derives; dropping + // the cache lets the next observation materialize it. + writer[kState].ready = undefined; } - setPromiseHandled(writer[kState].ready.promise); } function writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error) { - if (isPromisePending(writer[kState].close.promise)) { - writer[kState].close.reject?.(error); + const close = writer[kState].close; + if (close !== undefined && isPromisePending(close.promise)) { + close.reject?.(error); + setPromiseHandled(close.promise); } else { - writer[kState].close = { - promise: PromiseReject(error), - resolve: undefined, - reject: undefined, - }; + // See writableStreamDefaultWriterEnsureReadyPromiseRejected. + writer[kState].close = undefined; } - setPromiseHandled(writer[kState].close.promise); } function writableStreamDefaultWriterCloseWithErrorPropagation(writer) { @@ -1128,9 +1101,11 @@ function writableStreamDefaultControllerProcessWrite(controller, chunk) { } = controller[kState]; writableStreamMarkFirstWriteRequestInFlight(stream); - PromisePrototypeThen( - writeAlgorithm(chunk, controller), - () => { + if (controller[kState].writeFulfilled === undefined) { + // The write reaction closures only capture the stream and controller, + // so they are created once on the first write and reused for every + // subsequent write instead of allocating two fresh closures per chunk. + controller[kState].writeFulfilled = () => { writableStreamFinishInFlightWrite(stream); const { state, @@ -1144,13 +1119,18 @@ function writableStreamDefaultControllerProcessWrite(controller, chunk) { writableStreamDefaultControllerGetBackpressure(controller)); } writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); - }, - (error) => { + }; + controller[kState].writeRejected = (error) => { if (stream[kState].state === 'writable') writableStreamDefaultControllerClearAlgorithms(controller); writableStreamFinishInFlightWriteWithError(stream, error); - }); + }; + } + PromisePrototypeThen( + writeAlgorithm(chunk, controller), + controller[kState].writeFulfilled, + controller[kState].writeRejected); } function writableStreamDefaultControllerProcessClose(controller) { @@ -1272,13 +1252,13 @@ function setupWritableStreamDefaultControllerFromSink( FunctionPrototypeBind(start, sink, controller) : nonOpStart; const writeAlgorithm = write ? - createPromiseCallback('sink.write', write, sink) : + createPromiseCallback2('sink.write', write, sink) : nonOpWrite; const closeAlgorithm = close ? - createPromiseCallback('sink.close', close, sink) : + createPromiseCallback0('sink.close', close, sink) : nonOpCancel; const abortAlgorithm = abort ? - createPromiseCallback('sink.abort', abort, sink) : + createPromiseCallback1('sink.abort', abort, sink) : nonOpCancel; setupWritableStreamDefaultController( stream, @@ -1313,9 +1293,10 @@ function setupWritableStreamDefaultController( started: false, stream, writeAlgorithm, + writeFulfilled: undefined, + writeRejected: undefined, }; stream[kState].controller = controller; - stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller); writableStreamUpdateBackpressure( stream, @@ -1323,6 +1304,21 @@ function setupWritableStreamDefaultController( const startResult = startAlgorithm(); + if (startResult === null || + (typeof startResult !== 'object' && typeof startResult !== 'function')) { + // Non-thenable start result: fulfillment is guaranteed and no .then + // lookup on the result is observable, so run the post-start step + // directly at the exact microtask position the promise reaction + // would have had, skipping two promise allocations. + queueMicrotask(() => { + assert(stream[kState].state === 'writable' || + stream[kState].state === 'erroring'); + controller[kState].started = true; + writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }); + return; + } + PromisePrototypeThen( new Promise((r) => r(startResult)), () => { @@ -1373,6 +1369,8 @@ module.exports = { writableStreamDefaultWriterGetDesiredSize, writableStreamDefaultWriterEnsureReadyPromiseRejected, writableStreamDefaultWriterEnsureClosedPromiseRejected, + writerClosedPromise, + writerReadyPromise, writableStreamDefaultWriterCloseWithErrorPropagation, writableStreamDefaultWriterClose, writableStreamDefaultWriterAbort, From 1d6ccc91298c2687fbbd8b60f86e394a357876d3 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 15 Jun 2026 09:23:58 +0000 Subject: [PATCH 2/2] webstreams: rename promise callback functions for clarity --- lib/internal/webstreams/readablestream.js | 10 +++++----- lib/internal/webstreams/transformstream.js | 10 +++++----- lib/internal/webstreams/util.js | 12 ++++++------ lib/internal/webstreams/writablestream.js | 12 ++++++------ 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 573f0c30e47146..ee496f30e5934e 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -104,7 +104,7 @@ const { canCopyArrayBuffer, cloneAsUint8Array, copyArrayBuffer, - createPromiseCallback1, + createPromiseCallback1Param, customInspect, defaultSizeAlgorithm, dequeueValue, @@ -2691,10 +2691,10 @@ function setupReadableStreamDefaultControllerFromSource( FunctionPrototypeBind(start, source, controller) : nonOpStart; const pullAlgorithm = pull ? - createPromiseCallback1('source.pull', pull, source) : + createPromiseCallback1Param('source.pull', pull, source) : nonOpPull; const cancelAlgorithm = cancel ? - createPromiseCallback1('source.cancel', cancel, source) : + createPromiseCallback1Param('source.cancel', cancel, source) : nonOpCancel; setupReadableStreamDefaultController( @@ -3535,10 +3535,10 @@ function setupReadableByteStreamControllerFromSource( FunctionPrototypeBind(start, source, controller) : nonOpStart; const pullAlgorithm = pull ? - createPromiseCallback1('source.pull', pull, source) : + createPromiseCallback1Param('source.pull', pull, source) : nonOpPull; const cancelAlgorithm = cancel ? - createPromiseCallback1('source.cancel', cancel, source) : + createPromiseCallback1Param('source.cancel', cancel, source) : nonOpCancel; if (autoAllocateChunkSize === 0) { diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index 9b54db000cf70c..5b1be9e3aa7a65 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -43,8 +43,8 @@ const { } = require('internal/worker/js_transferable'); const { - createPromiseCallback1, - createPromiseCallback2, + createPromiseCallback1Param, + createPromiseCallback2Params, customInspect, extractHighWaterMark, extractSizeAlgorithm, @@ -463,13 +463,13 @@ function setupTransformStreamDefaultControllerFromTransformer( const flush = transformer?.flush; const cancel = transformer?.cancel; const transformAlgorithm = transform ? - createPromiseCallback2('transformer.transform', transform, transformer) : + createPromiseCallback2Params('transformer.transform', transform, transformer) : defaultTransformAlgorithm; const flushAlgorithm = flush ? - createPromiseCallback1('transformer.flush', flush, transformer) : + createPromiseCallback1Param('transformer.flush', flush, transformer) : nonOpFlush; const cancelAlgorithm = cancel ? - createPromiseCallback1('transformer.cancel', cancel, transformer) : + createPromiseCallback1Param('transformer.cancel', cancel, transformer) : nonOpCancel; setupTransformStreamDefaultController( diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index fd8eb9e26a00b1..836e1de130505b 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -181,17 +181,17 @@ function enqueueValueWithSize(controller, value, size) { // each known call-site arity gets its own wrapper. The exact number of // arguments passed through to the user callback is observable and must be // preserved. -function createPromiseCallback0(name, fn, thisArg) { +function createPromiseCallbackNoParams(name, fn, thisArg) { validateFunction(fn, name); return async () => FunctionPrototypeCall(fn, thisArg); } -function createPromiseCallback1(name, fn, thisArg) { +function createPromiseCallback1Param(name, fn, thisArg) { validateFunction(fn, name); return async (arg) => FunctionPrototypeCall(fn, thisArg, arg); } -function createPromiseCallback2(name, fn, thisArg) { +function createPromiseCallback2Params(name, fn, thisArg) { validateFunction(fn, name); return async (arg1, arg2) => FunctionPrototypeCall(fn, thisArg, arg1, arg2); } @@ -255,9 +255,9 @@ module.exports = { canCopyArrayBuffer, cloneAsUint8Array, copyArrayBuffer, - createPromiseCallback0, - createPromiseCallback1, - createPromiseCallback2, + createPromiseCallbackNoParams, + createPromiseCallback1Param, + createPromiseCallback2Params, customInspect, defaultSizeAlgorithm, dequeueValue, diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index b8334decb759c8..bc7f58a05fc2b9 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -55,9 +55,9 @@ const { } = require('internal/worker/js_transferable'); const { - createPromiseCallback0, - createPromiseCallback1, - createPromiseCallback2, + createPromiseCallbackNoParams, + createPromiseCallback1Param, + createPromiseCallback2Params, customInspect, defaultSizeAlgorithm, dequeueValue, @@ -1252,13 +1252,13 @@ function setupWritableStreamDefaultControllerFromSink( FunctionPrototypeBind(start, sink, controller) : nonOpStart; const writeAlgorithm = write ? - createPromiseCallback2('sink.write', write, sink) : + createPromiseCallback2Params('sink.write', write, sink) : nonOpWrite; const closeAlgorithm = close ? - createPromiseCallback0('sink.close', close, sink) : + createPromiseCallbackNoParams('sink.close', close, sink) : nonOpCancel; const abortAlgorithm = abort ? - createPromiseCallback1('sink.abort', abort, sink) : + createPromiseCallback1Param('sink.abort', abort, sink) : nonOpCancel; setupWritableStreamDefaultController( stream,