// Copyright 2017 The Cobalt Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "starboard/shared/starboard/link_receiver.h"

#include <string>
#include <unordered_map>

#include "starboard/atomic.h"
#include "starboard/common/file.h"
#include "starboard/common/log.h"
#include "starboard/common/scoped_ptr.h"
#include "starboard/common/semaphore.h"
#include "starboard/common/socket.h"
#include "starboard/common/string.h"
#include "starboard/configuration_constants.h"
#include "starboard/shared/starboard/application.h"
#include "starboard/socket_waiter.h"
#include "starboard/system.h"

namespace starboard {
namespace shared {
namespace starboard {

namespace {
// Returns an address that means bind to any interface on the given |port|. When
// |port| is zero, it means the system should choose the port.
SbSocketAddress GetUnspecifiedAddress(SbSocketAddressType address_type,
                                      int port) {
  SbSocketAddress address = {0};
  address.type = address_type;
  address.port = port;
  return address;
}

// Returns an address that means bind to the loopback interface on the given
// |port|. When |port| is zero, it means the system should choose the port.
SbSocketAddress GetLocalhostAddress(SbSocketAddressType address_type,
                                    int port) {
  SbSocketAddress address = GetUnspecifiedAddress(address_type, port);
  switch (address_type) {
    case kSbSocketAddressTypeIpv4: {
      address.address[0] = 127;
      address.address[3] = 1;
      return address;
    }
    case kSbSocketAddressTypeIpv6: {
      address.address[15] = 1;
      return address;
    }
  }
  SB_LOG(ERROR) << __FUNCTION__ << ": unknown address type: " << address_type;
  return address;
}

// Creates a socket that is appropriate for binding and listening, but is not
// bound and hasn't started listening yet.
scoped_ptr<Socket> CreateServerSocket(SbSocketAddressType address_type) {
  scoped_ptr<Socket> socket(new Socket(address_type));
  if (!socket->IsValid()) {
    SB_LOG(ERROR) << __FUNCTION__ << ": "
                  << "SbSocketCreate failed";
    return scoped_ptr<Socket>().Pass();
  }

  if (!socket->SetReuseAddress(true)) {
    SB_LOG(ERROR) << __FUNCTION__ << ": "
                  << "SbSocketSetReuseAddress failed";
    return scoped_ptr<Socket>().Pass();
  }

  return socket.Pass();
}

// Creates a server socket that is bound to the loopback interface.
scoped_ptr<Socket> CreateLocallyBoundSocket(SbSocketAddressType address_type,
                                            int port) {
  scoped_ptr<Socket> socket = CreateServerSocket(address_type);
  if (!socket) {
    return scoped_ptr<Socket>().Pass();
  }

  SbSocketAddress address = GetLocalhostAddress(address_type, port);
  SbSocketError result = socket->Bind(&address);
  if (result != kSbSocketOk) {
    SB_LOG(ERROR) << __FUNCTION__ << ": "
                  << "SbSocketBind to " << port << " failed: " << result;
    return scoped_ptr<Socket>().Pass();
  }

  return socket.Pass();
}

// Creates a server socket that is bound and listening to the loopback interface
// on the given port.
scoped_ptr<Socket> CreateListeningSocket(SbSocketAddressType address_type,
                                         int port) {
  scoped_ptr<Socket> socket = CreateLocallyBoundSocket(address_type, port);
  if (!socket) {
    return scoped_ptr<Socket>().Pass();
  }

  SbSocketError result = socket->Listen();
  if (result != kSbSocketOk) {
    SB_LOG(ERROR) << __FUNCTION__ << ": "
                  << "SbSocketListen failed: " << result;
    return scoped_ptr<Socket>().Pass();
  }

  return socket.Pass();
}

// Gets the port socket is bound to.
bool GetBoundPort(Socket* socket, int* out_port) {
  SB_DCHECK(out_port);
  SB_DCHECK(socket);

  SbSocketAddress socket_address = {0};
  bool result = socket->GetLocalAddress(&socket_address);
  if (!result) {
    return false;
  }

  *out_port = socket_address.port;
  return true;
}

std::string GetTemporaryDirectory() {
  const int kMaxPathLength = kSbFileMaxPath;
  scoped_array<char> temp_path(new char[kMaxPathLength]);
  bool has_temp = SbSystemGetPath(kSbSystemPathTempDirectory, temp_path.get(),
                                  kMaxPathLength);
  if (!has_temp) {
    SB_LOG(ERROR) << __FUNCTION__ << ": "
                  << "No temporary directory.";
    return "";
  }

  return std::string(temp_path.get());
}

// Writes |size| bytes of |contents| to the file at |name|.
void CreateTemporaryFile(const char* name, const char* contents, int size) {
  std::string path = GetTemporaryDirectory();
  if (path.empty()) {
    return;
  }

  path += kSbFileSepString;
  path += name;
  ScopedFile file(path.c_str(), kSbFileCreateAlways | kSbFileWrite);
  if (!file.IsValid()) {
    SB_LOG(ERROR) << __FUNCTION__ << ": "
                  << "Unable to create: " << path;
    return;
  }

  file.WriteAll(contents, size);
  file.Flush();
}
}  // namespace

// PImpl of LinkReceiver.
class LinkReceiver::Impl {
 public:
  Impl(Application* application, int port);
  ~Impl();

