'use strict';

const assert = require('node:assert');
const net = require('node:net');
const http = require('node:http');
const util = require('../core/util.js');
const { channels } = require('../core/diagnostics.js');
const Request = require('../core/request.js');
const DispatcherBase = require('./dispatcher-base');
const {
  InvalidArgumentError,
  InformationalError,
  ClientDestroyedError,
} = require('../core/errors.js');
const buildConnector = require('../core/connect.js');
const {
  kUrl,
  kServerName,
  kClient,
  kBusy,
  kConnect,
  kResuming,
  kRunning,
  kPending,
  kSize,
  kQueue,
  kConnected,
  kConnecting,
  kNeedDrain,
  kKeepAliveDefaultTimeout,
  kHostHeader,
  kPendingIdx,
  kRunningIdx,
  kError,
  kPipelining,
  kKeepAliveTimeoutValue,
  kMaxHeadersSize,
  kKeepAliveMaxTimeout,
  kKeepAliveTimeoutThreshold,
  kHeadersTimeout,
  kBodyTimeout,
  kStrictContentLength,
  kConnector,
  kMaxRequests,
  kCounter,
  kClose,
  kDestroy,
  kDispatch,
  kLocalAddress,
  kMaxResponseSize,
  kOnError,
  kHTTPContext,
  kMaxConcurrentStreams,
  kResume,
} = require('../core/symbols.js');
const connectH1 = require('./client-h1.js');
const connectH2 = require('./client-h2.js');

// Holds the `resolve` callback of a pending `kClose` promise (or null).
const kClosedResolve = Symbol('kClosedResolve');

/**
 * Returns the default maximum header size taken from `http.maxHeaderSize`.
 *
 * The choice between the two implementations is made once, at module load:
 * if `http.maxHeaderSize` is a positive integer the getter returns it,
 * otherwise calling the getter throws an InvalidArgumentError.
 */
const getDefaultNodeMaxHeaderSize =
  http &&
  http.maxHeaderSize &&
  Number.isInteger(http.maxHeaderSize) &&
  http.maxHeaderSize > 0
    ? () => http.maxHeaderSize
    : () => {
        throw new InvalidArgumentError(
          'http module not available or http.maxHeaderSize invalid'
        );
      };

const noop = () => {};

/**
 * Resolves the pipelining factor for a client: the client's own value if it
 * is not nullish, else the current HTTP context's `defaultPipelining`, else 1.
 *
 * @param {Client} client
 * @returns {number}
 */
function getPipelining(client) {
  return client[kPipelining] ?? client[kHTTPContext]?.defaultPipelining ?? 1;
}

/**
 * @type {import('../../types/client.js').default}
 */
