blob: a046f8b3714d5ec5a409ed8f394b16c4b6fef369 [file] [log] [blame]
// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "starboard/shared/starboard/link_receiver.h"
#include <string>
#include <unordered_map>
#include "starboard/atomic.h"
#include "starboard/common/scoped_ptr.h"
#include "starboard/common/semaphore.h"
#include "starboard/file.h"
#include "starboard/log.h"
#include "starboard/shared/starboard/application.h"
#include "starboard/socket.h"
#include "starboard/socket_waiter.h"
#include "starboard/string.h"
#include "starboard/system.h"
namespace starboard {
namespace shared {
namespace starboard {
namespace {
// Returns an address that means bind to any interface on the given |port|. When
// |port| is zero, it means the system should choose the port.
SbSocketAddress GetUnspecifiedAddress(SbSocketAddressType address_type,
int port) {
SbSocketAddress address = {0};
address.type = address_type;
address.port = port;
return address;
}
// Returns an address that means bind to the loopback interface on the given
// |port|. When |port| is zero, it means the system should choose the port.
SbSocketAddress GetLocalhostAddress(SbSocketAddressType address_type,
int port) {
SbSocketAddress address = GetUnspecifiedAddress(address_type, port);
switch (address_type) {
case kSbSocketAddressTypeIpv4: {
address.address[0] = 127;
address.address[3] = 1;
return address;
}
case kSbSocketAddressTypeIpv6: {
address.address[15] = 1;
return address;
}
}
SB_LOG(ERROR) << __FUNCTION__ << ": unknown address type: " << address_type;
return address;
}
// Creates a socket that is appropriate for binding and listening, but is not
// bound and hasn't started listening yet.
scoped_ptr<Socket> CreateServerSocket(SbSocketAddressType address_type) {
scoped_ptr<Socket> socket(new Socket(address_type));
if (!socket->IsValid()) {
SB_LOG(ERROR) << __FUNCTION__ << ": "
<< "SbSocketCreate failed";
return scoped_ptr<Socket>().Pass();
}
if (!socket->SetReuseAddress(true)) {
SB_LOG(ERROR) << __FUNCTION__ << ": "
<< "SbSocketSetReuseAddress failed";
return scoped_ptr<Socket>().Pass();
}
return socket.Pass();
}
// Creates a server socket that is bound to the loopback interface.
scoped_ptr<Socket> CreateLocallyBoundSocket(SbSocketAddressType address_type,
int port) {
scoped_ptr<Socket> socket = CreateServerSocket(address_type);
if (!socket) {
return scoped_ptr<Socket>().Pass();
}
SbSocketAddress address = GetLocalhostAddress(address_type, port);
SbSocketError result = socket->Bind(&address);
if (result != kSbSocketOk) {
SB_LOG(ERROR) << __FUNCTION__ << ": "
<< "SbSocketBind to " << port << " failed: " << result;
return scoped_ptr<Socket>().Pass();
}
return socket.Pass();
}
// Creates a server socket that is bound and listening to the loopback interface
// on the given port.
scoped_ptr<Socket> CreateListeningSocket(SbSocketAddressType address_type,
int port) {
scoped_ptr<Socket> socket = CreateLocallyBoundSocket(address_type, port);
if (!socket) {
return scoped_ptr<Socket>().Pass();
}
SbSocketError result = socket->Listen();
if (result != kSbSocketOk) {
SB_LOG(ERROR) << __FUNCTION__ << ": "
<< "SbSocketListen failed: " << result;
return scoped_ptr<Socket>().Pass();
}
return socket.Pass();
}
// Gets the port socket is bound to.
bool GetBoundPort(Socket* socket, int* out_port) {
SB_DCHECK(out_port);
SB_DCHECK(socket);
SbSocketAddress socket_address = {0};
bool result = socket->GetLocalAddress(&socket_address);
if (!result) {
return false;
}
*out_port = socket_address.port;
return true;
}
std::string GetTemporaryDirectory() {
const int kMaxPathLength = SB_FILE_MAX_PATH;
scoped_array<char> temp_path(new char[kMaxPathLength]);
bool has_temp = SbSystemGetPath(kSbSystemPathTempDirectory, temp_path.get(),
kMaxPathLength);
if (!has_temp) {
SB_LOG(ERROR) << __FUNCTION__ << ": "
<< "No temporary directory.";
return "";
}
return std::string(temp_path.get());
}
// Writes |size| bytes of |contents| to the file at |name|.
void CreateTemporaryFile(const char* name, const char* contents, int size) {
std::string path = GetTemporaryDirectory();
if (path.empty()) {
return;
}
path += SB_FILE_SEP_STRING;
path += name;
ScopedFile file(path.c_str(), kSbFileCreateAlways | kSbFileWrite);
if (!file.IsValid()) {
SB_LOG(ERROR) << __FUNCTION__ << ": "
<< "Unable to create: " << path;
return;
}
file.WriteAll(contents, size);
file.Flush();
}
} // namespace
// PImpl of LinkReceiver.
class LinkReceiver::Impl {
public:
Impl(Application* application, int port);
~Impl();
private:
// Encapsulates connection state.
struct Connection {
explicit Connection(scoped_ptr<Socket> socket) : socket(socket.Pass()) {}
~Connection() {}
void FlushLink(Application* application) {
if (!data.empty()) {
application->Link(data.c_str());
data.clear();
}
}
scoped_ptr<Socket> socket;
std::string data;
};
// Runs the server, waiting on an SbSocketWaiter, and blocking until shut
// down.
void Run();
// Adds |socket| to the SbSocketWaiter to wait until ready for accepting a new
// connection.
bool AddForAccept(Socket* socket);
// Adds the |connection| to the SbSocketWaiter to wait until ready to read
// more data.
bool AddForRead(Connection* connection);
// Called when the listening socket has a connection available to accept.
void OnAcceptReady();
// Called when the waiter reports that a socket has more data to read.
void OnReadReady(SbSocket sb_socket);
// Called when the waiter reports that a connection has more data to read.
void OnReadReady(Connection* connection);
// Thread entry point.
static void* RunThread(void* context);
// SbSocketWaiter entry points.
static void HandleAccept(SbSocketWaiter waiter,
SbSocket socket,
void* context,
int ready_interests);
static void HandleRead(SbSocketWaiter waiter,
SbSocket socket,
void* context,
int ready_interests);
// The application to dispatch Link() calls to.
Application* application_;
// The port that was specified by the constructor.
const int specified_port_;
// The port that was queried off of the bound socket.
int actual_port_;
// The thread owned by this server.
SbThread thread_;
// An atomic flag that indicates whether to quit to the server thread.
atomic_bool quit_;
// The waiter to register sockets with and block on.
SbSocketWaiter waiter_;
// A semaphore that will be signaled by the internal thread once the waiter
// has been initialized, so the external thread can safely wake up the waiter.
Semaphore waiter_initialized_;
// A semaphore that will be signaled by the external thread indicating that it
// will no longer reference the waiter, so that the internal thread can safely
// destroy the waiter.
Semaphore destroy_waiter_;
// The server socket listening for new connections.
scoped_ptr<Socket> listen_socket_;
// A map of raw SbSockets to Connection objects.
std::unordered_map<SbSocket, Connection*> connections_;
};
LinkReceiver::Impl::Impl(Application* application, int port)
: application_(application),
specified_port_(port),
thread_(kSbThreadInvalid),
waiter_(kSbSocketWaiterInvalid) {
thread_ =
SbThreadCreate(0, kSbThreadNoPriority, kSbThreadNoAffinity, true,
"LinkReceiver", &LinkReceiver::Impl::RunThread, this);
// Block until waiter is set.
waiter_initialized_.Take();
}
LinkReceiver::Impl::~Impl() {
SB_DCHECK(!SbThreadIsEqual(thread_, SbThreadGetCurrent()));
quit_.store(true);
if (SbSocketWaiterIsValid(waiter_)) {
SbSocketWaiterWakeUp(waiter_);
}
destroy_waiter_.Put();
SbThreadJoin(thread_, NULL);
}
void LinkReceiver::Impl::Run() {
waiter_ = SbSocketWaiterCreate();
if (!SbSocketWaiterIsValid(waiter_)) {
SB_LOG(WARNING) << "Unable to create SbSocketWaiter.";
waiter_initialized_.Put();
return;
}
listen_socket_ =
CreateListeningSocket(kSbSocketAddressTypeIpv4, specified_port_);
if (!listen_socket_ || !listen_socket_->IsValid()) {
listen_socket_ = CreateListeningSocket(kSbSocketAddressTypeIpv6,
specified_port_);
}
if (!listen_socket_ || !listen_socket_->IsValid()) {
SB_LOG(WARNING) << "Unable to start LinkReceiver on port "
<< specified_port_ << ".";
SbSocketWaiterDestroy(waiter_);
waiter_ = kSbSocketWaiterInvalid;
waiter_initialized_.Put();
return;
}
actual_port_ = 0;
bool result = GetBoundPort(listen_socket_.get(), &actual_port_);
if (!result) {
SB_LOG(WARNING) << "Unable to get LinkReceiver bound port.";
SbSocketWaiterDestroy(waiter_);
waiter_ = kSbSocketWaiterInvalid;
waiter_initialized_.Put();
return;
}
char port_string[32] = {0};
SbStringFormatF(port_string, SB_ARRAY_SIZE(port_string), "%d", actual_port_);
CreateTemporaryFile("link_receiver_port", port_string,
SbStringGetLength(port_string));
if (!AddForAccept(listen_socket_.get())) {
quit_.store(true);
}
waiter_initialized_.Put();
while (!quit_.load()) {
SbSocketWaiterWait(waiter_);
}
for (auto& entry : connections_) {
SbSocketWaiterRemove(waiter_, entry.first);
delete entry.second;
}
connections_.clear();
SbSocketWaiterRemove(waiter_, listen_socket_->socket());
// Block until destroying thread will no longer reference waiter.
destroy_waiter_.Take();
SbSocketWaiterDestroy(waiter_);
}
bool LinkReceiver::Impl::AddForAccept(Socket* socket) {
if (!SbSocketWaiterAdd(waiter_, socket->socket(), this,
&LinkReceiver::Impl::HandleAccept,
kSbSocketWaiterInterestRead, true)) {
SB_LOG(ERROR) << __FUNCTION__ << ": "
<< "SbSocketWaiterAdd failed.";
return false;
}
return true;
}
bool LinkReceiver::Impl::AddForRead(Connection* connection) {
if (!SbSocketWaiterAdd(waiter_, connection->socket->socket(), this,
&LinkReceiver::Impl::HandleRead,
kSbSocketWaiterInterestRead, false)) {
SB_LOG(ERROR) << __FUNCTION__ << ": "
<< "SbSocketWaiterAdd failed.";
return false;
}
return true;
}
void LinkReceiver::Impl::OnAcceptReady() {
scoped_ptr<Socket> accepted_socket =
make_scoped_ptr(listen_socket_->Accept());
SB_DCHECK(accepted_socket);
Connection* connection = new Connection(accepted_socket.Pass());
connections_.emplace(connection->socket->socket(), connection);
AddForRead(connection);
}
void LinkReceiver::Impl::OnReadReady(SbSocket sb_socket) {
auto iter = connections_.find(sb_socket);
SB_DCHECK(iter != connections_.end());
OnReadReady(iter->second);
}
void LinkReceiver::Impl::OnReadReady(Connection* connection) {
auto socket = connection->socket.get();
char data[64] = {0};
int read = socket->ReceiveFrom(data, SB_ARRAY_SIZE_INT(data), NULL);
int last_null = 0;
for (int position = 0; position < read; ++position) {
if (data[position] == '\0' || data[position] == '\n' ||
data[position] == '\r') {
int length = position - last_null;
if (length) {
connection->data.append(&data[last_null], length);
connection->FlushLink(application_);
}
last_null = position + 1;
continue;
}
}
int remainder = read - last_null;
if (remainder > 0) {
connection->data.append(&data[last_null], remainder);
}
if (read == 0) {
// Terminate connection.
connection->FlushLink(application_);
connections_.erase(socket->socket());
delete connection;
return;
}
AddForRead(connection);
}
// static
void* LinkReceiver::Impl::RunThread(void* context) {
SB_DCHECK(context);
reinterpret_cast<LinkReceiver::Impl*>(context)->Run();
return NULL;
}
// static
void LinkReceiver::Impl::HandleAccept(SbSocketWaiter /*waiter*/,
SbSocket /*socket*/,
void* context,
int ready_interests) {
SB_DCHECK(context);
reinterpret_cast<LinkReceiver::Impl*>(context)->OnAcceptReady();
}
// static
void LinkReceiver::Impl::HandleRead(SbSocketWaiter /*waiter*/,
SbSocket socket,
void* context,
int /*ready_interests*/) {
SB_DCHECK(context);
reinterpret_cast<LinkReceiver::Impl*>(context)->OnReadReady(socket);
}
LinkReceiver::LinkReceiver(Application* application)
: impl_(new Impl(application, 0)) {}
LinkReceiver::LinkReceiver(Application* application, int port)
: impl_(new Impl(application, port)) {}
LinkReceiver::~LinkReceiver() {
delete impl_;
}
} // namespace starboard
} // namespace shared
} // namespace starboard