 private:
  // Encapsulates connection state.
  struct Connection {
    explicit Connection(scoped_ptr<Socket> socket) : socket(socket.Pass()) {}
    ~Connection() {}
    void FlushLink(Application* application) {
      if (!data.empty()) {
        application->Link(data.c_str());
        data.clear();
      }
    }

    scoped_ptr<Socket> socket;
    std::string data;
  };

  // Runs the server, waiting on an SbSocketWaiter, and blocking until shut
  // down.
  void Run();

  // Adds |socket| to the SbSocketWaiter to wait until ready for accepting a new
  // connection.
  bool AddForAccept(Socket* socket);

  // Adds the |connection| to the SbSocketWaiter to wait until ready to read
  // more data.
  bool AddForRead(Connection* connection);

  // Called when the listening socket has a connection available to accept.
  void OnAcceptReady();

  // Called when the waiter reports that a socket has more data to read.
  void OnReadReady(SbSocket sb_socket);

  // Called when the waiter reports that a connection has more data to read.
  void OnReadReady(Connection* connection);

  // Thread entry point.
  static void* RunThread(void* context);

  // SbSocketWaiter entry points.
  static void HandleAccept(SbSocketWaiter waiter,
                           SbSocket socket,
                           void* context,
                           int ready_interests);
  static void HandleRead(SbSocketWaiter waiter,
                         SbSocket socket,
                         void* context,
                         int ready_interests);

  // The application to dispatch Link() calls to.
  Application* application_;

  // The port that was specified by the constructor.
  const int specified_port_;

  // The port that was queried off of the bound socket.
  int actual_port_;

  // The thread owned by this server.
  SbThread thread_;

  // An atomic flag that indicates whether to quit to the server thread.
  atomic_bool quit_;

  // The waiter to register sockets with and block on.
  SbSocketWaiter waiter_;

  // A semaphore that will be signaled by the internal thread once the waiter
  // has been initialized, so the external thread can safely wake up the waiter.
  Semaphore waiter_initialized_;

  // A semaphore that will be signaled by the external thread indicating that it
  // will no longer reference the waiter, so that the internal thread can safely
  // destroy the waiter.
  Semaphore destroy_waiter_;

  // The server socket listening for new connections.
  scoped_ptr<Socket> listen_socket_;

