blob: 2cf3f032d87cf807285742aab27c573e9027b6d4 [file] [log] [blame]
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses as dc
from urllib.parse import urlparse
from typing import List, Optional
from perfetto.trace_processor.http import TraceProcessorHttp
from perfetto.trace_processor.platform import PlatformDelegate
from perfetto.trace_processor.protos import ProtoFactory
from perfetto.trace_processor.shell import load_shell
from perfetto.trace_uri_resolver import registry
from perfetto.trace_uri_resolver.registry import ResolverRegistry
# Defining this field as a module variable means this can be changed by
# implementations at startup and used for all TraceProcessor objects
# without having to specify on each one.
# In Google3, this field is rewritten using Copybara to an implementation
# which integrates with internal infra.
PLATFORM_DELEGATE = PlatformDelegate

# Re-exported alias: the union of trace reference types accepted by
# TraceProcessor(trace=...) (path, file-like object, bytes generator, URI or
# URI resolver) as defined by the trace_uri_resolver registry.
TraceReference = registry.TraceReference
class TraceProcessorException(Exception):
  """Raised when a trace_processor RPC responds with an error set.

  Every public TraceProcessor entry point (query, metric, metatrace and
  trace parsing) raises this type when the HTTP layer reports a failure.
  """

  def __init__(self, message):
    super().__init__(message)
@dc.dataclass
class TraceProcessorConfig:
  """Configuration options for TraceProcessor instances.

  The previous version decorated the class with @dc.dataclass but then
  hand-wrote an __init__ which duplicated every field; declaring defaults
  on the fields instead lets the dataclass machinery generate an __init__
  with the exact same parameter names, order and default values.

  Attributes:
    bin_path: path of a trace_processor shell binary to spawn; None lets
      load_shell pick/fetch one via the platform delegate.
    unique_port: forwarded to load_shell; presumably makes each spawned
      shell listen on its own port — TODO confirm in shell.load_shell.
    verbose: forwarded to load_shell to control shell verbosity.
    ingest_ftrace_in_raw: forwarded to load_shell (ftrace ingestion into
      the raw table).
    enable_dev_features: forwarded to load_shell to turn on in-development
      trace_processor features.
    resolver_registry: custom registry for resolving trace URIs; None
      falls back to the platform delegate's default registry (see
      TraceProcessor.__init__).
  """
  bin_path: Optional[str] = None
  unique_port: bool = True
  verbose: bool = False
  ingest_ftrace_in_raw: bool = False
  enable_dev_features: bool = False
  resolver_registry: Optional[ResolverRegistry] = None
