| """ |
| Base class for gdb-remote test cases. |
| """ |
| |
| from __future__ import print_function |
| |
| |
| import errno |
| import os |
| import os.path |
| import platform |
| import random |
| import re |
| import select |
| import signal |
| import socket |
| import subprocess |
| import sys |
| import tempfile |
| import time |
| from lldbsuite.test import configuration |
| from lldbsuite.test.lldbtest import * |
| from lldbgdbserverutils import * |
| import logging |
| |
| |
class _ConnectionRefused(IOError):
    """Signals that a connection attempt to the debug monitor was refused.

    Raised in this file when connect() fails with ECONNREFUSED, or when an
    android-forwarded connection is found to be already at EOF.
    """
| |
| |
| class GdbRemoteTestCaseBase(TestBase): |
| |
# Flag consumed outside this file.
# NOTE(review): presumably tells the test framework not to replicate the test
# per debug-info format - confirm against lldbtest.
NO_DEBUG_INFO_TESTCASE = True

# Default timeout (seconds) used by expect_gdbremote_sequence().
_TIMEOUT_SECONDS = 120

# Raw gdb-remote "k" packet, sent to the stub when the test socket is torn down.
_GDBREMOTE_KILL_PACKET = "$k#6b"

# Inferior startup modes:
#  - attach: start the inferior separately, attach to the inferior on the
#    stub command line.
#  - attach_manually: start the inferior separately, start the stub without
#    attaching, allow the test to attach to the inferior however it wants
#    (e.g. $vAttach;pid).
#  - launch: start the stub, and launch the inferior with an $A packet via
#    the initial packet stream.
_STARTUP_ATTACH, _STARTUP_ATTACH_MANUALLY, _STARTUP_LAUNCH = (
    "attach", "attach_manually", "launch")

# GDB signal numbers that are not target-specific, used for common
# exceptions.
TARGET_EXC_BAD_ACCESS, TARGET_EXC_BAD_INSTRUCTION, TARGET_EXC_ARITHMETIC = (
    0x91, 0x92, 0x93)
TARGET_EXC_EMULATION, TARGET_EXC_SOFTWARE, TARGET_EXC_BREAKPOINT = (
    0x94, 0x95, 0x96)

# File handler for verbose host-side logs; installed by setUp() on request.
_verbose_log_handler = None
# Formatter shared by every handler this class installs.
_log_formatter = logging.Formatter(
    fmt='%(asctime)-15s %(levelname)-8s %(message)s')
| |
def setUpBaseLogging(self):
    """Bind self.logger to this module's logger, configuring it on first use.

    First-time configuration disables propagation, sets the logger level to
    DEBUG, and installs a stream handler that emits WARNING and above using
    the class formatter.
    """
    module_logger = logging.getLogger(__name__)
    self.logger = module_logger

    # Any existing handler means an earlier call already configured it.
    if len(module_logger.handlers) != 0:
        return

    module_logger.setLevel(logging.DEBUG)
    module_logger.propagate = False

    # Log all warnings to stderr (StreamHandler's default stream).
    warning_handler = logging.StreamHandler()
    warning_handler.setFormatter(self._log_formatter)
    warning_handler.setLevel(logging.WARNING)
    module_logger.addHandler(warning_handler)
| |
def isVerboseLoggingRequested(self):
    """Return True if any configured log channel contains "gdb-remote".

    Detailed host-side logs are only produced when the user asked for the
    "gdb-remote" channel to be logged.
    """
    for channel in lldbtest_config.channels:
        if "gdb-remote" in channel:
            return True
    return False
| |
def setUp(self):
    """Initialise per-test state: logging, packet sequence, port and stub host.

    After TestBase.setUp(), this installs the base logger (plus a DEBUG file
    handler when verbose logging is requested), creates a new expected-packet
    sequence, selects launch startup mode, picks a port, clears the
    named-pipe attributes, and derives stub_hostname / stub_device from
    configuration.lldb_platform_url when one is set.

    Raises:
        ValueError: if configuration.lldb_platform_url does not match the
            expected URL shape.
    """
    TestBase.setUp(self)

    self.setUpBaseLogging()
    self.debug_monitor_extra_args = []
    self._pump_queues = socket_packet_pump.PumpQueues()

    if self.isVerboseLoggingRequested():
        # If requested, full logs go to a log file.
        self._verbose_log_handler = logging.FileHandler(
            self.log_basename + "-host.log")
        self._verbose_log_handler.setFormatter(self._log_formatter)
        self._verbose_log_handler.setLevel(logging.DEBUG)
        self.logger.addHandler(self._verbose_log_handler)

    self.test_sequence = GdbRemoteTestSequence(self.logger)
    self.set_inferior_startup_launch()
    self.port = self.get_next_port()
    self.named_pipe_path = None
    self.named_pipe = None
    self.named_pipe_fd = None
    self.stub_sends_two_stop_notifications_on_kill = False

    # Always define stub_device so create_socket() can read it even when
    # no platform URL is configured.
    self.stub_device = None

    platform_url = configuration.lldb_platform_url
    if not platform_url:
        self.stub_hostname = "localhost"
        return

    # Raw strings: the patterns contain regex escapes (\[ and \d) that are
    # invalid escape sequences in ordinary string literals.
    if platform_url.startswith('unix-'):
        url_pattern = r'(.+)://\[?(.+?)\]?/.*'
    else:
        url_pattern = r'(.+)://(.+):\d+'
    url_match = re.match(url_pattern, platform_url)
    if url_match is None:
        raise ValueError(
            "unable to parse lldb platform url: %r" % (platform_url,))
    scheme, host = url_match.groups()

    if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
        # The URL host is recorded as the stub device; the stub itself is
        # then reached through localhost.
        self.stub_device = host
        self.stub_hostname = 'localhost'
    else:
        self.stub_hostname = host
| |
def tearDown(self):
    """Verify the packet pump queues are empty, then release logging state.

    The verbose log handler (if one was installed by setUp) is detached from
    the logger and closed so its file is released, and TestBase.tearDown()
    always runs, even when the queue verification fails.
    """
    try:
        self._pump_queues.verify_queues_empty()
    finally:
        verbose_handler = self._verbose_log_handler
        if verbose_handler is not None:
            self.logger.removeHandler(verbose_handler)
            # Close the handler so the underlying log file is released.
            verbose_handler.close()
        self._verbose_log_handler = None
        TestBase.tearDown(self)
| |
def getLocalServerLogFile(self):
    """Return the local server log path: log_basename plus "-server.log"."""
    log_suffix = "-server.log"
    return self.log_basename + log_suffix
| |
def setUpServerLogging(self, is_llgs):
    """Append server-side logging options to debug_monitor_extra_args.

    Does nothing when no log channels were requested. The log file lives in
    the remote platform's working directory when a remote platform is in
    use, otherwise at getLocalServerLogFile().

    Args:
        is_llgs: True to emit the "--log-channels" option built from the
            requested channels; False to emit "--log-flags=0x800000".
    """
    channels = lldbtest_config.channels
    if not channels:
        return  # No logging requested

    if lldb.remote_platform:
        log_file = lldbutil.join_remote_paths(
            lldb.remote_platform.GetWorkingDirectory(), "server.log")
    else:
        log_file = self.getLocalServerLogFile()

    logging_args = ["--log-file=" + log_file]
    if is_llgs:
        logging_args.append(
            "--log-channels={}".format(":".join(channels)))
    else:
        logging_args.append("--log-flags=0x800000")

    # Extend in both modes, so arguments configured before this call are
    # preserved rather than replaced.
    self.debug_monitor_extra_args.extend(logging_args)
| |
def get_next_port(self):
    """Return a randomly chosen port number between 12000 and 15999 inclusive."""
    base_port = 12000
    random_offset = random.randint(0, 3999)
    return base_port + random_offset
| |
def reset_test_sequence(self):
    """Replace self.test_sequence with a newly constructed GdbRemoteTestSequence."""
    new_sequence = GdbRemoteTestSequence(self.logger)
    self.test_sequence = new_sequence
