| """Base class for WebDriver tests.""" |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import json |
| import logging |
| import os |
| import sys |
| import time |
| import unittest |
| |
| # This directory is a package |
| sys.path.insert(0, os.path.abspath(".")) |
| # pylint: disable=C6204,C6203 |
| import c_val_names |
| import tv |
| import tv_testcase_runner |
| import tv_testcase_util |
| |
| # selenium imports |
| # pylint: disable=C0103 |
| WebDriverWait = tv_testcase_util.import_selenium_module( |
| submodule="webdriver.support.ui").WebDriverWait |
| |
| ElementNotVisibleException = tv_testcase_util.import_selenium_module( |
| submodule="common.exceptions").ElementNotVisibleException |
| |
| WINDOWDRIVER_CREATED_TIMEOUT_SECONDS = 30 |
| PAGE_LOAD_WAIT_SECONDS = 30 |
| PROCESSING_TIMEOUT_SECONDS = 15 |
| MEDIA_TIMEOUT_SECONDS = 30 |
| TITLE_CARD_HIDDEN_TIMEOUT_SECONDS = 30 |
| # Currently, after loading a new URL with cookies enabled, Kabuki randomizes |
| # the shelf entries when the page finishes loading. Sleep for a bit to allow |
| # this to occur before starting any additional logic. |
| COBALT_POST_NAVIGATE_SLEEP_SECONDS = 5 |
| |
| _is_initialized = False |
| |
| |
| class TvTestCase(unittest.TestCase): |
| """Base class for WebDriver tests. |
| |
| Style note: snake_case is used for function names here so as to match |
| with an internal class with the same name. |
| """ |
| |
| class WindowDriverCreatedTimeoutException(BaseException): |
| """Exception thrown when WindowDriver was not created in time.""" |
| |
| class ProcessingTimeoutException(BaseException): |
| """Exception thrown when processing did not complete in time.""" |
| |
| class MediaTimeoutException(BaseException): |
| """Exception thrown when media did not complete in time.""" |
| |
| class TitleCardHiddenTimeoutException(BaseException): |
| """Exception thrown when title card did not disappear in time.""" |
| |
| @classmethod |
| def setUpClass(cls): |
| print("Running " + cls.__name__) |
| |
| @classmethod |
| def tearDownClass(cls): |
| print("Done " + cls.__name__) |
| |
| def setUp(self): |
| global _is_initialized |
| if not _is_initialized: |
| # Initialize the tests. This involves loading a URL which applies the |
| # forcedOffAllExperiments cookies, ensuring that no subsequent loads |
| # include experiments. Additionally, loading this URL triggers a reload. |
| query_params = {"env_forcedOffAllExperiments": True} |
| triggers_reload = True |
| self.load_tv(None, query_params, triggers_reload) |
| _is_initialized = True |
| |
| def get_webdriver(self): |
| return tv_testcase_runner.GetWebDriver() |
| |
| def get_windowdriver_created(self): |
| return tv_testcase_runner.GetWindowDriverCreated() |
| |
| def get_cval(self, cval_name): |
| """Returns the Python object represented by a JSON cval string. |
| |
| Args: |
| cval_name: Name of the cval. |
| Returns: |
| Python object represented by the JSON cval string |
| """ |
| javascript_code = "return h5vcc.cVal.getValue('{}')".format(cval_name) |
| json_result = self.get_webdriver().execute_script(javascript_code) |
| if json_result is None: |
| return None |
| else: |
| return json.loads(json_result) |
| |
| def load_tv(self, label=None, additional_query_params=None, |
| triggers_reload=False): |
| """Loads the main TV page and waits for it to display. |
| |
| Args: |
| label: A value for the label query parameter. |
| additional_query_params: A dict containing additional query parameters. |
| triggers_reload: Whether or not the navigation will trigger a reload. |
| Raises: |
| Underlying WebDriver exceptions |
| """ |
| query_params = {} |
| if label is not None: |
| query_params = {"label": label} |
| if additional_query_params is not None: |
| query_params.update(additional_query_params) |
| self.get_windowdriver_created().clear() |
| self.get_webdriver().get(tv_testcase_util.get_tv_url(query_params)) |
| self.wait_for_windowdriver_created() |
| if triggers_reload: |
| self.get_windowdriver_created().clear() |
| self.wait_for_windowdriver_created() |
| time.sleep(COBALT_POST_NAVIGATE_SLEEP_SECONDS) |
| # Note that the internal tests use "expect_transition" which is |
| # a mechanism that sets a maximum timeout for a "@with_retries" |
| # decorator-driven success retry loop for subsequent webdriver requests. |
| # |
| # We'll skip that sophistication here. |
| self.wait_for_processing_complete_after_focused_shelf() |
| |
| def poll_until_found(self, css_selector): |
| """Polls until an element is found. |
| |
| Args: |
| css_selector: A CSS selector |
| Raises: |
| Underlying WebDriver exceptions |
| """ |
| start_time = time.time() |
| while ((not self.find_elements(css_selector)) and |
| (time.time() - start_time < PAGE_LOAD_WAIT_SECONDS)): |
| time.sleep(1) |
| self.assert_displayed(css_selector) |
| |
| def unique_find(self, unique_selector): |
| """Finds and returns a uniquely selected element. |
| |
| Args: |
| unique_selector: A CSS selector that will select only one element |
| Raises: |
| Underlying WebDriver exceptions |
| AssertError: the element isn't unique |
| Returns: |
| Element |
| """ |
| return self.find_elements(unique_selector, expected_num=1)[0] |
| |
| def assert_displayed(self, css_selector): |
| """Asserts that an element is displayed. |
| |
| Args: |
| css_selector: A CSS selector |
| Raises: |
| Underlying WebDriver exceptions |
| AssertError: the element isn't found |
| """ |
| # TODO does not actually assert that it's visible, like webdriver.py |
| # probably does. |
| self.assertTrue(self.unique_find(css_selector)) |
| |
| def find_elements(self, css_selector, expected_num=None): |
| """Finds elements based on a selector. |
| |
| Args: |
| css_selector: A CSS selector |
| expected_num: Expected number of matching elements |
| Raises: |
| Underlying WebDriver exceptions |
| AssertError: expected_num isn't met |
| Returns: |
| Array of selected elements |
| """ |
| elements = self.get_webdriver().find_elements_by_css_selector(css_selector) |
| if expected_num is not None: |
| self.assertEqual(len(elements), expected_num) |
| return elements |
| |
| def send_keys(self, css_selector, keys): |
| """Sends keys to an element uniquely identified by a selector. |
| |
| This method retries for a timeout period if the selected element |
| could not be immediately found. If the retries do not succeed, |
| the underlying exception is passed through. |
| |
| Args: |
| css_selector: A CSS selector |
| keys: key events |
| |
| Raises: |
| Underlying WebDriver exceptions |
| """ |
| start_time = time.time() |
| while True: |
| try: |
| element = self.unique_find(css_selector) |
| element.send_keys(keys) |
| return |
| except ElementNotVisibleException: |
| # TODO ElementNotVisibleException seems to be considered |
| # a "falsy" exception in the internal tests, which seems to mean |
| # it would not be retried. But here, it seems essential. |
| if time.time() - start_time >= PAGE_LOAD_WAIT_SECONDS: |
| raise |
| time.sleep(1) |
| |
| def wait_for_windowdriver_created(self): |
| """Waits for Cobalt to create a WindowDriver.""" |
| windowdriver_created = self.get_windowdriver_created() |
| if not windowdriver_created.wait(WINDOWDRIVER_CREATED_TIMEOUT_SECONDS): |
| raise TvTestCase.WindowDriverCreatedTimeoutException() |
| |
| def wait_for_processing_complete_after_focused_shelf(self): |
| """Waits for Cobalt to focus on a shelf and complete pending layouts.""" |
| self.poll_until_found(tv.FOCUSED_SHELF) |
| self.assert_displayed(tv.FOCUSED_SHELF_TITLE) |
| self.wait_for_processing_complete() |
| |
| def wait_for_processing_complete(self, check_animations=True): |
| """Waits for Cobalt to complete processing. |
| |
| This method requires two consecutive iterations through its loop where |
| Cobalt is not processing before treating processing as complete. This |
| protects against a brief window between two different processing sections |
| being mistaken as completed processing. |
| |
| Args: |
| check_animations: Whether or not animations should be checked when |
| determining if processing is complete. |
| |
| Raises: |
| ProcessingTimeoutException: Processing is not complete within the |
| required time. |
| """ |
| start_time = time.time() |
| count = 0 |
| while count < 2: |
| if self.is_processing(check_animations): |
| count = 0 |
| else: |
| count += 1 |
| |
| if time.time() - start_time > PROCESSING_TIMEOUT_SECONDS: |
| raise TvTestCase.ProcessingTimeoutException() |
| |
| time.sleep(0.1) |
| |
| def is_processing(self, check_animations): |
| """Checks to see if Cobalt is currently processing.""" |
| return (self.get_cval(c_val_names.count_dom_active_dispatch_events()) or |
| self.get_cval(c_val_names.layout_is_dirty()) or |
| (check_animations and |
| self.get_cval(c_val_names.renderer_has_active_animations())) or |
| self.get_cval(c_val_names.count_image_cache_loading_resources())) |
| |
| def wait_for_media_element_playing(self): |
| """Waits for a video to begin playing. |
| |
| Raises: |
| MediaTimeoutException: The video does not start playing within the |
| required time. |
| """ |
| start_time = time.time() |
| while self.get_cval( |
| c_val_names.event_duration_dom_video_start_delay()) == 0: |
| if time.time() - start_time > MEDIA_TIMEOUT_SECONDS: |
| raise TvTestCase.MediaTimeoutException() |
| time.sleep(0.1) |
| |
| def wait_for_title_card_hidden(self): |
| """Waits for the title to disappear while a video is playing. |
| |
| Raises: |
| TitleCardHiddenTimeoutException: The title card did not become hidden in |
| the required time. |
| """ |
| start_time = time.time() |
| while not self.find_elements(tv.TITLE_CARD_HIDDEN): |
| if time.time() - start_time > TITLE_CARD_HIDDEN_TIMEOUT_SECONDS: |
| raise TvTestCase.TitleCardHiddenTimeoutException() |
| time.sleep(1) |
| |
| |
| def main(): |
| logging.basicConfig(level=logging.DEBUG) |
| tv_testcase_runner.main() |