| // Copyright 2016 The Chromium Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "base/task/thread_pool/sequence.h" |
| |
| #include <utility> |
| |
| #include "base/check.h" |
| #include "base/critical_closure.h" |
| #include "base/feature_list.h" |
| #include "base/functional/bind.h" |
| #include "base/memory/ptr_util.h" |
| #include "base/task/task_features.h" |
| #include "base/time/time.h" |
| |
| namespace base { |
| namespace internal { |
| |
| namespace { |
| |
| // Asserts that a lock is acquired and annotates the scope such that |
| // base/thread_annotations.h can recognize that the lock is acquired. |
| class SCOPED_LOCKABLE AnnotateLockAcquired { |
| public: |
| explicit AnnotateLockAcquired(const CheckedLock& lock) |
| EXCLUSIVE_LOCK_FUNCTION(lock) |
| : acquired_lock_(lock) { |
| acquired_lock_->AssertAcquired(); |
| } |
| |
| ~AnnotateLockAcquired() UNLOCK_FUNCTION() { |
| acquired_lock_->AssertAcquired(); |
| } |
| |
| private: |
| const raw_ref<const CheckedLock> acquired_lock_; |
| }; |
| |
| void MaybeMakeCriticalClosure(TaskShutdownBehavior shutdown_behavior, |
| Task& task) { |
| switch (shutdown_behavior) { |
| case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN: |
| // Nothing to do. |
| break; |
| case TaskShutdownBehavior::SKIP_ON_SHUTDOWN: |
| // MakeCriticalClosure() is arguably useful for SKIP_ON_SHUTDOWN, possibly |
| // in combination with is_immediate=false. However, SKIP_ON_SHUTDOWN is |
| // the default and hence the theoretical benefits don't warrant the |
| // performance implications. |
| break; |
| case TaskShutdownBehavior::BLOCK_SHUTDOWN: |
| task.task = |
| MakeCriticalClosure(task.posted_from, std::move(task.task), |
| /*is_immediate=*/task.delayed_run_time.is_null()); |
| break; |
| } |
| } |
| |
| } // namespace |
| |
| Sequence::Transaction::Transaction(Sequence* sequence) |
| : TaskSource::Transaction(sequence) {} |
| |
| Sequence::Transaction::Transaction(Sequence::Transaction&& other) = default; |
| |
| Sequence::Transaction::~Transaction() = default; |
| |
| bool Sequence::Transaction::WillPushImmediateTask() { |
| // In a Transaction. |
| AnnotateLockAcquired annotate(sequence()->lock_); |
| |
| bool was_immediate = |
| sequence()->is_immediate_.exchange(true, std::memory_order_relaxed); |
| return !was_immediate; |
| } |
| |
| void Sequence::Transaction::PushImmediateTask(Task task) { |
| // In a Transaction. |
| AnnotateLockAcquired annotate(sequence()->lock_); |
| |
| // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167 |
| // for details. |
| CHECK(task.task); |
| DCHECK(!task.queue_time.is_null()); |
| DCHECK(sequence()->is_immediate_.load(std::memory_order_relaxed)); |
| |
| bool was_unretained = sequence()->IsEmpty() && !sequence()->has_worker_; |
| bool queue_was_empty = sequence()->queue_.empty(); |
| |
| MaybeMakeCriticalClosure(sequence()->traits_.shutdown_behavior(), task); |
| |
| sequence()->queue_.push(std::move(task)); |
| |
| if (queue_was_empty) |
| sequence()->UpdateReadyTimes(); |
| |
| // AddRef() matched by manual Release() when the sequence has no more tasks |
| // to run (in DidProcessTask() or Clear()). |
| if (was_unretained && sequence()->task_runner()) |
| sequence()->task_runner()->AddRef(); |
| } |
| |
| bool Sequence::Transaction::PushDelayedTask(Task task) { |
| // In a Transaction. |
| AnnotateLockAcquired annotate(sequence()->lock_); |
| |
| // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167 |
| // for details. |
| CHECK(task.task); |
| DCHECK(!task.queue_time.is_null()); |
| DCHECK(!task.delayed_run_time.is_null()); |
| |
| bool top_will_change = sequence()->DelayedSortKeyWillChange(task); |
| bool was_empty = sequence()->IsEmpty(); |
| |
| MaybeMakeCriticalClosure(sequence()->traits_.shutdown_behavior(), task); |
| |
| sequence()->delayed_queue_.insert(std::move(task)); |
| |
| if (sequence()->queue_.empty()) |
| sequence()->UpdateReadyTimes(); |
| |
| // AddRef() matched by manual Release() when the sequence has no more tasks |
| // to run (in DidProcessTask() or Clear()). |
| if (was_empty && !sequence()->has_worker_ && sequence()->task_runner()) |
| sequence()->task_runner()->AddRef(); |
| |
| return top_will_change; |
| } |
| |
| // Delayed tasks are ordered by latest_delayed_run_time(). The top task may |
| // not be the first task eligible to run, but tasks will always become ripe |
| // before their latest_delayed_run_time(). |
| bool Sequence::DelayedTaskGreater::operator()(const Task& lhs, |
| const Task& rhs) const { |
| TimeTicks lhs_latest_delayed_run_time = lhs.latest_delayed_run_time(); |
| TimeTicks rhs_latest_delayed_run_time = rhs.latest_delayed_run_time(); |
| return std::tie(lhs_latest_delayed_run_time, lhs.sequence_num) > |
| std::tie(rhs_latest_delayed_run_time, rhs.sequence_num); |
| } |
| |
| TaskSource::RunStatus Sequence::WillRunTask() { |
| // There should never be a second call to WillRunTask() before DidProcessTask |
| // since the RunStatus is always marked a saturated. |
| DCHECK(!has_worker_); |
| |
| // It's ok to access |has_worker_| outside of a Transaction since |
| // WillRunTask() is externally synchronized, always called in sequence with |
| // TakeTask() and DidProcessTask() and only called if HasReadyTasks(), which |
| // means it won't race with Push[Immediate/Delayed]Task(). |
| has_worker_ = true; |
| return RunStatus::kAllowedSaturated; |
| } |
| |
| bool Sequence::OnBecomeReady() { |
| DCHECK(!has_worker_); |
| // std::memory_order_relaxed is sufficient because no other state is |
| // synchronized with |is_immediate_| outside of |lock_|. |
| return !is_immediate_.exchange(true, std::memory_order_relaxed); |
| } |
| |
| size_t Sequence::GetRemainingConcurrency() const { |
| return 1; |
| } |
| |
| Task Sequence::TakeNextImmediateTask() { |
| Task next_task = std::move(queue_.front()); |
| queue_.pop(); |
| return next_task; |
| } |
| |
| Task Sequence::TakeEarliestTask() { |
| if (queue_.empty()) |
| return delayed_queue_.take_top(); |
| |
| if (delayed_queue_.empty()) |
| return TakeNextImmediateTask(); |
| |
| // Both queues contain at least a task. Decide from which one the task should |
| // be taken. |
| if (queue_.front().queue_time <= |
| delayed_queue_.top().latest_delayed_run_time()) |
| return TakeNextImmediateTask(); |
| |
| return delayed_queue_.take_top(); |
| } |
| |
| void Sequence::UpdateReadyTimes() { |
| DCHECK(!IsEmpty()); |
| if (queue_.empty()) { |
| latest_ready_time_.store(delayed_queue_.top().latest_delayed_run_time(), |
| std::memory_order_relaxed); |
| earliest_ready_time_.store(delayed_queue_.top().earliest_delayed_run_time(), |
| std::memory_order_relaxed); |
| return; |
| } |
| |
| if (delayed_queue_.empty()) { |
| latest_ready_time_.store(queue_.front().queue_time, |
| std::memory_order_relaxed); |
| } else { |
| latest_ready_time_.store( |
| std::min(queue_.front().queue_time, |
| delayed_queue_.top().latest_delayed_run_time()), |
| std::memory_order_relaxed); |
| } |
| earliest_ready_time_.store(TimeTicks(), std::memory_order_relaxed); |
| } |
| |
| Task Sequence::TakeTask(TaskSource::Transaction* transaction) { |
| CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_); |
| AnnotateLockAcquired annotate(lock_); |
| |
| DCHECK(has_worker_); |
| DCHECK(is_immediate_.load(std::memory_order_relaxed)); |
| DCHECK(!queue_.empty() || !delayed_queue_.empty()); |
| |
| auto next_task = TakeEarliestTask(); |
| |
| if (!IsEmpty()) |
| UpdateReadyTimes(); |
| |
| return next_task; |
| } |
| |
| bool Sequence::DidProcessTask(TaskSource::Transaction* transaction) { |
| CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_); |
| AnnotateLockAcquired annotate(lock_); |
| |
| // There should never be a call to DidProcessTask without an associated |
| // WillRunTask(). |
| DCHECK(has_worker_); |
| has_worker_ = false; |
| |
| // See comment on TaskSource::task_runner_ for lifetime management details. |
| if (IsEmpty()) { |
| is_immediate_.store(false, std::memory_order_relaxed); |
| ReleaseTaskRunner(); |
| return false; |
| } |
| |
| // Let the caller re-enqueue this non-empty Sequence regardless of |
| // |run_result| so it can continue churning through this Sequence's tasks and |
| // skip/delete them in the proper scope. |
| return true; |
| } |
| |
| bool Sequence::WillReEnqueue(TimeTicks now, |
| TaskSource::Transaction* transaction) { |
| CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_); |
| AnnotateLockAcquired annotate(lock_); |
| |
| // This should always be called from a worker thread and it will be |
| // called after DidProcessTask(). |
| DCHECK(is_immediate_.load(std::memory_order_relaxed)); |
| |
| bool has_ready_tasks = HasReadyTasks(now); |
| if (!has_ready_tasks) |
| is_immediate_.store(false, std::memory_order_relaxed); |
| |
| return has_ready_tasks; |
| } |
| |
| bool Sequence::DelayedSortKeyWillChange(const Task& delayed_task) const { |
| // If sequence has already been picked up by a worker or moved, no need to |
| // proceed further here. |
| if (is_immediate_.load(std::memory_order_relaxed)) { |
| return false; |
| } |
| |
| if (IsEmpty()) { |
| return true; |
| } |
| |
| return delayed_task.latest_delayed_run_time() < |
| delayed_queue_.top().latest_delayed_run_time(); |
| } |
| |
| bool Sequence::HasReadyTasks(TimeTicks now) const { |
| return now >= TS_UNCHECKED_READ(earliest_ready_time_) |
| .load(std::memory_order_relaxed); |
| } |
| |
| bool Sequence::HasImmediateTasks() const { |
| return !queue_.empty(); |
| } |
| |
| TaskSourceSortKey Sequence::GetSortKey() const { |
| return TaskSourceSortKey( |
| priority_racy(), |
| TS_UNCHECKED_READ(latest_ready_time_).load(std::memory_order_relaxed)); |
| } |
| |
| TimeTicks Sequence::GetDelayedSortKey() const { |
| return TS_UNCHECKED_READ(latest_ready_time_).load(std::memory_order_relaxed); |
| } |
| |
| Task Sequence::Clear(TaskSource::Transaction* transaction) { |
| CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_); |
| AnnotateLockAcquired annotate(lock_); |
| |
| // See comment on TaskSource::task_runner_ for lifetime management details. |
| if (!IsEmpty() && !has_worker_) { |
| ReleaseTaskRunner(); |
| } |
| |
| return Task( |
| FROM_HERE, |
| base::BindOnce( |
| [](base::queue<Task> queue, |
| base::IntrusiveHeap<Task, DelayedTaskGreater> delayed_queue) { |
| while (!queue.empty()) |
| queue.pop(); |
| |
| while (!delayed_queue.empty()) |
| delayed_queue.pop(); |
| }, |
| std::move(queue_), std::move(delayed_queue_)), |
| TimeTicks(), TimeDelta()); |
| } |
| |
| void Sequence::ReleaseTaskRunner() { |
| if (!task_runner()) |
| return; |
| // No member access after this point, releasing |task_runner()| might delete |
| // |this|. |
| task_runner()->Release(); |
| } |
| |
| Sequence::Sequence(const TaskTraits& traits, |
| TaskRunner* task_runner, |
| TaskSourceExecutionMode execution_mode) |
| : TaskSource(traits, task_runner, execution_mode) {} |
| |
| Sequence::~Sequence() = default; |
| |
| Sequence::Transaction Sequence::BeginTransaction() { |
| return Transaction(this); |
| } |
| |
| ExecutionEnvironment Sequence::GetExecutionEnvironment() { |
| return {token_, &sequence_local_storage_}; |
| } |
| |
| bool Sequence::IsEmpty() const { |
| return queue_.empty() && delayed_queue_.empty(); |
| } |
| |
| } // namespace internal |
| } // namespace base |