// blob: 715263d6081fab79106eb1ca97f0223b25b91c20 [file] [log] [blame]
// Copyright 2018 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "starboard/shared/starboard/net_log.h"

#include <windows.h>

#include <algorithm>
#include <deque>
#include <functional>
#include <iterator>
#include <map>
#include <sstream>
#include <string>

#include "starboard/atomic.h"
#include "starboard/common/semaphore.h"
#include "starboard/common/thread.h"
#include "starboard/log.h"
#include "starboard/mutex.h"
#include "starboard/once.h"
#include "starboard/socket.h"
#include "starboard/socket_waiter.h"
#include "starboard/string.h"
// Build-time configuration. Each value below can be overridden by defining
// the macro before this point (e.g. on the compiler command line).

#ifndef NET_LOG_PORT
// TCP port the net log server listens on. The default was generated from a
// random number generator.
#define NET_LOG_PORT 49353
#endif
// Address type used for the listen socket; controls whether IPv4 or IPv6 is
// used.
#ifndef NET_LOG_IP_VERSION
#define NET_LOG_IP_VERSION kSbSocketAddressTypeIpv4
#endif
// Send buffer size, in bytes, requested on the client socket. Default is
// 100 KiB.
#ifndef NET_LOG_SOCKET_BUFFER_SIZE
#define NET_LOG_SOCKET_BUFFER_SIZE (1024 * 100)
#endif
// Maximum number of bytes of log data held in memory while waiting to be
// sent. Default is 1 MiB.
#ifndef NET_LOG_MAX_IN_MEMORY_BUFFER
#define NET_LOG_MAX_IN_MEMORY_BUFFER (1024 * 1024)
#endif
// Maximum number of bytes handed to the socket in a single send. Default is
// 1 KiB.
#ifndef NET_LOG_SOCKET_SEND_SIZE
#define NET_LOG_SOCKET_SEND_SIZE 1024
#endif
namespace starboard {
namespace shared {
namespace starboard {
namespace {
// Hook that FunctionThread::Join() invokes before joining the thread.
using JoinFunction = std::function<void()>;
// Thread body run by FunctionThread; it is handed the thread's join semaphore
// and joined flag.
using RunFunction = std::function<void(Semaphore*, atomic_bool*)>;
// Placeholders for the std::bind() expressions in this file.
using std::placeholders::_1;
using std::placeholders::_2;
// A Thread whose body and join hook are supplied as std::function objects
// instead of through subclassing.
class FunctionThread : public Thread {
 public:
  // |run_function| becomes the thread body. |join_function| may be empty; when
  // set it is called from Join() before the underlying thread is joined.
  FunctionThread(const std::string& name,
                 RunFunction run_function,
                 JoinFunction join_function)
      : Thread(name),
        run_function_(run_function),
        join_function_(join_function) {}

  // Convenience factory returning the new thread as a scoped_ptr<Thread>.
  // The thread is not started.
  static scoped_ptr<Thread> Create(
      const std::string& thread_name,
      RunFunction run_function,
      JoinFunction on_join_called = JoinFunction()) {
    scoped_ptr<Thread> created(
        new FunctionThread(thread_name, run_function, on_join_called));
    return created.Pass();
  }

  // Thread body: forwards the join semaphore and joined flag to the run
  // function.
  void Run() override { run_function_(join_sema(), joined_bool()); }

  // Runs the optional join hook first, then performs the regular join.
  void Join() override {
    const bool has_join_hook = static_cast<bool>(join_function_);
    if (has_join_hook) {
      join_function_();
    }
    Thread::Join();
  }

