blob: 7a4540c60d8c0c5ae86a080cf3fc0a6586c37841 [file] [log] [blame]
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import print_function, unicode_literals, division
import subprocess
import sys
from datetime import datetime, timedelta
from progressbar import ProgressBar
from results import NullTestOutput, TestOutput, escape_cmdline
from threading import Thread
from Queue import Queue, Empty
class EndMarker:
    """
    Sentinel meaning "no more items will follow on this queue".

    The class object itself is what gets put on a queue and compared
    against; it is never instantiated.
    """
class TaskFinishedMarker:
    """
    Sentinel meaning "the most recently announced task has completed".

    The class object itself is what gets put on a queue and compared
    against; it is never instantiated.
    """
def _do_work(qTasks, qResults, qWatch, prefix, run_skipped, timeout, show_cmd):
    """
    Worker-thread loop: run tests taken from qTasks as subprocesses and push
    one result record per test onto qResults.

    qTasks      -- queue of test objects, terminated by EndMarker.
    qResults    -- queue receiving a NullTestOutput (skipped test) or a
                   TestOutput (executed test) per task, then EndMarker.
    qWatch      -- queue to the paired watchdog. For each executed test it
                   receives the process object followed by
                   TaskFinishedMarker, and finally EndMarker.
    prefix      -- passed to test.get_command() to build the command line.
    run_skipped -- when true, tests whose `enable` flag is false are run
                   anyway instead of being reported as NullTestOutput.
    timeout     -- limit in seconds; used here only to flag the result as
                   timed out (the watchdog does the actual killing).
    show_cmd    -- when true, print each command line before running it.

    If an unexpected exception escapes (for example Popen failing to start
    the command) it still propagates and ends this thread, but the queue
    protocol is always completed first so that neither the watchdog nor the
    result consumer waits forever on markers that would never arrive.
    """
    try:
        while True:
            # Block until a task (or the end-of-queue marker) is available.
            test = qTasks.get()
            if test is EndMarker:
                return

            if not test.enable and not run_skipped:
                qResults.put(NullTestOutput(test))
                continue

            # Spawn the test task.
            cmd = test.get_command(prefix)
            if show_cmd:
                print(escape_cmdline(cmd))
            tStart = datetime.now()
            proc = subprocess.Popen(cmd,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)

            # Push the task to the watchdog -- it will kill the task
            # if it goes over the timeout while we keep its stdout
            # buffer clear on the "main" worker thread.
            qWatch.put(proc)
            try:
                out, err = proc.communicate()
            finally:
                # The watchdog expects exactly one finish marker per process
                # it was handed, even if reading the output failed.
                qWatch.put(TaskFinishedMarker)

            # Create a result record and forward to result processing.
            dt = datetime.now() - tStart
            timed_out = dt > timedelta(seconds=timeout)
            qResults.put(TestOutput(test, cmd, out, err, proc.returncode,
                                    dt.total_seconds(), timed_out))
    finally:
        # Always announce that this worker is done, on both the normal exit
        # path and the error path.
        qWatch.put(EndMarker)
        qResults.put(EndMarker)
def _do_watch(qWatch, timeout):
    """
    Watchdog-thread loop: terminate any test process that runs longer than
    `timeout` seconds.

    qWatch  -- queue fed by the paired worker. Expected sequence: a process
               object, then TaskFinishedMarker once that process is done,
               repeated per test; EndMarker in place of a process object
               ends the loop.
    timeout -- seconds to wait for TaskFinishedMarker before calling
               terminate() on the process.

    Raises AssertionError if something other than TaskFinishedMarker follows
    a process object, and re-raises any OSError from terminate() that is not
    the benign "process already exited" case.
    """
    # Local import: errno is only needed for the terminate() race check.
    import errno

    while True:
        proc = qWatch.get(True)
        if proc is EndMarker:
            return

        try:
            fin = qWatch.get(block=True, timeout=timeout)
        except Empty:
            # Timed out, force-kill the test.
            try:
                proc.terminate()
            except OSError as ex:
                # If the process finishes after we time out but before we
                # terminate, the terminate call will fail. We can safely
                # ignore this. On Windows it shows up as winerror 5 (access
                # denied); elsewhere as ESRCH (no such process). Catching
                # OSError rather than WindowsError keeps this handler valid
                # on platforms where the WindowsError name does not exist.
                already_exited = (getattr(ex, 'winerror', None) == 5 or
                                  ex.errno == errno.ESRCH)
                if not already_exited:
                    raise
            # The worker still sends its finish marker once the killed
            # process has been collected; wait for it without a deadline.
            fin = qWatch.get()

        assert fin is TaskFinishedMarker, "invalid finish marker"
def run_all_tests(tests, prefix, pb, options):
    """
    Generator that fans tests out to a pool of worker threads and yields
    each result record as it arrives.

    tests   -- iterable of test objects; consumed on a background thread.
    prefix  -- forwarded to every worker for building command lines.
    pb      -- progress bar; poke()d whenever no result arrived within one
               update interval.
    options -- supplies worker_count, timeout, run_skipped and show_cmd.
    """
    task_queue = Queue()
    result_queue = Queue()
    workers, watchdogs = [], []

    for _ in range(options.worker_count):
        # Every worker talks to its own watchdog over a private queue.
        watch_queue = Queue()

        watchdog = Thread(target=_do_watch,
                          args=(watch_queue, options.timeout))
        watchdog.daemon = True
        watchdog.start()
        watchdogs.append(watchdog)

        worker_args = (task_queue, result_queue, watch_queue, prefix,
                       options.run_skipped, options.timeout, options.show_cmd)
        worker = Thread(target=_do_work, args=worker_args)
        worker.daemon = True
        worker.start()
        workers.append(worker)

    # Feed the task queue from a separate thread: every test first, then
    # one end marker per worker. Putting never blocks on queue growth, only
    # on the `tests` iterable producing its next item, and since the workers
    # are already running, tests are processed as fast as they are produced.
    def _enqueue_all(queue, sentinel_count):
        for item in tests:
            queue.put(item)
        for _ in range(sentinel_count):
            queue.put(EndMarker)

    worker_total = len(workers)
    pusher = Thread(target=_enqueue_all, args=(task_queue, worker_total))
    pusher.daemon = True
    pusher.start()

    # Gather results until every worker has reported its end marker.
    poll_interval = ProgressBar.update_granularity().total_seconds()
    finished = 0
    while finished < worker_total:
        try:
            item = result_queue.get(block=True, timeout=poll_interval)
            if item is not EndMarker:
                yield item
            else:
                finished += 1
        except Empty:
            # Nothing arrived within one interval; keep the progress bar
            # alive.
            pb.poke()

    # Cleanup and exit.
    pusher.join()
    for thread in workers + watchdogs:
        thread.join()
    assert task_queue.empty(), "Send queue not drained"
    assert result_queue.empty(), "Result queue not drained"