167 lines
3.9 KiB
JavaScript
167 lines
3.9 KiB
JavaScript
/*! multistream. MIT License. Feross Aboukhadijeh <https://feross.org/opensource> */
|
|
const stream = require('readable-stream');
|
|
const once = require('once');
|
|
|
|
function toStreams2Obj(s) {
|
|
return toStreams2(s, { objectMode: true, highWaterMark: 16 });
|
|
}
|
|
|
|
function toStreams2Buf(s) {
|
|
return toStreams2(s);
|
|
}
|
|
|
|
function toStreams2(s, opts) {
|
|
if (!s || typeof s === 'function' || s._readableState) return s;
|
|
|
|
const wrap = new stream.Readable(opts).wrap(s);
|
|
if (s.destroy) {
|
|
wrap.destroy = s.destroy.bind(s);
|
|
}
|
|
return wrap;
|
|
}
|
|
|
|
class MultiStream extends stream.Readable {
|
|
constructor(streams, opts) {
|
|
super({ ...opts, autoDestroy: true });
|
|
|
|
this._drained = false;
|
|
this._forwarding = false;
|
|
this._current = null;
|
|
this._toStreams2 = opts && opts.objectMode ? toStreams2Obj : toStreams2Buf;
|
|
|
|
if (typeof streams === 'function') {
|
|
this._queue = streams;
|
|
} else {
|
|
this._queue = streams.map(this._toStreams2);
|
|
this._queue.forEach((stream) => {
|
|
if (typeof stream !== 'function') this._attachErrorListener(stream);
|
|
});
|
|
}
|
|
|
|
this._next();
|
|
}
|
|
|
|
_read() {
|
|
this._drained = true;
|
|
this._forward();
|
|
}
|
|
|
|
_forward() {
|
|
if (this._forwarding || !this._drained || !this._current) return;
|
|
this._forwarding = true;
|
|
|
|
let chunk;
|
|
while (this._drained && (chunk = this._current.read()) !== null) {
|
|
this._drained = this.push(chunk);
|
|
}
|
|
|
|
this._forwarding = false;
|
|
}
|
|
|
|
_destroy(err, cb) {
|
|
let streams = [];
|
|
if (this._current) streams.push(this._current);
|
|
if (typeof this._queue !== 'function')
|
|
streams = streams.concat(this._queue);
|
|
|
|
if (streams.length === 0) {
|
|
cb(err);
|
|
} else {
|
|
let counter = streams.length;
|
|
let er = err;
|
|
streams.forEach((stream) => {
|
|
destroy(stream, err, (err) => {
|
|
er = er || err;
|
|
if (--counter === 0) {
|
|
cb(er);
|
|
}
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
_next() {
|
|
this._current = null;
|
|
|
|
if (typeof this._queue === 'function') {
|
|
this._queue((err, stream) => {
|
|
if (err) return this.destroy(err);
|
|
stream = this._toStreams2(stream);
|
|
this._attachErrorListener(stream);
|
|
this._gotNextStream(stream);
|
|
});
|
|
} else {
|
|
let stream = this._queue.shift();
|
|
if (typeof stream === 'function') {
|
|
stream = this._toStreams2(stream());
|
|
this._attachErrorListener(stream);
|
|
}
|
|
this._gotNextStream(stream);
|
|
}
|
|
}
|
|
|
|
_gotNextStream(stream) {
|
|
if (!stream) {
|
|
this.push(null);
|
|
return;
|
|
}
|
|
|
|
this._current = stream;
|
|
this._forward();
|
|
|
|
const onReadable = () => {
|
|
this._forward();
|
|
};
|
|
|
|
const onClose = () => {
|
|
if (!stream._readableState.ended && !stream.destroyed) {
|
|
const err = new Error('ERR_STREAM_PREMATURE_CLOSE');
|
|
err.code = 'ERR_STREAM_PREMATURE_CLOSE';
|
|
this.destroy(err);
|
|
}
|
|
};
|
|
|
|
const onEnd = () => {
|
|
this._current = null;
|
|
stream.removeListener('readable', onReadable);
|
|
stream.removeListener('end', onEnd);
|
|
stream.removeListener('close', onClose);
|
|
stream.destroy();
|
|
this._next();
|
|
};
|
|
|
|
stream.on('readable', onReadable);
|
|
stream.once('end', onEnd);
|
|
stream.once('close', onClose);
|
|
}
|
|
|
|
_attachErrorListener(stream) {
|
|
if (!stream) return;
|
|
|
|
const onError = (err) => {
|
|
stream.removeListener('error', onError);
|
|
this.destroy(err);
|
|
};
|
|
|
|
stream.once('error', onError);
|
|
}
|
|
}
|
|
|
|
MultiStream.obj = (streams) =>
|
|
new MultiStream(streams, { objectMode: true, highWaterMark: 16 });
|
|
|
|
module.exports = MultiStream;
|
|
|
|
// Normalize stream destroy w/ callback.
|
|
function destroy(stream, err, cb) {
|
|
if (!stream.destroy || stream.destroyed) {
|
|
cb(err);
|
|
} else {
|
|
const callback = once((er) => cb(er || err));
|
|
stream
|
|
.on('error', callback)
|
|
.on('close', () => callback())
|
|
.destroy(err, callback);
|
|
}
|
|
}
|