2025-04-02 06:50:39 -04:00

568 lines
13 KiB
JavaScript

// Ported from https://github.com/nodejs/undici/pull/907
'use strict';
const assert = require('node:assert');
const { Readable } = require('node:stream');
const {
RequestAbortedError,
NotSupportedError,
InvalidArgumentError,
AbortError,
} = require('../core/errors');
const util = require('../core/util');
const { ReadableStreamFrom } = require('../core/util');
const kConsume = Symbol('kConsume');
const kReading = Symbol('kReading');
const kBody = Symbol('kBody');
const kAbort = Symbol('kAbort');
const kContentType = Symbol('kContentType');
const kContentLength = Symbol('kContentLength');
const kUsed = Symbol('kUsed');
const kBytesRead = Symbol('kBytesRead');
const noop = () => {};
/**
* @class
* @extends {Readable}
* @see https://fetch.spec.whatwg.org/#body
*/
class BodyReadable extends Readable {
/**
* @param {object} opts
* @param {(this: Readable, size: number) => void} opts.resume
* @param {() => (void | null)} opts.abort
* @param {string} [opts.contentType = '']
* @param {number} [opts.contentLength]
* @param {number} [opts.highWaterMark = 64 * 1024]
*/
constructor({
resume,
abort,
contentType = '',
contentLength,
highWaterMark = 64 * 1024, // Same as nodejs fs streams.
}) {
super({
autoDestroy: true,
read: resume,
highWaterMark,
});
this._readableState.dataEmitted = false;
this[kAbort] = abort;
/**
* @type {Consume | null}
*/
this[kConsume] = null;
this[kBytesRead] = 0;
/**
* @type {ReadableStream|null}
*/
this[kBody] = null;
this[kUsed] = false;
this[kContentType] = contentType;
this[kContentLength] =
Number.isFinite(contentLength) ? contentLength : null;
// Is stream being consumed through Readable API?
// This is an optimization so that we avoid checking
// for 'data' and 'readable' listeners in the hot path
// inside push().
this[kReading] = false;
}
/**
* @param {Error|null} err
* @param {(error:(Error|null)) => void} callback
* @returns {void}
*/
_destroy(err, callback) {
if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError();
}
if (err) {
this[kAbort]();
}
// Workaround for Node "bug". If the stream is destroyed in same
// tick as it is created, then a user who is waiting for a
// promise (i.e micro tick) for installing an 'error' listener will
// never get a chance and will always encounter an unhandled exception.
if (!this[kUsed]) {
setImmediate(() => {
callback(err);
});
} else {
callback(err);
}
}
/**
* @param {string} event
* @param {(...args: any[]) => void} listener
* @returns {this}
*/
on(event, listener) {
if (event === 'data' || event === 'readable') {
this[kReading] = true;
this[kUsed] = true;
}
return super.on(event, listener);
}
/**
* @param {string} event
* @param {(...args: any[]) => void} listener
* @returns {this}
*/
addListener(event, listener) {
return this.on(event, listener);
}
/**
* @param {string|symbol} event
* @param {(...args: any[]) => void} listener
* @returns {this}
*/
off(event, listener) {
const ret = super.off(event, listener);
if (event === 'data' || event === 'readable') {
this[kReading] =
this.listenerCount('data') > 0 || this.listenerCount('readable') > 0;
}
return ret;
}
/**
* @param {string|symbol} event
* @param {(...args: any[]) => void} listener
* @returns {this}
*/
removeListener(event, listener) {
return this.off(event, listener);
}
/**
* @param {Buffer|null} chunk
* @returns {boolean}
*/
push(chunk) {
this[kBytesRead] += chunk ? chunk.length : 0;
if (this[kConsume] && chunk !== null) {
consumePush(this[kConsume], chunk);
return this[kReading] ? super.push(chunk) : true;
}
return super.push(chunk);
}
/**
* Consumes and returns the body as a string.
*
* @see https://fetch.spec.whatwg.org/#dom-body-text
* @returns {Promise<string>}
*/
text() {
return consume(this, 'text');
}
/**
* Consumes and returns the body as a JavaScript Object.
*
* @see https://fetch.spec.whatwg.org/#dom-body-json
* @returns {Promise<unknown>}
*/
json() {
return consume(this, 'json');
}
/**
* Consumes and returns the body as a Blob
*
* @see https://fetch.spec.whatwg.org/#dom-body-blob
* @returns {Promise<Blob>}
*/
blob() {
return consume(this, 'blob');
}
/**
* Consumes and returns the body as an Uint8Array.
*
* @see https://fetch.spec.whatwg.org/#dom-body-bytes
* @returns {Promise<Uint8Array>}
*/
bytes() {
return consume(this, 'bytes');
}
/**
* Consumes and returns the body as an ArrayBuffer.
*
* @see https://fetch.spec.whatwg.org/#dom-body-arraybuffer
* @returns {Promise<ArrayBuffer>}
*/
arrayBuffer() {
return consume(this, 'arrayBuffer');
}
/**
* Not implemented
*
* @see https://fetch.spec.whatwg.org/#dom-body-formdata
* @throws {NotSupportedError}
*/
async formData() {
// TODO: Implement.
throw new NotSupportedError();
}
/**
* Returns true if the body is not null and the body has been consumed.
* Otherwise, returns false.
*
* @see https://fetch.spec.whatwg.org/#dom-body-bodyused
* @readonly
* @returns {boolean}
*/
get bodyUsed() {
return util.isDisturbed(this);
}
/**
* @see https://fetch.spec.whatwg.org/#dom-body-body
* @readonly
* @returns {ReadableStream}
*/
get body() {
if (!this[kBody]) {
this[kBody] = ReadableStreamFrom(this);
if (this[kConsume]) {
// TODO: Is this the best way to force a lock?
this[kBody].getReader(); // Ensure stream is locked.
assert(this[kBody].locked);
}
}
return this[kBody];
}
/**
* Dumps the response body by reading `limit` number of bytes.
* @param {object} opts
* @param {number} [opts.limit = 131072] Number of bytes to read.
* @param {AbortSignal} [opts.signal] An AbortSignal to cancel the dump.
* @returns {Promise<null>}
*/
async dump(opts) {
const signal = opts?.signal;
if (
signal != null &&
(typeof signal !== 'object' || !('aborted' in signal))
) {
throw new InvalidArgumentError('signal must be an AbortSignal');
}
const limit =
opts?.limit && Number.isFinite(opts.limit) ? opts.limit : 128 * 1024;
signal?.throwIfAborted();
if (this._readableState.closeEmitted) {
return null;
}
return await new Promise((resolve, reject) => {
if (
(this[kContentLength] && this[kContentLength] > limit) ||
this[kBytesRead] > limit
) {
this.destroy(new AbortError());
}
if (signal) {
const onAbort = () => {
this.destroy(signal.reason ?? new AbortError());
};
signal.addEventListener('abort', onAbort);
this.on('close', function () {
signal.removeEventListener('abort', onAbort);
if (signal.aborted) {
reject(signal.reason ?? new AbortError());
} else {
resolve(null);
}
});
} else {
this.on('close', resolve);
}
this.on('error', noop)
.on('data', () => {
if (this[kBytesRead] > limit) {
this.destroy();
}
})
.resume();
});
}
/**
* @param {BufferEncoding} encoding
* @returns {this}
*/
setEncoding(encoding) {
if (Buffer.isEncoding(encoding)) {
this._readableState.encoding = encoding;
}
return this;
}
}
/**
* @see https://streams.spec.whatwg.org/#readablestream-locked
* @param {BodyReadable} bodyReadable
* @returns {boolean}
*/
function isLocked(bodyReadable) {
// Consume is an implicit lock.
return (
bodyReadable[kBody]?.locked === true || bodyReadable[kConsume] !== null
);
}
/**
* @see https://fetch.spec.whatwg.org/#body-unusable
* @param {BodyReadable} bodyReadable
* @returns {boolean}
*/
function isUnusable(bodyReadable) {
return util.isDisturbed(bodyReadable) || isLocked(bodyReadable);
}
/**
* @typedef {object} Consume
* @property {string} type
* @property {BodyReadable} stream
* @property {((value?: any) => void)} resolve
* @property {((err: Error) => void)} reject
* @property {number} length
* @property {Buffer[]} body
*/
/**
* @param {BodyReadable} stream
* @param {string} type
* @returns {Promise<any>}
*/
function consume(stream, type) {
assert(!stream[kConsume]);
return new Promise((resolve, reject) => {
if (isUnusable(stream)) {
const rState = stream._readableState;
if (rState.destroyed && rState.closeEmitted === false) {
stream
.on('error', (err) => {
reject(err);
})
.on('close', () => {
reject(new TypeError('unusable'));
});
} else {
reject(rState.errored ?? new TypeError('unusable'));
}
} else {
queueMicrotask(() => {
stream[kConsume] = {
type,
stream,
resolve,
reject,
length: 0,
body: [],
};
stream
.on('error', function (err) {
consumeFinish(this[kConsume], err);
})
.on('close', function () {
if (this[kConsume].body !== null) {
consumeFinish(this[kConsume], new RequestAbortedError());
}
});
consumeStart(stream[kConsume]);
});
}
});
}
/**
* @param {Consume} consume
* @returns {void}
*/
function consumeStart(consume) {
if (consume.body === null) {
return;
}
const { _readableState: state } = consume.stream;
if (state.bufferIndex) {
const start = state.bufferIndex;
const end = state.buffer.length;
for (let n = start; n < end; n++) {
consumePush(consume, state.buffer[n]);
}
} else {
for (const chunk of state.buffer) {
consumePush(consume, chunk);
}
}
if (state.endEmitted) {
consumeEnd(this[kConsume], this._readableState.encoding);
} else {
consume.stream.on('end', function () {
consumeEnd(this[kConsume], this._readableState.encoding);
});
}
consume.stream.resume();
while (consume.stream.read() != null) {
// Loop
}
}
/**
* @param {Buffer[]} chunks
* @param {number} length
* @param {BufferEncoding} encoding
* @returns {string}
*/
function chunksDecode(chunks, length, encoding) {
if (chunks.length === 0 || length === 0) {
return '';
}
const buffer =
chunks.length === 1 ? chunks[0] : Buffer.concat(chunks, length);
const bufferLength = buffer.length;
// Skip BOM.
const start =
(
bufferLength > 2 &&
buffer[0] === 0xef &&
buffer[1] === 0xbb &&
buffer[2] === 0xbf
) ?
3
: 0;
if (!encoding || encoding === 'utf8' || encoding === 'utf-8') {
return buffer.utf8Slice(start, bufferLength);
} else {
return buffer.subarray(start, bufferLength).toString(encoding);
}
}
/**
* @param {Buffer[]} chunks
* @param {number} length
* @returns {Uint8Array}
*/
function chunksConcat(chunks, length) {
if (chunks.length === 0 || length === 0) {
return new Uint8Array(0);
}
if (chunks.length === 1) {
// fast-path
return new Uint8Array(chunks[0]);
}
const buffer = new Uint8Array(Buffer.allocUnsafeSlow(length).buffer);
let offset = 0;
for (let i = 0; i < chunks.length; ++i) {
const chunk = chunks[i];
buffer.set(chunk, offset);
offset += chunk.length;
}
return buffer;
}
/**
* @param {Consume} consume
* @param {BufferEncoding} encoding
* @returns {void}
*/
function consumeEnd(consume, encoding) {
const { type, body, resolve, stream, length } = consume;
try {
if (type === 'text') {
resolve(chunksDecode(body, length, encoding));
} else if (type === 'json') {
resolve(JSON.parse(chunksDecode(body, length, encoding)));
} else if (type === 'arrayBuffer') {
resolve(chunksConcat(body, length).buffer);
} else if (type === 'blob') {
resolve(new Blob(body, { type: stream[kContentType] }));
} else if (type === 'bytes') {
resolve(chunksConcat(body, length));
}
consumeFinish(consume);
} catch (err) {
stream.destroy(err);
}
}
/**
* @param {Consume} consume
* @param {Buffer} chunk
* @returns {void}
*/
function consumePush(consume, chunk) {
consume.length += chunk.length;
consume.body.push(chunk);
}
/**
* @param {Consume} consume
* @param {Error} [err]
* @returns {void}
*/
function consumeFinish(consume, err) {
if (consume.body === null) {
return;
}
if (err) {
consume.reject(err);
} else {
consume.resolve();
}
// Reset the consume object to allow for garbage collection.
consume.type = null;
consume.stream = null;
consume.resolve = null;
consume.reject = null;
consume.length = 0;
consume.body = null;
}
module.exports = {
Readable: BodyReadable,
chunksDecode,
};