| 'use strict' | 
|   | 
| const DEFAULT_OPTIONS = { | 
|           workerOptions               : {} | 
|         , maxCallsPerWorker           : Infinity | 
|         , maxConcurrentWorkers        : (require('os').cpus() || { length: 1 }).length | 
|         , maxConcurrentCallsPerWorker : 10 | 
|         , maxConcurrentCalls          : Infinity | 
|         , maxCallTime                 : Infinity // exceed this and the whole worker is terminated | 
|         , maxRetries                  : Infinity | 
|         , forcedKillTime              : 100 | 
|         , autoStart                   : false | 
|         , onChild                     : function() {} | 
|       } | 
|   | 
| const fork                    = require('./fork') | 
|     , TimeoutError            = require('errno').create('TimeoutError') | 
|     , ProcessTerminatedError  = require('errno').create('ProcessTerminatedError') | 
|     , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError') | 
|   | 
|   | 
| function Farm (options, path) { | 
|   this.options     = Object.assign({}, DEFAULT_OPTIONS, options) | 
|   this.path        = path | 
|   this.activeCalls = 0 | 
| } | 
|   | 
|   | 
| // make a handle to pass back in the form of an external API | 
| Farm.prototype.mkhandle = function (method) { | 
|   return function () { | 
|     let args = Array.prototype.slice.call(arguments) | 
|     if (this.activeCalls + this.callQueue.length >= this.options.maxConcurrentCalls) { | 
|       let err = new MaxConcurrentCallsError('Too many concurrent calls (active: ' + this.activeCalls + ', queued: ' + this.callQueue.length + ')') | 
|       if (typeof args[args.length - 1] == 'function') | 
|         return process.nextTick(args[args.length - 1].bind(null, err)) | 
|       throw err | 
|     } | 
|     this.addCall({ | 
|         method   : method | 
|       , callback : args.pop() | 
|       , args     : args | 
|       , retries  : 0 | 
|     }) | 
|   }.bind(this) | 
| } | 
|   | 
|   | 
| // a constructor of sorts | 
| Farm.prototype.setup = function (methods) { | 
|   let iface | 
|   if (!methods) { // single-function export | 
|     iface = this.mkhandle() | 
|   } else { // multiple functions on the export | 
|     iface = {} | 
|     methods.forEach(function (m) { | 
|       iface[m] = this.mkhandle(m) | 
|     }.bind(this)) | 
|   } | 
|   | 
|   this.searchStart    = -1 | 
|   this.childId        = -1 | 
|   this.children       = {} | 
|   this.activeChildren = 0 | 
|   this.callQueue      = [] | 
|   | 
|   if (this.options.autoStart) { | 
|     while (this.activeChildren < this.options.maxConcurrentWorkers) | 
|       this.startChild() | 
|   } | 
|   | 
|   return iface | 
| } | 
|   | 
|   | 
| // when a child exits, check if there are any outstanding jobs and requeue them | 
| Farm.prototype.onExit = function (childId) { | 
|   // delay this to give any sends a chance to finish | 
|   setTimeout(function () { | 
|     let doQueue = false | 
|     if (this.children[childId] && this.children[childId].activeCalls) { | 
|       this.children[childId].calls.forEach(function (call, i) { | 
|         if (!call) return | 
|         else if (call.retries >= this.options.maxRetries) { | 
|           this.receive({ | 
|               idx   : i | 
|             , child : childId | 
|             , args  : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ] | 
|           }) | 
|         } else { | 
|           call.retries++ | 
|           this.callQueue.unshift(call) | 
|           doQueue = true | 
|         } | 
|       }.bind(this)) | 
|     } | 
|     this.stopChild(childId) | 
|     doQueue && this.processQueue() | 
|   }.bind(this), 10) | 
| } | 
|   | 
|   | 
| // start a new worker | 
| Farm.prototype.startChild = function () { | 
|   this.childId++ | 
|   | 
|   let forked = fork(this.path, this.options.workerOptions) | 
|     , id     = this.childId | 
|     , c      = { | 
|           send        : forked.send | 
|         , child       : forked.child | 
|         , calls       : [] | 
|         , activeCalls : 0 | 
|         , exitCode    : null | 
|       } | 
|   | 
|   this.options.onChild(forked.child); | 
|   | 
|   forked.child.on('message', function(data) { | 
|     if (data.owner !== 'farm') { | 
|       return; | 
|     } | 
|     this.receive(data); | 
|   }.bind(this)) | 
|   forked.child.once('exit', function (code) { | 
|     c.exitCode = code | 
|     this.onExit(id) | 
|   }.bind(this)) | 
|   | 
|   this.activeChildren++ | 
|   this.children[id] = c | 
| } | 
|   | 
|   | 
| // stop a worker, identified by id | 
| Farm.prototype.stopChild = function (childId) { | 
|   let child = this.children[childId] | 
|   if (child) { | 
|     child.send({owner: 'farm', event: 'die'}) | 
|     setTimeout(function () { | 
|       if (child.exitCode === null) | 
|         child.child.kill('SIGKILL') | 
|     }, this.options.forcedKillTime).unref() | 
|     ;delete this.children[childId] | 
|     this.activeChildren-- | 
|   } | 
| } | 
|   | 
|   | 
| // called from a child process, the data contains information needed to | 
| // look up the child and the original call so we can invoke the callback | 
| Farm.prototype.receive = function (data) { | 
|   let idx     = data.idx | 
|     , childId = data.child | 
|     , args    = data.args | 
|     , child   = this.children[childId] | 
|     , call | 
|   | 
|   if (!child) { | 
|     return console.error( | 
|         'Worker Farm: Received message for unknown child. ' | 
|       + 'This is likely as a result of premature child death, ' | 
|       + 'the operation will have been re-queued.' | 
|     ) | 
|   } | 
|   | 
|   call = child.calls[idx] | 
|   if (!call) { | 
|     return console.error( | 
|         'Worker Farm: Received message for unknown index for existing child. ' | 
|       + 'This should not happen!' | 
|     ) | 
|   } | 
|   | 
|   if (this.options.maxCallTime !== Infinity) | 
|     clearTimeout(call.timer) | 
|   | 
|   if (args[0] && args[0].$error == '$error') { | 
|     let e = args[0] | 
|     switch (e.type) { | 
|       case 'TypeError': args[0] = new TypeError(e.message); break | 
|       case 'RangeError': args[0] = new RangeError(e.message); break | 
|       case 'EvalError': args[0] = new EvalError(e.message); break | 
|       case 'ReferenceError': args[0] = new ReferenceError(e.message); break | 
|       case 'SyntaxError': args[0] = new SyntaxError(e.message); break | 
|       case 'URIError': args[0] = new URIError(e.message); break | 
|       default: args[0] = new Error(e.message) | 
|     } | 
|     args[0].type = e.type | 
|     args[0].stack = e.stack | 
|   | 
|     // Copy any custom properties to pass it on. | 
|     Object.keys(e).forEach(function(key) { | 
|       args[0][key] = e[key]; | 
|     }); | 
|   } | 
|   | 
|   process.nextTick(function () { | 
|     call.callback.apply(null, args) | 
|   }) | 
|   | 
|   ;delete child.calls[idx] | 
|   child.activeCalls-- | 
|   this.activeCalls-- | 
|   | 
|   if (child.calls.length >= this.options.maxCallsPerWorker | 
|       && !Object.keys(child.calls).length) { | 
|     // this child has finished its run, kill it | 
|     this.stopChild(childId) | 
|   } | 
|   | 
|   // allow any outstanding calls to be processed | 
|   this.processQueue() | 
| } | 
|   | 
|   | 
| Farm.prototype.childTimeout = function (childId) { | 
|   let child = this.children[childId] | 
|     , i | 
|   | 
|   if (!child) | 
|     return | 
|   | 
|   for (i in child.calls) { | 
|     this.receive({ | 
|         idx   : i | 
|       , child : childId | 
|       , args  : [ new TimeoutError('worker call timed out!') ] | 
|     }) | 
|   } | 
|   this.stopChild(childId) | 
| } | 
|   | 
|   | 
| // send a call to a worker, identified by id | 
| Farm.prototype.send = function (childId, call) { | 
|   let child = this.children[childId] | 
|     , idx   = child.calls.length | 
|   | 
|   child.calls.push(call) | 
|   child.activeCalls++ | 
|   this.activeCalls++ | 
|   | 
|   child.send({ | 
|       owner  : 'farm' | 
|     , idx    : idx | 
|     , child  : childId | 
|     , method : call.method | 
|     , args   : call.args | 
|   }) | 
|   | 
|   if (this.options.maxCallTime !== Infinity) { | 
|     call.timer = | 
|       setTimeout(this.childTimeout.bind(this, childId), this.options.maxCallTime) | 
|   } | 
| } | 
|   | 
|   | 
| // a list of active worker ids, in order, but the starting offset is | 
| // shifted each time this method is called, so we work our way through | 
| // all workers when handing out jobs | 
| Farm.prototype.childKeys = function () { | 
|   let cka = Object.keys(this.children) | 
|     , cks | 
|   | 
|   if (this.searchStart >= cka.length - 1) | 
|     this.searchStart = 0 | 
|   else | 
|     this.searchStart++ | 
|   | 
|   cks = cka.splice(0, this.searchStart) | 
|   | 
|   return cka.concat(cks) | 
| } | 
|   | 
|   | 
| // Calls are added to a queue, this processes the queue and is called | 
| // whenever there might be a chance to send more calls to the workers. | 
| // The various options all impact on when we're able to send calls, | 
| // they may need to be kept in a queue until a worker is ready. | 
| Farm.prototype.processQueue = function () { | 
|   let cka, i = 0, childId | 
|   | 
|   if (!this.callQueue.length) | 
|     return this.ending && this.end() | 
|   | 
|   if (this.activeChildren < this.options.maxConcurrentWorkers) | 
|     this.startChild() | 
|   | 
|   for (cka = this.childKeys(); i < cka.length; i++) { | 
|     childId = +cka[i] | 
|     if (this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker | 
|         && this.children[childId].calls.length < this.options.maxCallsPerWorker) { | 
|   | 
|       this.send(childId, this.callQueue.shift()) | 
|       if (!this.callQueue.length) | 
|         return this.ending && this.end() | 
|     } /*else { | 
|       console.log( | 
|         , this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker | 
|         , this.children[childId].calls.length < this.options.maxCallsPerWorker | 
|         , this.children[childId].calls.length , this.options.maxCallsPerWorker) | 
|     }*/ | 
|   } | 
|   | 
|   if (this.ending) | 
|     this.end() | 
| } | 
|   | 
|   | 
| // add a new call to the call queue, then trigger a process of the queue | 
| Farm.prototype.addCall = function (call) { | 
|   if (this.ending) | 
|     return this.end() // don't add anything new to the queue | 
|   this.callQueue.push(call) | 
|   this.processQueue() | 
| } | 
|   | 
|   | 
| // kills child workers when they're all done | 
| Farm.prototype.end = function (callback) { | 
|   let complete = true | 
|   if (this.ending === false) | 
|     return | 
|   if (callback) | 
|     this.ending = callback | 
|   else if (this.ending == null) | 
|     this.ending = true | 
|   Object.keys(this.children).forEach(function (child) { | 
|     if (!this.children[child]) | 
|       return | 
|     if (!this.children[child].activeCalls) | 
|       this.stopChild(child) | 
|     else | 
|       complete = false | 
|   }.bind(this)) | 
|   | 
|   if (complete && typeof this.ending == 'function') { | 
|     process.nextTick(function () { | 
|       this.ending() | 
|       this.ending = false | 
|     }.bind(this)) | 
|   } | 
| } | 
|   | 
|   | 
| module.exports              = Farm | 
| module.exports.TimeoutError = TimeoutError |