707 lines
17 KiB
JavaScript
707 lines
17 KiB
JavaScript
'use strict';
|
|
|
|
const assert = require('node:assert');
|
|
const net = require('node:net');
|
|
const http = require('node:http');
|
|
const util = require('../core/util.js');
|
|
const { channels } = require('../core/diagnostics.js');
|
|
const Request = require('../core/request.js');
|
|
const DispatcherBase = require('./dispatcher-base');
|
|
const {
|
|
InvalidArgumentError,
|
|
InformationalError,
|
|
ClientDestroyedError,
|
|
} = require('../core/errors.js');
|
|
const buildConnector = require('../core/connect.js');
|
|
const {
|
|
kUrl,
|
|
kServerName,
|
|
kClient,
|
|
kBusy,
|
|
kConnect,
|
|
kResuming,
|
|
kRunning,
|
|
kPending,
|
|
kSize,
|
|
kQueue,
|
|
kConnected,
|
|
kConnecting,
|
|
kNeedDrain,
|
|
kKeepAliveDefaultTimeout,
|
|
kHostHeader,
|
|
kPendingIdx,
|
|
kRunningIdx,
|
|
kError,
|
|
kPipelining,
|
|
kKeepAliveTimeoutValue,
|
|
kMaxHeadersSize,
|
|
kKeepAliveMaxTimeout,
|
|
kKeepAliveTimeoutThreshold,
|
|
kHeadersTimeout,
|
|
kBodyTimeout,
|
|
kStrictContentLength,
|
|
kConnector,
|
|
kMaxRequests,
|
|
kCounter,
|
|
kClose,
|
|
kDestroy,
|
|
kDispatch,
|
|
kLocalAddress,
|
|
kMaxResponseSize,
|
|
kOnError,
|
|
kHTTPContext,
|
|
kMaxConcurrentStreams,
|
|
kResume,
|
|
} = require('../core/symbols.js');
|
|
const connectH1 = require('./client-h1.js');
|
|
const connectH2 = require('./client-h2.js');
|
|
|
|
// Module-private key under which a client stores the resolver of a pending
// graceful close: set by Client[kClose] while requests remain, invoked and
// cleared by Client[kDestroy] and by _resume() once the queue is empty.
const kClosedResolve = Symbol('kClosedResolve');
|
|
|
|
/**
 * Returns the default maximum header size taken from the `http` module.
 *
 * The implementation is chosen once, when this module is evaluated: if
 * `http.maxHeaderSize` is a positive integer the getter returns it,
 * otherwise the getter throws.
 *
 * @returns {number} The current value of `http.maxHeaderSize`.
 * @throws {InvalidArgumentError} When `http.maxHeaderSize` was not a positive
 *   integer at module evaluation time.
 */
const getDefaultNodeMaxHeaderSize =
  (
    http &&
    http.maxHeaderSize &&
    Number.isInteger(http.maxHeaderSize) &&
    http.maxHeaderSize > 0
  ) ?
    () => http.maxHeaderSize
  : () => {
      throw new InvalidArgumentError(
        'http module not available or http.maxHeaderSize invalid'
      );
    };
|
|
|
|
/** Does nothing; used as an `'error'` listener on sockets being discarded. */
const noop = () => undefined;
|
|
|
|
/**
 * Resolves the pipelining factor that applies to `client`.
 *
 * Precedence: the client's own `kPipelining` value, then the
 * `defaultPipelining` of the current HTTP context (if any), then 1. Only
 * `null`/`undefined` fall through, so an explicit 0 is returned as-is.
 *
 * @param {Client} client
 * @returns {number}
 */
function getPipelining(client) {
  return client[kPipelining] ?? client[kHTTPContext]?.defaultPipelining ?? 1;
}
|
|
|
|
/**
 * Dispatcher that queues requests in `kQueue` and hands them, one by one, to
 * the current HTTP context (`kHTTPContext`). The context is created on demand
 * by `connect()` from the socket returned by the configured connector.
 *
 * @type {import('../../types/client.js').default}
 */