| |
def create_named_pipe(self):
    """Create a FIFO in a new temp directory for the stub to report its port.

    The read side is opened in non-blocking mode. A teardown hook is
    registered that closes the pipe and deletes both the FIFO and its
    directory on a best-effort basis. self.port is reset to 0 so that the
    stub selects a port number itself.

    Returns:
        A (named_pipe_path, named_pipe, named_pipe_fd) tuple.
    """
    # Create a temp dir and name for a pipe.
    temp_dir = tempfile.mkdtemp()
    named_pipe_path = os.path.join(temp_dir, "stub_port_number")

    # Create the named pipe.
    os.mkfifo(named_pipe_path)

    # Open the read side of the pipe in non-blocking mode. This will
    # return right away, ready or not.
    named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK)

    # Wrap the descriptor in a file object; reads through it still follow
    # the non-blocking semantics of the underlying descriptor.
    named_pipe = os.fdopen(named_pipe_fd, "r")
    self.assertIsNotNone(named_pipe)

    def shutdown_named_pipe():
        # Every step is best-effort: report the failure and carry on so
        # one problem does not prevent the remaining cleanup.
        try:
            named_pipe.close()
        except Exception:
            print("failed to close named pipe")

        try:
            os.remove(named_pipe_path)
        except OSError:
            print("failed to delete named pipe: {}".format(named_pipe_path))

        try:
            os.rmdir(temp_dir)
        except OSError:
            # Listing the directory can fail too (e.g. it is already
            # gone); never let the diagnostic itself raise.
            try:
                contents = os.listdir(temp_dir)
            except OSError:
                contents = "<unavailable>"
            print(
                "failed to delete temp dir: {}, directory contents: '{}'".format(
                    temp_dir, contents))

    # Add the shutdown hook to clean up the named pipe.
    self.addTearDownHook(shutdown_named_pipe)

    # Clear the port so the stub selects a port number.
    self.port = 0

    return (named_pipe_path, named_pipe, named_pipe_fd)
| |
def get_stub_port_from_named_socket(self, read_timeout_seconds=5):
    """Read the stub's listening port from the named pipe.

    Args:
        read_timeout_seconds: maximum time to wait for the pipe to become
            readable.

    Returns:
        The port number as a positive int.
    """
    # Wait for something to read with a max timeout.
    (ready_readers, _, _) = select.select(
        [self.named_pipe_fd], [], [], read_timeout_seconds)
    self.assertIsNotNone(
        ready_readers,
        "write side of pipe has not written anything - stub isn't writing to pipe.")
    self.assertNotEqual(
        len(ready_readers),
        0,
        "write side of pipe has not written anything - stub isn't writing to pipe.")

    # Read the port from the named pipe.
    stub_port_raw = self.named_pipe.read()
    self.assertIsNotNone(stub_port_raw)
    self.assertNotEqual(
        len(stub_port_raw),
        0,
        "no content to read on pipe")

    # Strip only trailing NUL terminators; blindly dropping the last
    # character would lose a digit when no terminator was written.
    stub_port = int(stub_port_raw.rstrip("\0"))
    self.assertTrue(stub_port > 0)

    return stub_port
| |
def init_llgs_test(self, use_named_pipe=True):
    """Configure the test to run against lldb-server in gdbserver mode.

    On a remote platform the server executable is taken from the parent of
    the platform shell (found through /proc) and named-pipe port
    negotiation is disabled. Locally the executable comes from
    get_lldb_server_exe(), and the test is skipped if none is found.

    Args:
        use_named_pipe: when True (and not remote), create a named pipe
            through which the stub reports its port.
    """
    if not lldb.remote_platform:
        self.debug_monitor_exe = get_lldb_server_exe()
        if not self.debug_monitor_exe:
            self.skipTest("lldb-server exe not found")
    else:
        # Remote platforms don't support named pipe based port negotiation
        use_named_pipe = False

        # Grab the ppid from /proc/[shell pid]/stat
        err, retcode, shell_stat = self.run_platform_command(
            "cat /proc/$$/stat")
        stat_ok = err.Success() and retcode == 0
        self.assertTrue(
            stat_ok,
            "Failed to read file /proc/$$/stat: %s, retcode: %d" %
            (err.GetCString(), retcode))

        # Stat layout: [pid] ([executable]) [state] [*ppid*]
        parent_pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
        err, retcode, ls_output = self.run_platform_command(
            "ls -l /proc/%s/exe" % parent_pid)
        exe_ok = err.Success() and retcode == 0
        self.assertTrue(
            exe_ok,
            "Failed to read file /proc/%s/exe: %s, retcode: %d" %
            (parent_pid, err.GetCString(), retcode))
        link_target = ls_output.split()[-1]

        # If the binary has been deleted, the link name has " (deleted)"
        # appended. Remove it if it's there.
        self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', link_target)

    self.debug_monitor_extra_args = ["gdbserver"]
    self.setUpServerLogging(is_llgs=True)

    if use_named_pipe:
        pipe_info = self.create_named_pipe()
        self.named_pipe_path, self.named_pipe, self.named_pipe_fd = pipe_info
| |
def init_debugserver_test(self, use_named_pipe=True):
    """Configure the test to run against debugserver.

    Skips the test when no debugserver executable is found.

    Args:
        use_named_pipe: when True, create a named pipe through which the
            stub reports its port.
    """
    debugserver_exe = get_debugserver_exe()
    self.debug_monitor_exe = debugserver_exe
    if not debugserver_exe:
        self.skipTest("debugserver exe not found")

    self.setUpServerLogging(is_llgs=False)

    if use_named_pipe:
        pipe_info = self.create_named_pipe()
        self.named_pipe_path, self.named_pipe, self.named_pipe_fd = pipe_info

    # The debugserver stub has a race on handling the 'k' command, so it
    # sends an X09 right away, then sends the real X notification when the
    # process truly dies.
    self.stub_sends_two_stop_notifications_on_kill = True
| |
def forward_adb_port(self, source, target, direction, device):
    """Run an adb port-forwarding command and schedule its removal.

    Args:
        source: port number used for the first "tcp:<port>" argument.
        target: port number used for the second "tcp:<port>" argument.
        direction: adb sub-command to run (create_socket passes "forward").
        device: adb device serial passed with "-s", or a false value to
            omit it.
    """
    adb_command = ['adb']
    if device:
        adb_command += ['-s', device]
    adb_command.append(direction)

    source_spec = "tcp:%d" % source
    target_spec = "tcp:%d" % target

    def remove_port_forward():
        subprocess.call(adb_command + ["--remove", source_spec])

    subprocess.call(adb_command + [source_spec, target_spec])
    self.addTearDownHook(remove_port_forward)
| |
def _verify_socket(self, sock):
    """Raise _ConnectionRefused if an android connection was dropped at once.

    Normally, when the remote stub is not ready, connect() fails with
    ECONNREFUSED. Because of how ADB forwarding works, on android targets
    connect() always succeeds, but the connection is dropped immediately if
    ADB could not connect on the remote side. This detects that situation
    and reports it as "connection refused" so the upper layers retry.
    """
    triple = self.dbg.GetSelectedPlatform().GetTriple()
    if re.match(".*-.*-.*-android", triple):
        readable, _, _ = select.select([sock], [], [], 0.1)
        # Nothing readable: no data yet, but the connection is alive.
        if sock in readable:
            peeked = sock.recv(1, socket.MSG_PEEK)
            if len(peeked) == 0:
                # Readable yet empty means EOF: the connection was dropped.
                raise _ConnectionRefused()
| |
def create_socket(self):
    """Connect a new socket to the debug monitor and schedule its shutdown.

    On android triples the port is first forwarded through adb. A teardown
    hook is registered that sends the kill packet and closes the socket.

    Returns:
        The connected socket.

    Raises:
        _ConnectionRefused: if the stub refused the connection (or, on
            android, dropped it immediately).
        socket.error: for any other connection failure.
    """
    sock = socket.socket()
    logger = self.logger

    triple = self.dbg.GetSelectedPlatform().GetTriple()
    if re.match(".*-.*-.*-android", triple):
        self.forward_adb_port(
            self.port,
            self.port,
            "forward",
            self.stub_device)

    logger.info(
        "Connecting to debug monitor on %s:%d",
        self.stub_hostname,
        self.port)
    connect_info = (self.stub_hostname, self.port)
    try:
        sock.connect(connect_info)
    except socket.error as serr:
        # Callers retry on refusal; close the failed socket so repeated
        # attempts do not leak file descriptors.
        sock.close()
        if serr.errno == errno.ECONNREFUSED:
            raise _ConnectionRefused()
        raise

    def shutdown_socket():
        try:
            # Send the kill packet so lldb-server shuts down gracefully.
            # Sockets take bytes, so encode the packet text first.
            sock.sendall(
                GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET.encode("ascii"))
        except Exception:
            logger.warning(
                "failed to send kill packet to debug monitor: {}; ignoring".format(
                    sys.exc_info()[0]))

        try:
            sock.close()
        except Exception:
            logger.warning(
                "failed to close socket to debug monitor: {}; ignoring".format(
                    sys.exc_info()[0]))

    self.addTearDownHook(shutdown_socket)

    self._verify_socket(sock)

    return sock
| |
def set_inferior_startup_launch(self):
    """Select launch mode: the stub launches the inferior itself."""
    startup_mode = self._STARTUP_LAUNCH
    self._inferior_startup = startup_mode
| |
def set_inferior_startup_attach(self):
    """Select attach mode: the stub attaches to the inferior via its command line."""
    startup_mode = self._STARTUP_ATTACH
    self._inferior_startup = startup_mode
| |
def set_inferior_startup_attach_manually(self):
    """Select manual-attach mode: the test attaches to the inferior itself."""
    startup_mode = self._STARTUP_ATTACH_MANUALLY
    self._inferior_startup = startup_mode
