blob: 30f806c7b5ce7054003eae1e6bc8fcbdd2a290b8 [file] [log] [blame]
#!/usr/bin/env python
"""
Runs commands from a queue!
"""
import subprocess
import os
import signal
import time
from mozilla_buildtools.queuedir import QueueDir
from buildbot.util import json
import logging
log = logging.getLogger(__name__)
class Job(object):
    """
    One command taken from the queue, run as a child process.

    Attributes:
        cmd: the command handed to ``subprocess.Popen``.
        log: writable file object; receives the child's stdout/stderr as
            well as the runner's own status lines.  ``check()`` closes it
            once the child has exited.
        item_id: identifier of the queue item this job was created from.
        max_time: seconds the child may run before ``check()`` starts
            signalling it, or None for no time limit.
        started: ``time.time()`` at launch, None until ``start()`` is called.
        last_signal / last_signal_time: the last signal sent by ``check()``
            and when it was sent.
        proc: the ``subprocess.Popen`` object, None until ``start()``.
    """

    def __init__(self, cmd, item_id, log_fp, max_time=None):
        self.cmd = cmd
        self.log = log_fp
        self.item_id = item_id
        # Always defined so check() cannot fail with AttributeError when the
        # caller never assigns a limit; callers may still set it afterwards.
        self.max_time = max_time
        self.started = None
        self.last_signal_time = 0
        self.last_signal = None
        self.proc = None

    def start(self):
        """
        Launch the command with stdin from os.devnull and stdout/stderr
        sent to the job log.

        Exceptions raised by ``subprocess.Popen`` (e.g. OSError when the
        command cannot be executed) propagate to the caller.
        """
        # The child gets its own copy of the devnull descriptor, so the
        # parent's handle is closed as soon as Popen returns (or raises)
        # instead of leaking one descriptor per job.
        with open(os.devnull, 'r') as devnull:
            self.log.write("Running %s\n" % self.cmd)
            # Flush before the child starts writing to the same file.
            self.log.flush()
            self.proc = subprocess.Popen(self.cmd, close_fds=True,
                                         stdin=devnull, stdout=self.log,
                                         stderr=self.log)
        self.started = time.time()

    def check(self):
        """
        Poll the child, signalling it if it has exceeded ``max_time``.

        Once the time limit is exceeded, a signal is sent at most once every
        60 seconds, escalating SIGINT -> SIGTERM -> SIGKILL.

        Must only be called after ``start()``.

        Returns:
            None while the child is still running, otherwise its exit
            status.  When a status is returned the log has been closed.
        """
        now = time.time()
        timed_out = (self.max_time is not None and
                     now - self.started > self.max_time)
        if timed_out and now - self.last_signal_time > 60:
            # First attempt is SIGINT, second SIGTERM, then SIGKILL forever.
            s = {None: signal.SIGINT,
                 signal.SIGINT: signal.SIGTERM}.get(self.last_signal,
                                                    signal.SIGKILL)
            log.info("Killing %i with %i", self.proc.pid, s)
            try:
                self.log.write("Killing with %s\n" % s)
                # The child writes to the same file directly; flush so this
                # notice is not reordered after its later output.
                self.log.flush()
                os.kill(self.proc.pid, s)
                self.last_signal = s
                self.last_signal_time = now
            except OSError:
                # Ok, process must have exited already
                log.exception("Failed to kill")

        result = self.proc.poll()
        if result is not None:
            self.log.write("\nResult: %s, Elapsed: %1.1f seconds\n" %
                           (result, time.time() - self.started))
            self.log.close()
        return result
