// Ported from https://github.com/nodejs/undici/pull/907 'use strict'; const assert = require('node:assert'); const { Readable } = require('node:stream'); const { RequestAbortedError, NotSupportedError, InvalidArgumentError, AbortError, } = require('../core/errors'); const util = require('../core/util'); const { ReadableStreamFrom } = require('../core/util'); const kConsume = Symbol('kConsume'); const kReading = Symbol('kReading'); const kBody = Symbol('kBody'); const kAbort = Symbol('kAbort'); const kContentType = Symbol('kContentType'); const kContentLength = Symbol('kContentLength'); const kUsed = Symbol('kUsed'); const kBytesRead = Symbol('kBytesRead'); const noop = () => {}; /** * @class * @extends {Readable} * @see https://fetch.spec.whatwg.org/#body */ class BodyReadable extends Readable { /** * @param {object} opts * @param {(this: Readable, size: number) => void} opts.resume * @param {() => (void | null)} opts.abort * @param {string} [opts.contentType = ''] * @param {number} [opts.contentLength] * @param {number} [opts.highWaterMark = 64 * 1024] */ constructor({ resume, abort, contentType = '', contentLength, highWaterMark = 64 * 1024, // Same as nodejs fs streams. }) { super({ autoDestroy: true, read: resume, highWaterMark, }); this._readableState.dataEmitted = false; this[kAbort] = abort; /** * @type {Consume | null} */ this[kConsume] = null; this[kBytesRead] = 0; /** * @type {ReadableStream|null} */ this[kBody] = null; this[kUsed] = false; this[kContentType] = contentType; this[kContentLength] = Number.isFinite(contentLength) ? contentLength : null; // Is stream being consumed through Readable API? // This is an optimization so that we avoid checking // for 'data' and 'readable' listeners in the hot path // inside push(). this[kReading] = false; } /** * @param {Error|null} err * @param {(error:(Error|null)) => void} callback * @returns {void} */ _destroy(err, callback) { if (!err && !this._readableState.endEmitted) { err = new RequestAbortedError(); } if (err) { this[kAbort](); } // Workaround for Node "bug". If the stream is destroyed in same // tick as it is created, then a user who is waiting for a // promise (i.e micro tick) for installing an 'error' listener will // never get a chance and will always encounter an unhandled exception. if (!this[kUsed]) { setImmediate(() => { callback(err); }); } else { callback(err); } } /** * @param {string} event * @param {(...args: any[]) => void} listener * @returns {this} */ on(event, listener) { if (event === 'data' || event === 'readable') { this[kReading] = true; this[kUsed] = true; } return super.on(event, listener); } /** * @param {string} event * @param {(...args: any[]) => void} listener * @returns {this} */ addListener(event, listener) { return this.on(event, listener); } /** * @param {string|symbol} event * @param {(...args: any[]) => void} listener * @returns {this} */ off(event, listener) { const ret = super.off(event, listener); if (event === 'data' || event === 'readable') { this[kReading] = this.listenerCount('data') > 0 || this.listenerCount('readable') > 0; } return ret; } /** * @param {string|symbol} event * @param {(...args: any[]) => void} listener * @returns {this} */ removeListener(event, listener) { return this.off(event, listener); } /** * @param {Buffer|null} chunk * @returns {boolean} */ push(chunk) { this[kBytesRead] += chunk ? chunk.length : 0; if (this[kConsume] && chunk !== null) { consumePush(this[kConsume], chunk); return this[kReading] ? super.push(chunk) : true; } return super.push(chunk); } /** * Consumes and returns the body as a string. * * @see https://fetch.spec.whatwg.org/#dom-body-text * @returns {Promise} */ text() { return consume(this, 'text'); } /** * Consumes and returns the body as a JavaScript Object. * * @see https://fetch.spec.whatwg.org/#dom-body-json * @returns {Promise} */ json() { return consume(this, 'json'); } /** * Consumes and returns the body as a Blob * * @see https://fetch.spec.whatwg.org/#dom-body-blob * @returns {Promise} */ blob() { return consume(this, 'blob'); } /** * Consumes and returns the body as an Uint8Array. * * @see https://fetch.spec.whatwg.org/#dom-body-bytes * @returns {Promise} */ bytes() { return consume(this, 'bytes'); } /** * Consumes and returns the body as an ArrayBuffer. * * @see https://fetch.spec.whatwg.org/#dom-body-arraybuffer * @returns {Promise} */ arrayBuffer() { return consume(this, 'arrayBuffer'); } /** * Not implemented * * @see https://fetch.spec.whatwg.org/#dom-body-formdata * @throws {NotSupportedError} */ async formData() { // TODO: Implement. throw new NotSupportedError(); } /** * Returns true if the body is not null and the body has been consumed. * Otherwise, returns false. * * @see https://fetch.spec.whatwg.org/#dom-body-bodyused * @readonly * @returns {boolean} */ get bodyUsed() { return util.isDisturbed(this); } /** * @see https://fetch.spec.whatwg.org/#dom-body-body * @readonly * @returns {ReadableStream} */ get body() { if (!this[kBody]) { this[kBody] = ReadableStreamFrom(this); if (this[kConsume]) { // TODO: Is this the best way to force a lock? this[kBody].getReader(); // Ensure stream is locked. assert(this[kBody].locked); } } return this[kBody]; } /** * Dumps the response body by reading `limit` number of bytes. * @param {object} opts * @param {number} [opts.limit = 131072] Number of bytes to read. * @param {AbortSignal} [opts.signal] An AbortSignal to cancel the dump. * @returns {Promise} */ async dump(opts) { const signal = opts?.signal; if ( signal != null && (typeof signal !== 'object' || !('aborted' in signal)) ) { throw new InvalidArgumentError('signal must be an AbortSignal'); } const limit = opts?.limit && Number.isFinite(opts.limit) ? opts.limit : 128 * 1024; signal?.throwIfAborted(); if (this._readableState.closeEmitted) { return null; } return await new Promise((resolve, reject) => { if ( (this[kContentLength] && this[kContentLength] > limit) || this[kBytesRead] > limit ) { this.destroy(new AbortError()); } if (signal) { const onAbort = () => { this.destroy(signal.reason ?? new AbortError()); }; signal.addEventListener('abort', onAbort); this.on('close', function () { signal.removeEventListener('abort', onAbort); if (signal.aborted) { reject(signal.reason ?? new AbortError()); } else { resolve(null); } }); } else { this.on('close', resolve); } this.on('error', noop) .on('data', () => { if (this[kBytesRead] > limit) { this.destroy(); } }) .resume(); }); } /** * @param {BufferEncoding} encoding * @returns {this} */ setEncoding(encoding) { if (Buffer.isEncoding(encoding)) { this._readableState.encoding = encoding; } return this; } } /** * @see https://streams.spec.whatwg.org/#readablestream-locked * @param {BodyReadable} bodyReadable * @returns {boolean} */ function isLocked(bodyReadable) { // Consume is an implicit lock. return ( bodyReadable[kBody]?.locked === true || bodyReadable[kConsume] !== null ); } /** * @see https://fetch.spec.whatwg.org/#body-unusable * @param {BodyReadable} bodyReadable * @returns {boolean} */ function isUnusable(bodyReadable) { return util.isDisturbed(bodyReadable) || isLocked(bodyReadable); } /** * @typedef {object} Consume * @property {string} type * @property {BodyReadable} stream * @property {((value?: any) => void)} resolve * @property {((err: Error) => void)} reject * @property {number} length * @property {Buffer[]} body */ /** * @param {BodyReadable} stream * @param {string} type * @returns {Promise} */ function consume(stream, type) { assert(!stream[kConsume]); return new Promise((resolve, reject) => { if (isUnusable(stream)) { const rState = stream._readableState; if (rState.destroyed && rState.closeEmitted === false) { stream .on('error', (err) => { reject(err); }) .on('close', () => { reject(new TypeError('unusable')); }); } else { reject(rState.errored ?? new TypeError('unusable')); } } else { queueMicrotask(() => { stream[kConsume] = { type, stream, resolve, reject, length: 0, body: [], }; stream .on('error', function (err) { consumeFinish(this[kConsume], err); }) .on('close', function () { if (this[kConsume].body !== null) { consumeFinish(this[kConsume], new RequestAbortedError()); } }); consumeStart(stream[kConsume]); }); } }); } /** * @param {Consume} consume * @returns {void} */ function consumeStart(consume) { if (consume.body === null) { return; } const { _readableState: state } = consume.stream; if (state.bufferIndex) { const start = state.bufferIndex; const end = state.buffer.length; for (let n = start; n < end; n++) { consumePush(consume, state.buffer[n]); } } else { for (const chunk of state.buffer) { consumePush(consume, chunk); } } if (state.endEmitted) { consumeEnd(this[kConsume], this._readableState.encoding); } else { consume.stream.on('end', function () { consumeEnd(this[kConsume], this._readableState.encoding); }); } consume.stream.resume(); while (consume.stream.read() != null) { // Loop } } /** * @param {Buffer[]} chunks * @param {number} length * @param {BufferEncoding} encoding * @returns {string} */ function chunksDecode(chunks, length, encoding) { if (chunks.length === 0 || length === 0) { return ''; } const buffer = chunks.length === 1 ? chunks[0] : Buffer.concat(chunks, length); const bufferLength = buffer.length; // Skip BOM. const start = ( bufferLength > 2 && buffer[0] === 0xef && buffer[1] === 0xbb && buffer[2] === 0xbf ) ? 3 : 0; if (!encoding || encoding === 'utf8' || encoding === 'utf-8') { return buffer.utf8Slice(start, bufferLength); } else { return buffer.subarray(start, bufferLength).toString(encoding); } } /** * @param {Buffer[]} chunks * @param {number} length * @returns {Uint8Array} */ function chunksConcat(chunks, length) { if (chunks.length === 0 || length === 0) { return new Uint8Array(0); } if (chunks.length === 1) { // fast-path return new Uint8Array(chunks[0]); } const buffer = new Uint8Array(Buffer.allocUnsafeSlow(length).buffer); let offset = 0; for (let i = 0; i < chunks.length; ++i) { const chunk = chunks[i]; buffer.set(chunk, offset); offset += chunk.length; } return buffer; } /** * @param {Consume} consume * @param {BufferEncoding} encoding * @returns {void} */ function consumeEnd(consume, encoding) { const { type, body, resolve, stream, length } = consume; try { if (type === 'text') { resolve(chunksDecode(body, length, encoding)); } else if (type === 'json') { resolve(JSON.parse(chunksDecode(body, length, encoding))); } else if (type === 'arrayBuffer') { resolve(chunksConcat(body, length).buffer); } else if (type === 'blob') { resolve(new Blob(body, { type: stream[kContentType] })); } else if (type === 'bytes') { resolve(chunksConcat(body, length)); } consumeFinish(consume); } catch (err) { stream.destroy(err); } } /** * @param {Consume} consume * @param {Buffer} chunk * @returns {void} */ function consumePush(consume, chunk) { consume.length += chunk.length; consume.body.push(chunk); } /** * @param {Consume} consume * @param {Error} [err] * @returns {void} */ function consumeFinish(consume, err) { if (consume.body === null) { return; } if (err) { consume.reject(err); } else { consume.resolve(); } // Reset the consume object to allow for garbage collection. consume.type = null; consume.stream = null; consume.resolve = null; consume.reject = null; consume.length = 0; consume.body = null; } module.exports = { Readable: BodyReadable, chunksDecode, };