| # Copyright 2023 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from abc import ABC, abstractmethod |
| import logging |
| import subprocess |
| import threading |
| import time |
| import uuid |
| |
| from devil.utils import reraiser_thread |
| |
| |
class ExpensiveLineTransformer(ABC):
  """Feeds lines to a long-lived helper process and collects its output.

  Subclasses provide |name| (used as the prefix of every log message) and
  |command| (the argument list passed to subprocess.Popen). Each batch of
  input lines is written to the child's stdin followed by a unique marker
  line; the child's stdout is read until that marker comes back.
  """

  def __init__(self, process_start_timeout, minimum_timeout, per_line_timeout):
    """Initializes the state; the process itself is launched by start().

    Args:
      process_start_timeout: Seconds granted to the process for starting up.
        Whatever part of it has not yet elapsed since start() is added to
        the timeout of a TransformLines() call.
      minimum_timeout: Lower bound, in seconds, of the per-call timeout.
      per_line_timeout: Seconds allowed per input line.
    """
    self._process_start_timeout = process_start_timeout
    self._minimum_timeout = minimum_timeout
    self._per_line_timeout = per_line_timeout
    self._started = False
    # Allow only one thread to call TransformLines() at a time.
    self._lock = threading.Lock()
    # Ensure that only one thread attempts to kill self._proc in Close().
    self._close_lock = threading.Lock()
    self._closed_called = False
    # Assign to None so that attribute exists if Popen() throws.
    self._proc = None
    # Set by start(); used to compute the remaining start-up allowance.
    self._proc_start_time = None

  def start(self):
    """Launches the helper process.

    This is separate from __init__ so that descendant classes can finish
    their own initialization before |command| is evaluated. Calling it on an
    already started instance, or when |command| is empty, only logs an error.
    """
    if self._started:
      logging.error('%s: Trying to start an already started command', self.name)
      return

    # Start process eagerly to hide start-up latency.
    self._proc_start_time = time.time()

    command = self.command
    if not command:
      logging.error('%s: No command available', self.name)
      return

    self._proc = subprocess.Popen(command,
                                  bufsize=1,
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  universal_newlines=True,
                                  close_fds=True)
    self._started = True

  def IsClosed(self):
    """Returns True if the process was never started, was closed, or exited."""
    if not self._started or self._closed_called:
      return True
    # Popen.returncode is only refreshed by poll()/wait(), so poll explicitly
    # to notice a process that terminated on its own.
    return self._proc.poll() is not None

  def IsBusy(self):
    """Returns True while a TransformLines() call holds the lock."""
    return self._lock.locked()

  def IsReady(self):
    """Returns True if the process is usable and no call is in progress."""
    return not self.IsClosed() and not self.IsBusy()

  def TransformLines(self, lines):
    """Transforms the given lines by piping them through the process.

    If anything goes wrong (process crashes, timeout, etc), returns |lines|.

    Args:
      lines: A list of strings without trailing newlines.

    Returns:
      A list of strings without trailing newlines.
    """
    if not lines:
      return []

    # The output may contain more lines than the input (e.g. symbolized
    # stacktraces are added). To account for the extra output lines, keep
    # reading until this eof_line token is reached.
    eof_line = self.getEofLine()
    out_lines = []
    # Set by the reader only once the process has echoed |eof_line|, i.e. the
    # output is known to be complete.
    saw_eof_line = threading.Event()

    def _reader():
      while True:
        line = self._proc.stdout.readline()
        # readline() returns an empty string at EOF (when stdin is closed).
        if not line:
          break
        # A final line cut short by EOF has no newline to strip.
        if line.endswith('\n'):
          line = line[:-1]
        if line == eof_line:
          saw_eof_line.set()
          break
        out_lines.append(line)

    if self.IsBusy():
      logging.warning('%s: Having to wait for transformation.', self.name)

    # Allow only one thread to operate at a time.
    with self._lock:
      if self.IsClosed():
        if self._started and not self._closed_called:
          logging.warning('%s: Process exited with code=%d.', self.name,
                          self._proc.returncode)
          self.Close()
        return lines

      reader_thread = reraiser_thread.ReraiserThread(_reader)
      reader_thread.start()

      try:
        self._proc.stdin.write('\n'.join(lines))
        self._proc.stdin.write('\n{}\n'.format(eof_line))
        self._proc.stdin.flush()
        time_since_proc_start = time.time() - self._proc_start_time
        timeout = (max(0, self._process_start_timeout - time_since_proc_start) +
                   max(self._minimum_timeout,
                       len(lines) * self._per_line_timeout))
        reader_thread.join(timeout)
        if self._closed_called:
          logging.warning('%s: Close() called by another thread during join().',
                          self.name)
          return lines
        if reader_thread.is_alive():
          self._LogTimeout(timeout, lines, eof_line, list(out_lines))
          self.Close()
          return lines
        if not saw_eof_line.is_set():
          # The reader stopped at EOF (or failed) before the marker showed up,
          # so |out_lines| is incomplete and must not be handed out.
          logging.error('%s: Output ended before the end-of-input marker.',
                        self.name)
          self.Close()
          return lines
        return out_lines
      except (IOError, ValueError):
        # IOError: the pipe broke. ValueError: stdin was closed by a Close()
        # call racing with the writes above.
        logging.exception('%s: Exception during transformation', self.name)
        self.Close()
        return lines

  def _LogTimeout(self, timeout, lines, eof_line, out_lines):
    """Logs the input and the partial output of a timed out transformation."""
    logging.error('%s: Timed out after %f seconds with input:', self.name,
                  timeout)
    for l in lines:
      logging.error(l)
    logging.error(eof_line)
    logging.error('%s: End of timed out input.', self.name)
    logging.error('%s: Timed out output was:', self.name)
    for l in out_lines:
      logging.error(l)
    logging.error('%s: End of timed out output.', self.name)

  def Close(self):
    """Closes stdin, kills the process and reaps it. Safe to call repeatedly."""
    with self._close_lock:
      needs_closing = self._started and not self._closed_called
      self._closed_called = True

    if not needs_closing:
      return

    try:
      # close() flushes first; data left over from a failed write makes it
      # raise again when the process is already gone.
      self._proc.stdin.close()
    except OSError:
      logging.debug('%s: Ignoring error while closing stdin.', self.name,
                    exc_info=True)
    finally:
      # Always reap the process, even if closing stdin failed.
      self._proc.kill()
      self._proc.wait()

  def __del__(self):
    # The attributes are missing when __init__() did not run to completion,
    # and self._proc is None when Popen() fails.
    if (not getattr(self, '_closed_called', True)
        and getattr(self, '_proc', None)):
      logging.error('%s: Forgot to Close()', self.name)
      self.Close()

  @property
  @abstractmethod
  def name(self):
    """Label used as the prefix of this instance's log messages."""
    ...

  @property
  @abstractmethod
  def command(self):
    """Argument list passed to subprocess.Popen(); falsy if unavailable."""
    ...

  @staticmethod
  def getEofLine():
    """Returns a fresh, unique marker line terminating one batch of input."""
    # Use a format that will be considered a "useful line" without modifying its
    # output by third_party/android_platform/development/scripts/stack_core.py
    return "Generic useful log header: '{}'".format(uuid.uuid4().hex)
