| // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "base/threading/sequenced_worker_pool.h" |
| |
| #include <list> |
| #include <map> |
| #include <set> |
| #include <utility> |
| #include <vector> |
| |
| #include "base/atomicops.h" |
| #include "base/bind.h" |
| #include "base/callback.h" |
| #include "base/compiler_specific.h" |
| #include "base/critical_closure.h" |
| #include "base/debug/trace_event.h" |
| #include "base/logging.h" |
| #include "base/memory/linked_ptr.h" |
| #include "base/message_loop_proxy.h" |
| #include "base/metrics/histogram.h" |
| #include "base/stl_util.h" |
| #include "base/stringprintf.h" |
| #include "base/synchronization/condition_variable.h" |
| #include "base/synchronization/lock.h" |
| #include "base/synchronization/waitable_event.h" |
| #include "base/threading/platform_thread.h" |
| #include "base/threading/simple_thread.h" |
| #include "base/threading/thread_restrictions.h" |
| #include "base/time.h" |
| #include "base/tracked_objects.h" |
| |
| #if defined(OS_MACOSX) |
| #include "base/mac/scoped_nsautorelease_pool.h" |
| #endif |
| |
| namespace base { |
| |
| namespace { |
| |
| struct SequencedTask : public TrackingInfo { |
| SequencedTask() |
| : sequence_token_id(0), |
| trace_id(0), |
| sequence_task_number(0), |
| shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} |
| |
| explicit SequencedTask(const tracked_objects::Location& from_here) |
| : base::TrackingInfo(from_here, TimeTicks()), |
| sequence_token_id(0), |
| trace_id(0), |
| sequence_task_number(0), |
| shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} |
| |
| ~SequencedTask() {} |
| |
| int sequence_token_id; |
| int trace_id; |
| int64 sequence_task_number; |
| SequencedWorkerPool::WorkerShutdown shutdown_behavior; |
| tracked_objects::Location posted_from; |
| Closure task; |
| |
| // Non-delayed tasks and delayed tasks are managed together by time-to-run |
| // order. We calculate the time by adding the posted time and the given delay. |
| TimeTicks time_to_run; |
| }; |
| |
| struct SequencedTaskLessThan { |
| public: |
| bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const { |
| if (lhs.time_to_run < rhs.time_to_run) |
| return true; |
| |
| if (lhs.time_to_run > rhs.time_to_run) |
| return false; |
| |
| // If the time happen to match, then we use the sequence number to decide. |
| return lhs.sequence_task_number < rhs.sequence_task_number; |
| } |
| }; |
| |
| // SequencedWorkerPoolTaskRunner --------------------------------------------- |
| // A TaskRunner which posts tasks to a SequencedWorkerPool with a |
| // fixed ShutdownBehavior. |
| // |
| // Note that this class is RefCountedThreadSafe (inherited from TaskRunner). |
| class SequencedWorkerPoolTaskRunner : public TaskRunner { |
| public: |
| SequencedWorkerPoolTaskRunner( |
| const scoped_refptr<SequencedWorkerPool>& pool, |
| SequencedWorkerPool::WorkerShutdown shutdown_behavior); |
| |
| // TaskRunner implementation |
| virtual bool PostDelayedTask(const tracked_objects::Location& from_here, |
| const Closure& task, |
| TimeDelta delay) OVERRIDE; |
| #if defined(COBALT) |
| virtual bool PostBlockingTask(const tracked_objects::Location& from_here, |
| const Closure& task) OVERRIDE; |
| #endif |
| virtual bool RunsTasksOnCurrentThread() const OVERRIDE; |
| |
| private: |
| virtual ~SequencedWorkerPoolTaskRunner(); |
| |
| const scoped_refptr<SequencedWorkerPool> pool_; |
| |
| const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; |
| |
| DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner); |
| }; |
| |
| SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner( |
| const scoped_refptr<SequencedWorkerPool>& pool, |
| SequencedWorkerPool::WorkerShutdown shutdown_behavior) |
| : pool_(pool), |
| shutdown_behavior_(shutdown_behavior) { |
| } |
| |
| SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() { |
| } |
| |
| bool SequencedWorkerPoolTaskRunner::PostDelayedTask( |
| const tracked_objects::Location& from_here, |
| const Closure& task, |
| TimeDelta delay) { |
| if (delay == TimeDelta()) { |
| return pool_->PostWorkerTaskWithShutdownBehavior( |
| from_here, task, shutdown_behavior_); |
| } |
| return pool_->PostDelayedWorkerTask(from_here, task, delay); |
| } |
| |
| #if defined(COBALT) |
| bool SequencedWorkerPoolTaskRunner::PostBlockingTask( |
| const tracked_objects::Location& from_here, |
| const Closure& task) { |
| return pool_->PostBlockingTask(from_here, task); |
| } |
| #endif |
| |
| bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const { |
| return pool_->RunsTasksOnCurrentThread(); |
| } |
| |
| // SequencedWorkerPoolSequencedTaskRunner ------------------------------------ |
| // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a |
| // fixed sequence token. |
| // |
| // Note that this class is RefCountedThreadSafe (inherited from TaskRunner). |
| class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner { |
| public: |
| SequencedWorkerPoolSequencedTaskRunner( |
| const scoped_refptr<SequencedWorkerPool>& pool, |
| SequencedWorkerPool::SequenceToken token, |
| SequencedWorkerPool::WorkerShutdown shutdown_behavior); |
| |
| // TaskRunner implementation |
| virtual bool PostDelayedTask(const tracked_objects::Location& from_here, |
| const Closure& task, |
| TimeDelta delay) OVERRIDE; |
| #if defined(COBALT) |
| virtual bool PostBlockingTask(const tracked_objects::Location& from_here, |
| const Closure& task) OVERRIDE; |
| #endif |
| virtual bool RunsTasksOnCurrentThread() const OVERRIDE; |
| |
| // SequencedTaskRunner implementation |
| virtual bool PostNonNestableDelayedTask( |
| const tracked_objects::Location& from_here, |
| const Closure& task, |
| TimeDelta delay) OVERRIDE; |
| |
| private: |
| virtual ~SequencedWorkerPoolSequencedTaskRunner(); |
| |
| const scoped_refptr<SequencedWorkerPool> pool_; |
| |
| const SequencedWorkerPool::SequenceToken token_; |
| |
| const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; |
| |
| DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner); |
| }; |
| |
| SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner( |
| const scoped_refptr<SequencedWorkerPool>& pool, |
| SequencedWorkerPool::SequenceToken token, |
| SequencedWorkerPool::WorkerShutdown shutdown_behavior) |
| : pool_(pool), |
| token_(token), |
| shutdown_behavior_(shutdown_behavior) { |
| } |
| |
| SequencedWorkerPoolSequencedTaskRunner:: |
| ~SequencedWorkerPoolSequencedTaskRunner() { |
| } |
| |
| bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask( |
| const tracked_objects::Location& from_here, |
| const Closure& task, |
| TimeDelta delay) { |
| if (delay == TimeDelta()) { |
| return pool_->PostSequencedWorkerTaskWithShutdownBehavior( |
| token_, from_here, task, shutdown_behavior_); |
| } |
| return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay); |
| } |
| |
| #if defined(COBALT) |
| bool SequencedWorkerPoolSequencedTaskRunner::PostBlockingTask( |
| const tracked_objects::Location& from_here, |
| const Closure& task) { |
| return pool_->PostBlockingTask(from_here, task); |
| } |
| #endif |
| |
| bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const { |
| return pool_->IsRunningSequenceOnCurrentThread(token_); |
| } |
| |
| bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask( |
| const tracked_objects::Location& from_here, |
| const Closure& task, |
| TimeDelta delay) { |
| // There's no way to run nested tasks, so simply forward to |
| // PostDelayedTask. |
| return PostDelayedTask(from_here, task, delay); |
| } |
| |
| // Create a process-wide unique ID to represent this task in trace events. This |
| // will be mangled with a Process ID hash to reduce the likelyhood of colliding |
| // with MessageLoop pointers on other processes. |
| uint64 GetTaskTraceID(const SequencedTask& task, |
| void* pool) { |
| return (static_cast<uint64>(task.trace_id) << 32) | |
| static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); |
| } |
| |
| } // namespace |
| |
| // Worker --------------------------------------------------------------------- |
| |
| class SequencedWorkerPool::Worker : public SimpleThread { |
| public: |
| // Hold a (cyclic) ref to |worker_pool|, since we want to keep it |
| // around as long as we are running. |
| Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, |
| int thread_number, |
| const std::string& thread_name_prefix); |
| virtual ~Worker(); |
| |
| // SimpleThread implementation. This actually runs the background thread. |
| virtual void Run() OVERRIDE; |
| |
| void set_running_sequence(SequenceToken token) { |
| running_sequence_ = token; |
| } |
| |
| SequenceToken running_sequence() const { |
| return running_sequence_; |
| } |
| |
| private: |
| scoped_refptr<SequencedWorkerPool> worker_pool_; |
| SequenceToken running_sequence_; |
| |
| DISALLOW_COPY_AND_ASSIGN(Worker); |
| }; |
| |
| // Inner ---------------------------------------------------------------------- |
| |
| class SequencedWorkerPool::Inner { |
| public: |
| // Take a raw pointer to |worker| to avoid cycles (since we're owned |
| // by it). |
| Inner(SequencedWorkerPool* worker_pool, size_t max_threads, |
| const std::string& thread_name_prefix, |
| TestingObserver* observer); |
| |
| ~Inner(); |
| |
| SequenceToken GetSequenceToken(); |
| |
| SequenceToken GetNamedSequenceToken(const std::string& name); |
| |
| // This function accepts a name and an ID. If the name is null, the |
| // token ID is used. This allows us to implement the optional name lookup |
| // from a single function without having to enter the lock a separate time. |
| bool PostTask(const std::string* optional_token_name, |
| SequenceToken sequence_token, |
| WorkerShutdown shutdown_behavior, |
| const tracked_objects::Location& from_here, |
| const Closure& task, |
| TimeDelta delay); |
| |
| #if defined(COBALT) |
| bool PostBlockingTask(const std::string* optional_token_name, |
| SequenceToken sequence_token, |
| WorkerShutdown shutdown_behavior, |
| const tracked_objects::Location& from_here, |
| const Closure& task); |
| #endif |
| |
| bool RunsTasksOnCurrentThread() const; |
| |
| bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
| |
| void FlushForTesting(); |
| |
| void SignalHasWorkForTesting(); |
| |
| int GetWorkSignalCountForTesting() const; |
| |
| void Shutdown(); |
| |
| // Runs the worker loop on the background thread. |
| void ThreadLoop(Worker* this_worker); |
| |
| private: |
| enum GetWorkStatus { |
| GET_WORK_FOUND, |
| GET_WORK_NOT_FOUND, |
| GET_WORK_WAIT, |
| }; |
| |
| // Returns whether there are no more pending tasks and all threads |
| // are idle. Must be called under lock. |
| bool IsIdle() const; |
| |
| // Called from within the lock, this converts the given token name into a |
| // token ID, creating a new one if necessary. |
| int LockedGetNamedTokenID(const std::string& name); |
| |
| // Called from within the lock, this returns the next sequence task number. |
| int64 LockedGetNextSequenceTaskNumber(); |
| |
| // Gets new task. There are 3 cases depending on the return value: |
| // |
| // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should |
| // be run immediately. |
| // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run, |
| // and |task| is not filled in. In this case, the caller should wait until |
| // a task is posted. |
| // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run |
| // immediately, and |task| is not filled in. Likewise, |wait_time| is |
| // filled in the time to wait until the next task to run. In this case, the |
| // caller should wait the time. |
| // |
| // In any case, the calling code should clear the given |
| // delete_these_outside_lock vector the next time the lock is released. |
| // See the implementation for a more detailed description. |
| GetWorkStatus GetWork(SequencedTask* task, |
| TimeDelta* wait_time, |
| std::vector<Closure>* delete_these_outside_lock); |
| |
| // Peforms init and cleanup around running the given task. WillRun... |
| // returns the value from PrepareToStartAdditionalThreadIfNecessary. |
| // The calling code should call FinishStartingAdditionalThread once the |
| // lock is released if the return values is nonzero. |
| int WillRunWorkerTask(const SequencedTask& task); |
| void DidRunWorkerTask(const SequencedTask& task); |
| |
| // Returns true if there are no threads currently running the given |
| // sequence token. |
| bool IsSequenceTokenRunnable(int sequence_token_id) const; |
| |
| // Checks if all threads are busy and the addition of one more could run an |
| // additional task waiting in the queue. This must be called from within |
| // the lock. |
| // |
| // If another thread is helpful, this will mark the thread as being in the |
| // process of starting and returns the index of the new thread which will be |
| // 0 or more. The caller should then call FinishStartingAdditionalThread to |
| // complete initialization once the lock is released. |
| // |
| // If another thread is not necessary, returne 0; |
| // |
| // See the implementedion for more. |
| int PrepareToStartAdditionalThreadIfHelpful(); |
| |
| // The second part of thread creation after |
| // PrepareToStartAdditionalThreadIfHelpful with the thread number it |
| // generated. This actually creates the thread and should be called outside |
| // the lock to avoid blocking important work starting a thread in the lock. |
| void FinishStartingAdditionalThread(int thread_number); |
| |
| // Signal |has_work_| and increment |has_work_signal_count_|. |
| void SignalHasWork(); |
| |
| // Checks whether there is work left that's blocking shutdown. Must be |
| // called inside the lock. |
| bool CanShutdown() const; |
| |
| SequencedWorkerPool* const worker_pool_; |
| |
| // The last sequence number used. Managed by GetSequenceToken, since this |
| // only does threadsafe increment operations, you do not need to hold the |
| // lock. |
| volatile subtle::Atomic32 last_sequence_number_; |
| |
| // This lock protects |everything in this class|. Do not read or modify |
| // anything without holding this lock. Do not block while holding this |
| // lock. |
| mutable Lock lock_; |
| |
| // Condition variable that is waited on by worker threads until new |
| // tasks are posted or shutdown starts. |
| ConditionVariable has_work_cv_; |
| |
| // Condition variable that is waited on by non-worker threads (in |
| // FlushForTesting()) until IsIdle() goes to true. |
| ConditionVariable is_idle_cv_; |
| |
| // Condition variable that is waited on by non-worker threads (in |
| // Shutdown()) until CanShutdown() goes to true. |
| ConditionVariable can_shutdown_cv_; |
| |
| // The maximum number of worker threads we'll create. |
| const size_t max_threads_; |
| |
| const std::string thread_name_prefix_; |
| |
| // Associates all known sequence token names with their IDs. |
| std::map<std::string, int> named_sequence_tokens_; |
| |
| // Owning pointers to all threads we've created so far, indexed by |
| // ID. Since we lazily create threads, this may be less than |
| // max_threads_ and will be initially empty. |
| typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap; |
| ThreadMap threads_; |
| |
| // Set to true when we're in the process of creating another thread. |
| // See PrepareToStartAdditionalThreadIfHelpful for more. |
| bool thread_being_created_; |
| |
| // Number of threads currently waiting for work. |
| size_t waiting_thread_count_; |
| |
| // Number of threads currently running tasks that have the BLOCK_SHUTDOWN |
| // or SKIP_ON_SHUTDOWN flag set. |
| size_t blocking_shutdown_thread_count_; |
| |
| // A set of all pending tasks in time-to-run order. These are tasks that are |
| // either waiting for a thread to run on, waiting for their time to run, |
| // or blocked on a previous task in their sequence. We have to iterate over |
| // the tasks by time-to-run order, so we use the set instead of the |
| // traditional priority_queue. |
| typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet; |
| PendingTaskSet pending_tasks_; |
| |
| // The next sequence number for a new sequenced task. |
| int64 next_sequence_task_number_; |
| |
| // Number of tasks in the pending_tasks_ list that are marked as blocking |
| // shutdown. |
| size_t blocking_shutdown_pending_task_count_; |
| |
| // Lists all sequence tokens currently executing. |
| std::set<int> current_sequences_; |
| |
| // An ID for each posted task to distinguish the task from others in traces. |
| int trace_id_; |
| |
| // Set when Shutdown is called and no further tasks should be |
| // allowed, though we may still be running existing tasks. |
| bool shutdown_called_; |
| |
| TestingObserver* const testing_observer_; |
| |
| DISALLOW_COPY_AND_ASSIGN(Inner); |
| }; |
| |
| // Worker definitions --------------------------------------------------------- |
| |
| SequencedWorkerPool::Worker::Worker( |
| const scoped_refptr<SequencedWorkerPool>& worker_pool, |
| int thread_number, |
| const std::string& prefix) |
| : SimpleThread( |
| prefix + StringPrintf("Worker%d", thread_number).c_str()), |
| worker_pool_(worker_pool) { |
| Start(); |
| } |
| |
| SequencedWorkerPool::Worker::~Worker() { |
| } |
| |
| void SequencedWorkerPool::Worker::Run() { |
| // Just jump back to the Inner object to run the thread, since it has all the |
| // tracking information and queues. It might be more natural to implement |
| // using DelegateSimpleThread and have Inner implement the Delegate to avoid |
| // having these worker objects at all, but that method lacks the ability to |
| // send thread-specific information easily to the thread loop. |
| worker_pool_->inner_->ThreadLoop(this); |
| // Release our cyclic reference once we're done. |
| worker_pool_ = NULL; |
| } |
| |
| // Inner definitions --------------------------------------------------------- |
| |
| SequencedWorkerPool::Inner::Inner( |
| SequencedWorkerPool* worker_pool, |
| size_t max_threads, |
| const std::string& thread_name_prefix, |
| TestingObserver* observer) |
| : worker_pool_(worker_pool), |
| last_sequence_number_(0), |
| lock_(), |
| has_work_cv_(&lock_), |
| is_idle_cv_(&lock_), |
| can_shutdown_cv_(&lock_), |
| max_threads_(max_threads), |
| thread_name_prefix_(thread_name_prefix), |
| thread_being_created_(false), |
| waiting_thread_count_(0), |
| blocking_shutdown_thread_count_(0), |
| next_sequence_task_number_(0), |
| blocking_shutdown_pending_task_count_(0), |
| trace_id_(0), |
| shutdown_called_(false), |
| testing_observer_(observer) {} |
| |
| SequencedWorkerPool::Inner::~Inner() { |
| // You must call Shutdown() before destroying the pool. |
| DCHECK(shutdown_called_); |
| |
| // Need to explicitly join with the threads before they're destroyed or else |
| // they will be running when our object is half torn down. |
| for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) |
| it->second->Join(); |
| threads_.clear(); |
| |
| if (testing_observer_) |
| testing_observer_->OnDestruct(); |
| } |
| |
| SequencedWorkerPool::SequenceToken |
| SequencedWorkerPool::Inner::GetSequenceToken() { |
| subtle::Atomic32 result = |
| subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); |
| return SequenceToken(static_cast<int>(result)); |
| } |
| |
| SequencedWorkerPool::SequenceToken |
| SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { |
| AutoLock lock(lock_); |
| return SequenceToken(LockedGetNamedTokenID(name)); |
| } |
| |
| bool SequencedWorkerPool::Inner::PostTask( |
| const std::string* optional_token_name, |
| SequenceToken sequence_token, |
| WorkerShutdown shutdown_behavior, |
| const tracked_objects::Location& from_here, |
| const Closure& task, |
| TimeDelta delay) { |
| DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN); |
| SequencedTask sequenced(from_here); |
| sequenced.sequence_token_id = sequence_token.id_; |
| sequenced.shutdown_behavior = shutdown_behavior; |
| sequenced.posted_from = from_here; |
| sequenced.task = |
| shutdown_behavior == BLOCK_SHUTDOWN ? |
| base::MakeCriticalClosure(task) : task; |
| sequenced.time_to_run = TimeTicks::Now() + delay; |
| |
| int create_thread_id = 0; |
| { |
| AutoLock lock(lock_); |
| if (shutdown_called_) |
| return false; |
| |
| // The trace_id is used for identifying the task in about:tracing. |
| sequenced.trace_id = trace_id_++; |
| |
| TRACE_EVENT_FLOW_BEGIN0("task", "SequencedWorkerPool::PostTask", |
| TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this)))); |
| |
| sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); |
| |
| // Now that we have the lock, apply the named token rules. |
| if (optional_token_name) |
| sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
| |
| pending_tasks_.insert(sequenced); |
| if (shutdown_behavior == BLOCK_SHUTDOWN) |
| blocking_shutdown_pending_task_count_++; |
| |
| create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
| } |
| |
| // Actually start the additional thread or signal an existing one now that |
| // we're outside the lock. |
| if (create_thread_id) |
| FinishStartingAdditionalThread(create_thread_id); |
| else |
| SignalHasWork(); |
| |
| return true; |
| } |
| |
| #if defined(COBALT) |
| namespace { |
| // Runs the given task, and then signals the given WaitableEvent. |
| void RunAndSignal(const base::Closure& task, base::WaitableEvent* event) { |
| task.Run(); |
| event->Signal(); |
| } |
| } // namespace |
| |
| bool SequencedWorkerPool::Inner::PostBlockingTask( |
| const std::string* optional_token_name, |
| SequenceToken sequence_token, |
| WorkerShutdown shutdown_behavior, |
| const tracked_objects::Location& from_here, |
| const Closure& task) { |
| DCHECK(!IsRunningSequenceOnCurrentThread(sequence_token)) |
| << "PostBlockingTask cannot be called from a SequenceWorkerPool's own " |
| << "thread." << from_here.ToString(); |
| WaitableEvent task_finished(false /* automatic reset */, |
| false /* initially unsignaled */); |
| bool posted = PostTask( |
| optional_token_name, |
| sequence_token, |
| shutdown_behavior, |
| from_here, |
| Bind(&RunAndSignal, task, Unretained(&task_finished)), |
| TimeDelta()); |
| if (!posted) { |
| return false; |
| } |
| |
| // Wait for the task to complete before proceeding. |
| task_finished.Wait(); |
| return true; |
| } |
| #endif |
| |
| bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
| AutoLock lock(lock_); |
| return ContainsKey(threads_, PlatformThread::CurrentId()); |
| } |
| |
| bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| SequenceToken sequence_token) const { |
| AutoLock lock(lock_); |
| ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
| if (found == threads_.end()) |
| return false; |
| return found->second->running_sequence().Equals(sequence_token); |
| } |
| |
| void SequencedWorkerPool::Inner::FlushForTesting() { |
| AutoLock lock(lock_); |
| while (!IsIdle()) |
| is_idle_cv_.Wait(); |
| } |
| |
| void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { |
| SignalHasWork(); |
| } |
| |
| void SequencedWorkerPool::Inner::Shutdown() { |
| // Mark us as terminated and go through and drop all tasks that aren't |
| // required to run on shutdown. Since no new tasks will get posted once the |
| // terminated flag is set, this ensures that all remaining tasks are required |
| // for shutdown whenever the termianted_ flag is set. |
| { |
| AutoLock lock(lock_); |
| |
| if (shutdown_called_) |
| return; |
| shutdown_called_ = true; |
| |
| // Tickle the threads. This will wake up a waiting one so it will know that |
| // it can exit, which in turn will wake up any other waiting ones. |
| SignalHasWork(); |
| |
| // There are no pending or running tasks blocking shutdown, we're done. |
| if (CanShutdown()) |
| return; |
| } |
| |
| // If we're here, then something is blocking shutdown. So wait for |
| // CanShutdown() to go to true. |
| |
| if (testing_observer_) |
| testing_observer_->WillWaitForShutdown(); |
| |
| TimeTicks shutdown_wait_begin = TimeTicks::Now(); |
| |
| { |
| base::ThreadRestrictions::ScopedAllowWait allow_wait; |
| AutoLock lock(lock_); |
| while (!CanShutdown()) |
| can_shutdown_cv_.Wait(); |
| } |
| UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", |
| TimeTicks::Now() - shutdown_wait_begin); |
| } |
| |
| void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
| { |
| AutoLock lock(lock_); |
| DCHECK(thread_being_created_); |
| thread_being_created_ = false; |
| std::pair<ThreadMap::iterator, bool> result = |
| threads_.insert( |
| std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); |
| DCHECK(result.second); |
| |
| while (true) { |
| #if defined(OS_MACOSX) |
| base::mac::ScopedNSAutoreleasePool autorelease_pool; |
| #endif |
| |
| // See GetWork for what delete_these_outside_lock is doing. |
| SequencedTask task; |
| TimeDelta wait_time; |
| std::vector<Closure> delete_these_outside_lock; |
| GetWorkStatus status = |
| GetWork(&task, &wait_time, &delete_these_outside_lock); |
| if (status == GET_WORK_FOUND) { |
| TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask", |
| TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); |
| TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop", |
| "src_file", task.posted_from.file_name(), |
| "src_func", task.posted_from.function_name()); |
| int new_thread_id = WillRunWorkerTask(task); |
| { |
| AutoUnlock unlock(lock_); |
| // There may be more work available, so wake up another |
| // worker thread. (Technically not required, since we |
| // already get a signal for each new task, but it doesn't |
| // hurt.) |
| SignalHasWork(); |
| delete_these_outside_lock.clear(); |
| |
| // Complete thread creation outside the lock if necessary. |
| if (new_thread_id) |
| FinishStartingAdditionalThread(new_thread_id); |
| |
| this_worker->set_running_sequence( |
| SequenceToken(task.sequence_token_id)); |
| |
| tracked_objects::TrackedTime start_time = |
| tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally); |
| |
| task.task.Run(); |
| |
| tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task, |
| start_time, tracked_objects::ThreadData::NowForEndOfRun()); |
| |
| this_worker->set_running_sequence(SequenceToken()); |
| |
| // Make sure our task is erased outside the lock for the same reason |
| // we do this with delete_these_oustide_lock. |
| task.task = Closure(); |
| } |
| DidRunWorkerTask(task); // Must be done inside the lock. |
| } else { |
| // When we're terminating and there's no more work, we can |
| // shut down. You can't get more tasks posted once |
| // shutdown_called_ is set. There may be some tasks stuck |
| // behind running ones with the same sequence token, but |
| // additional threads won't help this case. |
| if (shutdown_called_ && |
| blocking_shutdown_pending_task_count_ == 0) |
| break; |
| waiting_thread_count_++; |
| // This is the only time that IsIdle() can go to true. |
| if (IsIdle()) |
| is_idle_cv_.Signal(); |
| |
| switch (status) { |
| case GET_WORK_NOT_FOUND: |
| has_work_cv_.Wait(); |
| break; |
| case GET_WORK_WAIT: |
| has_work_cv_.TimedWait(wait_time); |
| break; |
| default: |
| NOTREACHED(); |
| } |
| waiting_thread_count_--; |
| } |
| } |
| } // Release lock_. |
| |
| // We noticed we should exit. Wake up the next worker so it knows it should |
| // exit as well (because the Shutdown() code only signals once). |
| SignalHasWork(); |
| |
| // Possibly unblock shutdown. |
| can_shutdown_cv_.Signal(); |
| } |
| |
| bool SequencedWorkerPool::Inner::IsIdle() const { |
| lock_.AssertAcquired(); |
| return pending_tasks_.empty() && waiting_thread_count_ == threads_.size(); |
| } |
| |
| int SequencedWorkerPool::Inner::LockedGetNamedTokenID( |
| const std::string& name) { |
| lock_.AssertAcquired(); |
| DCHECK(!name.empty()); |
| |
| std::map<std::string, int>::const_iterator found = |
| named_sequence_tokens_.find(name); |
| if (found != named_sequence_tokens_.end()) |
| return found->second; // Got an existing one. |
| |
| // Create a new one for this name. |
| SequenceToken result = GetSequenceToken(); |
| named_sequence_tokens_.insert(std::make_pair(name, result.id_)); |
| return result.id_; |
| } |
| |
| int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { |
| lock_.AssertAcquired(); |
| // We assume that we never create enough tasks to wrap around. |
| return next_sequence_task_number_++; |
| } |
| |
| SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( |
| SequencedTask* task, |
| TimeDelta* wait_time, |
| std::vector<Closure>* delete_these_outside_lock) { |
| lock_.AssertAcquired(); |
| |
| UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", |
| static_cast<int>(pending_tasks_.size())); |
| |
| // Find the next task with a sequence token that's not currently in use. |
| // If the token is in use, that means another thread is running something |
| // in that sequence, and we can't run it without going out-of-order. |
| // |
| // This algorithm is simple and fair, but inefficient in some cases. For |
| // example, say somebody schedules 1000 slow tasks with the same sequence |
| // number. We'll have to go through all those tasks each time we feel like |
| // there might be work to schedule. If this proves to be a problem, we |
| // should make this more efficient. |
| // |
| // One possible enhancement would be to keep a map from sequence ID to a |
| // list of pending but currently blocked SequencedTasks for that ID. |
| // When a worker finishes a task of one sequence token, it can pick up the |
| // next one from that token right away. |
| // |
| // This may lead to starvation if there are sufficient numbers of sequences |
| // in use. To alleviate this, we could add an incrementing priority counter |
| // to each SequencedTask. Then maintain a priority_queue of all runnable |
| // tasks, sorted by priority counter. When a sequenced task is completed |
| // we would pop the head element off of that tasks pending list and add it |
| // to the priority queue. Then we would run the first item in the priority |
| // queue. |
| |
| GetWorkStatus status = GET_WORK_NOT_FOUND; |
| int unrunnable_tasks = 0; |
| PendingTaskSet::iterator i = pending_tasks_.begin(); |
| // We assume that the loop below doesn't take too long and so we can just do |
| // a single call to TimeTicks::Now(). |
| const TimeTicks current_time = TimeTicks::Now(); |
| while (i != pending_tasks_.end()) { |
| if (!IsSequenceTokenRunnable(i->sequence_token_id)) { |
| unrunnable_tasks++; |
| ++i; |
| continue; |
| } |
| |
| if (i->time_to_run > current_time) { |
| // The time to run has not come yet. |
| *wait_time = i->time_to_run - current_time; |
| status = GET_WORK_WAIT; |
| break; |
| } |
| |
| if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { |
| // We're shutting down and the task we just found isn't blocking |
| // shutdown. Delete it and get more work. |
| // |
| // Note that we do not want to delete unrunnable tasks. Deleting a task |
| // can have side effects (like freeing some objects) and deleting a |
| // task that's supposed to run after one that's currently running could |
| // cause an obscure crash. |
| // |
| // We really want to delete these tasks outside the lock in case the |
| // closures are holding refs to objects that want to post work from |
| // their destructorss (which would deadlock). The closures are |
| // internally refcounted, so we just need to keep a copy of them alive |
| // until the lock is exited. The calling code can just clear() the |
| // vector they passed to us once the lock is exited to make this |
| // happen. |
| delete_these_outside_lock->push_back(i->task); |
| pending_tasks_.erase(i++); |
| } else { |
| // Found a runnable task. |
| *task = *i; |
| pending_tasks_.erase(i); |
| if (task->shutdown_behavior == BLOCK_SHUTDOWN) { |
| blocking_shutdown_pending_task_count_--; |
| } |
| |
| status = GET_WORK_FOUND; |
| break; |
| } |
| } |
| |
| // Track the number of tasks we had to skip over to see if we should be |
| // making this more efficient. If this number ever becomes large or is |
| // frequently "some", we should consider the optimization above. |
| UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", |
| unrunnable_tasks); |
| return status; |
| } |
| |
| // Bookkeeping done (with the lock held) right before a worker runs a task. |
| // Returns whatever PrepareToStartAdditionalThreadIfHelpful() returns: the |
| // number of a thread that should now be created, or 0 if none is needed. |
| int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { |
|   lock_.AssertAcquired(); |
| |
|   // A thread running a SKIP_ON_SHUTDOWN or BLOCK_SHUTDOWN task holds off |
|   // shutdown until that task finishes; DidRunWorkerTask() undoes this count. |
|   const bool holds_off_shutdown = |
|       task.shutdown_behavior != CONTINUE_ON_SHUTDOWN; |
|   if (holds_off_shutdown) |
|     ++blocking_shutdown_thread_count_; |
| |
|   // Record a non-zero sequence token as in use so that no other task with |
|   // the same token is considered runnable while this one executes. |
|   const int token_id = task.sequence_token_id; |
|   if (token_id != 0) |
|     current_sequences_.insert(token_id); |
| |
|   // Picking up a task is also the moment to consider growing the pool. |
|   // StartAdditionalThreadIfHelpful only spawns a worker when no free one |
|   // exists, so a burst of posts can race ahead of the first worker and leave |
|   // just a single thread created. Checking again here, once this task has |
|   // left the queue, closes that gap, and as a bonus the workers get created |
|   // from background threads instead of the app's main thread. |
|   // |
|   // If no additional thread gets created, we want an existing waiting thread |
|   // (if any) to be woken up to pick up the next task. |
|   // |
|   // This has to happen *before* the task runs, not after: threads are created |
|   // one at a time, so otherwise creation of the second thread would be stuck |
|   // behind the first task, which may take arbitrarily long. |
|   return PrepareToStartAdditionalThreadIfHelpful(); |
| } |
| |
| // Reverses the bookkeeping of WillRunWorkerTask() once a task has finished. |
| // Must be called with the lock held. |
| void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
|   lock_.AssertAcquired(); |
| |
|   // This thread no longer holds off shutdown on behalf of the task. |
|   const bool held_off_shutdown = |
|       task.shutdown_behavior != CONTINUE_ON_SHUTDOWN; |
|   if (held_off_shutdown) { |
|     DCHECK_GT(blocking_shutdown_thread_count_, 0u); |
|     --blocking_shutdown_thread_count_; |
|   } |
| |
|   // Release the sequence so its next pending task becomes runnable. |
|   const int token_id = task.sequence_token_id; |
|   if (token_id != 0) |
|     current_sequences_.erase(token_id); |
| } |
| |
| // Returns true if a task carrying the given token id may start now. Must be |
| // called with the lock held. |
| bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( |
|     int sequence_token_id) const { |
|   lock_.AssertAcquired(); |
| |
|   // A zero id is never recorded in current_sequences_, so it is always |
|   // runnable. |
|   if (sequence_token_id == 0) |
|     return true; |
| |
|   // Otherwise the token is runnable only while no running task holds it. |
|   const bool sequence_in_use = |
|       current_sequences_.find(sequence_token_id) != current_sequences_.end(); |
|   return !sequence_in_use; |
| } |
| |
| // Decides, with the lock held, whether one more worker thread should be |
| // created. Returns the number for the new thread (current thread count + 1) |
| // and sets thread_being_created_, or returns 0 if no thread should be made. |
| int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { |
|   lock_.AssertAcquired(); |
|   // How thread creation works: |
|   // |
|   // We'd like to avoid creating threads with the lock held. However, we |
|   // need an accurate accounting of the threads for proper joining and |
|   // deletion on shutdown. |
|   // |
|   // Whether another thread is needed has to be decided with the lock held, |
|   // which is this function's job. It then marks us as being in the process |
|   // of creating a thread. Shutdown waits until the thread_being_created_ |
|   // flag is cleared, which guarantees the new thread has been added to all |
|   // the data structures and cannot be leaked. Once shutdown has started we |
|   // refuse to create more threads, since they would be leaked. |
|   // |
|   // Note that this creates a mostly benign race on shutdown that results in |
|   // fewer workers being created than one might expect. It is not much of an |
|   // issue in real life, but it affects some tests. Because only one worker |
|   // is spawned at a time, the following sequence of events can happen: |
|   // |
|   //  1. Main thread posts a bunch of unrelated tasks that would normally be |
|   //     run on separate threads. |
|   //  2. The first task post causes us to start a worker. Other tasks do not |
|   //     cause a worker to start since one is pending. |
|   //  3. Main thread initiates shutdown. |
|   //  4. No more threads are created since the shutdown_called_ flag is set. |
|   // |
|   // So one may expect max_threads_ workers to be created for the workload, |
|   // but in reality fewer may be created because thread creation on the |
|   // background threads is racing with the shutdown call. |
|   const bool may_create_thread = !shutdown_called_ && |
|                                  !thread_being_created_ && |
|                                  threads_.size() < max_threads_ && |
|                                  waiting_thread_count_ == 0; |
|   if (!may_create_thread) |
|     return 0; |
| |
|   // An extra thread only helps if some pending task could actually run now. |
|   PendingTaskSet::const_iterator pending = pending_tasks_.begin(); |
|   while (pending != pending_tasks_.end()) { |
|     if (IsSequenceTokenRunnable(pending->sequence_token_id)) { |
|       // Found a runnable task, mark the thread as being started. |
|       thread_being_created_ = true; |
|       return static_cast<int>(threads_.size() + 1); |
|     } |
|     ++pending; |
|   } |
|   return 0; |
| } |
| |
| // Creates the worker thread that PrepareToStartAdditionalThreadIfHelpful() |
| // asked for. thread_number must be positive; that function returns 0 when no |
| // thread is needed, and 0 must not be passed here. |
| void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
|     int thread_number) { |
|   // Called outside of the lock. |
|   // DCHECK_GT (rather than DCHECK(a > b)) reports the offending value on |
|   // failure and matches the other comparison checks in this file. |
|   DCHECK_GT(thread_number, 0); |
| |
|   // The worker is assigned to the list when the thread actually starts, which |
|   // will manage the memory of the pointer. |
|   new Worker(worker_pool_, thread_number, thread_name_prefix_); |
| } |
| |
| // Signals the has-work condition variable and, when a testing observer is |
| // installed, notifies it as well. |
| void SequencedWorkerPool::Inner::SignalHasWork() { |
|   has_work_cv_.Signal(); |
| |
|   // Nothing more to do when no testing observer is installed. |
|   if (!testing_observer_) |
|     return; |
|   testing_observer_->OnHasWork(); |
| } |
| |
| // Returns true once nothing is left that has to hold off shutdown. Must be |
| // called with the lock held. |
| bool SequencedWorkerPool::Inner::CanShutdown() const { |
|   lock_.AssertAcquired(); |
| |
|   // See PrepareToStartAdditionalThreadIfHelpful for how thread creation |
|   // works: a thread still being created must be fully registered first. |
|   if (thread_being_created_) |
|     return false; |
| |
|   // Threads currently running shutdown-blocking tasks must finish. |
|   if (blocking_shutdown_thread_count_ != 0) |
|     return false; |
| |
|   // Finally, no shutdown-blocking task may still be waiting in the queue. |
|   return blocking_shutdown_pending_task_count_ == 0; |
| } |
| |
| // SequencedWorkerPool -------------------------------------------------------- |
| |
| // Constructs a pool without a testing observer (NULL is passed to Inner). |
| // Captures MessageLoopProxy::current() at construction time; it is used |
| // later by OnDestruct() and Shutdown(). max_threads and thread_name_prefix |
| // are forwarded unchanged to the Inner implementation object. |
| SequencedWorkerPool::SequencedWorkerPool( |
| size_t max_threads, |
| const std::string& thread_name_prefix) |
| : constructor_message_loop_(MessageLoopProxy::current()), |
| inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this), |
| max_threads, thread_name_prefix, NULL)) { |
| } |
| |
| // Same as the two-argument constructor, but additionally forwards the given |
| // TestingObserver pointer to the Inner implementation object. |
| SequencedWorkerPool::SequencedWorkerPool( |
| size_t max_threads, |
| const std::string& thread_name_prefix, |
| TestingObserver* observer) |
| : constructor_message_loop_(MessageLoopProxy::current()), |
| inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this), |
| max_threads, thread_name_prefix, observer)) { |
| } |
| |
| // Empty destructor body; no explicit cleanup is performed here. |
| SequencedWorkerPool::~SequencedWorkerPool() {} |
| |
| // Deletes this pool, choosing a thread on which doing so is safe. |
| void SequencedWorkerPool::OnDestruct() const { |
|   DCHECK(constructor_message_loop_.get()); |
| |
|   // Off the worker threads it is fine to delete right here. |
|   if (!RunsTasksOnCurrentThread()) { |
|     delete this; |
|     return; |
|   } |
| |
|   // Deleting ourselves on a worker thread would deadlock, so hand the |
|   // deletion over to the message loop captured at construction instead. |
|   constructor_message_loop_->DeleteSoon(FROM_HERE, this); |
| } |
| |
| // Returns the token produced by Inner::GetSequenceToken(), unchanged. |
| SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
| return inner_->GetSequenceToken(); |
| } |
| |
| // Returns the token that Inner::GetNamedSequenceToken() yields for the given |
| // name, unchanged. |
| SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( |
| const std::string& name) { |
| return inner_->GetNamedSequenceToken(name); |
| } |
| |
| // Convenience overload: a sequenced task runner for the given token that |
| // uses BLOCK_SHUTDOWN as its shutdown behavior. |
| scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( |
|     SequenceToken token) { |
|   const WorkerShutdown default_behavior = BLOCK_SHUTDOWN; |
|   return GetSequencedTaskRunnerWithShutdownBehavior(token, default_behavior); |
| } |
| |
| // Creates a new SequencedWorkerPoolSequencedTaskRunner bound to this pool, |
| // the given sequence token and the given shutdown behavior. |
| scoped_refptr<SequencedTaskRunner> |
| SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior( |
|     SequenceToken token, WorkerShutdown shutdown_behavior) { |
|   scoped_refptr<SequencedTaskRunner> runner( |
|       new SequencedWorkerPoolSequencedTaskRunner( |
|           this, token, shutdown_behavior)); |
|   return runner; |
| } |
| |
| // Creates a new SequencedWorkerPoolTaskRunner bound to this pool and the |
| // given shutdown behavior. |
| scoped_refptr<TaskRunner> |
| SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior( |
|     WorkerShutdown shutdown_behavior) { |
|   scoped_refptr<TaskRunner> runner( |
|       new SequencedWorkerPoolTaskRunner(this, shutdown_behavior)); |
|   return runner; |
| } |
| |
| // Posts a task with no token name, a default-constructed sequence token, |
| // zero delay and BLOCK_SHUTDOWN behavior. Returns Inner::PostTask()'s result. |
| bool SequencedWorkerPool::PostWorkerTask( |
|     const tracked_objects::Location& from_here, |
|     const Closure& task) { |
|   const SequenceToken default_token = SequenceToken(); |
|   const TimeDelta no_delay = TimeDelta(); |
|   return inner_->PostTask(NULL, default_token, BLOCK_SHUTDOWN, |
|                           from_here, task, no_delay); |
| } |
| |
| // Posts a task with a default-constructed sequence token to run after |
| // delay. A zero delay is posted as BLOCK_SHUTDOWN; any other delay is |
| // posted as SKIP_ON_SHUTDOWN. Returns Inner::PostTask()'s result. |
| bool SequencedWorkerPool::PostDelayedWorkerTask( |
|     const tracked_objects::Location& from_here, |
|     const Closure& task, |
|     TimeDelta delay) { |
|   WorkerShutdown shutdown_behavior = SKIP_ON_SHUTDOWN; |
|   if (delay == TimeDelta()) |
|     shutdown_behavior = BLOCK_SHUTDOWN; |
| |
|   const SequenceToken default_token = SequenceToken(); |
|   return inner_->PostTask(NULL, default_token, shutdown_behavior, |
|                           from_here, task, delay); |
| } |
| |
| #if defined(COBALT) |
| // COBALT only: forwards to Inner::PostBlockingTask() with no token name, a |
| // default-constructed sequence token and BLOCK_SHUTDOWN behavior. |
| bool SequencedWorkerPool::PostBlockingWorkerTask( |
|     const tracked_objects::Location& from_here, |
|     const Closure& task) { |
|   const SequenceToken default_token = SequenceToken(); |
|   return inner_->PostBlockingTask(NULL, default_token, BLOCK_SHUTDOWN, |
|                                   from_here, task); |
| } |
| #endif |
| |
| // Posts a task with a default-constructed sequence token, zero delay and |
| // the caller-supplied shutdown behavior. Returns Inner::PostTask()'s result. |
| bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( |
|     const tracked_objects::Location& from_here, |
|     const Closure& task, |
|     WorkerShutdown shutdown_behavior) { |
|   const SequenceToken default_token = SequenceToken(); |
|   const TimeDelta no_delay = TimeDelta(); |
|   return inner_->PostTask(NULL, default_token, shutdown_behavior, |
|                           from_here, task, no_delay); |
| } |
| |
| // Posts a task under the given sequence token with zero delay and |
| // BLOCK_SHUTDOWN behavior. Returns Inner::PostTask()'s result. |
| bool SequencedWorkerPool::PostSequencedWorkerTask( |
|     SequenceToken sequence_token, |
|     const tracked_objects::Location& from_here, |
|     const Closure& task) { |
|   const TimeDelta no_delay = TimeDelta(); |
|   return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN, |
|                           from_here, task, no_delay); |
| } |
| |
| // Posts a task under the given sequence token to run after delay. A zero |
| // delay is posted as BLOCK_SHUTDOWN; any other delay is posted as |
| // SKIP_ON_SHUTDOWN. Returns Inner::PostTask()'s result. |
| bool SequencedWorkerPool::PostDelayedSequencedWorkerTask( |
|     SequenceToken sequence_token, |
|     const tracked_objects::Location& from_here, |
|     const Closure& task, |
|     TimeDelta delay) { |
|   WorkerShutdown shutdown_behavior = SKIP_ON_SHUTDOWN; |
|   if (delay == TimeDelta()) |
|     shutdown_behavior = BLOCK_SHUTDOWN; |
| |
|   return inner_->PostTask(NULL, sequence_token, shutdown_behavior, |
|                           from_here, task, delay); |
| } |
| |
| // Posts a task identified by a token name rather than a token: the name's |
| // address is passed along with a default-constructed token, zero delay and |
| // BLOCK_SHUTDOWN behavior. token_name must not be empty. |
| bool SequencedWorkerPool::PostNamedSequencedWorkerTask( |
|     const std::string& token_name, |
|     const tracked_objects::Location& from_here, |
|     const Closure& task) { |
|   DCHECK(!token_name.empty()); |
| |
|   const SequenceToken default_token = SequenceToken(); |
|   const TimeDelta no_delay = TimeDelta(); |
|   return inner_->PostTask(&token_name, default_token, BLOCK_SHUTDOWN, |
|                           from_here, task, no_delay); |
| } |
| |
| // Posts a task under the given sequence token with zero delay and the |
| // caller-supplied shutdown behavior. Returns Inner::PostTask()'s result. |
| bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( |
|     SequenceToken sequence_token, |
|     const tracked_objects::Location& from_here, |
|     const Closure& task, |
|     WorkerShutdown shutdown_behavior) { |
|   const TimeDelta no_delay = TimeDelta(); |
|   return inner_->PostTask(NULL, sequence_token, shutdown_behavior, |
|                           from_here, task, no_delay); |
| } |
| |
| // Forwards all arguments to PostDelayedWorkerTask() and returns its result. |
| bool SequencedWorkerPool::PostDelayedTask( |
| const tracked_objects::Location& from_here, |
| const Closure& task, |
| TimeDelta delay) { |
| return PostDelayedWorkerTask(from_here, task, delay); |
| } |
| |
| #if defined(COBALT) |
| // COBALT only: forwards all arguments to PostBlockingWorkerTask() and |
| // returns its result. |
| bool SequencedWorkerPool::PostBlockingTask( |
| const tracked_objects::Location& from_here, |
| const Closure& task) { |
| return PostBlockingWorkerTask(from_here, task); |
| } |
| #endif |
| |
| // Returns the result of Inner::RunsTasksOnCurrentThread(). OnDestruct() uses |
| // this to detect that it is running on one of the pool's worker threads. |
| bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
| return inner_->RunsTasksOnCurrentThread(); |
| } |
| |
| // Returns the result of Inner::IsRunningSequenceOnCurrentThread() for the |
| // given sequence token. |
| bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
| SequenceToken sequence_token) const { |
| return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
| } |
| |
| // Test-only helper: forwards to Inner::FlushForTesting(). |
| void SequencedWorkerPool::FlushForTesting() { |
| inner_->FlushForTesting(); |
| } |
| |
| // Test-only helper: forwards to Inner::SignalHasWorkForTesting(). |
| void SequencedWorkerPool::SignalHasWorkForTesting() { |
| inner_->SignalHasWorkForTesting(); |
| } |
| |
| // Shuts the pool down by delegating to Inner::Shutdown(). Debug builds check |
| // that the call is made on the thread whose message loop was captured when |
| // the pool was constructed. |
| void SequencedWorkerPool::Shutdown() { |
| DCHECK(constructor_message_loop_->BelongsToCurrentThread()); |
| inner_->Shutdown(); |
| } |
| |
| } // namespace base |