| |
def get_debug_monitor_command_line_args(self, attach_pid=None):
    """Build the argument list used to start the debug monitor.

    The list is a copy of debug_monitor_extra_args followed by the listen
    address ("*:<port>" on a remote platform, "127.0.0.1:<port>" locally),
    then "--attach=<pid>" when attach_pid is given, then "--named-pipe
    <path>" when a named pipe path is set.

    Args:
        attach_pid: pid for the stub to attach to, or None.

    Returns:
        A new list of command line arguments.
    """
    address_format = "*:{}" if lldb.remote_platform else "127.0.0.1:{}"
    commandline_args = self.debug_monitor_extra_args + \
        [address_format.format(self.port)]

    if attach_pid:
        commandline_args.append("--attach=%d" % attach_pid)
    if self.named_pipe_path:
        commandline_args.extend(["--named-pipe", self.named_pipe_path])
    return commandline_args
| |
def launch_debug_monitor(self, attach_pid=None, logfile=None):
    """Spawn the debug monitor process and return it.

    When a named pipe is in use, self.port is updated with the port that
    the stub reports through it.

    Args:
        attach_pid: pid for the stub to attach to, or None.
        logfile: accepted for interface compatibility; not used here.

    Returns:
        The spawned server process object.
    """
    stub_args = self.get_debug_monitor_command_line_args(
        attach_pid=attach_pid)

    server = self.spawnSubprocess(
        self.debug_monitor_exe, stub_args, install_remote=False)
    self.addTearDownHook(self.cleanupSubprocesses)
    self.assertIsNotNone(server)

    # If we're receiving the stub's listening port from the named pipe, do
    # that here.
    if self.named_pipe:
        reported_port = self.get_stub_port_from_named_socket()
        self.port = reported_port

    return server
| |
def connect_to_debug_monitor(self, attach_pid=None):
    """Launch the debug monitor and connect self.sock to it.

    With a named pipe the stub reports its own port, so one launch and one
    connect suffice. Otherwise a randomly chosen port is used: each launched
    stub gets several connection attempts, and on failure the stub is
    terminated and relaunched on a new port.

    Args:
        attach_pid: pid for the stub to attach to, or None.

    Returns:
        The server process object of the stub that was connected to.

    Raises:
        Exception: if no connection could be made after all attempts.
    """
    logger = self.logger

    def schedule_shutdown(monitor):
        # Bind the hook to this specific monitor. A closure over the loop
        # variable would late-bind and only ever see the last server.
        def shutdown_debug_monitor():
            try:
                monitor.terminate()
            except Exception:
                logger.warning(
                    "failed to terminate server for debug monitor: {}; ignoring".format(
                        sys.exc_info()[0]))
        self.addTearDownHook(shutdown_debug_monitor)

    if self.named_pipe:
        # Create the stub.
        server = self.launch_debug_monitor(attach_pid=attach_pid)
        self.assertIsNotNone(server)

        # Schedule debug monitor to be shut down during teardown.
        schedule_shutdown(server)

        # Attach to the stub and return a socket opened to it.
        self.sock = self.create_socket()
        return server

    # We're using a random port algorithm to try not to collide with other
    # ports, and retry a max # times.
    MAX_ATTEMPTS = 20
    MAX_CONNECT_ATTEMPTS = 10

    for attempt in range(1, MAX_ATTEMPTS + 1):
        server = self.launch_debug_monitor(attach_pid=attach_pid)

        # Schedule debug monitor to be shut down during teardown.
        schedule_shutdown(server)

        for connect_attempt in range(1, MAX_CONNECT_ATTEMPTS + 1):
            # Create a socket to talk to the server.
            try:
                logger.info("Connect attempt %d", connect_attempt)
                self.sock = self.create_socket()
                return server
            except _ConnectionRefused:
                # Ignore, and try again.
                pass
            time.sleep(0.5)

        # We should close the server here to be safe.
        server.terminate()

        print(
            "connect to debug monitor on port %d failed, attempt #%d of %d" %
            (self.port, attempt, MAX_ATTEMPTS))

        # And wait a random length of time before next attempt, to avoid
        # collisions.
        time.sleep(random.randint(1, 5))

        # Now grab a new port number.
        self.port = self.get_next_port()

    raise Exception(
        "failed to create a socket to the launched debug monitor after %d tries" %
        MAX_ATTEMPTS)
| |
def launch_process_for_attach(
        self,
        inferior_args=None,
        sleep_seconds=3,
        exe_path=None):
    """Start a child process that the debug monitor stub can later attach to.

    The process needs to hang around for a while, so a "sleep:<seconds>"
    argument is appended to its arguments. A teardown hook is registered
    that terminates the process.

    Args:
        inferior_args: optional list of arguments for the inferior.
        sleep_seconds: value for the "sleep:" argument; a false value omits
            the argument.
        exe_path: executable to run; defaults to the "a.out" build artifact.

    Returns:
        The spawned inferior process object.
    """
    if not exe_path:
        exe_path = self.getBuildArtifact("a.out")

    args = []
    if inferior_args:
        args.extend(inferior_args)
    if sleep_seconds:
        args.append("sleep:%d" % sleep_seconds)

    inferior = self.spawnSubprocess(exe_path, args)

    # Capture the logger here: the hook runs at teardown and no name
    # "logger" exists in any enclosing scope otherwise.
    logger = self.logger

    def shutdown_process_for_attach():
        try:
            inferior.terminate()
        except Exception:
            logger.warning(
                "failed to terminate inferior process for attach: {}; ignoring".format(
                    sys.exc_info()[0]))
    self.addTearDownHook(shutdown_process_for_attach)
    return inferior
| |
def prep_debug_monitor_and_inferior(
        self,
        inferior_args=None,
        inferior_sleep_seconds=3,
        inferior_exe_path=None):
    """Prepare the debug monitor, the inferior, and the expected packet stream.

    Attach modes: the inferior process is started first. In attach mode its
    pid is handed to the stub (--attach on the stub command line); in
    manual-attach mode the stub is started without a pid. The no-ack-mode
    setup is then appended to the expected packet stream.

    Launch mode: the stub is started, the no-ack-mode setup is appended to
    the expected packet stream, and the verified launch packets follow. On
    a remote platform the inferior executable is installed into the remote
    working directory first.

    In every mode the packet stream is left unexecuted, ready to have more
    expected packet entries added to it.

    The return value is:
        {inferior:<inferior>, server:<server>}
    """
    startup_mode = self._inferior_startup
    launching = startup_mode == self._STARTUP_LAUNCH
    attaching = startup_mode in (
        self._STARTUP_ATTACH, self._STARTUP_ATTACH_MANUALLY)

    inferior = None
    attach_pid = None

    if attaching:
        # Launch the process that we'll use as the inferior.
        inferior = self.launch_process_for_attach(
            inferior_args=inferior_args,
            sleep_seconds=inferior_sleep_seconds,
            exe_path=inferior_exe_path)
        self.assertIsNotNone(inferior)
        self.assertTrue(inferior.pid > 0)
        if startup_mode == self._STARTUP_ATTACH:
            # The stub attaches via its command line, so pass the pid on.
            attach_pid = inferior.pid

    if launching:
        # Build launch args
        if not inferior_exe_path:
            inferior_exe_path = self.getBuildArtifact("a.out")

        if lldb.remote_platform:
            remote_path = lldbutil.append_to_process_working_directory(
                self, os.path.basename(inferior_exe_path))
            local_spec = lldb.SBFileSpec(inferior_exe_path, True)
            remote_file_spec = lldb.SBFileSpec(remote_path, False)
            err = lldb.remote_platform.Install(local_spec, remote_file_spec)
            if err.Fail():
                raise Exception(
                    "remote_platform.Install('%s', '%s') failed: %s" %
                    (inferior_exe_path, remote_path, err))
            inferior_exe_path = remote_path

        launch_args = [inferior_exe_path]
        if inferior_args:
            launch_args += inferior_args

    # Launch the debug monitor stub, attaching to the inferior if requested.
    server = self.connect_to_debug_monitor(attach_pid=attach_pid)
    self.assertIsNotNone(server)

    # Build the expected protocol stream
    self.add_no_ack_remote_stream()
    if launching:
        self.add_verified_launch_packets(launch_args)

    return {"inferior": inferior, "server": server}
| |
def expect_socket_recv(
        self,
        sock,
        expected_content_regex,
        timeout_seconds):
    """Read from sock until the accumulated text matches, then assert it.

    Reading stops as soon as the accumulated response matches the regex,
    the peer closes the connection, or the timeout elapses.

    Args:
        sock: connected socket to read from.
        expected_content_regex: compiled regex matched against everything
            received so far.
        timeout_seconds: overall time budget for the reads.
    """
    response = ""
    deadline = time.time() + timeout_seconds

    while not expected_content_regex.match(response):
        # Wait only for the time that is left, not the full timeout again.
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        can_read, _, _ = select.select([sock], [], [], remaining)
        if sock not in can_read:
            continue
        recv_bytes = sock.recv(4096)
        if not recv_bytes:
            # Empty read means the peer closed; nothing more will arrive.
            break
        if not isinstance(recv_bytes, str):
            # Sockets yield bytes on Python 3; latin-1 maps every byte to
            # one character so nothing is lost.
            recv_bytes = recv_bytes.decode("latin-1")
        response += recv_bytes

    self.assertTrue(expected_content_regex.match(response))