class Client extends DispatcherBase {
  /**
   * Validates the options, builds a connector when `connect` is not already
   * a function, and initializes the request queue and internal state.
   *
   * @param {string|URL} url
   * @param {import('../../types/client.js').Client.Options} options
   * @throws {InvalidArgumentError} when an option is unsupported or invalid.
   */
  constructor(
    url,
    {
      maxHeaderSize,
      headersTimeout,
      socketTimeout,
      requestTimeout,
      connectTimeout,
      bodyTimeout,
      idleTimeout,
      keepAlive,
      keepAliveTimeout,
      maxKeepAliveTimeout,
      keepAliveMaxTimeout,
      keepAliveTimeoutThreshold,
      socketPath,
      pipelining,
      tls,
      strictContentLength,
      maxCachedSessions,
      connect,
      maxRequestsPerClient,
      localAddress,
      maxResponseSize,
      autoSelectFamily,
      autoSelectFamilyAttemptTimeout,
      // h2
      maxConcurrentStreams,
      allowH2,
    } = {}
  ) {
    // Options that are rejected outright, each with a pointer to its
    // replacement.
    if (keepAlive !== undefined) {
      throw new InvalidArgumentError(
        'unsupported keepAlive, use pipelining=0 instead'
      );
    }

    if (socketTimeout !== undefined) {
      throw new InvalidArgumentError(
        'unsupported socketTimeout, use headersTimeout & bodyTimeout instead'
      );
    }

    if (requestTimeout !== undefined) {
      throw new InvalidArgumentError(
        'unsupported requestTimeout, use headersTimeout & bodyTimeout instead'
      );
    }

    if (idleTimeout !== undefined) {
      throw new InvalidArgumentError(
        'unsupported idleTimeout, use keepAliveTimeout instead'
      );
    }

    if (maxKeepAliveTimeout !== undefined) {
      throw new InvalidArgumentError(
        'unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead'
      );
    }

    if (maxHeaderSize != null) {
      if (!Number.isInteger(maxHeaderSize) || maxHeaderSize < 1) {
        throw new InvalidArgumentError('invalid maxHeaderSize');
      }
    } else {
      // If maxHeaderSize is not provided, use the default value from the http module
      // or if that is not available, throw an error.
      maxHeaderSize = getDefaultNodeMaxHeaderSize();
    }

    if (socketPath != null && typeof socketPath !== 'string') {
      throw new InvalidArgumentError('invalid socketPath');
    }

    if (
      connectTimeout != null &&
      (!Number.isFinite(connectTimeout) || connectTimeout < 0)
    ) {
      throw new InvalidArgumentError('invalid connectTimeout');
    }

    if (
      keepAliveTimeout != null &&
      (!Number.isFinite(keepAliveTimeout) || keepAliveTimeout <= 0)
    ) {
      throw new InvalidArgumentError('invalid keepAliveTimeout');
    }

    if (
      keepAliveMaxTimeout != null &&
      (!Number.isFinite(keepAliveMaxTimeout) || keepAliveMaxTimeout <= 0)
    ) {
      throw new InvalidArgumentError('invalid keepAliveMaxTimeout');
    }

    if (
      keepAliveTimeoutThreshold != null &&
      !Number.isFinite(keepAliveTimeoutThreshold)
    ) {
      throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold');
    }

    if (
      headersTimeout != null &&
      (!Number.isInteger(headersTimeout) || headersTimeout < 0)
    ) {
      throw new InvalidArgumentError(
        'headersTimeout must be a positive integer or zero'
      );
    }

    if (
      bodyTimeout != null &&
      (!Number.isInteger(bodyTimeout) || bodyTimeout < 0)
    ) {
      throw new InvalidArgumentError(
        'bodyTimeout must be a positive integer or zero'
      );
    }

    if (
      connect != null &&
      typeof connect !== 'function' &&
      typeof connect !== 'object'
    ) {
      throw new InvalidArgumentError('connect must be a function or an object');
    }

    if (
      maxRequestsPerClient != null &&
      (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)
    ) {
      throw new InvalidArgumentError(
        'maxRequestsPerClient must be a positive number'
      );
    }

    if (
      localAddress != null &&
      (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)
    ) {
      throw new InvalidArgumentError(
        'localAddress must be valid string IP address'
      );
    }

    if (
      maxResponseSize != null &&
      (!Number.isInteger(maxResponseSize) || maxResponseSize < -1)
    ) {
      throw new InvalidArgumentError(
        'maxResponseSize must be a positive number'
      );
    }

    if (
      autoSelectFamilyAttemptTimeout != null &&
      (!Number.isInteger(autoSelectFamilyAttemptTimeout) ||
        autoSelectFamilyAttemptTimeout < -1)
    ) {
      throw new InvalidArgumentError(
        'autoSelectFamilyAttemptTimeout must be a positive number'
      );
    }

    // h2
    if (allowH2 != null && typeof allowH2 !== 'boolean') {
      throw new InvalidArgumentError('allowH2 must be a valid boolean value');
    }

    // Number.isInteger (rather than a bare typeof check) so that NaN,
    // fractions and Infinity are rejected, as the error message promises.
    if (
      maxConcurrentStreams != null &&
      (!Number.isInteger(maxConcurrentStreams) || maxConcurrentStreams < 1)
    ) {
      throw new InvalidArgumentError(
        'maxConcurrentStreams must be a positive integer, greater than 0'
      );
    }

    super();

    // A non-function `connect` (object or nullish) is treated as extra
    // connector options; being spread last, its keys override the others.
    if (typeof connect !== 'function') {
      connect = buildConnector({
        ...tls,
        maxCachedSessions,
        allowH2,
        socketPath,
        timeout: connectTimeout,
        ...(typeof autoSelectFamily === 'boolean'
          ? { autoSelectFamily, autoSelectFamilyAttemptTimeout }
          : undefined),
        ...connect,
      });
    }

    this[kUrl] = util.parseOrigin(url);
    this[kConnector] = connect;
    this[kPipelining] = pipelining != null ? pipelining : 1;
    this[kMaxHeadersSize] = maxHeaderSize;
    this[kKeepAliveDefaultTimeout] =
      keepAliveTimeout == null ? 4e3 : keepAliveTimeout;
    this[kKeepAliveMaxTimeout] =
      keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout;
    this[kKeepAliveTimeoutThreshold] =
      keepAliveTimeoutThreshold == null ? 2e3 : keepAliveTimeoutThreshold;
    this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout];
    this[kServerName] = null;
    this[kLocalAddress] = localAddress != null ? localAddress : null;
    this[kResuming] = 0; // 0, idle, 1, scheduled, 2 resuming
    this[kNeedDrain] = 0; // 0, idle, 1, scheduled, 2 resuming
    this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n`;
    this[kBodyTimeout] = bodyTimeout != null ? bodyTimeout : 300e3;
    this[kHeadersTimeout] = headersTimeout != null ? headersTimeout : 300e3;
    this[kStrictContentLength] =
      strictContentLength == null ? true : strictContentLength;
    this[kMaxRequests] = maxRequestsPerClient;
    this[kClosedResolve] = null;
    // A nullish maxResponseSize fails the comparison and falls back to -1.
    this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1;
    this[kMaxConcurrentStreams] =
      maxConcurrentStreams != null ? maxConcurrentStreams : 100; // Max peerConcurrentStreams for a Node h2 server
    this[kHTTPContext] = null;

    // kQueue is built up of 3 sections separated by
    // the kRunningIdx and kPendingIdx indices.
    // |   complete   |   running   |   pending   |
    //                ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
    // kRunningIdx points to the first running element.
    // kPendingIdx points to the first pending element.
    // This implements a fast queue with an amortized
    // time of O(1).

    this[kQueue] = [];
    this[kRunningIdx] = 0;
    this[kPendingIdx] = 0;

    this[kResume] = (sync) => resume(this, sync);
    this[kOnError] = (err) => onError(this, err);
  }

  /** Pipelining factor configured on this client. */
  get pipelining() {
    return this[kPipelining];
  }

  /** Updates the pipelining factor and re-runs the resume loop. */
  set pipelining(value) {
    this[kPipelining] = value;
    this[kResume](true);
  }

  /** Number of queued requests at or after kPendingIdx. */
  get [kPending]() {
    return this[kQueue].length - this[kPendingIdx];
  }

  /** Number of requests between kRunningIdx and kPendingIdx. */
  get [kRunning]() {
    return this[kPendingIdx] - this[kRunningIdx];
  }

  /** Number of queued requests at or after kRunningIdx (running + pending). */
  get [kSize]() {
    return this[kQueue].length - this[kRunningIdx];
  }

  /** True when an HTTP context exists, is not destroyed, and no connect is in flight. */
  get [kConnected]() {
    return (
      !!this[kHTTPContext] &&
      !this[kConnecting] &&
      !this[kHTTPContext].destroyed
    );
  }

  /**
   * True when the HTTP context reports busy, the queue has reached the
   * pipelining limit, or any request is still pending.
   */
  get [kBusy]() {
    return Boolean(
      this[kHTTPContext]?.busy(null) ||
        this[kSize] >= (getPipelining(this) || 1) ||
        this[kPending] > 0
    );
  }

  /* istanbul ignore: only used for test */
  [kConnect](cb) {
    connect(this);
    this.once('connect', cb);
  }

  /**
   * Wraps the options in a Request, queues it and kicks the resume loop.
   *
   * @param {object} opts request options; `opts.origin` overrides the client origin.
   * @param {object} handler handler passed through to Request.
   * @returns {boolean} false once the client has flagged that a drain is needed.
   */
  [kDispatch](opts, handler) {
    const origin = opts.origin || this[kUrl].origin;
    const request = new Request(origin, opts, handler);

    this[kQueue].push(request);
    if (this[kResuming]) {
      // Do nothing.
    } else if (
      util.bodyLength(request.body) == null &&
      util.isIterable(request.body)
    ) {
      // Wait a tick in case stream/iterator is ended in the same tick.
      this[kResuming] = 1;
      queueMicrotask(() => resume(this));
    } else {
      this[kResume](true);
    }

    if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
      this[kNeedDrain] = 2;
    }

    return this[kNeedDrain] < 2;
  }

  /**
   * Resolves immediately when the queue is empty; otherwise stores the
   * resolver so it can be called once the queue has emptied.
   */
  async [kClose]() {
    // TODO: for H2 we need to gracefully flush the remaining enqueued
    // request and close each stream.
    return new Promise((resolve) => {
      if (this[kSize]) {
        this[kClosedResolve] = resolve;
      } else {
        resolve(null);
      }
    });
  }

  /**
   * Errors every pending request with `err`, destroys the HTTP context if
   * there is one, and resolves any outstanding close promise.
   *
   * @param {Error} err
   */
  async [kDestroy](err) {
    return new Promise((resolve) => {
      // Only the pending section is removed here; running requests stay in
      // the queue.
      const requests = this[kQueue].splice(this[kPendingIdx]);
      for (let i = 0; i < requests.length; i++) {
        const request = requests[i];
        util.errorRequest(this, request, err);
      }

      const callback = () => {
        if (this[kClosedResolve]) {
          // TODO (fix): Should we error here with ClientDestroyedError?
          this[kClosedResolve]();
          this[kClosedResolve] = null;
        }
        resolve(null);
      };

      if (this[kHTTPContext]) {
        this[kHTTPContext].destroy(err, callback);
        this[kHTTPContext] = null;
      } else {
        queueMicrotask(callback);
      }

      this[kResume]();
    });
  }
}

/**
 * Fails every queued request with `err`, but only when nothing is running
 * and the error code is neither 'UND_ERR_INFO' nor 'UND_ERR_SOCKET'.
 *
 * @param {Client} client
 * @param {Error} err
 */
function onError(client, err) {
  if (
    client[kRunning] === 0 &&
    err.code !== 'UND_ERR_INFO' &&
    err.code !== 'UND_ERR_SOCKET'
  ) {
    // Error is not caused by running request and not a recoverable
    // socket error.

    assert(client[kPendingIdx] === client[kRunningIdx]);

    const requests = client[kQueue].splice(client[kRunningIdx]);

    for (let i = 0; i < requests.length; i++) {
      const request = requests[i];
      util.errorRequest(client, request, err);
    }
    assert(client[kSize] === 0);
  }
}

/**
 * Opens a socket through the client's connector, attaches an HTTP/1 or
 * HTTP/2 context to it (chosen by `socket.alpnProtocol`), publishes the
 * diagnostics events and finally re-runs the resume loop.
 *
 * @param {Client} client
 * @returns {Promise<void>}
 */
async function connect(client) {
  assert(!client[kConnecting]);
  assert(!client[kHTTPContext]);

  let { host, hostname, protocol, port } = client[kUrl];

  // Resolve ipv6
  if (hostname[0] === '[') {
    const idx = hostname.indexOf(']');

    assert(idx !== -1);
    const ip = hostname.substring(1, idx);

    assert(net.isIPv6(ip));
    hostname = ip;
  }

  client[kConnecting] = true;

  // Builds the payload shared by all diagnostics events. It is evaluated at
  // publish time so `version` reflects the HTTP context at that moment.
  const buildConnectParams = () => ({
    host,
    hostname,
    protocol,
    port,
    version: client[kHTTPContext]?.version,
    servername: client[kServerName],
    localAddress: client[kLocalAddress],
  });

  if (channels.beforeConnect.hasSubscribers) {
    channels.beforeConnect.publish({
      connectParams: buildConnectParams(),
      connector: client[kConnector],
    });
  }

  try {
    // Adapt the callback-style connector to a promise.
    const socket = await new Promise((resolve, reject) => {
      client[kConnector](
        {
          host,
          hostname,
          protocol,
          port,
          servername: client[kServerName],
          localAddress: client[kLocalAddress],
        },
        (err, socket) => {
          if (err) {
            reject(err);
          } else {
            resolve(socket);
          }
        }
      );
    });

    // The client was destroyed while the connector was working: discard the
    // fresh socket, with a no-op 'error' listener attached first.
    if (client.destroyed) {
      util.destroy(socket.on('error', noop), new ClientDestroyedError());
      return;
    }

    assert(socket);

    try {
      client[kHTTPContext] =
        socket.alpnProtocol === 'h2'
          ? await connectH2(client, socket)
          : await connectH1(client, socket);
    } catch (err) {
      socket.destroy().on('error', noop);
      throw err;
    }

    client[kConnecting] = false;

    socket[kCounter] = 0;
    socket[kMaxRequests] = client[kMaxRequests];
    socket[kClient] = client;
    socket[kError] = null;

    if (channels.connected.hasSubscribers) {
      channels.connected.publish({
        connectParams: buildConnectParams(),
        connector: client[kConnector],
        socket,
      });
    }
    client.emit('connect', client[kUrl], [client]);
  } catch (err) {
    if (client.destroyed) {
      return;
    }

    client[kConnecting] = false;

    if (channels.connectError.hasSubscribers) {
      channels.connectError.publish({
        connectParams: buildConnectParams(),
        connector: client[kConnector],
        error: err,
      });
    }

    if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') {
      assert(client[kRunning] === 0);
      // Only fail the leading pending requests that asked for the servername
      // that was just rejected.
      while (
        client[kPending] > 0 &&
        client[kQueue][client[kPendingIdx]].servername === client[kServerName]
      ) {
        const request = client[kQueue][client[kPendingIdx]++];
        util.errorRequest(client, request, err);
      }
    } else {
      onError(client, err);
    }

    client.emit('connectionError', client[kUrl], [client], err);
  }

  client[kResume]();
}

/**
 * Clears the need-drain flag and emits 'drain'.
 *
 * @param {Client} client
 */
function emitDrain(client) {
  client[kNeedDrain] = 0;
  client.emit('drain', client[kUrl], [client]);
}

/**
 * Runs the resume loop unless it is already running (kResuming === 2), then
 * compacts the queue once more than 256 completed entries have accumulated.
 *
 * @param {Client} client
 * @param {boolean} [sync] forwarded to `_resume`.
 */
function resume(client, sync) {
  if (client[kResuming] === 2) {
    return;
  }

  client[kResuming] = 2;

  _resume(client, sync);
  client[kResuming] = 0;

  if (client[kRunningIdx] > 256) {
    // Drop the completed prefix and shift both indices accordingly.
    client[kQueue].splice(0, client[kRunningIdx]);
    client[kPendingIdx] -= client[kRunningIdx];
    client[kRunningIdx] = 0;
  }
}

/**
 * Core scheduling loop: settles a pending close, updates the drain state,
 * (re)connects when needed and writes pending requests to the HTTP context
 * until something prevents further progress.
 *
 * @param {Client} client
 * @param {boolean} [sync] when truthy, 'drain' is emitted from a microtask
 *   instead of synchronously.
 */
function _resume(client, sync) {
  while (true) {
    if (client.destroyed) {
      assert(client[kPending] === 0);
      return;
    }

    if (client[kClosedResolve] && !client[kSize]) {
      client[kClosedResolve]();
      client[kClosedResolve] = null;
      return;
    }

    if (client[kHTTPContext]) {
      client[kHTTPContext].resume();
    }

    if (client[kBusy]) {
      client[kNeedDrain] = 2;
    } else if (client[kNeedDrain] === 2) {
      if (sync) {
        client[kNeedDrain] = 1;
        queueMicrotask(() => emitDrain(client));
      } else {
        emitDrain(client);
      }
      continue;
    }

    if (client[kPending] === 0) {
      return;
    }

    if (client[kRunning] >= (getPipelining(client) || 1)) {
      return;
    }

    const request = client[kQueue][client[kPendingIdx]];

    if (
      client[kUrl].protocol === 'https:' &&
      client[kServerName] !== request.servername
    ) {
      // Wait for running requests to finish before switching servername.
      if (client[kRunning] > 0) {
        return;
      }

      client[kServerName] = request.servername;
      client[kHTTPContext]?.destroy(
        new InformationalError('servername changed'),
        () => {
          client[kHTTPContext] = null;
          resume(client);
        }
      );
    }

    if (client[kConnecting]) {
      return;
    }

    if (!client[kHTTPContext]) {
      connect(client);
      return;
    }

    if (client[kHTTPContext].destroyed) {
      return;
    }

    if (client[kHTTPContext].busy(request)) {
      return;
    }

    if (!request.aborted && client[kHTTPContext].write(request)) {
      client[kPendingIdx]++;
    } else {
      // Aborted, or the context declined the write: drop it from the queue.
      client[kQueue].splice(client[kPendingIdx], 1);
    }
  }
}

module.exports = Client;