// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_
#define BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_

#include <memory>
#include <queue>
#include <set>

#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/pending_task.h"
#include "base/task/sequence_manager/associated_thread_id.h"
#include "base/task/sequence_manager/enqueue_order.h"
#include "base/task/sequence_manager/intrusive_heap.h"
#include "base/task/sequence_manager/lazily_deallocated_deque.h"
#include "base/task/sequence_manager/sequenced_task_source.h"
#include "base/task/sequence_manager/task_queue.h"
#include "base/threading/thread_checker.h"
#include "base/trace_event/trace_event.h"
#include "base/trace_event/trace_event_argument.h"
#include "starboard/types.h"

namespace base {
namespace sequence_manager {

class LazyNow;
class TimeDomain;

namespace internal {

class SequenceManagerImpl;
class TaskQueueProxy;
class WorkQueue;
class WorkQueueSets;

struct IncomingImmediateWorkList {
  IncomingImmediateWorkList* next = nullptr;
  TaskQueueImpl* queue = nullptr;
  internal::EnqueueOrder order;
};

// TaskQueueImpl has four main queues:
//
// Immediate (non-delayed) tasks:
//    |immediate_incoming_queue| - PostTask enqueues tasks here.
//    |immediate_work_queue| - SequenceManager takes immediate tasks here.
//
// Delayed tasks
//    |delayed_incoming_queue| - PostDelayedTask enqueues tasks here.
//    |delayed_work_queue| - SequenceManager takes delayed tasks here.
//
// The |immediate_incoming_queue| can be accessed from any thread, the other
// queues are main-thread only. To reduce the overhead of locking,
// |immediate_work_queue| is swapped with |immediate_incoming_queue| when
// |immediate_work_queue| becomes empty.
//
// Delayed tasks are initially posted to |delayed_incoming_queue| and a wake-up
// is scheduled with the TimeDomain.  When the delay has elapsed, the TimeDomain
// calls UpdateDelayedWorkQueue and ready delayed tasks are moved into the
// |delayed_work_queue|. Note the EnqueueOrder (used for ordering) for a delayed
// task is not set until it's moved into the |delayed_work_queue|.
//
// TaskQueueImpl uses the WorkQueueSets and the TaskQueueSelector to implement
// prioritization. Task selection is done by the TaskQueueSelector and when a
// queue is selected, it round-robins between the |immediate_work_queue| and
// |delayed_work_queue|.  The reason for this is we want to make sure delayed
// tasks (normally the most common type) don't starve out immediate work.
class BASE_EXPORT TaskQueueImpl {
 public:
  TaskQueueImpl(SequenceManagerImpl* sequence_manager,
                TimeDomain* time_domain,
                const TaskQueue::Spec& spec);

  ~TaskQueueImpl();

  // Types of queues TaskQueueImpl is maintaining internally.
  enum class WorkQueueType { kImmediate, kDelayed };

  // Non-nestable tasks may get deferred but such queue is being maintained on
  // SequenceManager side, so we need to keep information how to requeue it.
  struct DeferredNonNestableTask {
    Task task;
    internal::TaskQueueImpl* task_queue;
    WorkQueueType work_queue_type;
  };

  using OnNextWakeUpChangedCallback = RepeatingCallback<void(TimeTicks)>;
  using OnTaskStartedHandler =
      RepeatingCallback<void(const Task&, const TaskQueue::TaskTiming&)>;
  using OnTaskCompletedHandler =
      RepeatingCallback<void(const Task&, const TaskQueue::TaskTiming&)>;

  // May be called from any thread.
  scoped_refptr<SingleThreadTaskRunner> CreateTaskRunner(int task_type) const;