 private:
  RunFunction run_function_;
  JoinFunction join_function_;
};
// Returns the enumerator name of |error| for use in log messages. A value
// that matches no known enumerator trips SB_NOTREACHED() and is rendered as
// "Unknown-<numeric value>".
std::string ToString(SbSocketError error) {
  switch (error) {
    case kSbSocketOk:
      return "kSbSocketOk";
    case kSbSocketPending:
      // Previously reported as "kSbSocketErrorConnectionReset", which made a
      // would-block condition look like a dropped connection in the logs.
      return "kSbSocketPending";
    case kSbSocketErrorFailed:
      return "kSbSocketErrorFailed";
#if SB_HAS(SOCKET_ERROR_CONNECTION_RESET_SUPPORT) || \
    SB_API_VERSION >= 9
    case kSbSocketErrorConnectionReset:
      return "kSbSocketErrorConnectionReset";
#endif  // SB_HAS(SOCKET_ERROR_CONNECTION_RESET_SUPPORT) ||
        // SB_API_VERSION >= 9
  }
  SB_NOTREACHED() << "Unexpected case " << error;
  std::stringstream ss;
  ss << "Unknown-" << error;
  return ss.str();
}
// Creates the TCP socket on which the net log server accepts its client,
// binds it to NET_LOG_PORT and puts it into listening mode.
//
// Bind/listen failures are reported through the raw log only; the socket is
// returned regardless, so callers always receive a non-null pointer.
scoped_ptr<Socket> CreateListenSocket() {
  scoped_ptr<Socket> socket(
      new Socket(NET_LOG_IP_VERSION, kSbSocketProtocolTcp));
  socket->SetReuseAddress(true);

  // The address bytes are left zeroed (0.0.0.0 for IPv4) so that the socket
  // binds to all interfaces.
  SbSocketAddress sock_addr;
  SbMemorySet(&sock_addr, 0, sizeof(SbSocketAddress));
  sock_addr.type = NET_LOG_IP_VERSION;
  sock_addr.port = NET_LOG_PORT;

  // Can't use SB_LOG_IF() below because SB_LOG_IF() might have triggered this
  // call, which would cause reentrant behavior and then deadlock.
  // SbLogRawFormatF() is assumed to be safe to call. Note that NetLogWrite()
  // ignores reentrant calls.
  SbSocketError sock_err = socket->Bind(&sock_addr);
  if (sock_err != kSbSocketOk) {
    SbLogRawFormatF("Socket error while attempting to bind, error = %d\n",
                    sock_err);
  }

  sock_err = socket->Listen();
  if (sock_err != kSbSocketOk) {
    // Distinct message: this used to be reported as a bind failure.
    SbLogRawFormatF("Socket error while attempting to listen, error = %d\n",
                    sock_err);
  }
  return socket.Pass();
}
class BufferedSocketWriter {
public:
BufferedSocketWriter(int in_memory_buffer_size, int chunk_size)
: max_memory_buffer_size_(in_memory_buffer_size),
chunk_size_(chunk_size) {}
void Append(const char* data, size_t data_n) {
bool overflow = false;
log_mutex_.Acquire();
for (const char* curr = data; curr != data + data_n; ++curr) {
log_.push_back(*curr);
if (log_.size() > max_memory_buffer_size_) {
overflow = true;
log_.pop_front();
}
}
log_mutex_.Release();
if (overflow) {
// Can't use SB_LOG_IF() because SB_LOG_IF() might have triggered this
// call, and therefore would triggered reentrant behavior and then
// deadlock. SbLogRawFormat() is assumed to be safe to call. Note that
// NetLogWrite() ignores reentrant calls.
SbLogRaw("Net log dropped buffer data.");
}
}
void WaitUntilWritableOrConnectionReset(SbSocket sock) {
SbSocketWaiter waiter = SbSocketWaiterCreate();
struct F {
static void WakeUp(SbSocketWaiter waiter, SbSocket, void*, int) {
SbSocketWaiterWakeUp(waiter);
}
};
SbSocketWaiterAdd(waiter,
sock,
NULL,
&F::WakeUp,
kSbSocketWaiterInterestWrite,
false); // false means one shot.
SbSocketWaiterWait(waiter);
SbSocketWaiterRemove(waiter, sock);
SbSocketWaiterDestroy(waiter);
}
bool IsConnectionReset(SbSocketError err) {
#if SB_HAS(SOCKET_ERROR_CONNECTION_RESET_SUPPORT) || \
SB_API_VERSION >= 9
return err == kSbSocketErrorConnectionReset;
#else
return false;
#endif // SB_HAS(SOCKET_ERROR_CONNECTION_RESET_SUPPORT) ||
// SB_API_VERSION >= 9
}
// Will flush data through to the dest_socket. Returns |true| if
// flushed, else connection was dropped or an error occured.
bool Flush(SbSocket dest_socket) {
std::string curr_write_block;
while (TransferData(chunk_size_, &curr_write_block)) {
while (!curr_write_block.empty()) {
int bytes_to_write = static_cast<int>(curr_write_block.size());
int result = SbSocketSendTo(dest_socket, curr_write_block.c_str(),
bytes_to_write, NULL);
if (result < 0) {
SbSocketError err = SbSocketGetLastError(dest_socket);
SbSocketClearLastError(dest_socket);
if (err == kSbSocketPending) {
blocked_counts_.increment();
WaitUntilWritableOrConnectionReset(dest_socket);
continue;
} else if (IsConnectionReset(err)) {
return false;
} else {
SB_LOG(ERROR) << "An error happened while writing to socket: "
<< ToString(err);
return false;
}
break;
} else if (result == 0) {
// Socket has closed.
return false;
} else {
// Expected condition. Partial or full write was successful.
size_t bytes_written = static_cast<size_t>(result);
SB_DCHECK(bytes_written <= bytes_to_write);
curr_write_block.erase(0, bytes_written);
}
}
}
return true;
}
int32_t blocked_counts() const { return blocked_counts_.load(); }
private:
bool TransferData(size_t max_size, std::string* destination) {
ScopedLock lock_log(log_mutex_);
size_t log_size = log_.size();
if (log_size == 0) {
return false;
}
size_t size = std::min<size_t>(max_size, log_size);
std::deque<char>::iterator begin_it = log_.begin();
std::deque<char>::iterator end_it = begin_it;
std::advance(end_it, size);
destination->assign(begin_it, end_it);
log_.erase(begin_it, end_it);
return true;
}
void PrependData(const std::string& curr_write_block) {
ScopedLock lock_log(log_mutex_);
log_.insert(log_.begin(),
curr_write_block.begin(),
curr_write_block.end());
}
int max_memory_buffer_size_;
int chunk_size_;
Mutex log_mutex_;
std::deque<char> log_;
atomic_int32_t blocked_counts_;
};
// This class will listen to the provided socket for a client
// connection. When a client connection is established, a
// callback will be invoked.
class SocketListener {
public:
typedef std::function<void(scoped_ptr<Socket>)> Callback;
SocketListener(Socket* listen_socket, Callback cb)
: listen_socket_(listen_socket),
callback_(cb) {
RunFunction run_cb = std::bind(&SocketListener::Run, this, _1, _2);
thread_ = FunctionThread::Create("NetLogSocketListener",
run_cb);
thread_->Start();
}
~SocketListener() {
thread_->Join();
}
private:
void Run(Semaphore* joined_sema, atomic_bool*) {
while (!joined_sema->TakeWait(100 * kSbTimeMillisecond)) {
scoped_ptr<Socket> client_connection(listen_socket_->Accept());
if (client_connection) {
callback_(client_connection.Pass());
break;
}
}
}
Socket* listen_socket_;
Callback callback_;
scoped_ptr<Thread> thread_;
};
// Owns the listen socket, the client connection and the background writer
// thread that drains buffered log data to the connected client.
class NetLogServer {
 public:
  // Singleton accessor; defined through SB_ONCE_INITIALIZE_FUNCTION in this
  // file.
  static NetLogServer* Instance();

