thedesk/app/node_modules/pullstream/pullstream.js
2018-02-18 16:29:06 +09:00

138 lines
3.1 KiB
JavaScript

'use strict';
module.exports = PullStream;
require("setimmediate");
var inherits = require("util").inherits;
var PassThrough = require('readable-stream/passthrough');
var over = require('over');
var SliceStream = require('slice-stream');
function PullStream(opts) {
var self = this;
this.opts = opts || {};
PassThrough.call(this, opts);
this.once('finish', function() {
self._writesFinished = true;
if (self._flushed) {
self._finish();
}
});
this.on('readable', function() {
self._process();
});
}
inherits(PullStream, PassThrough);
PullStream.prototype.pull = over([
[over.numberOptionalWithDefault(null), over.func, function (len, callback) {
if (len === 0) {
return callback(null, new Buffer(0));
}
var self = this;
pullServiceRequest();
function pullServiceRequest() {
self._serviceRequests = null;
if (self._flushed) {
return callback(new Error('End of Stream'));
}
var data = self.read(len || undefined);
if (data) {
setImmediate(callback.bind(null, null, data));
} else {
self._serviceRequests = pullServiceRequest;
}
}
}]
]);
PullStream.prototype.pullUpTo = over([
[over.numberOptionalWithDefault(null), function (len) {
var data = this.read(len);
if (len && !data) {
data = this.read();
}
return data;
}]
]);
PullStream.prototype.pipe = over([
[over.numberOptionalWithDefault(null), over.object, function (len, destStream) {
if (!len) {
return PassThrough.prototype.pipe.call(this, destStream);
}
if (len === 0) {
return destStream.end();
}
var pullstream = this;
pullstream
.pipe(new SliceStream({ length: len }, function (buf, sliceEnd, extra) {
if (!sliceEnd) {
return this.push(buf);
}
pullstream.unpipe();
pullstream.unshift(extra);
this.push(buf);
return this.push(null);
}))
.pipe(destStream);
return destStream;
}]
]);
PullStream.prototype._process = function () {
if (this._serviceRequests) {
this._serviceRequests();
}
};
PullStream.prototype.prepend = function (chunk) {
this.unshift(chunk);
};
PullStream.prototype.drain = function (len, callback) {
if (this._flushed) {
return callback(new Error('End of Stream'));
}
var data = this.pullUpTo(len);
var bytesDrained = data && data.length || 0;
if (bytesDrained === len) {
setImmediate(callback);
} else if (bytesDrained > 0) {
this.drain(len - bytesDrained, callback);
} else {
//internal buffer is empty, wait until data can be consumed
this.once('readable', this.drain.bind(this, len - bytesDrained, callback));
}
};
PullStream.prototype._flush = function (callback) {
var self = this;
if (this._readableState.length > 0) {
return setImmediate(self._flush.bind(self, callback));
}
this._flushed = true;
if (self._writesFinished) {
self._finish(callback);
} else {
callback();
}
};
PullStream.prototype._finish = function (callback) {
callback = callback || function () {};
if (this._serviceRequests) {
this._serviceRequests();
}
setImmediate(callback);
};