blob: e727f84206f655ab089b9cf5dfc805a31cad1757 [file] [log] [blame]
/*
* Copyright (C) 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "perfetto/ext/base/threading/spawn.h"
#include <optional>
#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/thread_checker.h"
#include "perfetto/ext/base/threading/future.h"
#include "perfetto/ext/base/threading/poll.h"
#include "perfetto/ext/base/threading/stream.h"
namespace perfetto {
namespace base {
// Represents a future which is being polled to completion. Owned by
// SpawnHandle.
class PolledFuture {
public:
explicit PolledFuture(TaskRunner* task_runner, Future<FVoid> future)
: task_runner_(task_runner), future_(std::move(future)) {
PERFETTO_DCHECK(task_runner_->RunsTasksOnCurrentThread());
PollUntilFinish();
}
~PolledFuture() {
PERFETTO_DCHECK_THREAD(thread_checker);
ClearFutureAndWatches(interested_);
}
private:
PolledFuture(PolledFuture&&) = delete;
PolledFuture& operator=(PolledFuture&&) = delete;
void PollUntilFinish() {
PERFETTO_DCHECK(task_runner_->RunsTasksOnCurrentThread());
auto pre_poll_interested = std::move(interested_);
interested_.clear();
FuturePollResult<FVoid> res = future_->Poll(&context_);
if (!res.IsPending()) {
ClearFutureAndWatches(pre_poll_interested);
return;
}
for (PlatformHandle fd : SetDifference(pre_poll_interested, interested_)) {
task_runner_->RemoveFileDescriptorWatch(fd);
}
auto weak_this = weak_ptr_factory_.GetWeakPtr();
for (PlatformHandle fd : SetDifference(interested_, pre_poll_interested)) {
task_runner_->AddFileDescriptorWatch(fd, [weak_this, fd]() {
if (!weak_this) {
return;
}
weak_this->ready_ = {fd};
weak_this->PollUntilFinish();
});
}
}
void ClearFutureAndWatches(FlatSet<PlatformHandle> interested) {
future_ = std::nullopt;
for (PlatformHandle fd : interested) {
task_runner_->RemoveFileDescriptorWatch(fd);
}
interested_.clear();
ready_.clear();
}
static std::vector<PlatformHandle> SetDifference(
const FlatSet<PlatformHandle>& f,
const FlatSet<PlatformHandle>& s) {
std::vector<PlatformHandle> out(f.size());
auto it = std::set_difference(f.begin(), f.end(), s.begin(), s.end(),
out.begin());
out.resize(static_cast<size_t>(std::distance(out.begin(), it)));
return out;
}
TaskRunner* const task_runner_ = nullptr;
std::optional<Future<FVoid>> future_;
FlatSet<PlatformHandle> interested_;
FlatSet<PlatformHandle> ready_;
PollContext context_{&interested_, &ready_};
PERFETTO_THREAD_CHECKER(thread_checker)
// Keep this last.
WeakPtrFactory<PolledFuture> weak_ptr_factory_{this};
};
SpawnHandle::SpawnHandle(TaskRunner* task_runner,
std::function<Future<FVoid>()> fn)
: task_runner_(task_runner),
polled_future_(std::make_shared<std::unique_ptr<PolledFuture>>()) {
task_runner->PostTask(
[t = task_runner, fn = std::move(fn), p = polled_future_]() mutable {
p->reset(new PolledFuture(t, fn()));
});
}
SpawnHandle::~SpawnHandle() {
task_runner_->PostTask(
[f = std::move(polled_future_)]() mutable { f.reset(); });
}
} // namespace base
} // namespace perfetto