  // A map of raw SbSockets to Connection objects.
  std::unordered_map<SbSocket, Connection*> connections_;
};

LinkReceiver::Impl::Impl(Application* application, int port)
    : application_(application),
      specified_port_(port),
      thread_(kSbThreadInvalid),
      waiter_(kSbSocketWaiterInvalid) {
  thread_ =
      SbThreadCreate(0, kSbThreadNoPriority, kSbThreadNoAffinity, true,
                     "LinkReceiver", &LinkReceiver::Impl::RunThread, this);

  // Block until waiter is set.
  waiter_initialized_.Take();
}

LinkReceiver::Impl::~Impl() {
  SB_DCHECK(!SbThreadIsEqual(thread_, SbThreadGetCurrent()));
  quit_.store(true);
  if (SbSocketWaiterIsValid(waiter_)) {
    SbSocketWaiterWakeUp(waiter_);
  }
  destroy_waiter_.Put();
  SbThreadJoin(thread_, NULL);
}

void LinkReceiver::Impl::Run() {
  waiter_ = SbSocketWaiterCreate();
  if (!SbSocketWaiterIsValid(waiter_)) {
    SB_LOG(WARNING) << "Unable to create SbSocketWaiter.";
    waiter_initialized_.Put();
    return;
  }

  listen_socket_ =
      CreateListeningSocket(kSbSocketAddressTypeIpv4, specified_port_);
  if (!listen_socket_ || !listen_socket_->IsValid()) {
      listen_socket_ = CreateListeningSocket(kSbSocketAddressTypeIpv6,
                                             specified_port_);
  }
  if (!listen_socket_ || !listen_socket_->IsValid()) {
    SB_LOG(WARNING) << "Unable to start LinkReceiver on port "
                    << specified_port_ << ".";
    SbSocketWaiterDestroy(waiter_);
    waiter_ = kSbSocketWaiterInvalid;
    waiter_initialized_.Put();
    return;
  }

  actual_port_ = 0;
  bool result = GetBoundPort(listen_socket_.get(), &actual_port_);
  if (!result) {
    SB_LOG(WARNING) << "Unable to get LinkReceiver bound port.";
    SbSocketWaiterDestroy(waiter_);
    waiter_ = kSbSocketWaiterInvalid;
    waiter_initialized_.Put();
    return;
  }

  char port_string[32] = {0};
  SbStringFormatF(port_string, SB_ARRAY_SIZE(port_string), "%d", actual_port_);
  CreateTemporaryFile("link_receiver_port", port_string,
                      SbStringGetLength(port_string));

  if (!AddForAccept(listen_socket_.get())) {
    quit_.store(true);
  }

  waiter_initialized_.Put();
  while (!quit_.load()) {
    SbSocketWaiterWait(waiter_);
  }

  for (auto& entry : connections_) {
    SbSocketWaiterRemove(waiter_, entry.first);
    delete entry.second;
  }
  connections_.clear();

  SbSocketWaiterRemove(waiter_, listen_socket_->socket());

  // Block until destroying thread will no longer reference waiter.
  destroy_waiter_.Take();
  SbSocketWaiterDestroy(waiter_);
}

bool LinkReceiver::Impl::AddForAccept(Socket* socket) {
  if (!SbSocketWaiterAdd(waiter_, socket->socket(), this,
                         &LinkReceiver::Impl::HandleAccept,
                         kSbSocketWaiterInterestRead, true)) {
    SB_LOG(ERROR) << __FUNCTION__ << ": "
                  << "SbSocketWaiterAdd failed.";
    return false;
  }
  return true;
}

bool LinkReceiver::Impl::AddForRead(Connection* connection) {
  if (!SbSocketWaiterAdd(waiter_, connection->socket->socket(), this,
                         &LinkReceiver::Impl::HandleRead,
                         kSbSocketWaiterInterestRead, false)) {
    SB_LOG(ERROR) << __FUNCTION__ << ": "
                  << "SbSocketWaiterAdd failed.";
    return false;
  }
  return true;
}

void LinkReceiver::Impl::OnAcceptReady() {
  scoped_ptr<Socket> accepted_socket =
      make_scoped_ptr(listen_socket_->Accept());
  SB_DCHECK(accepted_socket);
  Connection* connection = new Connection(accepted_socket.Pass());
  connections_.emplace(connection->socket->socket(), connection);
  AddForRead(connection);
}

void LinkReceiver::Impl::OnReadReady(SbSocket sb_socket) {
  auto iter = connections_.find(sb_socket);
  SB_DCHECK(iter != connections_.end());
  OnReadReady(iter->second);
}

void LinkReceiver::Impl::OnReadReady(Connection* connection) {
  auto socket = connection->socket.get();

  char data[64] = {0};
  int read = socket->ReceiveFrom(data, SB_ARRAY_SIZE_INT(data), NULL);
  int last_null = 0;
  for (int position = 0; position < read; ++position) {
    if (data[position] == '\0' || data[position] == '\n' ||
        data[position] == '\r') {
      int length = position - last_null;
      if (length) {
        connection->data.append(&data[last_null], length);
        connection->FlushLink(application_);
      }
      last_null = position + 1;
      continue;
    }
  }

  int remainder = read - last_null;
  if (remainder > 0) {
    connection->data.append(&data[last_null], remainder);
  }

  if (read == 0) {
    // Terminate connection.
    connection->FlushLink(application_);
    connections_.erase(socket->socket());
    delete connection;
    return;
  }

  AddForRead(connection);
}

// static
void* LinkReceiver::Impl::RunThread(void* context) {
  SB_DCHECK(context);
  reinterpret_cast<LinkReceiver::Impl*>(context)->Run();
  return NULL;
}

// static
void LinkReceiver::Impl::HandleAccept(SbSocketWaiter waiter,
                                      SbSocket socket,
                                      void* context,
                                      int ready_interests) {
  SB_DCHECK(context);
  reinterpret_cast<LinkReceiver::Impl*>(context)->OnAcceptReady();
}

// static
void LinkReceiver::Impl::HandleRead(SbSocketWaiter waiter,
                                    SbSocket socket,
                                    void* context,
                                    int ready_interests) {
  SB_DCHECK(context);
  reinterpret_cast<LinkReceiver::Impl*>(context)->OnReadReady(socket);
}

LinkReceiver::LinkReceiver(Application* application)
    : impl_(new Impl(application, 0)) {}

LinkReceiver::LinkReceiver(Application* application, int port)
    : impl_(new Impl(application, port)) {}

LinkReceiver::~LinkReceiver() {
  delete impl_;
}

}  // namespace starboard
}  // namespace shared
}  // namespace starboard
