'use strict'; /* global WebAssembly */ const assert = require('node:assert'); const util = require('../core/util.js'); const { channels } = require('../core/diagnostics.js'); const timers = require('../util/timers.js'); const { RequestContentLengthMismatchError, ResponseContentLengthMismatchError, RequestAbortedError, HeadersTimeoutError, HeadersOverflowError, SocketError, InformationalError, BodyTimeoutError, HTTPParserError, ResponseExceededMaxSizeError, } = require('../core/errors.js'); const { kUrl, kReset, kClient, kParser, kBlocking, kRunning, kPending, kSize, kWriting, kQueue, kNoRef, kKeepAliveDefaultTimeout, kHostHeader, kPendingIdx, kRunningIdx, kError, kPipelining, kSocket, kKeepAliveTimeoutValue, kMaxHeadersSize, kKeepAliveMaxTimeout, kKeepAliveTimeoutThreshold, kHeadersTimeout, kBodyTimeout, kStrictContentLength, kMaxRequests, kCounter, kMaxResponseSize, kOnError, kResume, kHTTPContext, kClosed, } = require('../core/symbols.js'); const constants = require('../llhttp/constants.js'); const EMPTY_BUF = Buffer.alloc(0); const FastBuffer = Buffer[Symbol.species]; const removeAllListeners = util.removeAllListeners; let extractBody; async function lazyllhttp() { const llhttpWasmData = process.env.JEST_WORKER_ID ? require('../llhttp/llhttp-wasm.js') : undefined; let mod; try { mod = await WebAssembly.compile(require('../llhttp/llhttp_simd-wasm.js')); } catch (e) { /* istanbul ignore next */ // We could check if the error was caused by the simd option not // being enabled, but the occurring of this other error // * https://github.com/emscripten-core/emscripten/issues/11495 // got me to remove that check to avoid breaking Node 12. mod = await WebAssembly.compile( llhttpWasmData || require('../llhttp/llhttp-wasm.js') ); } return await WebAssembly.instantiate(mod, { env: { /** * @param {number} p * @param {number} at * @param {number} len * @returns {number} */ wasm_on_url: (p, at, len) => { /* istanbul ignore next */ return 0; }, /** * @param {number} p * @param {number} at * @param {number} len * @returns {number} */ wasm_on_status: (p, at, len) => { assert(currentParser.ptr === p); const start = at - currentBufferPtr + currentBufferRef.byteOffset; return currentParser.onStatus( new FastBuffer(currentBufferRef.buffer, start, len) ); }, /** * @param {number} p * @returns {number} */ wasm_on_message_begin: (p) => { assert(currentParser.ptr === p); return currentParser.onMessageBegin(); }, /** * @param {number} p * @param {number} at * @param {number} len * @returns {number} */ wasm_on_header_field: (p, at, len) => { assert(currentParser.ptr === p); const start = at - currentBufferPtr + currentBufferRef.byteOffset; return currentParser.onHeaderField( new FastBuffer(currentBufferRef.buffer, start, len) ); }, /** * @param {number} p * @param {number} at * @param {number} len * @returns {number} */ wasm_on_header_value: (p, at, len) => { assert(currentParser.ptr === p); const start = at - currentBufferPtr + currentBufferRef.byteOffset; return currentParser.onHeaderValue( new FastBuffer(currentBufferRef.buffer, start, len) ); }, /** * @param {number} p * @param {number} statusCode * @param {0|1} upgrade * @param {0|1} shouldKeepAlive * @returns {number} */ wasm_on_headers_complete: (p, statusCode, upgrade, shouldKeepAlive) => { assert(currentParser.ptr === p); return currentParser.onHeadersComplete( statusCode, upgrade === 1, shouldKeepAlive === 1 ); }, /** * @param {number} p * @param {number} at * @param {number} len * @returns {number} */ wasm_on_body: (p, at, len) => { assert(currentParser.ptr === p); const start = at - currentBufferPtr + currentBufferRef.byteOffset; return currentParser.onBody( new FastBuffer(currentBufferRef.buffer, start, len) ); }, /** * @param {number} p * @returns {number} */ wasm_on_message_complete: (p) => { assert(currentParser.ptr === p); return currentParser.onMessageComplete(); }, }, }); } let llhttpInstance = null; /** * @type {Promise|null} */ let llhttpPromise = lazyllhttp(); llhttpPromise.catch(); /** * @type {Parser|null} */ let currentParser = null; let currentBufferRef = null; /** * @type {number} */ let currentBufferSize = 0; let currentBufferPtr = null; const USE_NATIVE_TIMER = 0; const USE_FAST_TIMER = 1; // Use fast timers for headers and body to take eventual event loop // latency into account. const TIMEOUT_HEADERS = 2 | USE_FAST_TIMER; const TIMEOUT_BODY = 4 | USE_FAST_TIMER; // Use native timers to ignore event loop latency for keep-alive // handling. const TIMEOUT_KEEP_ALIVE = 8 | USE_NATIVE_TIMER; class Parser { /** * @param {import('./client.js')} client * @param {import('net').Socket} socket * @param {*} llhttp */ constructor(client, socket, { exports }) { this.llhttp = exports; this.ptr = this.llhttp.llhttp_alloc(constants.TYPE.RESPONSE); this.client = client; /** * @type {import('net').Socket} */ this.socket = socket; this.timeout = null; this.timeoutValue = null; this.timeoutType = null; this.statusCode = 0; this.statusText = ''; this.upgrade = false; this.headers = []; this.headersSize = 0; this.headersMaxSize = client[kMaxHeadersSize]; this.shouldKeepAlive = false; this.paused = false; this.resume = this.resume.bind(this); this.bytesRead = 0; this.keepAlive = ''; this.contentLength = ''; this.connection = ''; this.maxResponseSize = client[kMaxResponseSize]; } setTimeout(delay, type) { // If the existing timer and the new timer are of different timer type // (fast or native) or have different delay, we need to clear the existing // timer and set a new one. if ( delay !== this.timeoutValue || (type & USE_FAST_TIMER) ^ (this.timeoutType & USE_FAST_TIMER) ) { // If a timeout is already set, clear it with clearTimeout of the fast // timer implementation, as it can clear fast and native timers. if (this.timeout) { timers.clearTimeout(this.timeout); this.timeout = null; } if (delay) { if (type & USE_FAST_TIMER) { this.timeout = timers.setFastTimeout( onParserTimeout, delay, new WeakRef(this) ); } else { this.timeout = setTimeout(onParserTimeout, delay, new WeakRef(this)); this.timeout.unref(); } } this.timeoutValue = delay; } else if (this.timeout) { // istanbul ignore else: only for jest if (this.timeout.refresh) { this.timeout.refresh(); } } this.timeoutType = type; } resume() { if (this.socket.destroyed || !this.paused) { return; } assert(this.ptr != null); assert(currentParser === null); this.llhttp.llhttp_resume(this.ptr); assert(this.timeoutType === TIMEOUT_BODY); if (this.timeout) { // istanbul ignore else: only for jest if (this.timeout.refresh) { this.timeout.refresh(); } } this.paused = false; this.execute(this.socket.read() || EMPTY_BUF); // Flush parser. this.readMore(); } readMore() { while (!this.paused && this.ptr) { const chunk = this.socket.read(); if (chunk === null) { break; } this.execute(chunk); } } /** * @param {Buffer} chunk */ execute(chunk) { assert(currentParser === null); assert(this.ptr != null); assert(!this.paused); const { socket, llhttp } = this; // Allocate a new buffer if the current buffer is too small. if (chunk.length > currentBufferSize) { if (currentBufferPtr) { llhttp.free(currentBufferPtr); } // Allocate a buffer that is a multiple of 4096 bytes. currentBufferSize = Math.ceil(chunk.length / 4096) * 4096; currentBufferPtr = llhttp.malloc(currentBufferSize); } new Uint8Array( llhttp.memory.buffer, currentBufferPtr, currentBufferSize ).set(chunk); // Call `execute` on the wasm parser. // We pass the `llhttp_parser` pointer address, the pointer address of buffer view data, // and finally the length of bytes to parse. // The return value is an error code or `constants.ERROR.OK`. try { let ret; try { currentBufferRef = chunk; currentParser = this; ret = llhttp.llhttp_execute(this.ptr, currentBufferPtr, chunk.length); /* eslint-disable-next-line no-useless-catch */ } catch (err) { /* istanbul ignore next: difficult to make a test case for */ throw err; } finally { currentParser = null; currentBufferRef = null; } if (ret !== constants.ERROR.OK) { const data = chunk.subarray( llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr ); if (ret === constants.ERROR.PAUSED_UPGRADE) { this.onUpgrade(data); } else if (ret === constants.ERROR.PAUSED) { this.paused = true; socket.unshift(data); } else { const ptr = llhttp.llhttp_get_error_reason(this.ptr); let message = ''; /* istanbul ignore else: difficult to make a test case for */ if (ptr) { const len = new Uint8Array(llhttp.memory.buffer, ptr).indexOf(0); message = 'Response does not match the HTTP/1.1 protocol (' + Buffer.from(llhttp.memory.buffer, ptr, len).toString() + ')'; } throw new HTTPParserError(message, constants.ERROR[ret], data); } } } catch (err) { util.destroy(socket, err); } } destroy() { assert(currentParser === null); assert(this.ptr != null); this.llhttp.llhttp_free(this.ptr); this.ptr = null; this.timeout && timers.clearTimeout(this.timeout); this.timeout = null; this.timeoutValue = null; this.timeoutType = null; this.paused = false; } /** * @param {Buffer} buf * @returns {0} */ onStatus(buf) { this.statusText = buf.toString(); return 0; } /** * @returns {0|-1} */ onMessageBegin() { const { socket, client } = this; /* istanbul ignore next: difficult to make a test case for */ if (socket.destroyed) { return -1; } const request = client[kQueue][client[kRunningIdx]]; if (!request) { return -1; } request.onResponseStarted(); return 0; } /** * @param {Buffer} buf * @returns {number} */ onHeaderField(buf) { const len = this.headers.length; if ((len & 1) === 0) { this.headers.push(buf); } else { this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf]); } this.trackHeader(buf.length); return 0; } /** * @param {Buffer} buf * @returns {number} */ onHeaderValue(buf) { let len = this.headers.length; if ((len & 1) === 1) { this.headers.push(buf); len += 1; } else { this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf]); } const key = this.headers[len - 2]; if (key.length === 10) { const headerName = util.bufferToLowerCasedHeaderName(key); if (headerName === 'keep-alive') { this.keepAlive += buf.toString(); } else if (headerName === 'connection') { this.connection += buf.toString(); } } else if ( key.length === 14 && util.bufferToLowerCasedHeaderName(key) === 'content-length' ) { this.contentLength += buf.toString(); } this.trackHeader(buf.length); return 0; } /** * @param {number} len */ trackHeader(len) { this.headersSize += len; if (this.headersSize >= this.headersMaxSize) { util.destroy(this.socket, new HeadersOverflowError()); } } /** * @param {Buffer} head */ onUpgrade(head) { const { upgrade, client, socket, headers, statusCode } = this; assert(upgrade); assert(client[kSocket] === socket); assert(!socket.destroyed); assert(!this.paused); assert((headers.length & 1) === 0); const request = client[kQueue][client[kRunningIdx]]; assert(request); assert(request.upgrade || request.method === 'CONNECT'); this.statusCode = 0; this.statusText = ''; this.shouldKeepAlive = false; this.headers = []; this.headersSize = 0; socket.unshift(head); socket[kParser].destroy(); socket[kParser] = null; socket[kClient] = null; socket[kError] = null; removeAllListeners(socket); client[kSocket] = null; client[kHTTPContext] = null; // TODO (fix): This is hacky... client[kQueue][client[kRunningIdx]++] = null; client.emit( 'disconnect', client[kUrl], [client], new InformationalError('upgrade') ); try { request.onUpgrade(statusCode, headers, socket); } catch (err) { util.destroy(socket, err); } client[kResume](); } /** * @param {number} statusCode * @param {boolean} upgrade * @param {boolean} shouldKeepAlive * @returns {number} */ onHeadersComplete(statusCode, upgrade, shouldKeepAlive) { const { client, socket, headers, statusText } = this; /* istanbul ignore next: difficult to make a test case for */ if (socket.destroyed) { return -1; } const request = client[kQueue][client[kRunningIdx]]; /* istanbul ignore next: difficult to make a test case for */ if (!request) { return -1; } assert(!this.upgrade); assert(this.statusCode < 200); if (statusCode === 100) { util.destroy( socket, new SocketError('bad response', util.getSocketInfo(socket)) ); return -1; } /* this can only happen if server is misbehaving */ if (upgrade && !request.upgrade) { util.destroy( socket, new SocketError('bad upgrade', util.getSocketInfo(socket)) ); return -1; } assert(this.timeoutType === TIMEOUT_HEADERS); this.statusCode = statusCode; this.shouldKeepAlive = shouldKeepAlive || // Override llhttp value which does not allow keepAlive for HEAD. (request.method === 'HEAD' && !socket[kReset] && this.connection.toLowerCase() === 'keep-alive'); if (this.statusCode >= 200) { const bodyTimeout = request.bodyTimeout != null ? request.bodyTimeout : client[kBodyTimeout]; this.setTimeout(bodyTimeout, TIMEOUT_BODY); } else if (this.timeout) { // istanbul ignore else: only for jest if (this.timeout.refresh) { this.timeout.refresh(); } } if (request.method === 'CONNECT') { assert(client[kRunning] === 1); this.upgrade = true; return 2; } if (upgrade) { assert(client[kRunning] === 1); this.upgrade = true; return 2; } assert((this.headers.length & 1) === 0); this.headers = []; this.headersSize = 0; if (this.shouldKeepAlive && client[kPipelining]) { const keepAliveTimeout = this.keepAlive ? util.parseKeepAliveTimeout(this.keepAlive) : null; if (keepAliveTimeout != null) { const timeout = Math.min( keepAliveTimeout - client[kKeepAliveTimeoutThreshold], client[kKeepAliveMaxTimeout] ); if (timeout <= 0) { socket[kReset] = true; } else { client[kKeepAliveTimeoutValue] = timeout; } } else { client[kKeepAliveTimeoutValue] = client[kKeepAliveDefaultTimeout]; } } else { // Stop more requests from being dispatched. socket[kReset] = true; } const pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false; if (request.aborted) { return -1; } if (request.method === 'HEAD') { return 1; } if (statusCode < 200) { return 1; } if (socket[kBlocking]) { socket[kBlocking] = false; client[kResume](); } return pause ? constants.ERROR.PAUSED : 0; } /** * @param {Buffer} buf * @returns {number} */ onBody(buf) { const { client, socket, statusCode, maxResponseSize } = this; if (socket.destroyed) { return -1; } const request = client[kQueue][client[kRunningIdx]]; assert(request); assert(this.timeoutType === TIMEOUT_BODY); if (this.timeout) { // istanbul ignore else: only for jest if (this.timeout.refresh) { this.timeout.refresh(); } } assert(statusCode >= 200); if (maxResponseSize > -1 && this.bytesRead + buf.length > maxResponseSize) { util.destroy(socket, new ResponseExceededMaxSizeError()); return -1; } this.bytesRead += buf.length; if (request.onData(buf) === false) { return constants.ERROR.PAUSED; } return 0; } /** * @returns {number} */ onMessageComplete() { const { client, socket, statusCode, upgrade, headers, contentLength, bytesRead, shouldKeepAlive, } = this; if (socket.destroyed && (!statusCode || shouldKeepAlive)) { return -1; } if (upgrade) { return 0; } assert(statusCode >= 100); assert((this.headers.length & 1) === 0); const request = client[kQueue][client[kRunningIdx]]; assert(request); this.statusCode = 0; this.statusText = ''; this.bytesRead = 0; this.contentLength = ''; this.keepAlive = ''; this.connection = ''; this.headers = []; this.headersSize = 0; if (statusCode < 200) { return 0; } /* istanbul ignore next: should be handled by llhttp? */ if ( request.method !== 'HEAD' && contentLength && bytesRead !== parseInt(contentLength, 10) ) { util.destroy(socket, new ResponseContentLengthMismatchError()); return -1; } request.onComplete(headers); client[kQueue][client[kRunningIdx]++] = null; if (socket[kWriting]) { assert(client[kRunning] === 0); // Response completed before request. util.destroy(socket, new InformationalError('reset')); return constants.ERROR.PAUSED; } else if (!shouldKeepAlive) { util.destroy(socket, new InformationalError('reset')); return constants.ERROR.PAUSED; } else if (socket[kReset] && client[kRunning] === 0) { // Destroy socket once all requests have completed. // The request at the tail of the pipeline is the one // that requested reset and no further requests should // have been queued since then. util.destroy(socket, new InformationalError('reset')); return constants.ERROR.PAUSED; } else if (client[kPipelining] == null || client[kPipelining] === 1) { // We must wait a full event loop cycle to reuse this socket to make sure // that non-spec compliant servers are not closing the connection even if they // said they won't. setImmediate(() => client[kResume]()); } else { client[kResume](); } return 0; } } function onParserTimeout(parser) { const { socket, timeoutType, client, paused } = parser.deref(); /* istanbul ignore else */ if (timeoutType === TIMEOUT_HEADERS) { if (!socket[kWriting] || socket.writableNeedDrain || client[kRunning] > 1) { assert(!paused, 'cannot be paused while waiting for headers'); util.destroy(socket, new HeadersTimeoutError()); } } else if (timeoutType === TIMEOUT_BODY) { if (!paused) { util.destroy(socket, new BodyTimeoutError()); } } else if (timeoutType === TIMEOUT_KEEP_ALIVE) { assert(client[kRunning] === 0 && client[kKeepAliveTimeoutValue]); util.destroy(socket, new InformationalError('socket idle timeout')); } } /** * @param {import ('./client.js')} client * @param {import('net').Socket} socket * @returns */ async function connectH1(client, socket) { client[kSocket] = socket; if (!llhttpInstance) { const noop = () => {}; socket.on('error', noop); llhttpInstance = await llhttpPromise; llhttpPromise = null; socket.off('error', noop); } if (socket.errored) { throw socket.errored; } if (socket.destroyed) { throw new SocketError('destroyed'); } socket[kNoRef] = false; socket[kWriting] = false; socket[kReset] = false; socket[kBlocking] = false; socket[kParser] = new Parser(client, socket, llhttpInstance); util.addListener(socket, 'error', onHttpSocketError); util.addListener(socket, 'readable', onHttpSocketReadable); util.addListener(socket, 'end', onHttpSocketEnd); util.addListener(socket, 'close', onHttpSocketClose); socket[kClosed] = false; socket.on('close', onSocketClose); return { version: 'h1', defaultPipelining: 1, write(request) { return writeH1(client, request); }, resume() { resumeH1(client); }, /** * @param {Error|undefined} err * @param {() => void} callback */ destroy(err, callback) { if (socket[kClosed]) { queueMicrotask(callback); } else { socket.on('close', callback); socket.destroy(err); } }, /** * @returns {boolean} */ get destroyed() { return socket.destroyed; }, /** * @param {import('../core/request.js')} request * @returns {boolean} */ busy(request) { if (socket[kWriting] || socket[kReset] || socket[kBlocking]) { return true; } if (request) { if (client[kRunning] > 0 && !request.idempotent) { // Non-idempotent request cannot be retried. // Ensure that no other requests are inflight and // could cause failure. return true; } if ( client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT') ) { // Don't dispatch an upgrade until all preceding requests have completed. // A misbehaving server might upgrade the connection before all pipelined // request has completed. return true; } if ( client[kRunning] > 0 && util.bodyLength(request.body) !== 0 && (util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body)) ) { // Request with stream or iterator body can error while other requests // are inflight and indirectly error those as well. // Ensure this doesn't happen by waiting for inflight // to complete before dispatching. // Request with stream or iterator body cannot be retried. // Ensure that no other requests are inflight and // could cause failure. return true; } } return false; }, }; } function onHttpSocketError(err) { assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID'); const parser = this[kParser]; // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded // to the user. if ( err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive ) { // We treat all incoming data so for as a valid response. parser.onMessageComplete(); return; } this[kError] = err; this[kClient][kOnError](err); } function onHttpSocketReadable() { this[kParser]?.readMore(); } function onHttpSocketEnd() { const parser = this[kParser]; if (parser.statusCode && !parser.shouldKeepAlive) { // We treat all incoming data so far as a valid response. parser.onMessageComplete(); return; } util.destroy( this, new SocketError('other side closed', util.getSocketInfo(this)) ); } function onHttpSocketClose() { const parser = this[kParser]; if (parser) { if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) { // We treat all incoming data so far as a valid response. parser.onMessageComplete(); } this[kParser].destroy(); this[kParser] = null; } const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)); const client = this[kClient]; client[kSocket] = null; client[kHTTPContext] = null; // TODO (fix): This is hacky... if (client.destroyed) { assert(client[kPending] === 0); // Fail entire queue. const requests = client[kQueue].splice(client[kRunningIdx]); for (let i = 0; i < requests.length; i++) { const request = requests[i]; util.errorRequest(client, request, err); } } else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') { // Fail head of pipeline. const request = client[kQueue][client[kRunningIdx]]; client[kQueue][client[kRunningIdx]++] = null; util.errorRequest(client, request, err); } client[kPendingIdx] = client[kRunningIdx]; assert(client[kRunning] === 0); client.emit('disconnect', client[kUrl], [client], err); client[kResume](); } function onSocketClose() { this[kClosed] = true; } /** * @param {import('./client.js')} client */ function resumeH1(client) { const socket = client[kSocket]; if (socket && !socket.destroyed) { if (client[kSize] === 0) { if (!socket[kNoRef] && socket.unref) { socket.unref(); socket[kNoRef] = true; } } else if (socket[kNoRef] && socket.ref) { socket.ref(); socket[kNoRef] = false; } if (client[kSize] === 0) { if (socket[kParser].timeoutType !== TIMEOUT_KEEP_ALIVE) { socket[kParser].setTimeout( client[kKeepAliveTimeoutValue], TIMEOUT_KEEP_ALIVE ); } } else if (client[kRunning] > 0 && socket[kParser].statusCode < 200) { if (socket[kParser].timeoutType !== TIMEOUT_HEADERS) { const request = client[kQueue][client[kRunningIdx]]; const headersTimeout = request.headersTimeout != null ? request.headersTimeout : client[kHeadersTimeout]; socket[kParser].setTimeout(headersTimeout, TIMEOUT_HEADERS); } } } } // https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2 function shouldSendContentLength(method) { return ( method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT' ); } /** * @param {import('./client.js')} client * @param {import('../core/request.js')} request * @returns */ function writeH1(client, request) { const { method, path, host, upgrade, blocking, reset } = request; let { body, headers, contentLength } = request; // https://tools.ietf.org/html/rfc7231#section-4.3.1 // https://tools.ietf.org/html/rfc7231#section-4.3.2 // https://tools.ietf.org/html/rfc7231#section-4.3.5 // Sending a payload body on a request that does not // expect it can cause undefined behavior on some // servers and corrupt connection state. Do not // re-use the connection for further requests. const expectsPayload = method === 'PUT' || method === 'POST' || method === 'PATCH' || method === 'QUERY' || method === 'PROPFIND' || method === 'PROPPATCH'; if (util.isFormDataLike(body)) { if (!extractBody) { extractBody = require('../web/fetch/body.js').extractBody; } const [bodyStream, contentType] = extractBody(body); if (request.contentType == null) { headers.push('content-type', contentType); } body = bodyStream.stream; contentLength = bodyStream.length; } else if ( util.isBlobLike(body) && request.contentType == null && body.type ) { headers.push('content-type', body.type); } if (body && typeof body.read === 'function') { // Try to read EOF in order to get length. body.read(0); } const bodyLength = util.bodyLength(body); contentLength = bodyLength ?? contentLength; if (contentLength === null) { contentLength = request.contentLength; } if (contentLength === 0 && !expectsPayload) { // https://tools.ietf.org/html/rfc7230#section-3.3.2 // A user agent SHOULD NOT send a Content-Length header field when // the request message does not contain a payload body and the method // semantics do not anticipate such a body. contentLength = null; } // https://github.com/nodejs/undici/issues/2046 // A user agent may send a Content-Length header with 0 value, this should be allowed. if ( shouldSendContentLength(method) && contentLength > 0 && request.contentLength !== null && request.contentLength !== contentLength ) { if (client[kStrictContentLength]) { util.errorRequest( client, request, new RequestContentLengthMismatchError() ); return false; } process.emitWarning(new RequestContentLengthMismatchError()); } const socket = client[kSocket]; /** * @param {Error} [err] * @returns {void} */ const abort = (err) => { if (request.aborted || request.completed) { return; } util.errorRequest(client, request, err || new RequestAbortedError()); util.destroy(body); util.destroy(socket, new InformationalError('aborted')); }; try { request.onConnect(abort); } catch (err) { util.errorRequest(client, request, err); } if (request.aborted) { return false; } if (method === 'HEAD') { // https://github.com/mcollina/undici/issues/258 // Close after a HEAD request to interop with misbehaving servers // that may send a body in the response. socket[kReset] = true; } if (upgrade || method === 'CONNECT') { // On CONNECT or upgrade, block pipeline from dispatching further // requests on this connection. socket[kReset] = true; } if (reset != null) { socket[kReset] = reset; } if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) { socket[kReset] = true; } if (blocking) { socket[kBlocking] = true; } let header = `${method} ${path} HTTP/1.1\r\n`; if (typeof host === 'string') { header += `host: ${host}\r\n`; } else { header += client[kHostHeader]; } if (upgrade) { header += `connection: upgrade\r\nupgrade: ${upgrade}\r\n`; } else if (client[kPipelining] && !socket[kReset]) { header += 'connection: keep-alive\r\n'; } else { header += 'connection: close\r\n'; } if (Array.isArray(headers)) { for (let n = 0; n < headers.length; n += 2) { const key = headers[n + 0]; const val = headers[n + 1]; if (Array.isArray(val)) { for (let i = 0; i < val.length; i++) { header += `${key}: ${val[i]}\r\n`; } } else { header += `${key}: ${val}\r\n`; } } } if (channels.sendHeaders.hasSubscribers) { channels.sendHeaders.publish({ request, headers: header, socket }); } /* istanbul ignore else: assertion */ if (!body || bodyLength === 0) { writeBuffer( abort, null, client, request, socket, contentLength, header, expectsPayload ); } else if (util.isBuffer(body)) { writeBuffer( abort, body, client, request, socket, contentLength, header, expectsPayload ); } else if (util.isBlobLike(body)) { if (typeof body.stream === 'function') { writeIterable( abort, body.stream(), client, request, socket, contentLength, header, expectsPayload ); } else { writeBlob( abort, body, client, request, socket, contentLength, header, expectsPayload ); } } else if (util.isStream(body)) { writeStream( abort, body, client, request, socket, contentLength, header, expectsPayload ); } else if (util.isIterable(body)) { writeIterable( abort, body, client, request, socket, contentLength, header, expectsPayload ); } else { assert(false); } return true; } /** * @param {AbortCallback} abort * @param {import('stream').Stream} body * @param {import('./client.js')} client * @param {import('../core/request.js')} request * @param {import('net').Socket} socket * @param {number} contentLength * @param {string} header * @param {boolean} expectsPayload */ function writeStream( abort, body, client, request, socket, contentLength, header, expectsPayload ) { assert( contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined' ); let finished = false; const writer = new AsyncWriter({ abort, socket, request, contentLength, client, expectsPayload, header, }); /** * @param {Buffer} chunk * @returns {void} */ const onData = function (chunk) { if (finished) { return; } try { if (!writer.write(chunk) && this.pause) { this.pause(); } } catch (err) { util.destroy(this, err); } }; /** * @returns {void} */ const onDrain = function () { if (finished) { return; } if (body.resume) { body.resume(); } }; /** * @returns {void} */ const onClose = function () { // 'close' might be emitted *before* 'error' for // broken streams. Wait a tick to avoid this case. queueMicrotask(() => { // It's only safe to remove 'error' listener after // 'close'. body.removeListener('error', onFinished); }); if (!finished) { const err = new RequestAbortedError(); queueMicrotask(() => onFinished(err)); } }; /** * @param {Error} [err] * @returns */ const onFinished = function (err) { if (finished) { return; } finished = true; assert(socket.destroyed || (socket[kWriting] && client[kRunning] <= 1)); socket.off('drain', onDrain).off('error', onFinished); body .removeListener('data', onData) .removeListener('end', onFinished) .removeListener('close', onClose); if (!err) { try { writer.end(); } catch (er) { err = er; } } writer.destroy(err); if (err && (err.code !== 'UND_ERR_INFO' || err.message !== 'reset')) { util.destroy(body, err); } else { util.destroy(body); } }; body .on('data', onData) .on('end', onFinished) .on('error', onFinished) .on('close', onClose); if (body.resume) { body.resume(); } socket.on('drain', onDrain).on('error', onFinished); if (body.errorEmitted ?? body.errored) { setImmediate(() => onFinished(body.errored)); } else if (body.endEmitted ?? body.readableEnded) { setImmediate(() => onFinished(null)); } if (body.closeEmitted ?? body.closed) { setImmediate(onClose); } } /** * @typedef AbortCallback * @type {Function} * @param {Error} [err] * @returns {void} */ /** * @param {AbortCallback} abort * @param {Uint8Array|null} body * @param {import('./client.js')} client * @param {import('../core/request.js')} request * @param {import('net').Socket} socket * @param {number} contentLength * @param {string} header * @param {boolean} expectsPayload * @returns {void} */ function writeBuffer( abort, body, client, request, socket, contentLength, header, expectsPayload ) { try { if (!body) { if (contentLength === 0) { socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1'); } else { assert(contentLength === null, 'no body must not have content length'); socket.write(`${header}\r\n`, 'latin1'); } } else if (util.isBuffer(body)) { assert( contentLength === body.byteLength, 'buffer body must have content length' ); socket.cork(); socket.write( `${header}content-length: ${contentLength}\r\n\r\n`, 'latin1' ); socket.write(body); socket.uncork(); request.onBodySent(body); if (!expectsPayload && request.reset !== false) { socket[kReset] = true; } } request.onRequestSent(); client[kResume](); } catch (err) { abort(err); } } /** * @param {AbortCallback} abort * @param {Blob} body * @param {import('./client.js')} client * @param {import('../core/request.js')} request * @param {import('net').Socket} socket * @param {number} contentLength * @param {string} header * @param {boolean} expectsPayload * @returns {Promise} */ async function writeBlob( abort, body, client, request, socket, contentLength, header, expectsPayload ) { assert(contentLength === body.size, 'blob body must have content length'); try { if (contentLength != null && contentLength !== body.size) { throw new RequestContentLengthMismatchError(); } const buffer = Buffer.from(await body.arrayBuffer()); socket.cork(); socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1'); socket.write(buffer); socket.uncork(); request.onBodySent(buffer); request.onRequestSent(); if (!expectsPayload && request.reset !== false) { socket[kReset] = true; } client[kResume](); } catch (err) { abort(err); } } /** * @param {AbortCallback} abort * @param {Iterable} body * @param {import('./client.js')} client * @param {import('../core/request.js')} request * @param {import('net').Socket} socket * @param {number} contentLength * @param {string} header * @param {boolean} expectsPayload * @returns {Promise} */ async function writeIterable( abort, body, client, request, socket, contentLength, header, expectsPayload ) { assert( contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined' ); let callback = null; function onDrain() { if (callback) { const cb = callback; callback = null; cb(); } } const waitForDrain = () => new Promise((resolve, reject) => { assert(callback === null); if (socket[kError]) { reject(socket[kError]); } else { callback = resolve; } }); socket.on('close', onDrain).on('drain', onDrain); const writer = new AsyncWriter({ abort, socket, request, contentLength, client, expectsPayload, header, }); try { // It's up to the user to somehow abort the async iterable. for await (const chunk of body) { if (socket[kError]) { throw socket[kError]; } if (!writer.write(chunk)) { await waitForDrain(); } } writer.end(); } catch (err) { writer.destroy(err); } finally { socket.off('close', onDrain).off('drain', onDrain); } } class AsyncWriter { /** * * @param {object} arg * @param {AbortCallback} arg.abort * @param {import('net').Socket} arg.socket * @param {import('../core/request.js')} arg.request * @param {number} arg.contentLength * @param {import('./client.js')} arg.client * @param {boolean} arg.expectsPayload * @param {string} arg.header */ constructor({ abort, socket, request, contentLength, client, expectsPayload, header, }) { this.socket = socket; this.request = request; this.contentLength = contentLength; this.client = client; this.bytesWritten = 0; this.expectsPayload = expectsPayload; this.header = header; this.abort = abort; socket[kWriting] = true; } /** * @param {Buffer} chunk * @returns */ write(chunk) { const { socket, request, contentLength, client, bytesWritten, expectsPayload, header, } = this; if (socket[kError]) { throw socket[kError]; } if (socket.destroyed) { return false; } const len = Buffer.byteLength(chunk); if (!len) { return true; } // We should defer writing chunks. if (contentLength !== null && bytesWritten + len > contentLength) { if (client[kStrictContentLength]) { throw new RequestContentLengthMismatchError(); } process.emitWarning(new RequestContentLengthMismatchError()); } socket.cork(); if (bytesWritten === 0) { if (!expectsPayload && request.reset !== false) { socket[kReset] = true; } if (contentLength === null) { socket.write(`${header}transfer-encoding: chunked\r\n`, 'latin1'); } else { socket.write( `${header}content-length: ${contentLength}\r\n\r\n`, 'latin1' ); } } if (contentLength === null) { socket.write(`\r\n${len.toString(16)}\r\n`, 'latin1'); } this.bytesWritten += len; const ret = socket.write(chunk); socket.uncork(); request.onBodySent(chunk); if (!ret) { if ( socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS ) { // istanbul ignore else: only for jest if (socket[kParser].timeout.refresh) { socket[kParser].timeout.refresh(); } } } return ret; } /** * @returns {void} */ end() { const { socket, contentLength, client, bytesWritten, expectsPayload, header, request, } = this; request.onRequestSent(); socket[kWriting] = false; if (socket[kError]) { throw socket[kError]; } if (socket.destroyed) { return; } if (bytesWritten === 0) { if (expectsPayload) { // https://tools.ietf.org/html/rfc7230#section-3.3.2 // A user agent SHOULD send a Content-Length in a request message when // no Transfer-Encoding is sent and the request method defines a meaning // for an enclosed payload body. socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1'); } else { socket.write(`${header}\r\n`, 'latin1'); } } else if (contentLength === null) { socket.write('\r\n0\r\n\r\n', 'latin1'); } if (contentLength !== null && bytesWritten !== contentLength) { if (client[kStrictContentLength]) { throw new RequestContentLengthMismatchError(); } else { process.emitWarning(new RequestContentLengthMismatchError()); } } if ( socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS ) { // istanbul ignore else: only for jest if (socket[kParser].timeout.refresh) { socket[kParser].timeout.refresh(); } } client[kResume](); } /** * @param {Error} [err] * @returns {void} */ destroy(err) { const { socket, client, abort } = this; socket[kWriting] = false; if (err) { assert( client[kRunning] <= 1, 'pipeline should only contain this request' ); abort(err); } } } module.exports = connectH1;