blob: 61b3974c02519aadbe9a75c14863f12b446757be [file] [log] [blame]
/*
* Copyright (C) 2017 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "src/tracing/core/shared_memory_arbiter_impl.h"
#include <algorithm>
#include <limits>
#include <utility>
#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/base/time.h"
#include "perfetto/ext/tracing/core/commit_data_request.h"
#include "perfetto/ext/tracing/core/shared_memory.h"
#include "src/tracing/core/null_trace_writer.h"
#include "src/tracing/core/trace_writer_impl.h"
namespace perfetto {
using Chunk = SharedMemoryABI::Chunk;
namespace {
static_assert(sizeof(BufferID) == sizeof(uint16_t),
"The MaybeUnboundBufferID logic requires BufferID not to grow "
"above uint16_t.");
MaybeUnboundBufferID MakeTargetBufferIdForReservation(uint16_t reservation_id) {
// Reservation IDs are stored in the upper bits.
PERFETTO_CHECK(reservation_id > 0);
return static_cast<MaybeUnboundBufferID>(reservation_id) << 16;
}
bool IsReservationTargetBufferId(MaybeUnboundBufferID buffer_id) {
return (buffer_id >> 16) > 0;
}
} // namespace
// static
SharedMemoryABI::PageLayout SharedMemoryArbiterImpl::default_page_layout =
SharedMemoryABI::PageLayout::kPageDiv1;
// static
std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateInstance(
SharedMemory* shared_memory,
size_t page_size,
TracingService::ProducerEndpoint* producer_endpoint,
base::TaskRunner* task_runner) {
return std::unique_ptr<SharedMemoryArbiterImpl>(
new SharedMemoryArbiterImpl(shared_memory->start(), shared_memory->size(),
page_size, producer_endpoint, task_runner));
}
// static
std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateUnboundInstance(
SharedMemory* shared_memory,
size_t page_size) {
return std::unique_ptr<SharedMemoryArbiterImpl>(new SharedMemoryArbiterImpl(
shared_memory->start(), shared_memory->size(), page_size,
/*producer_endpoint=*/nullptr, /*task_runner=*/nullptr));
}
SharedMemoryArbiterImpl::SharedMemoryArbiterImpl(
void* start,
size_t size,
size_t page_size,
TracingService::ProducerEndpoint* producer_endpoint,
base::TaskRunner* task_runner)
: producer_endpoint_(producer_endpoint),
task_runner_(task_runner),
shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size),
active_writer_ids_(kMaxWriterID),
fully_bound_(task_runner && producer_endpoint),
was_always_bound_(fully_bound_),
weak_ptr_factory_(this) {}
Chunk SharedMemoryArbiterImpl::GetNewChunk(
const SharedMemoryABI::ChunkHeader& header,
BufferExhaustedPolicy buffer_exhausted_policy,
size_t size_hint) {
PERFETTO_DCHECK(size_hint == 0); // Not implemented yet.
int stall_count = 0;
unsigned stall_interval_us = 0;
bool task_runner_runs_on_current_thread = false;
static const unsigned kMaxStallIntervalUs = 100000;
static const int kLogAfterNStalls = 3;
static const int kFlushCommitsAfterEveryNStalls = 2;
static const int kAssertAtNStalls = 200;
for (;;) {
// TODO(primiano): Probably this lock is not really required and this code
// could be rewritten leveraging only the Try* atomic operations in
// SharedMemoryABI. But let's not be too adventurous for the moment.
{
std::unique_lock<std::mutex> scoped_lock(lock_);
// If ever unbound, we do not support stalling. In theory, we could
// support stalling for TraceWriters created after the arbiter and startup
// buffer reservations were bound, but to avoid raciness between the
// creation of startup writers and binding, we categorically forbid kStall
// mode.
PERFETTO_DCHECK(was_always_bound_ ||
buffer_exhausted_policy == BufferExhaustedPolicy::kDrop);
task_runner_runs_on_current_thread =
task_runner_ && task_runner_->RunsTasksOnCurrentThread();
// If more than half of the SMB.size() is filled with completed chunks for
// which we haven't notified the service yet (i.e. they are still enqueued
// in |commit_data_req_|), force a synchronous CommitDataRequest() even if
// we acquire a chunk, to reduce the likeliness of stalling the writer.
//
// We can only do this if we're writing on the same thread that we access
// the producer endpoint on, since we cannot notify the producer endpoint
// to commit synchronously on a different thread. Attempting to flush
// synchronously on another thread will lead to subtle bugs caused by
// out-of-order commit requests (crbug.com/919187#c28).
bool should_commit_synchronously =
task_runner_runs_on_current_thread &&
buffer_exhausted_policy == BufferExhaustedPolicy::kStall &&
commit_data_req_ && bytes_pending_commit_ >= shmem_abi_.size() / 2;
const size_t initial_page_idx = page_idx_;
for (size_t i = 0; i < shmem_abi_.num_pages(); i++) {
page_idx_ = (initial_page_idx + i) % shmem_abi_.num_pages();
bool is_new_page = false;
// TODO(primiano): make the page layout dynamic.
auto layout = SharedMemoryArbiterImpl::default_page_layout;
if (shmem_abi_.is_page_free(page_idx_)) {
// TODO(primiano): Use the |size_hint| here to decide the layout.
is_new_page = shmem_abi_.TryPartitionPage(page_idx_, layout);
}
uint32_t free_chunks;
if (is_new_page) {
free_chunks = (1 << SharedMemoryABI::kNumChunksForLayout[layout]) - 1;
} else {
free_chunks = shmem_abi_.GetFreeChunks(page_idx_);
}
for (uint32_t chunk_idx = 0; free_chunks;
chunk_idx++, free_chunks >>= 1) {
if (!(free_chunks & 1))
continue;
// We found a free chunk.
Chunk chunk = shmem_abi_.TryAcquireChunkForWriting(
page_idx_, chunk_idx, &header);
if (!chunk.is_valid())
continue;
if (stall_count > kLogAfterNStalls) {
PERFETTO_LOG("Recovered from stall after %d iterations",
stall_count);
}
if (should_commit_synchronously) {
// We can't flush while holding the lock.
scoped_lock.unlock();
FlushPendingCommitDataRequests();
return chunk;
} else {
return chunk;
}
}
}
} // scoped_lock
if (buffer_exhausted_policy == BufferExhaustedPolicy::kDrop) {
PERFETTO_DLOG("Shared memory buffer exhausted, returning invalid Chunk!");
return Chunk();
}
// Stalling is not supported if we were ever unbound (see earlier comment).
PERFETTO_CHECK(was_always_bound_);
// All chunks are taken (either kBeingWritten by us or kBeingRead by the
// Service).
if (stall_count++ == kLogAfterNStalls) {
PERFETTO_LOG("Shared memory buffer overrun! Stalling");
}
if (stall_count == kAssertAtNStalls) {
PERFETTO_FATAL(
"Shared memory buffer max stall count exceeded; possible deadlock");
}
// If the IPC thread itself is stalled because the current process has
// filled up the SMB, we need to make sure that the service can process and
// purge the chunks written by our process, by flushing any pending commit
// requests. Because other threads in our process can continue to
// concurrently grab, fill and commit any chunks purged by the service, it
// is possible that the SMB remains full and the IPC thread remains stalled,
// needing to flush the concurrently queued up commits again. This is
// particularly likely with in-process perfetto service where the IPC thread
// is the service thread. To avoid remaining stalled forever in such a
// situation, we attempt to flush periodically after every N stalls.
if (stall_count % kFlushCommitsAfterEveryNStalls == 0 &&
task_runner_runs_on_current_thread) {
// TODO(primiano): sending the IPC synchronously is a temporary workaround
// until the backpressure logic in probes_producer is sorted out. Until
// then the risk is that we stall the message loop waiting for the tracing
// service to consume the shared memory buffer (SMB) and, for this reason,
// never run the task that tells the service to purge the SMB. This must
// happen iff we are on the IPC thread, not doing this will cause
// deadlocks, doing this on the wrong thread causes out-of-order data
// commits (crbug.com/919187#c28).
FlushPendingCommitDataRequests();
} else {
base::SleepMicroseconds(stall_interval_us);
stall_interval_us =
std::min(kMaxStallIntervalUs, (stall_interval_us + 1) * 8);
}
}
}
void SharedMemoryArbiterImpl::ReturnCompletedChunk(
Chunk chunk,
MaybeUnboundBufferID target_buffer,
PatchList* patch_list) {
PERFETTO_DCHECK(chunk.is_valid());
const WriterID writer_id = chunk.writer_id();
UpdateCommitDataRequest(std::move(chunk), writer_id, target_buffer,
patch_list);
}
void SharedMemoryArbiterImpl::SendPatches(WriterID writer_id,
MaybeUnboundBufferID target_buffer,
PatchList* patch_list) {
PERFETTO_DCHECK(!patch_list->empty() && patch_list->front().is_patched());
UpdateCommitDataRequest(Chunk(), writer_id, target_buffer, patch_list);
}
void SharedMemoryArbiterImpl::UpdateCommitDataRequest(
Chunk chunk,
WriterID writer_id,
MaybeUnboundBufferID target_buffer,
PatchList* patch_list) {
// Note: chunk will be invalid if the call came from SendPatches().
base::TaskRunner* task_runner_to_post_delayed_callback_on = nullptr;
// The delay with which the flush will be posted.
uint32_t flush_delay_ms = 0;
base::WeakPtr<SharedMemoryArbiterImpl> weak_this;
{
std::lock_guard<std::mutex> scoped_lock(lock_);
if (!commit_data_req_) {
commit_data_req_.reset(new CommitDataRequest());
// Flushing the commit is only supported while we're |fully_bound_|. If we
// aren't, we'll flush when |fully_bound_| is updated.
if (fully_bound_ && !delayed_flush_scheduled_) {
weak_this = weak_ptr_factory_.GetWeakPtr();
task_runner_to_post_delayed_callback_on = task_runner_;
flush_delay_ms = batch_commits_duration_ms_;
delayed_flush_scheduled_ = true;
}
}
// If a valid chunk is specified, return it and attach it to the request.
if (chunk.is_valid()) {
PERFETTO_DCHECK(chunk.writer_id() == writer_id);
uint8_t chunk_idx = chunk.chunk_idx();
bytes_pending_commit_ += chunk.size();
size_t page_idx;
// If the chunk needs patching, it should not be marked as complete yet,
// because this would indicate to the service that the producer will not
// be writing to it anymore, while the producer might still apply patches
// to the chunk later on. In particular, when re-reading (e.g. because of
// periodic scraping) a completed chunk, the service expects the flags of
// that chunk not to be removed between reads. So, let's say the producer
// marked the chunk as complete here and the service then read it for the
// first time. If the producer then fully patched the chunk, thus removing
// the kChunkNeedsPatching flag, and the service re-read the chunk after
// the patching, the service would be thrown off by the removed flag.
if (direct_patching_enabled_ &&
(chunk.GetPacketCountAndFlags().second &
SharedMemoryABI::ChunkHeader::kChunkNeedsPatching)) {
page_idx = shmem_abi_.GetPageAndChunkIndex(std::move(chunk)).first;
} else {
// If the chunk doesn't need patching, we can mark it as complete
// immediately. This allows the service to read it in full while
// scraping, which would not be the case if the chunk was left in a
// kChunkBeingWritten state.
page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
}
// DO NOT access |chunk| after this point, it has been std::move()-d
// above.
CommitDataRequest::ChunksToMove* ctm =
commit_data_req_->add_chunks_to_move();
ctm->set_page(static_cast<uint32_t>(page_idx));
ctm->set_chunk(chunk_idx);
ctm->set_target_buffer(target_buffer);
}
// Process the completed patches for previous chunks from the |patch_list|.
CommitDataRequest::ChunkToPatch* last_patch_req = nullptr;
while (!patch_list->empty() && patch_list->front().is_patched()) {
Patch curr_patch = patch_list->front();
patch_list->pop_front();
// Patches for the same chunk are contiguous in the |patch_list|. So, to
// determine if there are any other patches that apply to the chunk that
// is being patched, check if the next patch in the |patch_list| applies
// to the same chunk.
bool chunk_needs_more_patching =
!patch_list->empty() &&
patch_list->front().chunk_id == curr_patch.chunk_id;
if (direct_patching_enabled_ &&
TryDirectPatchLocked(writer_id, curr_patch,
chunk_needs_more_patching)) {
continue;
}
// The chunk that this patch applies to has already been released to the
// service, so it cannot be patches here. Add the patch to the commit data
// request, so that it can be sent to the service and applied there.
if (!last_patch_req ||
last_patch_req->chunk_id() != curr_patch.chunk_id) {
last_patch_req = commit_data_req_->add_chunks_to_patch();
last_patch_req->set_writer_id(writer_id);
last_patch_req->set_chunk_id(curr_patch.chunk_id);
last_patch_req->set_target_buffer(target_buffer);
}
auto* patch = last_patch_req->add_patches();
patch->set_offset(curr_patch.offset);
patch->set_data(&curr_patch.size_field[0], curr_patch.size_field.size());
}
// Patches are enqueued in the |patch_list| in order and are notified to
// the service when the chunk is returned. The only case when the current
// patch list is incomplete is if there is an unpatched entry at the head of
// the |patch_list| that belongs to the same ChunkID as the last one we are
// about to send to the service.
if (last_patch_req && !patch_list->empty() &&
patch_list->front().chunk_id == last_patch_req->chunk_id()) {
last_patch_req->set_has_more_patches(true);
}
// If the buffer is filling up or if we are given a patch for a chunk
// that was already sent to the service, we don't want to wait for the next
// delayed flush to happen and we flush immediately. Otherwise, if we
// accumulate the patch and a crash occurs before the patch is sent, the
// service will not know of the patch and won't be able to reconstruct the
// trace.
if (fully_bound_ &&
(last_patch_req || bytes_pending_commit_ >= shmem_abi_.size() / 2)) {
weak_this = weak_ptr_factory_.GetWeakPtr();
task_runner_to_post_delayed_callback_on = task_runner_;
flush_delay_ms = 0;
}
} // scoped_lock(lock_)
// We shouldn't post tasks while locked.
// |task_runner_to_post_delayed_callback_on| remains valid after unlocking,
// because |task_runner_| is never reset.
if (task_runner_to_post_delayed_callback_on) {
task_runner_to_post_delayed_callback_on->PostDelayedTask(
[weak_this] {
if (!weak_this)
return;
{
std::lock_guard<std::mutex> scoped_lock(weak_this->lock_);
// Clear |delayed_flush_scheduled_|, allowing the next call to
// UpdateCommitDataRequest to start another batching period.
weak_this->delayed_flush_scheduled_ = false;
}
weak_this->FlushPendingCommitDataRequests();
},
flush_delay_ms);
}
}
bool SharedMemoryArbiterImpl::TryDirectPatchLocked(
WriterID writer_id,
const Patch& patch,
bool chunk_needs_more_patching) {
// Search the chunks that are being batched in |commit_data_req_| for a chunk
// that needs patching and that matches the provided |writer_id| and
// |patch.chunk_id|. Iterate |commit_data_req_| in reverse, since
// |commit_data_req_| is appended to at the end with newly-returned chunks,
// and patches are more likely to apply to chunks that have been returned
// recently.
SharedMemoryABI::Chunk chunk;
bool chunk_found = false;
auto& chunks_to_move = commit_data_req_->chunks_to_move();
for (auto ctm_it = chunks_to_move.rbegin(); ctm_it != chunks_to_move.rend();
++ctm_it) {
uint32_t layout = shmem_abi_.GetPageLayout(ctm_it->page());
auto chunk_state =
shmem_abi_.GetChunkStateFromLayout(layout, ctm_it->chunk());
// Note: the subset of |commit_data_req_| chunks that still need patching is
// also the subset of chunks that are still being written to. The rest of
// the chunks in |commit_data_req_| do not need patching and have already
// been marked as complete.
if (chunk_state != SharedMemoryABI::kChunkBeingWritten)
continue;
chunk =
shmem_abi_.GetChunkUnchecked(ctm_it->page(), layout, ctm_it->chunk());
if (chunk.writer_id() == writer_id &&
chunk.header()->chunk_id.load(std::memory_order_relaxed) ==
patch.chunk_id) {
chunk_found = true;
break;
}
}
if (!chunk_found) {
// The chunk has already been committed to the service and the patch cannot
// be applied in the producer.
return false;
}
// Apply the patch.
size_t page_idx;
uint8_t chunk_idx;
std::tie(page_idx, chunk_idx) = shmem_abi_.GetPageAndChunkIndex(chunk);
PERFETTO_DCHECK(shmem_abi_.GetChunkState(page_idx, chunk_idx) ==
SharedMemoryABI::ChunkState::kChunkBeingWritten);
auto chunk_begin = chunk.payload_begin();
uint8_t* ptr = chunk_begin + patch.offset;
PERFETTO_CHECK(ptr <= chunk.end() - SharedMemoryABI::kPacketHeaderSize);
// DCHECK that we are writing into a zero-filled size field and not into
// valid data. It relies on ScatteredStreamWriter::ReserveBytes() to
// zero-fill reservations in debug builds.
const char zero[SharedMemoryABI::kPacketHeaderSize]{};
PERFETTO_DCHECK(memcmp(ptr, &zero, SharedMemoryABI::kPacketHeaderSize) == 0);
memcpy(ptr, &patch.size_field[0], SharedMemoryABI::kPacketHeaderSize);
if (!chunk_needs_more_patching) {
// Mark that the chunk doesn't need more patching and mark it as complete,
// as the producer will not write to it anymore. This allows the service to
// read the chunk in full while scraping, which would not be the case if the
// chunk was left in a kChunkBeingWritten state.
chunk.ClearNeedsPatchingFlag();
shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
}
return true;
}
void SharedMemoryArbiterImpl::SetBatchCommitsDuration(
uint32_t batch_commits_duration_ms) {
std::lock_guard<std::mutex> scoped_lock(lock_);
batch_commits_duration_ms_ = batch_commits_duration_ms;
}
bool SharedMemoryArbiterImpl::EnableDirectSMBPatching() {
std::lock_guard<std::mutex> scoped_lock(lock_);
if (!direct_patching_supported_by_service_) {
return false;
}
return direct_patching_enabled_ = true;
}
void SharedMemoryArbiterImpl::SetDirectSMBPatchingSupportedByService() {
std::lock_guard<std::mutex> scoped_lock(lock_);
direct_patching_supported_by_service_ = true;
}
// This function is quite subtle. When making changes keep in mind these two
// challenges:
// 1) If the producer stalls and we happen to be on the |task_runner_| IPC
// thread (or, for in-process cases, on the same thread where
// TracingServiceImpl lives), the CommitData() call must be synchronous and
// not posted, to avoid deadlocks.
// 2) When different threads hit this function, we must guarantee that we don't
// accidentally make commits out of order. See commit 4e4fe8f56ef and
// crbug.com/919187 for more context.
void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests(
std::function<void()> callback) {
std::unique_ptr<CommitDataRequest> req;
{
std::unique_lock<std::mutex> scoped_lock(lock_);
// Flushing is only supported while |fully_bound_|, and there may still be
// unbound startup trace writers. If so, skip the commit for now - it'll be
// done when |fully_bound_| is updated.
if (!fully_bound_) {
if (callback)
pending_flush_callbacks_.push_back(callback);
return;
}
// May be called by TraceWriterImpl on any thread.
base::TaskRunner* task_runner = task_runner_;
if (!task_runner->RunsTasksOnCurrentThread()) {
// We shouldn't post a task while holding a lock. |task_runner| remains
// valid after unlocking, because |task_runner_| is never reset.
scoped_lock.unlock();
auto weak_this = weak_ptr_factory_.GetWeakPtr();
task_runner->PostTask([weak_this, callback] {
if (weak_this)
weak_this->FlushPendingCommitDataRequests(std::move(callback));
});
return;
}
// |commit_data_req_| could have become a nullptr, for example when a forced
// sync flush happens in GetNewChunk().
if (commit_data_req_) {
// Make sure any placeholder buffer IDs from StartupWriters are replaced
// before sending the request.
bool all_placeholders_replaced =
ReplaceCommitPlaceholderBufferIdsLocked();
// We're |fully_bound_|, thus all writers are bound and all placeholders
// should have been replaced.
PERFETTO_DCHECK(all_placeholders_replaced);
// In order to allow patching in the producer we delay the kChunkComplete
// transition and keep batched chunks in the kChunkBeingWritten state.
// Since we are about to notify the service of all batched chunks, it will
// not be possible to apply any more patches to them and we need to move
// them to kChunkComplete - otherwise the service won't look at them.
for (auto& ctm : commit_data_req_->chunks_to_move()) {
uint32_t layout = shmem_abi_.GetPageLayout(ctm.page());
auto chunk_state =
shmem_abi_.GetChunkStateFromLayout(layout, ctm.chunk());
// Note: the subset of |commit_data_req_| chunks that still need
// patching is also the subset of chunks that are still being written
// to. The rest of the chunks in |commit_data_req_| do not need patching
// and have already been marked as complete.
if (chunk_state != SharedMemoryABI::kChunkBeingWritten)
continue;
SharedMemoryABI::Chunk chunk =
shmem_abi_.GetChunkUnchecked(ctm.page(), layout, ctm.chunk());
shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
}
req = std::move(commit_data_req_);
bytes_pending_commit_ = 0;
}
} // scoped_lock
if (req) {
producer_endpoint_->CommitData(*req, callback);
} else if (callback) {
// If |req| was nullptr, it means that an enqueued deferred commit was
// executed just before this. At this point send an empty commit request
// to the service, just to linearize with it and give the guarantee to the
// caller that the data has been flushed into the service.
producer_endpoint_->CommitData(CommitDataRequest(), std::move(callback));
}
}
bool SharedMemoryArbiterImpl::TryShutdown() {
std::lock_guard<std::mutex> scoped_lock(lock_);
did_shutdown_ = true;
// Shutdown is safe if there are no active trace writers for this arbiter.
return active_writer_ids_.IsEmpty();
}
std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriter(
BufferID target_buffer,
BufferExhaustedPolicy buffer_exhausted_policy) {
PERFETTO_CHECK(target_buffer > 0);
return CreateTraceWriterInternal(target_buffer, buffer_exhausted_policy);
}
std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateStartupTraceWriter(
uint16_t target_buffer_reservation_id) {
return CreateTraceWriterInternal(
MakeTargetBufferIdForReservation(target_buffer_reservation_id),
BufferExhaustedPolicy::kDrop);
}
void SharedMemoryArbiterImpl::BindToProducerEndpoint(
TracingService::ProducerEndpoint* producer_endpoint,
base::TaskRunner* task_runner) {
PERFETTO_DCHECK(producer_endpoint && task_runner);
PERFETTO_DCHECK(task_runner->RunsTasksOnCurrentThread());
bool should_flush = false;
std::function<void()> flush_callback;
{
std::lock_guard<std::mutex> scoped_lock(lock_);
PERFETTO_CHECK(!fully_bound_);
PERFETTO_CHECK(!producer_endpoint_ && !task_runner_);
producer_endpoint_ = producer_endpoint;
task_runner_ = task_runner;
// Now that we're bound to a task runner, also reset the WeakPtrFactory to
// it. Because this code runs on the task runner, the factory's weak
// pointers will be valid on it.
weak_ptr_factory_.Reset(this);
// All writers registered so far should be startup trace writers, since
// the producer cannot feasibly know the target buffer for any future
// session yet.
for (const auto& entry : pending_writers_) {
PERFETTO_CHECK(IsReservationTargetBufferId(entry.second));
}
// If all buffer reservations are bound, we can flush pending commits.
if (UpdateFullyBoundLocked()) {
should_flush = true;
flush_callback = TakePendingFlushCallbacksLocked();
}
} // scoped_lock
// Attempt to flush any pending commits (and run pending flush callbacks). If
// there are none, this will have no effect. If we ended up in a race that
// changed |fully_bound_| back to false, the commit will happen once we become
// |fully_bound_| again.
if (should_flush)
FlushPendingCommitDataRequests(flush_callback);
}
void SharedMemoryArbiterImpl::BindStartupTargetBuffer(
uint16_t target_buffer_reservation_id,
BufferID target_buffer_id) {
PERFETTO_DCHECK(target_buffer_id > 0);
std::unique_lock<std::mutex> scoped_lock(lock_);
// We should already be bound to an endpoint.
PERFETTO_CHECK(producer_endpoint_);
PERFETTO_CHECK(task_runner_);
PERFETTO_CHECK(task_runner_->RunsTasksOnCurrentThread());
BindStartupTargetBufferImpl(std::move(scoped_lock),
target_buffer_reservation_id, target_buffer_id);
}
void SharedMemoryArbiterImpl::AbortStartupTracingForReservation(
uint16_t target_buffer_reservation_id) {
std::unique_lock<std::mutex> scoped_lock(lock_);
// If we are already bound to an arbiter, we may need to flush after aborting
// the session, and thus should be running on the arbiter's task runner.
if (task_runner_ && !task_runner_->RunsTasksOnCurrentThread()) {
// We shouldn't post tasks while locked.
auto* task_runner = task_runner_;
scoped_lock.unlock();
auto weak_this = weak_ptr_factory_.GetWeakPtr();
task_runner->PostTask([weak_this, target_buffer_reservation_id]() {
if (!weak_this)
return;
weak_this->AbortStartupTracingForReservation(
target_buffer_reservation_id);
});
return;
}
// Bind the target buffer reservation to an invalid buffer (ID 0), so that
// existing commits, as well as future commits (of currently acquired chunks),
// will be released as free free by the service but otherwise ignored (i.e.
// not copied into any valid target buffer).
BindStartupTargetBufferImpl(std::move(scoped_lock),
target_buffer_reservation_id,
/*target_buffer_id=*/kInvalidBufferId);
}
void SharedMemoryArbiterImpl::BindStartupTargetBufferImpl(
std::unique_lock<std::mutex> scoped_lock,
uint16_t target_buffer_reservation_id,
BufferID target_buffer_id) {
// We should already be bound to an endpoint if the target buffer is valid.
PERFETTO_DCHECK((producer_endpoint_ && task_runner_) ||
target_buffer_id == kInvalidBufferId);
PERFETTO_DLOG("Binding startup target buffer reservation %" PRIu16
" to buffer %" PRIu16,
target_buffer_reservation_id, target_buffer_id);
MaybeUnboundBufferID reserved_id =
MakeTargetBufferIdForReservation(target_buffer_reservation_id);
bool should_flush = false;
std::function<void()> flush_callback;
std::vector<std::pair<WriterID, BufferID>> writers_to_register;
TargetBufferReservation& reservation =
target_buffer_reservations_[reserved_id];
PERFETTO_CHECK(!reservation.resolved);
reservation.resolved = true;
reservation.target_buffer = target_buffer_id;
// Collect trace writers associated with the reservation.
for (auto it = pending_writers_.begin(); it != pending_writers_.end();) {
if (it->second == reserved_id) {
// No need to register writers that have an invalid target buffer.
if (target_buffer_id != kInvalidBufferId) {
writers_to_register.push_back(
std::make_pair(it->first, target_buffer_id));
}
it = pending_writers_.erase(it);
} else {
it++;
}
}
// If all buffer reservations are bound, we can flush pending commits.
if (UpdateFullyBoundLocked()) {
should_flush = true;
flush_callback = TakePendingFlushCallbacksLocked();
}
scoped_lock.unlock();
// Register any newly bound trace writers with the service.
for (const auto& writer_and_target_buffer : writers_to_register) {
producer_endpoint_->RegisterTraceWriter(writer_and_target_buffer.first,
writer_and_target_buffer.second);
}
// Attempt to flush any pending commits (and run pending flush callbacks). If
// there are none, this will have no effect. If we ended up in a race that
// changed |fully_bound_| back to false, the commit will happen once we become
// |fully_bound_| again.
if (should_flush)
FlushPendingCommitDataRequests(flush_callback);
}
std::function<void()>
SharedMemoryArbiterImpl::TakePendingFlushCallbacksLocked() {
if (pending_flush_callbacks_.empty())
return std::function<void()>();
std::vector<std::function<void()>> pending_flush_callbacks;
pending_flush_callbacks.swap(pending_flush_callbacks_);
// Capture the callback list into the lambda by copy.
return [pending_flush_callbacks]() {
for (auto& callback : pending_flush_callbacks)
callback();
};
}
void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
base::TaskRunner* task_runner_to_commit_on = nullptr;
{
std::lock_guard<std::mutex> scoped_lock(lock_);
// If a commit_data_req_ exists it means that somebody else already posted a
// FlushPendingCommitDataRequests() task.
if (!commit_data_req_) {
commit_data_req_.reset(new CommitDataRequest());
// Flushing the commit is only supported while we're |fully_bound_|. If we
// aren't, we'll flush when |fully_bound_| is updated.
if (fully_bound_)
task_runner_to_commit_on = task_runner_;
} else {
// If there is another request queued and that also contains is a reply
// to a flush request, reply with the highest id.
req_id = std::max(req_id, commit_data_req_->flush_request_id());
}
commit_data_req_->set_flush_request_id(req_id);
} // scoped_lock
// We shouldn't post tasks while locked. |task_runner_to_commit_on|
// remains valid after unlocking, because |task_runner_| is never reset.
if (task_runner_to_commit_on) {
auto weak_this = weak_ptr_factory_.GetWeakPtr();
task_runner_to_commit_on->PostTask([weak_this] {
if (weak_this)
weak_this->FlushPendingCommitDataRequests();
});
}
}
std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriterInternal(
MaybeUnboundBufferID target_buffer,
BufferExhaustedPolicy buffer_exhausted_policy) {
WriterID id;
base::TaskRunner* task_runner_to_register_on = nullptr;
{
std::lock_guard<std::mutex> scoped_lock(lock_);
if (did_shutdown_)
return std::unique_ptr<TraceWriter>(new NullTraceWriter());
id = active_writer_ids_.Allocate();
if (!id)
return std::unique_ptr<TraceWriter>(new NullTraceWriter());
PERFETTO_DCHECK(!pending_writers_.count(id));
if (IsReservationTargetBufferId(target_buffer)) {
// If the reservation is new, mark it as unbound in
// |target_buffer_reservations_|. Otherwise, if the reservation was
// already bound, choose the bound buffer ID now.
auto it_and_inserted = target_buffer_reservations_.insert(
{target_buffer, TargetBufferReservation()});
if (it_and_inserted.first->second.resolved)
target_buffer = it_and_inserted.first->second.target_buffer;
}
if (IsReservationTargetBufferId(target_buffer)) {
// The arbiter and/or startup buffer reservations are not bound yet, so
// buffer the registration of the writer until after we're bound.
pending_writers_[id] = target_buffer;
// Mark the arbiter as not fully bound, since we now have at least one
// unbound trace writer / target buffer reservation.
fully_bound_ = false;
was_always_bound_ = false;
} else if (target_buffer != kInvalidBufferId) {
// Trace writer is bound, so arbiter should be bound to an endpoint, too.
PERFETTO_CHECK(producer_endpoint_ && task_runner_);
task_runner_to_register_on = task_runner_;
}
// All trace writers must use kDrop policy if the arbiter ever becomes
// unbound.
bool uses_drop_policy =
buffer_exhausted_policy == BufferExhaustedPolicy::kDrop;
all_writers_have_drop_policy_ &= uses_drop_policy;
PERFETTO_DCHECK(fully_bound_ || uses_drop_policy);
PERFETTO_CHECK(fully_bound_ || all_writers_have_drop_policy_);
PERFETTO_CHECK(was_always_bound_ || uses_drop_policy);
} // scoped_lock
// We shouldn't post tasks while locked. |task_runner_to_register_on|
// remains valid after unlocking, because |task_runner_| is never reset.
if (task_runner_to_register_on) {
auto weak_this = weak_ptr_factory_.GetWeakPtr();
task_runner_to_register_on->PostTask([weak_this, id, target_buffer] {
if (weak_this)
weak_this->producer_endpoint_->RegisterTraceWriter(id, target_buffer);
});
}
return std::unique_ptr<TraceWriter>(
new TraceWriterImpl(this, id, target_buffer, buffer_exhausted_policy));
}
void SharedMemoryArbiterImpl::ReleaseWriterID(WriterID id) {
base::TaskRunner* task_runner = nullptr;
{
std::lock_guard<std::mutex> scoped_lock(lock_);
active_writer_ids_.Free(id);
auto it = pending_writers_.find(id);
if (it != pending_writers_.end()) {
// Writer hasn't been bound yet and thus also not yet registered with the
// service.
pending_writers_.erase(it);
return;
}
// A trace writer from an aborted session may be destroyed before the
// arbiter is bound to a task runner. In that case, it was never registered
// with the service.
if (!task_runner_)
return;
task_runner = task_runner_;
} // scoped_lock
// We shouldn't post tasks while locked. |task_runner| remains valid after
// unlocking, because |task_runner_| is never reset.
auto weak_this = weak_ptr_factory_.GetWeakPtr();
task_runner->PostTask([weak_this, id] {
if (weak_this)
weak_this->producer_endpoint_->UnregisterTraceWriter(id);
});
}
bool SharedMemoryArbiterImpl::ReplaceCommitPlaceholderBufferIdsLocked() {
if (!commit_data_req_)
return true;
bool all_placeholders_replaced = true;
for (auto& chunk : *commit_data_req_->mutable_chunks_to_move()) {
if (!IsReservationTargetBufferId(chunk.target_buffer()))
continue;
const auto it = target_buffer_reservations_.find(chunk.target_buffer());
PERFETTO_DCHECK(it != target_buffer_reservations_.end());
if (!it->second.resolved) {
all_placeholders_replaced = false;
continue;
}
chunk.set_target_buffer(it->second.target_buffer);
}
for (auto& chunk : *commit_data_req_->mutable_chunks_to_patch()) {
if (!IsReservationTargetBufferId(chunk.target_buffer()))
continue;
const auto it = target_buffer_reservations_.find(chunk.target_buffer());
PERFETTO_DCHECK(it != target_buffer_reservations_.end());
if (!it->second.resolved) {
all_placeholders_replaced = false;
continue;
}
chunk.set_target_buffer(it->second.target_buffer);
}
return all_placeholders_replaced;
}
bool SharedMemoryArbiterImpl::UpdateFullyBoundLocked() {
if (!producer_endpoint_) {
PERFETTO_DCHECK(!fully_bound_);
return false;
}
// We're fully bound if all target buffer reservations have a valid associated
// BufferID.
fully_bound_ = std::none_of(
target_buffer_reservations_.begin(), target_buffer_reservations_.end(),
[](std::pair<MaybeUnboundBufferID, TargetBufferReservation> entry) {
return !entry.second.resolved;
});
if (!fully_bound_)
was_always_bound_ = false;
return fully_bound_;
}
} // namespace perfetto