blob: 9bf42f52be5b37d9945272177e1e91781e9bea62 [file] [log] [blame]
/*
* Copyright (C) 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_CHANNEL_H_
#define INCLUDE_PERFETTO_EXT_BASE_THREADING_CHANNEL_H_
#include <mutex>
#include <optional>
#include "perfetto/base/compiler.h"
#include "perfetto/base/platform_handle.h"
#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/ext/base/event_fd.h"
namespace perfetto {
namespace base {
// Unidirectional conduit used to send values between threads with a fixed-sized
// buffer in-between.
//
// When a channel is read from when empty or written to when full, the operation
// will not succeed and the caller can choose to a) abandon the operation,
// or b) use |read_fd| or |write_fd| (as appropriate) which will be become
// "ready" (i.e. base::TaskRunner watches will fire) when the operation would
// succeed.
//
// A channel is very similar to a Unix pipe except with the values being sent
// a) not needing to be serializable b) data does not go through the kernel.
template <typename T>
class Channel {
public:
struct ReadResult {
ReadResult(std::optional<T> _item, bool _is_closed)
: item(std::move(_item)), is_closed(_is_closed) {}
bool operator==(const ReadResult& res) const {
return item == res.item && is_closed == res.is_closed;
}
// The item read from the channel or std::nullopt if the channel is empty.
// If so, callers can use |read_fd| to be notified when a read operation
// would succeed.
std::optional<T> item;
// Indicates the channel is closed. Readers can continue to read from the
// channel and any buffered elements will be correctly returned. Moreover,
// any future reads will also have |is_closed| == true and |read_fd| will be
// ready forever.
//
// Once a ReadResult is returned with |item| == std::nullopt and
// |is_closed| == true, no further values will ever be returned.
bool is_closed;
};
struct WriteResult {
WriteResult(bool _success, bool _is_closed)
: success(std::move(_success)), is_closed(_is_closed) {}
bool operator==(const WriteResult& res) const {
return success == res.success && is_closed == res.is_closed;
}
// Returns whether the write to the channel was successful. If this is
// false, callers can use |write_fd| to be notified when future writes
// would succeed. Note that callers should also check |is_closed| as another
// writer may have closed the channel.
bool success;
// Indicates that the channel is closed. If this value is true, |success|
// will be |false| Moreover, any further writes will continue to return
// |success| == false, |is_closed| == true and |write_fd| will be ready
// forever.
bool is_closed;
};
// Creates a channel with a capacity at least as large as |capacity_hint|. The
// capacity *must* be greater than zero.
//
// Note that it's possible that a capacity > |capacity_hint| will be chosen:
// it is implementation defined when this might happen.
explicit Channel(uint32_t capacity_hint) : elements_(capacity_hint) {
PERFETTO_DCHECK(capacity_hint > 0);
// It's very important that we make sure |write_fd| is ready to avoid
// deadlocks.
write_fd_.Notify();
}
// Attempts to read from the channel and returns the result of the attempt.
// See |ReadResult| for more information on the result.
PERFETTO_WARN_UNUSED_RESULT ReadResult ReadNonBlocking() {
std::lock_guard<std::mutex> lock(mutex_);
if (elements_.empty()) {
return ReadResult(std::nullopt, is_closed_);
}
if (elements_.capacity() == elements_.size()) {
write_fd_.Notify();
}
T value = std::move(elements_.front());
elements_.pop_front();
if (!is_closed_ && elements_.empty()) {
read_fd_.Clear();
}
return ReadResult(std::move(value), is_closed_);
}
// Attempts to write to the channel and returns the result of the attempt.
// See |WriteResult| for more information on the result.
//
// IMPORTANT: if this function returns |success| == false, |element| *will
// not* be modified. This allows the caller to try again with the same value.
PERFETTO_WARN_UNUSED_RESULT WriteResult WriteNonBlocking(T&& element) {
std::lock_guard<std::mutex> lock(mutex_);
if (is_closed_) {
return WriteResult{false, true};
}
if (elements_.size() == elements_.capacity()) {
return WriteResult{false, false};
}
if (elements_.empty()) {
read_fd_.Notify();
}
elements_.emplace_back(std::move(element));
if (elements_.size() == elements_.capacity()) {
write_fd_.Clear();
}
return WriteResult{true, false};
}
// Closes the channel for to any further writes.
//
// Note: this function will make both |read_fd| and |write_fd| ready to
// avoid deadlocks. Callers should correctly handle |is_closed| being
// false from |ReadNonBlocking| and |WriteNonBlocking| to stop watching the
// fds to avoid poll returning immediately.
//
// We prefer this behaviour as it's a lot more obvious something is wrong when
// it spins and takes 100% CPU rather than silently deadlocking.
void Close() {
std::lock_guard<std::mutex> lock(mutex_);
is_closed_ = true;
// Make both fds ready to avoid deadlocks.
read_fd_.Notify();
write_fd_.Notify();
}
// Notification FD for when |ReadNonBlocking| would succeed. Can be useful to
// pass to AddFileDescriptorWatch to read data from the channel.
base::PlatformHandle read_fd() const { return read_fd_.fd(); }
// Notification FD for when |WriteNonBlocking| would succeed. Can be useful to
// pass to AddFileDescriptorWatch to send data through the channel.
base::PlatformHandle write_fd() const { return write_fd_.fd(); }
private:
std::mutex mutex_;
base::CircularQueue<T> elements_;
bool is_closed_ = false;
base::EventFd read_fd_;
base::EventFd write_fd_;
};
} // namespace base
} // namespace perfetto
#endif // INCLUDE_PERFETTO_EXT_BASE_THREADING_CHANNEL_H_