// blob: 54715687743e594deb7e88468cb8c7af2c63bede [file] [log] [blame]
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_
#define BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_
#include <memory>
#include <queue>
#include <set>
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/pending_task.h"
#include "base/task/sequence_manager/associated_thread_id.h"
#include "base/task/sequence_manager/enqueue_order.h"
#include "base/task/sequence_manager/intrusive_heap.h"
#include "base/task/sequence_manager/lazily_deallocated_deque.h"
#include "base/task/sequence_manager/sequenced_task_source.h"
#include "base/task/sequence_manager/task_queue.h"
#include "base/threading/thread_checker.h"
#include "base/trace_event/trace_event.h"
#include "base/trace_event/trace_event_argument.h"
#include "starboard/types.h"
namespace base {
namespace sequence_manager {
class LazyNow;
class TimeDomain;
namespace internal {
class SequenceManagerImpl;
class TaskQueueProxy;
class WorkQueue;
class WorkQueueSets;
// Record made of a |next| link, a TaskQueueImpl pointer and an EnqueueOrder.
// NOTE(review): the |next| pointer suggests these records are chained into an
// intrusive singly linked list - confirm against the code that consumes them.
struct IncomingImmediateWorkList {
  // Following record; null until linked.
  IncomingImmediateWorkList* next{nullptr};
  // Queue this record refers to; null until assigned.
  TaskQueueImpl* queue{nullptr};
  // Default-constructed until assigned.
  internal::EnqueueOrder order;
};
// TaskQueueImpl has four main queues:
//
// Immediate (non-delayed) tasks:
// |immediate_incoming_queue| - PostTask enqueues tasks here.
// |immediate_work_queue| - SequenceManager takes immediate tasks here.
//
// Delayed tasks
// |delayed_incoming_queue| - PostDelayedTask enqueues tasks here.
// |delayed_work_queue| - SequenceManager takes delayed tasks here.
//
// The |immediate_incoming_queue| can be accessed from any thread, the other
// queues are main-thread only. To reduce the overhead of locking,
// |immediate_work_queue| is swapped with |immediate_incoming_queue| when
// |immediate_work_queue| becomes empty.
//
// Delayed tasks are initially posted to |delayed_incoming_queue| and a wake-up
// is scheduled with the TimeDomain. When the delay has elapsed, the TimeDomain
// calls UpdateDelayedWorkQueue and ready delayed tasks are moved into the
// |delayed_work_queue|. Note the EnqueueOrder (used for ordering) for a delayed
// task is not set until it's moved into the |delayed_work_queue|.
//
// TaskQueueImpl uses the WorkQueueSets and the TaskQueueSelector to implement
// prioritization. Task selection is done by the TaskQueueSelector and when a
// queue is selected, it round-robins between the |immediate_work_queue| and
// |delayed_work_queue|. The reason for this is we want to make sure delayed
// tasks (normally the most common type) don't starve out immediate work.
// Task queue implementation. Its state is partitioned by access rule, and each
// partition is only reachable through a checking accessor:
//  - |any_thread_| via any_thread(), which asserts |any_thread_lock_| is held.
//  - |main_thread_only_| via main_thread_only(), which DCHECKs the calling
//    thread against |associated_thread_|.
//  - |immediate_incoming_queue_| via immediate_incoming_queue(), which asserts
//    |immediate_incoming_queue_lock_| is held.
class BASE_EXPORT TaskQueueImpl {
public:
TaskQueueImpl(SequenceManagerImpl* sequence_manager,
TimeDomain* time_domain,
const TaskQueue::Spec& spec);
~TaskQueueImpl();
// Types of queues TaskQueueImpl is maintaining internally.
enum class WorkQueueType { kImmediate, kDelayed };
// Non-nestable tasks may get deferred but such queue is being maintained on
// SequenceManager side, so we need to keep information how to requeue it.
struct DeferredNonNestableTask {
// The deferred task itself.
Task task;
// Queue associated with the deferred task.
internal::TaskQueueImpl* task_queue;
// Presumably selects which work queue of |task_queue| the task is pushed
// back onto (see RequeueDeferredNonNestableTask) - confirm in the .cc file.
WorkQueueType work_queue_type;
};
// Callback taking a TimeTicks; see SetOnNextWakeUpChangedCallback().
using OnNextWakeUpChangedCallback = RepeatingCallback<void(TimeTicks)>;
// Handlers for task start/completion notifications. Both aliases have the
// same signature: the task and its TaskQueue::TaskTiming.
using OnTaskStartedHandler =
RepeatingCallback<void(const Task&, const TaskQueue::TaskTiming&)>;
using OnTaskCompletedHandler =
RepeatingCallback<void(const Task&, const TaskQueue::TaskTiming&)>;
// May be called from any thread.
scoped_refptr<SingleThreadTaskRunner> CreateTaskRunner(int task_type) const;
// TaskQueue implementation.
const char* GetName() const;
bool RunsTasksInCurrentSequence() const;
void PostTask(PostedTask task);
// Require a reference to enclosing task queue for lifetime control.
std::unique_ptr<TaskQueue::QueueEnabledVoter> CreateQueueEnabledVoter(
scoped_refptr<TaskQueue> owning_task_queue);
bool IsQueueEnabled() const;
bool IsEmpty() const;
size_t GetNumberOfPendingTasks() const;
bool HasTaskToRunImmediately() const;
Optional<TimeTicks> GetNextScheduledWakeUp();
Optional<DelayedWakeUp> GetNextScheduledWakeUpImpl();
void SetQueuePriority(TaskQueue::QueuePriority priority);
TaskQueue::QueuePriority GetQueuePriority() const;
void AddTaskObserver(MessageLoop::TaskObserver* task_observer);
void RemoveTaskObserver(MessageLoop::TaskObserver* task_observer);
void SetTimeDomain(TimeDomain* time_domain);
TimeDomain* GetTimeDomain() const;
void SetBlameContext(trace_event::BlameContext* blame_context);
void InsertFence(TaskQueue::InsertFencePosition position);
void InsertFenceAt(TimeTicks time);
void RemoveFence();
// NOTE(review): declared non-const, unlike BlockedByFence() below.
bool HasActiveFence();
bool BlockedByFence() const;
// Implementation of TaskQueue::SetObserver.
void SetOnNextWakeUpChangedCallback(OnNextWakeUpChangedCallback callback);
void UnregisterTaskQueue();
// Returns true if a (potentially hypothetical) task with the specified
// |enqueue_order| could run on the queue. Must be called from the main
// thread.
bool CouldTaskRun(EnqueueOrder enqueue_order) const;
// Must only be called from the thread this task queue was created on.
void ReloadImmediateWorkQueueIfEmpty();
void AsValueInto(TimeTicks now, trace_event::TracedValue* state) const;
// Accessors for the const flags |should_monitor_quiescence_| and
// |should_notify_observers_|.
bool GetQuiescenceMonitored() const { return should_monitor_quiescence_; }
bool GetShouldNotifyObservers() const { return should_notify_observers_; }
void NotifyWillProcessTask(const PendingTask& pending_task);
void NotifyDidProcessTask(const PendingTask& pending_task);
// Check for available tasks in immediate work queues.
// Used to check if we need to generate notifications about delayed work.
bool HasPendingImmediateWork();
// Accessors for the two WorkQueue objects owned by MainThreadOnly. They go
// through main_thread_only(), so the calling thread is DCHECKed.
WorkQueue* delayed_work_queue() {
return main_thread_only().delayed_work_queue.get();
}
const WorkQueue* delayed_work_queue() const {
return main_thread_only().delayed_work_queue.get();
}
WorkQueue* immediate_work_queue() {
return main_thread_only().immediate_work_queue.get();
}
const WorkQueue* immediate_work_queue() const {
return main_thread_only().immediate_work_queue.get();
}
// Protected by SequenceManagerImpl's AnyThread lock.
// Returns the address of the embedded |immediate_work_list_storage_| member;
// no lock is asserted here.
IncomingImmediateWorkList* immediate_work_list_storage() {
return &immediate_work_list_storage_;
}
// Enqueues any delayed tasks which should be run now on the
// |delayed_work_queue|.
// Must be called from the main thread.
void WakeUpForDelayedWork(LazyNow* lazy_now);
// Getter/setter for the HeapHandle stored in MainThreadOnly; both DCHECK the
// calling thread via main_thread_only().
HeapHandle heap_handle() const { return main_thread_only().heap_handle; }
void set_heap_handle(HeapHandle heap_handle) {
main_thread_only().heap_handle = heap_handle;
}
// Pushes |task| onto the front of the specified work queue. Caution must be
// taken with this API because you could easily starve out other work.
// TODO(kraynov): Simplify non-nestable task logic https://crbug.com/845437.
void RequeueDeferredNonNestableTask(DeferredNonNestableTask task);
void PushImmediateIncomingTaskForTest(Task&& task);
// QueueEnabledVoter implementation that holds a scoped_refptr to the
// TaskQueue it was created with. TaskQueueImpl is a friend and can therefore
// read |task_queue_| and |enabled_| directly.
class QueueEnabledVoterImpl : public TaskQueue::QueueEnabledVoter {
public:
explicit QueueEnabledVoterImpl(scoped_refptr<TaskQueue> task_queue);
~QueueEnabledVoterImpl() override;
// QueueEnabledVoter implementation.
void SetQueueEnabled(bool enabled) override;
// Returns the TaskQueueImpl behind |task_queue_|.
TaskQueueImpl* GetTaskQueueForTest() const {
return task_queue_->GetTaskQueueImpl();
}
private:
friend class TaskQueueImpl;
scoped_refptr<TaskQueue> task_queue_;
// No in-class initializer. NOTE(review): confirm the constructor sets it.
bool enabled_;
};
// Iterates over |delayed_incoming_queue| removing canceled tasks.
void SweepCanceledDelayedTasks(TimeTicks now);
// Allows wrapping TaskQueue to set a handler to subscribe for notifications
// about started and completed tasks.
void SetOnTaskStartedHandler(OnTaskStartedHandler handler);
void OnTaskStarted(const Task& task,
const TaskQueue::TaskTiming& task_timing);
void SetOnTaskCompletedHandler(OnTaskCompletedHandler handler);
void OnTaskCompleted(const Task& task,
const TaskQueue::TaskTiming& task_timing);
bool RequiresTaskTiming() const;
WeakPtr<SequenceManagerImpl> GetSequenceManagerWeakPtr();
// Returns the MainThreadOnly copy of the SequenceManagerImpl pointer, so the
// calling thread is DCHECKed.
SequenceManagerImpl* sequence_manager() {
return main_thread_only().sequence_manager;
}
scoped_refptr<GracefulQueueShutdownHelper> GetGracefulQueueShutdownHelper();
// Returns true if this queue is unregistered or task queue manager is deleted
// and this queue can be safely deleted on any thread.
bool IsUnregistered() const;
// Disables queue for testing purposes, when a QueueEnabledVoter can't be
// constructed due to not having TaskQueue.
void SetQueueEnabledForTest(bool enabled);
// TODO(alexclarke): Remove when possible.
void ClearSequenceManagerForTesting();
protected:
// Test-only hook; protected, so reachable from derived classes only.
void SetDelayedWakeUpForTesting(Optional<DelayedWakeUp> wake_up);
private:
// WorkQueue and WorkQueueTest may access private members of this class.
friend class WorkQueue;
friend class WorkQueueTest;
// State exposed only through any_thread(), which asserts that
// |any_thread_lock_| is held.
struct AnyThread {
AnyThread(SequenceManagerImpl* sequence_manager, TimeDomain* time_domain);
~AnyThread();
// SequenceManagerImpl, TimeDomain and Observer are maintained in two
// copies: inside AnyThread and inside MainThreadOnly. They can be changed
// only from main thread, so it should be locked before accessing from other
// threads.
SequenceManagerImpl* sequence_manager;
TimeDomain* time_domain;
// Callback corresponding to TaskQueue::Observer::OnQueueNextChanged.
OnNextWakeUpChangedCallback on_next_wake_up_changed_callback;
};
// State exposed only through main_thread_only(), which DCHECKs the calling
// thread. Several members below have no in-class initializer.
// NOTE(review): confirm the constructor (defined elsewhere) initializes them.
struct MainThreadOnly {
MainThreadOnly(SequenceManagerImpl* sequence_manager,
TaskQueueImpl* task_queue,
TimeDomain* time_domain);
~MainThreadOnly();
// Another copy of SequenceManagerImpl, TimeDomain and Observer
// for lock-free access from the main thread.
// See description inside struct AnyThread for details.
SequenceManagerImpl* sequence_manager;
TimeDomain* time_domain;
// Callback corresponding to TaskQueue::Observer::OnQueueNextChanged.
OnNextWakeUpChangedCallback on_next_wake_up_changed_callback;
// Owned work queues, exposed via delayed_work_queue() and
// immediate_work_queue().
std::unique_ptr<WorkQueue> delayed_work_queue;
std::unique_ptr<WorkQueue> immediate_work_queue;
// Delayed tasks held in a std::priority_queue, ordered by Task's comparison
// operator (declared elsewhere).
std::priority_queue<Task> delayed_incoming_queue;
ObserverList<MessageLoop::TaskObserver>::Unchecked task_observers;
size_t set_index;
// Exposed via heap_handle() / set_heap_handle().
HeapHandle heap_handle;
int is_enabled_refcount;
int voter_refcount;
trace_event::BlameContext* blame_context; // Not owned.
EnqueueOrder current_fence;
Optional<TimeTicks> delayed_fence;
OnTaskStartedHandler on_task_started_handler;
OnTaskCompletedHandler on_task_completed_handler;
// Last reported wake up, used only in UpdateWakeUp to avoid
// excessive calls.
Optional<DelayedWakeUp> scheduled_wake_up;
// If false, queue will be disabled. Used only for tests.
bool is_enabled_for_test;
};
void PostImmediateTaskImpl(PostedTask task);
void PostDelayedTaskImpl(PostedTask task);
// Push the task onto the |delayed_incoming_queue|. Lock-free main thread
// only fast path.
void PushOntoDelayedIncomingQueueFromMainThread(Task pending_task,
TimeTicks now,
bool notify_task_annotator);
// Push the task onto the |delayed_incoming_queue|. Slow path from other
// threads.
void PushOntoDelayedIncomingQueueLocked(Task pending_task);
void ScheduleDelayedWorkTask(Task pending_task);
void MoveReadyImmediateTasksToImmediateWorkQueueLocked();
// Push the task onto the |immediate_incoming_queue| and for auto pumped
// queues it calls MaybePostDoWorkOnMainRunner if the Incoming queue was
// empty.
void PushOntoImmediateIncomingQueueLocked(Task task);
// Container type used for |immediate_incoming_queue_|.
using TaskDeque = LazilyDeallocatedDeque<Task>;
// Extracts all the tasks from the immediate incoming queue and swaps it with
// |queue| which must be empty.
// Can be called from any thread.
void ReloadEmptyImmediateQueue(TaskDeque* queue);
void TraceQueueSize() const;
// Static tracing helpers writing into |state|; overloaded for the two
// container types used by this class, plus a single Task.
static void QueueAsValueInto(const TaskDeque& queue,
TimeTicks now,
trace_event::TracedValue* state);
static void QueueAsValueInto(const std::priority_queue<Task>& queue,
TimeTicks now,
trace_event::TracedValue* state);
static void TaskAsValueInto(const Task& task,
TimeTicks now,
trace_event::TracedValue* state);
void RemoveQueueEnabledVoter(const QueueEnabledVoterImpl* voter);
void OnQueueEnabledVoteChanged(bool enabled);
void EnableOrDisableWithSelector(bool enable);
// Schedules delayed work on time domain and calls the observer.
void UpdateDelayedWakeUp(LazyNow* lazy_now);
void UpdateDelayedWakeUpImpl(LazyNow* lazy_now,
Optional<DelayedWakeUp> wake_up);
// Activate a delayed fence if a time has come.
void ActivateDelayedFenceIfNeeded(TimeTicks now);
const char* name_;
// Supplies the thread checker used by main_thread_only().
scoped_refptr<AssociatedThreadId> associated_thread_;
// Lock asserted by any_thread(); mutable so the const overload can assert it.
mutable Lock any_thread_lock_;
AnyThread any_thread_;
// Accessors for |any_thread_|; both assert that |any_thread_lock_| is held.
struct AnyThread& any_thread() {
any_thread_lock_.AssertAcquired();
return any_thread_;
}
const struct AnyThread& any_thread() const {
any_thread_lock_.AssertAcquired();
return any_thread_;
}
MainThreadOnly main_thread_only_;
// Accessors for |main_thread_only_|; both DCHECK that the caller is on the
// thread accepted by |associated_thread_|'s thread checker.
MainThreadOnly& main_thread_only() {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
return main_thread_only_;
}
const MainThreadOnly& main_thread_only() const {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
return main_thread_only_;
}
// Proxy which allows TaskQueueTaskRunner to dispatch tasks and it can be
// detached from TaskQueueImpl to leave dangling task runners behind safely.
const scoped_refptr<TaskQueueProxy> proxy_;
// Lock asserted by immediate_incoming_queue(); mutable so the const overload
// can assert it.
mutable Lock immediate_incoming_queue_lock_;
TaskDeque immediate_incoming_queue_;
// Accessors for |immediate_incoming_queue_|; both assert that
// |immediate_incoming_queue_lock_| is held.
TaskDeque& immediate_incoming_queue() {
immediate_incoming_queue_lock_.AssertAcquired();
return immediate_incoming_queue_;
}
const TaskDeque& immediate_incoming_queue() const {
immediate_incoming_queue_lock_.AssertAcquired();
return immediate_incoming_queue_;
}
// Protected by SequenceManagerImpl's AnyThread lock.
IncomingImmediateWorkList immediate_work_list_storage_;
// Const flags returned by GetQuiescenceMonitored() and
// GetShouldNotifyObservers().
const bool should_monitor_quiescence_;
const bool should_notify_observers_;
DISALLOW_COPY_AND_ASSIGN(TaskQueueImpl);
};
} // namespace internal
} // namespace sequence_manager
} // namespace base
#endif // BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_