| // Copyright 2015 Google Inc. All Rights Reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "starboard/shared/libevent/socket_waiter_internal.h" |
| |
| #include <errno.h> |
| #include <fcntl.h> |
| #include <sys/time.h> |
| #include <unistd.h> |
| |
| #include <map> |
| #include <utility> |
| |
| #include "starboard/log.h" |
| #include "starboard/shared/posix/handle_eintr.h" |
| #include "starboard/shared/posix/set_non_blocking_internal.h" |
| #include "starboard/shared/posix/socket_internal.h" |
| #include "starboard/shared/posix/time_internal.h" |
| #include "starboard/thread.h" |
| #include "third_party/libevent/event.h" |
| |
| namespace sbposix = starboard::shared::posix; |
| |
| namespace { |
| // We do this because it's our style to use explicitly-sized ints when not just |
| // using int, but libevent uses shorts explicitly in its interface. |
| SB_COMPILE_ASSERT(sizeof(int16_t) == sizeof(short), // NOLINT[runtime/int] |
| Short_is_not_int16); |
| |
| SbSocketAddress GetIpv4Localhost() { |
| SbSocketAddress address = {0}; |
| address.type = kSbSocketAddressTypeIpv4; |
| address.port = 0; |
| address.address[0] = 127; |
| address.address[3] = 1; |
| return address; |
| } |
| |
| SbSocket AcceptBySpinning(SbSocket server_socket, SbTime timeout) { |
| SbTimeMonotonic start = SbTimeGetMonotonicNow(); |
| while (true) { |
| SbSocket accepted_socket = SbSocketAccept(server_socket); |
| if (SbSocketIsValid(accepted_socket)) { |
| return accepted_socket; |
| } |
| |
| // If we didn't get a socket, it should be pending. |
| SB_DCHECK(SbSocketGetLastError(server_socket) == kSbSocketPending); |
| |
| // Check if we have passed our timeout. |
| if (SbTimeGetMonotonicNow() - start >= timeout) { |
| break; |
| } |
| |
| // Just being polite. |
| SbThreadYield(); |
| } |
| |
| return kSbSocketInvalid; |
| } |
| |
| void GetSocketPipe(SbSocket* client_socket, SbSocket* server_socket) { |
| int result; |
| SbSocketError sb_socket_result; |
| const SbTimeMonotonic kTimeout = kSbTimeSecond / 15; |
| SbSocketAddress address = GetIpv4Localhost(); |
| |
| // Setup a listening socket. |
| SbSocket listen_socket = |
| SbSocketCreate(kSbSocketAddressTypeIpv4, kSbSocketProtocolTcp); |
| SB_DCHECK(SbSocketIsValid(listen_socket)); |
| result = SbSocketSetReuseAddress(listen_socket, true); |
| SB_DCHECK(result); |
| sb_socket_result = SbSocketBind(listen_socket, &address); |
| SB_DCHECK(sb_socket_result == kSbSocketOk); |
| sb_socket_result = SbSocketListen(listen_socket); |
| SB_DCHECK(sb_socket_result == kSbSocketOk); |
| // Update the address after a free port has been assigned. |
| SbSocketGetLocalAddress(listen_socket, &address); |
| |
| // Create a new socket to connect to the listening socket. |
| *client_socket = |
| SbSocketCreate(kSbSocketAddressTypeIpv4, kSbSocketProtocolTcp); |
| SB_DCHECK(SbSocketIsValid(*client_socket)); |
| // This connect will probably return pending, but we'll assume it will connect |
| // eventually. |
| sb_socket_result = SbSocketConnect(*client_socket, &address); |
| SB_DCHECK(sb_socket_result == kSbSocketOk || |
| sb_socket_result == kSbSocketPending); |
| |
| // Spin until the accept happens (or we get impatient). |
| *server_socket = AcceptBySpinning(listen_socket, kTimeout); |
| SB_DCHECK(SbSocketIsValid(*server_socket)); |
| |
| result = SbSocketDestroy(listen_socket); |
| SB_DCHECK(result); |
| } |
| } // namespace |
| |
| SbSocketWaiterPrivate::SbSocketWaiterPrivate() |
| : thread_(SbThreadGetCurrent()), |
| base_(event_base_new()), |
| waiting_(false), |
| woken_up_(false) { |
| #if SB_HAS(PIPE) |
| int fds[2]; |
| int result = pipe(fds); |
| SB_DCHECK(result == 0); |
| |
| wakeup_read_fd_ = fds[0]; |
| result = sbposix::SetNonBlocking(wakeup_read_fd_); |
| SB_DCHECK(result); |
| |
| wakeup_write_fd_ = fds[1]; |
| result = sbposix::SetNonBlocking(wakeup_write_fd_); |
| SB_DCHECK(result); |
| #else |
| GetSocketPipe(&client_socket_, &server_socket_); |
| |
| // Set TCP_NODELAY on the server socket, so it immediately sends its tiny |
| // payload without waiting for more data. |
| SbSocketSetTcpNoDelay(server_socket_, true); |
| |
| wakeup_read_fd_ = client_socket_->socket_fd; |
| wakeup_write_fd_ = server_socket_->socket_fd; |
| #endif |
| |
| event_set(&wakeup_event_, wakeup_read_fd_, EV_READ | EV_PERSIST, |
| &SbSocketWaiterPrivate::LibeventWakeUpCallback, this); |
| event_base_set(base_, &wakeup_event_); |
| event_add(&wakeup_event_, NULL); |
| } |
| |
| SbSocketWaiterPrivate::~SbSocketWaiterPrivate() { |
| WaiteesMap::iterator it = waitees_.begin(); |
| while (it != waitees_.end()) { |
| Waitee* waitee = it->second; |
| ++it; // Increment before removal. |
| Remove(waitee->socket); |
| } |
| |
| event_del(&wakeup_event_); |
| event_base_free(base_); |
| |
| #if SB_HAS(PIPE) |
| close(wakeup_read_fd_); |
| close(wakeup_write_fd_); |
| #else |
| SbSocketDestroy(server_socket_); |
| SbSocketDestroy(client_socket_); |
| #endif |
| } |
| |
| bool SbSocketWaiterPrivate::Add(SbSocket socket, |
| void* context, |
| SbSocketWaiterCallback callback, |
| int interests, |
| bool persistent) { |
| SB_DCHECK(SbThreadIsCurrent(thread_)); |
| |
| if (!SbSocketIsValid(socket)) { |
| SB_DLOG(ERROR) << __FUNCTION__ << ": Socket (" << socket << ") is invalid."; |
| return false; |
| } |
| |
| if (!interests) { |
| SB_DLOG(ERROR) << __FUNCTION__ << ": No interests provided."; |
| return false; |
| } |
| |
| // The policy is not to add a socket to a waiter if it is registered with |
| // another waiter. |
| |
| // TODO: If anyone were to want to add a socket to a different waiter, |
| // it would probably be another thread, so doing this check without locking is |
| // probably wrong. But, it is also a pain, and, at this precise moment, socket |
| // access is all going to come from one I/O thread anyway, and there will only |
| // be one waiter. |
| if (SbSocketWaiterIsValid(socket->waiter)) { |
| if (socket->waiter == this) { |
| SB_DLOG(ERROR) << __FUNCTION__ << ": Socket already has this waiter (" |
| << this << ")."; |
| } else { |
| SB_DLOG(ERROR) << __FUNCTION__ << ": Socket already has waiter (" |
| << socket->waiter << ", this=" << this << ")."; |
| } |
| return false; |
| } |
| |
| Waitee* waitee = |
| new Waitee(this, socket, context, callback, interests, persistent); |
| AddWaitee(waitee); |
| |
| int16_t events = 0; |
| if (interests & kSbSocketWaiterInterestRead) { |
| events |= EV_READ; |
| } |
| |
| if (interests & kSbSocketWaiterInterestWrite) { |
| events |= EV_WRITE; |
| } |
| |
| if (persistent) { |
| events |= EV_PERSIST; |
| } |
| |
| event_set(&waitee->event, socket->socket_fd, events, |
| &SbSocketWaiterPrivate::LibeventSocketCallback, waitee); |
| event_base_set(base_, &waitee->event); |
| socket->waiter = this; |
| event_add(&waitee->event, NULL); |
| return true; |
| } |
| |
| bool SbSocketWaiterPrivate::Remove(SbSocket socket) { |
| SB_DCHECK(SbThreadIsCurrent(thread_)); |
| if (!SbSocketIsValid(socket)) { |
| SB_DLOG(ERROR) << __FUNCTION__ << ": Socket (" << socket << ") is invalid."; |
| return false; |
| } |
| |
| if (socket->waiter != this) { |
| SB_DLOG(ERROR) << __FUNCTION__ << ": Socket (" << socket << ") " |
| << "is watched by Waiter (" << socket->waiter << "), " |
| << "not this Waiter (" << this << ")."; |
| SB_DSTACK(ERROR); |
| return false; |
| } |
| |
| Waitee* waitee = RemoveWaitee(socket); |
| if (!waitee) { |
| return false; |
| } |
| |
| event_del(&waitee->event); |
| socket->waiter = kSbSocketWaiterInvalid; |
| |
| delete waitee; |
| return true; |
| } |
| |
| void SbSocketWaiterPrivate::Wait() { |
| SB_DCHECK(SbThreadIsCurrent(thread_)); |
| |
| // We basically wait for the largest amount of time to achieve an indefinite |
| // block. |
| WaitTimed(kSbTimeMax); |
| } |
| |
| SbSocketWaiterResult SbSocketWaiterPrivate::WaitTimed(SbTime duration) { |
| SB_DCHECK(SbThreadIsCurrent(thread_)); |
| |
| // The way to do this is apparently to create a timeout event, call WakeUp |
| // inside that callback, and then just do a normal wait. |
| struct event event; |
| timeout_set(&event, &SbSocketWaiterPrivate::LibeventTimeoutCallback, this); |
| event_base_set(base_, &event); |
| |
| if (duration < kSbTimeMax) { |
| struct timeval tv; |
| ToTimevalDuration(duration, &tv); |
| timeout_add(&event, &tv); |
| } |
| |
| waiting_ = true; |
| event_base_loop(base_, 0); |
| waiting_ = false; |
| |
| SbSocketWaiterResult result = |
| woken_up_ ? kSbSocketWaiterResultWokenUp : kSbSocketWaiterResultTimedOut; |
| woken_up_ = false; |
| |
| if (duration < kSbTimeMax) { |
| // We clean this up, in case we were awakened early, to prevent a spurious |
| // wake-up later. |
| timeout_del(&event); |
| } |
| |
| return result; |
| } |
| |
| void SbSocketWaiterPrivate::WakeUp(bool timeout) { |
| // We may be calling from a separate thread, so we have to be clever. The |
| // version of libevent we are using (14.x) does not really do thread-safety, |
| // despite the documentation that says otherwise. But, sending a byte through |
| // a local pipe gets the job done safely. |
| char buf = timeout ? 0 : 1; |
| int bytes_written = HANDLE_EINTR(write(wakeup_write_fd_, &buf, 1)); |
| SB_DCHECK(bytes_written == 1 || errno == EAGAIN) |
| << "[bytes_written:" << bytes_written << "] [errno:" << errno << "]"; |
| } |
| |
| // static |
| void SbSocketWaiterPrivate::LibeventSocketCallback(int /*fd*/, |
| int16_t event, |
| void* context) { |
| Waitee* waitee = reinterpret_cast<Waitee*>(context); |
| waitee->waiter->HandleSignal(waitee, event); |
| } |
| |
| // static |
| void SbSocketWaiterPrivate::LibeventTimeoutCallback(int /*fd*/, |
| int16_t /*event*/, |
| void* context) { |
| reinterpret_cast<SbSocketWaiter>(context)->WakeUp(true); |
| } |
| |
| // static |
| void SbSocketWaiterPrivate::LibeventWakeUpCallback(int /*fd*/, |
| int16_t /*event*/, |
| void* context) { |
| reinterpret_cast<SbSocketWaiter>(context)->HandleWakeUpRead(); |
| } |
| |
| void SbSocketWaiterPrivate::HandleSignal(Waitee* waitee, |
| short events) { // NOLINT[runtime/int] |
| int interests = 0; |
| if (events & EV_READ) { |
| interests |= kSbSocketWaiterInterestRead; |
| } |
| |
| if (events & EV_WRITE) { |
| interests |= kSbSocketWaiterInterestWrite; |
| } |
| |
| // Remove the non-persistent waitee before calling the callback, so that we |
| // can add another waitee in the callback if we need to. This is also why we |
| // copy all the fields we need out of waitee. |
| SbSocket socket = waitee->socket; |
| void* context = waitee->context; |
| SbSocketWaiterCallback callback = waitee->callback; |
| if (!waitee->persistent) { |
| Remove(waitee->socket); |
| } |
| |
| callback(this, socket, context, interests); |
| } |
| |
| void SbSocketWaiterPrivate::HandleWakeUpRead() { |
| SB_DCHECK(waiting_); |
| // Remove and discard the wakeup byte. |
| char buf; |
| int bytes_read = HANDLE_EINTR(read(wakeup_read_fd_, &buf, 1)); |
| SB_DCHECK(bytes_read == 1); |
| if (buf != 0) { |
| woken_up_ = true; |
| } |
| event_base_loopbreak(base_); |
| } |
| |
| void SbSocketWaiterPrivate::AddWaitee(Waitee* waitee) { |
| waitees_.insert(std::make_pair(waitee->socket, waitee)); |
| } |
| |
| SbSocketWaiterPrivate::Waitee* SbSocketWaiterPrivate::GetWaitee( |
| SbSocket socket) { |
| WaiteesMap::iterator it = waitees_.find(socket); |
| if (it == waitees_.end()) { |
| return NULL; |
| } |
| |
| return it->second; |
| } |
| |
| SbSocketWaiterPrivate::Waitee* SbSocketWaiterPrivate::RemoveWaitee( |
| SbSocket socket) { |
| WaiteesMap::iterator it = waitees_.find(socket); |
| if (it == waitees_.end()) { |
| return NULL; |
| } |
| |
| Waitee* result = it->second; |
| waitees_.erase(it); |
| return result; |
| } |