| # Copyright (C) 2020 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import dataclasses as dc |
| from urllib.parse import urlparse |
| from typing import List, Optional |
| |
| from perfetto.trace_processor.http import TraceProcessorHttp |
| from perfetto.trace_processor.platform import PlatformDelegate |
| from perfetto.trace_processor.protos import ProtoFactory |
| from perfetto.trace_processor.shell import load_shell |
| from perfetto.trace_uri_resolver import registry |
| from perfetto.trace_uri_resolver.registry import ResolverRegistry |
| |
| # Defining this field as a module variable means this can be changed by |
| # implementations at startup and used for all TraceProcessor objects |
| # without having to specify on each one. |
| # In Google3, this field is rewritten using Copybara to a implementation |
| # which can integrates with internal infra. |
| PLATFORM_DELEGATE = PlatformDelegate |
| |
| TraceReference = registry.TraceReference |
| |
| # Custom exception raised if any trace_processor functions return a |
| # response with an error defined |
| class TraceProcessorException(Exception): |
| |
| def __init__(self, message): |
| super().__init__(message) |
| |
| |
| @dc.dataclass |
| class TraceProcessorConfig: |
| bin_path: Optional[str] |
| unique_port: bool |
| verbose: bool |
| ingest_ftrace_in_raw: bool |
| enable_dev_features: bool |
| resolver_registry: Optional[ResolverRegistry] |
| |
| def __init__(self, |
| bin_path: Optional[str] = None, |
| unique_port: bool = True, |
| verbose: bool = False, |
| ingest_ftrace_in_raw: bool = False, |
| enable_dev_features=False, |
| resolver_registry: Optional[ResolverRegistry] = None): |
| self.bin_path = bin_path |
| self.unique_port = unique_port |
| self.verbose = verbose |
| self.ingest_ftrace_in_raw = ingest_ftrace_in_raw |
| self.enable_dev_features = enable_dev_features |
| self.resolver_registry = resolver_registry |
| |
| |
| class TraceProcessor: |
| |
| # Values of these constants correspond to the QueryResponse message at |
| # protos/perfetto/trace_processor/trace_processor.proto |
| QUERY_CELL_INVALID_FIELD_ID = 0 |
| QUERY_CELL_NULL_FIELD_ID = 1 |
| QUERY_CELL_VARINT_FIELD_ID = 2 |
| QUERY_CELL_FLOAT64_FIELD_ID = 3 |
| QUERY_CELL_STRING_FIELD_ID = 4 |
| QUERY_CELL_BLOB_FIELD_ID = 5 |
| |
| # This is the class returned to the user and contains one row of the |
| # resultant query. Each column name is stored as an attribute of this |
| # class, with the value corresponding to the column name and row in |
| # the query results table. |
| class Row(object): |
| # Required for pytype to correctly infer attributes from Row objects |
| _HAS_DYNAMIC_ATTRIBUTES = True |
| |
| def __str__(self): |
| return str(self.__dict__) |
| |
| def __repr__(self): |
| return self.__dict__ |
| |
| class QueryResultIterator: |
| |
| def __init__(self, column_names, batches): |
| self.__column_names = list(column_names) |
| self.__column_count = 0 |
| self.__count = 0 |
| self.__cells = [] |
| self.__data_lists = [[], [], [], [], [], []] |
| self.__data_lists_index = [0, 0, 0, 0, 0, 0] |
| self.__current_index = 0 |
| |
| # Iterate over all the batches and collect their |
| # contents into lists based on the type of the batch |
| batch_index = 0 |
| while True: |
| # It's possible on some occasions that there are non UTF-8 characters |
| # in the string_cells field. If this is the case, string_cells is |
| # a bytestring which needs to be decoded (but passing ignore so that |
| # we don't fail in decoding). |
| strings_batch_str = batches[batch_index].string_cells |
| try: |
| strings_batch_str = strings_batch_str.decode('utf-8', 'ignore') |
| except AttributeError: |
| # AttributeError can occur when |strings_batch_str| is an str which |
| # happens when everything in it is UTF-8 (protobuf automatically |
| # does the conversion if it can). |
| pass |
| |
| # Null-terminated strings in a batch are concatenated |
| # into a single large byte array, so we split on the |
| # null-terminator to get the individual strings |
| strings_batch = strings_batch_str.split('\0')[:-1] |
| self.__data_lists[TraceProcessor.QUERY_CELL_STRING_FIELD_ID].extend( |
| strings_batch) |
| self.__data_lists[TraceProcessor.QUERY_CELL_VARINT_FIELD_ID].extend( |
| batches[batch_index].varint_cells) |
| self.__data_lists[TraceProcessor.QUERY_CELL_FLOAT64_FIELD_ID].extend( |
| batches[batch_index].float64_cells) |
| self.__data_lists[TraceProcessor.QUERY_CELL_BLOB_FIELD_ID].extend( |
| batches[batch_index].blob_cells) |
| self.__cells.extend(batches[batch_index].cells) |
| |
| if batches[batch_index].is_last_batch: |
| break |
| batch_index += 1 |
| |
| # If there are no rows in the query result, don't bother updating the |
| # counts to avoid dealing with / 0 errors. |
| if len(self.__cells) == 0: |
| return |
| |
| # The count we collected so far was a count of all individual columns |
| # in the query result, so we divide by the number of columns in a row |
| # to get the number of rows |
| self.__column_count = len(self.__column_names) |
| self.__count = int(len(self.__cells) / self.__column_count) |
| |
| # Data integrity check - see that we have the expected amount of cells |
| # for the number of rows that we need to return |
| if len(self.__cells) % self.__column_count != 0: |
| raise TraceProcessorException("Cell count " + str(len(self.__cells)) + |
| " is not a multiple of column count " + |
| str(len(self.__column_names))) |
| |
| # To use the query result as a populated Pandas dataframe, this |
| # function must be called directly after calling query inside |
| # TraceProcesor. |
| def as_pandas_dataframe(self): |
| try: |
| import pandas as pd |
| |
| # Populate the dataframe with the query results |
| rows = [] |
| for i in range(0, self.__count): |
| row = [] |
| base_cell_index = i * self.__column_count |
| for num in range(len(self.__column_names)): |
| col_type = self.__cells[base_cell_index + num] |
| if col_type == TraceProcessor.QUERY_CELL_INVALID_FIELD_ID: |
| raise TraceProcessorException('Invalid cell type') |
| |
| if col_type == TraceProcessor.QUERY_CELL_NULL_FIELD_ID: |
| row.append(None) |
| else: |
| col_index = self.__data_lists_index[col_type] |
| self.__data_lists_index[col_type] += 1 |
| row.append(self.__data_lists[col_type][col_index]) |
| rows.append(row) |
| |
| df = pd.DataFrame(rows, columns=self.__column_names) |
| return df.astype(object).where(df.notnull(), |
| None).reset_index(drop=True) |
| |
| except ModuleNotFoundError: |
| raise TraceProcessorException( |
| 'Python dependencies missing. Please pip3 install pandas numpy') |
| |
| def __len__(self): |
| return self.__count |
| |
| def __iter__(self): |
| return self |
| |
| def __next__(self): |
| if self.__current_index == self.__count: |
| raise StopIteration |
| result = TraceProcessor.Row() |
| base_cell_index = self.__current_index * self.__column_count |
| for num, column_name in enumerate(self.__column_names): |
| col_type = self.__cells[base_cell_index + num] |
| if col_type == TraceProcessor.QUERY_CELL_INVALID_FIELD_ID: |
| raise TraceProcessorException('Invalid cell type') |
| if col_type != TraceProcessor.QUERY_CELL_NULL_FIELD_ID: |
| col_index = self.__data_lists_index[col_type] |
| self.__data_lists_index[col_type] += 1 |
| setattr(result, column_name, self.__data_lists[col_type][col_index]) |
| else: |
| setattr(result, column_name, None) |
| |
| self.__current_index += 1 |
| return result |
| |
| def __init__(self, |
| trace: Optional[TraceReference] = None, |
| addr: Optional[str] = None, |
| config: TraceProcessorConfig = TraceProcessorConfig(), |
| file_path: Optional[str] = None): |
| """Create a trace processor instance. |
| |
| Args: |
| trace: reference to a trace to be loaded into the trace |
| processor instance. |
| |
| One of several types is supported: |
| 1) path to a trace file to open and read |
| 2) a file like object (file, io.BytesIO or similar) to read |
| 3) a generator yielding bytes |
| 4) a trace URI which resolves to one of the above types |
| 5) a trace URI resolver; this is a subclass of |
| resolver.TraceUriResolver which generates a reference to a |
| trace when the |resolve| method is called on it. |
| |
| An URI is similar to a connection string (e.g. for a web |
| address or SQL database) which specifies where to lookup traces |
| and which traces to pick from this data source. The format of a |
| string should be as follows: |
| resolver_name:key_1=list,of,values;key_2=value |
| |
| Custom resolvers can be provided to handle URIs via |
| |config.resolver_registry|. |
| addr: address of a running trace processor instance. Useful to query an |
| already loaded trace. |
| config: configuration options which customize functionality of trace |
| processor and the Python binding. |
| file_path (deprecated): path to a trace file to load. Use |
| |trace| instead of this field: specifying both will cause |
| an exception to be thrown. |
| """ |
| |
| if trace and file_path: |
| raise TraceProcessorException( |
| "trace and file_path cannot both be specified.") |
| |
| self.config = config |
| self.platform_delegate = PLATFORM_DELEGATE() |
| self.protos = ProtoFactory(self.platform_delegate) |
| self.resolver_registry = config.resolver_registry or \ |
| self.platform_delegate.default_resolver_registry() |
| self.http = self._create_tp_http(addr) |
| |
| if trace or file_path: |
| try: |
| self._parse_trace(trace if trace else file_path) |
| except TraceProcessorException as ex: |
| self.close() |
| raise ex |
| |
| def query(self, sql: str): |
| """Executes passed in SQL query using class defined HTTP API, and returns |
| the response as a QueryResultIterator. Raises TraceProcessorException if |
| the response returns with an error. |
| |
| Args: |
| sql: SQL query written as a String |
| |
| Returns: |
| A class which can iterate through each row of the results table. This |
| can also be converted to a pandas dataframe by calling the |
| as_pandas_dataframe() function after calling query. |
| """ |
| response = self.http.execute_query(sql) |
| if response.error: |
| raise TraceProcessorException(response.error) |
| |
| return TraceProcessor.QueryResultIterator(response.column_names, |
| response.batch) |
| |
| def metric(self, metrics: List[str]): |
| """Returns the metrics data corresponding to the passed in trace metric. |
| Raises TraceProcessorException if the response returns with an error. |
| |
| Args: |
| metrics: A list of valid metrics as defined in TraceMetrics |
| |
| Returns: |
| The metrics data as a proto message |
| """ |
| response = self.http.compute_metric(metrics) |
| if response.error: |
| raise TraceProcessorException(response.error) |
| |
| metrics = self.protos.TraceMetrics() |
| metrics.ParseFromString(response.metrics) |
| return metrics |
| |
| def enable_metatrace(self): |
| """Enable metatrace for the currently running trace_processor. |
| """ |
| return self.http.enable_metatrace() |
| |
| def disable_and_read_metatrace(self): |
| """Disable and return the metatrace formed from the currently running |
| trace_processor. This must be enabled before attempting to disable. This |
| returns the serialized bytes of the metatrace data directly. Raises |
| TraceProcessorException if the response returns with an error. |
| """ |
| response = self.http.disable_and_read_metatrace() |
| if response.error: |
| raise TraceProcessorException(response.error) |
| |
| return response.metatrace |
| |
| def _create_tp_http(self, addr: str) -> TraceProcessorHttp: |
| if addr: |
| p = urlparse(addr) |
| parsed = p.netloc if p.netloc else p.path |
| return TraceProcessorHttp(parsed, protos=self.protos) |
| |
| url, self.subprocess = load_shell(self.config.bin_path, |
| self.config.unique_port, |
| self.config.verbose, |
| self.config.ingest_ftrace_in_raw, |
| self.config.enable_dev_features, |
| self.platform_delegate) |
| return TraceProcessorHttp(url, protos=self.protos) |
| |
| def _parse_trace(self, trace: TraceReference): |
| resolved_lst = self.resolver_registry.resolve(trace) |
| if not resolved_lst: |
| raise TraceProcessorException( |
| 'trace argument did not resolve to a trace.') |
| |
| if len(resolved_lst) > 1: |
| raise TraceProcessorException( |
| 'trace argument resolved to more than one trace. Trace processor ' |
| 'only supports loading a single trace; please use ' |
| 'BatchTraceProcessor to operate on multiple traces.') |
| |
| resolved = resolved_lst[0] |
| for chunk in resolved.generator: |
| result = self.http.parse(chunk) |
| if result.error: |
| raise TraceProcessorException( |
| f'Failed while parsing trace. Error message: {result.error}') |
| self.http.notify_eof() |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, a, b, c): |
| del a, b, c # Unused. |
| self.close() |
| return False |
| |
| def close(self): |
| if hasattr(self, 'subprocess'): |
| self.subprocess.kill() |
| self.subprocess.wait() |
| |
| if hasattr(self, 'http'): |
| self.http.conn.close() |
| |
| def __del__(self): |
| self.close() |