blob: 32785f15a6c422aafb14d5d5796cf75a09e9c442 [file] [log] [blame]
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common methods and variables used by Cr-Fuchsia testing infrastructure."""
import enum
import json
import logging
import os
import re
import signal
import shutil
import subprocess
import sys
import time
from argparse import ArgumentParser
from typing import Iterable, List, Optional, Tuple
from compatible_utils import get_ssh_prefix, get_host_arch
DIR_SRC_ROOT = os.path.abspath(
os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir))
IMAGES_ROOT = os.path.join(DIR_SRC_ROOT, 'third_party', 'fuchsia-sdk',
'images')
REPO_ALIAS = 'fuchsia.com'
SDK_ROOT = os.path.join(DIR_SRC_ROOT, 'third_party', 'fuchsia-sdk', 'sdk')
SDK_TOOLS_DIR = os.path.join(SDK_ROOT, 'tools', get_host_arch())
_ENABLE_ZEDBOOT = 'discovery.zedboot.enabled=true'
_FFX_TOOL = os.path.join(SDK_TOOLS_DIR, 'ffx')
# This global variable is used to set the environment variable
# |FFX_ISOLATE_DIR| when running ffx commands in E2E testing scripts.
_FFX_ISOLATE_DIR = None
class TargetState(enum.Enum):
"""State of a target."""
UNKNOWN = enum.auto()
DISCONNECTED = enum.auto()
PRODUCT = enum.auto()
FASTBOOT = enum.auto()
ZEDBOOT = enum.auto()
class BootMode(enum.Enum):
"""Specifies boot mode for device."""
REGULAR = enum.auto()
RECOVERY = enum.auto()
BOOTLOADER = enum.auto()
_STATE_TO_BOOTMODE = {
TargetState.PRODUCT: BootMode.REGULAR,
TargetState.FASTBOOT: BootMode.BOOTLOADER,
TargetState.ZEDBOOT: BootMode.RECOVERY
}
_BOOTMODE_TO_STATE = {value: key for key, value in _STATE_TO_BOOTMODE.items()}
class StateNotFoundError(Exception):
"""Raised when target's state cannot be found."""
class StateTransitionError(Exception):
"""Raised when target does not transition to desired state."""
def _state_string_to_state(state_str: str) -> TargetState:
state_str = state_str.strip().lower()
if state_str == 'product':
return TargetState.PRODUCT
if state_str == 'zedboot (r)':
return TargetState.ZEDBOOT
if state_str == 'fastboot':
return TargetState.FASTBOOT
if state_str == 'unknown':
return TargetState.UNKNOWN
if state_str == 'disconnected':
return TargetState.DISCONNECTED
raise NotImplementedError(f'State {state_str} not supported')
def get_target_state(target_id: Optional[str],
serial_num: Optional[str],
num_attempts: int = 1) -> TargetState:
"""Return state of target or the default target.
Args:
target_id: Optional nodename of the target. If not given, default target
is used.
serial_num: Optional serial number of target. Only usable if device is
in fastboot.
num_attempts: Optional number of times to attempt getting status.
Returns:
TargetState of the given node, if found.
Raises:
StateNotFoundError: If target cannot be found, or default target is not
defined if |target_id| is not given.
"""
for i in range(num_attempts):
targets = json.loads(
run_ffx_command(('target', 'list'),
check=True,
configs=[_ENABLE_ZEDBOOT],
capture_output=True,
json_out=True).stdout.strip())
for target in targets:
if target_id is None and target['is_default']:
return _state_string_to_state(target['target_state'])
if target_id == target['nodename']:
return _state_string_to_state(target['target_state'])
if serial_num == target['serial']:
# Should only return Fastboot.
return _state_string_to_state(target['target_state'])
# Do not sleep for last attempt.
if i < num_attempts - 1:
time.sleep(10)
# Could not find a state for given target.
error_target = target_id
if target_id is None:
error_target = 'default target'
raise StateNotFoundError(f'Could not find state for {error_target}.')
def set_ffx_isolate_dir(isolate_dir: str) -> None:
"""Overwrites |_FFX_ISOLATE_DIR|."""
global _FFX_ISOLATE_DIR # pylint: disable=global-statement
_FFX_ISOLATE_DIR = isolate_dir
def get_host_tool_path(tool):
"""Get a tool from the SDK."""
return os.path.join(SDK_TOOLS_DIR, tool)
def get_host_os():
"""Get host operating system."""
host_platform = sys.platform
if host_platform.startswith('linux'):
return 'linux'
if host_platform.startswith('darwin'):
return 'mac'
raise Exception('Unsupported host platform: %s' % host_platform)
def make_clean_directory(directory_name):
"""If the directory exists, delete it and remake with no contents."""
if os.path.exists(directory_name):
shutil.rmtree(directory_name)
os.mkdir(directory_name)
def _get_daemon_status():
"""Determines daemon status via `ffx daemon socket`.
Returns:
dict of status of the socket. Status will have a key Running or
NotRunning to indicate if the daemon is running.
"""
status = json.loads(
run_ffx_command(('daemon', 'socket'),
check=True,
capture_output=True,
json_out=True,
suppress_repair=True).stdout.strip())
return status.get('pid', {}).get('status', {'NotRunning': True})
def _is_daemon_running():
return 'Running' in _get_daemon_status()
def check_ssh_config_file() -> None:
"""Checks for ssh keys and generates them if they are missing."""
script_path = os.path.join(SDK_ROOT, 'bin', 'fuchsia-common.sh')
check_cmd = ['bash', '-c', f'. {script_path}; check-fuchsia-ssh-config']
subprocess.run(check_cmd, check=True)
def _wait_for_daemon(start=True, timeout_seconds=100):
"""Waits for daemon to reach desired state in a polling loop.
Sleeps for 5s between polls.
Args:
start: bool. Indicates to wait for daemon to start up. If False,
indicates waiting for daemon to die.
timeout_seconds: int. Number of seconds to wait for the daemon to reach
the desired status.
Raises:
TimeoutError: if the daemon does not reach the desired state in time.
"""
wanted_status = 'start' if start else 'stop'
sleep_period_seconds = 5
attempts = int(timeout_seconds / sleep_period_seconds)
for i in range(attempts):
if _is_daemon_running() == start:
return
if i != attempts:
logging.info('Waiting for daemon to %s...', wanted_status)
time.sleep(sleep_period_seconds)
raise TimeoutError(f'Daemon did not {wanted_status} in time.')
def _run_repair_command(output):
"""Scans |output| for a self-repair command to run and, if found, runs it.
Returns:
True if a repair command was found and ran successfully. False otherwise.
"""
# Check for a string along the lines of:
# "Run `ffx doctor --restart-daemon` for further diagnostics."
match = re.search('`ffx ([^`]+)`', output)
if not match or len(match.groups()) != 1:
return False # No repair command found.
args = match.groups()[0].split()
try:
run_ffx_command(args, suppress_repair=True)
# Need the daemon to be up at the end of this.
_wait_for_daemon(start=True)
except subprocess.CalledProcessError:
return False # Repair failed.
return True # Repair succeeded.
def run_ffx_command(cmd: Iterable[str],
target_id: Optional[str] = None,
check: bool = True,
suppress_repair: bool = False,
configs: Optional[List[str]] = None,
json_out: bool = False,
**kwargs) -> subprocess.CompletedProcess:
"""Runs `ffx` with the given arguments, waiting for it to exit.
If `ffx` exits with a non-zero exit code, the output is scanned for a
recommended repair command (e.g., "Run `ffx doctor --restart-daemon` for
further diagnostics."). If such a command is found, it is run and then the
original command is retried. This behavior can be suppressed via the
`suppress_repair` argument.
Args:
cmd: A sequence of arguments to ffx.
target_id: Whether to execute the command for a specific target. The
target_id could be in the form of a nodename or an address.
check: If True, CalledProcessError is raised if ffx returns a non-zero
exit code.
suppress_repair: If True, do not attempt to find and run a repair
command.
configs: A list of configs to be applied to the current command.
json_out: Have command output returned as JSON. Must be parsed by
caller.
Returns:
A CompletedProcess instance
Raises:
CalledProcessError if |check| is true.
"""
ffx_cmd = [_FFX_TOOL]
if json_out:
ffx_cmd.extend(('--machine', 'json'))
if target_id:
ffx_cmd.extend(('--target', target_id))
if configs:
for config in configs:
ffx_cmd.extend(('--config', config))
ffx_cmd.extend(cmd)
env = os.environ
if _FFX_ISOLATE_DIR:
env['FFX_ISOLATE_DIR'] = _FFX_ISOLATE_DIR
try:
if not suppress_repair:
# If we want to repair, we need to capture output in STDOUT and
# STDERR. This could conflict with expectations of the caller.
output_captured = kwargs.get('capture_output') or (
kwargs.get('stdout') and kwargs.get('stderr'))
if not output_captured:
# Force output to combine into STDOUT.
kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.STDOUT
return subprocess.run(ffx_cmd,
check=check,
encoding='utf-8',
env=env,
**kwargs)
except subprocess.CalledProcessError as cpe:
logging.error('%s %s failed with returncode %s.',
os.path.relpath(_FFX_TOOL),
subprocess.list2cmdline(ffx_cmd[1:]), cpe.returncode)
if cpe.output:
logging.error('stdout of the command: %s', cpe.output)
if suppress_repair or (cpe.output
and not _run_repair_command(cpe.output)):
raise
# If the original command failed but a repair command was found and
# succeeded, try one more time with the original command.
return run_ffx_command(cmd, target_id, check, True, configs, json_out,
**kwargs)
def run_continuous_ffx_command(cmd: Iterable[str],
target_id: Optional[str] = None,
encoding: Optional[str] = 'utf-8',
**kwargs) -> subprocess.Popen:
"""Runs an ffx command asynchronously."""
ffx_cmd = [_FFX_TOOL]
if target_id:
ffx_cmd.extend(('--target', target_id))
ffx_cmd.extend(cmd)
return subprocess.Popen(ffx_cmd, encoding=encoding, **kwargs)
def read_package_paths(out_dir: str, pkg_name: str) -> List[str]:
"""
Returns:
A list of the absolute path to all FAR files the package depends on.
"""
with open(
os.path.join(DIR_SRC_ROOT, out_dir, 'gen', 'package_metadata',
f'{pkg_name}.meta')) as meta_file:
data = json.load(meta_file)
packages = []
for package in data['packages']:
packages.append(os.path.join(DIR_SRC_ROOT, out_dir, package))
return packages
def register_common_args(parser: ArgumentParser) -> None:
"""Register commonly used arguments."""
common_args = parser.add_argument_group('common', 'common arguments')
common_args.add_argument(
'--out-dir',
'-C',
type=os.path.realpath,
help='Path to the directory in which build files are located. ')
def register_device_args(parser: ArgumentParser) -> None:
"""Register device arguments."""
device_args = parser.add_argument_group('device', 'device arguments')
device_args.add_argument('--target-id',
default=os.environ.get('FUCHSIA_NODENAME'),
help=('Specify the target device. This could be '
'a node-name (e.g. fuchsia-emulator) or an '
'an ip address along with an optional port '
'(e.g. [fe80::e1c4:fd22:5ee5:878e]:22222, '
'1.2.3.4, 1.2.3.4:33333). If unspecified, '
'the default target in ffx will be used.'))
def register_log_args(parser: ArgumentParser) -> None:
"""Register commonly used arguments."""
log_args = parser.add_argument_group('logging', 'logging arguments')
log_args.add_argument('--logs-dir',
type=os.path.realpath,
help=('Directory to write logs to.'))
def get_component_uri(package: str) -> str:
"""Retrieve the uri for a package."""
return f'fuchsia-pkg://{REPO_ALIAS}/{package}#meta/{package}.cm'
def resolve_packages(packages: List[str], target_id: Optional[str]) -> None:
"""Ensure that all |packages| are installed on a device."""
ssh_prefix = get_ssh_prefix(get_ssh_address(target_id))
subprocess.run(ssh_prefix + ['--', 'pkgctl', 'gc'], check=False)
for package in packages:
resolve_cmd = [
'--', 'pkgctl', 'resolve',
'fuchsia-pkg://%s/%s' % (REPO_ALIAS, package)
]
retry_command(ssh_prefix + resolve_cmd)
def retry_command(cmd: List[str], retries: int = 2,
**kwargs) -> Optional[subprocess.CompletedProcess]:
"""Helper function for retrying a subprocess.run command."""
for i in range(retries):
if i == retries - 1:
proc = subprocess.run(cmd, **kwargs, check=True)
return proc
proc = subprocess.run(cmd, **kwargs, check=False)
if proc.returncode == 0:
return proc
time.sleep(3)
return None
def get_ssh_address(target_id: Optional[str]) -> str:
"""Determines SSH address for given target."""
return run_ffx_command(('target', 'get-ssh-address'),
target_id,
capture_output=True).stdout.strip()
def find_in_dir(target_name: str, parent_dir: str) -> Optional[str]:
"""Finds path in SDK.
Args:
target_name: Name of target to find, as a string.
parent_dir: Directory to start search in.
Returns:
Full path to the target, None if not found.
"""
# Doesn't make sense to look for a full path. Only extract the basename.
target_name = os.path.basename(target_name)
for root, dirs, _ in os.walk(parent_dir):
if target_name in dirs:
return os.path.abspath(os.path.join(root, target_name))
return None
def find_image_in_sdk(product_name: str) -> Optional[str]:
"""Finds image dir in SDK for product given.
Args:
product_name: Name of product's image directory to find.
Returns:
Full path to the target, None if not found.
"""
top_image_dir = os.path.join(SDK_ROOT, os.pardir, 'images')
path = find_in_dir(product_name, parent_dir=top_image_dir)
if path:
return find_in_dir('images', parent_dir=path)
return path
def catch_sigterm() -> None:
"""Catches the kill signal and allows the process to exit cleanly."""
def _sigterm_handler(*_):
sys.exit(0)
signal.signal(signal.SIGTERM, _sigterm_handler)
def get_system_info(target: Optional[str] = None) -> Tuple[str, str]:
"""Retrieves installed OS version frm device.
Returns:
Tuple of strings, containing {product, version number), or a pair of
empty strings to indicate an error.
"""
info_cmd = run_ffx_command(('target', 'show', '--json'),
target_id=target,
capture_output=True,
check=False)
if info_cmd.returncode == 0:
info_json = json.loads(info_cmd.stdout.strip())
for info in info_json:
if info['title'] == 'Build':
return (info['child'][1]['value'], info['child'][0]['value'])
# If the information was not retrieved, return empty strings to indicate
# unknown system info.
return ('', '')
def boot_device(target_id: Optional[str],
mode: BootMode,
serial_num: Optional[str] = None,
must_boot: bool = False) -> None:
"""Boot device into desired mode, with fallback to SSH on failure.
Args:
target_id: Optional target_id of device.
mode: Desired boot mode.
must_boot: Forces device to boot, regardless of current state.
Raises:
StateTransitionError: When final state of device is not desired.
"""
# Skip boot call if already in the state and not skipping check.
state = get_target_state(target_id, serial_num, num_attempts=3)
wanted_state = _BOOTMODE_TO_STATE.get(mode)
if not must_boot:
logging.debug('Current state %s. Want state %s', str(state),
str(wanted_state))
must_boot = state != wanted_state
if not must_boot:
logging.debug('Skipping boot - already in good state')
return
def _reboot(reboot_cmd, current_state: TargetState):
reboot_cmd()
local_state = None
# Check that we transition out of current state.
for _ in range(30):
try:
local_state = get_target_state(target_id, serial_num)
if local_state != current_state:
# Changed states - can continue
break
except StateNotFoundError:
logging.debug('Device disconnected...')
if current_state != TargetState.DISCONNECTED:
# Changed states - can continue
break
finally:
time.sleep(2)
else:
logging.warning(
'Device did not change from initial state. Exiting early')
return local_state or TargetState.DISCONNECTED
# Now we want to transition to the new state.
for _ in range(90):
try:
local_state = get_target_state(target_id, serial_num)
if local_state == wanted_state:
return local_state
except StateNotFoundError:
logging.warning('Could not find target state.'
' Sleeping then retrying...')
finally:
time.sleep(2)
return local_state or TargetState.DISCONNECTED
state = _reboot(
(lambda: _boot_device_ffx(target_id, serial_num, state, mode)), state)
if state == TargetState.DISCONNECTED:
raise StateNotFoundError('Target could not be found!')
if state == wanted_state:
return
logging.warning(
'Booting with FFX to %s did not succeed. Attempting with DM', mode)
# Fallback to SSH, with no retry if we tried with ffx.:
state = _reboot(
(lambda: _boot_device_dm(target_id, serial_num, state, mode)), state)
if state != wanted_state:
raise StateTransitionError(
f'Could not get device to desired state. Wanted {wanted_state},'
f' got {state}')
logging.debug('Got desired state: %s', state)
def _boot_device_ffx(target_id: Optional[str], serial_num: Optional[str],
current_state: TargetState, mode: BootMode):
cmd = ['target', 'reboot']
if mode == BootMode.REGULAR:
logging.info('Triggering regular boot')
elif mode == BootMode.RECOVERY:
cmd.append('-r')
elif mode == BootMode.BOOTLOADER:
cmd.append('-b')
else:
raise NotImplementedError(f'BootMode {mode} not supported')
logging.debug('FFX reboot with command [%s]', ' '.join(cmd))
if current_state == TargetState.FASTBOOT:
run_ffx_command(cmd,
configs=[_ENABLE_ZEDBOOT],
target_id=serial_num,
check=False)
else:
run_ffx_command(cmd,
configs=[_ENABLE_ZEDBOOT],
target_id=target_id,
check=False)
def _boot_device_dm(target_id: Optional[str], serial_num: Optional[str],
current_state: TargetState, mode: BootMode):
# Can only use DM if device is in regular boot.
if current_state != TargetState.PRODUCT:
if mode == BootMode.REGULAR:
raise StateTransitionError('Cannot boot to Regular via DM - '
'FFX already failed to do so.')
# Boot to regular.
_boot_device_ffx(target_id, serial_num, current_state,
BootMode.REGULAR)
ssh_prefix = get_ssh_prefix(get_ssh_address(target_id))
reboot_cmd = None
if mode == BootMode.REGULAR:
reboot_cmd = 'reboot'
elif mode == BootMode.RECOVERY:
reboot_cmd = 'reboot-recovery'
elif mode == BootMode.BOOTLOADER:
reboot_cmd = 'reboot-bootloader'
else:
raise NotImplementedError(f'BootMode {mode} not supported')
# Boot commands can fail due to SSH connections timeout.
full_cmd = ssh_prefix + ['--', 'dm', reboot_cmd]
logging.debug('DM reboot with command [%s]', ' '.join(full_cmd))
subprocess.run(full_cmd, check=False)