blob: e67cbb7719df9cfffe92098dc278ce476a0ba7aa [file] [log] [blame]
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import time
from collections import defaultdict
try:
import blessings
except ImportError:
blessings = None
import base
from .process import strstatus
def format_seconds(total):
"""Format number of seconds to MM:SS.DD form."""
minutes, seconds = divmod(total, 60)
return '%2d:%05.2f' % (minutes, seconds)
class NullTerminal(object):
def __getattr__(self, name):
return self._id
def _id(self, value):
return value
class MachFormatter(base.BaseFormatter):
def __init__(self, start_time=None, write_interval=False, write_times=True,
terminal=None, disable_colors=False):
if disable_colors:
terminal = None
elif terminal is None and blessings is not None:
terminal = blessings.Terminal()
if start_time is None:
start_time = time.time()
start_time = int(start_time * 1000)
self.start_time = start_time
self.write_interval = write_interval
self.write_times = write_times
self.status_buffer = {}
self.has_unexpected = {}
self.last_time = None
self.terminal = terminal
self.verbose = False
self._known_pids = set()
self.summary_values = {"tests": 0,
"subtests": 0,
"expected": 0,
"unexpected": defaultdict(int),
"skipped": 0}
self.summary_unexpected = []
def __call__(self, data):
s = base.BaseFormatter.__call__(self, data)
if s is None:
return
time = format_seconds(self._time(data))
action = data["action"].upper()
thread = data["thread"]
# Not using the NullTerminal here is a small optimisation to cut the number of
# function calls
if self.terminal is not None:
test = self._get_test_id(data)
time = self.terminal.blue(time)
color = None
if data["action"] == "test_end":
if "expected" not in data and not self.has_unexpected[test]:
color = self.terminal.green
else:
color = self.terminal.red
elif data["action"] in ("suite_start", "suite_end",
"test_start", "test_status"):
color = self.terminal.yellow
elif data["action"] == "crash":
color = self.terminal.red
if color is not None:
action = color(action)
return "%s %s: %s %s\n" % (time, action, thread, s)
def _get_test_id(self, data):
test_id = data.get("test")
if isinstance(test_id, list):
test_id = tuple(test_id)
return test_id
def _get_file_name(self, test_id):
if isinstance(test_id, (str, unicode)):
return test_id
if isinstance(test_id, tuple):
return "".join(test_id)
assert False, "unexpected test_id"
def suite_start(self, data):
self.summary_values = {"tests": 0,
"subtests": 0,
"expected": 0,
"unexpected": defaultdict(int),
"skipped": 0}
self.summary_unexpected = []
return "%i" % len(data["tests"])
def suite_end(self, data):
term = self.terminal if self.terminal is not None else NullTerminal()
heading = "Summary"
rv = ["", heading, "=" * len(heading), ""]
has_subtests = self.summary_values["subtests"] > 0
if has_subtests:
rv.append("Ran %i tests (%i parents, %i subtests)" %
(self.summary_values["tests"] + self.summary_values["subtests"],
self.summary_values["tests"],
self.summary_values["subtests"]))
else:
rv.append("Ran %i tests" % self.summary_values["tests"])
rv.append("Expected results: %i" % self.summary_values["expected"])
unexpected_count = sum(self.summary_values["unexpected"].values())
if unexpected_count > 0:
unexpected_str = " (%s)" % ", ".join("%s: %i" % (key, value) for key, value in
sorted(self.summary_values["unexpected"].items()))
else:
unexpected_str = ""
rv.append("Unexpected results: %i%s" % (unexpected_count, unexpected_str))
if self.summary_values["skipped"] > 0:
rv.append("Skipped: %i" % self.summary_values["skipped"])
rv.append("")
if not self.summary_values["unexpected"]:
rv.append(term.green("OK"))
else:
heading = "Unexpected Results"
rv.extend([heading, "=" * len(heading), ""])
if has_subtests:
for test_id, results in self.summary_unexpected:
test = self._get_file_name(test_id)
rv.extend([test, "-" * len(test)])
for name, status, expected, message in results:
if name is None:
name = "[Parent]"
rv.append("%s %s" % (self.format_expected(status, expected), name))
else:
for test_id, results in self.summary_unexpected:
test = self._get_file_name(test_id)
assert len(results) == 1
name, status, expected, messge = results[0]
assert name is None
rv.append("%s %s" % (self.format_expected(status, expected), test))
return "\n".join(rv)
def format_expected(self, status, expected):
term = self.terminal if self.terminal is not None else NullTerminal()
if status == "ERROR":
color = term.red
else:
color = term.yellow
if expected in ("PASS", "OK"):
return color(status)
return color("%s expected %s" % (status, expected))
def test_start(self, data):
self.summary_values["tests"] += 1
return "%s" % (self._get_test_id(data),)
def test_end(self, data):
subtests = self._get_subtest_data(data)
unexpected = subtests["unexpected"]
message = data.get("message", "")
if "stack" in data:
stack = data["stack"]
if stack and stack[-1] != "\n":
stack += "\n"
message = stack + message
if "expected" in data:
parent_unexpected = True
expected_str = ", expected %s" % data["expected"]
unexpected.append((None, data["status"], data["expected"],
message))
else:
parent_unexpected = False
expected_str = ""
test = self._get_test_id(data)
if unexpected:
self.summary_unexpected.append((test, unexpected))
self._update_summary(data)
#Reset the counts to 0
self.status_buffer[test] = {"count": 0, "unexpected": [], "pass": 0}
self.has_unexpected[test] = bool(unexpected)
if subtests["count"] != 0:
rv = "Harness %s%s. Subtests passed %i/%i. Unexpected %s" % (
data["status"], expected_str, subtests["pass"], subtests["count"],
len(unexpected))
else:
rv = "%s%s" % (data["status"], expected_str)
if unexpected:
rv += "\n"
if len(unexpected) == 1 and parent_unexpected:
rv += "%s" % unexpected[0][-1]
else:
for name, status, expected, message in unexpected:
if name is None:
name = "[Parent]"
expected_str = "Expected %s, got %s" % (expected, status)
rv += "%s\n" % ("\n".join([name, "-" * len(name), expected_str, message]))
rv = rv[:-1]
return rv
def valgrind_error(self, data):
rv = " " + data['primary'] + "\n"
for line in data['secondary']:
rv = rv + line + "\n"
return rv
def test_status(self, data):
self.summary_values["subtests"] += 1
test = self._get_test_id(data)
if test not in self.status_buffer:
self.status_buffer[test] = {"count": 0, "unexpected": [], "pass": 0}
self.status_buffer[test]["count"] += 1
message = data.get("message", "")
if "stack" in data:
if message:
message += "\n"
message += data["stack"]
if data["status"] == "PASS":
self.status_buffer[test]["pass"] += 1
self._update_summary(data)
rv = None
status, subtest = data["status"], data["subtest"]
unexpected = "expected" in data
if self.verbose:
if self.terminal is not None:
status = (self.terminal.red if unexpected else self.terminal.green)(status)
rv = " ".join([subtest, status, message])
elif unexpected:
# We only append an unexpected summary if it was not logged
# directly by verbose mode.
self.status_buffer[test]["unexpected"].append((subtest,
status,
data["expected"],
message))
return rv
def _update_summary(self, data):
if "expected" in data:
self.summary_values["unexpected"][data["status"]] += 1
elif data["status"] == "SKIP":
self.summary_values["skipped"] += 1
else:
self.summary_values["expected"] += 1
def process_output(self, data):
rv = []
if "command" in data and data["process"] not in self._known_pids:
self._known_pids.add(data["process"])
rv.append('(pid:%s) Full command: %s' % (data["process"], data["command"]))
rv.append('(pid:%s) "%s"' % (data["process"], data["data"]))
return "\n".join(rv)
def crash(self, data):
test = self._get_test_id(data)
if data.get("stackwalk_returncode", 0) != 0 and not data.get("stackwalk_stderr"):
success = True
else:
success = False
rv = ["pid:%s. Test:%s. Minidump anaylsed:%s. Signature:[%s]" %
(data.get("pid", None), test, success, data["signature"])]
if data.get("minidump_path"):
rv.append("Crash dump filename: %s" % data["minidump_path"])
if data.get("stackwalk_returncode", 0) != 0:
rv.append("minidump_stackwalk exited with return code %d" %
data["stackwalk_returncode"])
if data.get("stackwalk_stderr"):
rv.append("stderr from minidump_stackwalk:")
rv.append(data["stackwalk_stderr"])
elif data.get("stackwalk_stdout"):
rv.append(data["stackwalk_stdout"])
if data.get("stackwalk_errors"):
rv.extend(data.get("stackwalk_errors"))
rv = "\n".join(rv)
if not rv[-1] == "\n":
rv += "\n"
return rv
def process_start(self, data):
rv = "Started process `%s`" % data['process']
desc = data.get('command')
if desc:
rv = '%s (%s)' % (rv, desc)
return rv
def process_exit(self, data):
return "%s: %s" % (data['process'], strstatus(data['exitcode']))
def log(self, data):
level = data.get("level").upper()
if self.terminal is not None:
if level in ("CRITICAL", "ERROR"):
level = self.terminal.red(level)
elif level == "WARNING":
level = self.terminal.yellow(level)
elif level == "INFO":
level = self.terminal.blue(level)
if data.get('component'):
rv = " ".join([data["component"], level, data["message"]])
else:
rv = "%s %s" % (level, data["message"])
if "stack" in data:
rv += "\n%s" % data["stack"]
return rv
def _get_subtest_data(self, data):
test = self._get_test_id(data)
return self.status_buffer.get(test, {"count": 0, "unexpected": [], "pass": 0})
def _time(self, data):
entry_time = data["time"]
if self.write_interval and self.last_time is not None:
t = entry_time - self.last_time
self.last_time = entry_time
else:
t = entry_time - self.start_time
return t / 1000.