2025-04-02 06:50:39 -04:00

210 lines
4.6 KiB
JavaScript

'use strict';
const DispatcherBase = require('./dispatcher-base');
const FixedQueue = require('./fixed-queue');
const {
kConnected,
kSize,
kRunning,
kPending,
kQueued,
kBusy,
kFree,
kUrl,
kClose,
kDestroy,
kDispatch,
} = require('../core/symbols');
const PoolStats = require('./pool-stats');
// --- Pool state -----------------------------------------------------------
// Queue of requests that were accepted while no dispatcher could take them.
const kQueue = Symbol('queue');
// Array of the client dispatchers managed by the pool.
const kClients = Symbol('clients');
// Boolean flag; set both on the pool and on individual dispatchers.
const kNeedDrain = Symbol('needDrain');
// Resolver of the promise a pending close is waiting on.
const kClosedResolve = Symbol('closed resolve');
// Slot holding the pool's statistics object.
const kStats = Symbol('stats');

// --- Listeners attached to every client -----------------------------------
const kOnConnect = Symbol('onConnect');
const kOnDisconnect = Symbol('onDisconnect');
const kOnConnectionError = Symbol('onConnectionError');
const kOnDrain = Symbol('onDrain');

// --- Internal method keys -------------------------------------------------
const kGetDispatcher = Symbol('get dispatcher');
const kAddClient = Symbol('add client');
const kRemoveClient = Symbol('remove client');
/**
 * Base class for dispatchers that fan requests out over a set of client
 * dispatchers.
 *
 * Requests that cannot be handed to a client right away are parked in an
 * internal queue and replayed from the `onDrain` handler. The class itself
 * never defines `kGetDispatcher`; `kDispatch` calls it to pick a client, so a
 * subclass is presumably expected to provide it.
 */
class PoolBase extends DispatcherBase {
  constructor() {
    super();

    // Parked requests, stored as `{ opts, handler }`.
    this[kQueue] = new FixedQueue();
    this[kClients] = [];
    // Number of entries currently sitting in `kQueue`.
    this[kQueued] = 0;

    const pool = this;

    /**
     * Replays queued requests and updates the drain flags.
     *
     * Deliberately a `function` expression: `this` is whatever the handler is
     * invoked on. That is the pool itself when called from `kAddClient`, and
     * presumably the emitting client when it fires as a 'drain' listener.
     *
     * @param {*} origin - Forwarded as-is to the pool's 'drain' event.
     * @param {Array} targets - Appended after the pool in the event's target list.
     */
    this[kOnDrain] = function onDrain(origin, targets) {
      const queue = pool[kQueue];

      let needDrain = false;

      // Keep feeding queued requests to `this` until it pushes back or the
      // queue runs dry.
      while (!needDrain) {
        const item = queue.shift();
        if (!item) {
          break;
        }
        pool[kQueued]--;
        needDrain = !this.dispatch(item.opts, item.handler);
      }

      this[kNeedDrain] = needDrain;

      // Only announce 'drain' on the pool if it had previously reported
      // back-pressure and the dispatcher can now take more work.
      if (!this[kNeedDrain] && pool[kNeedDrain]) {
        pool[kNeedDrain] = false;
        pool.emit('drain', origin, [pool, ...targets]);
      }

      // A close is waiting for the queue to empty. Resolve the pending promise
      // WITH the combined close promise so that a failing `client.close()`
      // rejects the waiter instead of being dropped as an unhandled rejection
      // and leaving the close hanging forever. Repeated calls are harmless:
      // a promise resolver ignores every call after the first.
      if (pool[kClosedResolve] && queue.isEmpty()) {
        pool[kClosedResolve](
          Promise.all(pool[kClients].map((c) => c.close()))
        );
      }
    };

    // Re-emit client lifecycle events on the pool, with the pool prepended to
    // the list of targets.
    this[kOnConnect] = (origin, targets) => {
      pool.emit('connect', origin, [pool, ...targets]);
    };

    this[kOnDisconnect] = (origin, targets, err) => {
      pool.emit('disconnect', origin, [pool, ...targets], err);
    };

    this[kOnConnectionError] = (origin, targets, err) => {
      pool.emit('connectionError', origin, [pool, ...targets], err);
    };

    this[kStats] = new PoolStats(this);
  }

  /** The pool's own need-drain flag. */
  get [kBusy]() {
    return this[kNeedDrain];
  }

  /** Number of clients whose `kConnected` value is truthy. */
  get [kConnected]() {
    return this[kClients].filter((client) => client[kConnected]).length;
  }

  /** Number of clients that are connected and not flagged as needing drain. */
  get [kFree]() {
    return this[kClients].filter(
      (client) => client[kConnected] && !client[kNeedDrain]
    ).length;
  }