class TraceProcessor:
  """Loads a trace into a trace_processor instance and exposes SQL queries
  and trace-based metrics over its HTTP interface.

  Usable as a context manager so that any spawned trace_processor
  subprocess and the HTTP connection are released on exit.
  """

  # Values of these constants correspond to the QueryResponse message at
  # protos/perfetto/trace_processor/trace_processor.proto
  QUERY_CELL_INVALID_FIELD_ID = 0
  QUERY_CELL_NULL_FIELD_ID = 1
  QUERY_CELL_VARINT_FIELD_ID = 2
  QUERY_CELL_FLOAT64_FIELD_ID = 3
  QUERY_CELL_STRING_FIELD_ID = 4
  QUERY_CELL_BLOB_FIELD_ID = 5

  # This is the class returned to the user and contains one row of the
  # resultant query. Each column name is stored as an attribute of this
  # class, with the value corresponding to the column name and row in
  # the query results table.
  class Row(object):
    # Required for pytype to correctly infer attributes from Row objects
    _HAS_DYNAMIC_ATTRIBUTES = True

    def __str__(self):
      return str(self.__dict__)

    def __repr__(self):
      # Bug fix: __repr__ must return a str. Previously this returned
      # self.__dict__ (a dict), which made repr(row) raise
      # "TypeError: __repr__ returned non-string (type dict)".
      return str(self.__dict__)

  class QueryResultIterator:
    """Iterates over the rows of a query result, yielding Row objects.

    NOTE(review): iteration and as_pandas_dataframe() consume the same
    internal cursors (__data_lists_index); use one or the other on a
    given instance, not both.
    """

    def __init__(self, column_names, batches):
      """Builds the iterator from the repeated CellsBatch messages of a
      QueryResponse.

      Args:
        column_names: iterable of column names, one per result column.
        batches: repeated batch messages; the last one has is_last_batch
          set. Cells of each type are concatenated across batches.

      Raises:
        TraceProcessorException: if the total cell count is not a
          multiple of the column count (corrupt response).
      """
      self.__column_names = list(column_names)
      self.__column_count = 0
      self.__count = 0
      self.__cells = []
      # One list per QUERY_CELL_* type id; indices into these lists are
      # tracked per-type in __data_lists_index as rows are materialized.
      self.__data_lists = [[], [], [], [], [], []]
      self.__data_lists_index = [0, 0, 0, 0, 0, 0]
      self.__current_index = 0

      # Iterate over all the batches and collect their
      # contents into lists based on the type of the batch
      batch_index = 0
      while True:
        # It's possible on some occasions that there are non UTF-8 characters
        # in the string_cells field. If this is the case, string_cells is
        # a bytestring which needs to be decoded (but passing ignore so that
        # we don't fail in decoding).
        strings_batch_str = batches[batch_index].string_cells
        try:
          strings_batch_str = strings_batch_str.decode('utf-8', 'ignore')
        except AttributeError:
          # AttributeError can occur when |strings_batch_str| is an str which
          # happens when everything in it is UTF-8 (protobuf automatically
          # does the conversion if it can).
          pass

        # Null-terminated strings in a batch are concatenated
        # into a single large byte array, so we split on the
        # null-terminator to get the individual strings
        strings_batch = strings_batch_str.split('\0')[:-1]
        self.__data_lists[TraceProcessor.QUERY_CELL_STRING_FIELD_ID].extend(
            strings_batch)
        self.__data_lists[TraceProcessor.QUERY_CELL_VARINT_FIELD_ID].extend(
            batches[batch_index].varint_cells)
        self.__data_lists[TraceProcessor.QUERY_CELL_FLOAT64_FIELD_ID].extend(
            batches[batch_index].float64_cells)
        self.__data_lists[TraceProcessor.QUERY_CELL_BLOB_FIELD_ID].extend(
            batches[batch_index].blob_cells)
        self.__cells.extend(batches[batch_index].cells)

        if batches[batch_index].is_last_batch:
          break
        batch_index += 1

      # If there are no rows in the query result, don't bother updating the
      # counts to avoid dealing with / 0 errors.
      if not self.__cells:
        return

      # The count we collected so far was a count of all individual columns
      # in the query result, so we divide by the number of columns in a row
      # to get the number of rows
      self.__column_count = len(self.__column_names)
      self.__count = len(self.__cells) // self.__column_count

      # Data integrity check - see that we have the expected amount of cells
      # for the number of rows that we need to return
      if len(self.__cells) % self.__column_count != 0:
        raise TraceProcessorException("Cell count " + str(len(self.__cells)) +
                                      " is not a multiple of column count " +
                                      str(len(self.__column_names)))

    # To use the query result as a populated Pandas dataframe, this
    # function must be called directly after calling query inside
    # TraceProcessor.
    def as_pandas_dataframe(self):
      """Materializes the full query result as a pandas DataFrame.

      Returns:
        A DataFrame with one column per result column; NULL cells are
        represented as None (not NaN).

      Raises:
        TraceProcessorException: if pandas is not installed or a cell has
          an invalid type.
      """
      try:
        import pandas as pd

        # Populate the dataframe with the query results
        rows = []
        for i in range(0, self.__count):
          row = []
          base_cell_index = i * self.__column_count
          for num in range(len(self.__column_names)):
            col_type = self.__cells[base_cell_index + num]
            if col_type == TraceProcessor.QUERY_CELL_INVALID_FIELD_ID:
              raise TraceProcessorException('Invalid cell type')
            if col_type == TraceProcessor.QUERY_CELL_NULL_FIELD_ID:
              row.append(None)
            else:
              col_index = self.__data_lists_index[col_type]
              self.__data_lists_index[col_type] += 1
              row.append(self.__data_lists[col_type][col_index])
          rows.append(row)

        df = pd.DataFrame(rows, columns=self.__column_names)
        # Cast to object dtype so NULLs survive as None instead of being
        # coerced to NaN by the columns' inferred dtypes.
        return df.astype(object).where(df.notnull(),
                                       None).reset_index(drop=True)
      except ModuleNotFoundError:
        raise TraceProcessorException(
            'Python dependencies missing. Please pip3 install pandas numpy')

    def __len__(self):
      return self.__count

    def __iter__(self):
      return self

    def __next__(self):
      if self.__current_index == self.__count:
        raise StopIteration
      result = TraceProcessor.Row()
      base_cell_index = self.__current_index * self.__column_count
      for num, column_name in enumerate(self.__column_names):
        col_type = self.__cells[base_cell_index + num]
        if col_type == TraceProcessor.QUERY_CELL_INVALID_FIELD_ID:
          raise TraceProcessorException('Invalid cell type')
        if col_type != TraceProcessor.QUERY_CELL_NULL_FIELD_ID:
          col_index = self.__data_lists_index[col_type]
          self.__data_lists_index[col_type] += 1
          setattr(result, column_name, self.__data_lists[col_type][col_index])
        else:
          setattr(result, column_name, None)
      self.__current_index += 1
      return result

  def __init__(self,
               trace: Optional[TraceReference] = None,
               addr: Optional[str] = None,
               config: TraceProcessorConfig = TraceProcessorConfig(),
               file_path: Optional[str] = None):
    """Create a trace processor instance.

    Args:
      trace: reference to a trace to be loaded into the trace
        processor instance.

        One of several types is supported:
        1) path to a trace file to open and read
        2) a file like object (file, io.BytesIO or similar) to read
        3) a generator yielding bytes
        4) a trace URI which resolves to one of the above types
        5) a trace URI resolver; this is a subclass of
           resolver.TraceUriResolver which generates a reference to a
           trace when the |resolve| method is called on it.

        An URI is similar to a connection string (e.g. for a web
        address or SQL database) which specifies where to lookup traces
        and which traces to pick from this data source. The format of a
        string should be as follows:
        resolver_name:key_1=list,of,values;key_2=value

        Custom resolvers can be provided to handle URIs via
        |config.resolver_registry|.
      addr: address of a running trace processor instance. Useful to query an
        already loaded trace.
      config: configuration options which customize functionality of trace
        processor and the Python binding.
      file_path (deprecated): path to a trace file to load. Use
        |trace| instead of this field: specifying both will cause
        an exception to be thrown.

    Raises:
      TraceProcessorException: if both |trace| and |file_path| are given,
        or if loading/parsing the trace fails.
    """
    if trace and file_path:
      raise TraceProcessorException(
          "trace and file_path cannot both be specified.")

    self.config = config
    self.platform_delegate = PLATFORM_DELEGATE()
    self.protos = ProtoFactory(self.platform_delegate)
    self.resolver_registry = config.resolver_registry or \
      self.platform_delegate.default_resolver_registry()
    self.http = self._create_tp_http(addr)

    if trace or file_path:
      try:
        self._parse_trace(trace if trace else file_path)
      except TraceProcessorException as ex:
        # Clean up the spawned subprocess/connection before propagating
        # so a failed load does not leak resources.
        self.close()
        raise ex

  def query(self, sql: str):
    """Executes passed in SQL query using class defined HTTP API, and returns
    the response as a QueryResultIterator. Raises TraceProcessorException if
    the response returns with an error.

    Args:
      sql: SQL query written as a String

    Returns:
      A class which can iterate through each row of the results table. This
      can also be converted to a pandas dataframe by calling the
      as_pandas_dataframe() function after calling query.
    """
    response = self.http.execute_query(sql)
    if response.error:
      raise TraceProcessorException(response.error)
    return TraceProcessor.QueryResultIterator(response.column_names,
                                              response.batch)

  def metric(self, metrics: List[str]):
    """Returns the metrics data corresponding to the passed in trace metric.
    Raises TraceProcessorException if the response returns with an error.

    Args:
      metrics: A list of valid metrics as defined in TraceMetrics

    Returns:
      The metrics data as a proto message
    """
    response = self.http.compute_metric(metrics)
    if response.error:
      raise TraceProcessorException(response.error)
    # Use a distinct local name to avoid shadowing the |metrics| parameter.
    trace_metrics = self.protos.TraceMetrics()
    trace_metrics.ParseFromString(response.metrics)
    return trace_metrics

  def enable_metatrace(self):
    """Enable metatrace for the currently running trace_processor.
    """
    return self.http.enable_metatrace()

  def disable_and_read_metatrace(self):
    """Disable and return the metatrace formed from the currently running
    trace_processor. This must be enabled before attempting to disable. This
    returns the serialized bytes of the metatrace data directly. Raises
    TraceProcessorException if the response returns with an error.
    """
    response = self.http.disable_and_read_metatrace()
    if response.error:
      raise TraceProcessorException(response.error)
    return response.metatrace

  def _create_tp_http(self, addr: Optional[str]) -> TraceProcessorHttp:
    """Returns an HTTP client for |addr|, or spawns a shell if addr is None.

    When a shell is spawned, the Popen handle is stored on
    |self.subprocess| so close() can terminate it.
    """
    if addr:
      p = urlparse(addr)
      # 'host:port' (no scheme) parses into |path| while
      # 'http://host:port' parses into |netloc|; accept both forms.
      parsed = p.netloc if p.netloc else p.path
      return TraceProcessorHttp(parsed, protos=self.protos)

    url, self.subprocess = load_shell(self.config.bin_path,
                                      self.config.unique_port,
                                      self.config.verbose,
                                      self.config.ingest_ftrace_in_raw,
                                      self.config.enable_dev_features,
                                      self.platform_delegate)
    return TraceProcessorHttp(url, protos=self.protos)

  def _parse_trace(self, trace: TraceReference):
    """Resolves |trace| to exactly one trace and streams it into
    trace_processor chunk by chunk, then signals EOF.

    Raises:
      TraceProcessorException: if |trace| resolves to zero or multiple
        traces, or if trace_processor reports a parse error.
    """
    resolved_lst = self.resolver_registry.resolve(trace)
    if not resolved_lst:
      raise TraceProcessorException(
          'trace argument did not resolve to a trace.')

    if len(resolved_lst) > 1:
      raise TraceProcessorException(
          'trace argument resolved to more than one trace. Trace processor '
          'only supports loading a single trace; please use '
          'BatchTraceProcessor to operate on multiple traces.')

    resolved = resolved_lst[0]
    for chunk in resolved.generator:
      result = self.http.parse(chunk)
      if result.error:
        raise TraceProcessorException(
            f'Failed while parsing trace. Error message: {result.error}')
    self.http.notify_eof()

  def __enter__(self):
    return self

  def __exit__(self, a, b, c):
    del a, b, c  # Unused.
    self.close()
    return False

  def close(self):
    """Terminates any spawned shell subprocess and closes the HTTP
    connection. Safe to call multiple times and on partially-constructed
    instances (attributes may be missing if __init__ failed early)."""
    if hasattr(self, 'subprocess'):
      self.subprocess.kill()
      self.subprocess.wait()
    if hasattr(self, 'http'):
      self.http.conn.close()

  def __del__(self):
    self.close()