548 lines
18 KiB
JavaScript
548 lines
18 KiB
JavaScript
'use strict';
|
||
|
||
const {
|
||
createDeferredPromise,
|
||
environmentSettingsObject,
|
||
} = require('../../fetch/util');
|
||
const { states, opcodes, sentCloseFrameState } = require('../constants');
|
||
const { webidl } = require('../../fetch/webidl');
|
||
const {
|
||
getURLRecord,
|
||
isValidSubprotocol,
|
||
isEstablished,
|
||
utf8Decode,
|
||
} = require('../util');
|
||
const {
|
||
establishWebSocketConnection,
|
||
failWebsocketConnection,
|
||
closeWebSocketConnection,
|
||
} = require('../connection');
|
||
const { types } = require('node:util');
|
||
const { channels } = require('../../../core/diagnostics');
|
||
const { WebsocketFrameSend } = require('../frame');
|
||
const { ByteParser } = require('../receiver');
|
||
const {
|
||
WebSocketError,
|
||
createUnvalidatedWebSocketError,
|
||
} = require('./websocketerror');
|
||
const { utf8DecodeBytes } = require('../../fetch/util');
|
||
const { kEnumerableProperty } = require('../../../core/util');
|
||
|
||
let emittedExperimentalWarning = false;
|
||
|
||
class WebSocketStream {
|
||
// Each WebSocketStream object has an associated url , which is a URL record .
|
||
/** @type {URL} */
|
||
#url;
|
||
|
||
// Each WebSocketStream object has an associated opened promise , which is a promise.
|
||
/** @type {ReturnType<typeof createDeferredPromise>} */
|
||
#openedPromise;
|
||
|
||
// Each WebSocketStream object has an associated closed promise , which is a promise.
|
||
/** @type {ReturnType<typeof createDeferredPromise>} */
|
||
#closedPromise;
|
||
|
||
// Each WebSocketStream object has an associated readable stream , which is a ReadableStream .
|
||
/** @type {ReadableStream} */
|
||
#readableStream;
|
||
/** @type {ReadableStreamDefaultController} */
|
||
#readableStreamController;
|
||
|
||
// Each WebSocketStream object has an associated writable stream , which is a WritableStream .
|
||
/** @type {WritableStream} */
|
||
#writableStream;
|
||
|
||
// Each WebSocketStream object has an associated boolean handshake aborted , which is initially false.
|
||
#handshakeAborted = false;
|
||
|
||
/** @type {import('../websocket').Handler} */
|
||
#handler = {
|
||
// https://whatpr.org/websockets/48/7b748d3...d5570f3.html#feedback-to-websocket-stream-from-the-protocol
|
||
onConnectionEstablished: (response, extensions) =>
|
||
this.#onConnectionEstablished(response, extensions),
|
||
onFail: (_code, _reason) => {},
|
||
onMessage: (opcode, data) => this.#onMessage(opcode, data),
|
||
onParserError: (err) =>
|
||
failWebsocketConnection(this.#handler, null, err.message),
|
||
onParserDrain: () => this.#handler.socket.resume(),
|
||
onSocketData: (chunk) => {
|
||
if (!this.#parser.write(chunk)) {
|
||
this.#handler.socket.pause();
|
||
}
|
||
},
|
||
onSocketError: (err) => {
|
||
this.#handler.readyState = states.CLOSING;
|
||
|
||
if (channels.socketError.hasSubscribers) {
|
||
channels.socketError.publish(err);
|
||
}
|
||
|
||
this.#handler.socket.destroy();
|
||
},
|
||
onSocketClose: () => this.#onSocketClose(),
|
||
|
||
readyState: states.CONNECTING,
|
||
socket: null,
|
||
closeState: new Set(),
|
||
controller: null,
|
||
wasEverConnected: false,
|
||
};
|
||
|
||
/** @type {import('../receiver').ByteParser} */
|
||
#parser;
|
||
|
||
constructor(url, options = undefined) {
|
||
if (!emittedExperimentalWarning) {
|
||
process.emitWarning(
|
||
'WebSocketStream is experimental! Expect it to change at any time.',
|
||
{
|
||
code: 'UNDICI-WSS',
|
||
}
|
||
);
|
||
emittedExperimentalWarning = true;
|
||
}
|
||
|
||
webidl.argumentLengthCheck(arguments, 1, 'WebSocket');
|
||
|
||
url = webidl.converters.USVString(url);
|
||
if (options !== null) {
|
||
options = webidl.converters.WebSocketStreamOptions(options);
|
||
}
|
||
|
||
// 1. Let baseURL be this 's relevant settings object 's API base URL .
|
||
const baseURL = environmentSettingsObject.settingsObject.baseUrl;
|
||
|
||
// 2. Let urlRecord be the result of getting a URL record given url and baseURL .
|
||
const urlRecord = getURLRecord(url, baseURL);
|
||
|
||
// 3. Let protocols be options [" protocols "] if it exists , otherwise an empty sequence.
|
||
const protocols = options.protocols;
|
||
|
||
// 4. If any of the values in protocols occur more than once or otherwise fail to match the requirements for elements that comprise the value of ` Sec-WebSocket-Protocol ` fields as defined by The WebSocket Protocol , then throw a " SyntaxError " DOMException . [WSP]
|
||
if (
|
||
protocols.length !== new Set(protocols.map((p) => p.toLowerCase())).size
|
||
) {
|
||
throw new DOMException(
|
||
'Invalid Sec-WebSocket-Protocol value',
|
||
'SyntaxError'
|
||
);
|
||
}
|
||
|
||
if (
|
||
protocols.length > 0 &&
|
||
!protocols.every((p) => isValidSubprotocol(p))
|
||
) {
|
||
throw new DOMException(
|
||
'Invalid Sec-WebSocket-Protocol value',
|
||
'SyntaxError'
|
||
);
|
||
}
|
||
|
||
// 5. Set this 's url to urlRecord .
|
||
this.#url = urlRecord.toString();
|
||
|
||
// 6. Set this 's opened promise and closed promise to new promises.
|
||
this.#openedPromise = createDeferredPromise();
|
||
this.#closedPromise = createDeferredPromise();
|
||
|
||
// 7. Apply backpressure to the WebSocket.
|
||
// TODO
|
||
|
||
// 8. If options [" signal "] exists ,
|
||
if (options.signal != null) {
|
||
// 8.1. Let signal be options [" signal "].
|
||
const signal = options.signal;
|
||
|
||
// 8.2. If signal is aborted , then reject this 's opened promise and closed promise with signal ’s abort reason
|
||
// and return.
|
||
if (signal.aborted) {
|
||
this.#openedPromise.reject(signal.reason);
|
||
this.#closedPromise.reject(signal.reason);
|
||
return;
|
||
}
|
||
|
||
// 8.3. Add the following abort steps to signal :
|
||
signal.addEventListener(
|
||
'abort',
|
||
() => {
|
||
// 8.3.1. If the WebSocket connection is not yet established : [WSP]
|
||
if (!isEstablished(this.#handler.readyState)) {
|
||
// 8.3.1.1. Fail the WebSocket connection .
|
||
failWebsocketConnection(this.#handler);
|
||
|
||
// Set this 's ready state to CLOSING .
|
||
this.#handler.readyState = states.CLOSING;
|
||
|
||
// Reject this 's opened promise and closed promise with signal ’s abort reason .
|
||
this.#openedPromise.reject(signal.reason);
|
||
this.#closedPromise.reject(signal.reason);
|
||
|
||
// Set this 's handshake aborted to true.
|
||
this.#handshakeAborted = true;
|
||
}
|
||
},
|
||
{ once: true }
|
||
);
|
||
}
|
||
|
||
// 9. Let client be this 's relevant settings object .
|
||
const client = environmentSettingsObject.settingsObject;
|
||
|
||
// 10. Run this step in parallel :
|
||
// 10.1. Establish a WebSocket connection given urlRecord , protocols , and client . [FETCH]
|
||
this.#handler.controller = establishWebSocketConnection(
|
||
urlRecord,
|
||
protocols,
|
||
client,
|
||
this.#handler,
|
||
options
|
||
);
|
||
}
|
||
|
||
// The url getter steps are to return this 's url , serialized .
|
||
get url() {
|
||
return this.#url.toString();
|
||
}
|
||
|
||
// The opened getter steps are to return this 's opened promise .
|
||
get opened() {
|
||
return this.#openedPromise.promise;
|
||
}
|
||
|
||
// The closed getter steps are to return this 's closed promise .
|
||
get closed() {
|
||
return this.#closedPromise.promise;
|
||
}
|
||
|
||
// The close( closeInfo ) method steps are:
|
||
close(closeInfo = undefined) {
|
||
if (closeInfo !== null) {
|
||
closeInfo = webidl.converters.WebSocketCloseInfo(closeInfo);
|
||
}
|
||
|
||
// 1. Let code be closeInfo [" closeCode "] if present, or null otherwise.
|
||
const code = closeInfo.closeCode ?? null;
|
||
|
||
// 2. Let reason be closeInfo [" reason "].
|
||
const reason = closeInfo.reason;
|
||
|
||
// 3. Close the WebSocket with this , code , and reason .
|
||
closeWebSocketConnection(this.#handler, code, reason, true);
|
||
}
|
||
|
||
#write(chunk) {
|
||
// 1. Let promise be a new promise created in stream ’s relevant realm .
|
||
const promise = createDeferredPromise();
|
||
|
||
// 2. Let data be null.
|
||
let data = null;
|
||
|
||
// 3. Let opcode be null.
|
||
let opcode = null;
|
||
|
||
// 4. If chunk is a BufferSource ,
|
||
if (ArrayBuffer.isView(chunk) || types.isArrayBuffer(chunk)) {
|
||
// 4.1. Set data to a copy of the bytes given chunk .
|
||
data = new Uint8Array(
|
||
ArrayBuffer.isView(chunk) ?
|
||
new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength)
|
||
: chunk
|
||
);
|
||
|
||
// 4.2. Set opcode to a binary frame opcode.
|
||
opcode = opcodes.BINARY;
|
||
} else {
|
||
// 5. Otherwise,
|
||
|
||
// 5.1. Let string be the result of converting chunk to an IDL USVString .
|
||
// If this throws an exception, return a promise rejected with the exception.
|
||
let string;
|
||
|
||
try {
|
||
string = webidl.converters.DOMString(chunk);
|
||
} catch (e) {
|
||
promise.reject(e);
|
||
return;
|
||
}
|
||
|
||
// 5.2. Set data to the result of UTF-8 encoding string .
|
||
data = new TextEncoder().encode(string);
|
||
|
||
// 5.3. Set opcode to a text frame opcode.
|
||
opcode = opcodes.TEXT;
|
||
}
|
||
|
||
// 6. In parallel,
|
||
// 6.1. Wait until there is sufficient buffer space in stream to send the message.
|
||
|
||
// 6.2. If the closing handshake has not yet started , Send a WebSocket Message to stream comprised of data using opcode .
|
||
if (
|
||
!this.#handler.closeState.has(sentCloseFrameState.SENT) &&
|
||
!this.#handler.closeState.has(sentCloseFrameState.RECEIVED)
|
||
) {
|
||
const frame = new WebsocketFrameSend(data);
|
||
|
||
this.#handler.socket.write(frame.createFrame(opcode), () => {
|
||
promise.resolve(undefined);
|
||
});
|
||
}
|
||
|
||
// 6.3. Queue a global task on the WebSocket task source given stream ’s relevant global object to resolve promise with undefined.
|
||
return promise;
|
||
}
|
||
|
||
/** @type {import('../websocket').Handler['onConnectionEstablished']} */
|
||
#onConnectionEstablished(response, parsedExtensions) {
|
||
this.#handler.socket = response.socket;
|
||
|
||
const parser = new ByteParser(this.#handler, parsedExtensions);
|
||
parser.on('drain', () => this.#handler.onParserDrain());
|
||
parser.on('error', (err) => this.#handler.onParserError(err));
|
||
|
||
this.#parser = parser;
|
||
|
||
// 1. Change stream ’s ready state to OPEN (1).
|
||
this.#handler.readyState = states.OPEN;
|
||
|
||
// 2. Set stream ’s was ever connected to true.
|
||
// This is done in the opening handshake.
|
||
|
||
// 3. Let extensions be the extensions in use .
|
||
const extensions = parsedExtensions ?? '';
|
||
|
||
// 4. Let protocol be the subprotocol in use .
|
||
const protocol = response.headersList.get('sec-websocket-protocol') ?? '';
|
||
|
||
// 5. Let pullAlgorithm be an action that pulls bytes from stream .
|
||
// 6. Let cancelAlgorithm be an action that cancels stream with reason , given reason .
|
||
// 7. Let readable be a new ReadableStream .
|
||
// 8. Set up readable with pullAlgorithm and cancelAlgorithm .
|
||
const readable = new ReadableStream({
|
||
start: (controller) => {
|
||
this.#readableStreamController = controller;
|
||
},
|
||
pull(controller) {
|
||
let chunk;
|
||
while (
|
||
controller.desiredSize > 0 &&
|
||
(chunk = response.socket.read()) !== null
|
||
) {
|
||
controller.enqueue(chunk);
|
||
}
|
||
},
|
||
cancel: (reason) => this.#cancel(reason),
|
||
});
|
||
|
||
// 9. Let writeAlgorithm be an action that writes chunk to stream , given chunk .
|
||
// 10. Let closeAlgorithm be an action that closes stream .
|
||
// 11. Let abortAlgorithm be an action that aborts stream with reason , given reason .
|
||
// 12. Let writable be a new WritableStream .
|
||
// 13. Set up writable with writeAlgorithm , closeAlgorithm , and abortAlgorithm .
|
||
const writable = new WritableStream({
|
||
write: (chunk) => this.#write(chunk),
|
||
close: () => closeWebSocketConnection(this.#handler, null, null),
|
||
abort: (reason) => this.#closeUsingReason(reason),
|
||
});
|
||
|
||
// Set stream ’s readable stream to readable .
|
||
this.#readableStream = readable;
|
||
|
||
// Set stream ’s writable stream to writable .
|
||
this.#writableStream = writable;
|
||
|
||
// Resolve stream ’s opened promise with WebSocketOpenInfo «[ " extensions " → extensions , " protocol " → protocol , " readable " → readable , " writable " → writable ]».
|
||
this.#openedPromise.resolve({
|
||
extensions,
|
||
protocol,
|
||
readable,
|
||
writable,
|
||
});
|
||
}
|
||
|
||
/** @type {import('../websocket').Handler['onMessage']} */
|
||
#onMessage(type, data) {
|
||
// 1. If stream’s ready state is not OPEN (1), then return.
|
||
if (this.#handler.readyState !== states.OPEN) {
|
||
return;
|
||
}
|
||
|
||
// 2. Let chunk be determined by switching on type:
|
||
// - type indicates that the data is Text
|
||
// a new DOMString containing data
|
||
// - type indicates that the data is Binary
|
||
// a new Uint8Array object, created in the relevant Realm of the
|
||
// WebSocketStream object, whose contents are data
|
||
let chunk;
|
||
|
||
if (type === opcodes.TEXT) {
|
||
try {
|
||
chunk = utf8Decode(data);
|
||
} catch {
|
||
failWebsocketConnection(
|
||
this.#handler,
|
||
'Received invalid UTF-8 in text frame.'
|
||
);
|
||
return;
|
||
}
|
||
} else if (type === opcodes.BINARY) {
|
||
chunk = new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
|
||
}
|
||
|
||
// 3. Enqueue chunk into stream’s readable stream.
|
||
this.#readableStreamController.enqueue(chunk);
|
||
|
||
// 4. Apply backpressure to the WebSocket.
|
||
}
|
||
|
||
/** @type {import('../websocket').Handler['onSocketClose']} */
|
||
#onSocketClose() {
|
||
const wasClean =
|
||
this.#handler.closeState.has(sentCloseFrameState.SENT) &&
|
||
this.#handler.closeState.has(sentCloseFrameState.RECEIVED);
|
||
|
||
// 1. Change the ready state to CLOSED (3).
|
||
this.#handler.readyState = states.CLOSED;
|
||
|
||
// 2. If stream ’s handshake aborted is true, then return.
|
||
if (this.#handshakeAborted) {
|
||
return;
|
||
}
|
||
|
||
// 3. If stream ’s was ever connected is false, then reject stream ’s opened promise with a new WebSocketError.
|
||
if (!this.#handler.wasEverConnected) {
|
||
this.#openedPromise.reject(new WebSocketError('Socket never opened'));
|
||
}
|
||
|
||
const result = this.#parser.closingInfo;
|
||
|
||
// 4. Let code be the WebSocket connection close code .
|
||
// https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
|
||
// If this Close control frame contains no status code, _The WebSocket
|
||
// Connection Close Code_ is considered to be 1005. If _The WebSocket
|
||
// Connection is Closed_ and no Close control frame was received by the
|
||
// endpoint (such as could occur if the underlying transport connection
|
||
// is lost), _The WebSocket Connection Close Code_ is considered to be
|
||
// 1006.
|
||
let code = result?.code ?? 1005;
|
||
|
||
if (
|
||
!this.#handler.closeState.has(sentCloseFrameState.SENT) &&
|
||
!this.#handler.closeState.has(sentCloseFrameState.RECEIVED)
|
||
) {
|
||
code = 1006;
|
||
}
|
||
|
||
// 5. Let reason be the result of applying UTF-8 decode without BOM to the WebSocket connection close reason .
|
||
const reason =
|
||
result?.reason == null ? '' : utf8DecodeBytes(Buffer.from(result.reason));
|
||
|
||
// 6. If the connection was closed cleanly ,
|
||
if (wasClean) {
|
||
// 6.1. Close stream ’s readable stream .
|
||
this.#readableStream.cancel().catch(() => {});
|
||
|
||
// 6.2. Error stream ’s writable stream with an " InvalidStateError " DOMException indicating that a closed WebSocketStream cannot be written to.
|
||
if (!this.#writableStream.locked) {
|
||
this.#writableStream.abort(
|
||
new DOMException(
|
||
'A closed WebSocketStream cannot be written to',
|
||
'InvalidStateError'
|
||
)
|
||
);
|
||
}
|
||
|
||
// 6.3. Resolve stream ’s closed promise with WebSocketCloseInfo «[ " closeCode " → code , " reason " → reason ]».
|
||
this.#closedPromise.resolve({
|
||
closeCode: code,
|
||
reason,
|
||
});
|
||
} else {
|
||
// 7. Otherwise,
|
||
|
||
// 7.1. Let error be a new WebSocketError whose closeCode is code and reason is reason .
|
||
const error = createUnvalidatedWebSocketError(
|
||
'unclean close',
|
||
code,
|
||
reason
|
||
);
|
||
|
||
// 7.2. Error stream ’s readable stream with error .
|
||
this.#readableStreamController.error(error);
|
||
|
||
// 7.3. Error stream ’s writable stream with error .
|
||
this.#writableStream.abort(error);
|
||
|
||
// 7.4. Reject stream ’s closed promise with error .
|
||
this.#closedPromise.reject(error);
|
||
}
|
||
}
|
||
|
||
#closeUsingReason(reason) {
|
||
// 1. Let code be null.
|
||
let code = null;
|
||
|
||
// 2. Let reasonString be the empty string.
|
||
let reasonString = '';
|
||
|
||
// 3. If reason implements WebSocketError ,
|
||
if (webidl.is.WebSocketError(reason)) {
|
||
// 3.1. Set code to reason ’s closeCode .
|
||
code = reason.closeCode;
|
||
|
||
// 3.2. Set reasonString to reason ’s reason .
|
||
reasonString = reason.reason;
|
||
}
|
||
|
||
// 4. Close the WebSocket with stream , code , and reasonString . If this throws an exception,
|
||
// discard code and reasonString and close the WebSocket with stream .
|
||
closeWebSocketConnection(this.#handler, code, reasonString);
|
||
}
|
||
|
||
// To cancel a WebSocketStream stream given reason , close using reason giving stream and reason .
|
||
#cancel(reason) {
|
||
this.#closeUsingReason(reason);
|
||
}
|
||
}
|
||
|
||
Object.defineProperties(WebSocketStream.prototype, {
|
||
url: kEnumerableProperty,
|
||
opened: kEnumerableProperty,
|
||
closed: kEnumerableProperty,
|
||
close: kEnumerableProperty,
|
||
[Symbol.toStringTag]: {
|
||
value: 'WebSocketStream',
|
||
writable: false,
|
||
enumerable: false,
|
||
configurable: true,
|
||
},
|
||
});
|
||
|
||
webidl.converters.WebSocketStreamOptions = webidl.dictionaryConverter([
|
||
{
|
||
key: 'protocols',
|
||
converter: webidl.sequenceConverter(webidl.converters.USVString),
|
||
defaultValue: () => [],
|
||
},
|
||
{
|
||
key: 'signal',
|
||
converter: webidl.nullableConverter(webidl.converters.AbortSignal),
|
||
defaultValue: () => null,
|
||
},
|
||
]);
|
||
|
||
webidl.converters.WebSocketCloseInfo = webidl.dictionaryConverter([
|
||
{
|
||
key: 'closeCode',
|
||
converter: (V) =>
|
||
webidl.converters['unsigned short'](V, { enforceRange: true }),
|
||
},
|
||
{
|
||
key: 'reason',
|
||
converter: webidl.converters.USVString,
|
||
defaultValue: () => '',
|
||
},
|
||
]);
|
||
|
||
module.exports = { WebSocketStream };
|