525 lines
15 KiB
JavaScript
525 lines
15 KiB
JavaScript
'use strict';
|
|
|
|
const { Writable } = require('node:stream');
|
|
const assert = require('node:assert');
|
|
const {
|
|
parserStates,
|
|
opcodes,
|
|
states,
|
|
emptyBuffer,
|
|
sentCloseFrameState,
|
|
} = require('./constants');
|
|
const { channels } = require('../../core/diagnostics');
|
|
const {
|
|
isValidStatusCode,
|
|
isValidOpcode,
|
|
websocketMessageReceived,
|
|
utf8Decode,
|
|
isControlFrame,
|
|
isTextBinaryFrame,
|
|
isContinuationFrame,
|
|
} = require('./util');
|
|
const { failWebsocketConnection } = require('./connection');
|
|
const { WebsocketFrameSend } = require('./frame');
|
|
const { PerMessageDeflate } = require('./permessage-deflate');
|
|
|
|
// This code was influenced by ws released under the MIT license.
|
|
// Copyright (c) 2011 Einar Otto Stangvik <einaros@gmail.com>
|
|
// Copyright (c) 2013 Arnout Kazemier and contributors
|
|
// Copyright (c) 2016 Luigi Pinca and contributors
|
|
|
|
class ByteParser extends Writable {
|
|
#buffers = [];
|
|
#fragmentsBytes = 0;
|
|
#byteOffset = 0;
|
|
#loop = false;
|
|
|
|
#state = parserStates.INFO;
|
|
|
|
#info = {};
|
|
#fragments = [];
|
|
|
|
/** @type {Map<string, PerMessageDeflate>} */
|
|
#extensions;
|
|
|
|
/** @type {import('./websocket').Handler} */
|
|
#handler;
|
|
|
|
constructor(handler, extensions) {
|
|
super();
|
|
|
|
this.#handler = handler;
|
|
this.#extensions = extensions == null ? new Map() : extensions;
|
|
|
|
if (this.#extensions.has('permessage-deflate')) {
|
|
this.#extensions.set(
|
|
'permessage-deflate',
|
|
new PerMessageDeflate(extensions)
|
|
);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @param {Buffer} chunk
|
|
* @param {() => void} callback
|
|
*/
|
|
_write(chunk, _, callback) {
|
|
this.#buffers.push(chunk);
|
|
this.#byteOffset += chunk.length;
|
|
this.#loop = true;
|
|
|
|
this.run(callback);
|
|
}
|
|
|
|
/**
|
|
* Runs whenever a new chunk is received.
|
|
* Callback is called whenever there are no more chunks buffering,
|
|
* or not enough bytes are buffered to parse.
|
|
*/
|
|
run(callback) {
|
|
while (this.#loop) {
|
|
if (this.#state === parserStates.INFO) {
|
|
// If there aren't enough bytes to parse the payload length, etc.
|
|
if (this.#byteOffset < 2) {
|
|
return callback();
|
|
}
|
|
|
|
const buffer = this.consume(2);
|
|
const fin = (buffer[0] & 0x80) !== 0;
|
|
const opcode = buffer[0] & 0x0f;
|
|
const masked = (buffer[1] & 0x80) === 0x80;
|
|
|
|
const fragmented = !fin && opcode !== opcodes.CONTINUATION;
|
|
const payloadLength = buffer[1] & 0x7f;
|
|
|
|
const rsv1 = buffer[0] & 0x40;
|
|
const rsv2 = buffer[0] & 0x20;
|
|
const rsv3 = buffer[0] & 0x10;
|
|
|
|
if (!isValidOpcode(opcode)) {
|
|
failWebsocketConnection(
|
|
this.#handler,
|
|
1002,
|
|
'Invalid opcode received'
|
|
);
|
|
return callback();
|
|
}
|
|
|
|
if (masked) {
|
|
failWebsocketConnection(
|
|
this.#handler,
|
|
1002,
|
|
'Frame cannot be masked'
|
|
);
|
|
return callback();
|
|
}
|
|
|
|
// MUST be 0 unless an extension is negotiated that defines meanings
|
|
// for non-zero values. If a nonzero value is received and none of
|
|
// the negotiated extensions defines the meaning of such a nonzero
|
|
// value, the receiving endpoint MUST _Fail the WebSocket
|
|
// Connection_.
|
|
// This document allocates the RSV1 bit of the WebSocket header for
|
|
// PMCEs and calls the bit the "Per-Message Compressed" bit. On a
|
|
// WebSocket connection where a PMCE is in use, this bit indicates
|
|
// whether a message is compressed or not.
|
|
if (rsv1 !== 0 && !this.#extensions.has('permessage-deflate')) {
|
|
failWebsocketConnection(
|
|
this.#handler,
|
|
1002,
|
|
'Expected RSV1 to be clear.'
|
|
);
|
|
return;
|
|
}
|
|
|
|
if (rsv2 !== 0 || rsv3 !== 0) {
|
|
failWebsocketConnection(
|
|
this.#handler,
|
|
1002,
|
|
'RSV1, RSV2, RSV3 must be clear'
|
|
);
|
|
return;
|
|
}
|
|
|
|
if (fragmented && !isTextBinaryFrame(opcode)) {
|
|
// Only text and binary frames can be fragmented
|
|
failWebsocketConnection(
|
|
this.#handler,
|
|
1002,
|
|
'Invalid frame type was fragmented.'
|
|
);
|
|
return;
|
|
}
|
|
|
|
// If we are already parsing a text/binary frame and do not receive either
|
|
// a continuation frame or close frame, fail the connection.
|
|
if (isTextBinaryFrame(opcode) && this.#fragments.length > 0) {
|
|
failWebsocketConnection(
|
|
this.#handler,
|
|
1002,
|
|
'Expected continuation frame'
|
|
);
|
|
return;
|
|
}
|
|
|
|
if (this.#info.fragmented && fragmented) {
|
|
// A fragmented frame can't be fragmented itself
|
|
failWebsocketConnection(
|
|
this.#handler,
|
|
1002,
|
|
'Fragmented frame exceeded 125 bytes.'
|
|
);
|
|
return;
|
|
}
|
|
|
|
// "All control frames MUST have a payload length of 125 bytes or less
|
|
// and MUST NOT be fragmented."
|
|
if ((payloadLength > 125 || fragmented) && isControlFrame(opcode)) {
|
|
failWebsocketConnection(
|
|
this.#handler,
|
|
1002,
|
|
'Control frame either too large or fragmented'
|
|
);
|
|
return;
|
|
}
|
|
|
|
if (
|
|
isContinuationFrame(opcode) &&
|
|
this.#fragments.length === 0 &&
|
|
!this.#info.compressed
|
|
) {
|
|
failWebsocketConnection(
|
|
this.#handler,
|
|
1002,
|
|
'Unexpected continuation frame'
|
|
);
|
|
return;
|
|
}
|
|
|
|
if (payloadLength <= 125) {
|
|
this.#info.payloadLength = payloadLength;
|
|
this.#state = parserStates.READ_DATA;
|
|
} else if (payloadLength === 126) {
|
|
this.#state = parserStates.PAYLOADLENGTH_16;
|
|
} else if (payloadLength === 127) {
|
|
this.#state = parserStates.PAYLOADLENGTH_64;
|
|
}
|
|
|
|
if (isTextBinaryFrame(opcode)) {
|
|
this.#info.binaryType = opcode;
|
|
this.#info.compressed = rsv1 !== 0;
|
|
}
|
|
|
|
this.#info.opcode = opcode;
|
|
this.#info.masked = masked;
|
|
this.#info.fin = fin;
|
|
this.#info.fragmented = fragmented;
|
|
} else if (this.#state === parserStates.PAYLOADLENGTH_16) {
|
|
if (this.#byteOffset < 2) {
|
|
return callback();
|
|
}
|
|
|
|
const buffer = this.consume(2);
|
|
|
|
this.#info.payloadLength = buffer.readUInt16BE(0);
|
|
this.#state = parserStates.READ_DATA;
|
|
} else if (this.#state === parserStates.PAYLOADLENGTH_64) {
|
|
if (this.#byteOffset < 8) {
|
|
return callback();
|
|
}
|
|
|
|
const buffer = this.consume(8);
|
|
const upper = buffer.readUInt32BE(0);
|
|
|
|
// 2^31 is the maximum bytes an arraybuffer can contain
|
|
// on 32-bit systems. Although, on 64-bit systems, this is
|
|
// 2^53-1 bytes.
|
|
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Errors/Invalid_array_length
|
|
// https://source.chromium.org/chromium/chromium/src/+/main:v8/src/common/globals.h;drc=1946212ac0100668f14eb9e2843bdd846e510a1e;bpv=1;bpt=1;l=1275
|
|
// https://source.chromium.org/chromium/chromium/src/+/main:v8/src/objects/js-array-buffer.h;l=34;drc=1946212ac0100668f14eb9e2843bdd846e510a1e
|
|
if (upper > 2 ** 31 - 1) {
|
|
failWebsocketConnection(
|
|
this.#handler,
|
|
1009,
|
|
'Received payload length > 2^31 bytes.'
|
|
);
|
|
return;
|
|
}
|
|
|
|
const lower = buffer.readUInt32BE(4);
|
|
|
|
this.#info.payloadLength = (upper << 8) + lower;
|
|
this.#state = parserStates.READ_DATA;
|
|
} else if (this.#state === parserStates.READ_DATA) {
|
|
if (this.#byteOffset < this.#info.payloadLength) {
|
|
return callback();
|
|
}
|
|
|
|
const body = this.consume(this.#info.payloadLength);
|
|
|
|
if (isControlFrame(this.#info.opcode)) {
|
|
this.#loop = this.parseControlFrame(body);
|
|
this.#state = parserStates.INFO;
|
|
} else {
|
|
if (!this.#info.compressed) {
|
|
this.writeFragments(body);
|
|
|
|
// If the frame is not fragmented, a message has been received.
|
|
// If the frame is fragmented, it will terminate with a fin bit set
|
|
// and an opcode of 0 (continuation), therefore we handle that when
|
|
// parsing continuation frames, not here.
|
|
if (!this.#info.fragmented && this.#info.fin) {
|
|
websocketMessageReceived(
|
|
this.#handler,
|
|
this.#info.binaryType,
|
|
this.consumeFragments()
|
|
);
|
|
}
|
|
|
|
this.#state = parserStates.INFO;
|
|
} else {
|
|
this.#extensions
|
|
.get('permessage-deflate')
|
|
.decompress(body, this.#info.fin, (error, data) => {
|
|
if (error) {
|
|
failWebsocketConnection(this.#handler, 1007, error.message);
|
|
return;
|
|
}
|
|
|
|
this.writeFragments(data);
|
|
|
|
if (!this.#info.fin) {
|
|
this.#state = parserStates.INFO;
|
|
this.#loop = true;
|
|
this.run(callback);
|
|
return;
|
|
}
|
|
|
|
websocketMessageReceived(
|
|
this.#handler,
|
|
this.#info.binaryType,
|
|
this.consumeFragments()
|
|
);
|
|
|
|
this.#loop = true;
|
|
this.#state = parserStates.INFO;
|
|
this.run(callback);
|
|
});
|
|
|
|
this.#loop = false;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Take n bytes from the buffered Buffers
|
|
* @param {number} n
|
|
* @returns {Buffer}
|
|
*/
|
|
consume(n) {
|
|
if (n > this.#byteOffset) {
|
|
throw new Error('Called consume() before buffers satiated.');
|
|
} else if (n === 0) {
|
|
return emptyBuffer;
|
|
}
|
|
|
|
this.#byteOffset -= n;
|
|
|
|
const first = this.#buffers[0];
|
|
|
|
if (first.length > n) {
|
|
// replace with remaining buffer
|
|
this.#buffers[0] = first.subarray(n, first.length);
|
|
return first.subarray(0, n);
|
|
} else if (first.length === n) {
|
|
// prefect match
|
|
return this.#buffers.shift();
|
|
} else {
|
|
let offset = 0;
|
|
// If Buffer.allocUnsafe is used, extra copies will be made because the offset is non-zero.
|
|
const buffer = Buffer.allocUnsafeSlow(n);
|
|
while (offset !== n) {
|
|
const next = this.#buffers[0];
|
|
const length = next.length;
|
|
|
|
if (length + offset === n) {
|
|
buffer.set(this.#buffers.shift(), offset);
|
|
break;
|
|
} else if (length + offset > n) {
|
|
buffer.set(next.subarray(0, n - offset), offset);
|
|
this.#buffers[0] = next.subarray(n - offset);
|
|
break;
|
|
} else {
|
|
buffer.set(this.#buffers.shift(), offset);
|
|
offset += length;
|
|
}
|
|
}
|
|
|
|
return buffer;
|
|
}
|
|
}
|
|
|
|
writeFragments(fragment) {
|
|
this.#fragmentsBytes += fragment.length;
|
|
this.#fragments.push(fragment);
|
|
}
|
|
|
|
consumeFragments() {
|
|
const fragments = this.#fragments;
|
|
|
|
if (fragments.length === 1) {
|
|
// single fragment
|
|
this.#fragmentsBytes = 0;
|
|
return fragments.shift();
|
|
}
|
|
|
|
let offset = 0;
|
|
// If Buffer.allocUnsafe is used, extra copies will be made because the offset is non-zero.
|
|
const output = Buffer.allocUnsafeSlow(this.#fragmentsBytes);
|
|
|
|
for (let i = 0; i < fragments.length; ++i) {
|
|
const buffer = fragments[i];
|
|
output.set(buffer, offset);
|
|
offset += buffer.length;
|
|
}
|
|
|
|
this.#fragments = [];
|
|
this.#fragmentsBytes = 0;
|
|
|
|
return output;
|
|
}
|
|
|
|
parseCloseBody(data) {
|
|
assert(data.length !== 1);
|
|
|
|
// https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
|
|
/** @type {number|undefined} */
|
|
let code;
|
|
|
|
if (data.length >= 2) {
|
|
// _The WebSocket Connection Close Code_ is
|
|
// defined as the status code (Section 7.4) contained in the first Close
|
|
// control frame received by the application
|
|
code = data.readUInt16BE(0);
|
|
}
|
|
|
|
if (code !== undefined && !isValidStatusCode(code)) {
|
|
return { code: 1002, reason: 'Invalid status code', error: true };
|
|
}
|
|
|
|
// https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.6
|
|
/** @type {Buffer} */
|
|
let reason = data.subarray(2);
|
|
|
|
// Remove BOM
|
|
if (reason[0] === 0xef && reason[1] === 0xbb && reason[2] === 0xbf) {
|
|
reason = reason.subarray(3);
|
|
}
|
|
|
|
try {
|
|
reason = utf8Decode(reason);
|
|
} catch {
|
|
return { code: 1007, reason: 'Invalid UTF-8', error: true };
|
|
}
|
|
|
|
return { code, reason, error: false };
|
|
}
|
|
|
|
/**
|
|
* Parses control frames.
|
|
* @param {Buffer} body
|
|
*/
|
|
parseControlFrame(body) {
|
|
const { opcode, payloadLength } = this.#info;
|
|
|
|
if (opcode === opcodes.CLOSE) {
|
|
if (payloadLength === 1) {
|
|
failWebsocketConnection(
|
|
this.#handler,
|
|
1002,
|
|
'Received close frame with a 1-byte body.'
|
|
);
|
|
return false;
|
|
}
|
|
|
|
this.#info.closeInfo = this.parseCloseBody(body);
|
|
|
|
if (this.#info.closeInfo.error) {
|
|
const { code, reason } = this.#info.closeInfo;
|
|
|
|
failWebsocketConnection(this.#handler, code, reason);
|
|
return false;
|
|
}
|
|
|
|
// Upon receiving such a frame, the other peer sends a
|
|
// Close frame in response, if it hasn't already sent one.
|
|
if (
|
|
!this.#handler.closeState.has(sentCloseFrameState.SENT) &&
|
|
!this.#handler.closeState.has(sentCloseFrameState.RECEIVED)
|
|
) {
|
|
// If an endpoint receives a Close frame and did not previously send a
|
|
// Close frame, the endpoint MUST send a Close frame in response. (When
|
|
// sending a Close frame in response, the endpoint typically echos the
|
|
// status code it received.)
|
|
let body = emptyBuffer;
|
|
if (this.#info.closeInfo.code) {
|
|
body = Buffer.allocUnsafe(2);
|
|
body.writeUInt16BE(this.#info.closeInfo.code, 0);
|
|
}
|
|
const closeFrame = new WebsocketFrameSend(body);
|
|
|
|
this.#handler.socket.write(closeFrame.createFrame(opcodes.CLOSE));
|
|
this.#handler.closeState.add(sentCloseFrameState.SENT);
|
|
}
|
|
|
|
// Upon either sending or receiving a Close control frame, it is said
|
|
// that _The WebSocket Closing Handshake is Started_ and that the
|
|
// WebSocket connection is in the CLOSING state.
|
|
this.#handler.readyState = states.CLOSING;
|
|
this.#handler.closeState.add(sentCloseFrameState.RECEIVED);
|
|
|
|
return false;
|
|
} else if (opcode === opcodes.PING) {
|
|
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
|
|
// response, unless it already received a Close frame.
|
|
// A Pong frame sent in response to a Ping frame must have identical
|
|
// "Application data"
|
|
|
|
if (!this.#handler.closeState.has(sentCloseFrameState.RECEIVED)) {
|
|
const frame = new WebsocketFrameSend(body);
|
|
|
|
this.#handler.socket.write(frame.createFrame(opcodes.PONG));
|
|
|
|
if (channels.ping.hasSubscribers) {
|
|
channels.ping.publish({
|
|
payload: body,
|
|
});
|
|
}
|
|
}
|
|
} else if (opcode === opcodes.PONG) {
|
|
// A Pong frame MAY be sent unsolicited. This serves as a
|
|
// unidirectional heartbeat. A response to an unsolicited Pong frame is
|
|
// not expected.
|
|
|
|
if (channels.pong.hasSubscribers) {
|
|
channels.pong.publish({
|
|
payload: body,
|
|
});
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
get closingInfo() {
|
|
return this.#info.closeInfo;
|
|
}
|
|
}
|
|
|
|
module.exports = {
|
|
ByteParser,
|
|
};
|