2025-04-02 06:50:39 -04:00

227 lines
5.5 KiB
JavaScript

'use strict';
const {
BalancedPoolMissingUpstreamError,
InvalidArgumentError,
} = require('../core/errors');
const {
PoolBase,
kClients,
kNeedDrain,
kAddClient,
kRemoveClient,
kGetDispatcher,
} = require('./pool-base');
const Pool = require('./pool');
const { kUrl } = require('../core/symbols');
const { parseOrigin } = require('../core/util');
// Private symbol keys used by BalancedPool for its own state and for the
// per-pool metadata it attaches to each upstream pool.

// Factory function used to create the pool for each upstream.
const kFactory = Symbol('factory');
// Options object (constructor options minus `factory`) copied to every pool.
const kOptions = Symbol('options');
// Cached greatest common divisor of all upstream weights.
const kGreatestCommonDivisor = Symbol('kGreatestCommonDivisor');
// Weight threshold a pool must reach to be picked in the current pass.
const kCurrentWeight = Symbol('kCurrentWeight');
// Index (into the client list) of the most recently visited pool.
const kIndex = Symbol('kIndex');
// Per-pool weight; stored on the pool object itself.
const kWeight = Symbol('kWeight');
// Upper bound for (and initial value of) every pool's weight.
const kMaxWeightPerServer = Symbol('kMaxWeightPerServer');
// Amount a pool's weight drops on an error and rises on a connect.
const kErrorPenalty = Symbol('kErrorPenalty');
/**
 * Compute the greatest common divisor of two numbers with the
 * iterative Euclidean algorithm.
 *
 * @param {number} a - First operand; when it is exactly 0, `b` is returned as-is.
 * @param {number} b - Second operand.
 * @returns {number} The greatest common divisor of `a` and `b`.
 */
function getGreatestCommonDivisor(a, b) {
  if (a === 0) {
    return b;
  }

  let dividend = a;
  let divisor = b;
  // Repeatedly replace (dividend, divisor) with (divisor, remainder) until
  // the remainder vanishes; the last non-zero value is the divisor we want.
  while (divisor !== 0) {
    [dividend, divisor] = [divisor, dividend % divisor];
  }
  return dividend;
}
/**
 * Default upstream factory: creates a plain `Pool` for one origin.
 *
 * @param {*} origin - Forwarded unchanged as the first `Pool` constructor argument.
 * @param {object} opts - Forwarded unchanged as the second `Pool` constructor argument.
 * @returns {Pool} The newly constructed pool.
 */
function defaultFactory(origin, opts) {
  const pool = new Pool(origin, opts);
  return pool;
}
/**
 * A pool of upstream pools that picks the pool for each request with a
 * weighted round-robin scheme.
 *
 * Every upstream starts at the maximum weight. A connection error (or a
 * disconnect caused by a socket error) lowers the pool's weight by the error
 * penalty; a successful connect raises it again, capped at the maximum.
 */
class BalancedPool extends PoolBase {
  /**
   * @param {*|Array<*>} [upstreams=[]] - One upstream or an array of
   *   upstreams; each one is handed to `addUpstream`.
   * @param {object} [options]
   * @param {Function} [options.factory=defaultFactory] - Called as
   *   `factory(origin, opts)` to create the pool for an upstream.
   * @param {number} [options.maxWeightPerServer] - Maximum (and initial)
   *   weight of a pool; falls back to 100 when falsy.
   * @param {number} [options.errorPenalty] - Weight adjustment step; falls
   *   back to 15 when falsy.
   *   All options except `factory` are also copied to every created pool.
   * @throws {InvalidArgumentError} When `factory` is not a function.
   */
  constructor(upstreams = [], { factory = defaultFactory, ...opts } = {}) {
    if (typeof factory !== 'function') {
      throw new InvalidArgumentError('factory must be a function.');
    }

    super();

    this[kOptions] = opts;
    this[kIndex] = -1;
    this[kCurrentWeight] = 0;

    this[kMaxWeightPerServer] = this[kOptions].maxWeightPerServer || 100;
    this[kErrorPenalty] = this[kOptions].errorPenalty || 15;

    if (!Array.isArray(upstreams)) {
      upstreams = [upstreams];
    }

    this[kFactory] = factory;

    for (const upstream of upstreams) {
      this.addUpstream(upstream);
    }
    // Also initializes the cached GCD when no upstream was supplied.
    this._updateBalancedPoolStats();
  }

