|  | // Copyright 2015 Google Inc. All Rights Reserved. | 
|  | // | 
|  | // Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | // you may not use this file except in compliance with the License. | 
|  | // You may obtain a copy of the License at | 
|  | // | 
|  | //     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, software | 
|  | // distributed under the License is distributed on an "AS IS" BASIS, | 
|  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | // See the License for the specific language governing permissions and | 
|  | // limitations under the License. | 
|  |  | 
|  | #include "starboard/shared/libevent/socket_waiter_internal.h" | 
|  |  | 
|  | #include <errno.h> | 
|  | #include <fcntl.h> | 
|  | #include <sys/time.h> | 
|  | #include <unistd.h> | 
|  |  | 
|  | #include <map> | 
|  | #include <utility> | 
|  |  | 
|  | #include "starboard/log.h" | 
|  | #include "starboard/shared/posix/handle_eintr.h" | 
|  | #include "starboard/shared/posix/set_non_blocking_internal.h" | 
|  | #include "starboard/shared/posix/socket_internal.h" | 
|  | #include "starboard/shared/posix/time_internal.h" | 
|  | #include "starboard/thread.h" | 
|  | #include "third_party/libevent/event.h" | 
|  |  | 
|  | namespace sbposix = starboard::shared::posix; | 
|  |  | 
|  | namespace { | 
|  | // We do this because it's our style to use explicitly-sized ints when not just | 
|  | // using int, but libevent uses shorts explicitly in its interface. | 
|  | SB_COMPILE_ASSERT(sizeof(int16_t) == sizeof(short),  // NOLINT[runtime/int] | 
|  | Short_is_not_int16); | 
|  |  | 
|  | SbSocketAddress GetIpv4Localhost() { | 
|  | SbSocketAddress address = {0}; | 
|  | address.type = kSbSocketAddressTypeIpv4; | 
|  | address.port = 0; | 
|  | address.address[0] = 127; | 
|  | address.address[3] = 1; | 
|  | return address; | 
|  | } | 
|  |  | 
|  | SbSocket AcceptBySpinning(SbSocket server_socket, SbTime timeout) { | 
|  | SbTimeMonotonic start = SbTimeGetMonotonicNow(); | 
|  | while (true) { | 
|  | SbSocket accepted_socket = SbSocketAccept(server_socket); | 
|  | if (SbSocketIsValid(accepted_socket)) { | 
|  | return accepted_socket; | 
|  | } | 
|  |  | 
|  | // If we didn't get a socket, it should be pending. | 
|  | SB_DCHECK(SbSocketGetLastError(server_socket) == kSbSocketPending); | 
|  |  | 
|  | // Check if we have passed our timeout. | 
|  | if (SbTimeGetMonotonicNow() - start >= timeout) { | 
|  | break; | 
|  | } | 
|  |  | 
|  | // Just being polite. | 
|  | SbThreadYield(); | 
|  | } | 
|  |  | 
|  | return kSbSocketInvalid; | 
|  | } | 
|  |  | 
|  | void GetSocketPipe(SbSocket* client_socket, SbSocket* server_socket) { | 
|  | int result; | 
|  | SbSocketError sb_socket_result; | 
|  | const SbTimeMonotonic kTimeout = kSbTimeSecond / 15; | 
|  | SbSocketAddress address = GetIpv4Localhost(); | 
|  |  | 
|  | // Setup a listening socket. | 
|  | SbSocket listen_socket = | 
|  | SbSocketCreate(kSbSocketAddressTypeIpv4, kSbSocketProtocolTcp); | 
|  | SB_DCHECK(SbSocketIsValid(listen_socket)); | 
|  | result = SbSocketSetReuseAddress(listen_socket, true); | 
|  | SB_DCHECK(result); | 
|  | sb_socket_result = SbSocketBind(listen_socket, &address); | 
|  | SB_DCHECK(sb_socket_result == kSbSocketOk); | 
|  | sb_socket_result = SbSocketListen(listen_socket); | 
|  | SB_DCHECK(sb_socket_result == kSbSocketOk); | 
|  | // Update the address after a free port has been assigned. | 
|  | SbSocketGetLocalAddress(listen_socket, &address); | 
|  |  | 
|  | // Create a new socket to connect to the listening socket. | 
|  | *client_socket = | 
|  | SbSocketCreate(kSbSocketAddressTypeIpv4, kSbSocketProtocolTcp); | 
|  | SB_DCHECK(SbSocketIsValid(*client_socket)); | 
|  | // This connect will probably return pending, but we'll assume it will connect | 
|  | // eventually. | 
|  | sb_socket_result = SbSocketConnect(*client_socket, &address); | 
|  | SB_DCHECK(sb_socket_result == kSbSocketOk || | 
|  | sb_socket_result == kSbSocketPending); | 
|  |  | 
|  | // Spin until the accept happens (or we get impatient). | 
|  | *server_socket = AcceptBySpinning(listen_socket, kTimeout); | 
|  | SB_DCHECK(SbSocketIsValid(*server_socket)); | 
|  |  | 
|  | result = SbSocketDestroy(listen_socket); | 
|  | SB_DCHECK(result); | 
|  | } | 
|  | }  // namespace | 
|  |  | 
|  | SbSocketWaiterPrivate::SbSocketWaiterPrivate() | 
|  | : thread_(SbThreadGetCurrent()), | 
|  | base_(event_base_new()), | 
|  | waiting_(false), | 
|  | woken_up_(false) { | 
|  | #if SB_HAS(PIPE) | 
|  | int fds[2]; | 
|  | int result = pipe(fds); | 
|  | SB_DCHECK(result == 0); | 
|  |  | 
|  | wakeup_read_fd_ = fds[0]; | 
|  | result = sbposix::SetNonBlocking(wakeup_read_fd_); | 
|  | SB_DCHECK(result); | 
|  |  | 
|  | wakeup_write_fd_ = fds[1]; | 
|  | result = sbposix::SetNonBlocking(wakeup_write_fd_); | 
|  | SB_DCHECK(result); | 
|  | #else | 
|  | GetSocketPipe(&client_socket_, &server_socket_); | 
|  |  | 
|  | // Set TCP_NODELAY on the server socket, so it immediately sends its tiny | 
|  | // payload without waiting for more data. | 
|  | SbSocketSetTcpNoDelay(server_socket_, true); | 
|  |  | 
|  | wakeup_read_fd_ = client_socket_->socket_fd; | 
|  | wakeup_write_fd_ = server_socket_->socket_fd; | 
|  | #endif | 
|  |  | 
|  | event_set(&wakeup_event_, wakeup_read_fd_, EV_READ | EV_PERSIST, | 
|  | &SbSocketWaiterPrivate::LibeventWakeUpCallback, this); | 
|  | event_base_set(base_, &wakeup_event_); | 
|  | event_add(&wakeup_event_, NULL); | 
|  | } | 
|  |  | 
|  | SbSocketWaiterPrivate::~SbSocketWaiterPrivate() { | 
|  | WaiteesMap::iterator it = waitees_.begin(); | 
|  | while (it != waitees_.end()) { | 
|  | Waitee* waitee = it->second; | 
|  | ++it;  // Increment before removal. | 
|  | Remove(waitee->socket); | 
|  | } | 
|  |  | 
|  | event_del(&wakeup_event_); | 
|  | event_base_free(base_); | 
|  |  | 
|  | #if SB_HAS(PIPE) | 
|  | close(wakeup_read_fd_); | 
|  | close(wakeup_write_fd_); | 
|  | #else | 
|  | SbSocketDestroy(server_socket_); | 
|  | SbSocketDestroy(client_socket_); | 
|  | #endif | 
|  | } | 
|  |  | 
|  | bool SbSocketWaiterPrivate::Add(SbSocket socket, | 
|  | void* context, | 
|  | SbSocketWaiterCallback callback, | 
|  | int interests, | 
|  | bool persistent) { | 
|  | SB_DCHECK(SbThreadIsCurrent(thread_)); | 
|  |  | 
|  | if (!SbSocketIsValid(socket)) { | 
|  | SB_DLOG(ERROR) << __FUNCTION__ << ": Socket (" << socket << ") is invalid."; | 
|  | return false; | 
|  | } | 
|  |  | 
|  | if (!interests) { | 
|  | SB_DLOG(ERROR) << __FUNCTION__ << ": No interests provided."; | 
|  | return false; | 
|  | } | 
|  |  | 
|  | // The policy is not to add a socket to a waiter if it is registered with | 
|  | // another waiter. | 
|  |  | 
|  | // TODO: If anyone were to want to add a socket to a different waiter, | 
|  | // it would probably be another thread, so doing this check without locking is | 
|  | // probably wrong. But, it is also a pain, and, at this precise moment, socket | 
|  | // access is all going to come from one I/O thread anyway, and there will only | 
|  | // be one waiter. | 
|  | if (SbSocketWaiterIsValid(socket->waiter)) { | 
|  | if (socket->waiter == this) { | 
|  | SB_DLOG(ERROR) << __FUNCTION__ << ": Socket already has this waiter (" | 
|  | << this << ")."; | 
|  | } else { | 
|  | SB_DLOG(ERROR) << __FUNCTION__ << ": Socket already has waiter (" | 
|  | << socket->waiter << ", this=" << this << ")."; | 
|  | } | 
|  | return false; | 
|  | } | 
|  |  | 
|  | Waitee* waitee = | 
|  | new Waitee(this, socket, context, callback, interests, persistent); | 
|  | AddWaitee(waitee); | 
|  |  | 
|  | int16_t events = 0; | 
|  | if (interests & kSbSocketWaiterInterestRead) { | 
|  | events |= EV_READ; | 
|  | } | 
|  |  | 
|  | if (interests & kSbSocketWaiterInterestWrite) { | 
|  | events |= EV_WRITE; | 
|  | } | 
|  |  | 
|  | if (persistent) { | 
|  | events |= EV_PERSIST; | 
|  | } | 
|  |  | 
|  | event_set(&waitee->event, socket->socket_fd, events, | 
|  | &SbSocketWaiterPrivate::LibeventSocketCallback, waitee); | 
|  | event_base_set(base_, &waitee->event); | 
|  | socket->waiter = this; | 
|  | event_add(&waitee->event, NULL); | 
|  | return true; | 
|  | } | 
|  |  | 
|  | bool SbSocketWaiterPrivate::Remove(SbSocket socket) { | 
|  | SB_DCHECK(SbThreadIsCurrent(thread_)); | 
|  | if (!SbSocketIsValid(socket)) { | 
|  | SB_DLOG(ERROR) << __FUNCTION__ << ": Socket (" << socket << ") is invalid."; | 
|  | return false; | 
|  | } | 
|  |  | 
|  | if (socket->waiter != this) { | 
|  | SB_DLOG(ERROR) << __FUNCTION__ << ": Socket (" << socket << ") " | 
|  | << "is watched by Waiter (" << socket->waiter << "), " | 
|  | << "not this Waiter (" << this << ")."; | 
|  | SB_DSTACK(ERROR); | 
|  | return false; | 
|  | } | 
|  |  | 
|  | Waitee* waitee = RemoveWaitee(socket); | 
|  | if (!waitee) { | 
|  | return false; | 
|  | } | 
|  |  | 
|  | event_del(&waitee->event); | 
|  | socket->waiter = kSbSocketWaiterInvalid; | 
|  |  | 
|  | delete waitee; | 
|  | return true; | 
|  | } | 
|  |  | 
|  | void SbSocketWaiterPrivate::Wait() { | 
|  | SB_DCHECK(SbThreadIsCurrent(thread_)); | 
|  |  | 
|  | // We basically wait for the largest amount of time to achieve an indefinite | 
|  | // block. | 
|  | WaitTimed(kSbTimeMax); | 
|  | } | 
|  |  | 
|  | SbSocketWaiterResult SbSocketWaiterPrivate::WaitTimed(SbTime duration) { | 
|  | SB_DCHECK(SbThreadIsCurrent(thread_)); | 
|  |  | 
|  | // The way to do this is apparently to create a timeout event, call WakeUp | 
|  | // inside that callback, and then just do a normal wait. | 
|  | struct event event; | 
|  | timeout_set(&event, &SbSocketWaiterPrivate::LibeventTimeoutCallback, this); | 
|  | event_base_set(base_, &event); | 
|  |  | 
|  | struct timeval tv; | 
|  | ToTimevalDuration(duration, &tv); | 
|  | timeout_add(&event, &tv); | 
|  |  | 
|  | waiting_ = true; | 
|  | event_base_loop(base_, 0); | 
|  | waiting_ = false; | 
|  |  | 
|  | SbSocketWaiterResult result = | 
|  | woken_up_ ? kSbSocketWaiterResultWokenUp : kSbSocketWaiterResultTimedOut; | 
|  | woken_up_ = false; | 
|  |  | 
|  | // We clean this up, in case we were bewakened early, to prevent a suprious | 
|  | // wake-up later. | 
|  | timeout_del(&event); | 
|  |  | 
|  | return result; | 
|  | } | 
|  |  | 
|  | void SbSocketWaiterPrivate::WakeUp(bool timeout) { | 
|  | // We may be calling from a separate thread, so we have to be clever. The | 
|  | // version of libevent we are using (14.x) does not really do thread-safety, | 
|  | // despite the documentation that says otherwise. But, sending a byte through | 
|  | // a local pipe gets the job done safely. | 
|  | char buf = timeout ? 0 : 1; | 
|  | int bytes_written = HANDLE_EINTR(write(wakeup_write_fd_, &buf, 1)); | 
|  | SB_DCHECK(bytes_written == 1 || errno == EAGAIN) | 
|  | << "[bytes_written:" << bytes_written << "] [errno:" << errno << "]"; | 
|  | } | 
|  |  | 
|  | // static | 
|  | void SbSocketWaiterPrivate::LibeventSocketCallback(int /*fd*/, | 
|  | int16_t event, | 
|  | void* context) { | 
|  | Waitee* waitee = reinterpret_cast<Waitee*>(context); | 
|  | waitee->waiter->HandleSignal(waitee, event); | 
|  | } | 
|  |  | 
|  | // static | 
|  | void SbSocketWaiterPrivate::LibeventTimeoutCallback(int /*fd*/, | 
|  | int16_t /*event*/, | 
|  | void* context) { | 
|  | reinterpret_cast<SbSocketWaiter>(context)->WakeUp(true); | 
|  | } | 
|  |  | 
|  | // static | 
|  | void SbSocketWaiterPrivate::LibeventWakeUpCallback(int /*fd*/, | 
|  | int16_t /*event*/, | 
|  | void* context) { | 
|  | reinterpret_cast<SbSocketWaiter>(context)->HandleWakeUpRead(); | 
|  | } | 
|  |  | 
|  | void SbSocketWaiterPrivate::HandleSignal(Waitee* waitee, | 
|  | short events) {  // NOLINT[runtime/int] | 
|  | int interests = 0; | 
|  | if (events & EV_READ) { | 
|  | interests |= kSbSocketWaiterInterestRead; | 
|  | } | 
|  |  | 
|  | if (events & EV_WRITE) { | 
|  | interests |= kSbSocketWaiterInterestWrite; | 
|  | } | 
|  |  | 
|  | // Remove the non-persistent waitee before calling the callback, so that we | 
|  | // can add another waitee in the callback if we need to. This is also why we | 
|  | // copy all the fields we need out of waitee. | 
|  | SbSocket socket = waitee->socket; | 
|  | void* context = waitee->context; | 
|  | SbSocketWaiterCallback callback = waitee->callback; | 
|  | if (!waitee->persistent) { | 
|  | Remove(waitee->socket); | 
|  | } | 
|  |  | 
|  | callback(this, socket, context, interests); | 
|  | } | 
|  |  | 
|  | void SbSocketWaiterPrivate::HandleWakeUpRead() { | 
|  | SB_DCHECK(waiting_); | 
|  | // Remove and discard the wakeup byte. | 
|  | char buf; | 
|  | int bytes_read = HANDLE_EINTR(read(wakeup_read_fd_, &buf, 1)); | 
|  | SB_DCHECK(bytes_read == 1); | 
|  | if (buf != 0) { | 
|  | woken_up_ = true; | 
|  | } | 
|  | event_base_loopbreak(base_); | 
|  | } | 
|  |  | 
|  | void SbSocketWaiterPrivate::AddWaitee(Waitee* waitee) { | 
|  | waitees_.insert(std::make_pair(waitee->socket, waitee)); | 
|  | } | 
|  |  | 
|  | SbSocketWaiterPrivate::Waitee* SbSocketWaiterPrivate::GetWaitee( | 
|  | SbSocket socket) { | 
|  | WaiteesMap::iterator it = waitees_.find(socket); | 
|  | if (it == waitees_.end()) { | 
|  | return NULL; | 
|  | } | 
|  |  | 
|  | return it->second; | 
|  | } | 
|  |  | 
|  | SbSocketWaiterPrivate::Waitee* SbSocketWaiterPrivate::RemoveWaitee( | 
|  | SbSocket socket) { | 
|  | WaiteesMap::iterator it = waitees_.find(socket); | 
|  | if (it == waitees_.end()) { | 
|  | return NULL; | 
|  | } | 
|  |  | 
|  | Waitee* result = it->second; | 
|  | waitees_.erase(it); | 
|  | return result; | 
|  | } |