class Client extends DispatcherBase {
  /**
   * Validates the options, builds the connector (unless a connector function
   * was supplied) and initialises the request queue.
   *
   * @param {string|URL} url
   * @param {import('../../types/client.js').Client.Options} options
   * @throws {InvalidArgumentError} When a renamed/unsupported option
   *   (`keepAlive`, `socketTimeout`, `requestTimeout`, `idleTimeout`,
   *   `maxKeepAliveTimeout`) is passed, or when an option fails validation.
   */
  constructor(
    url,
    {
      maxHeaderSize,
      headersTimeout,
      socketTimeout,
      requestTimeout,
      connectTimeout,
      bodyTimeout,
      idleTimeout,
      keepAlive,
      keepAliveTimeout,
      maxKeepAliveTimeout,
      keepAliveMaxTimeout,
      keepAliveTimeoutThreshold,
      socketPath,
      pipelining,
      tls,
      strictContentLength,
      maxCachedSessions,
      connect,
      maxRequestsPerClient,
      localAddress,
      maxResponseSize,
      autoSelectFamily,
      autoSelectFamilyAttemptTimeout,
      // h2
      maxConcurrentStreams,
      allowH2,
    } = {}
  ) {
    // Options that are no longer supported: fail loudly and name the
    // replacement instead of silently ignoring them.
    if (keepAlive !== undefined) {
      throw new InvalidArgumentError(
        'unsupported keepAlive, use pipelining=0 instead'
      );
    }

    if (socketTimeout !== undefined) {
      throw new InvalidArgumentError(
        'unsupported socketTimeout, use headersTimeout & bodyTimeout instead'
      );
    }

    if (requestTimeout !== undefined) {
      throw new InvalidArgumentError(
        'unsupported requestTimeout, use headersTimeout & bodyTimeout instead'
      );
    }

    if (idleTimeout !== undefined) {
      throw new InvalidArgumentError(
        'unsupported idleTimeout, use keepAliveTimeout instead'
      );
    }

    if (maxKeepAliveTimeout !== undefined) {
      throw new InvalidArgumentError(
        'unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead'
      );
    }

    if (maxHeaderSize != null) {
      if (!Number.isInteger(maxHeaderSize) || maxHeaderSize < 1) {
        throw new InvalidArgumentError('invalid maxHeaderSize');
      }
    } else {
      // If maxHeaderSize is not provided, use the default value from the http module
      // or if that is not available, throw an error.
      maxHeaderSize = getDefaultNodeMaxHeaderSize();
    }

    if (socketPath != null && typeof socketPath !== 'string') {
      throw new InvalidArgumentError('invalid socketPath');
    }

    if (
      connectTimeout != null &&
      (!Number.isFinite(connectTimeout) || connectTimeout < 0)
    ) {
      throw new InvalidArgumentError('invalid connectTimeout');
    }

    if (
      keepAliveTimeout != null &&
      (!Number.isFinite(keepAliveTimeout) || keepAliveTimeout <= 0)
    ) {
      throw new InvalidArgumentError('invalid keepAliveTimeout');
    }

    if (
      keepAliveMaxTimeout != null &&
      (!Number.isFinite(keepAliveMaxTimeout) || keepAliveMaxTimeout <= 0)
    ) {
      throw new InvalidArgumentError('invalid keepAliveMaxTimeout');
    }

    if (
      keepAliveTimeoutThreshold != null &&
      !Number.isFinite(keepAliveTimeoutThreshold)
    ) {
      throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold');
    }

    if (
      headersTimeout != null &&
      (!Number.isInteger(headersTimeout) || headersTimeout < 0)
    ) {
      throw new InvalidArgumentError(
        'headersTimeout must be a positive integer or zero'
      );
    }

    if (
      bodyTimeout != null &&
      (!Number.isInteger(bodyTimeout) || bodyTimeout < 0)
    ) {
      throw new InvalidArgumentError(
        'bodyTimeout must be a positive integer or zero'
      );
    }

    if (
      connect != null &&
      typeof connect !== 'function' &&
      typeof connect !== 'object'
    ) {
      throw new InvalidArgumentError('connect must be a function or an object');
    }

    if (
      maxRequestsPerClient != null &&
      (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)
    ) {
      throw new InvalidArgumentError(
        'maxRequestsPerClient must be a positive number'
      );
    }

    if (
      localAddress != null &&
      (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)
    ) {
      throw new InvalidArgumentError(
        'localAddress must be valid string IP address'
      );
    }

    // -1 is accepted and means "no limit" (see the kMaxResponseSize
    // assignment below, which normalises anything not above -1 to -1).
    if (
      maxResponseSize != null &&
      (!Number.isInteger(maxResponseSize) || maxResponseSize < -1)
    ) {
      throw new InvalidArgumentError(
        'maxResponseSize must be a positive number'
      );
    }

    if (
      autoSelectFamilyAttemptTimeout != null &&
      (!Number.isInteger(autoSelectFamilyAttemptTimeout) ||
        autoSelectFamilyAttemptTimeout < -1)
    ) {
      throw new InvalidArgumentError(
        'autoSelectFamilyAttemptTimeout must be a positive number'
      );
    }

    // h2
    if (allowH2 != null && typeof allowH2 !== 'boolean') {
      throw new InvalidArgumentError('allowH2 must be a valid boolean value');
    }

    // NOTE(review): the message says "integer" but the check only requires a
    // number >= 1 (non-integers and NaN pass) — confirm whether that is intended.
    if (
      maxConcurrentStreams != null &&
      (typeof maxConcurrentStreams !== 'number' || maxConcurrentStreams < 1)
    ) {
      throw new InvalidArgumentError(
        'maxConcurrentStreams must be a positive integer, greater than 0'
      );
    }

    super();

    // A connector function is used as-is; otherwise one is built, treating an
    // object-valued `connect` as extra connector options that take precedence
    // over everything spread before it.
    if (typeof connect !== 'function') {
      connect = buildConnector({
        ...tls,
        maxCachedSessions,
        allowH2,
        socketPath,
        timeout: connectTimeout,
        ...(typeof autoSelectFamily === 'boolean' ?
          { autoSelectFamily, autoSelectFamilyAttemptTimeout }
        : undefined),
        ...connect,
      });
    }

    this[kUrl] = util.parseOrigin(url);
    this[kConnector] = connect;
    this[kPipelining] = pipelining ?? 1;
    this[kMaxHeadersSize] = maxHeaderSize;
    this[kKeepAliveDefaultTimeout] = keepAliveTimeout ?? 4e3;
    this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout ?? 600e3;
    this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold ?? 2e3;
    this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout];
    this[kServerName] = null;
    this[kLocalAddress] = localAddress ?? null;
    this[kResuming] = 0; // 0, idle, 1, scheduled, 2 resuming
    this[kNeedDrain] = 0; // 0, idle, 1, drain scheduled, 2 busy (drain needed)
    this[kHostHeader] =
      `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n`;
    this[kBodyTimeout] = bodyTimeout ?? 300e3;
    this[kHeadersTimeout] = headersTimeout ?? 300e3;
    this[kStrictContentLength] = strictContentLength ?? true;
    this[kMaxRequests] = maxRequestsPerClient;
    this[kClosedResolve] = null;
    this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1;
    this[kMaxConcurrentStreams] = maxConcurrentStreams ?? 100; // Max peerConcurrentStreams for a Node h2 server
    this[kHTTPContext] = null;