  // TaskQueue implementation.
  const char* GetName() const;
  bool RunsTasksInCurrentSequence() const;
  void PostTask(PostedTask task);
  // Require a reference to enclosing task queue for lifetime control.
  std::unique_ptr<TaskQueue::QueueEnabledVoter> CreateQueueEnabledVoter(
      scoped_refptr<TaskQueue> owning_task_queue);
  bool IsQueueEnabled() const;
  bool IsEmpty() const;
  size_t GetNumberOfPendingTasks() const;
  bool HasTaskToRunImmediately() const;
  Optional<TimeTicks> GetNextScheduledWakeUp();
  Optional<DelayedWakeUp> GetNextScheduledWakeUpImpl();
  void SetQueuePriority(TaskQueue::QueuePriority priority);
  TaskQueue::QueuePriority GetQueuePriority() const;
  void AddTaskObserver(MessageLoop::TaskObserver* task_observer);
  void RemoveTaskObserver(MessageLoop::TaskObserver* task_observer);
  void SetTimeDomain(TimeDomain* time_domain);
  TimeDomain* GetTimeDomain() const;
  void SetBlameContext(trace_event::BlameContext* blame_context);
  void InsertFence(TaskQueue::InsertFencePosition position);
  void InsertFenceAt(TimeTicks time);
  void RemoveFence();
  bool HasActiveFence();
  bool BlockedByFence() const;
  // Implementation of TaskQueue::SetObserver.
  void SetOnNextWakeUpChangedCallback(OnNextWakeUpChangedCallback callback);

  void UnregisterTaskQueue();

  // Returns true if a (potentially hypothetical) task with the specified
  // |enqueue_order| could run on the queue. Must be called from the main
  // thread.
  bool CouldTaskRun(EnqueueOrder enqueue_order) const;

  // Must only be called from the thread this task queue was created on.
  void ReloadImmediateWorkQueueIfEmpty();

  void AsValueInto(TimeTicks now, trace_event::TracedValue* state) const;

  bool GetQuiescenceMonitored() const { return should_monitor_quiescence_; }
  bool GetShouldNotifyObservers() const { return should_notify_observers_; }

  void NotifyWillProcessTask(const PendingTask& pending_task);
  void NotifyDidProcessTask(const PendingTask& pending_task);

  // Check for available tasks in immediate work queues.
  // Used to check if we need to generate notifications about delayed work.
  bool HasPendingImmediateWork();

  WorkQueue* delayed_work_queue() {
    return main_thread_only().delayed_work_queue.get();
  }

  const WorkQueue* delayed_work_queue() const {
    return main_thread_only().delayed_work_queue.get();
  }

  WorkQueue* immediate_work_queue() {
    return main_thread_only().immediate_work_queue.get();
  }

  const WorkQueue* immediate_work_queue() const {
    return main_thread_only().immediate_work_queue.get();
  }

  // Protected by SequenceManagerImpl's AnyThread lock.
  IncomingImmediateWorkList* immediate_work_list_storage() {
    return &immediate_work_list_storage_;
  }

  // Enqueues any delayed tasks which should be run now on the
  // |delayed_work_queue|.
  // Must be called from the main thread.
  void WakeUpForDelayedWork(LazyNow* lazy_now);

  HeapHandle heap_handle() const { return main_thread_only().heap_handle; }

  void set_heap_handle(HeapHandle heap_handle) {
    main_thread_only().heap_handle = heap_handle;
  }

  // Pushes |task| onto the front of the specified work queue. Caution must be
  // taken with this API because you could easily starve out other work.
  // TODO(kraynov): Simplify non-nestable task logic https://crbug.com/845437.
  void RequeueDeferredNonNestableTask(DeferredNonNestableTask task);

  void PushImmediateIncomingTaskForTest(Task&& task);

  class QueueEnabledVoterImpl : public TaskQueue::QueueEnabledVoter {
   public:
    explicit QueueEnabledVoterImpl(scoped_refptr<TaskQueue> task_queue);
    ~QueueEnabledVoterImpl() override;

    // QueueEnabledVoter implementation.
    void SetQueueEnabled(bool enabled) override;

