'use strict';

const { InvalidArgumentError } = require('../core/errors');
const {
  kClients,
  kRunning,
  kClose,
  kDestroy,
  kDispatch,
} = require('../core/symbols');
const DispatcherBase = require('./dispatcher-base');
const Pool = require('./pool');
const Client = require('./client');
const util = require('../core/util');

// Module-private symbols used as keys for the Agent's internal state.
const kOnConnect = Symbol('onConnect');
const kOnDisconnect = Symbol('onDisconnect');
const kOnConnectionError = Symbol('onConnectionError');
const kOnDrain = Symbol('onDrain');
const kFactory = Symbol('factory');
const kOptions = Symbol('options');

/**
 * Default dispatcher factory: builds a `Client` when exactly one connection
 * is requested through `opts.connections`, and a `Pool` in every other case.
 *
 * @param {string|URL} origin - Origin handed over to the created dispatcher.
 * @param {object} [opts] - Options handed over to the created dispatcher.
 * @returns {Client|Pool} The newly constructed dispatcher.
 */
function defaultFactory(origin, opts) {
  if (opts && opts.connections === 1) {
    return new Client(origin, opts);
  }
  return new Pool(origin, opts);
}

/**
 * Dispatcher that routes each request to a per-origin dispatcher. Per-origin
 * dispatchers are created on first use through the configured factory and
 * cached in a map keyed by the stringified origin.
 */
class Agent extends DispatcherBase {
  /**
   * @param {object} [options] - Agent options.
   * @param {Function} [options.factory] - Called as `factory(origin, opts)`
   *   to create the dispatcher for an origin. Defaults to `defaultFactory`.
   * @param {Function|object} [options.connect] - Connect function or connect
   *   options; an object is shallow-copied before being stored.
   *   Every remaining option is cloned with `util.deepClone` and passed to
   *   the factory together with `connect`.
   * @throws {InvalidArgumentError} If `factory` is not a function, or if
   *   `connect` is neither nullish, a function nor an object.
   */
  constructor({ factory = defaultFactory, connect, ...options } = {}) {
    if (typeof factory !== 'function') {
      throw new InvalidArgumentError('factory must be a function.');
    }

    const connectType = typeof connect;
    const isConnectAcceptable =
      connect == null || connectType === 'function' || connectType === 'object';
    if (!isConnectAcceptable) {
      throw new InvalidArgumentError('connect must be a function or an object');
    }

    super();

    // Copy a connect options object so later changes made by the caller to
    // their own object are not observed here; a connect function is kept as-is.
    const connectOption =
      connect && connectType !== 'function' ? { ...connect } : connect;

    this[kOptions] = { ...util.deepClone(options), connect: connectOption };
    this[kFactory] = factory;
    this[kClients] = new Map();

    // Each forwarder re-emits the event on this agent with the agent itself
    // prepended to the list of targets.
    this[kOnDrain] = (origin, targets) => {
      this.emit('drain', origin, [this, ...targets]);
    };

    this[kOnConnect] = (origin, targets) => {
      this.emit('connect', origin, [this, ...targets]);
    };

    this[kOnDisconnect] = (origin, targets, err) => {
      this.emit('disconnect', origin, [this, ...targets], err);
    };

    this[kOnConnectionError] = (origin, targets, err) => {
      this.emit('connectionError', origin, [this, ...targets], err);
    };
  }

  /**
   * Sum of the `kRunning` values of every cached per-origin dispatcher.
   *
   * @returns {number}
   */
  get [kRunning]() {
    return Array.from(this[kClients].values()).reduce(
      (total, dispatcher) => total + dispatcher[kRunning],
      0
    );
  }

  /**
   * Dispatches a request on the dispatcher that owns `opts.origin`, creating
   * and caching that dispatcher when the origin is seen for the first time.
   *
   * @param {object} opts - Dispatch options; `opts.origin` must be a
   *   non-empty string or a `URL`.
   * @param {object} handler - Handler passed through to the dispatcher.
   * @returns {*} Whatever the per-origin dispatcher's `dispatch` returns.
   * @throws {InvalidArgumentError} If `opts.origin` is missing or is neither
   *   a string nor a `URL`.
   */
  [kDispatch](opts, handler) {
    const hasUsableOrigin =
      opts.origin &&
      (typeof opts.origin === 'string' || opts.origin instanceof URL);
    if (!hasUsableOrigin) {
      throw new InvalidArgumentError(
        'opts.origin must be a non-empty string or URL.'
      );
    }

    const key = String(opts.origin);
    const cached = this[kClients].get(key);
    if (cached) {
      return cached.dispatch(opts, handler);
    }

    // The value cached is the result of the `.on()` chain, not the raw
    // factory result.
    const created = this[kFactory](opts.origin, this[kOptions])
      .on('drain', this[kOnDrain])
      .on('connect', this[kOnConnect])
      .on('disconnect', this[kOnDisconnect])
      .on('connectionError', this[kOnConnectionError]);

    // This introduces a tiny memory leak, as dispatchers are never removed
    // from the map.
    // TODO(mcollina): evict the dispatcher once the client/pool has no more
    // active connections.
    this[kClients].set(key, created);

    return created.dispatch(opts, handler);
  }

  /**
   * Calls `close()` on every cached dispatcher, empties the cache, then
   * waits for all of the close operations to settle successfully.
   *
   * @returns {Promise<void>}
   */
  async [kClose]() {
    const pending = Array.from(this[kClients].values(), (dispatcher) =>
      dispatcher.close()
    );
    this[kClients].clear();

    await Promise.all(pending);
  }

  /**
   * Calls `destroy(err)` on every cached dispatcher, empties the cache, then
   * waits for all of the destroy operations to settle successfully.
   *
   * @param {Error} [err] - Passed through to each dispatcher's `destroy`.
   * @returns {Promise<void>}
   */
  async [kDestroy](err) {
    const pending = Array.from(this[kClients].values(), (dispatcher) =>
      dispatcher.destroy(err)
    );
    this[kClients].clear();

    await Promise.all(pending);
  }
}

module.exports = Agent;