# blob: 08cbe52a673eb4693a270984a1bc4695aaee2378 [file] [log] [blame]
# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from abc import ABC, abstractmethod
import logging
import subprocess
import threading
import time
import uuid
from devil.utils import reraiser_thread
class ExpensiveLineTransformer(ABC):
  """Base class for line transformers backed by a long-lived subprocess.

  Subclasses provide |name| (used as the prefix of every log message) and
  |command| (the argv handed to subprocess.Popen). Input lines are written to
  the child's stdin followed by a unique marker line; output is read from the
  child's stdout until that marker comes back.
  """

  def __init__(self, process_start_timeout, minimum_timeout, per_line_timeout):
    """Stores the timeout settings. The subprocess is launched by start().

    Args:
      process_start_timeout: Seconds, counted from the call to start(), that
        are granted on top of the regular timeout of a transformation.
      minimum_timeout: Lower bound, in seconds, of the regular timeout.
      per_line_timeout: Seconds of regular timeout granted per input line.
    """
    self._process_start_timeout = process_start_timeout
    self._minimum_timeout = minimum_timeout
    self._per_line_timeout = per_line_timeout
    self._started = False
    # Allow only one thread to call TransformLines() at a time.
    self._lock = threading.Lock()
    # Ensure that only one thread attempts to kill self._proc in Close().
    self._close_lock = threading.Lock()
    self._closed_called = False
    # Assign to None so that attribute exists if Popen() throws.
    self._proc = None
    # Set by start(); used to compute the remaining start-up allowance.
    self._proc_start_time = None

  def start(self):
    """Launches the subprocess described by |command|.

    Kept separate from __init__() so that descendant classes can finish their
    own initialization first. Logs an error and returns without launching
    anything when already started or when |command| is empty.
    """
    if self._started:
      logging.error('%s: Trying to start an already started command', self.name)
      return

    # Taken before Popen() so that process start-up time counts against
    # |process_start_timeout|.
    self._proc_start_time = time.time()
    command = self.command
    if not command:
      logging.error('%s: No command available', self.name)
      return

    self._proc = subprocess.Popen(command,
                                  bufsize=1,
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  universal_newlines=True,
                                  close_fds=True)
    self._started = True

  def IsClosed(self):
    """Returns True if not started, Close() was called, or the process exited."""
    if not self._started or self._closed_called:
      return True
    # Popen.returncode is only refreshed by poll()/wait(); poll explicitly so
    # that a process that exited on its own is actually noticed.
    return self._proc.poll() is not None

  def IsBusy(self):
    """Returns True while some thread is inside TransformLines()."""
    return self._lock.locked()

  def IsReady(self):
    """Returns True if the process is usable and not currently busy."""
    return not self.IsClosed() and not self.IsBusy()

  def TransformLines(self, lines):
    """Feeds |lines| through the subprocess and returns what it prints.

    If anything goes wrong (not started, process exited, timeout, I/O error,
    output ending before the end marker), returns |lines| unchanged.

    Args:
      lines: A list of strings without trailing newlines.

    Returns:
      A list of strings without trailing newlines.
    """
    if not lines:
      return []

    # The output may contain more lines than the input (e.g. symbolized
    # stacktraces being added). To account for the extra output lines, keep
    # reading until this eof_line token is reached. Using a format that will
    # be considered a "useful line" without modifying its output by
    # third_party/android_platform/development/scripts/stack_core.py
    eof_line = self.getEofLine()
    out_lines = []
    saw_eof_line = False

    def _reader():
      nonlocal saw_eof_line
      while True:
        line = self._proc.stdout.readline()
        # readline() returns an empty string once stdout hits EOF.
        if not line:
          break
        # Only strip a real newline; the last line before EOF may lack one.
        if line.endswith('\n'):
          line = line[:-1]
        if line == eof_line:
          saw_eof_line = True
          break
        out_lines.append(line)

    if self.IsBusy():
      logging.warning('%s: Having to wait for transformation.', self.name)

    # Allow only one thread to operate at a time.
    with self._lock:
      if self.IsClosed():
        if self._started and not self._closed_called:
          logging.warning('%s: Process exited with code=%d.', self.name,
                          self._proc.returncode)
          self.Close()
        return lines

      reader_thread = reraiser_thread.ReraiserThread(_reader)
      reader_thread.start()

      try:
        self._proc.stdin.write('\n'.join(lines))
        self._proc.stdin.write('\n{}\n'.format(eof_line))
        self._proc.stdin.flush()

        # Whatever is left of the start-up allowance, plus the regular
        # per-call budget.
        time_since_proc_start = time.time() - self._proc_start_time
        timeout = (max(0, self._process_start_timeout - time_since_proc_start) +
                   max(self._minimum_timeout,
                       len(lines) * self._per_line_timeout))
        reader_thread.join(timeout)

        if self.IsClosed():
          if self._closed_called:
            logging.warning(
                '%s: Close() called by another thread during join().',
                self.name)
          else:
            logging.warning('%s: Process exited with code=%d.', self.name,
                            self._proc.returncode)
            self.Close()
          return lines

        if reader_thread.is_alive():
          logging.error('%s: Timed out after %f seconds with input:', self.name,
                        timeout)
          for in_line in lines:
            logging.error(in_line)
          logging.error(eof_line)
          logging.error('%s: End of timed out input.', self.name)
          logging.error('%s: Timed out output was:', self.name)
          for out_line in out_lines:
            logging.error(out_line)
          logging.error('%s: End of timed out output.', self.name)
          self.Close()
          return lines

        if not saw_eof_line:
          # The reader stopped on EOF (or an error) rather than on the marker,
          # so |out_lines| is incomplete; do not pass it off as a result.
          logging.warning('%s: Output ended before the end-of-input marker.',
                          self.name)
          self.Close()
          return lines

        return out_lines
      except IOError:
        logging.exception('%s: Exception during transformation', self.name)
        self.Close()
        return lines

  def Close(self):
    """Shuts the subprocess down. Safe to call repeatedly and when unstarted."""
    with self._close_lock:
      first_call = not self._closed_called
      self._closed_called = True
      proc = self._proc

    if not first_call or proc is None:
      return

    try:
      proc.stdin.close()
    except OSError:
      # close() flushes; pending data for a dead child raises BrokenPipeError.
      # That must not prevent the kill()/wait() below.
      pass
    # stdout is deliberately left alone: a reader thread may still be blocked
    # on it and will see EOF once the process is gone.
    proc.kill()
    proc.wait()

  def __del__(self):
    # getattr(): __init__() may not have run to completion.
    # self._proc is None when Popen() fails or start() was never called.
    if (not getattr(self, '_closed_called', True)
        and getattr(self, '_proc', None)):
      logging.error('%s: Forgot to Close()', self.name)
      self.Close()

  @property
  @abstractmethod
  def name(self):
    """Label used as the prefix of this transformer's log messages."""

  @property
  @abstractmethod
  def command(self):
    """Argument list passed to subprocess.Popen(); falsy means no command."""

  @staticmethod
  def getEofLine():
    """Returns a fresh, unique line used to mark the end of one request."""
    # Use a format that will be considered a "useful line" without modifying its
    # output by third_party/android_platform/development/scripts/stack_core.py
    return "Generic useful log header: '{}'".format(uuid.uuid4().hex)
