| # This Source Code Form is subject to the terms of the Mozilla Public |
| # License, v. 2.0. If a copy of the MPL was not distributed with this file, |
| # You can obtain one at http://mozilla.org/MPL/2.0/. |
| |
| |
| import time |
| import os |
| import mozinfo |
| |
| |
class TestContext(object):
    """Stores context data about the test.

    A context records where a test ran (host, OS, architecture, environment
    variables) and what was being tested (tree, revision, product, ...).
    Two contexts compare equal when every attribute named in ``attrs`` is
    equal, and equal contexts hash to the same value, so contexts can be
    de-duplicated with ``set``.
    """

    # Attributes that take part in equality comparison and hashing.
    attrs = ['hostname', 'arch', 'env', 'os', 'os_version', 'tree', 'revision',
             'product', 'logfile', 'testgroup', 'harness', 'buildtype']

    def __init__(self, hostname='localhost', tree='', revision='', product='',
                 logfile=None, arch='', operating_system='', testgroup='',
                 harness='moztest', buildtype=''):
        """Create a TestContext instance.

        ``arch`` and ``operating_system`` fall back to the values reported by
        ``mozinfo`` when they are empty. ``os_version`` is always taken from
        ``mozinfo`` and ``env`` is a snapshot (copy) of ``os.environ`` taken
        at construction time.
        """
        self.hostname = hostname
        self.arch = arch or mozinfo.processor
        self.env = os.environ.copy()
        self.os = operating_system or mozinfo.os
        self.os_version = mozinfo.version
        self.tree = tree
        self.revision = revision
        self.product = product
        self.logfile = logfile
        self.testgroup = testgroup
        self.harness = harness
        self.buildtype = buildtype

    def __str__(self):
        return '%s (%s, %s)' % (self.hostname, self.os, self.arch)

    def __repr__(self):
        return '<%s>' % self.__str__()

    def __eq__(self, other):
        """Return True when ``other`` is a TestContext with equal ``attrs``."""
        if not isinstance(other, TestContext):
            return False
        return not any(getattr(self, a) != getattr(other, a)
                       for a in self.attrs)

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; without this method
        # ``a != b`` would compare identities and disagree with ``a == b``.
        return not self.__eq__(other)

    def __hash__(self):
        """Hash the same attributes that ``__eq__`` compares."""
        def hashable(attr):
            value = getattr(self, attr)
            # dicts (e.g. ``env``) are unhashable; hash their items instead
            if isinstance(value, dict):
                value = frozenset(value.items())
            return value
        # A tuple keeps each value tied to its attribute position, so two
        # contexts whose values are merely swapped between attributes do not
        # systematically collide.
        return hash(tuple(hashable(a) for a in self.attrs))
| |
| |
# Python 2 names the common base type of text strings ``basestring``;
# Python 3 removed it and only has ``str``. Resolve the right type once so
# the string checks below work on both.
try:
    _STRING_TYPES = basestring  # noqa: F821
except NameError:
    _STRING_TYPES = str


