| const Minipass = require('minipass') | 
| const EE = require('events') | 
| const isStream = s => s && s instanceof EE && ( | 
|   typeof s.pipe === 'function' || // readable | 
|   (typeof s.write === 'function' && typeof s.end === 'function') // writable | 
| ) | 
|   | 
| const _head = Symbol('_head') | 
| const _tail = Symbol('_tail') | 
| const _linkStreams = Symbol('_linkStreams') | 
| const _setHead = Symbol('_setHead') | 
| const _setTail = Symbol('_setTail') | 
| const _onError = Symbol('_onError') | 
| const _onData = Symbol('_onData') | 
| const _onEnd = Symbol('_onEnd') | 
| const _onDrain = Symbol('_onDrain') | 
| const _streams = Symbol('_streams') | 
| class Pipeline extends Minipass { | 
|   constructor (opts, ...streams) { | 
|     if (isStream(opts)) { | 
|       streams.unshift(opts) | 
|       opts = {} | 
|     } | 
|   | 
|     super(opts) | 
|     this[_streams] = [] | 
|     if (streams.length) | 
|       this.push(...streams) | 
|   } | 
|   | 
|   [_linkStreams] (streams) { | 
|     // reduce takes (left,right), and we return right to make it the | 
|     // new left value. | 
|     return streams.reduce((src, dest) => { | 
|       src.on('error', er => dest.emit('error', er)) | 
|       src.pipe(dest) | 
|       return dest | 
|     }) | 
|   } | 
|   | 
|   push (...streams) { | 
|     this[_streams].push(...streams) | 
|     if (this[_tail]) | 
|       streams.unshift(this[_tail]) | 
|   | 
|     const linkRet = this[_linkStreams](streams) | 
|   | 
|     this[_setTail](linkRet) | 
|     if (!this[_head]) | 
|       this[_setHead](streams[0]) | 
|   } | 
|   | 
|   unshift (...streams) { | 
|     this[_streams].unshift(...streams) | 
|     if (this[_head]) | 
|       streams.push(this[_head]) | 
|   | 
|     const linkRet = this[_linkStreams](streams) | 
|     this[_setHead](streams[0]) | 
|     if (!this[_tail]) | 
|       this[_setTail](linkRet) | 
|   } | 
|   | 
|   destroy (er) { | 
|     // set fire to the whole thing. | 
|     this[_streams].forEach(s => | 
|       typeof s.destroy === 'function' && s.destroy()) | 
|     return super.destroy(er) | 
|   } | 
|   | 
|   // readable interface -> tail | 
|   [_setTail] (stream) { | 
|     this[_tail] = stream | 
|     stream.on('error', er => this[_onError](stream, er)) | 
|     stream.on('data', chunk => this[_onData](stream, chunk)) | 
|     stream.on('end', () => this[_onEnd](stream)) | 
|     stream.on('finish', () => this[_onEnd](stream)) | 
|   } | 
|   | 
|   // errors proxied down the pipeline | 
|   // they're considered part of the "read" interface | 
|   [_onError] (stream, er) { | 
|     if (stream === this[_tail]) | 
|       this.emit('error', er) | 
|   } | 
|   [_onData] (stream, chunk) { | 
|     if (stream === this[_tail]) | 
|       super.write(chunk) | 
|   } | 
|   [_onEnd] (stream) { | 
|     if (stream === this[_tail]) | 
|       super.end() | 
|   } | 
|   pause () { | 
|     super.pause() | 
|     return this[_tail] && this[_tail].pause && this[_tail].pause() | 
|   } | 
|   | 
|   // NB: Minipass calls its internal private [RESUME] method during | 
|   // pipe drains, to avoid hazards where stream.resume() is overridden. | 
|   // Thus, we need to listen to the resume *event*, not override the | 
|   // resume() method, and proxy *that* to the tail. | 
|   emit (ev, ...args) { | 
|     if (ev === 'resume' && this[_tail] && this[_tail].resume) | 
|       this[_tail].resume() | 
|     return super.emit(ev, ...args) | 
|   } | 
|   | 
|   // writable interface -> head | 
|   [_setHead] (stream) { | 
|     this[_head] = stream | 
|     stream.on('drain', () => this[_onDrain](stream)) | 
|   } | 
|   [_onDrain] (stream) { | 
|     if (stream === this[_head]) | 
|       this.emit('drain') | 
|   } | 
|   write (chunk, enc, cb) { | 
|     return this[_head].write(chunk, enc, cb) && | 
|       (this.flowing || this.buffer.length === 0) | 
|   } | 
|   end (chunk, enc, cb) { | 
|     this[_head].end(chunk, enc, cb) | 
|     return this | 
|   } | 
| } | 
|   | 
| module.exports = Pipeline |