| |
def expect_socket_send(self, sock, content, timeout_seconds):
    """Write all of content to sock within the timeout, asserting completion.

    Args:
        sock: connected socket to write to.
        content: text or bytes to send; text is encoded as latin-1.
        timeout_seconds: overall time budget for the writes.
    """
    if not isinstance(content, bytes):
        # Sockets require bytes on Python 3.
        content = content.encode("latin-1")

    request_bytes_remaining = content
    deadline = time.time() + timeout_seconds

    while request_bytes_remaining:
        # Wait only for the time that is left, not the full timeout again.
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        _, can_write, _ = select.select([], [sock], [], remaining)
        if sock in can_write:
            written_byte_count = sock.send(request_bytes_remaining)
            request_bytes_remaining = request_bytes_remaining[
                written_byte_count:]
    self.assertEqual(len(request_bytes_remaining), 0)
| |
def do_handshake(self, stub_socket, timeout_seconds=5):
    """Perform the no-ack-mode handshake with the stub over stub_socket.

    Sends an ack, sends the QStartNoAckMode request, expects "+$OK#xx" in
    return, then sends the final ack.

    Args:
        stub_socket: connected socket to the stub.
        timeout_seconds: time budget applied to each send/receive step.
    """
    NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0"

    # Write the ack.
    self.expect_socket_send(stub_socket, "+", timeout_seconds)

    # Send the start no ack mode packet. Going through expect_socket_send
    # (rather than a single raw send) copes with partial writes and asserts
    # that the whole request went out.
    self.expect_socket_send(
        stub_socket, NO_ACK_MODE_REQUEST, timeout_seconds)

    # Receive the ack and "OK"
    self.expect_socket_recv(stub_socket, re.compile(
        r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds)

    # Send the final ack.
    self.expect_socket_send(stub_socket, "+", timeout_seconds)
| |
def add_no_ack_remote_stream(self):
    """Append the expected no-ack-mode handshake to the test sequence."""
    handshake_lines = [
        "read packet: +",
        "read packet: $QStartNoAckMode#b0",
        "send packet: +",
        "send packet: $OK#9a",
        "read packet: +",
    ]
    self.test_sequence.add_log_lines(handshake_lines, True)
| |
def add_verified_launch_packets(self, launch_args):
    """Append the $A launch packet and its success check to the test sequence.

    Args:
        launch_args: argument list passed to build_gdbremote_A_packet().
    """
    launch_packet = build_gdbremote_A_packet(launch_args)
    expected_lines = [
        "read packet: %s" % launch_packet,
        "send packet: $OK#00",
        "read packet: $qLaunchSuccess#a5",
        "send packet: $OK#00",
    ]
    self.test_sequence.add_log_lines(expected_lines, True)
| |
def add_thread_suffix_request_packets(self):
    """Append a QThreadSuffixSupported request and its OK reply to the sequence."""
    request = "read packet: $QThreadSuffixSupported#e4"
    reply = "send packet: $OK#00"
    self.test_sequence.add_log_lines([request, reply], True)
| |
def add_process_info_collection_packets(self):
    """Append a qProcessInfo query that captures the reply as "process_info_raw"."""
    reply_matcher = {
        "direction": "send",
        "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
        "capture": {1: "process_info_raw"},
    }
    self.test_sequence.add_log_lines(
        ["read packet: $qProcessInfo#dc", reply_matcher], True)
| |
# Keys that parse_process_info_response() accepts in a qProcessInfo reply;
# any other key fails the validation there.
_KNOWN_PROCESS_INFO_KEYS = [
    "pid", "parent-pid",
    "real-uid", "real-gid",
    "effective-uid", "effective-gid",
    "cputype", "cpusubtype",
    "ostype", "triple", "vendor",
    "endian", "elf_abi", "ptrsize",
]
| |
def parse_process_info_response(self, context):
    """Parse the captured qProcessInfo reply into a dict.

    Args:
        context: mapping holding the raw reply under "process_info_raw".

    Returns:
        A dict built from the reply's "key:value;" pairs. Every key is
        asserted to be one of _KNOWN_PROCESS_INFO_KEYS.
    """
    # Ensure we have a process info response.
    self.assertIsNotNone(context)
    raw_info = context.get("process_info_raw")
    self.assertIsNotNone(raw_info)

    # Pull out key:value; pairs.
    process_info_dict = {}
    for pair in re.finditer(r"([^:]+):([^;]+);", raw_info):
        process_info_dict[pair.group(1)] = pair.group(2)

    # Validate keys are known.
    for key, val in process_info_dict.items():
        self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
        self.assertIsNotNone(val)

    return process_info_dict
| |
def add_register_info_collection_packets(self):
    """Append a multi-response qRegisterInfo query to the test sequence.

    Responses are saved under "reg_info_responses"; collection ends at the
    first reply matching the end regex (an empty or E<digits> payload).
    """
    query_entry = {
        "type": "multi_response",
        "query": "qRegisterInfo",
        "append_iteration_suffix": True,
        "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
        "save_key": "reg_info_responses",
    }
    self.test_sequence.add_log_lines([query_entry], True)
| |
def parse_register_info_packets(self, context):
    """Return a list of parsed register infos, one per saved response."""
    raw_responses = context.get("reg_info_responses")
    self.assertIsNotNone(raw_responses)

    # Parse register infos.
    reg_infos = []
    for raw_response in raw_responses:
        reg_infos.append(parse_reg_info_response(raw_response))
    return reg_infos
| |
def expect_gdbremote_sequence(self, timeout_seconds=None):
    """Replay the current test sequence against the stub socket.

    Args:
        timeout_seconds: timeout handed to the replay; a false value (None
            or 0) selects the class default _TIMEOUT_SECONDS.

    Returns:
        Whatever expect_lldb_gdbserver_replay() returns.
    """
    effective_timeout = timeout_seconds or self._TIMEOUT_SECONDS
    return expect_lldb_gdbserver_replay(
        self,
        self.sock,
        self.test_sequence,
        self._pump_queues,
        effective_timeout,
        self.logger)
| |
# Keys that assert_valid_reg_info() accepts in a parsed register info
# dictionary.
_KNOWN_REGINFO_KEYS = [
    "name", "alt-name",
    "bitsize", "offset",
    "encoding", "format",
    "set",
    "gcc", "ehframe", "dwarf", "generic",
    "container-regs", "invalidate-regs",
    "dynamic_size_dwarf_expr_bytes", "dynamic_size_dwarf_len",
]
| |
def assert_valid_reg_info(self, reg_info):
    """Assert that reg_info has only known keys and all of the required ones."""
    # Assert we know about all the reginfo keys parsed.
    known_keys = self._KNOWN_REGINFO_KEYS
    for key in reg_info:
        self.assertTrue(key in known_keys)

    # Check the bare-minimum expected set of register info keys.
    for required_key in ("name", "bitsize", "offset", "encoding", "format"):
        self.assertTrue(required_key in reg_info)
| |
def find_pc_reg_info(self, reg_infos):
    """Return (index, reg_info) for the first register whose generic name is "pc".

    Returns (None, None) when no such register exists.
    """
    for lldb_reg_index, reg_info in enumerate(reg_infos):
        if "generic" not in reg_info:
            continue
        if reg_info["generic"] == "pc":
            return (lldb_reg_index, reg_info)

    return (None, None)
| |
def add_lldb_register_index(self, reg_infos):
    """Add a "lldb_register_index" key containing the 0-based index of each reg_infos entry.

    The index is used when calling packets like P/p with a register index
    while working on only a subset of the full register info set.
    """
    self.assertIsNotNone(reg_infos)

    for position, reg_info in enumerate(reg_infos):
        reg_info["lldb_register_index"] = position
| |
def add_query_memory_region_packets(self, address):
    """Append a qMemoryRegionInfo query for address, capturing the reply.

    Args:
        address: integer address, rendered in hex in the query packet.
    """
    query_packet = "read packet: $qMemoryRegionInfo:{0:x}#00".format(address)
    reply_matcher = {
        "direction": "send",
        "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
        "capture": {1: "memory_region_response"},
    }
    self.test_sequence.add_log_lines([query_packet, reply_matcher], True)
| |
def parse_key_val_dict(self, key_val_text, allow_dupes=True):
    """Parse "key:value;key:value" text into a dict.

    Args:
        key_val_text: the text to parse.
        allow_dupes: when True, the values of a repeated key are collected
            into a list; when False, a repeated key fails the test.

    Returns:
        A dict mapping each key to its value, or to a list of values for
        keys that occurred more than once.
    """
    self.assertIsNotNone(key_val_text)
    kv_dict = {}
    for pair in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
        key, val = pair.group(1), pair.group(2)
        if key not in kv_dict:
            kv_dict[key] = val
        elif not allow_dupes:
            self.fail(
                "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
                    key, val, key_val_text, kv_dict))
        elif isinstance(kv_dict[key], list):
            kv_dict[key].append(val)
        else:
            # Second occurrence: promote the single value to a list.
            kv_dict[key] = [kv_dict[key], val]
    return kv_dict
