|  | // Copyright 2018 Google Inc. All Rights Reserved. | 
|  | // | 
|  | // Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | // you may not use this file except in compliance with the License. | 
|  | // You may obtain a copy of the License at | 
|  | // | 
|  | //     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, software | 
|  | // distributed under the License is distributed on an "AS IS" BASIS, | 
|  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | // See the License for the specific language governing permissions and | 
|  | // limitations under the License. | 
|  |  | 
|  | #include "base/message_loop/message_pump_io_starboard.h" | 
|  |  | 
|  | #include "base/auto_reset.h" | 
|  | #include "base/compiler_specific.h" | 
|  | #include "base/logging.h" | 
|  | #include "base/observer_list.h" | 
|  | #include "base/posix/eintr_wrapper.h" | 
|  | #include "base/time/time.h" | 
|  | #include "starboard/common/socket.h" | 
|  | #include "starboard/socket_waiter.h" | 
|  |  | 
|  | namespace base { | 
|  |  | 
|  | MessagePumpIOStarboard::SocketWatcher::SocketWatcher() | 
|  | : interests_(kSbSocketWaiterInterestNone), | 
|  | socket_(kSbSocketInvalid), | 
|  | pump_(NULL), | 
|  | watcher_(NULL), | 
|  | weak_factory_(this) {} | 
|  |  | 
|  | MessagePumpIOStarboard::SocketWatcher::~SocketWatcher() { | 
|  | if (SbSocketIsValid(socket_)) { | 
|  | StopWatchingSocket(); | 
|  | } | 
|  | } | 
|  |  | 
|  | bool MessagePumpIOStarboard::SocketWatcher::StopWatchingSocket() { | 
|  | if (!SbSocketIsValid(socket_)) { | 
|  | // If this watcher is not watching anything, no-op and return success. | 
|  | return true; | 
|  | } | 
|  |  | 
|  | SbSocket socket = Release(); | 
|  | bool result = true; | 
|  | if (SbSocketIsValid(socket)) { | 
|  | DCHECK(pump_); | 
|  | result = pump_->StopWatching(socket); | 
|  | } | 
|  | pump_ = NULL; | 
|  | watcher_ = NULL; | 
|  | interests_ = kSbSocketWaiterInterestNone; | 
|  | return result; | 
|  | } | 
|  |  | 
|  | void MessagePumpIOStarboard::SocketWatcher::Init(SbSocket socket, | 
|  | bool persistent) { | 
|  | DCHECK(socket); | 
|  | DCHECK(!socket_); | 
|  | socket_ = socket; | 
|  | persistent_ = persistent; | 
|  | } | 
|  |  | 
|  | SbSocket MessagePumpIOStarboard::SocketWatcher::Release() { | 
|  | SbSocket socket = socket_; | 
|  | socket_ = kSbSocketInvalid; | 
|  | return socket; | 
|  | } | 
|  |  | 
|  | void MessagePumpIOStarboard::SocketWatcher::OnSocketReadyToRead( | 
|  | SbSocket socket, | 
|  | MessagePumpIOStarboard* pump) { | 
|  | if (!watcher_) | 
|  | return; | 
|  | pump->WillProcessIOEvent(); | 
|  | watcher_->OnSocketReadyToRead(socket); | 
|  | pump->DidProcessIOEvent(); | 
|  | } | 
|  |  | 
|  | void MessagePumpIOStarboard::SocketWatcher::OnSocketReadyToWrite( | 
|  | SbSocket socket, | 
|  | MessagePumpIOStarboard* pump) { | 
|  | if (!watcher_) | 
|  | return; | 
|  | pump->WillProcessIOEvent(); | 
|  | watcher_->OnSocketReadyToWrite(socket); | 
|  | pump->DidProcessIOEvent(); | 
|  | } | 
|  |  | 
|  | MessagePumpIOStarboard::MessagePumpIOStarboard() | 
|  | : keep_running_(true), | 
|  | in_run_(false), | 
|  | processed_io_events_(false), | 
|  | waiter_(SbSocketWaiterCreate()) {} | 
|  |  | 
|  | MessagePumpIOStarboard::~MessagePumpIOStarboard() { | 
|  | DCHECK(SbSocketWaiterIsValid(waiter_)); | 
|  | SbSocketWaiterDestroy(waiter_); | 
|  | } | 
|  |  | 
|  | bool MessagePumpIOStarboard::Watch(SbSocket socket, | 
|  | bool persistent, | 
|  | int mode, | 
|  | SocketWatcher* controller, | 
|  | Watcher* delegate) { | 
|  | DCHECK(SbSocketIsValid(socket)); | 
|  | DCHECK(controller); | 
|  | DCHECK(delegate); | 
|  | DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); | 
|  | // Watch should be called on the pump thread. It is not threadsafe, and your | 
|  | // watcher may never be registered. | 
|  | DCHECK_CALLED_ON_VALID_THREAD(watch_socket_caller_checker_); | 
|  |  | 
|  | int interests = kSbSocketWaiterInterestNone; | 
|  | if (mode & WATCH_READ) { | 
|  | interests |= kSbSocketWaiterInterestRead; | 
|  | } | 
|  | if (mode & WATCH_WRITE) { | 
|  | interests |= kSbSocketWaiterInterestWrite; | 
|  | } | 
|  |  | 
|  | SbSocket old_socket = controller->Release(); | 
|  | if (SbSocketIsValid(old_socket)) { | 
|  | // It's illegal to use this function to listen on 2 separate fds with the | 
|  | // same |controller|. | 
|  | if (old_socket != socket) { | 
|  | NOTREACHED() << "Sockets don't match" << old_socket << "!=" << socket; | 
|  | return false; | 
|  | } | 
|  |  | 
|  | // Make sure we don't pick up any funky internal masks. | 
|  | int old_interest_mask = | 
|  | controller->interests() & | 
|  | (kSbSocketWaiterInterestRead | kSbSocketWaiterInterestWrite); | 
|  |  | 
|  | // Combine old/new event masks. | 
|  | interests |= old_interest_mask; | 
|  |  | 
|  | // Must disarm the event before we can reuse it. | 
|  | SbSocketWaiterRemove(waiter_, old_socket); | 
|  | } | 
|  |  | 
|  | // Set current interest mask and waiter for this event. | 
|  | if (!SbSocketWaiterAdd(waiter_, socket, controller, | 
|  | OnSocketWaiterNotification, interests, persistent)) { | 
|  | return false; | 
|  | } | 
|  |  | 
|  | controller->Init(socket, persistent); | 
|  | controller->set_watcher(delegate); | 
|  | controller->set_pump(this); | 
|  | controller->set_interests(interests); | 
|  | return true; | 
|  | } | 
|  |  | 
|  | bool MessagePumpIOStarboard::StopWatching(SbSocket socket) { | 
|  | return SbSocketWaiterRemove(waiter_, socket); | 
|  | } | 
|  |  | 
|  | void MessagePumpIOStarboard::AddIOObserver(IOObserver* obs) { | 
|  | io_observers_.AddObserver(obs); | 
|  | } | 
|  |  | 
|  | void MessagePumpIOStarboard::RemoveIOObserver(IOObserver* obs) { | 
|  | io_observers_.RemoveObserver(obs); | 
|  | } | 
|  |  | 
|  | // Reentrant! | 
|  | void MessagePumpIOStarboard::Run(Delegate* delegate) { | 
|  | AutoReset<bool> auto_reset_in_run(&in_run_, true); | 
|  |  | 
|  | for (;;) { | 
|  | bool did_work = delegate->DoWork(); | 
|  | if (!keep_running_) | 
|  | break; | 
|  |  | 
|  | // NOTE: We need to have a wake-up pending any time there is work queued, | 
|  | // and the MessageLoop only wakes up the pump when the work queue goes from | 
|  | // 0 tasks to 1 task. If any work is scheduled on this MessageLoop (from | 
|  | // another thread) anywhere in between the call to DoWork() above and the | 
|  | // call to SbSocketWaiterWaitTimed() below, SbSocketWaiterWaitTimed() will | 
|  | // consume a wake-up, but leave the work queued. This will cause the | 
|  | // blocking wait further below to hang forever, no matter how many more | 
|  | // items are added to the queue. To resolve this, if this wait consumes a | 
|  | // wake-up, we set did_work to true so we will jump back to the top of the | 
|  | // loop and call delegate->DoWork() before we decide to block. | 
|  |  | 
|  | SbSocketWaiterResult result = SbSocketWaiterWaitTimed(waiter_, 0); | 
|  | DCHECK_NE(kSbSocketWaiterResultInvalid, result); | 
|  | did_work |= | 
|  | (result == kSbSocketWaiterResultWokenUp) || processed_io_events_; | 
|  | processed_io_events_ = false; | 
|  | if (!keep_running_) | 
|  | break; | 
|  |  | 
|  | did_work |= delegate->DoDelayedWork(&delayed_work_time_); | 
|  | if (!keep_running_) | 
|  | break; | 
|  |  | 
|  | if (did_work) | 
|  | continue; | 
|  |  | 
|  | did_work = delegate->DoIdleWork(); | 
|  | if (!keep_running_) | 
|  | break; | 
|  |  | 
|  | if (did_work) | 
|  | continue; | 
|  |  | 
|  | if (delayed_work_time_.is_null()) { | 
|  | SbSocketWaiterWait(waiter_); | 
|  | } else { | 
|  | TimeDelta delay = delayed_work_time_ - TimeTicks::Now(); | 
|  | if (delay > TimeDelta()) { | 
|  | SbSocketWaiterWaitTimed(waiter_, delay.ToSbTime()); | 
|  | } else { | 
|  | // It looks like delayed_work_time_ indicates a time in the past, so we | 
|  | // need to call DoDelayedWork now. | 
|  | delayed_work_time_ = TimeTicks(); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | keep_running_ = true; | 
|  | } | 
|  |  | 
|  | void MessagePumpIOStarboard::Quit() { | 
|  | DCHECK(in_run_); | 
|  | // Tell both the SbObjectWaiter and Run that they should break out of their | 
|  | // loops. | 
|  | keep_running_ = false; | 
|  | ScheduleWork(); | 
|  | } | 
|  |  | 
|  | void MessagePumpIOStarboard::ScheduleWork() { | 
|  | SbSocketWaiterWakeUp(waiter_); | 
|  | } | 
|  |  | 
|  | void MessagePumpIOStarboard::ScheduleDelayedWork( | 
|  | const TimeTicks& delayed_work_time) { | 
|  | // We know that we can't be blocked on Wait right now since this method can | 
|  | // only be called on the same thread as Run, so we only need to update our | 
|  | // record of how long to sleep when we do sleep. | 
|  | delayed_work_time_ = delayed_work_time; | 
|  | ScheduleWork(); | 
|  | } | 
|  |  | 
|  | void MessagePumpIOStarboard::WillProcessIOEvent() { | 
|  | for (IOObserver& observer : io_observers_) { | 
|  | observer.WillProcessIOEvent(); | 
|  | } | 
|  | } | 
|  |  | 
|  | void MessagePumpIOStarboard::DidProcessIOEvent() { | 
|  | for (IOObserver& observer : io_observers_) { | 
|  | observer.DidProcessIOEvent(); | 
|  | } | 
|  | } | 
|  |  | 
|  | // static | 
|  | void MessagePumpIOStarboard::OnSocketWaiterNotification(SbSocketWaiter waiter, | 
|  | SbSocket socket, | 
|  | void* context, | 
|  | int ready_interests) { | 
|  | base::WeakPtr<SocketWatcher> controller = | 
|  | static_cast<SocketWatcher*>(context)->weak_factory_.GetWeakPtr(); | 
|  | DCHECK(controller.get()); | 
|  |  | 
|  | MessagePumpIOStarboard* pump = controller->pump(); | 
|  | pump->processed_io_events_ = true; | 
|  |  | 
|  | // If not persistent, the watch has been released at this point. | 
|  | if (!controller->persistent()) { | 
|  | controller->Release(); | 
|  | } | 
|  |  | 
|  | if (ready_interests & kSbSocketWaiterInterestWrite) { | 
|  | controller->OnSocketReadyToWrite(socket, pump); | 
|  | } | 
|  |  | 
|  | // Check |controller| in case it's been deleted previously. | 
|  | if (controller.get() && ready_interests & kSbSocketWaiterInterestRead) { | 
|  | controller->OnSocketReadyToRead(socket, pump); | 
|  | } | 
|  | } | 
|  |  | 
|  | }  // namespace base |