class TestResult(object):
    """Stores test result data.

    A result has an *expected* and an *actual* outcome (both taken from
    ``POSSIBLE_RESULTS``); the combination of the two is reduced to a single
    *computed* result (one of ``COMPUTED_RESULTS``) stored in ``result``.
    """

    # Computed results that count as failures.
    FAIL_RESULTS = [
        'UNEXPECTED-PASS',
        'UNEXPECTED-FAIL',
        'ERROR',
    ]
    # Every value ``result`` may take once the test is finished.
    COMPUTED_RESULTS = FAIL_RESULTS + [
        'PASS',
        'KNOWN-FAIL',
        'SKIPPED',
    ]
    # Raw outcomes accepted as expected/actual result.
    POSSIBLE_RESULTS = [
        'PASS',
        'FAIL',
        'SKIP',
        'ERROR',
    ]

    def __init__(self, name, test_class='', time_start=None, context=None,
                 result_expected='PASS'):
        """Create a TestResult instance.

        name = name of the test that is running
        test_class = the class that the test belongs to
        time_start = timestamp (seconds since UNIX epoch) of when the test
                     started running; if not provided, defaults to the
                     current time
                     ! Provide 0 if you only have the duration
        context = TestContext instance; can be None
        result_expected = string representing the expected outcome of the
                          test; must be one of POSSIBLE_RESULTS
        """
        msg = "Result '%s' not in possible results: %s" %\
              (result_expected, ', '.join(self.POSSIBLE_RESULTS))
        assert isinstance(name, _STRING_TYPES), "name has to be a string"
        assert result_expected in self.POSSIBLE_RESULTS, msg

        self.name = name
        self.test_class = test_class
        self.context = context
        self.time_start = time_start if time_start is not None else time.time()
        self.time_end = None
        self._result_expected = result_expected
        self._result_actual = None
        self.result = None
        self.filename = None
        self.description = None
        self.output = []
        self.reason = None

    @property
    def test_name(self):
        """ '<top-level module>.py <test_class>.<name>' for this test """
        return '%s.py %s.%s' % (self.test_class.split('.')[0],
                                self.test_class,
                                self.name)

    def __str__(self):
        return '%s | %s (%s) | %s' % (self.result or 'PENDING',
                                      self.name, self.test_class, self.reason)

    def __repr__(self):
        return '<%s>' % self.__str__()

    def calculate_result(self, expected, actual):
        """Reduce an expected/actual outcome pair to a computed result.

        An actual 'ERROR' or 'SKIP' wins regardless of what was expected.
        Any combination not covered explicitly is reported as 'ERROR'.
        """
        if actual == 'ERROR':
            return 'ERROR'
        if actual == 'SKIP':
            return 'SKIPPED'

        if expected == 'PASS':
            if actual == 'PASS':
                return 'PASS'
            if actual == 'FAIL':
                return 'UNEXPECTED-FAIL'

        if expected == 'FAIL':
            if actual == 'PASS':
                return 'UNEXPECTED-PASS'
            if actual == 'FAIL':
                return 'KNOWN-FAIL'

        # if actual is skip or error, we return at the beginning, so if we get
        # here it is definitely some kind of error
        return 'ERROR'

    def infer_results(self, computed_result):
        """Derive the expected/actual outcomes from a computed result.

        'PASS' and 'ERROR' do not determine a unique pair, so for those the
        stored expected/actual outcomes are left untouched.
        """
        assert computed_result in self.COMPUTED_RESULTS
        if computed_result == 'UNEXPECTED-PASS':
            expected = 'FAIL'
            actual = 'PASS'
        elif computed_result == 'UNEXPECTED-FAIL':
            expected = 'PASS'
            actual = 'FAIL'
        elif computed_result == 'KNOWN-FAIL':
            expected = actual = 'FAIL'
        elif computed_result == 'SKIPPED':
            expected = actual = 'SKIP'
        else:
            return
        self._result_expected = expected
        self._result_actual = actual

    def finish(self, result, time_end=None, output=None, reason=None):
        """Mark the test as finished, storing its end time and status.

        result = either a raw outcome (POSSIBLE_RESULTS), which is combined
                 with the expected outcome, or an already computed result
                 (COMPUTED_RESULTS), which is stored as is
        time_end = timestamp of when the test ended; defaults to the current
                   time
                   ! Provide the duration as time_end if you only have that.
        output = test output, as a list of lines or a (multiline) string
        reason = short explanation of the result

        Raises ValueError if result is in neither list.
        """
        # POSSIBLE_RESULTS is checked first: 'PASS' and 'ERROR' appear in both
        # lists and must be treated as raw outcomes.
        if result in self.POSSIBLE_RESULTS:
            self._result_actual = result
            self.result = self.calculate_result(self._result_expected,
                                                self._result_actual)
        elif result in self.COMPUTED_RESULTS:
            self.infer_results(result)
            self.result = result
        else:
            valid = self.POSSIBLE_RESULTS + self.COMPUTED_RESULTS
            msg = "Result '%s' not valid. Need one of: %s" %\
                  (result, ', '.join(valid))
            raise ValueError(msg)

        # use lists instead of multiline strings
        if isinstance(output, _STRING_TYPES):
            output = output.splitlines()

        self.time_end = time_end if time_end is not None else time.time()
        # keep any previously stored output when none (or empty) is given
        self.output = output or self.output
        self.reason = reason

    @property
    def finished(self):
        """ Boolean saying if the test is finished or not """
        return self.result is not None

    @property
    def duration(self):
        """ Returns the time it took for the test to finish. If the test is
        not finished, returns the elapsed time so far """
        if self.result is not None:
            return self.time_end - self.time_start
        else:
            # returns the elapsed time
            return time.time() - self.time_start
