| var util = require('util'); |
| var events = require('events').EventEmitter; |
| |
| var qjob = function(options) { |
| |
| if(false === (this instanceof qjob)) { |
| return new qjob(options); |
| } |
| |
| this.maxConcurrency = 10; |
| this.jobsRunning = 0; |
| this.jobsDone = 0; |
| this.jobsTotal = 0; |
| this.timeStart; |
| this.jobId = 0; |
| this.jobsList = []; |
| this.paused = false; |
| this.pausedId = null; |
| this.lastPause = 0; |
| |
| this.interval = null; |
| this.stopAdding = false; |
| this.sleeping = false; |
| |
| this.aborting = false; |
| |
| if (options) { |
| this.maxConcurrency = options.maxConcurrency || this.maxConcurrency; |
| this.interval = options.interval || this.interval; |
| } |
| events.call(this); |
| }; |
| |
| util.inherits(qjob, events); |
| |
| /* |
| * helper to set max concurrency |
| */ |
| qjob.prototype.setConcurrency = function(max) { |
| this.maxConcurrency = max; |
| } |
| |
| /* |
| * helper to set delay between rafales |
| */ |
| qjob.prototype.setInterval = function(delay) { |
| this.interval = delay; |
| } |
| |
| /* |
| * add some jobs in the queue |
| */ |
| qjob.prototype.add = function(job,args) { |
| var self = this; |
| self.jobsList.push([job,args]); |
| self.jobsTotal++; |
| } |
| |
| /* |
| * |
| */ |
| qjob.prototype.sleepDueToInterval = function() { |
| var self = this; |
| |
| if (this.interval === null) { |
| return; |
| } |
| |
| if (this.sleeping) { |
| return true; |
| } |
| |
| if (this.stopAdding) { |
| |
| if (this.jobsRunning > 0) { |
| //console.log('waiting for '+jobsRunning+' jobs to finish'); |
| return true; |
| } |
| |
| //console.log('waiting for '+rafaleDelay+' ms'); |
| this.sleeping = true; |
| self.emit('sleep'); |
| |
| setTimeout(function() { |
| this.stopAdding = false; |
| this.sleeping = false; |
| self.emit('continu'); |
| self.run(); |
| }.bind(self),this.interval); |
| |
| return true; |
| } |
| |
| if (this.jobsRunning + 1 == this.maxConcurrency) { |
| //console.log('max concurrent jobs reached'); |
| this.stopAdding = true; |
| return true; |
| } |
| } |
| |
| /* |
| * run the queue |
| */ |
| qjob.prototype.run = function() { |
| |
| var self = this; |
| |
| // first launch, let's emit start event |
| if (this.jobsDone == 0) { |
| self.emit('start'); |
| this.timeStart = Date.now(); |
| } |
| |
| if (self.sleepDueToInterval()) return; |
| |
| if (self.aborting) { |
| this.jobsList = []; |
| } |
| |
| // while queue is empty and number of job running |
| // concurrently are less than max job running, |
| // then launch the next job |
| |
| while (this.jobsList.length && this.jobsRunning < this.maxConcurrency) { |
| // get the next job and |
| // remove it from the queue |
| var job = self.jobsList.shift(); |
| |
| // increment number of job running |
| self.jobsRunning++; |
| |
| // fetch args for the job |
| var args = job[1]; |
| |
| // add jobId in args |
| args._jobId = this.jobId++; |
| |
| // emit jobStart event |
| self.emit('jobStart',args); |
| |
| // run the job |
| setTimeout(function() { |
| this.j(this.args,self.next.bind(self,this.args)); |
| }.bind({j:job[0],args:args}),1); |
| } |
| |
| // all jobs done ? emit end event |
| if (this.jobsList.length == 0 && this.jobsRunning == 0) { |
| self.emit('end'); |
| } |
| } |
| |
| /* |
| * a task has been terminated, |
| * so 'next()' has been called |
| */ |
| qjob.prototype.next = function(args) { |
| |
| var self = this; |
| |
| // update counters |
| this.jobsRunning--; |
| this.jobsDone++; |
| |
| // emit 'jobEnd' event |
| self.emit('jobEnd',args); |
| |
| // if queue has been set to pause |
| // then do nothing |
| if (this.paused) return; |
| |
| // else, execute run() function |
| self.run(); |
| } |
| |
| /* |
| * You can 'pause' jobs. |
| * it will not pause running jobs, but |
| * it will stop launching pending jobs |
| * until paused = false |
| */ |
| qjob.prototype.pause = function(status) { |
| var self = this; |
| this.paused = status; |
| if (!this.paused && this.pausedId) { |
| clearInterval(this.pausedId); |
| self.emit('unpause'); |
| this.run(); |
| } |
| if (this.paused && !this.pausedId) { |
| self.lastPause = Date.now(); |
| this.pausedId = setInterval(function() { |
| var since = Date.now() - self.lastPause; |
| self.emit('pause',since); |
| },1000); |
| return; |
| } |
| } |
| |
| qjob.prototype.stats = function() { |
| |
| var now = Date.now(); |
| |
| var o = {}; |
| o._timeStart = this.timeStart || 'N/A'; |
| o._timeElapsed = (now - this.timeStart) || 'N/A'; |
| o._jobsTotal = this.jobsTotal; |
| o._jobsRunning = this.jobsRunning; |
| o._jobsDone = this.jobsDone; |
| o._progress = Math.floor((this.jobsDone/this.jobsTotal)*100); |
| o._concurrency = this.maxConcurrency; |
| |
| if (this.paused) { |
| o._status = 'Paused'; |
| return o; |
| } |
| |
| if (o._timeElapsed == 'N/A') { |
| o._status = 'Starting'; |
| return o; |
| } |
| |
| if (this.jobsTotal == this.jobsDone) { |
| o._status = 'Finished'; |
| return o; |
| } |
| |
| o._status = 'Running'; |
| return o; |
| } |
| |
| qjob.prototype.abort = function() { |
| this.aborting = true; |
| } |
| |
| module.exports = qjob; |