// 2025-04-02 06:50:39 -04:00
//
// 401 lines
// 12 KiB
// JavaScript

'use strict';
const { Transform } = require('node:stream');
const { isASCIINumber, isValidLastEventId } = require('./util');
/**
 * Bytes of the UTF-8 byte-order mark that may precede the stream content.
 * Frozen so that the shared constant cannot be modified by accident.
 * @type {readonly number[]} BOM
 */
const BOM = Object.freeze([0xef, 0xbb, 0xbf]);
/**
 * Byte value of the line feed character (`\n`).
 * @type {10} LF
 */
const LF = 0x0a;
/**
 * Byte value of the carriage return character (`\r`).
 * @type {13} CR
 */
const CR = 0x0d;
/**
 * Byte value of the colon character (`:`), which separates a field name from
 * its value.
 * @type {58} COLON
 */
const COLON = 0x3a;
/**
 * Byte value of the space character, which is skipped once when it directly
 * follows the colon of a field.
 * @type {32} SPACE
 */
const SPACE = 0x20;
/**
* @typedef {object} EventSourceStreamEvent
* @type {object}
* @property {string} [event] The event type.
* @property {string} [data] The data of the message.
* @property {string} [id] A unique ID for the event.
* @property {string} [retry] The reconnection time, in milliseconds.
*/
/**
* @typedef eventSourceSettings
* @type {object}
* @property {string} [lastEventId] The last event ID received from the server.
* @property {string} [origin] The origin of the event source.
* @property {number} [reconnectionTime] The reconnection time, in milliseconds.
*/
/**
 * Transform stream that turns the raw bytes of an event stream into parsed
 * event objects.
 *
 * Incoming chunks are buffered and split into lines on LF, CR or CRLF. Every
 * line is folded into the pending event by `parseLine`; a blank line hands
 * the pending event to `processEvent`, which updates `state` and, when the
 * event carries data, pushes `{ type, options: { data, lastEventId, origin } }`.
 */
class EventSourceStream extends Transform {
  /**
   * Settings object supplied through `options.eventSourceSettings` (or a
   * fresh empty object). `processEvent` writes `reconnectionTime` and
   * `lastEventId` to it and reads `lastEventId` and `origin` from it.
   * @type {eventSourceSettings}
   */
  state;
  /**
   * Leading byte-order-mark check. Stays `true` until enough bytes were seen
   * to decide whether the stream starts with the BOM.
   * @type {boolean}
   */
  checkBOM = true;
  /**
   * `true` when the previously consumed end-of-line byte was a carriage
   * return, so a directly following line feed belongs to the same line end.
   * @type {boolean}
   */
  crlfCheck = false;
  /**
   * `true` directly after a line end was consumed; the next byte then decides
   * whether a blank line (end of the event) follows.
   * @type {boolean}
   */
  eventEndCheck = false;
  /**
   * Bytes received so far that were not consumed yet.
   * @type {Buffer|null}
   */
  buffer = null;
  /**
   * Scan position inside `buffer`; bytes before it belong to the current,
   * still incomplete line.
   * @type {number}
   */
  pos = 0;
  /**
   * The event that is currently being assembled from parsed lines.
   * @type {EventSourceStreamEvent}
   */
  event = {
    data: undefined,
    event: undefined,
    id: undefined,
    retry: undefined,
  };

  /**
   * @param {object} options
   * @param {boolean} [options.readableObjectMode] Ignored; always forced to
   * `true`.
   * @param {eventSourceSettings} [options.eventSourceSettings]
   * @param {(chunk: any, encoding?: BufferEncoding | undefined) => boolean} [options.push]
   * Replacement for the stream's own `push`, used to receive parsed events.
   */
  constructor(options = {}) {
    // Object mode is forced on the readable side because the stream emits
    // objects of shape EventSourceStreamEvent. A copy is handed to the parent
    // so that the caller's options object is left untouched.
    super({ ...options, readableObjectMode: true });
    this.state = options.eventSourceSettings || {};
    if (options.push) {
      this.push = options.push;
    }
  }