| |
| |
class TestResultCollection(list):
    """Container class that stores test results.

    The collection is a plain ``list`` of result objects with a suite name,
    the total time taken and a few helpers to filter and summarise the
    results it contains.
    """

    # Class used by add_result() to build new entries; can be overridden per
    # instance through the constructor.
    resultClass = TestResult

    def __init__(self, suite_name, time_taken=0, resultClass=None):
        """Create an empty collection.

        suite_name = name of the test suite the results belong to
        time_taken = total time taken by the suite so far
        resultClass = class used to create results; defaults to the class
                      attribute ``resultClass``
        """
        list.__init__(self)
        self.suite_name = suite_name
        self.time_taken = time_taken
        if resultClass is not None:
            self.resultClass = resultClass

    def __str__(self):
        return "%s (%.2fs)\n%s" % (self.suite_name, self.time_taken,
                                   list.__str__(self))

    def subset(self, predicate):
        """ Returns a new collection with the results that satisfy the given
        predicate; its time_taken is the sum of their durations """
        # carry the result class over so the subset validates results the
        # same way as the collection it was taken from
        sub = TestResultCollection(self.suite_name,
                                   resultClass=self.resultClass)
        duration = 0
        for t in self.filter(predicate):
            sub.append(t)
            duration += t.duration
        sub.time_taken = duration
        return sub

    @property
    def contexts(self):
        """ List of unique contexts for the test results contained """
        return list(set(tr.context for tr in self))

    def filter(self, predicate):
        """ Returns a generator of TestResults that satisfy a given predicate """
        return (tr for tr in self if predicate(tr))

    def tests_with_result(self, result):
        """ Returns a generator of TestResults with the given result """
        msg = "Result '%s' not in possible results: %s" %\
              (result, ', '.join(self.resultClass.COMPUTED_RESULTS))
        assert result in self.resultClass.COMPUTED_RESULTS, msg
        return self.filter(lambda t: t.result == result)

    @property
    def tests(self):
        """ Generator of all tests in the collection """
        return (t for t in self)

    def add_result(self, test, result_expected='PASS',
                   result_actual='PASS', output='', context=None):
        """ Creates a finished result for the given test object and appends
        it to the collection.

        The result name is the first whitespace-separated token of
        str(test) and its class is '<module>.<class name>' of the test
        object. Start and end time are both set to 0, and the reason is the
        relevant line extracted from output. """
        def get_class(test):
            return test.__class__.__module__ + '.' + test.__class__.__name__

        t = self.resultClass(name=str(test).split()[0],
                             test_class=get_class(test),
                             time_start=0, result_expected=result_expected,
                             context=context)
        t.finish(result_actual, time_end=0, reason=relevant_line(output),
                 output=output)
        self.append(t)

    @property
    def num_failures(self):
        """ Number of results whose computed result is a failure """
        return sum(1 for t in self
                   if t.result in self.resultClass.FAIL_RESULTS)

    def add_unittest_result(self, result, context=None):
        """ Adds the python unittest result provided to the collection.

        context is attached to every TestResult created from result. """
        if hasattr(result, 'time_taken'):
            self.time_taken += result.time_taken

        for test, output in result.errors:
            self.add_result(test, result_actual='ERROR', output=output,
                            context=context)

        for test, output in result.failures:
            self.add_result(test, result_actual='FAIL', output=output,
                            context=context)

        if hasattr(result, 'unexpectedSuccesses'):
            for test in result.unexpectedSuccesses:
                self.add_result(test, result_expected='FAIL',
                                result_actual='PASS', context=context)

        if hasattr(result, 'skipped'):
            for test, output in result.skipped:
                self.add_result(test, result_expected='SKIP',
                                result_actual='SKIP', output=output,
                                context=context)

        if hasattr(result, 'expectedFailures'):
            for test, output in result.expectedFailures:
                self.add_result(test, result_expected='FAIL',
                                result_actual='FAIL', output=output,
                                context=context)

        # unittest does not store these by default
        if hasattr(result, 'tests_passed'):
            for test in result.tests_passed:
                self.add_result(test, context=context)

    @classmethod
    def from_unittest_results(cls, context, *results):
        """ Creates a TestResultCollection containing the given python
        unittest results.

        context is shared by all the results created; when it is None (or
        otherwise falsy) a default TestContext is used instead. """

        if not results:
            return cls('from unittest')

        # all the TestResult instances share the same context
        context = context or TestContext()

        collection = cls('from %s' % results[0].__class__.__name__)

        for result in results:
            collection.add_unittest_result(result, context)

        return collection
| |
| |
# used to get exceptions/errors from tracebacks
def relevant_line(s):
    """ Returns the first line of s that mentions an error or exception,
    or 'N/A' when no line does """
    markers = ('Error:', 'Exception:', 'error:', 'exception:')
    for candidate in s.splitlines():
        if any(marker in candidate for marker in markers):
            return candidate
    return 'N/A'