blob: c9a16ca41037763c1b766f0e9b5d7b2cc4c299de [file] [log] [blame]
"""Base class for WebDriver tests."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import logging
import math
import os
import sys
import time
import unittest
# pylint: disable=C6204
try:
# This code works for Python 2.
import urlparse
from urllib import urlencode
except ImportError:
# This code works for Python 3.
import urllib.parse as urlparse
from urllib.parse import urlencode
# This directory is a package
sys.path.insert(0, os.path.abspath("."))
# pylint: disable=C6204,C6203
import partial_layout_benchmark
import tv
# selenium imports
# pylint: disable=C0103
WebDriverWait = partial_layout_benchmark.ImportSeleniumModule(
submodule="webdriver.support.ui").WebDriverWait
ElementNotVisibleException = (partial_layout_benchmark.ImportSeleniumModule(
submodule="common.exceptions").ElementNotVisibleException)
BASE_URL = "https://www.youtube.com/"
TV_APP_PATH = "/tv"
BASE_PARAMS = {"env_forcedOffAllExperiments": True}
PAGE_LOAD_WAIT_SECONDS = 30
LAYOUT_TIMEOUT_SECONDS = 5
# Today, Cobalt's WebDriver has a race that
# can leave the former page's DOM visible after a navigate.
# As a workaround, sleep for a bit
# b/33275371
COBALT_POST_NAVIGATE_SLEEP_SECONDS = 5
def _percentile(results, percentile):
"""Returns the percentile of an array.
This method interpolates between two numbers if the percentile lands between
two data points.
Args:
results: Sortable results array.
percentile: A number ranging from 0-100.
Returns:
Appropriate value.
Raises:
RuntimeError: Raised on invalid args.
"""
if not results:
return None
if percentile > 100 or percentile < 0:
raise RuntimeError("percentile must be 0-100")
sorted_results = sorted(results)
if percentile == 100:
return sorted_results[-1]
fractional, index = math.modf((len(sorted_results) - 1) * (percentile * 0.01))
index = int(index)
if len(sorted_results) == index + 1:
return sorted_results[index]
return sorted_results[index] * (1 - fractional
) + sorted_results[index + 1] * fractional
def _merge_dict(merge_into, merge_from):
"""Merges the second dict into the first dict.
Merge into differs from update in that it will not override values. If the
values already exist, the resulting value will be a list with a union of
existing and new items.
Args:
merge_into: An output dict to merge values into.
merge_from: An input dict to iterate over and insert values from.
Returns:
None
"""
if not merge_from:
return
for k, v in merge_from.items():
try:
existing_value = merge_into[k]
except KeyError:
merge_into[k] = v
continue
if not isinstance(v, list):
v = [v]
if isinstance(existing_value, list):
existing_value.extend(v)
else:
new_value = [existing_value]
new_value.extend(v)
merge_into[k] = new_value
class TvTestCase(unittest.TestCase):
"""Base class for WebDriver tests.
Style note: snake_case is used for function names here so as to match
with an internal class with the same name.
"""
class LayoutTimeoutException(BaseException):
"""Exception thrown when layout did not complete in time."""
@classmethod
def setUpClass(cls):
print("Running " + cls.__name__)
@classmethod
def tearDownClass(cls):
print("Done " + cls.__name__)
def get_webdriver(self):
return partial_layout_benchmark.GetWebDriver()
def get_cval(self, cval_name):
"""Returns value of a cval.
Args:
cval_name: Name of the cval.
Returns:
Value of the cval.
"""
javascript_code = "return h5vcc.cVal.getValue('{}')".format(cval_name)
return self.get_webdriver().execute_script(javascript_code)
def get_int_cval(self, cval_name):
"""Returns int value of a cval.
The cval value must be an integer, if it is a float, then this function will
throw a ValueError.
Args:
cval_name: Name of the cval.
Returns:
Value of the cval.
Raises:
ValueError if the cval is cannot be converted to int.
"""
answer = self.get_cval(cval_name)
if answer is None:
return answer
return int(answer)
def goto(self, path, query_params=None):
"""Goes to a path off of BASE_URL.
Args:
path: URL path without the hostname.
query_params: Dictionary of parameter names and values.
Raises:
Underlying WebDriver exceptions
"""
parsed_url = list(urlparse.urlparse(BASE_URL))
parsed_url[2] = path
query_dict = BASE_PARAMS.copy()
if query_params:
query_dict.update(urlparse.parse_qsl(parsed_url[4]))
_merge_dict(query_dict, query_params)
parsed_url[4] = urlencode(query_dict, doseq=True)
final_url = urlparse.urlunparse(parsed_url)
self.get_webdriver().get(final_url)
time.sleep(COBALT_POST_NAVIGATE_SLEEP_SECONDS)
def load_tv(self, label=None, additional_query_params=None):
"""Loads the main TV page and waits for it to display.
Args:
label: A value for the label query parameter.
additional_query_params: A dict containing additional query parameters.
Raises:
Underlying WebDriver exceptions
"""
query_params = {}
if label is not None:
query_params = {"label": label}
if additional_query_params is not None:
query_params.update(additional_query_params)
self.goto(TV_APP_PATH, query_params)
# Note that the internal tests use "expect_transition" which is
# a mechanism that sets a maximum timeout for a "@with_retries"
# decorator-driven success retry loop for subsequent webdriver requests.
#
# We'll skip that sophistication here.
self.poll_until_found(tv.FOCUSED_SHELF)
def poll_until_found(self, css_selector):
"""Polls until an element is found.
Args:
css_selector: A CSS selector
Raises:
Underlying WebDriver exceptions
"""
start_time = time.time()
while ((not self.find_elements(css_selector)) and
(time.time() - start_time < PAGE_LOAD_WAIT_SECONDS)):
time.sleep(1)
self.assert_displayed(css_selector)
def unique_find(self, unique_selector):
"""Finds and returns a uniquely selected element.
Args:
unique_selector: A CSS selector that will select only one element
Raises:
Underlying WebDriver exceptions
AssertError: the element isn't unique
Returns:
Element
"""
return self.find_elements(unique_selector, expected_num=1)[0]
def assert_displayed(self, css_selector):
"""Asserts that an element is displayed.
Args:
css_selector: A CSS selector
Raises:
Underlying WebDriver exceptions
AssertError: the element isn't found
"""
# TODO does not actually assert that it's visible, like webdriver.py
# probably does.
self.assertTrue(self.unique_find(css_selector))
def find_elements(self, css_selector, expected_num=None):
"""Finds elements based on a selector.
Args:
css_selector: A CSS selector
expected_num: Expected number of matching elements
Raises:
Underlying WebDriver exceptions
AssertError: expected_num isn't met
Returns:
Array of selected elements
"""
elements = self.get_webdriver().find_elements_by_css_selector(css_selector)
if expected_num is not None:
self.assertEqual(len(elements), expected_num)
return elements
def send_keys(self, css_selector, keys):
"""Sends keys to an element uniquely identified by a selector.
This method retries for a timeout period if the selected element
could not be immediately found. If the retries do not succeed,
the underlying exception is passed through.
Args:
css_selector: A CSS selector
keys: key events
Raises:
Underlying WebDriver exceptions
"""
start_time = time.time()
while True:
try:
element = self.unique_find(css_selector)
element.send_keys(keys)
return
except ElementNotVisibleException:
# TODO ElementNotVisibleException seems to be considered
# a "falsy" exception in the internal tests, which seems to mean
# it would not be retried. But here, it seems essential.
if time.time() - start_time >= PAGE_LOAD_WAIT_SECONDS:
raise
time.sleep(1)
def wait_for_layout_complete(self):
"""Waits for Cobalt to complete pending layouts."""
start_time = time.time()
while self.get_int_cval("Event.MainWebModule.IsProcessing"):
if time.time() - start_time > LAYOUT_TIMEOUT_SECONDS:
raise TvTestCase.LayoutTimeoutException()
time.sleep(0.1)
def get_keyup_layout_duration_us(self):
return self.get_int_cval("Event.Duration.MainWebModule.KeyUp")
def wait_for_layout_complete_after_focused_shelf(self):
"""Waits for Cobalt to focus on a shelf and complete pending layouts."""
self.poll_until_found(tv.FOCUSED_SHELF)
self.assert_displayed(tv.FOCUSED_SHELF_TITLE)
self.wait_for_layout_complete()
def record_result(self, name, result):
"""Records an individual scalar result of a benchmark.
Args:
name: name of test case
result: Test result. Must be JSON encodable scalar.
"""
value_to_record = result
string_value_to_record = json.JSONEncoder().encode(value_to_record)
print("tv_testcase RESULT: {} {}".format(name, string_value_to_record))
def record_result_median(self, name, results):
"""Records the median of an array of results.
Args:
name: name of test case
results: Test results array. Must be array of JSON encodable scalar.
"""
value_to_record = _percentile(results, 50)
string_value_to_record = json.JSONEncoder().encode(value_to_record)
print("tv_testcase RESULT: {} {}".format(name, string_value_to_record))
def record_result_percentile(self, name, results, percentile):
"""Records the percentile of an array of results.
Args:
name: The (string) name of test case.
results: Test results array. Must be array of JSON encodable scalars.
percentile: A number ranging from 0-100.
Raises:
RuntimeError: Raised on invalid args.
"""
value_to_record = _percentile(results, percentile)
string_value_to_record = json.JSONEncoder().encode(value_to_record)
print("tv_testcase RESULT: {} {}".format(name, string_value_to_record))
def main():
logging.basicConfig(level=logging.DEBUG)
partial_layout_benchmark.main()