    // kQueue is built up of 3 sections separated by
    // the kRunningIdx and kPendingIdx indices.
    // |   complete   |   running   |   pending   |
    //                ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
    // kRunningIdx points to the first running element.
    // kPendingIdx points to the first pending element.
    // This implements a fast queue with an amortized
    // time of O(1).

    this[kQueue] = [];
    this[kRunningIdx] = 0;
    this[kPendingIdx] = 0;

    this[kResume] = (sync) => resume(this, sync);
    this[kOnError] = (err) => onError(this, err);
  }

  /** The pipelining factor stored on this client. */
  get pipelining() {
    return this[kPipelining];
  }

  /** Stores a new pipelining factor and immediately re-runs the resume loop. */
  set pipelining(value) {
    this[kPipelining] = value;
    this[kResume](true);
  }

  /** Number of queued requests that have not been written yet. */
  get [kPending]() {
    return this[kQueue].length - this[kPendingIdx];
  }

  /** Number of requests that have been written but are not yet complete. */
  get [kRunning]() {
    return this[kPendingIdx] - this[kRunningIdx];
  }

  /** Number of running plus pending requests. */
  get [kSize]() {
    return this[kQueue].length - this[kRunningIdx];
  }

  /** True when an HTTP context exists, is not destroyed, and no connect is in flight. */
  get [kConnected]() {
    return (
      !!this[kHTTPContext] &&
      !this[kConnecting] &&
      !this[kHTTPContext].destroyed
    );
  }

  /**
   * True when the HTTP context reports busy, when the queue already holds at
   * least the pipelining factor (minimum 1) of requests, or when anything is
   * still pending.
   */
  get [kBusy]() {
    return Boolean(
      this[kHTTPContext]?.busy(null) ||
        this[kSize] >= (getPipelining(this) || 1) ||
        this[kPending] > 0
    );
  }

  /* istanbul ignore: only used for test */
  [kConnect](cb) {
    connect(this);
    this.once('connect', cb);
  }

  /**
   * Enqueues a request and kicks the resume loop.
   *
   * @param {object} opts Request options; `opts.origin` overrides the client origin.
   * @param {object} handler Handler passed through to `Request`.
   * @returns {boolean} `false` when the client is busy (`kNeedDrain` is 2),
   *   `true` otherwise.
   */
  [kDispatch](opts, handler) {
    const origin = opts.origin || this[kUrl].origin;
    const request = new Request(origin, opts, handler);

    this[kQueue].push(request);
    if (this[kResuming]) {
      // Do nothing.
    } else if (
      util.bodyLength(request.body) == null &&
      util.isIterable(request.body)
    ) {
      // Wait a tick in case stream/iterator is ended in the same tick.
      this[kResuming] = 1;
      queueMicrotask(() => resume(this));
    } else {
      this[kResume](true);
    }

    // When a resume is scheduled or running, the loop has not recorded the
    // busy state for this request yet, so do it here.
    if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
      this[kNeedDrain] = 2;
    }

    return this[kNeedDrain] < 2;
  }

  /**
   * Resolves once no running or pending requests remain. If the queue is
   * already empty it resolves immediately; otherwise the resolver is parked
   * in `kClosedResolve`.
   *
   * @returns {Promise<null|undefined>}
   */
  async [kClose]() {
    // TODO: for H2 we need to gracefully flush the remaining enqueued
    // request and close each stream.
    return new Promise((resolve) => {
      if (this[kSize]) {
        this[kClosedResolve] = resolve;
      } else {
        resolve(null);
      }
    });
  }

  /**
   * Errors every pending request with `err`, destroys the HTTP context (if
   * any) and resolves once that is done, also releasing a parked close.
   *
   * @param {Error} err
   * @returns {Promise<null>}
   */
  async [kDestroy](err) {
    return new Promise((resolve) => {
      // Only pending requests are removed here; running ones stay queued.
      const requests = this[kQueue].splice(this[kPendingIdx]);
      for (const request of requests) {
        util.errorRequest(this, request, err);
      }

      const callback = () => {
        if (this[kClosedResolve]) {
          // TODO (fix): Should we error here with ClientDestroyedError?
          this[kClosedResolve]();
          this[kClosedResolve] = null;
        }
        resolve(null);
      };

      if (this[kHTTPContext]) {
        this[kHTTPContext].destroy(err, callback);
        this[kHTTPContext] = null;
      } else {
        queueMicrotask(callback);
      }

      this[kResume]();
    });
  }
}
|
|
|
|
/**
 * Fails every queued request with `err` when nothing is running and the
 * error code is neither `UND_ERR_INFO` nor `UND_ERR_SOCKET`. In all other
 * cases the queue is left untouched.
 *
 * @param {Client} client
 * @param {Error & { code?: string }} err
 */