  /** Requests queued on the pool plus the sum of every client's `kPending`. */
  get [kPending]() {
    let ret = this[kQueued];
    for (const { [kPending]: pending } of this[kClients]) {
      ret += pending;
    }
    return ret;
  }

  /** Sum of every client's `kRunning`. */
  get [kRunning]() {
    let ret = 0;
    for (const { [kRunning]: running } of this[kClients]) {
      ret += running;
    }
    return ret;
  }

  /** Requests queued on the pool plus the sum of every client's `kSize`. */
  get [kSize]() {
    let ret = this[kQueued];
    for (const { [kSize]: size } of this[kClients]) {
      ret += size;
    }
    return ret;
  }

  /** The `PoolStats` instance created for this pool. */
  get stats() {
    return this[kStats];
  }

  /**
   * Closes every client. If requests are still queued, the close is deferred:
   * the resolver is stashed in `kClosedResolve` and `onDrain` settles it once
   * the queue is empty.
   *
   * @returns {Promise<void>}
   */
  async [kClose]() {
    if (this[kQueue].isEmpty()) {
      await Promise.all(this[kClients].map((c) => c.close()));
    } else {
      await new Promise((resolve) => {
        this[kClosedResolve] = resolve;
      });
    }
  }

  /**
   * Fails every queued request with `err`, then destroys every client.
   *
   * @param {*} err - Passed to each queued handler's `onError` and to each
   *   client's `destroy`.
   * @returns {Promise<void>}
   */
  async [kDestroy](err) {
    while (true) {
      const item = this[kQueue].shift();
      if (!item) {
        break;
      }
      // Keep the counter in step with the queue (as `onDrain` does); otherwise
      // `kPending`/`kSize` keep reporting requests that no longer exist.
      this[kQueued]--;
      item.handler.onError(err);
    }

    await Promise.all(this[kClients].map((c) => c.destroy(err)));
  }

  /**
   * Hands a request to a dispatcher obtained from `kGetDispatcher`, or queues
   * it when none is returned.
   *
   * @param {*} opts - Request options, forwarded untouched.
   * @param {*} handler - Request handler, forwarded untouched.
   * @returns {boolean} `false` when the pool is flagged as needing drain.
   */
  [kDispatch](opts, handler) {
    const dispatcher = this[kGetDispatcher]();

    if (!dispatcher) {
      this[kNeedDrain] = true;
      this[kQueue].push({ opts, handler });
      this[kQueued]++;
    } else if (!dispatcher.dispatch(opts, handler)) {
      // This dispatcher pushed back; the pool as a whole needs drain only if
      // no other dispatcher can be obtained.
      dispatcher[kNeedDrain] = true;
      this[kNeedDrain] = !this[kGetDispatcher]();
    }

    return !this[kNeedDrain];
  }

  /**
   * Subscribes to a client's events and adds it to the pool.
   *
   * @param {*} client - Client dispatcher to add.
   * @returns {this}
   */
  [kAddClient](client) {
    client
      .on('drain', this[kOnDrain])
      .on('connect', this[kOnConnect])
      .on('disconnect', this[kOnDisconnect])
      .on('connectionError', this[kOnConnectionError]);

    this[kClients].push(client);

    if (this[kNeedDrain]) {
      // Run the drain handler on the next microtask so parked requests get
      // replayed; re-check the flag because it may have been cleared by then.
      queueMicrotask(() => {
        if (this[kNeedDrain]) {
          this[kOnDrain](client[kUrl], [this, client]);
        }
      });
    }

    return this;
  }

  /**
   * Closes a client and drops it from the pool once the close callback runs.
   *
   * @param {*} client - Client dispatcher to remove.
   */
  [kRemoveClient](client) {
    client.close(() => {
      const idx = this[kClients].indexOf(client);
      if (idx !== -1) {
        this[kClients].splice(idx, 1);
      }
    });

    // NOTE(review): this sets the pool's need-drain flag to true when SOME
    // client is open and not needing drain, which reads as inverted compared
    // with how the flag is used elsewhere in this class. Left unchanged —
    // confirm the intended semantics before touching it.
    this[kNeedDrain] = this[kClients].some(
      (dispatcher) =>
        !dispatcher[kNeedDrain] &&
        dispatcher.closed !== true &&
        dispatcher.destroyed !== true
    );
  }
}
// Public surface of this module: the base class plus the symbol keys for the
// client list, the need-drain flag, and the add/remove/get-dispatcher methods
// (presumably so code outside this file can reach those members — confirm
// against the importing modules).
module.exports = {
PoolBase,
kClients,
kNeedDrain,
kAddClient,
kRemoveClient,
kGetDispatcher,
};