blob: 27c670cf415dee83af4e7c610ce4295f9b88ed03 [file] [log] [blame]
#
# Copyright 2023 The Cobalt Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""RDK implementation of Starboard launcher abstraction."""
import functools
import logging
import os
import re
import signal
import six
import sys
import threading
import time
import contextlib
import pexpect
from starboard.tools import abstract_launcher
from starboard.shared import retry
# pylint: disable=unused-argument
def _sigint_or_sigterm_handler(signum, frame):
"""Clean up and exit with status |signum|.
Args:
signum: Signal number that triggered this callback. Passed in when the
signal handler is called by python runtime.
frame: Current stack frame. Passed in when the signal handler is called by
python runtime.
"""
sys.exit(signum)
# First call returns True, otherwise return false.
def first_run():
v = globals()
if 'first_run' not in v:
v['first_run'] = False
return True
return False
class Launcher(abstract_launcher.AbstractLauncher):
"""Class for launching Cobalt/tools on RDK."""
_STARTUP_TIMEOUT_SECONDS = 1800
_RDK_USERNAME = 'root'
_RDK_PASSWORD = ''
_RDK_PROMPT = 'root@AmlogicFirebolt:'
_RDK_LOG_FILE = '/opt/logs/wpeframework.log'
_SSH_LOGIN_SIGNAL = 'cobalt-launcher-login-success'
_SSH_SLEEP_SIGNAL = 'cobalt-launcher-done-sleeping'
# pexpect times out each second to allow Kill to quickly stop a test run
_PEXPECT_TIMEOUT = 1
# SSH shell command retries
_PEXPECT_SPAWN_RETRIES = 20
# pexpect.sendline retries
_PEXPECT_SENDLINE_RETRIES = 3
# Old process kill retries
_KILL_RETRIES = 3
_PEXPECT_SHUTDOWN_SLEEP_TIME = 3
# Time to wait after processes were killed
_PROCESS_KILL_SLEEP_TIME = 10
# Retrys for getting a clean prompt
_PROMPT_WAIT_MAX_RETRIES = 5
# Wait up to 10 seconds for the password prompt from the RDK
_PEXPECT_PASSWORD_TIMEOUT_MAX_RETRIES = 10
# Wait up to 900 seconds for new output from the RDK
_PEXPECT_READLINE_TIMEOUT_MAX_RETRIES = 900
# Delay between subsequent SSH commands
_INTER_COMMAND_DELAY_SECONDS = 1.5
# This is used to strip ansi color codes from pexpect output.
_PEXPECT_SANITIZE_LINE_RE = re.compile(r'\x1b[^m]*m')
# Exceptions to retry
_RETRY_EXCEPTIONS = (pexpect.TIMEOUT, pexpect.ExceptionPexpect,
pexpect.exceptions.EOF, OSError)
def __init__(self, platform, target_name, config, device_id, **kwargs):
# pylint: disable=super-with-arguments
super().__init__(platform, target_name, config, device_id, **kwargs)
env = os.environ.copy()
env.update(self.env_variables)
self.full_env = env
self.platform = platform
self.device_id = device_id
if not self.device_id:
self.device_id = self.full_env.get('RDK_ADDR')
if not self.device_id:
raise ValueError(
'Unable to determine target, please pass it in, or set RDK_ADDR '
'environment variable.')
self.startup_timeout_seconds = Launcher._STARTUP_TIMEOUT_SECONDS
self.pexpect_process = None
self._InitPexpectCommands()
self.run_inactive = threading.Event()
self.run_inactive.set()
self.shutdown_initiated = threading.Event()
self.log_targets = kwargs.get('log_targets', True)
signal.signal(signal.SIGINT, functools.partial(_sigint_or_sigterm_handler))
signal.signal(signal.SIGTERM, functools.partial(_sigint_or_sigterm_handler))
self.last_run_pexpect_cmd = ''
def _InitPexpectCommands(self):
"""Initializes all of the pexpect commands needed for running the test."""
# Ensure no trailing slashes
self.out_directory = self.out_directory.rstrip('/')
rdk_user_hostname = f'{Launcher._RDK_USERNAME}@{self.device_id}'
rdk_test_dir = '/usr/share/content/data/app'
# RDK has limited storage space left on the partition /usr and may
# not be able to fit the entire test target with content folder.
# Therefore the test data will be uploaded to /data partition and create
# symlink under /usr/share/content/data/app for loader to use.
rdk_storage_dir = '/data/cobalt_test_data'
# rsync command setup
options = '-avzLhc'
source = os.path.join(self.out_directory, 'content', 'app',
self.target_name)
destination = f'{rdk_user_hostname}:{rdk_storage_dir}/'
self.rsync_command = 'rsync ' + options + ' ' + source + ' ' + destination
# ssh command setup
rsa_options = (
'-o \"LogLevel ERROR\" '
'-o \"UserKnownHostsFile=/dev/null\" -o \"StrictHostKeyChecking=no\"')
self.ssh_command = (f'ssh -t {rsa_options} {rdk_user_hostname} '
f'TERM=dumb bash -l')
# test file preparation
# The very last 'cobalt' in the path is to meet the hard coded path
# used in RDK's loader
rdk_tmp = '/var/lib/persistent/rdkservices/Cobalt/Cobalt/.cobalt_storage'
self.test_prep_command = (f'rm -rf {rdk_tmp}; '
f'rm -rf {rdk_test_dir}/cobalt; '
f'ln -s {rdk_storage_dir}/{self.target_name} '
f'{rdk_test_dir}/cobalt')
# test output tags
self.test_complete_tag_1 = 'test suite ran.'
self.test_complete_tag_2 = 'test suites ran.'
self.test_failure_tag = 'tests, listed below'
self.test_success_tag = 'succeeded'
def _request_payload(method):
"""Create the request paylaod needed for running the test."""
cmd_test_param = (f'"sbmainargs":{self.target_command_line_params}'
if self.target_command_line_params else '')
json_cmd = (
f'\'{{"jsonrpc": "2.0","id": 3,"method": "org.rdk.RDKShell.{method}",'
f'"params": {{"callsign":"YouTube","type":"Cobalt",'
f'"configuration":{{ {cmd_test_param}}} }} }}\' ')
return json_cmd
# test command setup
cmd_log = f'tail -f {Launcher._RDK_LOG_FILE}'
self.test_command = (f'curl -X POST http://127.0.0.1:9998/jsonrpc -d '
f'{_request_payload("1.launch")}; '
f'{cmd_log}')
self.terminate_command = (f'curl -X POST http://127.0.0.1:9998/jsonrpc -d '
f'{_request_payload("destroy")}; ')
# pylint: disable=no-method-argument
def _CommandBackoff():
time.sleep(Launcher._INTER_COMMAND_DELAY_SECONDS)
def _ShutdownBackoff(self):
Launcher._CommandBackoff()
return self.shutdown_initiated.is_set()
@retry.retry(
exceptions=_RETRY_EXCEPTIONS,
retries=_PEXPECT_SPAWN_RETRIES,
backoff=_CommandBackoff)
def _PexpectSpawnAndConnect(self, command):
"""Spawns a process with pexpect and connect to the RDK.
Args:
command: The command to use when spawning the pexpect process.
"""
logging.info('executing: %s', command)
kwargs = {} if six.PY2 else {'encoding': 'utf-8'}
self.pexpect_process = pexpect.spawn(
command, timeout=Launcher._PEXPECT_TIMEOUT, **kwargs)
# Let pexpect output directly to our output stream
self.pexpect_process.logfile_read = self.output_file
expected_prompts = [
r'.*Are\syou\ssure.*', # Fingerprint verification
r'.* password:', # Password prompt
'.*[a-zA-Z]+.*', # Any other text input
]
# pylint: disable=unnecessary-lambda
@retry.retry(
exceptions=Launcher._RETRY_EXCEPTIONS,
retries=Launcher._PEXPECT_PASSWORD_TIMEOUT_MAX_RETRIES,
backoff=lambda: self._ShutdownBackoff(),
wrap_exceptions=False)
def _inner():
i = self.pexpect_process.expect(expected_prompts)
if i == 0:
self._PexpectSendLine('yes')
elif i == 1:
self._PexpectSendLine(Launcher._RDK_PASSWORD)
else:
# If any other input comes in, maybe we've logged in with rsa key or
# RDK does not have password. Check if we've logged in by echoing
# a special sentence and expect it back.
self._PexpectSendLine('echo ' + Launcher._SSH_LOGIN_SIGNAL)
i = self.pexpect_process.expect([Launcher._SSH_LOGIN_SIGNAL])
_inner()
@retry.retry(
exceptions=_RETRY_EXCEPTIONS,
retries=_PEXPECT_SENDLINE_RETRIES,
wrap_exceptions=False)
def _PexpectSendLine(self, cmd):
"""Send lines to Pexpect and record the last command for logging purposes"""
logging.info('sending >> : %s ', cmd)
self.last_run_pexpect_cmd = cmd
self.pexpect_process.sendline(cmd)
def _PexpectReadLines(self):
"""Reads all lines from the pexpect process."""
# pylint: disable=unnecessary-lambda
@retry.retry(
exceptions=Launcher._RETRY_EXCEPTIONS,
retries=Launcher._PEXPECT_READLINE_TIMEOUT_MAX_RETRIES,
backoff=lambda: self.shutdown_initiated.is_set(),
wrap_exceptions=False)
def _readloop():
while True:
# Sanitize the line to remove ansi color codes.
line = Launcher._PEXPECT_SANITIZE_LINE_RE.sub(
'', self.pexpect_process.readline())
self.output_file.flush()
if not line:
return
# Check for the test complete tag. It will be followed by either a
# success or failure tag.
if (line.find(self.test_complete_tag_1) != -1 or
line.find(self.test_complete_tag_2) != -1):
self.return_value = 0
return
_readloop()
def _Sleep(self, val):
self._PexpectSendLine(f'sleep {val};echo {Launcher._SSH_SLEEP_SIGNAL}')
self.pexpect_process.expect([Launcher._SSH_SLEEP_SIGNAL])
def _CleanupPexpectProcess(self):
"""Closes current pexpect process."""
if self.pexpect_process is not None and self.pexpect_process.isalive():
# Check if kernel logged OOM kill or any other system failure message
if self.return_value:
logging.info('Sending dmesg')
with contextlib.suppress(Launcher._RETRY_EXCEPTIONS):
self._PexpectSendLine('dmesg -P --color=never | tail -n 100')
time.sleep(self._PEXPECT_SHUTDOWN_SLEEP_TIME)
with contextlib.suppress(Launcher._RETRY_EXCEPTIONS):
self.pexpect_process.readlines()
logging.info('Done sending dmesg')
# Send ctrl-c to the RDK and close the process.
with contextlib.suppress(Launcher._RETRY_EXCEPTIONS):
self._PexpectSendLine(chr(3))
time.sleep(self._PEXPECT_TIMEOUT) # Allow time for normal shutdown
with contextlib.suppress(Launcher._RETRY_EXCEPTIONS):
self.pexpect_process.close()
def _WaitForPrompt(self):
"""Sends empty commands, until a bash prompt is returned"""
def backoff():
self._PexpectSendLine('echo ' + Launcher._SSH_SLEEP_SIGNAL)
return self._ShutdownBackoff()
retry.with_retry(
lambda: self.pexpect_process.expect(self._RDK_PROMPT),
exceptions=Launcher._RETRY_EXCEPTIONS,
retries=Launcher._PROMPT_WAIT_MAX_RETRIES,
backoff=backoff,
wrap_exceptions=False)
@retry.retry(
exceptions=_RETRY_EXCEPTIONS,
retries=_KILL_RETRIES,
backoff=_CommandBackoff)
def _KillExistingCobaltProcesses(self):
"""If there are leftover Cobalt processes, kill them.
It is possible that a previous process did not exit cleanly.
Zombie Cobalt instances can block the WebDriver port or
cause other problems.
"""
logging.info('Killing existing processes')
self._PexpectSendLine(self.terminate_command)
self._WaitForPrompt()
logging.info('Done killing existing processes')
def Run(self):
"""Runs launcher's executable on the target RDK.
Returns:
Whether or not the run finished successfully.
"""
if self.log_targets:
logging.info('-' * 32)
logging.info('Starting to run target: %s', self.target_name)
logging.info('=' * 32)
self.return_value = 1
try:
# Notify other threads that the run is now active
self.run_inactive.clear()
# copy the test files to the RDK
if not self.shutdown_initiated.is_set():
self._PexpectSpawnAndConnect(self.rsync_command)
if not self.shutdown_initiated.is_set():
self._PexpectReadLines()
# ssh into the RDK and run the test
if not self.shutdown_initiated.is_set():
self._PexpectSpawnAndConnect(self.ssh_command)
self._Sleep(self._INTER_COMMAND_DELAY_SECONDS)
# Execute debugging commands on the first run
first_run_commands = []
if self.test_result_xml_path:
first_run_commands.append(f'touch {self.test_result_xml_path}')
first_run_commands.extend(['free -mh', 'ps -ux', 'df -h'])
if first_run():
for cmd in first_run_commands:
if not self.shutdown_initiated.is_set():
self._PexpectSendLine(cmd)
def _readline():
line = self.pexpect_process.readline()
self.output_file.write(line)
retry.with_retry(
_readline,
exceptions=Launcher._RETRY_EXCEPTIONS,
retries=Launcher._PROMPT_WAIT_MAX_RETRIES)
self._WaitForPrompt()
self.output_file.flush()
self._Sleep(self._INTER_COMMAND_DELAY_SECONDS)
self._KillExistingCobaltProcesses()
self._Sleep(self._INTER_COMMAND_DELAY_SECONDS)
if not self.shutdown_initiated.is_set():
self._PexpectSendLine(self.test_prep_command)
self._Sleep(self._INTER_COMMAND_DELAY_SECONDS)
self._PexpectSendLine(self.test_command)
self._PexpectReadLines()
except retry.RetriesExceeded:
logging.exception('Command retry exceeded (cmd: %s)',
self.last_run_pexpect_cmd)
except pexpect.EOF:
logging.exception('pexpect encountered EOF while reading line. (cmd: %s)',
self.last_run_pexpect_cmd)
except pexpect.TIMEOUT:
logging.exception('pexpect timed out while reading line. (cmd: %s)',
self.last_run_pexpect_cmd)
except Exception: # pylint: disable=broad-except
logging.exception('Error occurred while running test. (cmd: %s)',
self.last_run_pexpect_cmd)
finally:
self._CleanupPexpectProcess()
# Notify other threads that the run is no longer active
self.run_inactive.set()
if self.log_targets:
logging.info('-' * 32)
logging.info('Finished running target: %s', self.target_name)
logging.info('=' * 32)
return self.return_value
def Kill(self):
"""Stops the run so that the launcher can be killed."""
sys.stderr.write('\n***Killing Launcher***\n')
if self.run_inactive.is_set():
return
# Initiate the shutdown. This causes the run to abort within one second.
self.shutdown_initiated.set()
# Wait up to three seconds for the run to be set to inactive.
self.run_inactive.wait(Launcher._PEXPECT_SHUTDOWN_SLEEP_TIME)
def GetDeviceIp(self):
"""Gets the device IP."""
return self.device_id
def GetDeviceOutputPath(self):
"""Writable path where test targets can output files"""
return '/tmp'