function onError(client, err) {
  if (
    client[kRunning] === 0 &&
    err.code !== 'UND_ERR_INFO' &&
    err.code !== 'UND_ERR_SOCKET'
  ) {
    // Error is not caused by running request and not a recoverable
    // socket error.

    assert(client[kPendingIdx] === client[kRunningIdx]);

    // Everything from kRunningIdx on is pending (nothing is running), so
    // this removes and fails the whole remaining queue.
    const requests = client[kQueue].splice(client[kRunningIdx]);

    for (const request of requests) {
      util.errorRequest(client, request, err);
    }
    assert(client[kSize] === 0);
  }
}
|
|
|
|
/**
 * Opens a socket through the client's connector and wraps it in an HTTP
 * context (H2 when the socket's `alpnProtocol` is `'h2'`, H1 otherwise).
 *
 * On success it stores the context in `kHTTPContext`, tags the socket with
 * bookkeeping symbols and emits `'connect'`. On failure it fails the affected
 * requests and emits `'connectionError'`. Unless the client was destroyed in
 * the meantime, it finishes by re-running the resume loop. Diagnostics
 * channel messages are published only when the channel has subscribers.
 *
 * Errors are handled internally; the returned promise is not expected to
 * reject for connector or context failures.
 *
 * @param {Client} client
 * @returns {Promise<void>}
 */
