thedesk/app/node_modules/pullstream/test/pullStreamTest.js

431 lines
10 KiB
JavaScript
Raw Normal View History

2018-02-18 18:29:06 +11:00
'use strict';
var nodeunit = require('nodeunit');
var fs = require("fs");
var path = require("path");
var streamBuffers = require("stream-buffers");
var async = require('async')
var PullStream = require('../');
module.exports = {
"source sending 1-byte at a time": function (t) {
t.expect(3);
var ps = new PullStream({ lowWaterMark : 0 });
ps.on('finish', function () {
sourceStream.destroy();
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1
});
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal('Hello', data.toString());
var writableStream = new streamBuffers.WritableStreamBuffer({
initialSize: 100
});
writableStream.on('close', function () {
var str = writableStream.getContentsAsString('utf8');
t.equal(' World', str);
ps.pull(function (err, data) {
if (err) {
return t.done(err);
}
t.equal('!', data.toString());
return t.done();
});
});
ps.pipe(' World'.length, writableStream);
});
},
"source sending twelve bytes at once": function (t) {
t.expect(3);
var ps = new PullStream({ lowWaterMark : 0 });
ps.on('finish', function () {
sourceStream.destroy();
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal('Hello', data.toString());
var writableStream = new streamBuffers.WritableStreamBuffer({
initialSize: 100
});
writableStream.on('close', function () {
var str = writableStream.getContentsAsString('utf8');
t.equal(' World', str);
ps.pull(function (err, data) {
if (err) {
return t.done(err);
}
t.equal('!', data.toString());
return t.done();
});
});
ps.pipe(' World'.length, writableStream);
});
},
"source sending 512 bytes at once": function (t) {
t.expect(512 / 4);
var ps = new PullStream({ lowWaterMark : 0 });
ps.on('finish', function() {
sourceStream.destroy();
});
var values = [];
for (var i = 0; i < 512; i+=4) {
values.push(i + 1000);
}
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
sourceStream.pipe(ps);
values.forEach(function(val) {
sourceStream.put(val);
});
async.forEachSeries(values, function (val, callback) {
ps.pull(4, function (err, data) {
if (err) {
return callback(err);
}
t.equal(val, data.toString());
return callback(null);
});
}, function (err) {
t.done(err);
});
},
"two length pulls": function (t) {
t.expect(2);
var ps = new PullStream({ lowWaterMark : 0 });
ps.on('finish', function () {
sourceStream.destroy();
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal('Hello', data.toString());
ps.pull(' World!'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal(' World!', data.toString());
return t.done();
});
});
},
"pulling zero bytes returns empty data": function (t) {
t.expect(1);
var ps = new PullStream({ lowWaterMark : 0 });
var sourceStream = new streamBuffers.ReadableStreamBuffer({
chunkSize: 1000
});
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.pull(0, function (err, data) {
if (err) {
return t.done(err);
}
t.equal(0, data.length, "data is empty");
sourceStream.destroy();
return t.done();
});
},
"read from file": function (t) {
t.expect(2);
var ps = new PullStream({ lowWaterMark : 0 });
var sourceStream = fs.createReadStream(path.join(__dirname, 'testFile.txt'));
sourceStream.pipe(ps);
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal('Hello', data.toString());
ps.pull(' World!'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal(' World!', data.toString());
return t.done();
});
});
},
"read past end of stream": function (t) {
t.expect(2);
var ps = new PullStream({ lowWaterMark : 0 });
ps.on('finish', function () {
sourceStream.destroy();
});
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 1,
chunkSize: 1000
});
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.pull('Hello World!'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal('Hello World!', data.toString());
ps.pull(1, function (err, data) {
if (err) {
t.ok(err, 'should get an error');
}
t.done();
});
});
},
"pipe with no length": function (t) {
t.expect(2);
var ps = new PullStream({ lowWaterMark : 0 });
ps.on('end', function () {
t.ok(true, "pullstream should end");
});
var writableStream = new streamBuffers.WritableStreamBuffer({
initialSize: 100
});
writableStream.on('close', function () {
var str = writableStream.getContentsAsString('utf8');
t.equal('Hello World!', str);
t.done();
});
ps.pipe(writableStream);
process.nextTick(function () {
ps.write(new Buffer('Hello', 'utf8'));
ps.write(new Buffer(' World', 'utf8'));
process.nextTick(function () {
ps.write(new Buffer('!', 'utf8'));
ps.end();
});
});
},
"throw on calling write() after end": function (t) {
t.expect(1);
var ps = new PullStream({ lowWaterMark : 0 });
ps.end();
try {
ps.write(new Buffer('hello', 'utf8'));
t.fail("should throw error");
} catch (ex) {
t.ok(ex);
}
t.done();
},
"pipe more bytes than the pullstream buffer size": function (t) {
t.expect(1);
var ps = new PullStream();
ps.on('end', function() {
sourceStream.destroy();
});
var aVals = "", bVals = "";
for (var i = 0; i < 20 * 1000; i++) {
aVals += 'a';
}
for (var i = 0; i < 180 * 1000; i++) {
bVals += 'b';
}
var combined = aVals + bVals;
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 40 * 1024
});
sourceStream.pipe(ps);
sourceStream.put(aVals);
var writableStream = new streamBuffers.WritableStreamBuffer({
initialSize: 200 * 1000
});
writableStream.on('close', function () {
var str = writableStream.getContentsAsString('utf8');
t.equal(combined, str);
t.done();
});
ps.once('drain', function () {
ps.pipe(200 * 1000, writableStream);
process.nextTick(sourceStream.put.bind(null, bVals));
});
},
"mix asynchronous pull with synchronous pullUpTo - exact number of bytes returned": function (t) {
t.expect(2);
var ps = new PullStream();
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal('Hello', data.toString());
var data = ps.pullUpTo(" World!".length);
t.equal(" World!", data.toString());
sourceStream.destroy();
t.done();
});
},
"mix asynchronous pull with pullUpTo - fewer bytes returned than requested": function (t) {
t.expect(2);
var ps = new PullStream();
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal('Hello', data.toString());
var data = ps.pullUpTo(1000);
t.equal(" World!", data.toString());
sourceStream.destroy();
t.done();
});
},
"retrieve all currently remaining bytes": function (t) {
t.expect(2);
var ps = new PullStream();
var sourceStream = new streamBuffers.ReadableStreamBuffer({
frequency: 0,
chunkSize: 1000
});
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.pull('Hello'.length, function (err, data) {
if (err) {
return t.done(err);
}
t.equal('Hello', data.toString());
var data = ps.pullUpTo();
t.equal(" World!", data.toString());
sourceStream.destroy();
t.done();
});
},
// TODO: node PassThrough stream doesn't handle unshift the same way anymore.
// "prepend": function (t) {
// t.expect(1);
// var ps = new PullStream();
//
// var sourceStream = new streamBuffers.ReadableStreamBuffer();
//
// sourceStream.pipe(ps);
// sourceStream.put("World!");
// ps.prepend("Hello ");
//
// ps.pull('Hello World!'.length, function (err, data) {
// if (err) {
// return t.done(err);
// }
// t.equal('Hello World!', data.toString());
// sourceStream.destroy();
// t.done();
// });
// },
"drain": function (t) {
t.expect(1);
var ps = new PullStream();
var sourceStream = new streamBuffers.ReadableStreamBuffer();
sourceStream.pipe(ps);
sourceStream.put("Hello World!");
ps.drain('Hello '.length, function (err) {
if (err) {
return t.done(err);
}
ps.pull('World!'.length, function (err, data) {
t.equal('World!', data.toString());
sourceStream.destroy();
t.done();
});
});
}
};