  // Opens the listen socket, starts listening for a client and starts the
  // writer thread.
  NetLogServer() : buffered_socket_writer_(NET_LOG_MAX_IN_MEMORY_BUFFER,
                                           NET_LOG_SOCKET_SEND_SIZE) {
    ScopedLock lock(socket_mutex_);
    listen_socket_ = CreateListenSocket();
    ListenForClient();
    writer_thread_ = FunctionThread::Create(
        "NetLogSocketWriter",
        std::bind(&NetLogServer::WriterTask, this, _1, _2),
        std::bind(&NetLogServer::OnWriterTaskJoined, this));
    writer_thread_->Start();
  }

  ~NetLogServer() {
    SB_NOTREACHED();  // Should never reach here because of singleton use.
    Close();
  }

  // (Re)starts the listener that waits for a client connection.
  void ListenForClient() {
    SocketListener::Callback cb = std::bind(&NetLogServer::OnClientConnect,
                                            this,
                                            std::placeholders::_1);
    // Destroy any previous listener (joining its thread) BEFORE creating the
    // new one, so two listeners never use |listen_socket_| at the same time.
    socket_listener_.reset();
    socket_listener_.reset(new SocketListener(listen_socket_.get(), cb));
  }

  // Invoked by the listener with the accepted connection; takes ownership of
  // it and configures its send buffer and keep-alive.
  void OnClientConnect(scoped_ptr<Socket> client_socket) {
    ScopedLock lock(socket_mutex_);
    client_socket_ = client_socket.Pass();
    client_socket_->SetSendBufferSize(NET_LOG_SOCKET_BUFFER_SIZE);
    client_socket_->SetTcpKeepAlive(true, kSbTimeSecond);
  }