async function connect(client) {
  assert(!client[kConnecting]);
  assert(!client[kHTTPContext]);

  let { host, hostname, protocol, port } = client[kUrl];

  // Resolve ipv6
  if (hostname[0] === '[') {
    const idx = hostname.indexOf(']');

    assert(idx !== -1);
    const ip = hostname.substring(1, idx);

    assert(net.isIPv6(ip));
    hostname = ip;
  }

  client[kConnecting] = true;

  if (channels.beforeConnect.hasSubscribers) {
    channels.beforeConnect.publish({
      connectParams: {
        host,
        hostname,
        protocol,
        port,
        version: client[kHTTPContext]?.version,
        servername: client[kServerName],
        localAddress: client[kLocalAddress],
      },
      connector: client[kConnector],
    });
  }

  try {
    // Adapt the callback-style connector to a promise.
    const socket = await new Promise((resolve, reject) => {
      client[kConnector](
        {
          host,
          hostname,
          protocol,
          port,
          servername: client[kServerName],
          localAddress: client[kLocalAddress],
        },
        (err, socket) => {
          if (err) {
            reject(err);
          } else {
            resolve(socket);
          }
        }
      );
    });

    // The client may have been destroyed while connecting: discard the
    // socket, swallowing any late 'error' event it emits.
    if (client.destroyed) {
      util.destroy(socket.on('error', noop), new ClientDestroyedError());
      return;
    }

    assert(socket);

    try {
      client[kHTTPContext] =
        socket.alpnProtocol === 'h2' ?
          await connectH2(client, socket)
        : await connectH1(client, socket);
    } catch (err) {
      // Context setup failed: drop the socket, then let the outer handler
      // report the error.
      socket.destroy().on('error', noop);
      throw err;
    }

    client[kConnecting] = false;

    socket[kCounter] = 0;
    socket[kMaxRequests] = client[kMaxRequests];
    socket[kClient] = client;
    socket[kError] = null;

    if (channels.connected.hasSubscribers) {
      channels.connected.publish({
        connectParams: {
          host,
          hostname,
          protocol,
          port,
          version: client[kHTTPContext]?.version,
          servername: client[kServerName],
          localAddress: client[kLocalAddress],
        },
        connector: client[kConnector],
        socket,
      });
    }
    client.emit('connect', client[kUrl], [client]);
  } catch (err) {
    if (client.destroyed) {
      return;
    }

    client[kConnecting] = false;

    if (channels.connectError.hasSubscribers) {
      channels.connectError.publish({
        connectParams: {
          host,
          hostname,
          protocol,
          port,
          version: client[kHTTPContext]?.version,
          servername: client[kServerName],
          localAddress: client[kLocalAddress],
        },
        connector: client[kConnector],
        error: err,
      });
    }

    if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') {
      // Only fail the leading run of pending requests that target the
      // current servername; later requests stay queued.
      assert(client[kRunning] === 0);
      while (
        client[kPending] > 0 &&
        client[kQueue][client[kPendingIdx]].servername === client[kServerName]
      ) {
        const request = client[kQueue][client[kPendingIdx]++];
        util.errorRequest(client, request, err);
      }
    } else {
      onError(client, err);
    }

    client.emit('connectionError', client[kUrl], [client], err);
  }

  client[kResume]();
}
|
|
|
|
/**
 * Resets the client's drain state to idle (0) and emits `'drain'` with the
 * client URL and a one-element array holding the client.
 *
 * @param {Client} client
 */
