blob: 716a2a43fbb9d27fc6b9f8ee24422a1ddd413ad6 [file] [log] [blame]
// Copyright 2012 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/threading/thread.h"
#include <memory>
#include <type_traits>
#include <utility>
#include "base/dcheck_is_on.h"
#include "base/functional/bind.h"
#include "base/functional/callback_helpers.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/memory/scoped_refptr.h"
#include "base/message_loop/message_pump.h"
#include "base/run_loop.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/current_thread.h"
#include "base/task/sequence_manager/sequence_manager_impl.h"
#include "base/task/sequence_manager/task_queue.h"
#include "base/task/single_thread_task_runner.h"
#include "base/third_party/dynamic_annotations/dynamic_annotations.h"
#include "base/threading/thread_id_name_manager.h"
#include "base/threading/thread_restrictions.h"
#include "base/types/pass_key.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/base/attributes.h"
#if defined(STARBOARD)
#include <pthread.h>
#include "base/check_op.h"
#include "starboard/thread.h"
#else
#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
#include "base/files/file_descriptor_watcher_posix.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
#endif
#if BUILDFLAG(IS_WIN)
#include "base/win/scoped_com_initializer.h"
#endif
#endif
namespace base {
#if DCHECK_IS_ON()
namespace {
#if defined(STARBOARD)
ABSL_CONST_INIT pthread_once_t s_once_flag = PTHREAD_ONCE_INIT;
ABSL_CONST_INIT pthread_key_t s_thread_local_key = 0;
void InitThreadLocalKey() {
int res = pthread_key_create(&s_thread_local_key , NULL);
DCHECK(res == 0);
}
void EnsureThreadLocalKeyInited() {
pthread_once(&s_once_flag, InitThreadLocalKey);
}
bool GetWasQuitProperly() {
EnsureThreadLocalKeyInited();
void* was_quit_properly = pthread_getspecific(s_thread_local_key);
return !!was_quit_properly ? reinterpret_cast<intptr_t>(was_quit_properly) != 0 : false;
}
#else
// We use this thread-local variable to record whether or not a thread exited
// because its Stop method was called. This allows us to catch cases where
// MessageLoop::QuitWhenIdle() is called directly, which is unexpected when
// using a Thread to setup and run a MessageLoop.
ABSL_CONST_INIT thread_local bool was_quit_properly = false;
#endif
} // namespace
#endif
namespace internal {
class SequenceManagerThreadDelegate : public Thread::Delegate {
public:
explicit SequenceManagerThreadDelegate(
MessagePumpType message_pump_type,
OnceCallback<std::unique_ptr<MessagePump>()> message_pump_factory)
: sequence_manager_(
sequence_manager::internal::CreateUnboundSequenceManagerImpl(
PassKey<base::internal::SequenceManagerThreadDelegate>(),
sequence_manager::SequenceManager::Settings::Builder()
.SetMessagePumpType(message_pump_type)
.Build())),
default_task_queue_(sequence_manager_->CreateTaskQueue(
sequence_manager::TaskQueue::Spec(
sequence_manager::QueueName::DEFAULT_TQ))),
message_pump_factory_(std::move(message_pump_factory)) {
sequence_manager_->SetDefaultTaskRunner(default_task_queue_->task_runner());
}
~SequenceManagerThreadDelegate() override = default;
scoped_refptr<SingleThreadTaskRunner> GetDefaultTaskRunner() override {
// Surprisingly this might not be default_task_queue_->task_runner() which
// we set in the constructor. The Thread::Init() method could create a
// SequenceManager on top of the current one and call
// SequenceManager::SetDefaultTaskRunner which would propagate the new
// TaskRunner down to our SequenceManager. Turns out, code actually relies
// on this and somehow relies on
// SequenceManagerThreadDelegate::GetDefaultTaskRunner returning this new
// TaskRunner. So instead of returning default_task_queue_->task_runner() we
// need to query the SequenceManager for it.
// The underlying problem here is that Subclasses of Thread can do crazy
// stuff in Init() but they are not really in control of what happens in the
// Thread::Delegate, as this is passed in on calling StartWithOptions which
// could happen far away from where the Thread is created. We should
// consider getting rid of StartWithOptions, and pass them as a constructor
// argument instead.
return sequence_manager_->GetTaskRunner();
}
void BindToCurrentThread(TimerSlack timer_slack) override {
sequence_manager_->BindToMessagePump(
std::move(message_pump_factory_).Run());
sequence_manager_->SetTimerSlack(timer_slack);
}
private:
std::unique_ptr<sequence_manager::internal::SequenceManagerImpl>
sequence_manager_;
scoped_refptr<sequence_manager::TaskQueue> default_task_queue_;
OnceCallback<std::unique_ptr<MessagePump>()> message_pump_factory_;
};
} // namespace internal
Thread::Options::Options() = default;
Thread::Options::Options(MessagePumpType type, size_t size)
: message_pump_type(type), stack_size(size) {}
Thread::Options::Options(ThreadType thread_type) : thread_type(thread_type) {}
Thread::Options::Options(Options&& other)
: message_pump_type(std::move(other.message_pump_type)),
delegate(std::move(other.delegate)),
timer_slack(std::move(other.timer_slack)),
message_pump_factory(std::move(other.message_pump_factory)),
stack_size(std::move(other.stack_size)),
thread_type(std::move(other.thread_type)),
joinable(std::move(other.joinable)) {
other.moved_from = true;
}
Thread::Options& Thread::Options::operator=(Thread::Options&& other) {
DCHECK_NE(this, &other);
message_pump_type = std::move(other.message_pump_type);
delegate = std::move(other.delegate);
timer_slack = std::move(other.timer_slack);
message_pump_factory = std::move(other.message_pump_factory);
stack_size = std::move(other.stack_size);
thread_type = std::move(other.thread_type);
joinable = std::move(other.joinable);
other.moved_from = true;
return *this;
}
Thread::Options::~Options() = default;
Thread::Thread(const std::string& name)
: id_event_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED),
name_(name),
start_event_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED) {
// Only bind the sequence on Start(): the state is constant between
// construction and Start() and it's thus valid for Start() to be called on
// another sequence as long as every other operation is then performed on that
// sequence.
owning_sequence_checker_.DetachFromSequence();
}
Thread::~Thread() {
Stop();
}
bool Thread::Start() {
DCHECK(owning_sequence_checker_.CalledOnValidSequence());
Options options;
#if BUILDFLAG(IS_WIN)
if (com_status_ == STA)
options.message_pump_type = MessagePumpType::UI;
#endif
return StartWithOptions(std::move(options));
}
bool Thread::StartWithOptions(Options options) {
DCHECK(options.IsValid());
DCHECK(owning_sequence_checker_.CalledOnValidSequence());
DCHECK(!delegate_);
DCHECK(!IsRunning());
DCHECK(!stopping_) << "Starting a non-joinable thread a second time? That's "
<< "not allowed!";
#if BUILDFLAG(IS_WIN)
DCHECK((com_status_ != STA) ||
(options.message_pump_type == MessagePumpType::UI));
#endif
// Reset |id_| here to support restarting the thread.
id_event_.Reset();
id_ = kInvalidThreadId;
SetThreadWasQuitProperly(false);
timer_slack_ = options.timer_slack;
if (options.delegate) {
DCHECK(!options.message_pump_factory);
delegate_ = std::move(options.delegate);
} else if (options.message_pump_factory) {
delegate_ = std::make_unique<internal::SequenceManagerThreadDelegate>(
MessagePumpType::CUSTOM, options.message_pump_factory);
} else {
delegate_ = std::make_unique<internal::SequenceManagerThreadDelegate>(
options.message_pump_type,
BindOnce([](MessagePumpType type) { return MessagePump::Create(type); },
options.message_pump_type));
}
start_event_.Reset();
// Hold |thread_lock_| while starting the new thread to synchronize with
// Stop() while it's not guaranteed to be sequenced (until crbug/629139 is
// fixed).
{
AutoLock lock(thread_lock_);
bool success = options.joinable
? PlatformThread::CreateWithType(
options.stack_size, this, &thread_,
options.thread_type, options.message_pump_type)
: PlatformThread::CreateNonJoinableWithType(
options.stack_size, this, options.thread_type,
options.message_pump_type);
if (!success) {
DLOG(ERROR) << "failed to create thread";
return false;
}
}
joinable_ = options.joinable;
return true;
}
bool Thread::StartAndWaitForTesting() {
DCHECK(owning_sequence_checker_.CalledOnValidSequence());
bool result = Start();
if (!result)
return false;
WaitUntilThreadStarted();
return true;
}
bool Thread::WaitUntilThreadStarted() const {
DCHECK(owning_sequence_checker_.CalledOnValidSequence());
if (!delegate_)
return false;
// https://crbug.com/918039
base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
start_event_.Wait();
return true;
}
void Thread::FlushForTesting() {
DCHECK(owning_sequence_checker_.CalledOnValidSequence());
if (!delegate_)
return;
WaitableEvent done(WaitableEvent::ResetPolicy::AUTOMATIC,
WaitableEvent::InitialState::NOT_SIGNALED);
task_runner()->PostTask(FROM_HERE,
BindOnce(&WaitableEvent::Signal, Unretained(&done)));
done.Wait();
}
void Thread::Stop() {
DCHECK(joinable_);
// TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
// enable this check, until then synchronization with Start() via
// |thread_lock_| is required...
// DCHECK(owning_sequence_checker_.CalledOnValidSequence());
AutoLock lock(thread_lock_);
StopSoon();
// Can't join if the |thread_| is either already gone or is non-joinable.
if (thread_.is_null())
return;
// Wait for the thread to exit.
//
// TODO(darin): Unfortunately, we need to keep |delegate_| around
// until the thread exits. Some consumers are abusing the API. Make them stop.
PlatformThread::Join(thread_);
thread_ = base::PlatformThreadHandle();
// The thread should release |delegate_| on exit (note: Join() adds
// an implicit memory barrier and no lock is thus required for this check).
DCHECK(!delegate_);
stopping_ = false;
}
void Thread::StopSoon() {
// TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
// enable this check.
// DCHECK(owning_sequence_checker_.CalledOnValidSequence());
if (stopping_ || !delegate_)
return;
stopping_ = true;
task_runner()->PostTask(
FROM_HERE, base::BindOnce(&Thread::ThreadQuitHelper, Unretained(this)));
}
void Thread::DetachFromSequence() {
DCHECK(owning_sequence_checker_.CalledOnValidSequence());
owning_sequence_checker_.DetachFromSequence();
}
PlatformThreadId Thread::GetThreadId() const {
if (!id_event_.IsSignaled()) {
// If the thread is created but not started yet, wait for |id_| being ready.
base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
id_event_.Wait();
}
return id_;
}
bool Thread::IsRunning() const {
// TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
// enable this check.
// DCHECK(owning_sequence_checker_.CalledOnValidSequence());
// If the thread's already started (i.e. |delegate_| is non-null) and
// not yet requested to stop (i.e. |stopping_| is false) we can just return
// true. (Note that |stopping_| is touched only on the same sequence that
// starts / started the new thread so we need no locking here.)
if (delegate_ && !stopping_)
return true;
// Otherwise check the |running_| flag, which is set to true by the new thread
// only while it is inside Run().
AutoLock lock(running_lock_);
return running_;
}
void Thread::Run(RunLoop* run_loop) {
// Overridable protected method to be called from our |thread_| only.
DCHECK(id_event_.IsSignaled());
DCHECK_EQ(id_, PlatformThread::CurrentId());
run_loop->Run();
}
// static
void Thread::SetThreadWasQuitProperly(bool flag) {
#if DCHECK_IS_ON()
#if defined(STARBOARD)
EnsureThreadLocalKeyInited();
pthread_setspecific(s_thread_local_key, reinterpret_cast<void*>(static_cast<intptr_t>(flag)));
#else
was_quit_properly = flag;
#endif
#endif
}
// static
bool Thread::GetThreadWasQuitProperly() {
#if DCHECK_IS_ON()
#if defined(STARBOARD)
return GetWasQuitProperly();
#else
return was_quit_properly;
#endif
#else
return true;
#endif
}
void Thread::ThreadMain() {
// First, make GetThreadId() available to avoid deadlocks. It could be called
// any place in the following thread initialization code.
DCHECK(!id_event_.IsSignaled());
// Note: this read of |id_| while |id_event_| isn't signaled is exceptionally
// okay because ThreadMain has a happens-after relationship with the other
// write in StartWithOptions().
DCHECK_EQ(kInvalidThreadId, id_);
id_ = PlatformThread::CurrentId();
DCHECK_NE(kInvalidThreadId, id_);
id_event_.Signal();
// Complete the initialization of our Thread object.
PlatformThread::SetName(name_.c_str());
ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector.
// Lazily initialize the |message_loop| so that it can run on this thread.
DCHECK(delegate_);
// This binds CurrentThread and SingleThreadTaskRunner::CurrentDefaultHandle.
delegate_->BindToCurrentThread(timer_slack_);
DCHECK(CurrentThread::Get());
DCHECK(SingleThreadTaskRunner::HasCurrentDefault());
#if !defined(STARBOARD)
#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
// Allow threads running a MessageLoopForIO to use FileDescriptorWatcher API.
std::unique_ptr<FileDescriptorWatcher> file_descriptor_watcher;
if (CurrentIOThread::IsSet()) {
file_descriptor_watcher = std::make_unique<FileDescriptorWatcher>(
delegate_->GetDefaultTaskRunner());
}
#endif // (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
#if BUILDFLAG(IS_WIN)
std::unique_ptr<win::ScopedCOMInitializer> com_initializer;
if (com_status_ != NONE) {
com_initializer.reset(
(com_status_ == STA)
? new win::ScopedCOMInitializer()
: new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA));
}
#endif
#endif
// Let the thread do extra initialization.
Init();
{
AutoLock lock(running_lock_);
running_ = true;
}
start_event_.Signal();
RunLoop run_loop;
run_loop_ = &run_loop;
Run(run_loop_);
{
AutoLock lock(running_lock_);
running_ = false;
}
// Let the thread do extra cleanup.
CleanUp();
#if BUILDFLAG(IS_WIN)
com_initializer.reset();
#endif
DCHECK(GetThreadWasQuitProperly());
// We can't receive messages anymore.
// (The message loop is destructed at the end of this block)
delegate_.reset();
run_loop_ = nullptr;
}
void Thread::ThreadQuitHelper() {
DCHECK(run_loop_);
run_loop_->QuitWhenIdle();
SetThreadWasQuitProperly(true);
}
} // namespace base