| // Copyright 2017 The Chromium Authors. All rights reserved. | 
 | // Use of this source code is governed by a BSD-style license that can be | 
 | // found in the LICENSE file. | 
 |  | 
 | #include "base/synchronization/waitable_event.h" | 
 |  | 
 | #include <dispatch/dispatch.h> | 
 | #include <mach/mach.h> | 
 | #include <sys/event.h> | 
 |  | 
 | #include "base/debug/activity_tracker.h" | 
 | #include "base/files/scoped_file.h" | 
 | #include "base/mac/dispatch_source_mach.h" | 
 | #include "base/mac/mac_util.h" | 
 | #include "base/mac/mach_logging.h" | 
 | #include "base/mac/scoped_dispatch_object.h" | 
 | #include "base/posix/eintr_wrapper.h" | 
 | #include "base/threading/scoped_blocking_call.h" | 
 | #include "base/threading/thread_restrictions.h" | 
 | #include "build/build_config.h" | 
 | #include "starboard/types.h" | 
 |  | 
 | namespace base { | 
 |  | 
 | WaitableEvent::WaitableEvent(ResetPolicy reset_policy, | 
 |                              InitialState initial_state) | 
 |     : policy_(reset_policy) { | 
 |   mach_port_options_t options{}; | 
 |   options.flags = MPO_INSERT_SEND_RIGHT; | 
 |   options.mpl.mpl_qlimit = 1; | 
 |  | 
 |   mach_port_t name; | 
 |   kern_return_t kr = mach_port_construct(mach_task_self(), &options, 0, &name); | 
 |   MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_construct"; | 
 |  | 
 |   receive_right_ = new ReceiveRight(name, UseSlowWatchList(policy_)); | 
 |   send_right_.reset(name); | 
 |  | 
 |   if (initial_state == InitialState::SIGNALED) | 
 |     Signal(); | 
 | } | 
 |  | 
 | WaitableEvent::~WaitableEvent() = default; | 
 |  | 
 | void WaitableEvent::Reset() { | 
 |   PeekPort(receive_right_->Name(), true); | 
 | } | 
 |  | 
 | void WaitableEvent::Signal() { | 
 |   // If using the slow watch-list, copy the watchers to a local. After | 
 |   // mach_msg(), the event object may be deleted by an awoken thread. | 
 |   const bool use_slow_path = UseSlowWatchList(policy_); | 
 |   ReceiveRight* receive_right = nullptr;  // Manually reference counted. | 
 |   std::unique_ptr<std::list<OnceClosure>> watch_list; | 
 |   if (use_slow_path) { | 
 |     // To avoid a race condition of a WaitableEventWatcher getting added | 
 |     // while another thread is in this method, hold the watch-list lock for | 
 |     // the duration of mach_msg(). This requires ref-counting the | 
 |     // |receive_right_| object that contains it, in case the event is deleted | 
 |     // by a waiting thread after mach_msg(). | 
 |     receive_right = receive_right_.get(); | 
 |     receive_right->AddRef(); | 
 |  | 
 |     ReceiveRight::WatchList* slow_watch_list = receive_right->SlowWatchList(); | 
 |     slow_watch_list->lock.Acquire(); | 
 |  | 
 |     if (!slow_watch_list->list.empty()) { | 
 |       watch_list.reset(new std::list<OnceClosure>()); | 
 |       std::swap(*watch_list, slow_watch_list->list); | 
 |     } | 
 |   } | 
 |  | 
 |   mach_msg_empty_send_t msg{}; | 
 |   msg.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_COPY_SEND); | 
 |   msg.header.msgh_size = sizeof(&msg); | 
 |   msg.header.msgh_remote_port = send_right_.get(); | 
 |   // If the event is already signaled, this will time out because the queue | 
 |   // has a length of one. | 
 |   kern_return_t kr = | 
 |       mach_msg(&msg.header, MACH_SEND_MSG | MACH_SEND_TIMEOUT, sizeof(msg), 0, | 
 |                MACH_PORT_NULL, 0, MACH_PORT_NULL); | 
 |   MACH_CHECK(kr == KERN_SUCCESS || kr == MACH_SEND_TIMED_OUT, kr) << "mach_msg"; | 
 |  | 
 |   if (use_slow_path) { | 
 |     // If a WaitableEventWatcher were to start watching when the event is | 
 |     // signaled, it runs the callback immediately without adding it to the | 
 |     // list. Therefore the watch list can only be non-empty if the event is | 
 |     // newly signaled. | 
 |     if (watch_list.get()) { | 
 |       MACH_CHECK(kr == KERN_SUCCESS, kr); | 
 |       for (auto& watcher : *watch_list) { | 
 |         std::move(watcher).Run(); | 
 |       } | 
 |     } | 
 |  | 
 |     receive_right->SlowWatchList()->lock.Release(); | 
 |     receive_right->Release(); | 
 |   } | 
 | } | 
 |  | 
 | bool WaitableEvent::IsSignaled() { | 
 |   return PeekPort(receive_right_->Name(), policy_ == ResetPolicy::AUTOMATIC); | 
 | } | 
 |  | 
 | void WaitableEvent::Wait() { | 
 |   bool result = TimedWaitUntil(TimeTicks::Max()); | 
 |   DCHECK(result) << "TimedWait() should never fail with infinite timeout"; | 
 | } | 
 |  | 
 | bool WaitableEvent::TimedWait(const TimeDelta& wait_delta) { | 
 |   return TimedWaitUntil(TimeTicks::Now() + wait_delta); | 
 | } | 
 |  | 
 | bool WaitableEvent::TimedWaitUntil(const TimeTicks& end_time) { | 
 |   internal::ScopedBlockingCallWithBaseSyncPrimitives scoped_blocking_call( | 
 |       BlockingType::MAY_BLOCK); | 
 |   // Record the event that this thread is blocking upon (for hang diagnosis). | 
 |   debug::ScopedEventWaitActivity event_activity(this); | 
 |  | 
 |   mach_msg_empty_rcv_t msg{}; | 
 |   msg.header.msgh_local_port = receive_right_->Name(); | 
 |  | 
 |   mach_msg_option_t options = MACH_RCV_MSG; | 
 |  | 
 |   if (!end_time.is_max()) | 
 |     options |= MACH_RCV_TIMEOUT | MACH_RCV_INTERRUPT; | 
 |  | 
 |   mach_msg_size_t rcv_size = sizeof(msg); | 
 |   if (policy_ == ResetPolicy::MANUAL) { | 
 |     // To avoid dequeing the message, receive with a size of 0 and set | 
 |     // MACH_RCV_LARGE to keep the message in the queue. | 
 |     options |= MACH_RCV_LARGE; | 
 |     rcv_size = 0; | 
 |   } | 
 |  | 
 |   kern_return_t kr; | 
 |   mach_msg_timeout_t timeout = MACH_MSG_TIMEOUT_NONE; | 
 |   do { | 
 |     if (!end_time.is_max()) { | 
 |       timeout = std::max<int64_t>( | 
 |           0, (end_time - TimeTicks::Now()).InMillisecondsRoundedUp()); | 
 |     } | 
 |     kr = mach_msg(&msg.header, options, 0, rcv_size, receive_right_->Name(), | 
 |                   timeout, MACH_PORT_NULL); | 
 |     // If the thread is interrupted during mach_msg(), the system call | 
 |     // will be restarted. However, the libsyscall wrapper does not adjust | 
 |     // the timeout by the amount of time already waited. | 
 |     // Using MACH_RCV_INTERRUPT will instead return from mach_msg(), | 
 |     // so that the call can be retried with an adjusted timeout. | 
 |   } while (kr == MACH_RCV_INTERRUPTED); | 
 |  | 
 |   if (kr == KERN_SUCCESS) { | 
 |     return true; | 
 |   } else if (rcv_size == 0 && kr == MACH_RCV_TOO_LARGE) { | 
 |     return true; | 
 |   } else { | 
 |     MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg"; | 
 |     return false; | 
 |   } | 
 | } | 
 |  | 
 | // static | 
 | bool WaitableEvent::UseSlowWatchList(ResetPolicy policy) { | 
 | #if defined(OS_IOS) | 
 |   const bool use_slow_path = false; | 
 | #else | 
 |   static bool use_slow_path = !mac::IsAtLeastOS10_12(); | 
 | #endif | 
 |   return policy == ResetPolicy::MANUAL && use_slow_path; | 
 | } | 
 |  | 
 | // static | 
 | size_t WaitableEvent::WaitMany(WaitableEvent** raw_waitables, size_t count) { | 
 |   DCHECK(count) << "Cannot wait on no events"; | 
 |   internal::ScopedBlockingCallWithBaseSyncPrimitives scoped_blocking_call( | 
 |       BlockingType::MAY_BLOCK); | 
 |   // Record an event (the first) that this thread is blocking upon. | 
 |   debug::ScopedEventWaitActivity event_activity(raw_waitables[0]); | 
 |  | 
 |   // On macOS 10.11+, using Mach port sets may cause system instability, per | 
 |   // https://crbug.com/756102. On macOS 10.12+, a kqueue can be used | 
 |   // instead to work around that. On macOS 10.9 and 10.10, kqueue only works | 
 |   // for port sets, so port sets are just used directly. On macOS 10.11, | 
 |   // libdispatch sources are used. Therefore, there are three different | 
 |   // primitives that can be used to implement WaitMany. Which one to use is | 
 |   // selected at run-time by OS version checks. | 
 |   enum WaitManyPrimitive { | 
 |     KQUEUE, | 
 |     DISPATCH, | 
 |     PORT_SET, | 
 |   }; | 
 | #if defined(OS_IOS) | 
 |   const WaitManyPrimitive kPrimitive = PORT_SET; | 
 | #else | 
 |   const WaitManyPrimitive kPrimitive = | 
 |       mac::IsAtLeastOS10_12() ? KQUEUE | 
 |                               : (mac::IsOS10_11() ? DISPATCH : PORT_SET); | 
 | #endif | 
 |   if (kPrimitive == KQUEUE) { | 
 |     std::vector<kevent64_s> events(count); | 
 |     for (size_t i = 0; i < count; ++i) { | 
 |       EV_SET64(&events[i], raw_waitables[i]->receive_right_->Name(), | 
 |                EVFILT_MACHPORT, EV_ADD, 0, 0, i, 0, 0); | 
 |     } | 
 |  | 
 |     std::vector<kevent64_s> out_events(count); | 
 |  | 
 |     ScopedFD wait_many(kqueue()); | 
 |     PCHECK(wait_many.is_valid()) << "kqueue"; | 
 |  | 
 |     int rv = HANDLE_EINTR(kevent64(wait_many.get(), events.data(), count, | 
 |                                    out_events.data(), count, 0, nullptr)); | 
 |     PCHECK(rv > 0) << "kevent64"; | 
 |  | 
 |     size_t triggered = -1; | 
 |     for (size_t i = 0; i < static_cast<size_t>(rv); ++i) { | 
 |       // WaitMany should return the lowest index in |raw_waitables| that was | 
 |       // triggered. | 
 |       size_t index = static_cast<size_t>(out_events[i].udata); | 
 |       triggered = std::min(triggered, index); | 
 |     } | 
 |  | 
 |     if (raw_waitables[triggered]->policy_ == ResetPolicy::AUTOMATIC) { | 
 |       // The message needs to be dequeued to reset the event. | 
 |       PeekPort(raw_waitables[triggered]->receive_right_->Name(), true); | 
 |     } | 
 |  | 
 |     return triggered; | 
 |   } else if (kPrimitive == DISPATCH) { | 
 |     // Each item in |raw_waitables| will be watched using a dispatch souce | 
 |     // scheduled on the serial |queue|. The first one to be invoked will | 
 |     // signal the |semaphore| that this method will wait on. | 
 |     ScopedDispatchObject<dispatch_queue_t> queue(dispatch_queue_create( | 
 |         "org.chromium.base.WaitableEvent.WaitMany", DISPATCH_QUEUE_SERIAL)); | 
 |     ScopedDispatchObject<dispatch_semaphore_t> semaphore( | 
 |         dispatch_semaphore_create(0)); | 
 |  | 
 |     // Block capture references. |signaled| will identify the index in | 
 |     // |raw_waitables| whose source was invoked. | 
 |     dispatch_semaphore_t semaphore_ref = semaphore.get(); | 
 |     const size_t kUnsignaled = -1; | 
 |     __block size_t signaled = kUnsignaled; | 
 |  | 
 |     // Create a MACH_RECV dispatch source for each event. These must be | 
 |     // destroyed before the |queue| and |semaphore|. | 
 |     std::vector<std::unique_ptr<DispatchSourceMach>> sources; | 
 |     for (size_t i = 0; i < count; ++i) { | 
 |       const bool auto_reset = | 
 |           raw_waitables[i]->policy_ == WaitableEvent::ResetPolicy::AUTOMATIC; | 
 |       // The block will copy a reference to |right|. | 
 |       scoped_refptr<WaitableEvent::ReceiveRight> right = | 
 |           raw_waitables[i]->receive_right_; | 
 |       auto source = | 
 |           std::make_unique<DispatchSourceMach>(queue, right->Name(), ^{ | 
 |             // After the semaphore is signaled, another event be signaled and | 
 |             // the source may have its block put on the |queue|. WaitMany | 
 |             // should only report (and auto-reset) one event, so the first | 
 |             // event to signal is reported. | 
 |             if (signaled == kUnsignaled) { | 
 |               signaled = i; | 
 |               if (auto_reset) { | 
 |                 PeekPort(right->Name(), true); | 
 |               } | 
 |               dispatch_semaphore_signal(semaphore_ref); | 
 |             } | 
 |           }); | 
 |       source->Resume(); | 
 |       sources.push_back(std::move(source)); | 
 |     } | 
 |  | 
 |     dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER); | 
 |     DCHECK_NE(signaled, kUnsignaled); | 
 |     return signaled; | 
 |   } else { | 
 |     DCHECK_EQ(kPrimitive, PORT_SET); | 
 |  | 
 |     kern_return_t kr; | 
 |  | 
 |     mac::ScopedMachPortSet port_set; | 
 |     { | 
 |       mach_port_t name; | 
 |       kr = | 
 |           mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &name); | 
 |       MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate"; | 
 |       port_set.reset(name); | 
 |     } | 
 |  | 
 |     for (size_t i = 0; i < count; ++i) { | 
 |       kr = mach_port_insert_member(mach_task_self(), | 
 |                                    raw_waitables[i]->receive_right_->Name(), | 
 |                                    port_set.get()); | 
 |       MACH_CHECK(kr == KERN_SUCCESS, kr) << "index " << i; | 
 |     } | 
 |  | 
 |     mach_msg_empty_rcv_t msg{}; | 
 |     // Wait on the port set. Only specify space enough for the header, to | 
 |     // identify which port in the set is signaled. Otherwise, receiving from the | 
 |     // port set may dequeue a message for a manual-reset event object, which | 
 |     // would cause it to be reset. | 
 |     kr = mach_msg(&msg.header, | 
 |                   MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY, 0, | 
 |                   sizeof(msg.header), port_set.get(), 0, MACH_PORT_NULL); | 
 |     MACH_CHECK(kr == MACH_RCV_TOO_LARGE, kr) << "mach_msg"; | 
 |  | 
 |     for (size_t i = 0; i < count; ++i) { | 
 |       WaitableEvent* event = raw_waitables[i]; | 
 |       if (msg.header.msgh_local_port == event->receive_right_->Name()) { | 
 |         if (event->policy_ == ResetPolicy::AUTOMATIC) { | 
 |           // The message needs to be dequeued to reset the event. | 
 |           PeekPort(msg.header.msgh_local_port, true); | 
 |         } | 
 |         return i; | 
 |       } | 
 |     } | 
 |  | 
 |     NOTREACHED(); | 
 |     return 0; | 
 |   } | 
 | } | 
 |  | 
 | // static | 
 | bool WaitableEvent::PeekPort(mach_port_t port, bool dequeue) { | 
 |   if (dequeue) { | 
 |     mach_msg_empty_rcv_t msg{}; | 
 |     msg.header.msgh_local_port = port; | 
 |     kern_return_t kr = mach_msg(&msg.header, MACH_RCV_MSG | MACH_RCV_TIMEOUT, 0, | 
 |                                 sizeof(msg), port, 0, MACH_PORT_NULL); | 
 |     if (kr == KERN_SUCCESS) { | 
 |       return true; | 
 |     } else { | 
 |       MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg"; | 
 |       return false; | 
 |     } | 
 |   } else { | 
 |     mach_port_seqno_t seqno = 0; | 
 |     mach_msg_size_t size; | 
 |     mach_msg_id_t id; | 
 |     mach_msg_trailer_t trailer; | 
 |     mach_msg_type_number_t trailer_size = sizeof(trailer); | 
 |     kern_return_t kr = mach_port_peek( | 
 |         mach_task_self(), port, MACH_RCV_TRAILER_TYPE(MACH_RCV_TRAILER_NULL), | 
 |         &seqno, &size, &id, reinterpret_cast<mach_msg_trailer_info_t>(&trailer), | 
 |         &trailer_size); | 
 |     if (kr == KERN_SUCCESS) { | 
 |       return true; | 
 |     } else { | 
 |       MACH_CHECK(kr == KERN_FAILURE, kr) << "mach_port_peek"; | 
 |       return false; | 
 |     } | 
 |   } | 
 | } | 
 |  | 
 | WaitableEvent::ReceiveRight::ReceiveRight(mach_port_t name, | 
 |                                           bool create_slow_watch_list) | 
 |     : right_(name), | 
 |       slow_watch_list_(create_slow_watch_list ? new WatchList() : nullptr) {} | 
 |  | 
 | WaitableEvent::ReceiveRight::~ReceiveRight() = default; | 
 |  | 
 | WaitableEvent::ReceiveRight::WatchList::WatchList() = default; | 
 |  | 
 | WaitableEvent::ReceiveRight::WatchList::~WatchList() = default; | 
 |  | 
 | }  // namespace base |