| 'use strict' | 
|   | 
| var transport = require('../../../spdy-transport') | 
| var utils = transport.utils | 
|   | 
| var assert = require('assert') | 
| var util = require('util') | 
| var debug = require('debug')('spdy:scheduler') | 
| var Readable = require('readable-stream').Readable | 
|   | 
| /* | 
|  * We create following structure in `pending`: | 
|  * [ [ id = 0 ], [ id = 1 ], [ id = 2 ], [ id = 0 ] ] | 
|  *     chunks      chunks      chunks      chunks | 
|  *     chunks                  chunks | 
|  *     chunks | 
|  * | 
|  * Then on the `.tick()` pass we pick one chunks from each item and remove the | 
|  * item if it is empty: | 
|  * | 
|  * [ [ id = 0 ], [ id = 2 ] ] | 
|  *     chunks      chunks | 
|  *     chunks | 
|  * | 
|  * Writing out: chunks for 0, chunks for 1, chunks for 2, chunks for 0 | 
|  * | 
|  * This way data is interleaved between the different streams. | 
|  */ | 
|   | 
| function Scheduler (options) { | 
|   Readable.call(this) | 
|   | 
|   // Pretty big window by default | 
|   this.window = 0.25 | 
|   | 
|   if (options && options.window) { this.window = options.window } | 
|   | 
|   this.sync = [] | 
|   this.list = [] | 
|   this.count = 0 | 
|   this.pendingTick = false | 
| } | 
| util.inherits(Scheduler, Readable) | 
| module.exports = Scheduler | 
|   | 
| // Just for testing, really | 
| Scheduler.create = function create (options) { | 
|   return new Scheduler(options) | 
| } | 
|   | 
| function insertCompare (a, b) { | 
|   return a.priority === b.priority | 
|     ? a.stream - b.stream | 
|     : b.priority - a.priority | 
| } | 
|   | 
| Scheduler.prototype.schedule = function schedule (data) { | 
|   var priority = data.priority | 
|   var stream = data.stream | 
|   var chunks = data.chunks | 
|   | 
|   // Synchronous frames should not be interleaved | 
|   if (priority === false) { | 
|     debug('queue sync', chunks) | 
|     this.sync.push(data) | 
|     this.count += chunks.length | 
|   | 
|     this._read() | 
|     return | 
|   } | 
|   | 
|   debug('queue async priority=%d stream=%d', priority, stream, chunks) | 
|   var item = new SchedulerItem(stream, priority) | 
|   var index = utils.binaryLookup(this.list, item, insertCompare) | 
|   | 
|   // Push new item | 
|   if (index >= this.list.length || insertCompare(this.list[index], item) !== 0) { | 
|     this.list.splice(index, 0, item) | 
|   } else { // Coalesce | 
|     item = this.list[index] | 
|   } | 
|   | 
|   item.push(data) | 
|   | 
|   this.count += chunks.length | 
|   | 
|   this._read() | 
| } | 
|   | 
| Scheduler.prototype._read = function _read () { | 
|   if (this.count === 0) { | 
|     return | 
|   } | 
|   | 
|   if (this.pendingTick) { | 
|     return | 
|   } | 
|   this.pendingTick = true | 
|   | 
|   var self = this | 
|   process.nextTick(function () { | 
|     self.pendingTick = false | 
|     self.tick() | 
|   }) | 
| } | 
|   | 
| Scheduler.prototype.tick = function tick () { | 
|   // No luck for async frames | 
|   if (!this.tickSync()) { return false } | 
|   | 
|   return this.tickAsync() | 
| } | 
|   | 
| Scheduler.prototype.tickSync = function tickSync () { | 
|   // Empty sync queue first | 
|   var sync = this.sync | 
|   var res = true | 
|   this.sync = [] | 
|   for (var i = 0; i < sync.length; i++) { | 
|     var item = sync[i] | 
|     debug('tick sync pending=%d', this.count, item.chunks) | 
|     for (var j = 0; j < item.chunks.length; j++) { | 
|       this.count-- | 
|       // TODO: handle stream backoff properly | 
|       try { | 
|         res = this.push(item.chunks[j]) | 
|       } catch (err) { | 
|         this.emit('error', err) | 
|         return false | 
|       } | 
|     } | 
|     debug('after tick sync pending=%d', this.count) | 
|   | 
|     // TODO(indutny): figure out the way to invoke callback on actual write | 
|     if (item.callback) { | 
|       item.callback(null) | 
|     } | 
|   } | 
|   return res | 
| } | 
|   | 
| Scheduler.prototype.tickAsync = function tickAsync () { | 
|   var res = true | 
|   var list = this.list | 
|   if (list.length === 0) { | 
|     return res | 
|   } | 
|   | 
|   var startPriority = list[0].priority | 
|   for (var index = 0; list.length > 0; index++) { | 
|     // Loop index | 
|     index %= list.length | 
|     if (startPriority - list[index].priority > this.window) { index = 0 } | 
|     debug('tick async index=%d start=%d', index, startPriority) | 
|   | 
|     var current = list[index] | 
|     var item = current.shift() | 
|   | 
|     if (current.isEmpty()) { | 
|       list.splice(index, 1) | 
|       if (index === 0 && list.length > 0) { | 
|         startPriority = list[0].priority | 
|       } | 
|       index-- | 
|     } | 
|   | 
|     debug('tick async pending=%d', this.count, item.chunks) | 
|     for (var i = 0; i < item.chunks.length; i++) { | 
|       this.count-- | 
|       // TODO: handle stream backoff properly | 
|       try { | 
|         res = this.push(item.chunks[i]) | 
|       } catch (err) { | 
|         this.emit('error', err) | 
|         return false | 
|       } | 
|     } | 
|     debug('after tick pending=%d', this.count) | 
|   | 
|     // TODO(indutny): figure out the way to invoke callback on actual write | 
|     if (item.callback) { | 
|       item.callback(null) | 
|     } | 
|     if (!res) { break } | 
|   } | 
|   | 
|   return res | 
| } | 
|   | 
| Scheduler.prototype.dump = function dump () { | 
|   this.tickSync() | 
|   | 
|   // Write everything out | 
|   while (!this.tickAsync()) { | 
|     // Intentional no-op | 
|   } | 
|   assert.strictEqual(this.count, 0) | 
| } | 
|   | 
| function SchedulerItem (stream, priority) { | 
|   this.stream = stream | 
|   this.priority = priority | 
|   this.queue = [] | 
| } | 
|   | 
| SchedulerItem.prototype.push = function push (chunks) { | 
|   this.queue.push(chunks) | 
| } | 
|   | 
| SchedulerItem.prototype.shift = function shift () { | 
|   return this.queue.shift() | 
| } | 
|   | 
| SchedulerItem.prototype.isEmpty = function isEmpty () { | 
|   return this.queue.length === 0 | 
| } |