// blob: bbd1707d89f8c2f2fdbbb126614b9f079ddd2947 [file] [log] [blame]
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "base/message_pump_io_starboard.h"
#include "base/auto_reset.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/memory/scoped_ptr.h"
#include "base/observer_list.h"
#include "base/posix/eintr_wrapper.h"
#include "base/time.h"
#include "starboard/socket.h"
#include "starboard/socket_waiter.h"
namespace base {
// Constructs a watcher that is not yet attached to anything: |socket_| starts
// as kSbSocketInvalid and |pump_| / |watcher_| start as NULL. |weak_factory_|
// is bound to this object.
// NOTE(review): |persistent_| is not set here; in this file it is first
// assigned in Init().
MessagePumpIOStarboard::SocketWatcher::SocketWatcher()
: socket_(kSbSocketInvalid),
pump_(NULL),
watcher_(NULL),
ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)) {}
// Unregisters the socket from its pump if this watcher still holds a valid
// one when it is destroyed.
MessagePumpIOStarboard::SocketWatcher::~SocketWatcher() {
  if (!SbSocketIsValid(socket_)) {
    // No socket is held, so there is nothing to unregister.
    return;
  }
  StopWatchingSocket();
}
bool MessagePumpIOStarboard::SocketWatcher::StopWatchingSocket() {
SbSocket socket = Release();
bool result = true;
if (SbSocketIsValid(socket)) {
DCHECK(pump_);
result = pump_->StopWatching(socket);
}
pump_ = NULL;
watcher_ = NULL;
return result;
}
// Records the socket this watcher now tracks and whether the watch is
// persistent.
//
// |socket|      must be a valid socket.
// |persistent|  stored as-is in |persistent_|.
//
// The watcher must not already hold a socket; call Release() first when
// re-initializing.
void MessagePumpIOStarboard::SocketWatcher::Init(SbSocket socket,
                                                 bool persistent) {
  // Use the same validity predicate as the rest of this file rather than
  // relying on SbSocket converting to bool.
  DCHECK(SbSocketIsValid(socket));
  DCHECK(!SbSocketIsValid(socket_));
  socket_ = socket;
  persistent_ = persistent;
}
// Gives up the held socket: returns whatever |socket_| currently is (possibly
// kSbSocketInvalid) and leaves |socket_| set to kSbSocketInvalid. Does not
// touch the pump or the socket waiter.
SbSocket MessagePumpIOStarboard::SocketWatcher::Release() {
  SbSocket previously_held = socket_;
  socket_ = kSbSocketInvalid;
  return previously_held;
}
// Forwards a read-ready notification for |socket| to the delegate, bracketed
// by |pump|'s WillProcessIOEvent() / DidProcessIOEvent() calls. Does nothing
// when no delegate is attached.
void MessagePumpIOStarboard::SocketWatcher::OnSocketReadyToRead(
    SbSocket socket,
    MessagePumpIOStarboard* pump) {
  if (watcher_) {
    pump->WillProcessIOEvent();
    watcher_->OnSocketReadyToRead(socket);
    // Only |pump| is used after the delegate call; no members are touched.
    pump->DidProcessIOEvent();
  }
}
// Forwards a write-ready notification for |socket| to the delegate, bracketed
// by |pump|'s WillProcessIOEvent() / DidProcessIOEvent() calls. Does nothing
// when no delegate is attached.
void MessagePumpIOStarboard::SocketWatcher::OnSocketReadyToWrite(
    SbSocket socket,
    MessagePumpIOStarboard* pump) {
  if (watcher_) {
    pump->WillProcessIOEvent();
    watcher_->OnSocketReadyToWrite(socket);
    // Only |pump| is used after the delegate call; no members are touched.
    pump->DidProcessIOEvent();
  }
}
// Creates the pump in its not-running state: |keep_running_| is true (Run()
// asserts this on entry), |in_run_| and |processed_io_events_| are false, and
// |waiter_| holds the result of SbSocketWaiterCreate().
MessagePumpIOStarboard::MessagePumpIOStarboard()
: keep_running_(true),
in_run_(false),
processed_io_events_(false),
waiter_(SbSocketWaiterCreate()) {}
// Destroys the socket waiter created in the constructor. The waiter is
// expected to still be valid at this point.
MessagePumpIOStarboard::~MessagePumpIOStarboard() {
DCHECK(SbSocketWaiterIsValid(waiter_));
SbSocketWaiterDestroy(waiter_);
}
// Registers |socket| with this pump's socket waiter on behalf of |controller|.
//
// |socket|      the socket to watch; must be valid.
// |persistent|  forwarded to SbSocketWaiterAdd() and recorded on |controller|.
// |mode|        one of WATCH_READ, WATCH_WRITE, or WATCH_READ_WRITE.
// |controller|  handle tracking this watch; it is handed to the waiter as the
//               notification context.
// |delegate|    receives the ready-to-read / ready-to-write callbacks.
//
// If |controller| is already watching |socket|, the requested interests are
// merged with the existing ones and the socket is re-registered. Using a
// |controller| that is already watching a different socket is a programming
// error.
//
// Returns true if the socket was registered with the waiter. Returns false if
// the sockets did not match (|controller| keeps its existing watch) or if the
// waiter rejected the registration (|controller| is left holding no socket).
bool MessagePumpIOStarboard::Watch(SbSocket socket,
                                   bool persistent,
                                   int mode,
                                   SocketWatcher* controller,
                                   Watcher* delegate) {
  DCHECK(SbSocketIsValid(socket));
  DCHECK(controller);
  DCHECK(delegate);
  DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
  // Watch should be called on the pump thread. It is not threadsafe, and your
  // watcher may never be registered.
  DCHECK(watch_socket_caller_checker_.CalledOnValidThread());

  // Translate the pump's watch mode into socket waiter interest flags.
  int interests = kSbSocketWaiterInterestNone;
  if (mode & WATCH_READ) {
    interests |= kSbSocketWaiterInterestRead;
  }
  if (mode & WATCH_WRITE) {
    interests |= kSbSocketWaiterInterestWrite;
  }

  SbSocket old_socket = controller->Release();
  if (SbSocketIsValid(old_socket)) {
    // It's illegal to use this function to listen on 2 separate fds with the
    // same |controller|.
    if (old_socket != socket) {
      // |old_socket| is still registered with |waiter_| with |controller| as
      // its context. Hand it back so the controller keeps tracking it and
      // still unregisters it when it is stopped or destroyed.
      controller->Init(old_socket, controller->persistent());
      NOTREACHED() << "Sockets don't match" << old_socket << "!=" << socket;
      return false;
    }

    // Make sure we don't pick up any funky internal masks.
    int old_interest_mask =
        controller->interests() &
        (kSbSocketWaiterInterestRead | kSbSocketWaiterInterestWrite);

    // Combine old/new event masks.
    interests |= old_interest_mask;

    // Must disarm the event before we can reuse it.
    SbSocketWaiterRemove(waiter_, old_socket);
  }

  // Set current interest mask and waiter for this event.
  bool result =
      SbSocketWaiterAdd(waiter_, socket, controller, OnSocketWaiterNotification,
                        interests, persistent);
  DCHECK(result);
  if (!result) {
    // Registration failed: do not arm |controller| for a socket the waiter is
    // not watching, and do not report success.
    return false;
  }

  controller->Init(socket, persistent);
  controller->set_watcher(delegate);
  controller->set_pump(this);
  return true;
}
// Removes |socket| from this pump's socket waiter and returns the result of
// SbSocketWaiterRemove().
bool MessagePumpIOStarboard::StopWatching(SbSocket socket) {
return SbSocketWaiterRemove(waiter_, socket);
}
// Adds |obs| to |io_observers_|, the list notified by WillProcessIOEvent()
// and DidProcessIOEvent().
void MessagePumpIOStarboard::AddIOObserver(IOObserver* obs) {
io_observers_.AddObserver(obs);
}
// Removes |obs| from |io_observers_|.
void MessagePumpIOStarboard::RemoveIOObserver(IOObserver* obs) {
io_observers_.RemoveObserver(obs);
}
// Runs the pump loop for |delegate| until Quit() clears |keep_running_|.
//
// Each iteration, in order: runs immediate work, polls the socket waiter
// without blocking, drains delayed work, runs idle work, and only if none of
// those did anything blocks on the socket waiter (indefinitely, or until
// |delayed_work_time_| when one is set). |keep_running_| is re-checked after
// every step that can run client code.
//
// Reentrant!
void MessagePumpIOStarboard::Run(Delegate* delegate) {
DCHECK(keep_running_) << "Quit must have been called outside of Run!";
// Mark the pump as being inside Run() for the scope of this call; Quit()
// asserts on |in_run_|.
AutoReset<bool> auto_reset_in_run(&in_run_, true);
for (;;) {
// Step 1: immediate work.
bool did_work = delegate->DoWork();
if (!keep_running_)
break;
// Step 2: non-blocking poll of the socket waiter.
// NOTE: We need to have a wake-up pending any time there is work queued,
// and the MessageLoop only wakes up the pump when the work queue goes from
// 0 tasks to 1 task. If any work is scheduled on this MessageLoop (from
// another thread) anywhere in between the call to DoWork() above and the
// call to SbSocketWaiterWaitTimed() below, SbSocketWaiterWaitTimed() will
// consume a wake-up, but leave the work queued. This will cause the
// blocking wait further below to hang forever, no matter how many more
// items are added to the queue. To resolve this, if this wait consumes a
// wake-up, we set did_work to true so we will jump back to the top of the
// loop and call delegate->DoWork() before we decide to block.
SbSocketWaiterResult result = SbSocketWaiterWaitTimed(waiter_, 0);
DCHECK_NE(kSbSocketWaiterResultInvalid, result);
// |processed_io_events_| is set by OnSocketWaiterNotification() whenever a
// socket callback was dispatched; count that as work and clear the flag.
did_work |=
(result == kSbSocketWaiterResultWokenUp) || processed_io_events_;
processed_io_events_ = false;
if (!keep_running_)
break;
// Step 3: delayed work.
// Let's play catchup on all delayed work before we loop. This fixes bug
// #5534709 by processing a large number of short delayed tasks quickly
// before looping back to process non-delayed tasks (like paint).
bool did_delayed_work = false;
do {
did_delayed_work = delegate->DoDelayedWork(&delayed_work_time_);
did_work |= did_delayed_work;
} while (did_delayed_work && keep_running_);
if (!keep_running_)
break;
// Something ran this iteration, so loop again instead of going idle.
if (did_work)
continue;
// Step 4: idle work.
did_work = delegate->DoIdleWork();
if (!keep_running_)
break;
if (did_work)
continue;
// Step 5: nothing to do, so block on the socket waiter. With no delayed
// work time recorded, wait with no timeout; otherwise wait for at most the
// time remaining until |delayed_work_time_|.
if (delayed_work_time_.is_null()) {
SbSocketWaiterWait(waiter_);
} else {
TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
if (delay > TimeDelta()) {
SbSocketWaiterWaitTimed(waiter_, delay.ToSbTime());
} else {
// It looks like delayed_work_time_ indicates a time in the past, so we
// need to call DoDelayedWork now.
delayed_work_time_ = TimeTicks();
}
}
}
// Re-arm the flag so the DCHECK at the top passes on the next Run() call.
keep_running_ = true;
}
// Asks the current Run() loop to exit. Must only be called while Run() is
// executing, as asserted via |in_run_|.
void MessagePumpIOStarboard::Quit() {
DCHECK(in_run_);
// Tell both the socket waiter and Run that they should break out of their
// loops.
keep_running_ = false;
// Wake the waiter so that Run() gets to observe the cleared flag.
ScheduleWork();
}
// Signals the pump that there is work to do by calling SbSocketWaiterWakeUp()
// on |waiter_|; Run() treats a woken-up wait as work and loops back to
// DoWork().
void MessagePumpIOStarboard::ScheduleWork() {
SbSocketWaiterWakeUp(waiter_);
}
// Records |delayed_work_time| as the time Run() should next wake for delayed
// work, then wakes the socket waiter via ScheduleWork().
void MessagePumpIOStarboard::ScheduleDelayedWork(
const TimeTicks& delayed_work_time) {
// We know that we can't be blocked on Wait right now since this method can
// only be called on the same thread as Run, so we only need to update our
// record of how long to sleep when we do sleep.
delayed_work_time_ = delayed_work_time;
ScheduleWork();
}
// Calls WillProcessIOEvent() on every observer in |io_observers_|. The
// SocketWatcher callbacks invoke this just before notifying the delegate.
void MessagePumpIOStarboard::WillProcessIOEvent() {
FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent());
}
// Calls DidProcessIOEvent() on every observer in |io_observers_|. The
// SocketWatcher callbacks invoke this right after the delegate returns.
void MessagePumpIOStarboard::DidProcessIOEvent() {
FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent());
}
// static
// Callback handed to SbSocketWaiterAdd() by Watch(). |context| is the
// SocketWatcher registered for |socket|, and |ready_interests| is a mask of
// kSbSocketWaiterInterest* flags. Write readiness is dispatched before read
// readiness.
void MessagePumpIOStarboard::OnSocketWaiterNotification(SbSocketWaiter waiter,
                                                        SbSocket socket,
                                                        void* context,
                                                        int ready_interests) {
  SocketWatcher* const registered_watcher =
      static_cast<SocketWatcher*>(context);
  // Hold the watcher weakly so that a delegate callback deleting it can be
  // detected below.
  base::WeakPtr<SocketWatcher> controller =
      registered_watcher->weak_factory_.GetWeakPtr();
  DCHECK(controller.get());

  MessagePumpIOStarboard* pump = controller->pump();
  // Lets Run() know that this poll did real work.
  pump->processed_io_events_ = true;

  // If not persistent, the watch has been released at this point.
  if (!controller->persistent()) {
    controller->Release();
  }

  const bool write_ready =
      (ready_interests & kSbSocketWaiterInterestWrite) != 0;
  const bool read_ready =
      (ready_interests & kSbSocketWaiterInterestRead) != 0;

  if (write_ready) {
    controller->OnSocketReadyToWrite(socket, pump);
  }

  // Check |controller| in case it's been deleted previously.
  if (read_ready && controller.get()) {
    controller->OnSocketReadyToRead(socket, pump);
  }
}
} // namespace base