function emitDrain(client) {
  client[kNeedDrain] = 0;
  client.emit('drain', client[kUrl], [client]);
}
|
|
|
|
/**
 * Runs the resume loop for `client` unless it is already running, then
 * compacts the queue.
 *
 * `kResuming` is set to 2 for the duration of `_resume()` so that nested
 * calls return immediately, and reset to 0 afterwards.
 *
 * @param {Client} client
 * @param {boolean} [sync] Forwarded to `_resume()`.
 */
function resume(client, sync) {
  if (client[kResuming] === 2) {
    return;
  }

  client[kResuming] = 2;

  _resume(client, sync);
  client[kResuming] = 0;

  // Drop completed entries in bulk once enough have accumulated, shifting
  // both indices so they keep pointing at the same requests.
  if (client[kRunningIdx] > 256) {
    client[kQueue].splice(0, client[kRunningIdx]);
    client[kPendingIdx] -= client[kRunningIdx];
    client[kRunningIdx] = 0;
  }
}
|
|
|
|
/**
 * Core dispatch loop: writes pending requests to the HTTP context until
 * something prevents further progress.
 *
 * Each iteration, in order: stops if the client is destroyed; resolves a
 * parked close once the queue is empty; resumes the HTTP context; updates
 * the drain state; stops when nothing is pending or the pipelining limit is
 * reached; handles a TLS servername change; starts a connection if there is
 * no context; and finally writes (or drops) the next pending request.
 *
 * @param {Client} client
 * @param {boolean} [sync] When truthy, a due `'drain'` is emitted from a
 *   microtask instead of synchronously.
 */
function _resume(client, sync) {
  while (true) {
    if (client.destroyed) {
      assert(client[kPending] === 0);
      return;
    }

    if (client[kClosedResolve] && !client[kSize]) {
      client[kClosedResolve]();
      client[kClosedResolve] = null;
      return;
    }

    if (client[kHTTPContext]) {
      client[kHTTPContext].resume();
    }

    if (client[kBusy]) {
      client[kNeedDrain] = 2;
    } else if (client[kNeedDrain] === 2) {
      if (sync) {
        // Mark the drain as scheduled (1) so it is not queued twice.
        client[kNeedDrain] = 1;
        queueMicrotask(() => emitDrain(client));
      } else {
        emitDrain(client);
      }
      continue;
    }

    if (client[kPending] === 0) {
      return;
    }

    if (client[kRunning] >= (getPipelining(client) || 1)) {
      return;
    }

    const request = client[kQueue][client[kPendingIdx]];

    if (
      client[kUrl].protocol === 'https:' &&
      client[kServerName] !== request.servername
    ) {
      // The servername can only change once running requests have finished.
      if (client[kRunning] > 0) {
        return;
      }

      client[kServerName] = request.servername;
      client[kHTTPContext]?.destroy(
        new InformationalError('servername changed'),
        () => {
          client[kHTTPContext] = null;
          resume(client);
        }
      );
    }

    if (client[kConnecting]) {
      return;
    }

    if (!client[kHTTPContext]) {
      connect(client);
      return;
    }

    if (client[kHTTPContext].destroyed) {
      return;
    }

    if (client[kHTTPContext].busy(request)) {
      return;
    }

    // A written request becomes "running"; an aborted request, or one the
    // context refused to write, is removed from the queue.
    if (!request.aborted && client[kHTTPContext].write(request)) {
      client[kPendingIdx]++;
    } else {
      client[kQueue].splice(client[kPendingIdx], 1);
    }
  }
}
|
|
|
|
module.exports = Client;
|