| var tape = require('tape') | 
| var through = require('through2') | 
| var pumpify = require('./') | 
| var stream = require('stream') | 
| var duplexify = require('duplexify') | 
|   | 
| tape('basic', function(t) { | 
|   t.plan(3) | 
|   | 
|   var pipeline = pumpify( | 
|     through(function(data, enc, cb) { | 
|       t.same(data.toString(), 'hello') | 
|       cb(null, data.toString().toUpperCase()) | 
|     }), | 
|     through(function(data, enc, cb) { | 
|       t.same(data.toString(), 'HELLO') | 
|       cb(null, data.toString().toLowerCase()) | 
|     }) | 
|   ) | 
|   | 
|   pipeline.write('hello') | 
|   pipeline.on('data', function(data) { | 
|     t.same(data.toString(), 'hello') | 
|     t.end() | 
|   }) | 
| }) | 
|   | 
| tape('3 times', function(t) { | 
|   t.plan(4) | 
|   | 
|   var pipeline = pumpify( | 
|     through(function(data, enc, cb) { | 
|       t.same(data.toString(), 'hello') | 
|       cb(null, data.toString().toUpperCase()) | 
|     }), | 
|     through(function(data, enc, cb) { | 
|       t.same(data.toString(), 'HELLO') | 
|       cb(null, data.toString().toLowerCase()) | 
|     }), | 
|     through(function(data, enc, cb) { | 
|       t.same(data.toString(), 'hello') | 
|       cb(null, data.toString().toUpperCase()) | 
|     }) | 
|   ) | 
|   | 
|   pipeline.write('hello') | 
|   pipeline.on('data', function(data) { | 
|     t.same(data.toString(), 'HELLO') | 
|     t.end() | 
|   }) | 
| }) | 
|   | 
| tape('destroy', function(t) { | 
|   var test = through() | 
|   test.destroy = function() { | 
|     t.ok(true) | 
|     t.end() | 
|   } | 
|   | 
|   var pipeline = pumpify(through(), test) | 
|   | 
|   pipeline.destroy() | 
| }) | 
|   | 
| tape('close', function(t) { | 
|   var test = through() | 
|   var pipeline = pumpify(through(), test) | 
|   | 
|   pipeline.on('error', function(err) { | 
|     t.same(err.message, 'lol') | 
|     t.end() | 
|   }) | 
|   | 
|   test.emit('error', new Error('lol')) | 
| }) | 
|   | 
| tape('end waits for last one', function(t) { | 
|   var ran = false | 
|   | 
|   var a = through() | 
|   var b = through() | 
|   var c = through(function(data, enc, cb) { | 
|     setTimeout(function() { | 
|       ran = true | 
|       cb() | 
|     }, 100) | 
|   }) | 
|   | 
|   var pipeline = pumpify(a, b, c) | 
|   | 
|   pipeline.write('foo') | 
|   pipeline.end(function() { | 
|     t.ok(ran) | 
|     t.end() | 
|   }) | 
|   | 
|   t.ok(!ran) | 
| }) | 
|   | 
| tape('always wait for finish', function(t) { | 
|   var a = new stream.Readable() | 
|   a._read = function() {} | 
|   a.push('hello') | 
|   | 
|   var pipeline = pumpify(a, through(), through()) | 
|   var ran = false | 
|   | 
|   pipeline.on('finish', function() { | 
|     t.ok(ran) | 
|     t.end() | 
|   }) | 
|   | 
|   setTimeout(function() { | 
|     ran = true | 
|     a.push(null) | 
|   }, 100) | 
| }) | 
|   | 
| tape('async', function(t) { | 
|   var pipeline = pumpify() | 
|   | 
|   t.plan(4) | 
|   | 
|   pipeline.write('hello') | 
|   pipeline.on('data', function(data) { | 
|     t.same(data.toString(), 'HELLO') | 
|     t.end() | 
|   }) | 
|   | 
|   setTimeout(function() { | 
|     pipeline.setPipeline( | 
|       through(function(data, enc, cb) { | 
|         t.same(data.toString(), 'hello') | 
|         cb(null, data.toString().toUpperCase()) | 
|       }), | 
|       through(function(data, enc, cb) { | 
|         t.same(data.toString(), 'HELLO') | 
|         cb(null, data.toString().toLowerCase()) | 
|       }), | 
|       through(function(data, enc, cb) { | 
|         t.same(data.toString(), 'hello') | 
|         cb(null, data.toString().toUpperCase()) | 
|       }) | 
|     ) | 
|   }, 100) | 
| }) | 
|   | 
| tape('early destroy', function(t) { | 
|   var a = through() | 
|   var b = through() | 
|   var c = through() | 
|   | 
|   b.destroy = function() { | 
|     t.ok(true) | 
|     t.end() | 
|   } | 
|   | 
|   var pipeline = pumpify() | 
|   | 
|   pipeline.destroy() | 
|   setTimeout(function() { | 
|     pipeline.setPipeline(a, b, c) | 
|   }, 100) | 
| }) | 
|   | 
| tape('preserves error', function (t) { | 
|   var a = through() | 
|   var b = through(function (data, enc, cb) { | 
|     cb(new Error('stop')) | 
|   }) | 
|   var c = through() | 
|   var s = pumpify() | 
|   | 
|   s.on('error', function (err) { | 
|     t.same(err.message, 'stop') | 
|     t.end() | 
|   }) | 
|   | 
|   s.setPipeline(a, b, c) | 
|   s.resume() | 
|   s.write('hi') | 
| }) | 
|   | 
| tape('preserves error again', function (t) { | 
|   var ws = new stream.Writable() | 
|   var rs = new stream.Readable({highWaterMark: 16}) | 
|   | 
|   ws._write = function (data, enc, cb) { | 
|     cb(null) | 
|   } | 
|   | 
|   rs._read = function () { | 
|     process.nextTick(function () { | 
|       rs.push('hello world') | 
|     }) | 
|   } | 
|   | 
|   var pumpifyErr = pumpify( | 
|     through(), | 
|     through(function(chunk, _, cb) { | 
|       cb(new Error('test')) | 
|     }), | 
|     ws | 
|   ) | 
|   | 
|   rs.pipe(pumpifyErr) | 
|     .on('error', function (err) { | 
|       t.ok(err) | 
|       t.ok(err.message !== 'premature close', 'does not close with premature close') | 
|       t.end() | 
|     }) | 
| }) | 
|   | 
| tape('returns error from duplexify', function (t) { | 
|   var a = through() | 
|   var b = duplexify() | 
|   var s = pumpify() | 
|   | 
|   s.setPipeline(a, b) | 
|   | 
|   s.on('error', function (err) { | 
|     t.same(err.message, 'stop') | 
|     t.end() | 
|   }) | 
|   | 
|   s.write('data') | 
|   // Test passes if `.end()` is not called | 
|   s.end() | 
|   | 
|   b.setWritable(through()) | 
|   | 
|   setImmediate(function () { | 
|     b.destroy(new Error('stop')) | 
|   }) | 
| }) |