| // Copyright 2017 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "base/task/task_scheduler/scheduler_worker_pool.h" |
| |
| #include "base/bind.h" |
| #include "base/bind_helpers.h" |
| #include "base/lazy_instance.h" |
| #include "base/task/task_scheduler/delayed_task_manager.h" |
| #include "base/task/task_scheduler/task_tracker.h" |
| #include "base/threading/thread_local.h" |
| |
| namespace base { |
| namespace internal { |
| |
| namespace { |
| |
| // The number of SchedulerWorkerPool that are alive in this process. This |
| // variable should only be incremented when the SchedulerWorkerPool instances |
| // are brought up (on the main thread; before any tasks are posted) and |
| // decremented when the same instances are brought down (i.e., only when unit |
| // tests tear down the task environment and never in production). This makes the |
| // variable const while worker threads are up and as such it doesn't need to be |
| // atomic. It is used to tell when a task is posted from the main thread after |
| // the task environment was brought down in unit tests so that |
| // SchedulerWorkerPool bound TaskRunners can return false on PostTask, letting |
| // such callers know they should complete necessary work synchronously. Note: |
| // |!g_active_pools_count| is generally equivalent to |
| // |!TaskScheduler::GetInstance()| but has the advantage of being valid in |
| // task_scheduler unit tests that don't instantiate a full TaskScheduler. |
| int g_active_pools_count = 0; |
| |
| // SchedulerWorkerPool that owns the current thread, if any. |
| LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky |
| tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER; |
| |
| const SchedulerWorkerPool* GetCurrentWorkerPool() { |
| return tls_current_worker_pool.Get().Get(); |
| } |
| |
| } // namespace |
| |
| // A task runner that runs tasks in parallel. |
| class SchedulerParallelTaskRunner : public TaskRunner { |
| public: |
| // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so |
| // long as |worker_pool| is alive. |
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| SchedulerParallelTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool) |
| : traits_(traits), worker_pool_(worker_pool) { |
| DCHECK(worker_pool_); |
| } |
| |
| // TaskRunner: |
| bool PostDelayedTask(const Location& from_here, |
| OnceClosure closure, |
| TimeDelta delay) override { |
| if (!g_active_pools_count) |
| return false; |
| |
| // Post the task as part of a one-off single-task Sequence. |
| return worker_pool_->PostTaskWithSequence( |
| Task(from_here, std::move(closure), delay), |
| MakeRefCounted<Sequence>(traits_)); |
| } |
| |
| bool RunsTasksInCurrentSequence() const override { |
| return GetCurrentWorkerPool() == worker_pool_; |
| } |
| |
| private: |
| ~SchedulerParallelTaskRunner() override = default; |
| |
| const TaskTraits traits_; |
| SchedulerWorkerPool* const worker_pool_; |
| |
| DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); |
| }; |
| |
| // A task runner that runs tasks in sequence. |
| class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| public: |
| // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks |
| // so long as |worker_pool| is alive. |
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| SchedulerSequencedTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool) |
| : sequence_(MakeRefCounted<Sequence>(traits)), worker_pool_(worker_pool) { |
| DCHECK(worker_pool_); |
| } |
| |
| // SequencedTaskRunner: |
| bool PostDelayedTask(const Location& from_here, |
| OnceClosure closure, |
| TimeDelta delay) override { |
| if (!g_active_pools_count) |
| return false; |
| |
| Task task(from_here, std::move(closure), delay); |
| task.sequenced_task_runner_ref = this; |
| |
| // Post the task as part of |sequence_|. |
| return worker_pool_->PostTaskWithSequence(std::move(task), sequence_); |
| } |
| |
| bool PostNonNestableDelayedTask(const Location& from_here, |
| OnceClosure closure, |
| base::TimeDelta delay) override { |
| // Tasks are never nested within the task scheduler. |
| return PostDelayedTask(from_here, std::move(closure), delay); |
| } |
| |
| bool RunsTasksInCurrentSequence() const override { |
| return sequence_->token() == SequenceToken::GetForCurrentThread(); |
| } |
| |
| private: |
| ~SchedulerSequencedTaskRunner() override = default; |
| |
| // Sequence for all Tasks posted through this TaskRunner. |
| const scoped_refptr<Sequence> sequence_; |
| SchedulerWorkerPool* const worker_pool_; |
| |
| DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
| }; |
| |
| scoped_refptr<TaskRunner> SchedulerWorkerPool::CreateTaskRunnerWithTraits( |
| const TaskTraits& traits) { |
| return MakeRefCounted<SchedulerParallelTaskRunner>(traits, this); |
| } |
| |
| scoped_refptr<SequencedTaskRunner> |
| SchedulerWorkerPool::CreateSequencedTaskRunnerWithTraits( |
| const TaskTraits& traits) { |
| return MakeRefCounted<SchedulerSequencedTaskRunner>(traits, this); |
| } |
| |
| bool SchedulerWorkerPool::PostTaskWithSequence( |
| Task task, |
| scoped_refptr<Sequence> sequence) { |
| DCHECK(task.task); |
| DCHECK(sequence); |
| |
| if (!task_tracker_->WillPostTask(&task, |
| sequence->traits().shutdown_behavior())) { |
| return false; |
| } |
| |
| if (task.delayed_run_time.is_null()) { |
| PostTaskWithSequenceNow(std::move(task), std::move(sequence)); |
| } else { |
| // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167 |
| // for details. |
| CHECK(task.task); |
| delayed_task_manager_->AddDelayedTask( |
| std::move(task), BindOnce( |
| [](scoped_refptr<Sequence> sequence, |
| SchedulerWorkerPool* worker_pool, Task task) { |
| worker_pool->PostTaskWithSequenceNow( |
| std::move(task), std::move(sequence)); |
| }, |
| std::move(sequence), Unretained(this))); |
| } |
| |
| return true; |
| } |
| |
| SchedulerWorkerPool::SchedulerWorkerPool( |
| TrackedRef<TaskTracker> task_tracker, |
| DelayedTaskManager* delayed_task_manager) |
| : task_tracker_(std::move(task_tracker)), |
| delayed_task_manager_(delayed_task_manager) { |
| DCHECK(task_tracker_); |
| DCHECK(delayed_task_manager_); |
| ++g_active_pools_count; |
| } |
| |
| SchedulerWorkerPool::~SchedulerWorkerPool() { |
| --g_active_pools_count; |
| DCHECK_GE(g_active_pools_count, 0); |
| } |
| |
| void SchedulerWorkerPool::BindToCurrentThread() { |
| DCHECK(!GetCurrentWorkerPool()); |
| tls_current_worker_pool.Get().Set(this); |
| } |
| |
| void SchedulerWorkerPool::UnbindFromCurrentThread() { |
| DCHECK(GetCurrentWorkerPool()); |
| tls_current_worker_pool.Get().Set(nullptr); |
| } |
| |
| void SchedulerWorkerPool::PostTaskWithSequenceNow( |
| Task task, |
| scoped_refptr<Sequence> sequence) { |
| DCHECK(task.task); |
| DCHECK(sequence); |
| |
| // Confirm that |task| is ready to run (its delayed run time is either null or |
| // in the past). |
| DCHECK_LE(task.delayed_run_time, TimeTicks::Now()); |
| |
| const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
| if (sequence_was_empty) { |
| // Try to schedule |sequence| if it was empty before |task| was inserted |
| // into it. Otherwise, one of these must be true: |
| // - |sequence| is already scheduled, or, |
| // - The pool is running a Task from |sequence|. The pool is expected to |
| // reschedule |sequence| once it's done running the Task. |
| sequence = task_tracker_->WillScheduleSequence(std::move(sequence), this); |
| if (sequence) |
| OnCanScheduleSequence(std::move(sequence)); |
| } |
| } |
| |
| } // namespace internal |
| } // namespace base |