  /**
   * Add an upstream. Does nothing when a pool for the same origin that is
   * neither closed nor destroyed is already registered.
   *
   * Adding an upstream resets the weight of every registered pool to the
   * configured maximum.
   *
   * @param {*} upstream - Value accepted by `parseOrigin`.
   * @returns {this}
   */
  addUpstream(upstream) {
    const upstreamOrigin = parseOrigin(upstream).origin;

    const alreadyRegistered = this[kClients].find(
      (pool) =>
        pool[kUrl].origin === upstreamOrigin &&
        pool.closed !== true &&
        pool.destroyed !== true
    );
    if (alreadyRegistered) {
      return this;
    }

    const pool = this[kFactory](
      upstreamOrigin,
      Object.assign({}, this[kOptions])
    );

    this[kAddClient](pool);

    pool.on('connect', () => {
      // Reward a successful connect, never exceeding the maximum weight.
      pool[kWeight] = Math.min(
        this[kMaxWeightPerServer],
        pool[kWeight] + this[kErrorPenalty]
      );
      // Keep the cached GCD in sync with the new weight, exactly as the
      // error handlers below do when they lower it.
      this._updateBalancedPoolStats();
    });

    pool.on('connectionError', () => {
      // Penalize a failed connection attempt; the weight never drops below 1.
      pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty]);
      this._updateBalancedPoolStats();
    });

    pool.on('disconnect', (...args) => {
      // The third listener argument is treated as the disconnect error.
      const err = args[2];
      if (err && err.code === 'UND_ERR_SOCKET') {
        // Decrease the weight of the pool.
        pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty]);
        this._updateBalancedPoolStats();
      }
    });

    // Every pool (including the new one) restarts at the maximum weight.
    for (const client of this[kClients]) {
      client[kWeight] = this[kMaxWeightPerServer];
    }

    this._updateBalancedPoolStats();

    return this;
  }

  /**
   * Recompute the cached greatest common divisor of all pool weights.
   * The result is 0 when there are no pools.
   */
  _updateBalancedPoolStats() {
    let result = 0;
    for (let i = 0; i < this[kClients].length; i++) {
      result = getGreatestCommonDivisor(this[kClients][i][kWeight], result);
    }

    this[kGreatestCommonDivisor] = result;
  }

  /**
   * Remove the pool registered for the given upstream, if one exists that is
   * neither closed nor destroyed.
   *
   * @param {*} upstream - Value accepted by `parseOrigin`.
   * @returns {this}
   */
  removeUpstream(upstream) {
    const upstreamOrigin = parseOrigin(upstream).origin;

    const pool = this[kClients].find(
      (pool) =>
        pool[kUrl].origin === upstreamOrigin &&
        pool.closed !== true &&
        pool.destroyed !== true
    );

    if (pool) {
      this[kRemoveClient](pool);
    }

    return this;
  }

  /**
   * Origins of all registered pools that are neither closed nor destroyed.
   *
   * @returns {string[]}
   */
  get upstreams() {
    return this[kClients]
      .filter(
        (dispatcher) =>
          dispatcher.closed !== true && dispatcher.destroyed !== true
      )
      .map((p) => p[kUrl].origin);
  }

  /**
   * Pick the pool that should handle the next request.
   *
   * @returns {object|undefined} The selected pool, or `undefined` when no
   *   pool is currently available.
   * @throws {BalancedPoolMissingUpstreamError} When no upstream is registered.
   */
  [kGetDispatcher]() {
    // We validate that pools is greater than 0,
    // otherwise we would have to wait until an upstream
    // is added, which might never happen.
    if (this[kClients].length === 0) {
      throw new BalancedPoolMissingUpstreamError();
    }

    // A pool can be selected only if it does not need draining and is
    // neither closed nor destroyed. The same predicate is applied at every
    // selection step so a closed/destroyed pool that is still in the client
    // list is never handed out.
    const isAvailable = (pool) =>
      !pool[kNeedDrain] && pool.closed !== true && pool.destroyed !== true;

    // Seed the fallback with the first available pool; none means every
    // pool is busy or unusable.
    let maxWeightIndex = this[kClients].findIndex(isAvailable);
    if (maxWeightIndex === -1) {
      return;
    }

    let counter = 0;

    // Visit each pool at most once, continuing after the last visited index.
    while (counter++ < this[kClients].length) {
      this[kIndex] = (this[kIndex] + 1) % this[kClients].length;
      const pool = this[kClients][this[kIndex]];

      // Track the available pool with the largest weight as a fallback.
      if (
        isAvailable(pool) &&
        pool[kWeight] > this[kClients][maxWeightIndex][kWeight]
      ) {
        maxWeightIndex = this[kIndex];
      }

      // Decrease the current weight every time the index wraps to 0.
      if (this[kIndex] === 0) {
        // Set the current weight to the next lower weight.
        this[kCurrentWeight] =
          this[kCurrentWeight] - this[kGreatestCommonDivisor];

        if (this[kCurrentWeight] <= 0) {
          this[kCurrentWeight] = this[kMaxWeightPerServer];
        }
      }

      if (pool[kWeight] >= this[kCurrentWeight] && isAvailable(pool)) {
        return pool;
      }
    }

    // No pool reached the current threshold: fall back to the heaviest
    // available pool and resume the rotation from there.
    this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight];
    this[kIndex] = maxWeightIndex;
    return this[kClients][maxWeightIndex];
  }
}
module.exports = BalancedPool;