class CommandRunner(object):
    """
    Pops commands from the 'commands' queue and runs them as Jobs.

    ``options`` must provide ``queuedir``, ``concurrency``, ``retry_time``,
    ``max_retries`` and ``max_time``; each is copied onto the instance.
    """

    def __init__(self, options):
        self.queuedir = options.queuedir
        self.q = QueueDir('commands', self.queuedir)
        self.concurrency = options.concurrency
        self.retry_time = options.retry_time
        self.max_retries = options.max_retries
        self.max_time = options.max_time

        # Jobs that have been started and not yet seen to finish.
        self.active = []

        # List of (signal_time, level, proc)
        self.to_kill = []

    def run(self, job):
        """
        Start the given job and track it in ``self.active``.

        If starting fails with OSError, the failure is noted in the job's
        log, the log is closed, and the item is requeued.
        """
        log.info("Running %s", job.cmd)
        try:
            job.start()
        except OSError:
            job.log.write("\nFailed with OSError; requeuing in %i seconds\n" %
                          self.retry_time)
            # This job object is dropped here, so nothing else will ever
            # close its log; close it now rather than leak the handle.
            job.log.close()
            # Wait to requeue it
            # If we die, then it's still in cur, and will be moved back into
            # 'new' eventually
            self.q.requeue(job.item_id, self.retry_time, self.max_retries)
        else:
            self.active.append(job)

    def monitor(self):
        """
        Check every active job once.

        Finished jobs are dropped from ``self.active``: exit status 0
        removes the item from the queue, anything else requeues it.
        """
        # Iterate over a copy because finished jobs are removed in the loop.
        for job in self.active[:]:
            self.q.touch(job.item_id)
            result = job.check()

            if result is None:
                # Still running
                continue

            self.active.remove(job)
            if result == 0:
                self.q.remove(job.item_id)
            else:
                log.warning("%s failed; requeuing", job.item_id)
                # Requeue it!
                self.q.requeue(
                    job.item_id, self.retry_time, self.max_retries)

    def loop(self):
        """
        Main processing loop. Read new items from the queue and run them!

        Never returns.
        """
        while True:
            self.monitor()
            if len(self.active) >= self.concurrency:
                # All slots are busy; wait before polling the jobs again.
                time.sleep(1)
                continue

            while len(self.active) < self.concurrency:
                item = self.q.pop()
                if not item:
                    # Don't wait for very long, since we have to check up on
                    # our children
                    # NOTE(review): the argument to wait() is presumably a
                    # timeout in seconds -- confirm against QueueDir.
                    if self.active:
                        self.q.wait(1)
                    else:
                        self.q.wait()
                    break

                item_id, fp = item
                try:
                    command = json.load(fp)
                    job = Job(command, item_id, self.q.getlog(item_id))
                    job.max_time = self.max_time
                    self.run(job)
                except ValueError:
                    # Couldn't parse it as json
                    # There's no hope!
                    self.q.log(item_id, "Couldn't load json; murdering")
                    self.q.murder(item_id)
                finally:
                    fp.close()
def main():
    """
    Command-line entry point: parse options, set up logging, and run a
    CommandRunner loop.

    Exits through ``parser.error`` when --queuedir is missing or when
    --jobs is less than 1.
    """
    from optparse import OptionParser
    import logging.handlers

    parser = OptionParser()
    parser.set_defaults(
        concurrency=1,
        max_retries=1,
        retry_time=0,
        verbosity=0,
        logfile=None,
        max_time=60,
    )
    parser.add_option("-q", "--queuedir", dest="queuedir")
    parser.add_option("-j", "--jobs", dest="concurrency", type="int",
                      help="number of commands to run at once")
    parser.add_option("-r", "--max_retries", dest="max_retries",
                      type="int", help="number of times to retry commands")
    parser.add_option("-t", "--retry_time", dest="retry_time",
                      type="int", help="seconds to wait between retries")
    parser.add_option("-v", "--verbose", dest="verbosity",
                      action="count", help="increase verbosity")
    parser.add_option(
        "-l", "--logfile", dest="logfile", help="where to send logs")
    parser.add_option("-m", "--max_time", dest="max_time", type="int",
                      help="maximum time for a command to run")

    options, _args = parser.parse_args()

    # Validate before configuring logging so that a bad invocation does not
    # create or rotate the log file.
    if not options.queuedir:
        parser.error("-q/--queuedir is required")
    # With fewer than one slot the runner's loop would consider itself
    # permanently full and sleep forever without running anything.
    if options.concurrency < 1:
        parser.error("-j/--jobs must be at least 1")

    # Set up logging
    if options.verbosity == 0:
        log_level = logging.WARNING
    elif options.verbosity == 1:
        log_level = logging.INFO
    else:
        log_level = logging.DEBUG

    if not options.logfile:
        logging.basicConfig(
            level=log_level, format="%(asctime)s - %(message)s")
    else:
        logger = logging.getLogger()
        logger.setLevel(log_level)
        handler = logging.handlers.RotatingFileHandler(
            options.logfile, maxBytes=1024 ** 2, backupCount=5)
        formatter = logging.Formatter("%(asctime)s - %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    runner = CommandRunner(options)
    runner.loop()
# Run the command runner only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()