  // Returns |true| if a client socket is currently held, i.e. a client has
  // connected and Close() has not been called since.
  bool HasClientConnected() {
    ScopedLock lock(socket_mutex_);
    return client_socket_.get() != NULL;
  }

  // Queues the NUL-terminated string |msg| and wakes the writer thread.
  void OnLog(const char* msg) {
    buffered_socket_writer_.Append(msg, SbStringGetLength(msg));
    writer_thread_sema_.Put();
  }

  // Stops the writer thread, attempts one last flush, stops the listener and
  // releases both sockets.
  void Close() {
    if (writer_thread_) {
      writer_thread_->Join();
      writer_thread_.reset(nullptr);
      Flush();  // One last flush to the socket.
    }
    // Stop the listener before taking |socket_mutex_|: its thread may be
    // inside OnClientConnect(), which needs that mutex to finish.
    socket_listener_.reset();
    ScopedLock lock(socket_mutex_);
    client_socket_.reset();
    listen_socket_.reset();
  }

  // Return |true| if the data was written out to a connected socket,
  // else |false| if
  //   1. There was no connected client.
  //   2. The connection was dropped.
  //   3. Some other connection error happened.
  // |socket_mutex_| is held for the duration of the write.
  bool Flush() {
    ScopedLock lock(socket_mutex_);
    if (!client_socket_) {
      return false;
    }
    return buffered_socket_writer_.Flush(client_socket_->socket());
  }

 private:
  // Join hook of the writer thread: wakes it so it can notice the join.
  void OnWriterTaskJoined() {
    writer_thread_sema_.Put();
  }

  // Body of the writer thread. Flushes buffered data to the client whenever
  // it is woken, and exits when the connection drops or the thread is joined.
  void WriterTask(Semaphore* /*joined_sema*/, atomic_bool* is_joined) {
    while (true) {
      // Use a bounded wait (100 ms) instead of blocking indefinitely:
      //  * FunctionThread::Join() runs OnWriterTaskJoined() before calling
      //    Thread::Join(). NOTE(review): |is_joined| is presumably only set
      //    inside Thread::Join() - confirm. If so, this thread can consume
      //    the wake-up while the flag is still false; an unbounded wait
      //    would then never return and Join() would hang.
      //  * Data buffered before a client connected is sent shortly after the
      //    connection is made, rather than on the next log call.
      // The return value is deliberately ignored; a timeout and a wake-up are
      // handled the same way.
      writer_thread_sema_.TakeWait(kSbTimeSecond / 10);
      if (HasClientConnected()) {
        if (!Flush()) {
          break;  // Connection dropped.
        }
      }
      if (is_joined->load()) {
        break;
      }
    }
  }

