'use strict'; const assert = require('node:assert'); const { pipeline } = require('node:stream'); const util = require('../core/util.js'); const { RequestContentLengthMismatchError, RequestAbortedError, SocketError, InformationalError, } = require('../core/errors.js'); const { kUrl, kReset, kClient, kRunning, kPending, kQueue, kPendingIdx, kRunningIdx, kError, kSocket, kStrictContentLength, kOnError, kMaxConcurrentStreams, kHTTP2Session, kResume, kSize, kHTTPContext, kClosed, kBodyTimeout, } = require('../core/symbols.js'); const { channels } = require('../core/diagnostics.js'); const kOpenStreams = Symbol('open streams'); let extractBody; /** @type {import('http2')} */ let http2; try { http2 = require('node:http2'); } catch { // @ts-ignore http2 = { constants: {} }; } const { constants: { HTTP2_HEADER_AUTHORITY, HTTP2_HEADER_METHOD, HTTP2_HEADER_PATH, HTTP2_HEADER_SCHEME, HTTP2_HEADER_CONTENT_LENGTH, HTTP2_HEADER_EXPECT, HTTP2_HEADER_STATUS, }, } = http2; function parseH2Headers(headers) { const result = []; for (const [name, value] of Object.entries(headers)) { // h2 may concat the header value by array // e.g. Set-Cookie if (Array.isArray(value)) { for (const subvalue of value) { // we need to provide each header value of header name // because the headers handler expect name-value pair result.push(Buffer.from(name), Buffer.from(subvalue)); } } else { result.push(Buffer.from(name), Buffer.from(value)); } } return result; } async function connectH2(client, socket) { client[kSocket] = socket; const session = http2.connect(client[kUrl], { createConnection: () => socket, peerMaxConcurrentStreams: client[kMaxConcurrentStreams], settings: { // TODO(metcoder95): add support for PUSH enablePush: false, }, }); session[kOpenStreams] = 0; session[kClient] = client; session[kSocket] = socket; session[kHTTP2Session] = null; util.addListener(session, 'error', onHttp2SessionError); util.addListener(session, 'frameError', onHttp2FrameError); util.addListener(session, 'end', onHttp2SessionEnd); util.addListener(session, 'goaway', onHttp2SessionGoAway); util.addListener(session, 'close', onHttp2SessionClose); session.unref(); client[kHTTP2Session] = session; socket[kHTTP2Session] = session; util.addListener(socket, 'error', onHttp2SocketError); util.addListener(socket, 'end', onHttp2SocketEnd); util.addListener(socket, 'close', onHttp2SocketClose); socket[kClosed] = false; socket.on('close', onSocketClose); return { version: 'h2', defaultPipelining: Infinity, write(request) { return writeH2(client, request); }, resume() { resumeH2(client); }, destroy(err, callback) { if (socket[kClosed]) { queueMicrotask(callback); } else { socket.destroy(err).on('close', callback); } }, get destroyed() { return socket.destroyed; }, busy() { return false; }, }; } function resumeH2(client) { const socket = client[kSocket]; if (socket?.destroyed === false) { if (client[kSize] === 0 || client[kMaxConcurrentStreams] === 0) { socket.unref(); client[kHTTP2Session].unref(); } else { socket.ref(); client[kHTTP2Session].ref(); } } } function onHttp2SessionError(err) { assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID'); this[kSocket][kError] = err; this[kClient][kOnError](err); } function onHttp2FrameError(type, code, id) { if (id === 0) { const err = new InformationalError( `HTTP/2: "frameError" received - type ${type}, code ${code}` ); this[kSocket][kError] = err; this[kClient][kOnError](err); } } function onHttp2SessionEnd() { const err = new SocketError( 'other side closed', util.getSocketInfo(this[kSocket]) ); this.destroy(err); util.destroy(this[kSocket], err); } /** * This is the root cause of #3011 * We need to handle GOAWAY frames properly, and trigger the session close * along with the socket right away * * @this {import('http2').ClientHttp2Session} * @param {number} errorCode */ function onHttp2SessionGoAway(errorCode) { // TODO(mcollina): Verify if GOAWAY implements the spec correctly: // https://datatracker.ietf.org/doc/html/rfc7540#section-6.8 // Specifically, we do not verify the "valid" stream id. const err = this[kError] || new SocketError( `HTTP/2: "GOAWAY" frame received with code ${errorCode}`, util.getSocketInfo(this[kSocket]) ); const client = this[kClient]; client[kSocket] = null; client[kHTTPContext] = null; // this is an HTTP2 session this.close(); this[kHTTP2Session] = null; util.destroy(this[kSocket], err); // Fail head of pipeline. if (client[kRunningIdx] < client[kQueue].length) { const request = client[kQueue][client[kRunningIdx]]; client[kQueue][client[kRunningIdx]++] = null; util.errorRequest(client, request, err); client[kPendingIdx] = client[kRunningIdx]; } assert(client[kRunning] === 0); client.emit('disconnect', client[kUrl], [client], err); client[kResume](); } function onHttp2SessionClose() { const { [kClient]: client } = this; const { [kSocket]: socket } = client; const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(socket)); client[kSocket] = null; client[kHTTPContext] = null; if (client.destroyed) { assert(client[kPending] === 0); // Fail entire queue. const requests = client[kQueue].splice(client[kRunningIdx]); for (let i = 0; i < requests.length; i++) { const request = requests[i]; util.errorRequest(client, request, err); } } } function onHttp2SocketClose() { const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)); const client = this[kHTTP2Session][kClient]; client[kSocket] = null; client[kHTTPContext] = null; if (this[kHTTP2Session] !== null) { this[kHTTP2Session].destroy(err); } client[kPendingIdx] = client[kRunningIdx]; assert(client[kRunning] === 0); client.emit('disconnect', client[kUrl], [client], err); client[kResume](); } function onHttp2SocketError(err) { assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID'); this[kError] = err; this[kClient][kOnError](err); } function onHttp2SocketEnd() { util.destroy( this, new SocketError('other side closed', util.getSocketInfo(this)) ); } function onSocketClose() { this[kClosed] = true; } // https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2 function shouldSendContentLength(method) { return ( method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT' ); } function writeH2(client, request) { const requestTimeout = request.bodyTimeout ?? client[kBodyTimeout]; const session = client[kHTTP2Session]; const { method, path, host, upgrade, expectContinue, signal, headers: reqHeaders, } = request; let { body } = request; if (upgrade) { util.errorRequest( client, request, new Error('Upgrade not supported for H2') ); return false; } const headers = {}; for (let n = 0; n < reqHeaders.length; n += 2) { const key = reqHeaders[n + 0]; const val = reqHeaders[n + 1]; if (Array.isArray(val)) { for (let i = 0; i < val.length; i++) { if (headers[key]) { headers[key] += `,${val[i]}`; } else { headers[key] = val[i]; } } } else { headers[key] = val; } } /** @type {import('node:http2').ClientHttp2Stream} */ let stream = null; const { hostname, port } = client[kUrl]; headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}`; headers[HTTP2_HEADER_METHOD] = method; const abort = (err) => { if (request.aborted || request.completed) { return; } err = err || new RequestAbortedError(); util.errorRequest(client, request, err); if (stream != null) { // Some chunks might still come after abort, // let's ignore them stream.removeAllListeners('data'); // On Abort, we close the stream to send RST_STREAM frame stream.close(); // We move the running index to the next request client[kOnError](err); client[kResume](); } // We do not destroy the socket as we can continue using the session // the stream gets destroyed and the session remains to create new streams util.destroy(body, err); }; try { // We are already connected, streams are pending. // We can call on connect, and wait for abort request.onConnect(abort); } catch (err) { util.errorRequest(client, request, err); } if (request.aborted) { return false; } if (method === 'CONNECT') { session.ref(); // We are already connected, streams are pending, first request // will create a new stream. We trigger a request to create the stream and wait until // `ready` event is triggered // We disabled endStream to allow the user to write to the stream stream = session.request(headers, { endStream: false, signal }); if (!stream.pending) { request.onUpgrade(null, null, stream); ++session[kOpenStreams]; client[kQueue][client[kRunningIdx]++] = null; } else { stream.once('ready', () => { request.onUpgrade(null, null, stream); ++session[kOpenStreams]; client[kQueue][client[kRunningIdx]++] = null; }); } stream.once('close', () => { session[kOpenStreams] -= 1; if (session[kOpenStreams] === 0) session.unref(); }); stream.setTimeout(requestTimeout); return true; } // https://tools.ietf.org/html/rfc7540#section-8.3 // :path and :scheme headers must be omitted when sending CONNECT headers[HTTP2_HEADER_PATH] = path; headers[HTTP2_HEADER_SCHEME] = 'https'; // https://tools.ietf.org/html/rfc7231#section-4.3.1 // https://tools.ietf.org/html/rfc7231#section-4.3.2 // https://tools.ietf.org/html/rfc7231#section-4.3.5 // Sending a payload body on a request that does not // expect it can cause undefined behavior on some // servers and corrupt connection state. Do not // re-use the connection for further requests. const expectsPayload = method === 'PUT' || method === 'POST' || method === 'PATCH'; if (body && typeof body.read === 'function') { // Try to read EOF in order to get length. body.read(0); } let contentLength = util.bodyLength(body); if (util.isFormDataLike(body)) { extractBody ??= require('../web/fetch/body.js').extractBody; const [bodyStream, contentType] = extractBody(body); headers['content-type'] = contentType; body = bodyStream.stream; contentLength = bodyStream.length; } if (contentLength == null) { contentLength = request.contentLength; } if (contentLength === 0 || !expectsPayload) { // https://tools.ietf.org/html/rfc7230#section-3.3.2 // A user agent SHOULD NOT send a Content-Length header field when // the request message does not contain a payload body and the method // semantics do not anticipate such a body. contentLength = null; } // https://github.com/nodejs/undici/issues/2046 // A user agent may send a Content-Length header with 0 value, this should be allowed. if ( shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength ) { if (client[kStrictContentLength]) { util.errorRequest( client, request, new RequestContentLengthMismatchError() ); return false; } process.emitWarning(new RequestContentLengthMismatchError()); } if (contentLength != null) { assert(body, 'no body must not have content length'); headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}`; } session.ref(); if (channels.sendHeaders.hasSubscribers) { let header = ''; for (const key in headers) { header += `${key}: ${headers[key]}\r\n`; } channels.sendHeaders.publish({ request, headers: header, socket: session[kSocket], }); } // TODO(metcoder95): add support for sending trailers const shouldEndStream = method === 'GET' || method === 'HEAD' || body === null; if (expectContinue) { headers[HTTP2_HEADER_EXPECT] = '100-continue'; stream = session.request(headers, { endStream: shouldEndStream, signal }); stream.once('continue', writeBodyH2); } else { stream = session.request(headers, { endStream: shouldEndStream, signal, }); writeBodyH2(); } // Increment counter as we have new streams open ++session[kOpenStreams]; stream.setTimeout(requestTimeout); stream.once('response', (headers) => { const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers; request.onResponseStarted(); // Due to the stream nature, it is possible we face a race condition // where the stream has been assigned, but the request has been aborted // the request remains in-flight and headers hasn't been received yet // for those scenarios, best effort is to destroy the stream immediately // as there's no value to keep it open. if (request.aborted) { stream.removeAllListeners('data'); return; } if ( request.onHeaders( Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '' ) === false ) { stream.pause(); } }); stream.on('data', (chunk) => { if (request.onData(chunk) === false) { stream.pause(); } }); stream.once('end', (err) => { stream.removeAllListeners('data'); // When state is null, it means we haven't consumed body and the stream still do not have // a state. // Present specially when using pipeline or stream if (stream.state?.state == null || stream.state.state < 6) { // Do not complete the request if it was aborted // Not prone to happen for as safety net to avoid race conditions with 'trailers' if (!request.aborted && !request.completed) { request.onComplete({}); } client[kQueue][client[kRunningIdx]++] = null; client[kResume](); } else { // Stream is closed or half-closed-remote (6), decrement counter and cleanup // It does not have sense to continue working with the stream as we do not // have yet RST_STREAM support on client-side --session[kOpenStreams]; if (session[kOpenStreams] === 0) { session.unref(); } abort( err ?? new InformationalError('HTTP/2: stream half-closed (remote)') ); client[kQueue][client[kRunningIdx]++] = null; client[kPendingIdx] = client[kRunningIdx]; client[kResume](); } }); stream.once('close', () => { stream.removeAllListeners('data'); session[kOpenStreams] -= 1; if (session[kOpenStreams] === 0) { session.unref(); } }); stream.once('error', function (err) { stream.removeAllListeners('data'); abort(err); }); stream.once('frameError', (type, code) => { stream.removeAllListeners('data'); abort( new InformationalError( `HTTP/2: "frameError" received - type ${type}, code ${code}` ) ); }); stream.on('aborted', () => { stream.removeAllListeners('data'); }); stream.on('timeout', () => { const err = new InformationalError( `HTTP/2: "stream timeout after ${requestTimeout}"` ); stream.removeAllListeners('data'); session[kOpenStreams] -= 1; if (session[kOpenStreams] === 0) { session.unref(); } abort(err); }); stream.once('trailers', (trailers) => { if (request.aborted || request.completed) { return; } request.onComplete(trailers); }); return true; function writeBodyH2() { /* istanbul ignore else: assertion */ if (!body || contentLength === 0) { writeBuffer( abort, stream, null, client, request, client[kSocket], contentLength, expectsPayload ); } else if (util.isBuffer(body)) { writeBuffer( abort, stream, body, client, request, client[kSocket], contentLength, expectsPayload ); } else if (util.isBlobLike(body)) { if (typeof body.stream === 'function') { writeIterable( abort, stream, body.stream(), client, request, client[kSocket], contentLength, expectsPayload ); } else { writeBlob( abort, stream, body, client, request, client[kSocket], contentLength, expectsPayload ); } } else if (util.isStream(body)) { writeStream( abort, client[kSocket], expectsPayload, stream, body, client, request, contentLength ); } else if (util.isIterable(body)) { writeIterable( abort, stream, body, client, request, client[kSocket], contentLength, expectsPayload ); } else { assert(false); } } } function writeBuffer( abort, h2stream, body, client, request, socket, contentLength, expectsPayload ) { try { if (body != null && util.isBuffer(body)) { assert( contentLength === body.byteLength, 'buffer body must have content length' ); h2stream.cork(); h2stream.write(body); h2stream.uncork(); h2stream.end(); request.onBodySent(body); } if (!expectsPayload) { socket[kReset] = true; } request.onRequestSent(); client[kResume](); } catch (error) { abort(error); } } function writeStream( abort, socket, expectsPayload, h2stream, body, client, request, contentLength ) { assert( contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined' ); // For HTTP/2, is enough to pipe the stream const pipe = pipeline(body, h2stream, (err) => { if (err) { util.destroy(pipe, err); abort(err); } else { util.removeAllListeners(pipe); request.onRequestSent(); if (!expectsPayload) { socket[kReset] = true; } client[kResume](); } }); util.addListener(pipe, 'data', onPipeData); function onPipeData(chunk) { request.onBodySent(chunk); } } async function writeBlob( abort, h2stream, body, client, request, socket, contentLength, expectsPayload ) { assert(contentLength === body.size, 'blob body must have content length'); try { if (contentLength != null && contentLength !== body.size) { throw new RequestContentLengthMismatchError(); } const buffer = Buffer.from(await body.arrayBuffer()); h2stream.cork(); h2stream.write(buffer); h2stream.uncork(); h2stream.end(); request.onBodySent(buffer); request.onRequestSent(); if (!expectsPayload) { socket[kReset] = true; } client[kResume](); } catch (err) { abort(err); } } async function writeIterable( abort, h2stream, body, client, request, socket, contentLength, expectsPayload ) { assert( contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined' ); let callback = null; function onDrain() { if (callback) { const cb = callback; callback = null; cb(); } } const waitForDrain = () => new Promise((resolve, reject) => { assert(callback === null); if (socket[kError]) { reject(socket[kError]); } else { callback = resolve; } }); h2stream.on('close', onDrain).on('drain', onDrain); try { // It's up to the user to somehow abort the async iterable. for await (const chunk of body) { if (socket[kError]) { throw socket[kError]; } const res = h2stream.write(chunk); request.onBodySent(chunk); if (!res) { await waitForDrain(); } } h2stream.end(); request.onRequestSent(); if (!expectsPayload) { socket[kReset] = true; } client[kResume](); } catch (err) { abort(err); } finally { h2stream.off('close', onDrain).off('drain', onDrain); } } module.exports = connectH2;