class ExpensiveLineTransformerPool(ABC):
  """A fixed-size pool of transformers with a bounded number of restarts.

  Subclasses provide |name| (used in log and error messages) and
  CreateTransformer(). Transformers must offer IsClosed(), IsReady(),
  TransformLines() and Close().
  """

  def __init__(self, max_restarts, pool_size, passthrough_on_failure):
    """Creates |pool_size| transformers via CreateTransformer().

    Args:
      max_restarts: Once this many closed transformers have been replaced,
        the pool is considered broken.
      pool_size: Number of transformers to create.
      passthrough_on_failure: When the pool is broken, return the input lines
        unchanged instead of raising.
    """
    self._max_restarts = max_restarts
    self._pool = [self.CreateTransformer() for _ in range(pool_size)]
    self._passthrough_on_failure = passthrough_on_failure
    # Allow only one thread to select from the pool at a time.
    self._lock = threading.Lock()
    self._num_restarts = 0

  def __enter__(self):
    # Return the pool so that "with Pool(...) as pool:" binds something usable.
    return self

  def __exit__(self, *args):
    self.Close()

  def _HandleBroken(self, lines):
    """Returns |lines| if passthrough is enabled, otherwise raises."""
    if self._passthrough_on_failure:
      return lines
    raise Exception('%s is broken.' % self.name)

  def TransformLines(self, lines):
    """Transforms |lines| using one of the pooled transformers.

    Args:
      lines: The lines to hand to the selected transformer.

    Returns:
      The selected transformer's result, or |lines| unchanged when the pool
      is broken and passthrough_on_failure is set.

    Raises:
      AssertionError: The pool is closed (or was created empty).
      Exception: The pool is broken and passthrough_on_failure is not set.
    """
    with self._lock:
      # Explicit raise instead of assert so the check survives "python -O".
      if not self._pool:
        raise AssertionError('TransformLines() called on a closed Pool.')

      # Transformation is broken.
      if self._num_restarts >= self._max_restarts:
        return self._HandleBroken(lines)

      # Restart any closed transformer.
      for i, transformer in enumerate(self._pool):
        if not transformer.IsClosed():
          continue
        logging.warning('%s: Restarting closed instance.', self.name)
        # Release the instance being dropped so nothing is left half-open.
        transformer.Close()
        self._pool[i] = self.CreateTransformer()
        self._num_restarts += 1
        if self._num_restarts >= self._max_restarts:
          logging.warning('%s: MAX_RESTARTS reached.', self.name)
          return self._HandleBroken(lines)

      # Prefer an idle transformer; otherwise queue on the first one.
      selected = next((x for x in self._pool if x.IsReady()), self._pool[0])
      # Rotate the order so that next caller will not choose the same one.
      self._pool.remove(selected)
      self._pool.append(selected)

    # Run outside the pool lock so other callers can select concurrently.
    return selected.TransformLines(lines)

  def Close(self):
    """Closes every transformer. Calling it again is a no-op."""
    with self._lock:
      for transformer in self._pool or ():
        transformer.Close()
      self._pool = None

  @abstractmethod
  def CreateTransformer(self):
    """Returns a new transformer instance for the pool."""

  @property
  @abstractmethod
  def name(self):
    """Label used in this pool's log and error messages."""