'use strict'; /* eslint-disable no-var */ var reusify = require('reusify'); function fastqueue(context, worker, _concurrency) { if (typeof context === 'function') { _concurrency = worker; worker = context; context = null; } if (!(_concurrency >= 1)) { throw new Error('fastqueue concurrency must be equal to or greater than 1'); } var cache = reusify(Task); var queueHead = null; var queueTail = null; var _running = 0; var errorHandler = null; var self = { push: push, drain: noop, saturated: noop, pause: pause, paused: false, get concurrency() { return _concurrency; }, set concurrency(value) { if (!(value >= 1)) { throw new Error( 'fastqueue concurrency must be equal to or greater than 1' ); } _concurrency = value; if (self.paused) return; for (; queueHead && _running < _concurrency; ) { _running++; release(); } }, running: running, resume: resume, idle: idle, length: length, getQueue: getQueue, unshift: unshift, empty: noop, kill: kill, killAndDrain: killAndDrain, error: error, }; return self; function running() { return _running; } function pause() { self.paused = true; } function length() { var current = queueHead; var counter = 0; while (current) { current = current.next; counter++; } return counter; } function getQueue() { var current = queueHead; var tasks = []; while (current) { tasks.push(current.value); current = current.next; } return tasks; } function resume() { if (!self.paused) return; self.paused = false; if (queueHead === null) { _running++; release(); return; } for (; queueHead && _running < _concurrency; ) { _running++; release(); } } function idle() { return _running === 0 && self.length() === 0; } function push(value, done) { var current = cache.get(); current.context = context; current.release = release; current.value = value; current.callback = done || noop; current.errorHandler = errorHandler; if (_running >= _concurrency || self.paused) { if (queueTail) { queueTail.next = current; queueTail = current; } else { queueHead = current; queueTail = current; self.saturated(); } } else { _running++; worker.call(context, current.value, current.worked); } } function unshift(value, done) { var current = cache.get(); current.context = context; current.release = release; current.value = value; current.callback = done || noop; current.errorHandler = errorHandler; if (_running >= _concurrency || self.paused) { if (queueHead) { current.next = queueHead; queueHead = current; } else { queueHead = current; queueTail = current; self.saturated(); } } else { _running++; worker.call(context, current.value, current.worked); } } function release(holder) { if (holder) { cache.release(holder); } var next = queueHead; if (next && _running <= _concurrency) { if (!self.paused) { if (queueTail === queueHead) { queueTail = null; } queueHead = next.next; next.next = null; worker.call(context, next.value, next.worked); if (queueTail === null) { self.empty(); } } else { _running--; } } else if (--_running === 0) { self.drain(); } } function kill() { queueHead = null; queueTail = null; self.drain = noop; } function killAndDrain() { queueHead = null; queueTail = null; self.drain(); self.drain = noop; } function error(handler) { errorHandler = handler; } } function noop() {} function Task() { this.value = null; this.callback = noop; this.next = null; this.release = noop; this.context = null; this.errorHandler = null; var self = this; this.worked = function worked(err, result) { var callback = self.callback; var errorHandler = self.errorHandler; var val = self.value; self.value = null; self.callback = noop; if (self.errorHandler) { errorHandler(err, val); } callback.call(self.context, err, result); self.release(self); }; } function queueAsPromised(context, worker, _concurrency) { if (typeof context === 'function') { _concurrency = worker; worker = context; context = null; } function asyncWrapper(arg, cb) { worker.call(this, arg).then(function (res) { cb(null, res); }, cb); } var queue = fastqueue(context, asyncWrapper, _concurrency); var pushCb = queue.push; var unshiftCb = queue.unshift; queue.push = push; queue.unshift = unshift; queue.drained = drained; return queue; function push(value) { var p = new Promise(function (resolve, reject) { pushCb(value, function (err, result) { if (err) { reject(err); return; } resolve(result); }); }); // Let's fork the promise chain to // make the error bubble up to the user but // not lead to a unhandledRejection p.catch(noop); return p; } function unshift(value) { var p = new Promise(function (resolve, reject) { unshiftCb(value, function (err, result) { if (err) { reject(err); return; } resolve(result); }); }); // Let's fork the promise chain to // make the error bubble up to the user but // not lead to a unhandledRejection p.catch(noop); return p; } function drained() { var p = new Promise(function (resolve) { process.nextTick(function () { if (queue.idle()) { resolve(); } else { var previousDrain = queue.drain; queue.drain = function () { if (typeof previousDrain === 'function') previousDrain(); resolve(); queue.drain = previousDrain; }; } }); }); return p; } } module.exports = fastqueue; module.exports.promise = queueAsPromised;