// Ported from https://github.com/nodejs/undici/pull/907 'use strict' const assert = require('node:assert') const { Readable } = require('node:stream') const { RequestAbortedError, NotSupportedError, InvalidArgumentError, AbortError } = require('../core/errors') const util = require('../core/util') const { ReadableStreamFrom } = require('../core/util') const kConsume = Symbol('kConsume') const kReading = Symbol('kReading') const kBody = Symbol('kBody') const kAbort = Symbol('kAbort') const kContentType = Symbol('kContentType') const kContentLength = Symbol('kContentLength') const kUsed = Symbol('kUsed') const kBytesRead = Symbol('kBytesRead') const noop = () => {} /** * @class * @extends {Readable} * @see https://fetch.spec.whatwg.org/#body */ class BodyReadable extends Readable { /** * @param {object} opts * @param {(this: Readable, size: number) => void} opts.resume * @param {() => (void | null)} opts.abort * @param {string} [opts.contentType = ''] * @param {number} [opts.contentLength] * @param {number} [opts.highWaterMark = 64 * 1024] */ constructor ({ resume, abort, contentType = '', contentLength, highWaterMark = 64 * 1024 // Same as nodejs fs streams. }) { super({ autoDestroy: true, read: resume, highWaterMark }) this._readableState.dataEmitted = false this[kAbort] = abort /** * @type {Consume | null} */ this[kConsume] = null this[kBytesRead] = 0 /** * @type {ReadableStream|null} */ this[kBody] = null this[kUsed] = false this[kContentType] = contentType this[kContentLength] = Number.isFinite(contentLength) ? contentLength : null // Is stream being consumed through Readable API? // This is an optimization so that we avoid checking // for 'data' and 'readable' listeners in the hot path // inside push(). this[kReading] = false } /** * @param {Error|null} err * @param {(error:(Error|null)) => void} callback * @returns {void} */ _destroy (err, callback) { if (!err && !this._readableState.endEmitted) { err = new RequestAbortedError() } if (err) { this[kAbort]() } // Workaround for Node "bug". If the stream is destroyed in same // tick as it is created, then a user who is waiting for a // promise (i.e micro tick) for installing an 'error' listener will // never get a chance and will always encounter an unhandled exception. if (!this[kUsed]) { setImmediate(() => { callback(err) }) } else { callback(err) } } /** * @param {string} event * @param {(...args: any[]) => void} listener * @returns {this} */ on (event, listener) { if (event === 'data' || event === 'readable') { this[kReading] = true this[kUsed] = true } return super.on(event, listener) } /** * @param {string} event * @param {(...args: any[]) => void} listener * @returns {this} */ addListener (event, listener) { return this.on(event, listener) } /** * @param {string|symbol} event * @param {(...args: any[]) => void} listener * @returns {this} */ off (event, listener) { const ret = super.off(event, listener) if (event === 'data' || event === 'readable') { this[kReading] = ( this.listenerCount('data') > 0 || this.listenerCount('readable') > 0 ) } return ret } /** * @param {string|symbol} event * @param {(...args: any[]) => void} listener * @returns {this} */ removeListener (event, listener) { return this.off(event, listener) } /** * @param {Buffer|null} chunk * @returns {boolean} */ push (chunk) { this[kBytesRead] += chunk ? chunk.length : 0 if (this[kConsume] && chunk !== null) { consumePush(this[kConsume], chunk) return this[kReading] ? super.push(chunk) : true } return super.push(chunk) } /** * Consumes and returns the body as a string. * * @see https://fetch.spec.whatwg.org/#dom-body-text * @returns {Promise} */ text () { return consume(this, 'text') } /** * Consumes and returns the body as a JavaScript Object. * * @see https://fetch.spec.whatwg.org/#dom-body-json * @returns {Promise} */ json () { return consume(this, 'json') } /** * Consumes and returns the body as a Blob * * @see https://fetch.spec.whatwg.org/#dom-body-blob * @returns {Promise} */ blob () { return consume(this, 'blob') } /** * Consumes and returns the body as an Uint8Array. * * @see https://fetch.spec.whatwg.org/#dom-body-bytes * @returns {Promise} */ bytes () { return consume(this, 'bytes') } /** * Consumes and returns the body as an ArrayBuffer. * * @see https://fetch.spec.whatwg.org/#dom-body-arraybuffer * @returns {Promise} */ arrayBuffer () { return consume(this, 'arrayBuffer') } /** * Not implemented * * @see https://fetch.spec.whatwg.org/#dom-body-formdata * @throws {NotSupportedError} */ async formData () { // TODO: Implement. throw new NotSupportedError() } /** * Returns true if the body is not null and the body has been consumed. * Otherwise, returns false. * * @see https://fetch.spec.whatwg.org/#dom-body-bodyused * @readonly * @returns {boolean} */ get bodyUsed () { return util.isDisturbed(this) } /** * @see https://fetch.spec.whatwg.org/#dom-body-body * @readonly * @returns {ReadableStream} */ get body () { if (!this[kBody]) { this[kBody] = ReadableStreamFrom(this) if (this[kConsume]) { // TODO: Is this the best way to force a lock? this[kBody].getReader() // Ensure stream is locked. assert(this[kBody].locked) } } return this[kBody] } /** * Dumps the response body by reading `limit` number of bytes. * @param {object} opts * @param {number} [opts.limit = 131072] Number of bytes to read. * @param {AbortSignal} [opts.signal] An AbortSignal to cancel the dump. * @returns {Promise} */ async dump (opts) { const signal = opts?.signal if (signal != null && (typeof signal !== 'object' || !('aborted' in signal))) { throw new InvalidArgumentError('signal must be an AbortSignal') } const limit = opts?.limit && Number.isFinite(opts.limit) ? opts.limit : 128 * 1024 signal?.throwIfAborted() if (this._readableState.closeEmitted) { return null } return await new Promise((resolve, reject) => { if ( (this[kContentLength] && (this[kContentLength] > limit)) || this[kBytesRead] > limit ) { this.destroy(new AbortError()) } if (signal) { const onAbort = () => { this.destroy(signal.reason ?? new AbortError()) } signal.addEventListener('abort', onAbort) this .on('close', function () { signal.removeEventListener('abort', onAbort) if (signal.aborted) { reject(signal.reason ?? new AbortError()) } else { resolve(null) } }) } else { this.on('close', resolve) } this .on('error', noop) .on('data', () => { if (this[kBytesRead] > limit) { this.destroy() } }) .resume() }) } /** * @param {BufferEncoding} encoding * @returns {this} */ setEncoding (encoding) { if (Buffer.isEncoding(encoding)) { this._readableState.encoding = encoding } return this } } /** * @see https://streams.spec.whatwg.org/#readablestream-locked * @param {BodyReadable} bodyReadable * @returns {boolean} */ function isLocked (bodyReadable) { // Consume is an implicit lock. return bodyReadable[kBody]?.locked === true || bodyReadable[kConsume] !== null } /** * @see https://fetch.spec.whatwg.org/#body-unusable * @param {BodyReadable} bodyReadable * @returns {boolean} */ function isUnusable (bodyReadable) { return util.isDisturbed(bodyReadable) || isLocked(bodyReadable) } /** * @typedef {object} Consume * @property {string} type * @property {BodyReadable} stream * @property {((value?: any) => void)} resolve * @property {((err: Error) => void)} reject * @property {number} length * @property {Buffer[]} body */ /** * @param {BodyReadable} stream * @param {string} type * @returns {Promise} */ function consume (stream, type) { assert(!stream[kConsume]) return new Promise((resolve, reject) => { if (isUnusable(stream)) { const rState = stream._readableState if (rState.destroyed && rState.closeEmitted === false) { stream .on('error', err => { reject(err) }) .on('close', () => { reject(new TypeError('unusable')) }) } else { reject(rState.errored ?? new TypeError('unusable')) } } else { queueMicrotask(() => { stream[kConsume] = { type, stream, resolve, reject, length: 0, body: [] } stream .on('error', function (err) { consumeFinish(this[kConsume], err) }) .on('close', function () { if (this[kConsume].body !== null) { consumeFinish(this[kConsume], new RequestAbortedError()) } }) consumeStart(stream[kConsume]) }) } }) } /** * @param {Consume} consume * @returns {void} */ function consumeStart (consume) { if (consume.body === null) { return } const { _readableState: state } = consume.stream if (state.bufferIndex) { const start = state.bufferIndex const end = state.buffer.length for (let n = start; n < end; n++) { consumePush(consume, state.buffer[n]) } } else { for (const chunk of state.buffer) { consumePush(consume, chunk) } } if (state.endEmitted) { consumeEnd(this[kConsume], this._readableState.encoding) } else { consume.stream.on('end', function () { consumeEnd(this[kConsume], this._readableState.encoding) }) } consume.stream.resume() while (consume.stream.read() != null) { // Loop } } /** * @param {Buffer[]} chunks * @param {number} length * @param {BufferEncoding} encoding * @returns {string} */ function chunksDecode (chunks, length, encoding) { if (chunks.length === 0 || length === 0) { return '' } const buffer = chunks.length === 1 ? chunks[0] : Buffer.concat(chunks, length) const bufferLength = buffer.length // Skip BOM. const start = bufferLength > 2 && buffer[0] === 0xef && buffer[1] === 0xbb && buffer[2] === 0xbf ? 3 : 0 if (!encoding || encoding === 'utf8' || encoding === 'utf-8') { return buffer.utf8Slice(start, bufferLength) } else { return buffer.subarray(start, bufferLength).toString(encoding) } } /** * @param {Buffer[]} chunks * @param {number} length * @returns {Uint8Array} */ function chunksConcat (chunks, length) { if (chunks.length === 0 || length === 0) { return new Uint8Array(0) } if (chunks.length === 1) { // fast-path return new Uint8Array(chunks[0]) } const buffer = new Uint8Array(Buffer.allocUnsafeSlow(length).buffer) let offset = 0 for (let i = 0; i < chunks.length; ++i) { const chunk = chunks[i] buffer.set(chunk, offset) offset += chunk.length } return buffer } /** * @param {Consume} consume * @param {BufferEncoding} encoding * @returns {void} */ function consumeEnd (consume, encoding) { const { type, body, resolve, stream, length } = consume try { if (type === 'text') { resolve(chunksDecode(body, length, encoding)) } else if (type === 'json') { resolve(JSON.parse(chunksDecode(body, length, encoding))) } else if (type === 'arrayBuffer') { resolve(chunksConcat(body, length).buffer) } else if (type === 'blob') { resolve(new Blob(body, { type: stream[kContentType] })) } else if (type === 'bytes') { resolve(chunksConcat(body, length)) } consumeFinish(consume) } catch (err) { stream.destroy(err) } } /** * @param {Consume} consume * @param {Buffer} chunk * @returns {void} */ function consumePush (consume, chunk) { consume.length += chunk.length consume.body.push(chunk) } /** * @param {Consume} consume * @param {Error} [err] * @returns {void} */ function consumeFinish (consume, err) { if (consume.body === null) { return } if (err) { consume.reject(err) } else { consume.resolve() } // Reset the consume object to allow for garbage collection. consume.type = null consume.stream = null consume.resolve = null consume.reject = null consume.length = 0 consume.body = null } module.exports = { Readable: BodyReadable, chunksDecode }