blob: f135f30d0dc0abc1554315886e3eb78fe72ec532 [file] [log] [blame]
"""Base class for tests that need to launch Cobalt."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import os
import sys
import time
import traceback
import unittest
# pylint: disable=C6204
import _env # pylint: disable=unused-import
from cobalt.tools.automated_testing import c_val_names
from cobalt.tools.automated_testing import cobalt_runner
from cobalt.tools.automated_testing import webdriver_utils
from starboard.tools import command_line
# selenium imports
# pylint: disable=C0103
ActionChains = webdriver_utils.import_selenium_module(
submodule='webdriver.common.action_chains').ActionChains
keys = webdriver_utils.import_selenium_module('webdriver.common.keys')
WINDOWDRIVER_CREATED_TIMEOUT_SECONDS = 30
WEBMODULE_LOADED_TIMEOUT_SECONDS = 30
PAGE_LOAD_WAIT_SECONDS = 30
PROCESSING_TIMEOUT_SECONDS = 180
HTML_SCRIPT_ELEMENT_EXECUTE_TIMEOUT_SECONDS = 30
MEDIA_TIMEOUT_SECONDS = 30
DEFAULT_TEST_SUCCESS_MESSAGE = 'Cobalt Test Case passed.'
DEFAULT_URL = 'https://www.youtube.com/tv'
_platform = 'unknown'
class CobaltTestCase(unittest.TestCase):
class WindowDriverCreatedTimeoutException(Exception):
"""Exception thrown when WindowDriver was not created in time."""
class WebModuleLoadedTimeoutException(Exception):
"""Exception thrown when WebModule was not loaded in time."""
class ProcessingTimeoutException(Exception):
"""Exception thrown when processing did not complete in time."""
class HtmlScriptElementExecuteTimeoutException(Exception):
"""Exception thrown when processing did not complete in time."""
def setUp(self):
pass
@classmethod
def setUpClass(cls):
print('Running ' + cls.__name__)
@classmethod
def tearDownClass(cls):
print('Done ' + cls.__name__)
def get_platform(self):
return _platform
def get_webdriver(self):
return cobalt_runner.GetWebDriver()
def send_resume(self):
return cobalt_runner.SendResume()
def send_suspend(self):
return cobalt_runner.SendSuspend()
def get_cval(self, cval_name):
"""Returns the Python object represented by a JSON cval string.
Args:
cval_name: Name of the cval.
Returns:
Python object represented by the JSON cval string
"""
javascript_code = 'return h5vcc.cVal.getValue(\'{}\')'.format(cval_name)
json_result = self.get_webdriver().execute_script(javascript_code)
if json_result is None:
return None
else:
return json.loads(json_result)
def poll_until_found(self, css_selector, expected_num=None):
"""Polls until an element is found.
Args:
css_selector: A CSS selector
expected_num: The expected number of the selector type to be found.
Raises:
Underlying WebDriver exceptions
"""
start_time = time.time()
while ((not self.find_elements(css_selector)) and
(time.time() - start_time < PAGE_LOAD_WAIT_SECONDS)):
time.sleep(1)
if expected_num:
self.find_elements(css_selector, expected_num)
def unique_find(self, unique_selector):
"""Finds and returns a uniquely selected element.
Args:
unique_selector: A CSS selector that will select only one element
Raises:
Underlying WebDriver exceptions
AssertError: the element isn't unique
Returns:
Element
"""
return self.find_elements(unique_selector, expected_num=1)[0]
def assert_displayed(self, css_selector):
"""Asserts that an element is displayed.
Args:
css_selector: A CSS selector
Raises:
Underlying WebDriver exceptions
AssertError: the element isn't found
"""
# TODO does not actually assert that it's visible, like webdriver.py
# probably does.
self.assertTrue(self.unique_find(css_selector))
def find_elements(self, css_selector, expected_num=None):
"""Finds elements based on a selector.
Args:
css_selector: A CSS selector
expected_num: Expected number of matching elements
Raises:
Underlying WebDriver exceptions
AssertError: expected_num isn't met
Returns:
Array of selected elements
"""
elements = self.get_webdriver().find_elements_by_css_selector(css_selector)
if expected_num is not None:
self.assertEqual(len(elements), expected_num)
return elements
def send_keys(self, key_events):
"""Sends keys to whichever element currently has focus.
Args:
key_events: key events
Raises:
Underlying WebDriver exceptions
"""
ActionChains(self.get_webdriver()).send_keys(key_events).perform()
def clear_url_loaded_events(self):
"""Clear the events that indicate that Cobalt finished loading a URL."""
cobalt_runner.GetWindowDriverCreated().clear()
cobalt_runner.GetWebModuleLoaded().clear()
def load_blank(self):
"""Loads about:blank and waits for it to finish.
Raises:
Underlying WebDriver exceptions
"""
self.clear_url_loaded_events()
self.get_webdriver().get('about:blank')
self.wait_for_url_loaded_events()
def wait_for_url_loaded_events(self):
"""Wait for the events indicating that Cobalt finished loading a URL."""
windowdriver_created = cobalt_runner.GetWindowDriverCreated()
if not windowdriver_created.wait(WINDOWDRIVER_CREATED_TIMEOUT_SECONDS):
raise CobaltTestCase.WindowDriverCreatedTimeoutException()
webmodule_loaded = cobalt_runner.GetWebModuleLoaded()
if not webmodule_loaded.wait(WEBMODULE_LOADED_TIMEOUT_SECONDS):
raise CobaltTestCase.WebModuleLoadedTimeoutException()
def wait_for_processing_complete(self, check_animations=True):
"""Waits for Cobalt to complete processing.
This method requires two consecutive iterations through its loop where
Cobalt is not processing before treating processing as complete. This
protects against a brief window between two different processing sections
being mistaken as completed processing.
Args:
check_animations: Whether or not animations should be checked when
determining if processing is complete.
Raises:
ProcessingTimeoutException: Processing is not complete within the
required time.
"""
start_time = time.time()
# First simply check for whether or not the event is still processing.
# There's no need to check anything else while the event is still going on.
# Once it is done processing, it won't get re-set, so there's no need to
# re-check it.
while self.get_cval(c_val_names.event_is_processing()):
if time.time() - start_time > PROCESSING_TIMEOUT_SECONDS:
raise CobaltTestCase.ProcessingTimeoutException()
time.sleep(0.1)
# Now wait for all processing to complete in Cobalt.
count = 0
while count < 2:
if self.is_processing(check_animations):
count = 0
else:
count += 1
if time.time() - start_time > PROCESSING_TIMEOUT_SECONDS:
raise CobaltTestCase.ProcessingTimeoutException()
time.sleep(0.1)
def is_processing(self, check_animations):
"""Checks to see if Cobalt is currently processing."""
return (self.get_cval(c_val_names.count_dom_active_java_script_events()) or
self.get_cval(c_val_names.is_render_tree_generation_pending()) or
self.get_cval(c_val_names.is_render_tree_rasterization_pending()) or
(check_animations and
self.get_cval(c_val_names.renderer_has_active_animations())) or
self.get_cval(c_val_names.count_image_cache_resource_loading()))
def wait_for_media_element_playing(self):
"""Waits for a video to begin playing.
Returns:
Whether or not the video started.
"""
start_time = time.time()
while self.get_cval(
c_val_names.event_duration_dom_video_start_delay()) == 0:
if time.time() - start_time > MEDIA_TIMEOUT_SECONDS:
return False
time.sleep(0.1)
return True
def wait_for_html_script_element_execute_count(self, required_count):
"""Waits for specified number of html script element Execute() calls.
Args:
required_count: the number of executions that must occur
Raises:
HtmlScriptElementExecuteTimeoutException: The required html script element
executions did not occur within the required time.
"""
start_time = time.time()
while self.get_cval(
c_val_names.count_dom_html_script_element_execute()) < required_count:
if time.time() - start_time > HTML_SCRIPT_ELEMENT_EXECUTE_TIMEOUT_SECONDS:
raise CobaltTestCase.HtmlScriptElementExecuteTimeoutException()
time.sleep(0.1)
def is_in_preload(self):
"""We assume Cobalt is in preload mode if no render tree is generated."""
render_tree_count = self.get_cval(
c_val_names.count_rasterize_new_render_tree())
if render_tree_count is not None:
return False
return True
def main(url):
"""Launch Cobalt and run python unit test main function."""
global _platform
arg_parser = command_line.CreateParser()
arg_parser.add_argument(
'-l', '--log_file', help='Logfile pathname. stdout if absent.')
arg_parser.add_argument(
'--success_message', help='Custom message to be printed on test success.')
args, _ = arg_parser.parse_known_args()
# Keep unittest module from seeing these args
sys.argv = sys.argv[:1]
success_message = DEFAULT_TEST_SUCCESS_MESSAGE
if args.success_message is not None:
success_message = args.success_message
_platform = args.platform
if _platform is None:
try:
_platform = os.environ['BUILD_PLATFORM']
except KeyError:
sys.stderr.write('Must specify --platform\n')
sys.exit(1)
if not args.log_file:
log_file = sys.stdout
else:
log_file = open(args.log_file)
if not url:
url = DEFAULT_URL
try:
with cobalt_runner.CobaltRunner(
platform=_platform,
config=args.config,
device_id=args.device_id,
target_params=args.target_params,
out_directory=args.out_directory,
url=url,
log_file=log_file,
success_message=success_message):
unittest.main(
testRunner=unittest.TextTestRunner(verbosity=0, stream=log_file))
except cobalt_runner.TimeoutException:
print('Timeout waiting for Cobalt to start', file=sys.stderr)
sys.exit(1)
# User pressed Ctrl + C, or an error in the launcher caused a shutdown.
except KeyboardInterrupt:
sys.exit(1)
except Exception: # pylint: disable=W0703
sys.stderr.write('Exception while running test:\n')
traceback.print_exc(file=sys.stderr)
sys.exit(1)