| # Copyright 2020 the V8 project authors. All rights reserved. | 
 | # Use of this source code is governed by a BSD-style license that can be | 
 | # found in the LICENSE file. | 
 |  | 
 | import re | 
 | import socket | 
 | import struct | 
 | import subprocess | 
 | import time | 
 | import xml.etree.ElementTree | 
 | import sys | 
 |  | 
# Python2 has both int and long, Python3 only has int, which is the same as
# Python2 long.
# Evaluating the bare name raises NameError when the builtin does not exist;
# in that case `long` is aliased to int so later code can call long() on
# either interpreter.
try:
  long
except NameError:
  long = int

# Python2 has xrange and range. Python3 range is Python2 xrange.
# Same probe-and-alias approach: when xrange is missing it is bound to range.
try:
  xrange
except NameError:
  xrange = range
 |  | 
# Default (host, port) pair. It is the default argument both for probing the
# port (EnsurePortIsAvailable) and for connecting to the debug stub
# (GdbRspConnection).
SOCKET_ADDR = ('localhost', 8765)

# Signal numbers. NOTE(review): presumably passed by callers as the expected
# value to AssertReplySignal; they are not referenced in the visible code.
SIGTRAP = 5
SIGSEGV = 11
# NOTE(review): not referenced in the visible code; presumably the returncode
# of a process terminated by SIGKILL -- confirm against callers.
RETURNCODE_KILL = -9

