blob: 483afbe68af9993351ca5ce6a4f90e529c1cae29 [file] [log] [blame]
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/task_scheduler/platform_native_worker_pool_win.h"
#include "base/task/task_scheduler/task_tracker.h"
namespace base {
namespace internal {
PlatformNativeWorkerPoolWin::PlatformNativeWorkerPoolWin(
TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager)
: SchedulerWorkerPool(task_tracker, delayed_task_manager) {}
PlatformNativeWorkerPoolWin::~PlatformNativeWorkerPoolWin() {
#if DCHECK_IS_ON()
// Verify join_for_testing has been called to ensure that there is no more
// outstanding work. Otherwise, work may try to de-reference an invalid
// pointer to this class.
DCHECK(join_for_testing_returned_.IsSet());
#endif
::DestroyThreadpoolEnvironment(&environment_);
::CloseThreadpoolWork(work_);
::CloseThreadpool(pool_);
}
void PlatformNativeWorkerPoolWin::Start() {
::InitializeThreadpoolEnvironment(&environment_);
pool_ = ::CreateThreadpool(nullptr);
DCHECK(pool_) << "LastError: " << ::GetLastError();
::SetThreadpoolThreadMinimum(pool_, 1);
::SetThreadpoolThreadMaximum(pool_, 256);
work_ = ::CreateThreadpoolWork(&RunNextSequence, this, &environment_);
DCHECK(work_) << "LastError: " << GetLastError();
::SetThreadpoolCallbackPool(&environment_, pool_);
size_t local_num_sequences_before_start;
{
auto transaction(priority_queue_.BeginTransaction());
DCHECK(!started_);
started_ = true;
local_num_sequences_before_start = transaction->Size();
}
// Schedule sequences added to |priority_queue_| before Start().
for (size_t i = 0; i < local_num_sequences_before_start; ++i)
::SubmitThreadpoolWork(work_);
}
void PlatformNativeWorkerPoolWin::JoinForTesting() {
::WaitForThreadpoolWorkCallbacks(work_, true);
#if DCHECK_IS_ON()
DCHECK(!join_for_testing_returned_.IsSet());
join_for_testing_returned_.Set();
#endif
}
// static
void CALLBACK PlatformNativeWorkerPoolWin::RunNextSequence(
PTP_CALLBACK_INSTANCE,
void* scheduler_worker_pool_windows_impl,
PTP_WORK) {
auto* worker_pool = static_cast<PlatformNativeWorkerPoolWin*>(
scheduler_worker_pool_windows_impl);
worker_pool->BindToCurrentThread();
scoped_refptr<Sequence> sequence = worker_pool->GetWork();
DCHECK(sequence);
sequence = worker_pool->task_tracker_->RunAndPopNextTask(
std::move(sequence.get()), worker_pool);
// Re-enqueue sequence and then submit another task to the Windows thread
// pool.
if (sequence)
worker_pool->OnCanScheduleSequence(std::move(sequence));
worker_pool->UnbindFromCurrentThread();
}
scoped_refptr<Sequence> PlatformNativeWorkerPoolWin::GetWork() {
auto transaction(priority_queue_.BeginTransaction());
// The PQ should never be empty here as there's a 1:1 correspondence between
// a call to ScheduleSequence()/SubmitThreadpoolWork() and GetWork().
DCHECK(!transaction->IsEmpty());
return transaction->PopSequence();
}
void PlatformNativeWorkerPoolWin::OnCanScheduleSequence(
scoped_refptr<Sequence> sequence) {
const SequenceSortKey sequence_sort_key = sequence->GetSortKey();
auto transaction(priority_queue_.BeginTransaction());
transaction->Push(std::move(sequence), sequence_sort_key);
if (started_) {
// TODO(fdoray): Handle priorities by having different work objects and
// using ::SetThreadpoolCallbackPriority() and
// ::SetThreadpoolCallbackRunsLong().
::SubmitThreadpoolWork(work_);
}
}
} // namespace internal
} // namespace base