| // Copyright 2012 The Chromium Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ |
| #define BASE_OBSERVER_LIST_THREADSAFE_H_ |
| |
| #include <unordered_map> |
| #include <utility> |
| |
| #include "base/auto_reset.h" |
| #include "base/base_export.h" |
| #include "base/check.h" |
| #include "base/check_op.h" |
| #include "base/containers/contains.h" |
| #include "base/dcheck_is_on.h" |
| #include "base/functional/bind.h" |
| #include "base/location.h" |
| #include "base/memory/raw_ptr.h" |
| #include "base/memory/ref_counted.h" |
| #include "base/observer_list.h" |
| #include "base/synchronization/lock.h" |
| #include "base/task/sequenced_task_runner.h" |
| #include "base/task/single_thread_task_runner.h" |
| #include "build/build_config.h" |
| #include "third_party/abseil-cpp/absl/base/attributes.h" |
| |
| #if defined(STARBOARD) |
| #include <pthread.h> |
| #endif |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // |
| // OVERVIEW: |
| // |
| // A thread-safe container for a list of observers. This is similar to the |
| // observer_list (see observer_list.h), but it is more robust for multi- |
| // threaded situations. |
| // |
| // The following use cases are supported: |
| // * Observers can register for notifications from any sequence. They are |
| // always notified on the sequence from which they were registered. |
| // * Any sequence may trigger a notification via Notify(). |
| // * Observers can remove themselves from the observer list inside of a |
| // callback. |
| // * If one sequence is notifying observers concurrently with an observer |
| // removing itself from the observer list, the notifications will be |
| // silently dropped. |
| // |
| // The drawback of the threadsafe observer list is that notifications are not |
| // as real-time as the non-threadsafe version of this class. Notifications |
| // will always be done via PostTask() to another sequence, whereas with the |
| // non-thread-safe ObserverList, notifications happen synchronously. |
| // |
| // Note: this class previously supported synchronous notifications for |
| // same-sequence observers, but it was error-prone and removed in |
| // crbug.com/1193750, think twice before re-considering this paradigm. |
| // |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| namespace base { |
| namespace internal { |
| |
| class BASE_EXPORT ObserverListThreadSafeBase |
| : public RefCountedThreadSafe<ObserverListThreadSafeBase> { |
| public: |
| struct NotificationDataBase { |
| NotificationDataBase(void* observer_list_in, const Location& from_here_in) |
| : observer_list(observer_list_in), from_here(from_here_in) {} |
| |
| raw_ptr<void> observer_list; |
| Location from_here; |
| }; |
| |
| ObserverListThreadSafeBase() = default; |
| ObserverListThreadSafeBase(const ObserverListThreadSafeBase&) = delete; |
| ObserverListThreadSafeBase& operator=(const ObserverListThreadSafeBase&) = |
| delete; |
| |
| protected: |
| template <typename ObserverType, typename Method> |
| struct Dispatcher; |
| |
| template <typename ObserverType, typename ReceiverType, typename... Params> |
| struct Dispatcher<ObserverType, void (ReceiverType::*)(Params...)> { |
| static void Run(void (ReceiverType::*m)(Params...), |
| Params... params, |
| ObserverType* obj) { |
| (obj->*m)(std::forward<Params>(params)...); |
| } |
| }; |
| |
| static const NotificationDataBase*& GetCurrentNotification(); |
| |
| #if defined(STARBOARD) |
| static void EnsureThreadLocalKeyInited(); |
| static const pthread_key_t GetThreadLocalKey(); |
| #endif |
| |
| virtual ~ObserverListThreadSafeBase() = default; |
| |
| private: |
| friend class RefCountedThreadSafe<ObserverListThreadSafeBase>; |
| }; |
| |
| } // namespace internal |
| |
| template <class ObserverType> |
| class ObserverListThreadSafe : public internal::ObserverListThreadSafeBase { |
| public: |
| enum class AddObserverResult { |
| kBecameNonEmpty, |
| kWasAlreadyNonEmpty, |
| }; |
| enum class RemoveObserverResult { |
| kWasOrBecameEmpty, |
| kRemainsNonEmpty, |
| }; |
| |
| ObserverListThreadSafe() = default; |
| explicit ObserverListThreadSafe(ObserverListPolicy policy) |
| : policy_(policy) {} |
| ObserverListThreadSafe(const ObserverListThreadSafe&) = delete; |
| ObserverListThreadSafe& operator=(const ObserverListThreadSafe&) = delete; |
| |
| // Adds |observer| to the list. |observer| must not already be in the list. |
| AddObserverResult AddObserver(ObserverType* observer) { |
| DCHECK(SequencedTaskRunner::HasCurrentDefault()) |
| << "An observer can only be registered when " |
| "SequencedTaskRunner::HasCurrentDefault. If this is in a unit test, " |
| "you're likely merely missing a " |
| "base::test::(SingleThread)TaskEnvironment in your fixture. " |
| "Otherwise, try running this code on a named thread (main/UI/IO) or " |
| "from a task posted to a base::SequencedTaskRunner or " |
| "base::SingleThreadTaskRunner."; |
| |
| AutoLock auto_lock(lock_); |
| |
| bool was_empty = observers_.empty(); |
| |
| // Add |observer| to the list of observers. |
| DCHECK(!Contains(observers_, observer)); |
| const scoped_refptr<SequencedTaskRunner> task_runner = |
| SequencedTaskRunner::GetCurrentDefault(); |
| // Each observer gets a unique identifier. These unique identifiers are used |
| // to avoid execution of pending posted-tasks over removed or released |
| // observers. |
| const size_t observer_id = ++observer_id_counter_; |
| ObserverTaskRunnerInfo task_info = {task_runner, observer_id}; |
| observers_[observer] = std::move(task_info); |
| |
| // If this is called while a notification is being dispatched on this thread |
| // and |policy_| is ALL, |observer| must be notified (if a notification is |
| // being dispatched on another thread in parallel, the notification may or |
| // may not make it to |observer| depending on the outcome of the race to |
| // |lock_|). |
| if (policy_ == ObserverListPolicy::ALL) { |
| #if defined(STARBOARD) |
| void* current_notification_void = pthread_getspecific(GetThreadLocalKey()); |
| if (current_notification_void) { |
| if (const NotificationDataBase* const current_notification = |
| static_cast<NotificationDataBase*>(current_notification_void); |
| #else |
| if (const NotificationDataBase* const current_notification = |
| GetCurrentNotification(); |
| #endif |
| current_notification && current_notification->observer_list == this) { |
| const NotificationData* notification_data = |
| static_cast<const NotificationData*>(current_notification); |
| task_runner->PostTask( |
| current_notification->from_here, |
| BindOnce(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this, |
| // While `observer` may be dangling, we pass it and |
| // check it wasn't deallocated in NotifyWrapper() which can |
| // check `observers_` to verify presence (the owner of the |
| // observer is responsible for removing it from that list |
| // before deallocation). |
| UnsafeDangling(observer), |
| NotificationData(this, observer_id, |
| current_notification->from_here, |
| notification_data->method))); |
| } |
| #if defined(STARBOARD) |
| } |
| #endif |
| } |
| |
| return was_empty ? AddObserverResult::kBecameNonEmpty |
| : AddObserverResult::kWasAlreadyNonEmpty; |
| } |
| |
| // Remove an observer from the list if it is in the list. |
| // |
| // If a notification was sent to the observer but hasn't started to run yet, |
| // it will be aborted. If a notification has started to run, removing the |
| // observer won't stop it. |
| RemoveObserverResult RemoveObserver(ObserverType* observer) { |
| AutoLock auto_lock(lock_); |
| observers_.erase(observer); |
| return observers_.empty() ? RemoveObserverResult::kWasOrBecameEmpty |
| : RemoveObserverResult::kRemainsNonEmpty; |
| } |
| |
| // Verifies that the list is currently empty (i.e. there are no observers). |
| void AssertEmpty() const { |
| #if DCHECK_IS_ON() |
| AutoLock auto_lock(lock_); |
| DCHECK(observers_.empty()); |
| #endif |
| } |
| |
| // Asynchronously invokes a callback on all observers, on their registration |
| // sequence. You cannot assume that at the completion of the Notify call that |
| // all Observers have been Notified. The notification may still be pending |
| // delivery. |
| template <typename Method, typename... Params> |
| void Notify(const Location& from_here, Method m, Params&&... params) { |
| RepeatingCallback<void(ObserverType*)> method = |
| BindRepeating(&Dispatcher<ObserverType, Method>::Run, m, |
| std::forward<Params>(params)...); |
| |
| AutoLock lock(lock_); |
| for (const auto& observer : observers_) { |
| observer.second.task_runner->PostTask( |
| from_here, |
| BindOnce(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this, |
| // While `observer.first` may be dangling, we pass it and |
| // check it wasn't deallocated in NotifyWrapper() which can |
| // check `observers_` to verify presence (the owner of the |
| // observer is responsible for removing it from that list |
| // before deallocation). |
| UnsafeDangling(observer.first), |
| NotificationData(this, observer.second.observer_id, |
| from_here, method))); |
| } |
| } |
| |
| private: |
| friend class RefCountedThreadSafe<ObserverListThreadSafeBase>; |
| |
| struct NotificationData : public NotificationDataBase { |
| NotificationData(ObserverListThreadSafe* observer_list_in, |
| size_t observer_id_in, |
| const Location& from_here_in, |
| const RepeatingCallback<void(ObserverType*)>& method_in) |
| : NotificationDataBase(observer_list_in, from_here_in), |
| method(method_in), |
| observer_id(observer_id_in) {} |
| |
| RepeatingCallback<void(ObserverType*)> method; |
| size_t observer_id; |
| }; |
| |
| ~ObserverListThreadSafe() override = default; |
| |
| void NotifyWrapper(MayBeDangling<ObserverType> observer, |
| const NotificationData& notification) { |
| { |
| AutoLock auto_lock(lock_); |
| |
| // Check whether the observer still needs a notification. |
| DCHECK_EQ(notification.observer_list, this); |
| auto it = observers_.find(observer); |
| if (it == observers_.end() || |
| it->second.observer_id != notification.observer_id) { |
| return; |
| } |
| DCHECK(it->second.task_runner->RunsTasksInCurrentSequence()); |
| } |
| |
| // Keep track of the notification being dispatched on the current thread. |
| // This will be used if the callback below calls AddObserver(). |
| // |
| // Note: GetCurrentNotification() may not return null if this runs in a |
| // nested loop started by a notification callback. In that case, it is |
| // important to save the previous value to restore it later. |
| #if defined(STARBOARD) |
| EnsureThreadLocalKeyInited(); |
| void* scoped_reset_value = pthread_getspecific(GetThreadLocalKey()); |
| pthread_setspecific(GetThreadLocalKey(), const_cast<NotificationData*>(¬ification)); |
| #else |
| const AutoReset<const NotificationDataBase*> resetter_( |
| &GetCurrentNotification(), ¬ification); |
| #endif |
| |
| // Invoke the callback. |
| notification.method.Run(observer); |
| |
| #if defined(STARBOARD) |
| pthread_setspecific(GetThreadLocalKey(), scoped_reset_value); |
| #endif |
| } |
| |
| const ObserverListPolicy policy_ = ObserverListPolicy::ALL; |
| |
| mutable Lock lock_; |
| |
| size_t observer_id_counter_ GUARDED_BY(lock_) = 0; |
| |
| struct ObserverTaskRunnerInfo { |
| scoped_refptr<SequencedTaskRunner> task_runner; |
| size_t observer_id = 0; |
| }; |
| |
| // Keys are observers. Values are the SequencedTaskRunners on which they must |
| // be notified. |
| std::unordered_map<ObserverType*, ObserverTaskRunnerInfo> observers_ |
| GUARDED_BY(lock_); |
| }; |
| |
| } // namespace base |
| |
| #endif // BASE_OBSERVER_LIST_THREADSAFE_H_ |