# Key that selects the register layout in REG_DEFS (see DecodeRegs).
ARCH = 'wasm32'
# Register layout per architecture: an ordered list of
# (register name, struct format character) pairs. DecodeRegs joins the format
# characters into one struct format string; 'Q' is struct's unsigned long long.
REG_DEFS = {
    ARCH: [('pc', 'Q'), ],
}
 |  | 
 |  | 
 | def EnsurePortIsAvailable(addr=SOCKET_ADDR): | 
 |   # As a validity check, check that the TCP port is available by binding to it | 
 |   # ourselves (and then unbinding).  Otherwise, we could end up talking to an | 
 |   # old instance of the GDB stub that is still hanging around, or to some | 
 |   # unrelated service that uses the same port number. Of course, there is still | 
 |   # a race condition because an unrelated process could bind the port after we | 
 |   # unbind. | 
 |   sock = socket.socket() | 
 |   sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) | 
 |   sock.bind(addr) | 
 |   sock.close() | 
 |  | 
 | def RspChecksum(data): | 
 |   checksum = 0 | 
 |   for char in data: | 
 |     checksum = (checksum + ord(char)) & 0xff | 
 |   return checksum | 
 |  | 
 | class GdbRspConnection(object): | 
 |  | 
 |   def __init__(self, addr=SOCKET_ADDR): | 
 |     self._socket = self._Connect(addr) | 
 |  | 
 |   def _Connect(self, addr): | 
 |     # We have to poll because we do not know when the GDB stub has successfully | 
 |     # done bind() on the TCP port. This is inherently unreliable. | 
 |     timeout_in_seconds = 10 | 
 |     poll_time_in_seconds = 0.1 | 
 |     for i in xrange(int(timeout_in_seconds / poll_time_in_seconds)): | 
 |       # On Mac OS X, we have to create a new socket FD for each retry. | 
 |       sock = socket.socket() | 
 |       # Do not delay sending small packets. This significantly speeds up debug | 
 |       # stub tests. | 
 |       sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) | 
 |       try: | 
 |         sock.connect(addr) | 
 |       except socket.error: | 
 |         # Retry after a delay. | 
 |         time.sleep(poll_time_in_seconds) | 
 |       else: | 
 |         return sock | 
 |     raise Exception('Could not connect to the debug stub in %i seconds' | 
 |                     % timeout_in_seconds) | 
 |  | 
 |   def _GetReply(self): | 
 |     reply = '' | 
 |     message_finished = re.compile('#[0-9a-fA-F]{2}') | 
 |     while True: | 
 |       data = self._socket.recv(1024) | 
 |       if len(data) == 0: | 
 |         raise AssertionError('EOF on socket reached with ' | 
 |                              'incomplete reply message: %r' % reply) | 
 |       reply += data | 
 |       if message_finished.match(reply[-3:]): | 
 |         break | 
 |     match = re.match('\+?\$([^#]*)#([0-9a-fA-F]{2})$', reply) | 
 |     if match is None: | 
 |       raise AssertionError('Unexpected reply message: %r' % reply) | 
 |     reply_body = match.group(1) | 
 |     checksum = match.group(2) | 
 |     expected_checksum = '%02x' % RspChecksum(reply_body) | 
 |     if checksum != expected_checksum: | 
 |       raise AssertionError('Bad RSP checksum: %r != %r' % | 
 |                            (checksum, expected_checksum)) | 
 |     # Send acknowledgement. | 
 |     self._socket.send('+') | 
 |     return reply_body | 
 |  | 
 |   # Send an rsp message, but don't wait for or expect a reply. | 
 |   def RspSendOnly(self, data): | 
 |     msg = '$%s#%02x' % (data, RspChecksum(data)) | 
 |     return self._socket.send(msg) | 
 |  | 
 |   def RspRequest(self, data): | 
 |     self.RspSendOnly(data) | 
 |     reply = self._GetReply() | 
 |     return reply | 
 |  | 
 |   def Close(self): | 
 |     self._socket.close() | 
 |  | 
 |  | 
 | def PopenDebugStub(command): | 
 |   EnsurePortIsAvailable() | 
 |   return subprocess.Popen(command) | 
 |  | 
 |  | 
 | def KillProcess(process): | 
 |   if process.returncode is not None: | 
 |     # kill() won't work if we've already wait()'ed on the process. | 
 |     return | 
 |   try: | 
 |     process.kill() | 
 |   except OSError: | 
 |     if sys.platform == 'win32': | 
 |       # If process is already terminated, kill() throws | 
 |       # "WindowsError: [Error 5] Access is denied" on Windows. | 
 |       pass | 
 |     else: | 
 |       raise | 
 |   process.wait() | 
 |  | 
 |  | 
 | class LaunchDebugStub(object): | 
 |   def __init__(self, command): | 
 |     self._proc = PopenDebugStub(command) | 
 |  | 
 |   def __enter__(self): | 
 |     try: | 
 |       return GdbRspConnection() | 
 |     except: | 
 |       KillProcess(self._proc) | 
 |       raise | 
 |  | 
 |   def __exit__(self, exc_type, exc_value, traceback): | 
 |     KillProcess(self._proc) | 
 |  | 
 |  | 
 | def AssertEquals(x, y): | 
 |   if x != y: | 
 |     raise AssertionError('%r != %r' % (x, y)) | 
 |  | 
 | def DecodeHex(data): | 
 |   assert len(data) % 2 == 0, data | 
 |   return bytes(bytearray([int(data[index * 2 : (index + 1) * 2], 16) for index in xrange(len(data) // 2)])) | 
 |  | 
 | def EncodeHex(data): | 
 |   return ''.join('%02x' % ord(byte) for byte in data) | 
 |  | 
 | def DecodeUInt64Array(data): | 
 |   assert len(data) % 16 == 0, data | 
 |   result = [] | 
 |   for index in xrange(len(data) // 16): | 
 |     value = 0 | 
 |     for digit in xrange(7, -1, -1): | 
 |       value = value * 256 + int(data[index * 16 + digit * 2 : index * 16 + (digit + 1) * 2], 16) | 
 |     result.append(value) | 
 |   return result | 
 |  | 
 | def AssertReplySignal(reply, signal): | 
 |   AssertEquals(ParseThreadStopReply(reply)['signal'], signal) | 
 |  | 
 | def ParseThreadStopReply(reply): | 
 |   match = re.match('T([0-9a-f]{2})thread-pcs:([0-9a-f]+);thread:([0-9a-f]+);$', reply) | 
 |   if not match: | 
 |     raise AssertionError('Bad thread stop reply: %r' % reply) | 
 |   return {'signal': int(match.group(1), 16), | 
 |           'thread_pc': int(match.group(2), 16), | 
 |           'thread_id': int(match.group(3), 16)} | 
 |  | 
 | def CheckInstructionPtr(connection, expected_ip): | 
 |   ip_value = DecodeRegs(connection.RspRequest('g'))['pc'] | 
 |   AssertEquals(ip_value, expected_ip) | 
 |  | 
 | def DecodeRegs(reply): | 
 |   defs = REG_DEFS[ARCH] | 
 |   names = [reg_name for reg_name, reg_fmt in defs] | 
 |   fmt = ''.join([reg_fmt for reg_name, reg_fmt in defs]) | 
 |  | 
 |   values = struct.unpack_from(fmt, DecodeHex(reply)) | 
 |   return dict(zip(names, values)) | 
 |  | 
 | def GetLoadedModules(connection): | 
 |   modules = {} | 
 |   reply = connection.RspRequest('qXfer:libraries:read') | 
 |   AssertEquals(reply[0], 'l') | 
 |   library_list = xml.etree.ElementTree.fromstring(reply[1:]) | 
 |   AssertEquals(library_list.tag, 'library-list') | 
 |   for library in library_list: | 
 |     AssertEquals(library.tag, 'library') | 
 |     section = library.find('section') | 
 |     address = section.get('address') | 
 |     assert long(address) > 0 | 
 |     modules[long(address)] = library.get('name') | 
 |   return modules | 
 |  | 
 | def GetLoadedModuleAddress(connection): | 
 |   modules = GetLoadedModules(connection) | 
 |   assert len(modules) > 0 | 
 |   return modules.keys()[0] | 
 |  | 
 | def ReadCodeMemory(connection, address, size): | 
 |   reply = connection.RspRequest('m%x,%x' % (address, size)) | 
 |   assert not reply.startswith('E'), reply | 
 |   return DecodeHex(reply) |