| |
def parse_memory_region_packet(self, context):
    """Parse the captured qMemoryRegionInfo reply into a dict.

    Args:
        context: mapping holding the raw reply under
            "memory_region_response".

    Returns:
        The key/value dict of the memory region. Every key is asserted to
        be one of start, size, permissions, name or error.
    """
    # Ensure we have a response to parse.
    raw_response = context.get("memory_region_response")
    self.assertIsNotNone(raw_response)

    # Pull out key:value; pairs.
    mem_region_dict = self.parse_key_val_dict(raw_response)

    # Validate keys are known.
    known_keys = ["start", "size", "permissions", "name", "error"]
    for key, val in mem_region_dict.items():
        self.assertTrue(key in known_keys)
        self.assertIsNotNone(val)

    # Return the dictionary of key-value pairs for the memory region.
    return mem_region_dict
| |
def assert_address_within_memory_region(
        self, test_address, mem_region_dict):
    """Fail the test unless test_address lies in [start, start + size).

    Args:
        test_address: integer address to check.
        mem_region_dict: dict with hex-string "start" and "size" entries.
    """
    self.assertIsNotNone(mem_region_dict)
    for required_key in ("start", "size"):
        self.assertTrue(required_key in mem_region_dict)

    range_start = int(mem_region_dict["start"], 16)
    range_size = int(mem_region_dict["size"], 16)
    range_end = range_start + range_size

    failure_message = None
    if test_address < range_start:
        failure_message = "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
            test_address, range_start, range_end, range_size)
    elif test_address >= range_end:
        failure_message = "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
            test_address, range_start, range_end, range_size)

    if failure_message is not None:
        self.fail(failure_message)
| |
def add_threadinfo_collection_packets(self):
    """Append a multi-response qfThreadInfo/qsThreadInfo query to the sequence.

    Responses are saved under "threadinfo_responses"; collection ends at the
    first reply matching the end regex (an empty or "l" payload).
    """
    query_entry = {
        "type": "multi_response",
        "first_query": "qfThreadInfo",
        "next_query": "qsThreadInfo",
        "append_iteration_suffix": False,
        "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
        "save_key": "threadinfo_responses",
    }
    self.test_sequence.add_log_lines([query_entry], True)
| |
def parse_threadinfo_packets(self, context):
    """Return a flat list of thread ids parsed from all saved threadinfo responses."""
    raw_responses = context.get("threadinfo_responses")
    self.assertIsNotNone(raw_responses)

    thread_ids = []
    for raw_response in raw_responses:
        thread_ids += parse_threadinfo_response(raw_response)
    return thread_ids
| |
def wait_for_thread_count(self, thread_count, timeout_seconds=3):
    """Poll the stub's thread list until it has at least thread_count threads.

    The thread list is always queried at least once, so a list is returned
    even when thread_count is zero or negative.

    Args:
        thread_count: minimum number of threads to wait for.
        timeout_seconds: how long to keep polling before giving up.

    Returns:
        The list of thread ids from the last poll.

    Raises:
        Exception: if the timeout elapses while there are still fewer than
            thread_count threads.
    """
    timeout_time = time.time() + timeout_seconds

    while True:
        self.reset_test_sequence()
        self.add_threadinfo_collection_packets()

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        threads = self.parse_threadinfo_packets(context)
        self.assertIsNotNone(threads)

        actual_thread_count = len(threads)
        if actual_thread_count >= thread_count:
            # Success wins over the deadline: a poll that met the target
            # must not be reported as a timeout.
            return threads

        if time.time() > timeout_time:
            raise Exception(
                'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format(
                    timeout_seconds, thread_count, actual_thread_count))
| |
def add_set_breakpoint_packets(
        self,
        address,
        z_packet_type=0,
        do_continue=True,
        breakpoint_kind=1):
    """Append a Z (set breakpoint) exchange, optionally followed by a continue.

    Args:
        address: breakpoint address, rendered in hex.
        z_packet_type: the type digit that follows "Z" in the packet.
        do_continue: when True, also append a $c packet and expect a T stop
            reply, capturing "stop_signo" and "stop_thread_id".
        breakpoint_kind: the kind field of the Z packet.
    """
    set_breakpoint_packet = "read packet: $Z{2},{0:x},{1}#00".format(
        address, breakpoint_kind, z_packet_type)
    # Set the breakpoint, then verify the stub could set it.
    self.test_sequence.add_log_lines(
        [set_breakpoint_packet, "send packet: $OK#00"], True)

    if not do_continue:
        return

    # Expect a breakpoint stop report after continuing the inferior.
    stop_reply_matcher = {
        "direction": "send",
        "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
        "capture": {1: "stop_signo", 2: "stop_thread_id"},
    }
    self.test_sequence.add_log_lines(
        ["read packet: $c#63", stop_reply_matcher], True)
| |
def add_remove_breakpoint_packets(
        self,
        address,
        z_packet_type=0,
        breakpoint_kind=1):
    """Append a z (remove breakpoint) exchange to the test sequence.

    Args:
        address: breakpoint address, rendered in hex.
        z_packet_type: the type digit that follows "z" in the packet.
        breakpoint_kind: the kind field of the z packet.
    """
    remove_breakpoint_packet = "read packet: $z{2},{0:x},{1}#00".format(
        address, breakpoint_kind, z_packet_type)
    # Remove the breakpoint, then verify the stub could unset it.
    self.test_sequence.add_log_lines(
        [remove_breakpoint_packet, "send packet: $OK#00"], True)
| |
def add_qSupported_packets(self):
    """Append a qSupported query that captures the reply as "qSupported_response"."""
    reply_matcher = {
        "direction": "send",
        "regex": r"^\$(.*)#[0-9a-fA-F]{2}",
        "capture": {1: "qSupported_response"},
    }
    self.test_sequence.add_log_lines(
        ["read packet: $qSupported#00", reply_matcher], True)
| |
# Feature names that parse_qSupported_response() accepts in a qSupported
# reply; any other name raises there.
_KNOWN_QSUPPORTED_STUB_FEATURES = [
    "augmented-libraries-svr4-read",
    "PacketSize",
    "QStartNoAckMode", "QThreadSuffixSupported", "QListThreadsInStopReply",
    "qXfer:auxv:read", "qXfer:libraries:read",
    "qXfer:libraries-svr4:read", "qXfer:features:read",
    "qEcho",
    "QPassSignals",
]
| |
def parse_qSupported_response(self, context):
    """Parse the captured qSupported reply into a feature dict.

    Entries of the form key=val are stored as given. For entries of the
    form feature+, feature- or feature?, the trailing marker is stripped
    from the key and stored as the value.

    Args:
        context: mapping holding the raw reply under "qSupported_response".

    Returns:
        A dict mapping feature name to its value or support marker.

    Raises:
        Exception: for a malformed entry, or for a feature name that is not
            in _KNOWN_QSUPPORTED_STUB_FEATURES.
    """
    self.assertIsNotNone(context)

    raw_response = context.get("qSupported_response")
    self.assertIsNotNone(raw_response)

    supported_dict = {}
    for entry in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
        key, val = entry.group(1), entry.group(3)

        if val:
            # key=val: store as is
            supported_dict[key] = val
        else:
            if len(key) < 2:
                raise Exception(
                    "singular stub feature is too short: must be stub_feature{+,-,?}")
            key, supported_type = key[:-1], key[-1]
            if supported_type not in ("+", "-", "?"):
                raise Exception(
                    "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
            supported_dict[key] = supported_type

        # Ensure we know the supported element
        if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
            raise Exception(
                "unknown qSupported stub feature reported: %s" %
                key)

    return supported_dict
| |
def run_process_then_stop(self, run_seconds=1):
    """Continue the inferior, wait, then interrupt it and capture the stop reply.

    Args:
        run_seconds: how long to let the inferior run before interrupting.

    Returns:
        The context of the interrupt exchange; it holds the T stop reply's
        signal digits under "stop_result".
    """
    # Tell the stub to continue.
    continue_lines = ["read packet: $vCont;c#a8"]
    self.test_sequence.add_log_lines(continue_lines, True)
    self.expect_gdbremote_sequence()

    # Wait for run_seconds.
    time.sleep(run_seconds)

    # Send an interrupt, capture a T response.
    self.reset_test_sequence()
    interrupt_packet = "read packet: {}".format(chr(3))
    stop_reply_matcher = {
        "direction": "send",
        "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
        "capture": {1: "stop_result"},
    }
    self.test_sequence.add_log_lines(
        [interrupt_packet, stop_reply_matcher], True)

    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)
    self.assertIsNotNone(context.get("stop_result"))

    return context
