'use strict'; module.exports = PullStream; require("setimmediate"); var inherits = require("util").inherits; var PassThrough = require('readable-stream/passthrough'); var over = require('over'); var SliceStream = require('slice-stream'); function PullStream(opts) { var self = this; this.opts = opts || {}; PassThrough.call(this, opts); this.once('finish', function() { self._writesFinished = true; if (self._flushed) { self._finish(); } }); this.on('readable', function() { self._process(); }); } inherits(PullStream, PassThrough); PullStream.prototype.pull = over([ [over.numberOptionalWithDefault(null), over.func, function (len, callback) { if (len === 0) { return callback(null, new Buffer(0)); } var self = this; pullServiceRequest(); function pullServiceRequest() { self._serviceRequests = null; if (self._flushed) { return callback(new Error('End of Stream')); } var data = self.read(len || undefined); if (data) { setImmediate(callback.bind(null, null, data)); } else { self._serviceRequests = pullServiceRequest; } } }] ]); PullStream.prototype.pullUpTo = over([ [over.numberOptionalWithDefault(null), function (len) { var data = this.read(len); if (len && !data) { data = this.read(); } return data; }] ]); PullStream.prototype.pipe = over([ [over.numberOptionalWithDefault(null), over.object, function (len, destStream) { if (!len) { return PassThrough.prototype.pipe.call(this, destStream); } if (len === 0) { return destStream.end(); } var pullstream = this; pullstream .pipe(new SliceStream({ length: len }, function (buf, sliceEnd, extra) { if (!sliceEnd) { return this.push(buf); } pullstream.unpipe(); pullstream.unshift(extra); this.push(buf); return this.push(null); })) .pipe(destStream); return destStream; }] ]); PullStream.prototype._process = function () { if (this._serviceRequests) { this._serviceRequests(); } }; PullStream.prototype.prepend = function (chunk) { this.unshift(chunk); }; PullStream.prototype.drain = function (len, callback) { if (this._flushed) { return callback(new Error('End of Stream')); } var data = this.pullUpTo(len); var bytesDrained = data && data.length || 0; if (bytesDrained === len) { setImmediate(callback); } else if (bytesDrained > 0) { this.drain(len - bytesDrained, callback); } else { //internal buffer is empty, wait until data can be consumed this.once('readable', this.drain.bind(this, len - bytesDrained, callback)); } }; PullStream.prototype._flush = function (callback) { var self = this; if (this._readableState.length > 0) { return setImmediate(self._flush.bind(self, callback)); } this._flushed = true; if (self._writesFinished) { self._finish(callback); } else { callback(); } }; PullStream.prototype._finish = function (callback) { callback = callback || function () {}; if (this._serviceRequests) { this._serviceRequests(); } setImmediate(callback); };