| |
| |
class ExpensiveLineTransformerPool(ABC):
  """Round-robin pool of transformers that replaces closed instances.

  Subclasses provide CreateTransformer() and |name|. Once the number of
  restarts reaches |max_restarts| the pool is considered broken: it then
  either returns its input unchanged or raises, depending on
  |passthrough_on_failure|.
  """

  def __init__(self, max_restarts, pool_size, passthrough_on_failure):
    """Creates |pool_size| transformers via CreateTransformer().

    Args:
      max_restarts: Restart count at which the pool is declared broken.
      pool_size: Number of transformers kept in the pool.
      passthrough_on_failure: If true, a broken pool returns the input lines
        unchanged; otherwise it raises an Exception.
    """
    self._max_restarts = max_restarts
    self._passthrough_on_failure = passthrough_on_failure
    # Allow only one thread to select from the pool at a time.
    self._lock = threading.Lock()
    self._num_restarts = 0
    self._pool = [self.CreateTransformer() for _ in range(pool_size)]

  def __enter__(self):
    # Return the pool so that "with Pool(...) as pool:" binds something useful.
    return self

  def __exit__(self, *args):
    self.Close()

  def _PassthroughOrRaise(self, lines):
    """Returns |lines| unchanged, or raises if passthrough is disabled."""
    if self._passthrough_on_failure:
      return lines
    raise Exception('%s is broken.' % self.name)

  def TransformLines(self, lines):
    """Transforms |lines| using one transformer of the pool.

    Args:
      lines: A list of strings, handed to the selected transformer.

    Returns:
      The selected transformer's result, or |lines| itself when the pool is
      broken and passthrough is enabled.

    Raises:
      AssertionError: If the pool is closed (or empty).
      Exception: If the pool is broken and passthrough is disabled.
    """
    with self._lock:
      # Raised explicitly so that the check survives "python -O".
      if not self._pool:
        raise AssertionError('TransformLines() called on a closed Pool.')

      # transformation is broken.
      if self._num_restarts == self._max_restarts:
        return self._PassthroughOrRaise(lines)

      # Restart any closed transformer.
      for i, transformer in enumerate(self._pool):
        if not transformer.IsClosed():
          continue
        # Make sure the instance about to be dropped releases its resources.
        transformer.Close()
        self._num_restarts += 1
        if self._num_restarts == self._max_restarts:
          # Do not spawn a replacement: a broken pool never uses it.
          logging.warning('%s: MAX_RESTARTS reached.', self.name)
          return self._PassthroughOrRaise(lines)
        logging.warning('%s: Restarting closed instance.', self.name)
        self._pool[i] = self.CreateTransformer()

      selected = next((x for x in self._pool if x.IsReady()), self._pool[0])
      # Rotate the order so that next caller will not choose the same one.
      self._pool.remove(selected)
      self._pool.append(selected)

    # Run outside the pool lock so other threads can pick another instance.
    return selected.TransformLines(lines)

  def Close(self):
    """Closes every transformer; calling it more than once is harmless."""
    with self._lock:
      for transformer in self._pool or ():
        transformer.Close()
      self._pool = None

  @abstractmethod
  def CreateTransformer(self):
    """Returns a new transformer instance for the pool."""
    ...

  @property
  @abstractmethod
  def name(self):
    """Label used as the prefix of this pool's log messages."""
    ...