  scoped_ptr<Socket> listen_socket_;
  scoped_ptr<Socket> client_socket_;
  Mutex socket_mutex_;  // Held wherever |client_socket_| is read or replaced.
  scoped_ptr<SocketListener> socket_listener_;
  scoped_ptr<Thread> writer_thread_;
  Semaphore writer_thread_sema_;
  BufferedSocketWriter buffered_socket_writer_;
};
// A boolean with one independent value per thread, stored in a Starboard
// thread-local slot: a non-null slot value means |true|, null means |false|.
class ThreadLocalBoolean {
 public:
  ThreadLocalBoolean() : slot_(SbThreadCreateLocalKey(NULL)) {}
  ~ThreadLocalBoolean() { SbThreadDestroyLocalKey(slot_); }

  // Sets the calling thread's value.
  void Set(bool val) {
    if (val) {
      // Any non-null pointer works; it is never dereferenced.
      SbThreadSetLocalValue(slot_, reinterpret_cast<void*>(0x1));
    } else {
      SbThreadSetLocalValue(slot_, nullptr);
    }
  }

  // Returns the calling thread's value.
  bool Get() const { return SbThreadGetLocalValue(slot_) != nullptr; }

 private:
  SbThreadLocalKey slot_;
  SB_DISALLOW_COPY_AND_ASSIGN(ThreadLocalBoolean);
};
// Defines NetLogServer::Instance(), the accessor for the shared NetLogServer
// object (presumably created on first use - see SB_ONCE_INITIALIZE_FUNCTION
// in starboard/once.h).
SB_ONCE_INITIALIZE_FUNCTION(NetLogServer, NetLogServer::Instance);
// Defines ScopeGuardTLB(), the accessor for the shared ThreadLocalBoolean
// that ScopeGuard uses as its per-thread reentrancy flag.
SB_ONCE_INITIALIZE_FUNCTION(ThreadLocalBoolean, ScopeGuardTLB);
// Per-thread reentrancy guard for the net log entry points. This is useful
// if/when sub-routines invoke logging on an error condition.
//
// While an instance is alive, the calling thread's flag is set; a nested
// guard created on the same thread sees the flag and reports itself as not
// enabled. On destruction the flag is restored to the value found at
// construction.
class ScopeGuard {
 public:
  ScopeGuard()
      : tlb_(ScopeGuardTLB()), was_already_active_(tlb_->Get()) {
    tlb_->Set(true);
  }

  ~ScopeGuard() { tlb_->Set(was_already_active_); }

  // |true| only for the outermost guard on the calling thread.
  bool IsEnabled() {
    if (was_already_active_) {
      return false;
    }
    return true;
  }

 private:
  // Declared first so that it is initialized before |was_already_active_|.
  ThreadLocalBoolean* tlb_;
  bool was_already_active_;
};
} // namespace.
// Name of a command-line switch. NOTE(review): presumably makes startup wait
// for a net log client via NetLogWaitForClientConnected() - the switch is not
// consumed in this file; confirm against callers.
const char kNetLogCommandSwitchWait[] = "net_log_wait_for_connection";
void NetLogWaitForClientConnected() {
#if !SB_LOGGING_IS_OFFICIAL_BUILD
ScopeGuard guard;
if (guard.IsEnabled()) {
while (!NetLogServer::Instance()->HasClientConnected()) {
SbThreadSleep(kSbTimeMillisecond);
}
}
#endif
}
void NetLogWrite(const char* data) {
#if !SB_LOGGING_IS_OFFICIAL_BUILD
ScopeGuard guard;
if (guard.IsEnabled()) {
NetLogServer::Instance()->OnLog(data);
}
#endif
}
void NetLogFlush() {
#if !SB_LOGGING_IS_OFFICIAL_BUILD
ScopeGuard guard;
if (guard.IsEnabled()) {
NetLogServer::Instance()->Flush();
}
#endif
}
void NetLogFlushThenClose() {
#if !SB_LOGGING_IS_OFFICIAL_BUILD
ScopeGuard guard;
if (guard.IsEnabled()) {
NetLogServer* net_log = NetLogServer::Instance();
net_log->OnLog("Netlog is closing down\n");
net_log->Close();
}
#endif
}
} // namespace starboard
} // namespace shared
} // namespace starboard