| var Stream = require('stream') | 
|   | 
| // through | 
| // | 
| // a stream that does nothing but re-emit the input. | 
| // useful for aggregating a series of changing but not ending streams into one stream) | 
|   | 
| exports = module.exports = through | 
| through.through = through | 
|   | 
| //create a readable writable stream. | 
|   | 
| function through (write, end, opts) { | 
|   write = write || function (data) { this.queue(data) } | 
|   end = end || function () { this.queue(null) } | 
|   | 
|   var ended = false, destroyed = false, buffer = [], _ended = false | 
|   var stream = new Stream() | 
|   stream.readable = stream.writable = true | 
|   stream.paused = false | 
|   | 
| //  stream.autoPause   = !(opts && opts.autoPause   === false) | 
|   stream.autoDestroy = !(opts && opts.autoDestroy === false) | 
|   | 
|   stream.write = function (data) { | 
|     write.call(this, data) | 
|     return !stream.paused | 
|   } | 
|   | 
|   function drain() { | 
|     while(buffer.length && !stream.paused) { | 
|       var data = buffer.shift() | 
|       if(null === data) | 
|         return stream.emit('end') | 
|       else | 
|         stream.emit('data', data) | 
|     } | 
|   } | 
|   | 
|   stream.queue = stream.push = function (data) { | 
| //    console.error(ended) | 
|     if(_ended) return stream | 
|     if(data === null) _ended = true | 
|     buffer.push(data) | 
|     drain() | 
|     return stream | 
|   } | 
|   | 
|   //this will be registered as the first 'end' listener | 
|   //must call destroy next tick, to make sure we're after any | 
|   //stream piped from here. | 
|   //this is only a problem if end is not emitted synchronously. | 
|   //a nicer way to do this is to make sure this is the last listener for 'end' | 
|   | 
|   stream.on('end', function () { | 
|     stream.readable = false | 
|     if(!stream.writable && stream.autoDestroy) | 
|       process.nextTick(function () { | 
|         stream.destroy() | 
|       }) | 
|   }) | 
|   | 
|   function _end () { | 
|     stream.writable = false | 
|     end.call(stream) | 
|     if(!stream.readable && stream.autoDestroy) | 
|       stream.destroy() | 
|   } | 
|   | 
|   stream.end = function (data) { | 
|     if(ended) return | 
|     ended = true | 
|     if(arguments.length) stream.write(data) | 
|     _end() // will emit or queue | 
|     return stream | 
|   } | 
|   | 
|   stream.destroy = function () { | 
|     if(destroyed) return | 
|     destroyed = true | 
|     ended = true | 
|     buffer.length = 0 | 
|     stream.writable = stream.readable = false | 
|     stream.emit('close') | 
|     return stream | 
|   } | 
|   | 
|   stream.pause = function () { | 
|     if(stream.paused) return | 
|     stream.paused = true | 
|     return stream | 
|   } | 
|   | 
|   stream.resume = function () { | 
|     if(stream.paused) { | 
|       stream.paused = false | 
|       stream.emit('resume') | 
|     } | 
|     drain() | 
|     //may have become paused again, | 
|     //as drain emits 'data'. | 
|     if(!stream.paused) | 
|       stream.emit('drain') | 
|     return stream | 
|   } | 
|   return stream | 
| } |