| |
def select_modifiable_register(self, reg_infos):
    """Pick the lldb register index of a register to modify temporarily.

    Args:
        reg_infos: iterable of register info dicts.

    Returns:
        The "lldb_register_index" of the first entry whose name is in the
        preferred set ("rax").  Failing that, the index of the last entry
        whose "generic" value is "fp" or "arg1", or None if there is none.
    """
    preferred_names = {"rax"}

    self.assertIsNotNone(reg_infos)

    fallback_index = None
    for info in reg_infos:
        if info.get("name") in preferred_names:
            # A preferred register wins immediately.
            return info["lldb_register_index"]
        if info.get("generic") in ("fp", "arg1"):
            # Frame pointer / first argument register: remember it in case
            # no preferred register shows up.
            fallback_index = info["lldb_register_index"]

    return fallback_index
| |
def extract_registers_from_stop_notification(self, stop_key_vals_text):
    """Collect the register entries from a stop reply's key/value text.

    The text is parsed with self.parse_key_val_dict; only keys made up
    entirely of hex digits are kept.

    Returns:
        A dict mapping each such key, converted from hex to int, to its
        value.
    """
    self.assertIsNotNone(stop_key_vals_text)
    parsed_pairs = self.parse_key_val_dict(stop_key_vals_text)

    hex_only = re.compile(r"^[0-9a-fA-F]+$")
    return {
        int(name, 16): value
        for name, value in parsed_pairs.items()
        if hex_only.match(name)
    }
| |
def gather_register_infos(self):
    """Run the register-info collection packets and return the parsed result.

    Resets the test sequence, queues the register info collection packets,
    runs the sequence, parses the replies and then passes them through
    self.add_lldb_register_index before returning them.
    """
    self.reset_test_sequence()
    self.add_register_info_collection_packets()

    sequence_context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(sequence_context)

    parsed_infos = self.parse_register_info_packets(sequence_context)
    self.assertIsNotNone(parsed_infos)

    self.add_lldb_register_index(parsed_infos)
    return parsed_infos
| |
def find_generic_register_with_name(self, reg_infos, generic_name):
    """Return the first register info whose "generic" entry equals generic_name.

    Entries without a "generic" key never match.  Returns None when no
    entry matches.
    """
    self.assertIsNotNone(reg_infos)
    candidates = (
        info for info in reg_infos
        if "generic" in info and info["generic"] == generic_name
    )
    return next(candidates, None)
| |
def decode_gdbremote_binary(self, encoded_bytes):
    """Undo the escaping and run-length encoding of a gdb-remote payload.

    Two markers are interpreted while walking the input:
      * "}" - the following character is XOR-ed with 0x20 and emitted.
      * "*" - the following character's ordinal minus 29 is a repeat
        count; the most recently decoded character is emitted that many
        more times.
    Every other character is copied through unchanged.

    Returns:
        The decoded text.
    """
    pieces = []
    last_char = None
    position = 0
    total = len(encoded_bytes)

    while position < total:
        current = encoded_bytes[position]
        if current == "}":
            # Escaped character: needs one character of payload after it.
            self.assertTrue(position + 1 < total)
            last_char = chr(ord(encoded_bytes[position + 1]) ^ 0x20)
            pieces.append(last_char)
            position += 2
        elif current == "*":
            # Run length: needs something already decoded to repeat.
            self.assertTrue(len(pieces) > 0)
            self.assertTrue(position + 1 < total)
            repeat_count = ord(encoded_bytes[position + 1]) - 29
            pieces.append(last_char * repeat_count)
            position += 2
        else:
            last_char = current
            pieces.append(last_char)
            position += 1

    return "".join(pieces)
| |
def build_auxv_dict(self, endian, word_size, auxv_data):
    """Parse raw auxiliary-vector data into a {key: value} dictionary.

    The data is consumed as consecutive (key, value) pairs, each field
    word_size units long and converted with unpack_endian_binary_string.
    Parsing ends at the first entry whose key is 0; that entry's value
    must also be 0.

    Args:
        endian: forwarded to unpack_endian_binary_string.
        word_size: length of one key or value field in auxv_data.
        auxv_data: the raw auxv contents.

    Returns:
        Dict of the entries seen before the terminating zero entry.  The
        test fails if a key repeats or if no terminating entry is found.
    """
    self.assertIsNotNone(endian)
    self.assertIsNotNone(word_size)
    self.assertIsNotNone(auxv_data)

    # PowerPC64le's auxvec has a special key (AT_IGNOREPPC = 22) that may
    # occur several times.  It has to be skipped, otherwise the
    # repeated-key check below would trip over it.
    ignored_keys_for_arch = {'powerpc64le': [22]}
    ignore_keys = ignored_keys_for_arch.get(self.getArchitecture())

    entries = {}
    offset = 0
    while offset < len(auxv_data):
        # Slice out one key field followed by one value field.
        raw_key = auxv_data[offset:offset + word_size]
        raw_value = auxv_data[offset + word_size:offset + 2 * word_size]
        offset += 2 * word_size

        # Convert raw text from target endian.
        key = unpack_endian_binary_string(endian, raw_key)
        value = unpack_endian_binary_string(endian, raw_value)

        if ignore_keys and key in ignore_keys:
            continue

        if key == 0:
            # Terminating entry: both fields are zero.
            self.assertEqual(value, 0)
            return entries

        # The key should not already be present.
        self.assertFalse(key in entries)
        entries[key] = value

    self.fail(
        "should not reach here - implies required double zero entry not found")
    return entries
| |
def read_binary_data_in_chunks(self, command_prefix, chunk_length):
    """Fetch data via repeated {command_prefix}{offset:x},{chunk_length:x} requests.

    Requests continue, advancing the offset by chunk_length each time,
    until a reply of type 'l' arrives (with or without data).  Replies of
    type 'm' mean more data follows.  Each reply body is decoded with
    self.decode_gdbremote_binary.

    Returns:
        The concatenation of all decoded reply bodies.
    """
    decoded_data = ""
    offset = 0

    while True:
        self.reset_test_sequence()
        request = "read packet: ${}{:x},{:x}:#00".format(
            command_prefix, offset, chunk_length)
        reply_matcher = {
            "direction": "send",
            "regex": re.compile(
                r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
                re.MULTILINE | re.DOTALL),
            "capture": {
                1: "response_type",
                2: "content_raw"},
        }
        self.test_sequence.add_log_lines([request, reply_matcher], True)

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        response_type = context.get("response_type")
        self.assertIsNotNone(response_type)
        self.assertTrue(response_type in ["l", "m"])

        # Decode whatever payload came with this reply.
        content_raw = context.get("content_raw")
        if content_raw:
            decoded_data += self.decode_gdbremote_binary(content_raw)

        # An 'l' reply is the last one.
        if response_type == "l":
            return decoded_data

        offset += chunk_length
| |
def add_interrupt_packets(self):
    """Queue an interrupt byte and a matcher for the resulting $T stop reply.

    The matcher captures the two-digit signal number as "stop_signo" and
    the remaining reply text as "stop_key_val_text".
    """
    # The interrupt is a single raw 0x03 byte.
    interrupt_request = "read packet: {}".format(chr(3))
    # Stop notification expected in response.
    stop_reply_matcher = {
        "direction": "send",
        "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
        "capture": {1: "stop_signo", 2: "stop_key_val_text"},
    }
    self.test_sequence.add_log_lines(
        [interrupt_request, stop_reply_matcher], True)
| |
def parse_interrupt_packets(self, context):
    """Return (signal number, key/value dict) from an interrupt stop reply.

    Reads the "stop_signo" (hex text) and "stop_key_val_text" entries of
    context; the latter is parsed with self.parse_key_val_dict.
    """
    signo_text = context.get("stop_signo")
    self.assertIsNotNone(signo_text)
    key_val_text = context.get("stop_key_val_text")
    self.assertIsNotNone(key_val_text)

    stop_signo = int(signo_text, 16)
    return (stop_signo, self.parse_key_val_dict(key_val_text))
| |
def add_QSaveRegisterState_packets(self, thread_id):
    """Queue a QSaveRegisterState request and a matcher for its reply.

    Args:
        thread_id: when truthy, the request uses the ";thread:<hex>"
            suffix form; otherwise the plain form is sent.

    The reply body is captured under "save_response".
    """
    request = "read packet: $QSaveRegisterState#00"
    if thread_id:
        # Use the thread suffix form.
        request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
            thread_id)

    reply_matcher = {
        "direction": "send",
        "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
        "capture": {1: "save_response"},
    }
    self.test_sequence.add_log_lines([request, reply_matcher], True)
