blob: 8676b96715c68d2b6e4bbd4dda5b9304d559d25b [file] [log] [blame]
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import time
import os
import mozinfo
class TestContext(object):
    """ Stores context data about the test

    Two contexts compare equal when every attribute named in `attrs` is
    equal; the hash is derived from the same attributes, so instances can
    be de-duplicated with set() or used as dictionary keys. """

    # attributes that take part in equality and hashing
    attrs = ['hostname', 'arch', 'env', 'os', 'os_version', 'tree', 'revision',
             'product', 'logfile', 'testgroup', 'harness', 'buildtype']

    def __init__(self, hostname='localhost', tree='', revision='', product='',
                 logfile=None, arch='', operating_system='', testgroup='',
                 harness='moztest', buildtype=''):
        """ Create a TestContext instance.
        arch = processor name; falls back to mozinfo.processor when empty
        operating_system = OS name; falls back to mozinfo.os when empty
        The OS version always comes from mozinfo.version and the environment
        is a snapshot (copy) of os.environ taken at construction time.
        All other arguments are stored unchanged. """
        self.hostname = hostname
        self.arch = arch or mozinfo.processor
        self.env = os.environ.copy()
        self.os = operating_system or mozinfo.os
        self.os_version = mozinfo.version
        self.tree = tree
        self.revision = revision
        self.product = product
        self.logfile = logfile
        self.testgroup = testgroup
        self.harness = harness
        self.buildtype = buildtype

    def __str__(self):
        return '%s (%s, %s)' % (self.hostname, self.os, self.arch)

    def __repr__(self):
        return '<%s>' % self.__str__()

    def __eq__(self, other):
        """ True when `other` is a TestContext with identical `attrs` """
        if not isinstance(other, TestContext):
            return False
        return not any(getattr(self, a) != getattr(other, a)
                       for a in self.attrs)

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__; without this, two equal
        # contexts would still compare as "different" with the != operator.
        return not self.__eq__(other)

    def __hash__(self):
        def get(attr):
            value = getattr(self, attr)
            # dicts (e.g. the environment) are unhashable; freeze their items
            if isinstance(value, dict):
                value = frozenset(value.items())
            return value
        # An ordered tuple keeps each value tied to its attribute position,
        # so contexts that merely swap values between fields hash differently.
        return hash(tuple(get(a) for a in self.attrs))