    TaskQueueImpl* GetTaskQueueForTest() const {
      return task_queue_->GetTaskQueueImpl();
    }

   private:
    friend class TaskQueueImpl;

    scoped_refptr<TaskQueue> task_queue_;
    bool enabled_;
  };

  // Iterates over |delayed_incoming_queue| removing canceled tasks.
  void SweepCanceledDelayedTasks(TimeTicks now);

  // Allows wrapping TaskQueue to set a handler to subscribe for notifications
  // about started and completed tasks.
  void SetOnTaskStartedHandler(OnTaskStartedHandler handler);
  void OnTaskStarted(const Task& task,
                     const TaskQueue::TaskTiming& task_timing);
  void SetOnTaskCompletedHandler(OnTaskCompletedHandler handler);
  void OnTaskCompleted(const Task& task,
                       const TaskQueue::TaskTiming& task_timing);
  bool RequiresTaskTiming() const;

  WeakPtr<SequenceManagerImpl> GetSequenceManagerWeakPtr();
  SequenceManagerImpl* sequence_manager() {
    return main_thread_only().sequence_manager;
  }

  scoped_refptr<GracefulQueueShutdownHelper> GetGracefulQueueShutdownHelper();

  // Returns true if this queue is unregistered or task queue manager is deleted
  // and this queue can be safely deleted on any thread.
  bool IsUnregistered() const;

  // Disables queue for testing purposes, when a QueueEnabledVoter can't be
  // constructed due to not having TaskQueue.
  void SetQueueEnabledForTest(bool enabled);

  // TODO(alexclarke): Remove when possible.
  void ClearSequenceManagerForTesting();

 protected:
  void SetDelayedWakeUpForTesting(Optional<DelayedWakeUp> wake_up);

 private:
  friend class WorkQueue;
  friend class WorkQueueTest;

  struct AnyThread {
    AnyThread(SequenceManagerImpl* sequence_manager, TimeDomain* time_domain);
    ~AnyThread();

    // SequenceManagerImpl, TimeDomain and Observer are maintained in two
    // copies: inside AnyThread and inside MainThreadOnly. They can be changed
    // only from main thread, so it should be locked before accessing from other
    // threads.
    SequenceManagerImpl* sequence_manager;
    TimeDomain* time_domain;
    // Callback corresponding to TaskQueue::Observer::OnQueueNextChanged.
    OnNextWakeUpChangedCallback on_next_wake_up_changed_callback;
  };

  struct MainThreadOnly {
    MainThreadOnly(SequenceManagerImpl* sequence_manager,
                   TaskQueueImpl* task_queue,
                   TimeDomain* time_domain);
    ~MainThreadOnly();

    // Another copy of SequenceManagerImpl, TimeDomain and Observer
    // for lock-free access from the main thread.
    // See description inside struct AnyThread for details.
    SequenceManagerImpl* sequence_manager;
    TimeDomain* time_domain;
    // Callback corresponding to TaskQueue::Observer::OnQueueNextChanged.
    OnNextWakeUpChangedCallback on_next_wake_up_changed_callback;

    std::unique_ptr<WorkQueue> delayed_work_queue;
    std::unique_ptr<WorkQueue> immediate_work_queue;
    std::priority_queue<Task> delayed_incoming_queue;
    ObserverList<MessageLoop::TaskObserver>::Unchecked task_observers;
    size_t set_index;
    HeapHandle heap_handle;
    int is_enabled_refcount;
    int voter_refcount;
    trace_event::BlameContext* blame_context;  // Not owned.
    EnqueueOrder current_fence;
    Optional<TimeTicks> delayed_fence;
    OnTaskStartedHandler on_task_started_handler;
    OnTaskCompletedHandler on_task_completed_handler;
    // Last reported wake up, used only in UpdateWakeUp to avoid
    // excessive calls.
    Optional<DelayedWakeUp> scheduled_wake_up;
    // If false, queue will be disabled. Used only for tests.
    bool is_enabled_for_test;
  };

