blob: dd05235b32407906c80622eeac9a18d22d9c802c [file] [log] [blame]
var util = require('util');
var events = require('events').EventEmitter;
var qjob = function(options) {
if(false === (this instanceof qjob)) {
return new qjob(options);
}
this.maxConcurrency = 10;
this.jobsRunning = 0;
this.jobsDone = 0;
this.jobsTotal = 0;
this.timeStart;
this.jobId = 0;
this.jobsList = [];
this.paused = false;
this.pausedId = null;
this.lastPause = 0;
this.interval = null;
this.stopAdding = false;
this.sleeping = false;
this.aborting = false;
if (options) {
this.maxConcurrency = options.maxConcurrency || this.maxConcurrency;
this.interval = options.interval || this.interval;
}
events.call(this);
};
util.inherits(qjob, events);
/*
* helper to set max concurrency
*/
qjob.prototype.setConcurrency = function(max) {
this.maxConcurrency = max;
}
/*
* helper to set delay between rafales
*/
qjob.prototype.setInterval = function(delay) {
this.interval = delay;
}
/*
* add some jobs in the queue
*/
qjob.prototype.add = function(job,args) {
var self = this;
self.jobsList.push([job,args]);
self.jobsTotal++;
}
/*
*
*/
qjob.prototype.sleepDueToInterval = function() {
var self = this;
if (this.interval === null) {
return;
}
if (this.sleeping) {
return true;
}
if (this.stopAdding) {
if (this.jobsRunning > 0) {
//console.log('waiting for '+jobsRunning+' jobs to finish');
return true;
}
//console.log('waiting for '+rafaleDelay+' ms');
this.sleeping = true;
self.emit('sleep');
setTimeout(function() {
this.stopAdding = false;
this.sleeping = false;
self.emit('continu');
self.run();
}.bind(self),this.interval);
return true;
}
if (this.jobsRunning + 1 == this.maxConcurrency) {
//console.log('max concurrent jobs reached');
this.stopAdding = true;
return true;
}
}
/*
* run the queue
*/
qjob.prototype.run = function() {
var self = this;
// first launch, let's emit start event
if (this.jobsDone == 0) {
self.emit('start');
this.timeStart = Date.now();
}
if (self.sleepDueToInterval()) return;
if (self.aborting) {
this.jobsList = [];
}
// while queue is empty and number of job running
// concurrently are less than max job running,
// then launch the next job
while (this.jobsList.length && this.jobsRunning < this.maxConcurrency) {
// get the next job and
// remove it from the queue
var job = self.jobsList.shift();
// increment number of job running
self.jobsRunning++;
// fetch args for the job
var args = job[1];
// add jobId in args
args._jobId = this.jobId++;
// emit jobStart event
self.emit('jobStart',args);
// run the job
setTimeout(function() {
this.j(this.args,self.next.bind(self,this.args));
}.bind({j:job[0],args:args}),1);
}
// all jobs done ? emit end event
if (this.jobsList.length == 0 && this.jobsRunning == 0) {
self.emit('end');
}
}
/*
* a task has been terminated,
* so 'next()' has been called
*/
qjob.prototype.next = function(args) {
var self = this;
// update counters
this.jobsRunning--;
this.jobsDone++;
// emit 'jobEnd' event
self.emit('jobEnd',args);
// if queue has been set to pause
// then do nothing
if (this.paused) return;
// else, execute run() function
self.run();
}
/*
* You can 'pause' jobs.
* it will not pause running jobs, but
* it will stop launching pending jobs
* until paused = false
*/
qjob.prototype.pause = function(status) {
var self = this;
this.paused = status;
if (!this.paused && this.pausedId) {
clearInterval(this.pausedId);
self.emit('unpause');
this.run();
}
if (this.paused && !this.pausedId) {
self.lastPause = Date.now();
this.pausedId = setInterval(function() {
var since = Date.now() - self.lastPause;
self.emit('pause',since);
},1000);
return;
}
}
qjob.prototype.stats = function() {
var now = Date.now();
var o = {};
o._timeStart = this.timeStart || 'N/A';
o._timeElapsed = (now - this.timeStart) || 'N/A';
o._jobsTotal = this.jobsTotal;
o._jobsRunning = this.jobsRunning;
o._jobsDone = this.jobsDone;
o._progress = Math.floor((this.jobsDone/this.jobsTotal)*100);
o._concurrency = this.maxConcurrency;
if (this.paused) {
o._status = 'Paused';
return o;
}
if (o._timeElapsed == 'N/A') {
o._status = 'Starting';
return o;
}
if (this.jobsTotal == this.jobsDone) {
o._status = 'Finished';
return o;
}
o._status = 'Running';
return o;
}
qjob.prototype.abort = function() {
this.aborting = true;
}
module.exports = qjob;