class TestResult(object):
    """ Stores test result data

    A result is created when a test starts and is completed by finish().
    Until then `result` is None and the test is reported as PENDING. """

    # NOTE(review): these three tables are referenced throughout this file
    # but their definitions were missing from the reviewed source. The values
    # are reconstructed from calculate_result(), infer_results() and
    # TestResultCollection.num_failures -- confirm against upstream.

    # computed results that count as failures
    FAIL_RESULTS = [
        'UNEXPECTED-PASS',
        'UNEXPECTED-FAIL',
        'ERROR',
    ]
    # every value that `result` can take once the test is finished
    COMPUTED_RESULTS = FAIL_RESULTS + [
        'PASS',
        'KNOWN-FAIL',
        'SKIPPED',
    ]
    # raw outcomes accepted as the expected / actual result of a test
    POSSIBLE_RESULTS = [
        'PASS',
        'FAIL',
        'SKIP',
        'ERROR',
    ]

    # `basestring` only exists on Python 2; fall back to `str` so the string
    # checks below keep working when the module is run on Python 3.
    try:
        _STRING_TYPES = basestring
    except NameError:
        _STRING_TYPES = str

    def __init__(self, name, test_class='', time_start=None, context=None,
                 result_expected='PASS'):
        """ Create a TestResult instance.
        name = name of the test that is running
        test_class = the class that the test belongs to
        time_start = timestamp (seconds since UNIX epoch) of when the test started
                     running; if not provided, defaults to the current time
                     ! Provide 0 if you only have the duration
        context = TestContext instance; can be None
        result_expected = string representing the expected outcome of the test
        Raises AssertionError if name is not a string or result_expected is
        not one of POSSIBLE_RESULTS. """
        # NOTE(review): the tail of the signature was truncated in the
        # reviewed source; the 'PASS' default is assumed -- confirm.
        msg = "Result '%s' not in possible results: %s" %\
              (result_expected, ', '.join(self.POSSIBLE_RESULTS))
        assert isinstance(name, self._STRING_TYPES), "name has to be a string"
        assert result_expected in self.POSSIBLE_RESULTS, msg

        self.name = name
        self.test_class = test_class
        self.context = context
        self.time_start = time_start if time_start is not None else time.time()
        self.time_end = None
        self._result_expected = result_expected
        self._result_actual = None
        self.result = None
        self.filename = None
        self.description = None
        self.output = []
        self.reason = None

    # NOTE(review): decorator lines were missing from the reviewed source.
    # `duration` is read without a call by TestResultCollection.subset(), so
    # it must be a property; `test_name` and `finished` are assumed to follow
    # the same convention -- confirm.
    @property
    def test_name(self):
        """ Display name built from the test's module file, class and name """
        # NOTE(review): this expression was truncated in the reviewed source;
        # the format and arguments are reconstructed -- confirm.
        return '%s.py %s.%s' % (self.test_class.split('.')[0],
                                self.test_class,
                                self.name)

    def __str__(self):
        return '%s | %s (%s) | %s' % (self.result or 'PENDING',
                                      self.name, self.test_class, self.reason)

    def __repr__(self):
        return '<%s>' % self.__str__()

    def calculate_result(self, expected, actual):
        """ Combine an expected and an actual outcome into a computed result
        (one of COMPUTED_RESULTS) """
        if actual == 'ERROR':
            return 'ERROR'
        if actual == 'SKIP':
            return 'SKIPPED'

        if expected == 'PASS':
            if actual == 'PASS':
                return 'PASS'
            if actual == 'FAIL':
                return 'UNEXPECTED-FAIL'

        if expected == 'FAIL':
            if actual == 'PASS':
                return 'UNEXPECTED-PASS'
            if actual == 'FAIL':
                return 'KNOWN-FAIL'

        # if actual is skip or error, we return at the beginning, so if we get
        # here it is definitely some kind of error
        return 'ERROR'

    def infer_results(self, computed_result):
        """ Set the expected/actual outcomes implied by a computed result.
        'PASS' and 'ERROR' leave the stored outcomes untouched. """
        assert computed_result in self.COMPUTED_RESULTS
        if computed_result == 'UNEXPECTED-PASS':
            expected = 'FAIL'
            actual = 'PASS'
        elif computed_result == 'UNEXPECTED-FAIL':
            expected = 'PASS'
            actual = 'FAIL'
        elif computed_result == 'KNOWN-FAIL':
            expected = actual = 'FAIL'
        elif computed_result == 'SKIPPED':
            expected = actual = 'SKIP'
        else:
            # no branch above assigns expected/actual for the remaining
            # values; bail out instead of hitting an unbound local
            return
        self._result_expected = expected
        self._result_actual = actual

    def finish(self, result, time_end=None, output=None, reason=None):
        """ Marks the test as finished, storing its end time and status
        ! Provide the duration as time_end if you only have that.
        result = either a raw outcome (POSSIBLE_RESULTS), which is combined
                 with the expected outcome, or an already computed result
                 (COMPUTED_RESULTS), which is stored as is
        output = string or list of lines; strings are split into lines
        Raises ValueError if result is in neither table. """
        if result in self.POSSIBLE_RESULTS:
            self._result_actual = result
            self.result = self.calculate_result(self._result_expected,
                                                self._result_actual)
        elif result in self.COMPUTED_RESULTS:
            self.infer_results(result)
            self.result = result
        else:
            valid = self.POSSIBLE_RESULTS + self.COMPUTED_RESULTS
            msg = "Result '%s' not valid. Need one of: %s" %\
                  (result, ', '.join(valid))
            raise ValueError(msg)

        # use lists instead of multiline strings
        if isinstance(output, self._STRING_TYPES):
            output = output.splitlines()

        self.time_end = time_end if time_end is not None else time.time()
        self.output = output or self.output
        self.reason = reason

    @property
    def finished(self):
        """ Boolean saying if the test is finished or not """
        return self.result is not None

    @property
    def duration(self):
        """ Returns the time it took for the test to finish. If the test is
        not finished, returns the elapsed time so far """
        if self.result is not None:
            return self.time_end - self.time_start
        # returns the elapsed time
        return time.time() - self.time_start