  /**
   * Buffers the chunk, strips a leading byte-order mark once, and consumes
   * every complete line that is available.
   * @param {Buffer} chunk
   * @param {string} _encoding
   * @param {Function} callback Invoked without arguments once the chunk was
   * handled or more data is needed.
   * @returns {void}
   */
  _transform(chunk, _encoding, callback) {
    if (chunk.length === 0) {
      callback();
      return;
    }
    // Cache the chunk in the buffer, as the data might not be complete while
    // processing it
    // TODO: Investigate if there is a more performant way to handle
    // incoming chunks
    // see: https://github.com/nodejs/undici/issues/2630
    if (this.buffer) {
      this.buffer = Buffer.concat([this.buffer, chunk]);
    } else {
      this.buffer = chunk;
    }
    // Strip the leading byte-order mark. The check runs until enough bytes
    // arrived to decide whether the stream starts with the BOM.
    if (this.checkBOM) {
      switch (this.buffer.length) {
        case 1:
          // The first byte matches the first BOM byte: wait for more data
          // before deciding.
          if (this.buffer[0] === BOM[0]) {
            callback();
            return;
          }
          // Not a BOM, so the check is done. The single byte stays buffered
          // and is processed together with the next chunk.
          this.checkBOM = false;
          callback();
          return;
        case 2:
          // Both bytes match the BOM prefix: the third byte is needed to
          // decide, so wait for more data.
          if (this.buffer[0] === BOM[0] && this.buffer[1] === BOM[1]) {
            callback();
            return;
          }
          // Not a BOM, start processing the buffered data.
          this.checkBOM = false;
          break;
        case 3:
          if (
            this.buffer[0] === BOM[0] &&
            this.buffer[1] === BOM[1] &&
            this.buffer[2] === BOM[2]
          ) {
            // The buffer holds exactly the BOM: drop it and await more data.
            this.buffer = Buffer.alloc(0);
            this.checkBOM = false;
            callback();
            return;
          }
          // Not a BOM, start processing the buffered data.
          this.checkBOM = false;
          break;
        default:
          // More than three bytes are buffered: drop the BOM if present.
          if (
            this.buffer[0] === BOM[0] &&
            this.buffer[1] === BOM[1] &&
            this.buffer[2] === BOM[2]
          ) {
            this.buffer = this.buffer.subarray(3);
          }
          // The BOM does not need to be checked anymore.
          this.checkBOM = false;
          break;
      }
    }
    while (this.pos < this.buffer.length) {
      // If the previous line ended with an end-of-line, we need to check
      // if the next character is also an end-of-line.
      if (this.eventEndCheck) {
        // If the previous line ended with a carriage return, a directly
        // following line feed belongs to that same line end and is dropped.
        if (this.crlfCheck) {
          if (this.buffer[this.pos] === LF) {
            this.buffer = this.buffer.subarray(this.pos + 1);
            this.pos = 0;
            this.crlfCheck = false;
            // The dropped line feed does not end the event by itself. The
            // loop continues with eventEndCheck still set, so the next byte
            // decides whether a blank line follows.
            continue;
          }
          this.crlfCheck = false;
        }
        if (this.buffer[this.pos] === LF || this.buffer[this.pos] === CR) {
          // A blank line ends the event. After a carriage return, remember to
          // drop a directly following line feed.
          if (this.buffer[this.pos] === CR) {
            this.crlfCheck = true;
          }
          this.buffer = this.buffer.subarray(this.pos + 1);
          this.pos = 0;
          // `id` is compared against undefined on purpose: an empty id is a
          // valid value that resets the last event ID.
          if (
            this.event.data !== undefined ||
            this.event.event ||
            this.event.id !== undefined ||
            this.event.retry
          ) {
            this.processEvent(this.event);
          }
          this.clearEvent();
          continue;
        }
        // If the current character is not an end-of-line, then the event
        // is not finished and we have to reset the eventEndCheck flag
        this.eventEndCheck = false;
        continue;
      }
      // If the current character is an end-of-line, we can process the line
      if (this.buffer[this.pos] === LF || this.buffer[this.pos] === CR) {
        // After a carriage return, remember to check whether the next
        // character is a line feed
        if (this.buffer[this.pos] === CR) {
          this.crlfCheck = true;
        }
        // In any case, we can process the line as we reached an
        // end-of-line character
        this.parseLine(this.buffer.subarray(0, this.pos), this.event);
        // Remove the processed line from the buffer and restart the scan
        this.buffer = this.buffer.subarray(this.pos + 1);
        this.pos = 0;
        // A line was processed and this could be the end of the event. We
        // need to check if the next line is empty to determine if the event
        // is finished.
        this.eventEndCheck = true;
        continue;
      }
      this.pos++;
    }
    callback();
  }

