104 lines
2.1 KiB
JavaScript
104 lines
2.1 KiB
JavaScript
var Readable = require('readable-stream').Readable;
|
|
var inherits = require('inherits');
|
|
|
|
module.exports = from2;
|
|
|
|
from2.ctor = ctor;
|
|
from2.obj = obj;
|
|
|
|
var Proto = ctor();
|
|
|
|
function toFunction(list) {
|
|
list = list.slice();
|
|
return function (_, cb) {
|
|
var err = null;
|
|
var item = list.length ? list.shift() : null;
|
|
if (item instanceof Error) {
|
|
err = item;
|
|
item = null;
|
|
}
|
|
|
|
cb(err, item);
|
|
};
|
|
}
|
|
|
|
function from2(opts, read) {
|
|
if (typeof opts !== 'object' || Array.isArray(opts)) {
|
|
read = opts;
|
|
opts = {};
|
|
}
|
|
|
|
var rs = new Proto(opts);
|
|
rs._from = Array.isArray(read) ? toFunction(read) : read || noop;
|
|
return rs;
|
|
}
|
|
|
|
function ctor(opts, read) {
|
|
if (typeof opts === 'function') {
|
|
read = opts;
|
|
opts = {};
|
|
}
|
|
|
|
opts = defaults(opts);
|
|
|
|
inherits(Class, Readable);
|
|
function Class(override) {
|
|
if (!(this instanceof Class)) return new Class(override);
|
|
this._reading = false;
|
|
this._callback = check;
|
|
this.destroyed = false;
|
|
Readable.call(this, override || opts);
|
|
|
|
var self = this;
|
|
var hwm = this._readableState.highWaterMark;
|
|
|
|
function check(err, data) {
|
|
if (self.destroyed) return;
|
|
if (err) return self.destroy(err);
|
|
if (data === null) return self.push(null);
|
|
self._reading = false;
|
|
if (self.push(data)) self._read(hwm);
|
|
}
|
|
}
|
|
|
|
Class.prototype._from = read || noop;
|
|
Class.prototype._read = function (size) {
|
|
if (this._reading || this.destroyed) return;
|
|
this._reading = true;
|
|
this._from(size, this._callback);
|
|
};
|
|
|
|
Class.prototype.destroy = function (err) {
|
|
if (this.destroyed) return;
|
|
this.destroyed = true;
|
|
|
|
var self = this;
|
|
process.nextTick(function () {
|
|
if (err) self.emit('error', err);
|
|
self.emit('close');
|
|
});
|
|
};
|
|
|
|
return Class;
|
|
}
|
|
|
|
function obj(opts, read) {
|
|
if (typeof opts === 'function' || Array.isArray(opts)) {
|
|
read = opts;
|
|
opts = {};
|
|
}
|
|
|
|
opts = defaults(opts);
|
|
opts.objectMode = true;
|
|
opts.highWaterMark = 16;
|
|
|
|
return from2(opts, read);
|
|
}
|
|
|
|
function noop() {}
|
|
|
|
function defaults(opts) {
|
|
opts = opts || {};
|
|
return opts;
|
|
}
|