blob: c92493f4acf023352f719b2078e8591efc93f964 [file] [log] [blame]
import hashlib
import json
import os
import traceback
import urlparse
from abc import ABCMeta, abstractmethod
from ..testrunner import Stop
here = os.path.split(__file__)[0]
def executor_kwargs(test_type, server_config, cache_manager, **kwargs):
timeout_multiplier = kwargs["timeout_multiplier"]
if timeout_multiplier is None:
timeout_multiplier = 1
executor_kwargs = {"server_config": server_config,
"timeout_multiplier": timeout_multiplier,
"debug_info": kwargs["debug_info"]}
if test_type == "reftest":
executor_kwargs["screenshot_cache"] = cache_manager.dict()
return executor_kwargs
def strip_server(url):
"""Remove the scheme and netloc from a url, leaving only the path and any query
or fragment.
url - the url to strip
e.g. http://example.org:8000/tests?id=1#2 becomes /tests?id=1#2"""
url_parts = list(urlparse.urlsplit(url))
url_parts[0] = ""
url_parts[1] = ""
return urlparse.urlunsplit(url_parts)
class TestharnessResultConverter(object):
harness_codes = {0: "OK",
1: "ERROR",
2: "TIMEOUT"}
test_codes = {0: "PASS",
1: "FAIL",
2: "TIMEOUT",
3: "NOTRUN"}
def __call__(self, test, result):
"""Convert a JSON result into a (TestResult, [SubtestResult]) tuple"""
result_url, status, message, stack, subtest_results = result
assert result_url == test.url, ("Got results from %s, expected %s" %
(result_url, test.url))
harness_result = test.result_cls(self.harness_codes[status], message)
return (harness_result,
[test.subtest_result_cls(name, self.test_codes[status], message, stack)
for name, status, message, stack in subtest_results])
testharness_result_converter = TestharnessResultConverter()
def reftest_result_converter(self, test, result):
return (test.result_cls(result["status"], result["message"],
extra=result.get("extra")), [])
def pytest_result_converter(self, test, data):
harness_data, subtest_data = data
if subtest_data is None:
subtest_data = []
harness_result = test.result_cls(*harness_data)
subtest_results = [test.subtest_result_cls(*item) for item in subtest_data]
return (harness_result, subtest_results)
class ExecutorException(Exception):
def __init__(self, status, message):
self.status = status
self.message = message
class TestExecutor(object):
__metaclass__ = ABCMeta
test_type = None
convert_result = None
def __init__(self, browser, server_config, timeout_multiplier=1,
debug_info=None, **kwargs):
"""Abstract Base class for object that actually executes the tests in a
specific browser. Typically there will be a different TestExecutor
subclass for each test type and method of executing tests.
:param browser: ExecutorBrowser instance providing properties of the
browser that will be tested.
:param server_config: Dictionary of wptserve server configuration of the
form stored in TestEnvironment.external_config
:param timeout_multiplier: Multiplier relative to base timeout to use
when setting test timeout.
"""
self.runner = None
self.browser = browser
self.server_config = server_config
self.timeout_multiplier = timeout_multiplier
self.debug_info = debug_info
self.last_environment = {"protocol": "http",
"prefs": {}}
self.protocol = None # This must be set in subclasses
@property
def logger(self):
"""StructuredLogger for this executor"""
if self.runner is not None:
return self.runner.logger
def setup(self, runner):
"""Run steps needed before tests can be started e.g. connecting to
browser instance
:param runner: TestRunner instance that is going to run the tests"""
self.runner = runner
if self.protocol is not None:
self.protocol.setup(runner)
def teardown(self):
"""Run cleanup steps after tests have finished"""
if self.protocol is not None:
self.protocol.teardown()
def run_test(self, test):
"""Run a particular test.
:param test: The test to run"""
if test.environment != self.last_environment:
self.on_environment_change(test.environment)
try:
result = self.do_test(test)
except Exception as e:
result = self.result_from_exception(test, e)
if result is Stop:
return result
# log result of parent test
if result[0].status == "ERROR":
self.logger.debug(result[0].message)
self.last_environment = test.environment
self.runner.send_message("test_ended", test, result)
def server_url(self, protocol):
return "%s://%s:%s" % (protocol,
self.server_config["host"],
self.server_config["ports"][protocol][0])
def test_url(self, test):
return urlparse.urljoin(self.server_url(test.environment["protocol"]), test.url)
@abstractmethod
def do_test(self, test):
"""Test-type and protocol specific implementation of running a
specific test.
:param test: The test to run."""
pass
def on_environment_change(self, new_environment):
pass
def result_from_exception(self, test, e):
if hasattr(e, "status") and e.status in test.result_cls.statuses:
status = e.status
else:
status = "ERROR"
message = unicode(getattr(e, "message", ""))
if message:
message += "\n"
message += traceback.format_exc(e)
return test.result_cls(status, message), []
class TestharnessExecutor(TestExecutor):
convert_result = testharness_result_converter
class RefTestExecutor(TestExecutor):
convert_result = reftest_result_converter
def __init__(self, browser, server_config, timeout_multiplier=1, screenshot_cache=None,
debug_info=None, **kwargs):
TestExecutor.__init__(self, browser, server_config,
timeout_multiplier=timeout_multiplier,
debug_info=debug_info)
self.screenshot_cache = screenshot_cache
class RefTestImplementation(object):
def __init__(self, executor):
self.timeout_multiplier = executor.timeout_multiplier
self.executor = executor
# Cache of url:(screenshot hash, screenshot). Typically the
# screenshot is None, but we set this value if a test fails
# and the screenshot was taken from the cache so that we may
# retrieve the screenshot from the cache directly in the future
self.screenshot_cache = self.executor.screenshot_cache
self.message = None
def setup(self):
pass
def teardown(self):
pass
@property
def logger(self):
return self.executor.logger
def get_hash(self, test, viewport_size, dpi):
timeout = test.timeout * self.timeout_multiplier
key = (test.url, viewport_size, dpi)
if key not in self.screenshot_cache:
success, data = self.executor.screenshot(test, viewport_size, dpi)
if not success:
return False, data
screenshot = data
hash_value = hashlib.sha1(screenshot).hexdigest()
self.screenshot_cache[key] = (hash_value, None)
rv = (hash_value, screenshot)
else:
rv = self.screenshot_cache[key]
self.message.append("%s %s" % (test.url, rv[0]))
return True, rv
def is_pass(self, lhs_hash, rhs_hash, relation):
assert relation in ("==", "!=")
self.message.append("Testing %s %s %s" % (lhs_hash, relation, rhs_hash))
return ((relation == "==" and lhs_hash == rhs_hash) or
(relation == "!=" and lhs_hash != rhs_hash))
def run_test(self, test):
viewport_size = test.viewport_size
dpi = test.dpi
self.message = []
# Depth-first search of reference tree, with the goal
# of reachings a leaf node with only pass results
stack = list(((test, item[0]), item[1]) for item in reversed(test.references))
while stack:
hashes = [None, None]
screenshots = [None, None]
nodes, relation = stack.pop()
for i, node in enumerate(nodes):
success, data = self.get_hash(node, viewport_size, dpi)
if success is False:
return {"status": data[0], "message": data[1]}
hashes[i], screenshots[i] = data
if self.is_pass(hashes[0], hashes[1], relation):
if nodes[1].references:
stack.extend(list(((nodes[1], item[0]), item[1]) for item in reversed(nodes[1].references)))
else:
# We passed
return {"status":"PASS", "message": None}
# We failed, so construct a failure message
for i, (node, screenshot) in enumerate(zip(nodes, screenshots)):
if screenshot is None:
success, screenshot = self.retake_screenshot(node, viewport_size, dpi)
if success:
screenshots[i] = screenshot
log_data = [{"url": nodes[0].url, "screenshot": screenshots[0]}, relation,
{"url": nodes[1].url, "screenshot": screenshots[1]}]
return {"status": "FAIL",
"message": "\n".join(self.message),
"extra": {"reftest_screenshots": log_data}}
def retake_screenshot(self, node, viewport_size, dpi):
success, data = self.executor.screenshot(node, viewport_size, dpi)
if not success:
return False, data
key = (node.url, viewport_size, dpi)
hash_val, _ = self.screenshot_cache[key]
self.screenshot_cache[key] = hash_val, data
return True, data
class WdspecExecutor(TestExecutor):
convert_result = pytest_result_converter
class Protocol(object):
def __init__(self, executor, browser):
self.executor = executor
self.browser = browser
@property
def logger(self):
return self.executor.logger
def setup(self, runner):
pass
def teardown(self):
pass
def wait(self):
pass