  /**
   * Parses one line (without its line end) and stores a recognized field in
   * `event`. Blank lines, comment lines (leading colon) and unknown field
   * names leave `event` unchanged.
   * @param {Buffer} line
   * @param {EventSourceStreamEvent} event
   */
  parseLine(line, event) {
    // If the line is empty (a blank line)
    // Dispatch the event, as defined below.
    // This will be handled in the _transform method
    if (line.length === 0) {
      return;
    }
    // If the line starts with a U+003A COLON character (:)
    // Ignore the line.
    const colonPosition = line.indexOf(COLON);
    if (colonPosition === 0) {
      return;
    }
    let field = '';
    let value = '';
    // If the line contains a U+003A COLON character (:)
    if (colonPosition !== -1) {
      // Collect the characters on the line before the first U+003A COLON
      // character (:), and let field be that string.
      // TODO: Investigate if there is a more performant way to extract the
      // field
      // see: https://github.com/nodejs/undici/issues/2630
      field = line.subarray(0, colonPosition).toString('utf8');
      // Collect the characters on the line after the first U+003A COLON
      // character (:), and let value be that string.
      // If value starts with a U+0020 SPACE character, remove it from value.
      let valueStart = colonPosition + 1;
      if (line[valueStart] === SPACE) {
        ++valueStart;
      }
      // TODO: Investigate if there is a more performant way to extract the
      // value
      // see: https://github.com/nodejs/undici/issues/2630
      value = line.subarray(valueStart).toString('utf8');
      // Otherwise, the string is not empty but does not contain a U+003A COLON
      // character (:)
    } else {
      // Process the field using the steps described below, using the whole
      // line as the field name, and the empty string as the field value.
      field = line.toString('utf8');
      value = '';
    }
    // Modify the event with the field name and value. The value is also
    // decoded as UTF-8
    switch (field) {
      case 'data':
        // Several data lines are joined with a line feed.
        if (event[field] === undefined) {
          event[field] = value;
        } else {
          event[field] += `\n${value}`;
        }
        break;
      case 'retry':
        // Only kept when the helper accepts the value as a number.
        if (isASCIINumber(value)) {
          event[field] = value;
        }
        break;
      case 'id':
        // Only kept when the helper accepts the value; this may be the empty
        // string.
        if (isValidLastEventId(value)) {
          event[field] = value;
        }
        break;
      case 'event':
        if (value.length > 0) {
          event[field] = value;
        }
        break;
    }
  }

  /**
   * Applies a finished event: updates the reconnection time and the last
   * event ID in `state` and pushes the event when it carries data.
   * @param {EventSourceStreamEvent} event
   */
  processEvent(event) {
    if (event.retry && isASCIINumber(event.retry)) {
      this.state.reconnectionTime = Number.parseInt(event.retry, 10);
    }
    // An empty id is meaningful (it resets the last event ID), so only a
    // missing id is skipped.
    if (event.id !== undefined && isValidLastEventId(event.id)) {
      this.state.lastEventId = event.id;
    }
    // only dispatch event, when data is provided
    if (event.data !== undefined) {
      this.push({
        type: event.event || 'message',
        options: {
          data: event.data,
          lastEventId: this.state.lastEventId,
          origin: this.state.origin,
        },
      });
    }
  }

  /**
   * Replaces the pending event with a fresh one whose fields are all unset.
   */
  clearEvent() {
    this.event = {
      data: undefined,
      event: undefined,
      id: undefined,
      retry: undefined,
    };
  }
}
// Public interface of this module: the event stream parser class.
module.exports = { EventSourceStream };