| |
def parse_QSaveRegisterState_response(self, context):
    """Interpret the captured "save_response" of a QSaveRegisterState request.

    Returns:
        (False, None) when the response is empty or starts with "E";
        otherwise (True, save_id) with the response converted via int().
    """
    self.assertIsNotNone(context)

    reply = context.get("save_response")
    self.assertIsNotNone(reply)

    # An empty reply or an E-prefixed reply means the save failed.
    if not reply or reply[0] == "E":
        return (False, None)
    return (True, int(reply))
| |
def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
    """Queue a QRestoreRegisterState request that must be answered with OK.

    Args:
        save_id: identifier placed after "QRestoreRegisterState:".
        thread_id: when truthy, the ";thread:<hex>" suffix is appended.
    """
    if not thread_id:
        request = "read packet: $QRestoreRegisterState:{}#00".format(
            save_id)
    else:
        # Use the thread suffix form.
        request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
            save_id, thread_id)

    expected_lines = [request, "send packet: $OK#00"]
    self.test_sequence.add_log_lines(expected_lines, True)
| |
def flip_all_bits_in_each_register_value(
        self, reg_infos, endian, thread_id=None):
    """Write the bitwise complement into each register and count outcomes.

    For every register info: read the register with $p, write the value
    XOR-ed with all ones using $P and, when the stub answers OK, read the
    register back.  A write counts as successful only if the stub answered
    OK and the value read back equals the flipped value.

    Args:
        reg_infos: iterable of register info dicts; each must provide
            "lldb_register_index" and "bitsize".
        endian: forwarded to unpack_register_hex_unsigned and
            pack_register_hex.
        thread_id: when truthy, requests carry a ";thread:<hex>" suffix.

    Returns:
        Tuple (successful_writes, failed_writes).
    """
    self.assertIsNotNone(reg_infos)

    successful_writes = 0
    failed_writes = 0

    for reg_info in reg_infos:
        # Use the lldb register index added to the reg info. We're not
        # necessarily working off a full set of register infos, so an
        # inferred register index could be wrong.
        reg_index = reg_info["lldb_register_index"]
        self.assertIsNotNone(reg_index)

        # Floor division: the byte count is used as a string repeat count
        # and as pack_register_hex's byte_size, so it must stay an int on
        # Python 3, where "/" yields a float.
        reg_byte_size = int(reg_info["bitsize"]) // 8
        self.assertTrue(reg_byte_size > 0)

        # Handle thread suffix.
        if thread_id:
            p_request = "read packet: $p{:x};thread:{:x}#00".format(
                reg_index, thread_id)
        else:
            p_request = "read packet: $p{:x}#00".format(reg_index)

        # Read the existing value.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines([
            p_request,
            {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
        ], True)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        p_response = context.get("p_response")
        self.assertIsNotNone(p_response)
        initial_reg_value = unpack_register_hex_unsigned(
            endian, p_response)

        # Flip the value by xoring with all 1s.
        all_one_bits_raw = "ff" * reg_byte_size
        flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
        flipped_hex = pack_register_hex(
            endian, flipped_bits_int, byte_size=reg_byte_size)

        # Handle thread suffix for P.
        if thread_id:
            P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
                reg_index, flipped_hex, thread_id)
        else:
            P_request = "read packet: $P{:x}={}#00".format(
                reg_index, flipped_hex)

        # Write the flipped value to the register.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines([
            P_request,
            {"direction": "send",
             "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
             "capture": {1: "P_response"}},
        ], True)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Determine if the write succeeded. There are a handful of
        # registers that can fail, or partially fail (e.g. flags, segment
        # selectors, etc.) due to register value restrictions. Don't worry
        # about them all flipping perfectly.
        P_response = context.get("P_response")
        self.assertIsNotNone(P_response)
        if P_response != "OK":
            failed_writes += 1
            continue

        # Read back the register value, ensure it matches the flipped
        # value.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines([
            p_request,
            {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
        ], True)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        verify_p_response_raw = context.get("p_response")
        self.assertIsNotNone(verify_p_response_raw)
        verify_bits = unpack_register_hex_unsigned(
            endian, verify_p_response_raw)

        if verify_bits == flipped_bits_int:
            successful_writes += 1
        else:
            # Some registers, like mxcsrmask and others, will permute
            # what's written; count those as failed writes.
            failed_writes += 1

    return (successful_writes, failed_writes)
| |
def is_bit_flippable_register(self, reg_info):
    """Decide whether a register is a reasonable target for bit flipping.

    Returns True only when reg_info is non-empty, belongs to the
    "General Purpose Registers" set, has no non-empty "container-regs"
    entry, and its name is neither a two-character name ending in "s"
    (e.g. a segment register) nor "psr"/"cpsr" (ARM program status
    register).
    """
    if not reg_info or reg_info.get("set") != "General Purpose Registers":
        return False

    # Don't try to bit flip registers contained in another register.
    if len(reg_info.get("container-regs", ())) > 0:
        return False

    name = reg_info["name"]
    looks_like_segment_register = re.match("^.s$", name)
    is_arm_status_register = re.match("^(c|)psr$", name)
    return not (looks_like_segment_register or is_arm_status_register)
| |
def read_register_values(self, reg_infos, endian, thread_id=None):
    """Read every listed register with $p and return the values by index.

    Args:
        reg_infos: iterable of register info dicts carrying an
            "lldb_register_index" entry.
        endian: forwarded to unpack_register_hex_unsigned.
        thread_id: when truthy, requests carry a ";thread:<hex>" suffix.

    Returns:
        Dict mapping lldb register index to the unpacked register value.
    """
    self.assertIsNotNone(reg_infos)

    values_by_index = {}
    for info in reg_infos:
        # The index is attached when the reg infos are loaded, which lets
        # callers pass in subsets.
        index = info.get("lldb_register_index")
        self.assertIsNotNone(index)

        request = "read packet: $p{:x}#00".format(index)
        if thread_id:
            # Handle thread suffix.
            request = "read packet: $p{:x};thread:{:x}#00".format(
                index, thread_id)

        # Issue the read.
        self.reset_test_sequence()
        reply_matcher = {
            "direction": "send",
            "regex": r"^\$([0-9a-fA-F]+)#",
            "capture": {1: "p_response"},
        }
        self.test_sequence.add_log_lines([request, reply_matcher], True)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Sanity-check the reply, then convert from target endian.
        reply_hex = context.get("p_response")
        self.assertIsNotNone(reply_hex)
        self.assertTrue(len(reply_hex) > 0)
        self.assertFalse(reply_hex[0] == "E")

        values_by_index[index] = unpack_register_hex_unsigned(
            endian, reply_hex)

    return values_by_index
| |
def add_vCont_query_packets(self):
    """Queue a vCont? query and a matcher for its reply.

    Whatever follows an optional leading "vCont" in the reply is captured
    under "vCont_query_response".
    """
    query = "read packet: $vCont?#49"
    reply_matcher = {
        "direction": "send",
        "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
        "capture": {2: "vCont_query_response"},
    }
    self.test_sequence.add_log_lines([query, reply_matcher], True)
| |
def parse_vCont_query_response(self, context):
    """Return the actions listed in a captured vCont? reply.

    Returns:
        A dict mapping each non-empty ';'-separated item of the
        "vCont_query_response" capture to 1.  The dict is empty when the
        capture is missing or empty (no vCont support reported).
    """
    self.assertIsNotNone(context)

    reply = context.get("vCont_query_response")
    if not reply:
        # No vCont support at all.
        return {}

    actions = (item for item in reply.split(";") if item)
    return dict.fromkeys(actions, 1)
| |
def count_single_steps_until_true(
        self,
        thread_id,
        predicate,
        args,
        max_step_count=100,
        use_Hc_packet=True,
        step_instruction="s"):
    """Single-step a thread until predicate(args) holds or a step limit is hit.

    Args:
        thread_id: thread to step; substituted (as hex) for "{thread}" in
            step_instruction and used in the optional $Hc packet.
        predicate: callable invoked as predicate(args) after each step.
        args: opaque argument handed to predicate.
        max_step_count: upper bound on the number of steps taken.
        use_Hc_packet: when true, send $Hc<thread> before each step.
        step_instruction: packet body used to perform the step.

    Returns:
        (True, steps) as soon as the predicate is true after a step,
        otherwise (False, steps) once the limit is reached.
    """
    steps_taken = 0

    while steps_taken < max_step_count:
        self.assertIsNotNone(thread_id)

        # Build the step packet, substituting the thread id for {thread}
        # when the instruction contains that placeholder.
        thread_hex = "{:x}".format(thread_id)
        step_packet = "read packet: ${}#00".format(
            re.sub(r"{thread}", thread_hex, step_instruction))

        self.reset_test_sequence()
        if use_Hc_packet:
            # Set the continue thread first.
            select_thread_lines = [
                "read packet: $Hc{0:x}#00".format(thread_id),
                "send packet: $OK#00",
            ]
            self.test_sequence.add_log_lines(select_thread_lines, True)

        # Step once and expect a stop report naming a thread.
        stop_matcher = {
            "direction": "send",
            "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
            "capture": {1: "stop_signo", 2: "stop_thread_id"},
        }
        self.test_sequence.add_log_lines([step_packet, stop_matcher], True)

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        self.assertIsNotNone(context.get("stop_signo"))
        self.assertEqual(int(context.get("stop_signo"), 16),
                         lldbutil.get_signal_number('SIGTRAP'))

        steps_taken += 1

        # Done as soon as the caller's condition holds.
        if predicate(args):
            return (True, steps_taken)

    # The predicate never became true within the allowed number of steps.
    return (False, steps_taken)
