/*
* Copyright (C) 2017 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "src/tracing/core/trace_writer_impl.h"

#include <string.h>
#include <algorithm>
#include <type_traits>
#include <utility>

#include "perfetto/base/logging.h"
#include "perfetto/ext/base/proc_utils.h"
#include "perfetto/ext/base/thread_annotations.h"
#include "perfetto/protozero/message.h"
#include "perfetto/protozero/proto_utils.h"
#include "perfetto/protozero/root_message.h"
#include "perfetto/protozero/static_buffer.h"
#include "src/tracing/core/shared_memory_arbiter_impl.h"
#include "protos/perfetto/trace/trace_packet.pbzero.h"
using protozero::proto_utils::kMessageLengthFieldSize;
using protozero::proto_utils::WriteRedundantVarInt;
using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;
namespace perfetto {
namespace {
constexpr size_t kPacketHeaderSize = SharedMemoryABI::kPacketHeaderSize;
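
// Scratch buffer used as the write target while in |drop_packets_| mode,
// i.e. when the SMB is exhausted and the BufferExhaustedPolicy is kDrop.
// Its contents are never read back; concurrent TraceWriters may even write
// to it at the same time (see the benign-race annotation in GetNewBuffer()).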
uint8_t g_garbage_chunk[1024];
} // namespace

TraceWriterImpl::TraceWriterImpl(SharedMemoryArbiterImpl* shmem_arbiter,
WriterID id,
MaybeUnboundBufferID target_buffer,
BufferExhaustedPolicy buffer_exhausted_policy)
: shmem_arbiter_(shmem_arbiter),
id_(id),
target_buffer_(target_buffer),
buffer_exhausted_policy_(buffer_exhausted_policy),
protobuf_stream_writer_(this),
process_id_(base::GetProcessId()) {
// TODO(primiano): we could handle the case of running out of TraceWriterID(s)
// more gracefully and always return a no-op TracePacket in NewTracePacket().
PERFETTO_CHECK(id_ != 0);
cur_packet_.reset(new protozero::RootMessage<protos::pbzero::TracePacket>());
cur_packet_->Finalize(); // To avoid the CHECK in NewTracePacket().
}

TraceWriterImpl::~TraceWriterImpl() {
if (cur_chunk_.is_valid()) {
cur_packet_->Finalize();
Flush();
}
// This call may cause the shared memory arbiter (and the underlying memory)
// to get asynchronously deleted if this was the last trace writer targeting
// the arbiter and the arbiter was marked for shutdown.
shmem_arbiter_->ReleaseWriterID(id_);
}

void TraceWriterImpl::Flush(std::function<void()> callback) {
// Flush() cannot be called in the middle of a TracePacket.
PERFETTO_CHECK(cur_packet_->is_finalized());
if (cur_chunk_.is_valid()) {
shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
&patch_list_);
} else {
// When in stall mode, all patches should have been returned with the last
// chunk, since the last packet was completed. In drop_packets_ mode, this
// may not be the case because the packet may have been fragmenting when
// SMB exhaustion occurred and |cur_chunk_| became invalid. In this case,
// drop_packets_ should be true.
PERFETTO_DCHECK(patch_list_.empty() || drop_packets_);
}
// Always issue the Flush request, even if there is nothing to flush, just
// for the sake of getting the callback posted back.
shmem_arbiter_->FlushPendingCommitDataRequests(callback);
protobuf_stream_writer_.Reset({nullptr, nullptr});
// |last_packet_size_field_| might have pointed into the chunk we returned.
last_packet_size_field_ = nullptr;
}
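
// Illustrative usage of the TraceWriter API (a minimal sketch, not code from
// this file; |arbiter|, |kTargetBufferId| and |boot_time_ns| are placeholder
// names, and real callers usually go through the tracing SDK rather than the
// arbiter directly):
//
//   std::unique_ptr<TraceWriter> writer =
//       arbiter->CreateTraceWriter(kTargetBufferId);
//   {
//     auto packet = writer->NewTracePacket();
//     packet->set_timestamp(boot_time_ns);
//   }  // The TracePacketHandle finalizes the packet when it goes out of
//      // scope, making the next NewTracePacket() call legal.
//   writer->Flush();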
TraceWriterImpl::TracePacketHandle TraceWriterImpl::NewTracePacket() {
// If we hit this, the caller is calling NewTracePacket() without having
// finalized the previous packet.
PERFETTO_CHECK(cur_packet_->is_finalized());
// If we hit this, this trace writer was created in a different process. This
// likely means that the process forked while tracing was active, and the
// forked child process tried to emit a trace event. This is not supported, as
// it would lead to two processes writing to the same tracing SMB.
PERFETTO_DCHECK(process_id_ == base::GetProcessId());
fragmenting_packet_ = false;
// Reserve space for the size of the message. Note: the ReserveBytes() call
// below might re-enter this class, invoking GetNewBuffer(), if there isn't
// enough space left in the current chunk or if this is the very first call
// to NewTracePacket().
static_assert(kPacketHeaderSize == kMessageLengthFieldSize,
"The packet header must match the Message header size");
bool was_dropping_packets = drop_packets_;
// It doesn't make sense to begin a packet that is going to fragment
// immediately after (8 is just an arbitrary estimate of the minimum size of
// a realistic packet).
bool chunk_too_full =
protobuf_stream_writer_.bytes_available() < kPacketHeaderSize + 8;
if (chunk_too_full || reached_max_packets_per_chunk_ ||
retry_new_chunk_after_packet_) {
protobuf_stream_writer_.Reset(GetNewBuffer());
}
// Send any completed patches to the service to facilitate trace data
// recovery by the service. This should only happen when we're completing
// the first packet in a chunk which was a continuation from the previous
// chunk, i.e. at most once per chunk.
if (!patch_list_.empty() && patch_list_.front().is_patched()) {
shmem_arbiter_->SendPatches(id_, target_buffer_, &patch_list_);
}
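// Start the new packet: point |cur_packet_| at the stream writer and reserve
// a zeroed placeholder for its size, which Message::Finalize() (or
// FinishTracePacket()) backfills once the payload size is known.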
cur_packet_->Reset(&protobuf_stream_writer_);
uint8_t* header = protobuf_stream_writer_.ReserveBytes(kPacketHeaderSize);
memset(header, 0, kPacketHeaderSize);
cur_packet_->set_size_field(header);
last_packet_size_field_ = header;
TracePacketHandle handle(cur_packet_.get());
cur_fragment_start_ = protobuf_stream_writer_.write_ptr();
fragmenting_packet_ = true;
if (PERFETTO_LIKELY(!drop_packets_)) {
uint16_t new_packet_count = cur_chunk_.IncrementPacketCount();
reached_max_packets_per_chunk_ =
new_packet_count == ChunkHeader::Packets::kMaxCount;
if (PERFETTO_UNLIKELY(was_dropping_packets)) {
// We succeeded in getting a new chunk from the SMB after entering
// drop_packets_ mode. Record a marker in the new packet to indicate the
// data loss.
cur_packet_->set_previous_packet_dropped(true);
}
}
if (PERFETTO_UNLIKELY(first_packet_on_sequence_)) {
cur_packet_->set_first_packet_on_sequence(true);
first_packet_on_sequence_ = false;
}
return handle;
}

// Called when the writer runs out of space in the current chunk (via the
// ScatteredStreamWriter, on behalf of the Message being written, or directly
// from NewTracePacket()). We can get here in two cases:
// 1. In the middle of writing a Message, when |fragmenting_packet_| == true.
//    In this case we want to update the chunk header with a partial packet
//    and start a new partial packet in the new chunk.
// 2. While calling ReserveBytes() for the packet header in NewTracePacket().
//    In this case |fragmenting_packet_| == false and we just want a new
//    chunk without creating any fragments.
protozero::ContiguousMemoryRange TraceWriterImpl::GetNewBuffer() {
if (fragmenting_packet_ && drop_packets_) {
// We can't write the remaining data of the fragmenting packet to a new
// chunk, because we have already lost some of its data in the garbage
// chunk. Thus, we will wrap around in the garbage chunk, wait until the
// current packet is completed, and then attempt to get a new chunk from
// the SMB again. Conversely, if |drop_packets_| is true but
// |fragmenting_packet_| is false, we do try to acquire a valid chunk,
// because the SMB exhaustion might have been resolved in the meantime.
retry_new_chunk_after_packet_ = true;
return protozero::ContiguousMemoryRange{
&g_garbage_chunk[0], &g_garbage_chunk[0] + sizeof(g_garbage_chunk)};
}
// Attempt to grab the next chunk before finalizing the current one, so that
// we know whether we need to start dropping packets before writing the
// current packet fragment's header.
ChunkHeader::Packets packets = {};
if (fragmenting_packet_) {
packets.count = 1;
packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
}
// The memory order of the stores below doesn't really matter. This |header|
// is just a local temporary object. The GetNewChunk() call below will copy it
// into the shared buffer with the proper barriers.
ChunkHeader header = {};
header.writer_id.store(id_, std::memory_order_relaxed);
header.chunk_id.store(next_chunk_id_, std::memory_order_relaxed);
header.packets.store(packets, std::memory_order_relaxed);
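// Note: with BufferExhaustedPolicy::kStall the call below stalls the calling
// thread until a chunk becomes available; with kDrop it returns an invalid
// chunk right away, which sends us into |drop_packets_| mode below.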
SharedMemoryABI::Chunk new_chunk =
shmem_arbiter_->GetNewChunk(header, buffer_exhausted_policy_);
if (!new_chunk.is_valid()) {
// Shared memory buffer exhausted, switch into |drop_packets_| mode. We'll
// drop data until the garbage chunk has been filled once and then retry.
// If we started a packet in one of the previous (valid) chunks, we need to
// tell the service to discard it.
if (fragmenting_packet_) {
// We can only end up here if the previous chunk was a valid chunk,
// because we never try to acquire a new chunk in |drop_packets_| mode
// while fragmenting.
PERFETTO_DCHECK(!drop_packets_);
// Backfill the last fragment's header with an invalid size (too large),
// so that the service's TraceBuffer throws out the incomplete packet.
// It'll restart reading from the next chunk we submit.
WriteRedundantVarInt(SharedMemoryABI::kPacketSizeDropPacket,
cur_packet_->size_field());
// Reset the size field, since we should not write the current packet's
// size anymore after this.
cur_packet_->set_size_field(nullptr);
// We don't set kLastPacketContinuesOnNextChunk or kChunkNeedsPatching on
// the last chunk, because its last fragment will be discarded anyway.
// However, the current packet fragment points to a valid |cur_chunk_| and
// may have non-finalized nested messages which will continue in the
// garbage chunk and currently still point into |cur_chunk_|. As we are
// about to return |cur_chunk_|, we need to invalidate the size fields of
// those nested messages. Normally we move them into the |patch_list_| (see
// below), but in this case it doesn't make sense to send patches for a
// fragment that will be discarded for sure. Thus, we clean up any size
// field references into |cur_chunk_|.
for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
nested_msg = nested_msg->nested_message()) {
uint8_t* const cur_hdr = nested_msg->size_field();
// If this is false, the protozero Message has already been instructed to
// write, upon Finalize(), its size into the patch list.
bool size_field_points_within_chunk =
cur_hdr >= cur_chunk_.payload_begin() &&
cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end();
if (size_field_points_within_chunk)
nested_msg->set_size_field(nullptr);
}
} else if (!drop_packets_ && last_packet_size_field_) {
// If we weren't dropping packets before, we should indicate to the
// service that we're about to lose data. We do this by invalidating the
// size of the last packet in |cur_chunk_|. The service will record
// statistics about packets with kPacketSizeDropPacket size.
PERFETTO_DCHECK(cur_packet_->is_finalized());
PERFETTO_DCHECK(cur_chunk_.is_valid());
// |last_packet_size_field_| should point within |cur_chunk_|'s payload.
PERFETTO_DCHECK(last_packet_size_field_ >= cur_chunk_.payload_begin() &&
last_packet_size_field_ + kMessageLengthFieldSize <=
cur_chunk_.end());
WriteRedundantVarInt(SharedMemoryABI::kPacketSizeDropPacket,
last_packet_size_field_);
}
if (cur_chunk_.is_valid()) {
shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_),
target_buffer_, &patch_list_);
}
drop_packets_ = true;
cur_chunk_ = SharedMemoryABI::Chunk(); // Reset to an invalid chunk.
reached_max_packets_per_chunk_ = false;
retry_new_chunk_after_packet_ = false;
last_packet_size_field_ = nullptr;
PERFETTO_ANNOTATE_BENIGN_RACE_SIZED(&g_garbage_chunk,
sizeof(g_garbage_chunk),
"nobody reads the garbage chunk")
return protozero::ContiguousMemoryRange{
&g_garbage_chunk[0], &g_garbage_chunk[0] + sizeof(g_garbage_chunk)};
} // if (!new_chunk.is_valid())
PERFETTO_DCHECK(new_chunk.is_valid());
if (fragmenting_packet_) {
// We should not be fragmenting a packet after we exited drop_packets_ mode,
// because we only retry acquiring a new chunk when a fresh packet is
// started.
PERFETTO_DCHECK(!drop_packets_);
uint8_t* const wptr = protobuf_stream_writer_.write_ptr();
PERFETTO_DCHECK(wptr >= cur_fragment_start_);
uint32_t partial_size = static_cast<uint32_t>(wptr - cur_fragment_start_);
PERFETTO_DCHECK(partial_size < cur_chunk_.size());
// Backfill the packet header with the fragment size.
PERFETTO_DCHECK(partial_size > 0);
cur_packet_->inc_size_already_written(partial_size);
cur_chunk_.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
WriteRedundantVarInt(partial_size, cur_packet_->size_field());
// Descend into the stack of non-finalized nested submessages (if any) and
// detour their |size_field| into the |patch_list_|. We are about to release
// the chunk, so they can no longer write into it directly.
for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
nested_msg = nested_msg->nested_message()) {
uint8_t* cur_hdr = nested_msg->size_field();
// If this is false, the protozero Message has already been instructed to
// write, upon Finalize(), its size into the patch list.
bool size_field_points_within_chunk =
cur_hdr >= cur_chunk_.payload_begin() &&
cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end();
if (size_field_points_within_chunk) {
cur_hdr = TraceWriterImpl::AnnotatePatch(cur_hdr);
nested_msg->set_size_field(cur_hdr);
} else {
#if PERFETTO_DCHECK_IS_ON()
// Ensure that the size field of the message points to an element of the
// patch list.
auto patch_it = std::find_if(
patch_list_.begin(), patch_list_.end(),
[cur_hdr](const Patch& p) { return &p.size_field[0] == cur_hdr; });
PERFETTO_DCHECK(patch_it != patch_list_.end());
#endif
}
} // for(nested_msg)
} // if(fragmenting_packet)
if (cur_chunk_.is_valid()) {
// ReturnCompletedChunk will consume the first patched entries from
// |patch_list_| and shrink it.
shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
&patch_list_);
}
// Switch to the new chunk.
drop_packets_ = false;
reached_max_packets_per_chunk_ = false;
retry_new_chunk_after_packet_ = false;
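// ChunkIDs increase monotonically (and may wrap) within this writer's
// sequence; the service relies on them to reassemble fragmented packets in
// order and to detect gaps due to lost chunks.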
next_chunk_id_++;
cur_chunk_ = std::move(new_chunk);
last_packet_size_field_ = nullptr;
uint8_t* payload_begin = cur_chunk_.payload_begin();
if (fragmenting_packet_) {
cur_packet_->set_size_field(payload_begin);
last_packet_size_field_ = payload_begin;
memset(payload_begin, 0, kPacketHeaderSize);
payload_begin += kPacketHeaderSize;
cur_fragment_start_ = payload_begin;
}
return protozero::ContiguousMemoryRange{payload_begin, cur_chunk_.end()};
}
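
// Finalizes the current packet fragment when the caller writes the payload
// directly through TakeStreamWriter(), in which case |cur_packet_| never
// observes the written bytes: the fragment size is recomputed from the
// stream writer's position and backfilled into the reserved header.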
void TraceWriterImpl::FinishTracePacket() {
// If we hit this, this trace writer was created in a different process. This
// likely means that the process forked while tracing was active, and the
// forked child process tried to emit a trace event. This is not supported, as
// it would lead to two processes writing to the same tracing SMB.
PERFETTO_DCHECK(process_id_ == base::GetProcessId());
// If the caller uses TakeStreamWriter(), cur_packet_->size() is not up to
// date; only the stream writer knows the exact size. cur_packet_->size_field()
// is still used to mark the start of the fragment, so it tells us whether a
// size header needs to be backfilled.
if (cur_packet_->size_field()) {
uint8_t* const wptr = protobuf_stream_writer_.write_ptr();
PERFETTO_DCHECK(wptr >= cur_fragment_start_);
uint32_t partial_size = static_cast<uint32_t>(wptr - cur_fragment_start_);
WriteRedundantVarInt(partial_size, last_packet_size_field_);
}
cur_packet_->Reset(&protobuf_stream_writer_);
cur_packet_->Finalize(); // To avoid the CHECK in NewTracePacket().
// Send any completed patches to the service to facilitate trace data
// recovery by the service. This should only happen when we're completing
// the first packet in a chunk which was a continuation from the previous
// chunk, i.e. at most once per chunk.
if (!patch_list_.empty() && patch_list_.front().is_patched()) {
shmem_arbiter_->SendPatches(id_, target_buffer_, &patch_list_);
}
}
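
// Moves a size field that points into |cur_chunk_| out of the chunk and into
// a freshly added |patch_list_| entry. The Patch records the (chunk ID,
// offset) of the 4 size bytes so that the service can apply them even after
// the chunk has been committed, and the chunk is flagged as needing
// patching. Returns the address of the out-of-band patch bytes, or nullptr
// if there is no valid chunk to patch.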
uint8_t* TraceWriterImpl::AnnotatePatch(uint8_t* to_patch) {
if (!cur_chunk_.is_valid()) {
return nullptr;
}
auto offset = static_cast<uint16_t>(to_patch - cur_chunk_.payload_begin());
const ChunkID cur_chunk_id =
cur_chunk_.header()->chunk_id.load(std::memory_order_relaxed);
static_assert(kPatchSize == sizeof(Patch::PatchContent),
"Patch size mismatch");
Patch* patch = patch_list_.emplace_back(cur_chunk_id, offset);
// Check that the flag is not already set before setting it. This is not
// required for correctness, but avoids a redundant atomic update of the
// shared chunk header.
if (!(cur_chunk_.GetPacketCountAndFlags().second &
ChunkHeader::kChunkNeedsPatching)) {
cur_chunk_.SetFlag(ChunkHeader::kChunkNeedsPatching);
}
return &patch->size_field[0];
}

WriterID TraceWriterImpl::writer_id() const {
return id_;
}

// Base class definitions.
TraceWriter::TraceWriter() = default;
TraceWriter::~TraceWriter() = default;

} // namespace perfetto