class TestResultCollection(list):
    """ Container class that stores test results

    Behaves like a list of `resultClass` instances, with a suite name and
    the total time taken attached. """

    # class used by add_result() to build new entries; can be overridden
    # per instance through the constructor
    resultClass = TestResult

    def __init__(self, suite_name, time_taken=0, resultClass=None):
        list.__init__(self)
        self.suite_name = suite_name
        self.time_taken = time_taken
        if resultClass is not None:
            self.resultClass = resultClass

    def __str__(self):
        # NOTE(review): the third argument was truncated in the reviewed
        # source; the plain list representation is assumed -- confirm.
        return "%s (%.2fs)\n%s" % (self.suite_name, self.time_taken,
                                   list.__str__(self))

    def subset(self, predicate):
        """ Returns a new collection with the results satisfying predicate;
        its time_taken is the sum of the selected results' durations """
        # carry the result class over so add_result() on the subset builds
        # the same kind of entries as on the parent collection
        sub = TestResultCollection(self.suite_name,
                                   resultClass=self.resultClass)
        duration = 0
        for t in self.filter(predicate):
            sub.append(t)
            duration += t.duration
        sub.time_taken = duration
        return sub

    # NOTE(review): decorator lines were missing from the reviewed source;
    # `contexts`, `tests` and `num_failures` are assumed to be properties,
    # as their docstrings describe values rather than actions -- confirm.
    @property
    def contexts(self):
        """ List of unique contexts for the test results contained """
        cs = [tr.context for tr in self]
        return list(set(cs))

    def filter(self, predicate):
        """ Returns a generator of TestResults that satisfy a given predicate """
        return (tr for tr in self if predicate(tr))

    def tests_with_result(self, result):
        """ Returns a generator of TestResults with the given result """
        msg = "Result '%s' not in possible results: %s" %\
              (result, ', '.join(self.resultClass.COMPUTED_RESULTS))
        assert result in self.resultClass.COMPUTED_RESULTS, msg
        return self.filter(lambda t: t.result == result)

    @property
    def tests(self):
        """ Generator of all tests in the collection """
        return (t for t in self)

    def add_result(self, test, result_expected='PASS',
                   result_actual='PASS', output='', context=None):
        """ Builds a finished result for `test` and appends it.
        The name is the first whitespace-separated token of str(test), the
        test class is '<module>.<class name>' of the test object, and the
        reason is the relevant line extracted from output. Start and end
        times are both 0, so the stored duration is 0. """
        def get_class(test):
            return test.__class__.__module__ + '.' + test.__class__.__name__

        t = self.resultClass(name=str(test).split()[0],
                             test_class=get_class(test),
                             time_start=0, result_expected=result_expected,
                             context=context)
        t.finish(result_actual, time_end=0, reason=relevant_line(output),
                 output=output)
        self.append(t)

    @property
    def num_failures(self):
        """ Number of results whose computed result is a failure """
        return sum(1 for t in self
                   if t.result in self.resultClass.FAIL_RESULTS)

    def add_unittest_result(self, result, context=None):
        """ Adds the python unittest result provided to the collection
        context = TestContext attached to every result that gets added """
        if hasattr(result, 'time_taken'):
            self.time_taken += result.time_taken

        # context is forwarded on every call below; previously it was
        # accepted but dropped, leaving each result with context=None
        for test, output in result.errors:
            self.add_result(test, result_actual='ERROR', output=output,
                            context=context)

        for test, output in result.failures:
            self.add_result(test, result_actual='FAIL', output=output,
                            context=context)

        if hasattr(result, 'unexpectedSuccesses'):
            for test in result.unexpectedSuccesses:
                self.add_result(test, result_expected='FAIL',
                                result_actual='PASS', context=context)

        if hasattr(result, 'skipped'):
            for test, output in result.skipped:
                self.add_result(test, result_expected='SKIP',
                                result_actual='SKIP', output=output,
                                context=context)

        if hasattr(result, 'expectedFailures'):
            for test, output in result.expectedFailures:
                self.add_result(test, result_expected='FAIL',
                                result_actual='FAIL', output=output,
                                context=context)

        # unittest does not store these by default
        if hasattr(result, 'tests_passed'):
            for test in result.tests_passed:
                self.add_result(test, context=context)

    @classmethod
    def from_unittest_results(cls, context, *results):
        """ Creates a TestResultCollection containing the given python
        unittest results """
        if not results:
            return cls('from unittest')

        # all the TestResult instances share the same context
        context = context or TestContext()

        collection = cls('from %s' % results[0].__class__.__name__)

        for result in results:
            collection.add_unittest_result(result, context)

        return collection
# used to get exceptions/errors from tracebacks
def relevant_line(s):
    """ Returns the first line of `s` that contains an error/exception
    marker ('Error:', 'Exception:', 'error:' or 'exception:'), or 'N/A'
    when there is no such line or `s` is empty or None """
    KEYWORDS = ('Error:', 'Exception:', 'error:', 'exception:')
    # no output at all (None or '') means there is nothing to report
    if not s:
        return 'N/A'
    for line in s.splitlines():
        if any(keyword in line for keyword in KEYWORDS):
            return line
    return 'N/A'