| var Transform = require('readable-stream').Transform; | 
| var inherits = require('inherits'); | 
| var cyclist = require('cyclist'); | 
| var util = require('util'); | 
|   | 
| var ParallelTransform = function(maxParallel, opts, ontransform) { | 
|     if (!(this instanceof ParallelTransform)) return new ParallelTransform(maxParallel, opts, ontransform); | 
|   | 
|     if (typeof maxParallel === 'function') { | 
|         ontransform = maxParallel; | 
|         opts = null; | 
|         maxParallel = 1; | 
|     } | 
|     if (typeof opts === 'function') { | 
|         ontransform = opts; | 
|         opts = null; | 
|     } | 
|   | 
|     if (!opts) opts = {}; | 
|     if (!opts.highWaterMark) opts.highWaterMark = Math.max(maxParallel, 16); | 
|     if (opts.objectMode !== false) opts.objectMode = true; | 
|   | 
|     Transform.call(this, opts); | 
|   | 
|     this._maxParallel = maxParallel; | 
|     this._ontransform = ontransform; | 
|     this._destroyed = false; | 
|     this._flushed = false; | 
|     this._ordered = opts.ordered !== false; | 
|     this._buffer = this._ordered ? cyclist(maxParallel) : []; | 
|     this._top = 0; | 
|     this._bottom = 0; | 
|     this._ondrain = null; | 
| }; | 
|   | 
| inherits(ParallelTransform, Transform); | 
|   | 
| ParallelTransform.prototype.destroy = function() { | 
|     if (this._destroyed) return; | 
|     this._destroyed = true; | 
|     this.emit('close'); | 
| }; | 
|   | 
| ParallelTransform.prototype._transform = function(chunk, enc, callback) { | 
|     var self = this; | 
|     var pos = this._top++; | 
|   | 
|     this._ontransform(chunk, function(err, data) { | 
|         if (self._destroyed) return; | 
|         if (err) { | 
|             self.emit('error', err); | 
|             self.push(null); | 
|             self.destroy(); | 
|             return; | 
|         } | 
|         if (self._ordered) { | 
|             self._buffer.put(pos, (data === undefined || data === null) ? null : data); | 
|         } | 
|         else { | 
|             self._buffer.push(data); | 
|         } | 
|         self._drain(); | 
|     }); | 
|   | 
|     if (this._top - this._bottom < this._maxParallel) return callback(); | 
|     this._ondrain = callback; | 
| }; | 
|   | 
| ParallelTransform.prototype._flush = function(callback) { | 
|     this._flushed = true; | 
|     this._ondrain = callback; | 
|     this._drain(); | 
| }; | 
|   | 
| ParallelTransform.prototype._drain = function() { | 
|     if (this._ordered) { | 
|         while (this._buffer.get(this._bottom) !== undefined) { | 
|             var data = this._buffer.del(this._bottom++); | 
|             if (data === null) continue; | 
|             this.push(data); | 
|         } | 
|     } | 
|     else { | 
|         while (this._buffer.length > 0) { | 
|             var data =  this._buffer.pop(); | 
|             this._bottom++; | 
|             if (data === null) continue; | 
|             this.push(data); | 
|         } | 
|     } | 
|   | 
|   | 
|     if (!this._drained() || !this._ondrain) return; | 
|   | 
|     var ondrain = this._ondrain; | 
|     this._ondrain = null; | 
|     ondrain(); | 
| }; | 
|   | 
| ParallelTransform.prototype._drained = function() { | 
|     var diff = this._top - this._bottom; | 
|     return this._flushed ? !diff : diff < this._maxParallel; | 
| }; | 
|   | 
| module.exports = ParallelTransform; |