| |
def g_c1_c2_contents_are(self, args):
    """Check whether the inferior's g_c1 and g_c2 hold the expected characters.

    Used as the predicate of the single step tests.

    Args:
        args: dict with "g_c1_address" and "g_c2_address" (integer
            addresses) and "expected_g_c1" and "expected_g_c2" (the text
            each one-byte read should decode to).

    Returns:
        True when both one-byte memory reads match their expected text.
    """
    # Imported locally so this method does not depend on a file-level
    # import being present.
    import binascii

    def hex_to_text(hex_text):
        # str.decode("hex") only exists on Python 2.  unhexlify works on
        # both major versions; on Python 3 it returns bytes, which are
        # mapped 1:1 to text so they compare against the expected strings.
        raw = binascii.unhexlify(hex_text)
        return raw if isinstance(raw, str) else raw.decode("latin-1")

    g_c1_address = args["g_c1_address"]
    g_c2_address = args["g_c2_address"]
    expected_g_c1 = args["expected_g_c1"]
    expected_g_c2 = args["expected_g_c2"]

    # Read g_c1 and g_c2 contents.
    self.reset_test_sequence()
    self.test_sequence.add_log_lines(
        ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
         {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}},
         "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
         {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}],
        True)

    # Run the packet stream.
    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    # Check if what we read from inferior memory is what we are expecting.
    g_c1_contents = context.get("g_c1_contents")
    g_c2_contents = context.get("g_c2_contents")
    self.assertIsNotNone(g_c1_contents)
    self.assertIsNotNone(g_c2_contents)

    return (hex_to_text(g_c1_contents) == expected_g_c1) and (
        hex_to_text(g_c2_contents) == expected_g_c2)
| |
def single_step_only_steps_one_instruction(
        self, use_Hc_packet=True, step_instruction="s"):
    """Verify that a single step advances the inferior by one instruction.

    Shared by the single step tests that exercise different step packets.
    The inferior prints the address of swap_chars and of the g_c1/g_c2
    variables, then calls swap_chars.  The test runs to a breakpoint at
    swap_chars, removes it, and single-steps while watching g_c1/g_c2 move
    through the states ("0","1") -> ("1","1") -> ("1","0") -> ("0","0") ->
    ("0","1").  Every transition after the first must take exactly the
    architecture's expected number of steps.

    Args:
        use_Hc_packet: forwarded to count_single_steps_until_true.
        step_instruction: forwarded to count_single_steps_until_true.
    """
    # Start up the inferior.
    procs = self.prep_debug_monitor_and_inferior(
        inferior_args=[
            "get-code-address-hex:swap_chars",
            "get-data-address-hex:g_c1",
            "get-data-address-hex:g_c2",
            "sleep:1",
            "call-function:swap_chars",
            "sleep:5"])

    # Run the process
    self.test_sequence.add_log_lines(
        [  # Start running after initial stop.
            "read packet: $c#63",
            # Match output line that prints the memory address of the function call entry point.
            # Note we require launch-only testing so we can get inferior output.
            {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
             "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
            # Now stop the inferior.
            "read packet: {}".format(chr(3)),
            # And wait for the stop notification.
            {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
        True)

    # Run the packet stream.
    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    # Grab the main thread id.
    self.assertIsNotNone(context.get("stop_thread_id"))
    main_thread_id = int(context.get("stop_thread_id"), 16)

    # Grab the function address.
    self.assertIsNotNone(context.get("function_address"))
    function_address = int(context.get("function_address"), 16)

    # Grab the data addresses.
    self.assertIsNotNone(context.get("g_c1_address"))
    g_c1_address = int(context.get("g_c1_address"), 16)

    self.assertIsNotNone(context.get("g_c2_address"))
    g_c2_address = int(context.get("g_c2_address"), 16)

    # Set a breakpoint at the given address.
    if self.getArchitecture() == "arm":
        # TODO: Handle case when setting breakpoint in thumb code
        BREAKPOINT_KIND = 4
    else:
        BREAKPOINT_KIND = 1
    self.reset_test_sequence()
    self.add_set_breakpoint_packets(
        function_address,
        do_continue=True,
        breakpoint_kind=BREAKPOINT_KIND)
    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    # Remove the breakpoint.
    self.reset_test_sequence()
    self.add_remove_breakpoint_packets(
        function_address, breakpoint_kind=BREAKPOINT_KIND)
    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    # Verify g_c1 and g_c2 match expected initial state.
    args = {}
    args["g_c1_address"] = g_c1_address
    args["g_c2_address"] = g_c2_address
    args["expected_g_c1"] = "0"
    args["expected_g_c2"] = "1"

    self.assertTrue(self.g_c1_c2_contents_are(args))

    # Verify we take only a small number of steps to hit the first state.
    # Might need to work through function entry prologue code, so the
    # step count is not checked for this transition.
    args["expected_g_c1"] = "1"
    args["expected_g_c2"] = "1"
    (state_reached,
     step_count) = self.count_single_steps_until_true(main_thread_id,
                                                      self.g_c1_c2_contents_are,
                                                      args,
                                                      max_step_count=25,
                                                      use_Hc_packet=use_Hc_packet,
                                                      step_instruction=step_instruction)
    self.assertTrue(state_reached)

    # Verify we hit the next state.
    args["expected_g_c1"] = "1"
    args["expected_g_c2"] = "0"
    (state_reached,
     step_count) = self.count_single_steps_until_true(main_thread_id,
                                                      self.g_c1_c2_contents_are,
                                                      args,
                                                      max_step_count=5,
                                                      use_Hc_packet=use_Hc_packet,
                                                      step_instruction=step_instruction)
    self.assertTrue(state_reached)
    # By default each state change is expected to take a single step.
    expected_step_count = 1
    arch = self.getArchitecture()

    # MIPS requires 3 machine instructions (ADDIU, SB, LD) to update the
    # variable value.
    if re.match("mips", arch):
        expected_step_count = 3
    # S390X requires 2 machine instructions (LARL, MVI) to update the
    # variable value.
    if re.match("s390x", arch):
        expected_step_count = 2
    self.assertEqual(step_count, expected_step_count)

    # Verify we hit the next state.
    args["expected_g_c1"] = "0"
    args["expected_g_c2"] = "0"
    (state_reached,
     step_count) = self.count_single_steps_until_true(main_thread_id,
                                                      self.g_c1_c2_contents_are,
                                                      args,
                                                      max_step_count=5,
                                                      use_Hc_packet=use_Hc_packet,
                                                      step_instruction=step_instruction)
    self.assertTrue(state_reached)
    self.assertEqual(step_count, expected_step_count)

    # Verify we hit the next state.
    args["expected_g_c1"] = "0"
    args["expected_g_c2"] = "1"
    (state_reached,
     step_count) = self.count_single_steps_until_true(main_thread_id,
                                                      self.g_c1_c2_contents_are,
                                                      args,
                                                      max_step_count=5,
                                                      use_Hc_packet=use_Hc_packet,
                                                      step_instruction=step_instruction)
    self.assertTrue(state_reached)
    self.assertEqual(step_count, expected_step_count)
| |
def maybe_strict_output_regex(self, regex):
    """Wrap regex loosely or strictly depending on the platform's stderr.

    When lldbplatformutil.hasChattyStderr(self) is true the pattern is
    surrounded by '.*' on both sides; otherwise it is anchored with '^'
    and '$'.
    """
    if lldbplatformutil.hasChattyStderr(self):
        return '.*' + regex + '.*'
    return '^' + regex + '$'
| |
def install_and_create_launch_args(self):
    """Return the launch argument list for the built "a.out" test binary.

    Without a remote platform this is just the local build artifact path.
    With one, the binary is first installed into the process working
    directory on the remote side and the remote path is returned instead.

    Raises:
        Exception: if the remote platform reports an install failure.
    """
    local_exe = self.getBuildArtifact("a.out")

    if lldb.remote_platform:
        remote_exe = lldbutil.append_to_process_working_directory(
            self, os.path.basename(local_exe))
        destination_spec = lldb.SBFileSpec(remote_exe, False)
        install_status = lldb.remote_platform.Install(
            lldb.SBFileSpec(local_exe, True), destination_spec)
        if install_status.Fail():
            raise Exception("remote_platform.Install('%s', '%s') failed: %s" %
                            (local_exe, remote_exe, install_status))
        return [remote_exe]

    return [local_exe]