  void PostImmediateTaskImpl(PostedTask task);
  void PostDelayedTaskImpl(PostedTask task);

  // Push the task onto the |delayed_incoming_queue|. Lock-free main thread
  // only fast path.
  void PushOntoDelayedIncomingQueueFromMainThread(Task pending_task,
                                                  TimeTicks now,
                                                  bool notify_task_annotator);

  // Push the task onto the |delayed_incoming_queue|.  Slow path from other
  // threads.
  void PushOntoDelayedIncomingQueueLocked(Task pending_task);

  void ScheduleDelayedWorkTask(Task pending_task);

  void MoveReadyImmediateTasksToImmediateWorkQueueLocked();

  // Push the task onto the |immediate_incoming_queue| and for auto pumped
  // queues it calls MaybePostDoWorkOnMainRunner if the Incoming queue was
  // empty.
  void PushOntoImmediateIncomingQueueLocked(Task task);

  using TaskDeque = LazilyDeallocatedDeque<Task>;

  // Extracts all the tasks from the immediate incoming queue and swaps it with
  // |queue| which must be empty.
  // Can be called from any thread.
  void ReloadEmptyImmediateQueue(TaskDeque* queue);

  void TraceQueueSize() const;
  static void QueueAsValueInto(const TaskDeque& queue,
                               TimeTicks now,
                               trace_event::TracedValue* state);
  static void QueueAsValueInto(const std::priority_queue<Task>& queue,
                               TimeTicks now,
                               trace_event::TracedValue* state);
  static void TaskAsValueInto(const Task& task,
                              TimeTicks now,
                              trace_event::TracedValue* state);

  void RemoveQueueEnabledVoter(const QueueEnabledVoterImpl* voter);
  void OnQueueEnabledVoteChanged(bool enabled);
  void EnableOrDisableWithSelector(bool enable);

  // Schedules delayed work on time domain and calls the observer.
  void UpdateDelayedWakeUp(LazyNow* lazy_now);
  void UpdateDelayedWakeUpImpl(LazyNow* lazy_now,
                               Optional<DelayedWakeUp> wake_up);

  // Activate a delayed fence if a time has come.
  void ActivateDelayedFenceIfNeeded(TimeTicks now);

  const char* name_;

  scoped_refptr<AssociatedThreadId> associated_thread_;

  mutable Lock any_thread_lock_;
  AnyThread any_thread_;
  struct AnyThread& any_thread() {
    any_thread_lock_.AssertAcquired();
    return any_thread_;
  }
  const struct AnyThread& any_thread() const {
    any_thread_lock_.AssertAcquired();
    return any_thread_;
  }

  MainThreadOnly main_thread_only_;
  MainThreadOnly& main_thread_only() {
    DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
    return main_thread_only_;
  }
  const MainThreadOnly& main_thread_only() const {
    DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
    return main_thread_only_;
  }

  // Proxy which allows TaskQueueTaskRunner to dispatch tasks and it can be
  // detached from TaskQueueImpl to leave dangling task runners behind sefely.
  const scoped_refptr<TaskQueueProxy> proxy_;

  mutable Lock immediate_incoming_queue_lock_;
  TaskDeque immediate_incoming_queue_;
  TaskDeque& immediate_incoming_queue() {
    immediate_incoming_queue_lock_.AssertAcquired();
    return immediate_incoming_queue_;
  }
  const TaskDeque& immediate_incoming_queue() const {
    immediate_incoming_queue_lock_.AssertAcquired();
    return immediate_incoming_queue_;
  }

  // Protected by SequenceManagerImpl's AnyThread lock.
  IncomingImmediateWorkList immediate_work_list_storage_;

  const bool should_monitor_quiescence_;
  const bool should_notify_observers_;

  DISALLOW_COPY_AND_ASSIGN(TaskQueueImpl);
};

}  // namespace internal
}  // namespace sequence_manager
}  // namespace base

#endif  // BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_
