// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Adapted from udp_socket_libevent.cc

#include "net/socket/udp_socket_starboard.h"

#include "base/callback.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/rand_util.h"
#include "base/task/post_task.h"
#include "base/task_runner_util.h"
#include "base/trace_event/trace_event.h"
#include "net/base/io_buffer.h"
#include "net/base/ip_endpoint.h"
#include "net/base/net_errors.h"
#include "net/base/network_activity_monitor.h"
#include "net/log/net_log.h"
#include "net/log/net_log_event_type.h"
#include "net/log/net_log_source.h"
#include "net/log/net_log_source_type.h"
#include "net/socket/udp_net_log_parameters.h"
#include "starboard/common/socket.h"
#include "starboard/system.h"

namespace net {

// Constructs a socket object that is not yet open: |socket_| starts out as
// kSbSocketInvalid and all pending read/write state is empty.
//
// |bind_type| is stored in |bind_type_|. |net_log| is used to create this
// socket's own NetLogWithSource (source type UDP_SOCKET), and |source| is
// attached as the parameters of the SOCKET_ALIVE begin event emitted here.
UDPSocketStarboard::UDPSocketStarboard(DatagramSocket::BindType bind_type,
                                       net::NetLog* net_log,
                                       const net::NetLogSource& source)
    : write_async_watcher_(std::make_unique<WriteAsyncWatcher>(this)),
      sender_(new UDPSocketStarboardSender()),
      socket_(kSbSocketInvalid),
      socket_options_(0),
      bind_type_(bind_type),
      read_watcher_(this),
      write_watcher_(this),
      read_buf_len_(0),
      recv_from_address_(NULL),
      write_buf_len_(0),
      net_log_(NetLogWithSource::Make(net_log, NetLogSourceType::UDP_SOCKET)),
      weak_factory_(this) {
  // Paired with the EndEvent(SOCKET_ALIVE) call in the destructor.
  net_log_.BeginEvent(NetLogEventType::SOCKET_ALIVE,
                      source.ToEventParametersCallback());
}

// Closes the socket (a no-op if it is already closed) and ends the
// SOCKET_ALIVE NetLog event that the constructor began.
UDPSocketStarboard::~UDPSocketStarboard() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  Close();
  net_log_.EndEvent(NetLogEventType::SOCKET_ALIVE);
}

// Creates the underlying Starboard UDP socket for |address_family|.
// Must only be called while no socket is open. Returns OK on success, or the
// mapped last system error if the socket could not be created.
int UDPSocketStarboard::Open(AddressFamily address_family) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(!SbSocketIsValid(socket_));

  // Every family other than IPv6 is opened as an IPv4 socket.
  if (address_family == ADDRESS_FAMILY_IPV6) {
    address_type_ = kSbSocketAddressTypeIpv6;
  } else {
    address_type_ = kSbSocketAddressTypeIpv4;
  }

  socket_ = SbSocketCreate(address_type_, kSbSocketProtocolUdp);
  if (SbSocketIsValid(socket_)) {
    return OK;
  }
  return MapLastSystemError();
}

// Tears down the socket: drops any pending read/write state (pending
// callbacks are reset, not run), stops both socket watches, marks the socket
// as not connected and destroys the Starboard socket. Calling this on a
// socket that is not open does nothing.
void UDPSocketStarboard::Close() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  // Nothing to tear down if the socket was never opened or is already closed.
  if (kSbSocketInvalid == socket_)
    return;

  // Forget any in-flight read...
  read_buf_ = nullptr;
  read_buf_len_ = 0;
  read_callback_.Reset();
  recv_from_address_ = nullptr;

  // ...and any in-flight write.
  write_buf_ = nullptr;
  write_buf_len_ = 0;
  write_callback_.Reset();
  send_to_address_.reset();

  const bool read_watch_stopped = read_socket_watcher_.StopWatchingSocket();
  DCHECK(read_watch_stopped);
  const bool write_watch_stopped = write_socket_watcher_.StopWatchingSocket();
  DCHECK(write_watch_stopped);

  is_connected_ = false;
  const bool destroyed = SbSocketDestroy(socket_);
  if (!destroyed) {
    DPLOG(ERROR) << "SbSocketDestroy";
  }

  // Mark the socket closed even if destruction reported a failure.
  socket_ = kSbSocketInvalid;
}

// Copies the stored remote endpoint into |*address|.
// Returns ERR_SOCKET_NOT_CONNECTED if the socket is not connected, else OK.
int UDPSocketStarboard::GetPeerAddress(IPEndPoint* address) const {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(address);

  if (is_connected()) {
    DCHECK(remote_address_);
    *address = *remote_address_;
    return OK;
  }

  return ERR_SOCKET_NOT_CONNECTED;
}

// Copies the socket's local endpoint into |*address|.
//
// The endpoint is queried from Starboard the first time it is needed and then
// cached in |local_address_| for later calls. Returns
// ERR_SOCKET_NOT_CONNECTED if the socket is not connected, the mapped socket
// error if the query fails, ERR_FAILED if the result cannot be converted to
// an IPEndPoint, and OK otherwise.
int UDPSocketStarboard::GetLocalAddress(IPEndPoint* address) const {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(address);
  if (!is_connected())
    return ERR_SOCKET_NOT_CONNECTED;

  if (!local_address_) {
    SbSocketAddress sb_local_address;
    if (!SbSocketGetLocalAddress(socket_, &sb_local_address))
      return MapLastSocketError(socket_);

    auto resolved_endpoint = std::make_unique<IPEndPoint>();
    if (!resolved_endpoint->FromSbSocketAddress(&sb_local_address))
      return ERR_FAILED;

    // Only cache once the conversion has succeeded.
    local_address_ = std::move(resolved_endpoint);
  }

  *address = *local_address_;
  return OK;
}

// Reads a datagram without reporting the sender's address; equivalent to
// RecvFrom() with a null address.
int UDPSocketStarboard::Read(IOBuffer* buf,
                             int buf_len,
                             CompletionOnceCallback callback) {
  IPEndPoint* const no_sender_address = nullptr;
  return RecvFrom(buf, buf_len, no_sender_address, std::move(callback));
}

// Receives a datagram into |buf| (at most |buf_len| bytes) and, if |address|
// is non-null, stores the sender's endpoint there.
//
// If the receive finishes immediately its result (byte count or net error) is
// returned and |callback| is not used. If it would block, the socket is
// watched for readability, the request is remembered and ERR_IO_PENDING is
// returned; |callback| is run later with the final result. If the watch
// cannot be established a net error is returned.
int UDPSocketStarboard::RecvFrom(IOBuffer* buf,
                                 int buf_len,
                                 IPEndPoint* address,
                                 CompletionOnceCallback callback) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK_NE(kSbSocketInvalid, socket_);
  DCHECK(read_callback_.is_null());
  DCHECK(!recv_from_address_);
  DCHECK(!callback.is_null());  // Synchronous operation not supported
  DCHECK_GT(buf_len, 0);

  const int nread = InternalRecvFrom(buf, buf_len, address);
  if (nread != ERR_IO_PENDING)
    return nread;

  const bool watching = base::MessageLoopForIO::current()->Watch(
      socket_, true, base::MessageLoopCurrentForIO::WATCH_READ,
      &read_socket_watcher_, &read_watcher_);
  if (watching) {
    // Remember the request so DidCompleteRead() can finish it.
    read_buf_ = buf;
    read_buf_len_ = buf_len;
    recv_from_address_ = address;
    read_callback_ = std::move(callback);
    return ERR_IO_PENDING;
  }

  PLOG(ERROR) << "WatchSocket failed on read";
  Error watch_error = MapLastSocketError(socket_);
  if (watch_error == ERR_IO_PENDING) {
    // Watch(...) might call SbSocketWaiterAdd() which does not guarantee
    // setting system error on failure, but we need to treat this as an
    // error since watching the socket failed.
    watch_error = ERR_FAILED;
  }
  LogRead(watch_error, nullptr, nullptr);
  return watch_error;
}

// Sends |buf| to the endpoint stored by Connect(). The traffic annotation is
// not used by this implementation.
int UDPSocketStarboard::Write(IOBuffer* buf,
                              int buf_len,
                              CompletionOnceCallback callback,
                              const NetworkTrafficAnnotationTag&) {
  DCHECK(remote_address_);
  const IPEndPoint* const peer = remote_address_.get();
  return SendToOrWrite(buf, buf_len, peer, std::move(callback));
}

// Sends |buf| to the explicitly supplied |address|.
int UDPSocketStarboard::SendTo(IOBuffer* buf,
                               int buf_len,
                               const IPEndPoint& address,
                               CompletionOnceCallback callback) {
  const IPEndPoint* const destination = &address;
  return SendToOrWrite(buf, buf_len, destination, std::move(callback));
}

// Shared implementation of Write() and SendTo(): sends |buf_len| bytes of
// |buf| to |address|.
//
// If the send finishes immediately its result (byte count or net error) is
// returned and |callback| is not used. If it would block, the socket is
// watched for writability, the request is remembered and ERR_IO_PENDING is
// returned; |callback| is run later with the final result. If the watch
// cannot be established a net error (never ERR_IO_PENDING) is returned and
// |callback| is never run.
int UDPSocketStarboard::SendToOrWrite(IOBuffer* buf,
                                      int buf_len,
                                      const IPEndPoint* address,
                                      CompletionOnceCallback callback) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(SbSocketIsValid(socket_));
  DCHECK(write_callback_.is_null());
  DCHECK(!callback.is_null());  // Synchronous operation not supported
  DCHECK_GT(buf_len, 0);

  int result = InternalSendTo(buf, buf_len, address);
  if (result != ERR_IO_PENDING)
    return result;

  if (!base::MessageLoopForIO::current()->Watch(
          socket_, true, base::MessageLoopCurrentForIO::WATCH_WRITE,
          &write_socket_watcher_, &write_watcher_)) {
    DVLOG(1) << "Watch failed on write, error "
             << SbSocketGetLastError(socket_);
    Error watch_error = MapLastSocketError(socket_);
    if (watch_error == ERR_IO_PENDING) {
      // Watch(...) might call SbSocketWaiterAdd() which does not guarantee
      // setting system error on failure, so the socket's last error may still
      // be the "pending" left by the send above. Returning ERR_IO_PENDING
      // here would make the caller wait for a callback that was never
      // stored, so report a hard failure instead (mirrors RecvFrom()).
      watch_error = ERR_FAILED;
    }
    LogWrite(watch_error, nullptr, nullptr);
    return watch_error;
  }

  // Remember the request so DidCompleteWrite() can retry it.
  write_buf_ = buf;
  write_buf_len_ = buf_len;
  DCHECK(!send_to_address_.get());
  if (address) {
    // Copy the endpoint: the caller's object may not outlive this call.
    send_to_address_.reset(new IPEndPoint(*address));
  }
  write_callback_ = std::move(callback);
  return ERR_IO_PENDING;
}

// Connects the socket to |address|, wrapping InternalConnect() in a
// UDP_CONNECT NetLog begin/end pair. |is_connected_| is set to whether the
// connect succeeded. Returns the result of InternalConnect().
int UDPSocketStarboard::Connect(const IPEndPoint& address) {
  DCHECK(SbSocketIsValid(socket_));

  net_log_.BeginEvent(
      NetLogEventType::UDP_CONNECT,
      CreateNetLogUDPConnectCallback(
          &address, NetworkChangeNotifier::kInvalidNetworkHandle));

  const int connect_result = InternalConnect(address);
  is_connected_ = (OK == connect_result);

  net_log_.EndEventWithNetErrorCode(NetLogEventType::UDP_CONNECT,
                                    connect_result);
  return connect_result;
}

int UDPSocketStarboard::InternalConnect(const IPEndPoint& address) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(SbSocketIsValid(socket_));
  DCHECK(!is_connected());
  DCHECK(!remote_address_.get());

  int rv = 0;
  // Cobalt does random bind despite bind_type_ because we do not connect
  // UDP sockets but Chromium does. And if a socket does recvfrom() without
  // any sendto() before, it needs to be bound to have a local port.
  rv = RandomBind(address.GetFamily() == ADDRESS_FAMILY_IPV4 ?
                      IPAddress::IPv4AllZeros() : IPAddress::IPv6AllZeros());

  if (rv != OK)
    return rv;

  remote_address_.reset(new IPEndPoint(address));

  return OK;
}

// Binds the socket to |address|. On success the cached local address is
// discarded and the socket is marked connected. Returns OK or the bind error.
int UDPSocketStarboard::Bind(const IPEndPoint& address) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(SbSocketIsValid(socket_));
  DCHECK(!is_connected());

  const int bind_result = DoBind(address);
  if (bind_result == OK) {
    // Force GetLocalAddress() to re-query the now-bound address.
    local_address_.reset();
    is_connected_ = true;
  }
  return bind_result;
}

// Binding to a specific network is not implemented here; |network| is
// ignored and ERR_NOT_IMPLEMENTED is always returned.
int UDPSocketStarboard::BindToNetwork(
    NetworkChangeNotifier::NetworkHandle network) {
  NOTIMPLEMENTED();
  return ERR_NOT_IMPLEMENTED;
}

// Requests a receive buffer of |size| bytes on the underlying socket.
// Returns OK, or the mapped socket error on failure (which also trips a
// DCHECK in builds where DCHECKs are enabled).
int UDPSocketStarboard::SetReceiveBufferSize(int32_t size) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(SbSocketIsValid(socket_));

  const int result = SbSocketSetReceiveBufferSize(socket_, size)
                         ? OK
                         : MapLastSocketError(socket_);
  DCHECK_EQ(result, OK) << "Could not " << __FUNCTION__ << ": "
                        << SbSocketGetLastError(socket_);
  return result;
}

// Requests a send buffer of |size| bytes on the underlying socket.
// Returns OK, or the mapped socket error on failure (which also trips a
// DCHECK in builds where DCHECKs are enabled).
int UDPSocketStarboard::SetSendBufferSize(int32_t size) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(SbSocketIsValid(socket_));

  const int result = SbSocketSetSendBufferSize(socket_, size)
                         ? OK
                         : MapLastSocketError(socket_);
  DCHECK_EQ(result, OK) << "Could not " << __FUNCTION__ << ": "
                        << SbSocketGetLastError(socket_);
  return result;
}

// Enables address reuse on the socket. Must be called before the socket is
// connected. Returns OK on success and ERR_FAILED otherwise.
int UDPSocketStarboard::AllowAddressReuse() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(!is_connected());
  DCHECK(SbSocketIsValid(socket_));

  if (!SbSocketSetReuseAddress(socket_, true)) {
    return ERR_FAILED;
  }
  return OK;
}

// Turns the broadcast option on or off according to |broadcast|. Must be
// called before the socket is connected. Returns OK on success and ERR_FAILED
// otherwise.
int UDPSocketStarboard::SetBroadcast(bool broadcast) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(!is_connected());
  DCHECK(SbSocketIsValid(socket_));

  if (!SbSocketSetBroadcast(socket_, broadcast)) {
    return ERR_FAILED;
  }
  return OK;
}

// Readability notification: resumes the pending read, if there is one.
void UDPSocketStarboard::ReadWatcher::OnSocketReadyToRead(SbSocket /*socket*/) {
  // Without a stored callback there is no read waiting to be completed.
  if (socket_->read_callback_.is_null())
    return;
  socket_->DidCompleteRead();
}

// Writability notification: resumes the pending write, if there is one.
void UDPSocketStarboard::WriteWatcher::OnSocketReadyToWrite(
    SbSocket /*socket*/) {
  // Without a stored callback there is no write waiting to be completed.
  if (socket_->write_callback_.is_null())
    return;
  socket_->DidCompleteWrite();
}

// Writability notification for the WriteAsync() path: stops the watch and
// then tries to flush the queued datagrams.
void UDPSocketStarboard::WriteAsyncWatcher::OnSocketReadyToWrite(
    SbSocket /*socket*/) {
  DVLOG(1) << __func__ << " queue " << socket_->pending_writes_.size()
           << " out of " << socket_->write_async_outstanding_ << " total";
  // Stop watching first: FlushPending() does nothing while the watcher is
  // still marked as watching.
  socket_->StopWatchingSocket();
  socket_->FlushPending();
}

void UDPSocketStarboard::DoReadCallback(int rv) {
  DCHECK_NE(rv, ERR_IO_PENDING);
  DCHECK(!read_callback_.is_null());

  // since Run may result in Read being called, clear read_callback_ up front.
  CompletionOnceCallback c = std::move(read_callback_);
  read_callback_.Reset();
  std::move(c).Run(rv);
}

void UDPSocketStarboard::DoWriteCallback(int rv) {
  DCHECK_NE(rv, ERR_IO_PENDING);
  DCHECK(!write_callback_.is_null());

  // Run may result in Write being called.
  base::ResetAndReturn(&write_callback_).Run(rv);
}

// Retries the remembered receive. If it still would block nothing changes;
// otherwise the pending read state is cleared, the read watch is stopped and
// the read callback is run with the result.
void UDPSocketStarboard::DidCompleteRead() {
  const int result =
      InternalRecvFrom(read_buf_, read_buf_len_, recv_from_address_);
  if (result == ERR_IO_PENDING)
    return;  // Still nothing to read; keep waiting.

  read_buf_ = nullptr;
  read_buf_len_ = 0;
  recv_from_address_ = nullptr;

  const bool watch_stopped = read_socket_watcher_.StopWatchingSocket();
  DCHECK(watch_stopped);

  DoReadCallback(result);
}

// Records the outcome of a receive. A negative |result| is logged as a
// UDP_RECEIVE_ERROR. Otherwise |result| is a byte count: it is logged as
// UDP_BYTES_RECEIVED (only while the NetLog is capturing) and added to the
// NetworkActivityMonitor's received-bytes counter.
void UDPSocketStarboard::LogRead(int result,
                                 const char* bytes,
                                 const IPEndPoint* address) const {
  if (result >= 0) {
    if (net_log_.IsCapturing()) {
      net_log_.AddEvent(
          NetLogEventType::UDP_BYTES_RECEIVED,
          CreateNetLogUDPDataTranferCallback(result, bytes, address));
    }
    NetworkActivityMonitor::GetInstance()->IncrementBytesReceived(result);
    return;
  }

  net_log_.AddEventWithNetErrorCode(NetLogEventType::UDP_RECEIVE_ERROR,
                                    result);
}

// Retries the remembered send. If it still would block nothing changes;
// otherwise the pending write state is cleared, the write watch is stopped
// and the write callback is run with the result.
void UDPSocketStarboard::DidCompleteWrite() {
  const int result =
      InternalSendTo(write_buf_, write_buf_len_, send_to_address_.get());
  if (result == ERR_IO_PENDING)
    return;  // Still not writable; keep waiting.

  write_buf_ = nullptr;
  write_buf_len_ = 0;
  send_to_address_.reset();
  write_socket_watcher_.StopWatchingSocket();

  DoWriteCallback(result);
}

// Records the outcome of a send. A negative |result| is logged as a
// UDP_SEND_ERROR. Otherwise |result| is a byte count: it is logged as
// UDP_BYTES_SENT (only while the NetLog is capturing) and added to the
// NetworkActivityMonitor's sent-bytes counter.
void UDPSocketStarboard::LogWrite(int result,
                                  const char* bytes,
                                  const IPEndPoint* address) const {
  if (result >= 0) {
    if (net_log_.IsCapturing()) {
      net_log_.AddEvent(
          NetLogEventType::UDP_BYTES_SENT,
          CreateNetLogUDPDataTranferCallback(result, bytes, address));
    }
    NetworkActivityMonitor::GetInstance()->IncrementBytesSent(result);
    return;
  }

  net_log_.AddEventWithNetErrorCode(NetLogEventType::UDP_SEND_ERROR, result);
}

// Performs one receive attempt into |buf|.
//
// Returns the number of bytes received, ERR_ADDRESS_INVALID if data arrived
// but the sender address could not be converted into |*address|, or the
// mapped socket error (which may be ERR_IO_PENDING). Every outcome other than
// ERR_IO_PENDING is reported through LogRead().
int UDPSocketStarboard::InternalRecvFrom(IOBuffer* buf,
                                         int buf_len,
                                         IPEndPoint* address) {
  SbSocketAddress sb_address;
  const int bytes_transferred =
      SbSocketReceiveFrom(socket_, buf->data(), buf_len, &sb_address);

  int result = bytes_transferred;
  if (bytes_transferred < 0) {
    result = MapLastSocketError(socket_);
  } else if (address && !address->FromSbSocketAddress(&sb_address)) {
    // Passing in NULL address is allowed. This is only to align with other
    // platform's implementation.
    result = ERR_ADDRESS_INVALID;
  }

  if (result == ERR_IO_PENDING)
    return result;

  // Only attach the sender to the log entry when the receive succeeded and
  // the sender address converts cleanly.
  IPEndPoint log_address;
  const bool have_log_address =
      result >= 0 && log_address.FromSbSocketAddress(&sb_address);
  LogRead(result, buf->data(), have_log_address ? &log_address : nullptr);

  return result;
}

// Performs one send attempt of |buf_len| bytes from |buf| to |address|.
//
// Returns ERR_FAILED if |address| is null or cannot be converted to a
// Starboard address, the number of bytes sent on success, or the mapped
// socket error (which may be ERR_IO_PENDING). Every outcome other than
// ERR_IO_PENDING is reported through LogWrite().
int UDPSocketStarboard::InternalSendTo(IOBuffer* buf,
                                       int buf_len,
                                       const IPEndPoint* address) {
  SbSocketAddress sb_address;
  const bool have_destination =
      address && address->ToSbSocketAddress(&sb_address);
  if (!have_destination) {
    LogWrite(ERR_FAILED, nullptr, nullptr);
    return ERR_FAILED;
  }

  int result = SbSocketSendTo(socket_, buf->data(), buf_len, &sb_address);
  if (result < 0) {
    result = MapLastSocketError(socket_);
  }

  if (result != ERR_IO_PENDING) {
    LogWrite(result, buf->data(), address);
  }
  return result;
}

int UDPSocketStarboard::DoBind(const IPEndPoint& address) {
  SbSocketAddress sb_address;
  if (!address.ToSbSocketAddress(&sb_address)) {
    return ERR_UNEXPECTED;
  }

  SbSocketError rv = SbSocketBind(socket_, &sb_address);
  return rv != kSbSocketOk ? MapLastSystemError() : OK;
}

int UDPSocketStarboard::RandomBind(const IPAddress& address) {
  return DoBind(IPEndPoint(address, 0));
}

int UDPSocketStarboard::JoinGroup(const IPAddress& group_address) const {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  if (!is_connected())
    return ERR_SOCKET_NOT_CONNECTED;

  SbSocketAddress sb_address = {0};
  if (!IPEndPoint(group_address, 0).ToSbSocketAddress(&sb_address)) {
    return ERR_ADDRESS_INVALID;
  }

  if (!SbSocketJoinMulticastGroup(socket_, &sb_address)) {
    LOG(WARNING) << "SbSocketJoinMulticastGroup failed on UDP socket.";
    return MapLastSocketError(socket_);
  }
  return OK;
}

// Leaving a multicast group is not supported. Returns
// ERR_SOCKET_NOT_CONNECTED if the socket is not connected; otherwise trips a
// DCHECK and returns ERR_FAILED. |group_address| is not used.
int UDPSocketStarboard::LeaveGroup(const IPAddress& group_address) const {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  if (!is_connected())
    return ERR_SOCKET_NOT_CONNECTED;

  DCHECK(false) << "Not supported on Starboard.";
  return ERR_FAILED;
}

// Only interface index 0 (the default interface) is accepted. Returns
// ERR_SOCKET_IS_CONNECTED if the socket is already connected, OK for index 0
// and ERR_FAILED (after tripping a DCHECK) for any other index.
int UDPSocketStarboard::SetMulticastInterface(uint32_t interface_index) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  if (is_connected())
    return ERR_SOCKET_IS_CONNECTED;

  DCHECK_EQ(0, interface_index)
      << "Only the default multicast interface is supported on Starboard.";
  if (interface_index != 0) {
    return ERR_FAILED;
  }
  return OK;
}

// Setting the multicast TTL is not supported. Returns
// ERR_SOCKET_IS_CONNECTED if the socket is already connected; otherwise trips
// a DCHECK and returns ERR_FAILED. |time_to_live| is not used.
int UDPSocketStarboard::SetMulticastTimeToLive(int time_to_live) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  if (is_connected())
    return ERR_SOCKET_IS_CONNECTED;

  DCHECK(false) << "Not supported on Starboard.";
  return ERR_FAILED;
}

// Changing the multicast loopback mode is not supported. Returns
// ERR_SOCKET_IS_CONNECTED if the socket is already connected; otherwise trips
// a DCHECK and returns ERR_FAILED. |loopback| is not used.
int UDPSocketStarboard::SetMulticastLoopbackMode(bool loopback) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  if (is_connected())
    return ERR_SOCKET_IS_CONNECTED;

  DCHECK(false) << "Not supported on Starboard.";
  return ERR_FAILED;
}

// DSCP marking is not implemented: |dscp| is ignored. Note that OK is
// returned even though nothing is applied to the socket.
int UDPSocketStarboard::SetDiffServCodePoint(DiffServCodePoint dscp) {
  NOTIMPLEMENTED();
  return OK;
}

// Detaches |thread_checker_| from the thread it is currently bound to.
void UDPSocketStarboard::DetachFromThread() {
  DETACH_FROM_THREAD(thread_checker_);
}

// Socket tags are ignored by this implementation; the call only logs (once)
// that it is not implemented.
void UDPSocketStarboard::ApplySocketTag(const SocketTag&) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  // SocketTag is not applicable to Starboard, see socket_tag.h for more info.
  NOTIMPLEMENTED_LOG_ONCE();
}

// The sender needs no construction or destruction logic of its own.
UDPSocketStarboardSender::UDPSocketStarboardSender() = default;
UDPSocketStarboardSender::~UDPSocketStarboardSender() = default;

// An empty result: no error, nothing written, no buffers.
SendResult::SendResult() : rv(0), write_count(0) {}

SendResult::~SendResult() = default;

// Bundles the net result |_rv|, the number of datagrams written
// |_write_count| and the buffer list |_buffers| (taken by move).
SendResult::SendResult(int _rv, int _write_count, DatagramBuffers _buffers)
    : rv(_rv), write_count(_write_count), buffers(std::move(_buffers)) {}

SendResult::SendResult(SendResult&& other) = default;

// Sends |buffers| to |address| one by one, in order, stopping at the first
// failed send. The returned SendResult carries the mapped error of that
// failure (0 if every send succeeded), the number of datagrams sent before
// it, and the complete buffer list.
SendResult UDPSocketStarboardSender::InternalSendBuffers(
    const SbSocket& socket,
    DatagramBuffers buffers,
    SbSocketAddress address) const {
  int sent_count = 0;
  int error = 0;
  for (const auto& datagram : buffers) {
    if (Send(socket, datagram->data(), datagram->length(), address) < 0) {
      error = MapLastSocketError(socket);
      break;
    }
    ++sent_count;
  }
  return SendResult(error, sent_count, std::move(buffers));
}

// Entry point used by UDPSocketStarboard to send a batch of datagrams;
// forwards directly to InternalSendBuffers().
SendResult UDPSocketStarboardSender::SendBuffers(const SbSocket& socket,
                                                 DatagramBuffers buffers,
                                                 SbSocketAddress address) {
  return InternalSendBuffers(socket, std::move(buffers), address);
}

// Sends a single datagram of |len| bytes from |buf| to |address| and returns
// the result of SbSocketSendTo() unchanged (callers in this file treat a
// negative value as failure).
int UDPSocketStarboardSender::Send(const SbSocket& socket,
                                   const char* buf,
                                   size_t len,
                                   SbSocketAddress address) const {
  return SbSocketSendTo(socket, buf, len, &address);
}

// Queues one datagram for asynchronous sending. The |buf_len| bytes at
// |buffer| are enqueued onto |pending_writes_| via the datagram buffer pool,
// which must already exist (see SetMaxPacketSize()). The return value is
// that of InternalWriteAsync().
int UDPSocketStarboard::WriteAsync(
    const char* buffer,
    size_t buf_len,
    CompletionOnceCallback callback,
    const NetworkTrafficAnnotationTag& traffic_annotation) {
  DCHECK(datagram_buffer_pool_ != nullptr);
  IncreaseWriteAsyncOutstanding(1);
  datagram_buffer_pool_->Enqueue(buffer, buf_len, &pending_writes_);
  return InternalWriteAsync(std::move(callback), traffic_annotation);
}

// Queues a batch of already-filled datagram buffers for asynchronous sending.
// |buffers| is appended to the end of |pending_writes_|. The return value is
// that of InternalWriteAsync().
int UDPSocketStarboard::WriteAsync(
    DatagramBuffers buffers,
    CompletionOnceCallback callback,
    const NetworkTrafficAnnotationTag& traffic_annotation) {
  // Count the buffers before splice() moves them out of |buffers|.
  IncreaseWriteAsyncOutstanding(buffers.size());
  pending_writes_.splice(pending_writes_.end(), std::move(buffers));
  return InternalWriteAsync(std::move(callback), traffic_annotation);
}

// Common tail of both WriteAsync() overloads, called after the new datagrams
// have been added to |pending_writes_|.
//
// Returns, in order of precedence:
//  - a previously recorded async error (which is then cleared),
//  - an error produced by the flush triggered from this call,
//  - ERR_IO_PENDING when the number of outstanding buffers has reached the
//    blocking threshold; |callback| is stored and run later,
//  - otherwise the number of bytes written since the last report (the counter
//    is reset).
// |traffic_annotation| is not used here.
int UDPSocketStarboard::InternalWriteAsync(
    CompletionOnceCallback callback,
    const NetworkTrafficAnnotationTag& traffic_annotation) {
  // Only one blocked writer is supported at a time.
  CHECK(write_callback_.is_null());

  // Surface error immediately if one is pending.
  if (last_async_result_ < 0) {
    return ResetLastAsyncResult();
  }

  // Without batching every write is flushed right away; with batching the
  // flush waits until enough buffers have accumulated.
  size_t flush_threshold =
      write_batching_active_ ? kWriteAsyncPostBuffersThreshold : 1;
  if (pending_writes_.size() >= flush_threshold) {
    FlushPending();
    // Surface error immediately if one is pending.
    if (last_async_result_ < 0) {
      return ResetLastAsyncResult();
    }
  }

  // Start the timer whose callback (OnWriteAsyncTimerFired) flushes whatever
  // is still queued.
  if (!write_async_timer_running_) {
    write_async_timer_running_ = true;
    write_async_timer_.Start(FROM_HERE, kWriteAsyncMsThreshold, this,
                             &UDPSocketStarboard::OnWriteAsyncTimerFired);
  }

  // Too many buffers in flight: hold on to the callback so the writer is
  // only notified once DidSendBuffers() has drained enough of them.
  int blocking_threshold =
      write_batching_active_ ? kWriteAsyncMaxBuffersThreshold : 1;
  if (write_async_outstanding_ >= blocking_threshold) {
    write_callback_ = std::move(callback);
    return ERR_IO_PENDING;
  }

  DVLOG(2) << __func__ << " pending " << pending_writes_.size()
           << " outstanding " << write_async_outstanding_;
  return ResetWrittenBytes();
}

// Hands the still-queued datagram buffers back to the caller and subtracts
// them from the outstanding-write count.
DatagramBuffers UDPSocketStarboard::GetUnwrittenBuffers() {
  write_async_outstanding_ -= pending_writes_.size();
  // std::move is required here: |pending_writes_| is a member, not a local.
  return std::move(pending_writes_);
}

// Sends the queued datagrams, either directly on this thread or by posting
// to the task runner. Does nothing while the socket is being watched for
// writability (i.e. it is blocked) or when the queue is empty.
void UDPSocketStarboard::FlushPending() {
  // Nothing to do if socket is blocked, or if there is nothing queued.
  if (write_async_watcher_->watching() || pending_writes_.empty())
    return;

  if (write_async_timer_running_)
    write_async_timer_.Reset();

  const int num_pending_writes = static_cast<int>(pending_writes_.size());

  // A small batch is not worth a post...
  const bool is_small_batch =
      num_pending_writes <= kWriteAsyncMinBuffersThreshold;
  // ...but it may only be sent locally when no previous post is still
  // outstanding, to prevent out of order transmission.
  const bool no_post_outstanding =
      num_pending_writes == write_async_outstanding_;

  const bool send_locally =
      !write_multi_core_enabled_ || (is_small_batch && no_post_outstanding);
  if (send_locally) {
    LocalSendBuffers();
  } else {
    PostSendBuffers();
  }
}

// Returns the sequenced task runner used for posted sends, creating it on
// first use.
//
// TODO(ckrasic) Sad face.  The runner is created lazily because many tests
// exploded otherwise.  |threading_and_tasks.md| advises instantiating a
// |base::test::ScopedTaskEnvironment| in the test, but doing that for every
// test that might exercise QUIC is too daunting.  Also, in some tests
// following that advice just broke things in other ways.
base::SequencedTaskRunner* UDPSocketStarboard::GetTaskRunner() {
  if (!task_runner_) {
    task_runner_ = CreateSequencedTaskRunnerWithTraits(base::TaskTraits());
  }
  return task_runner_.get();
}

// Timer callback for the WriteAsync() path. Stops the timer once the queue
// is empty; otherwise flushes the queue unless an async error is waiting to
// be reported.
void UDPSocketStarboard::OnWriteAsyncTimerFired() {
  DVLOG(2) << __func__ << " pending writes " << pending_writes_.size();

  if (pending_writes_.empty()) {
    write_async_timer_.Stop();
    write_async_timer_running_ = false;
    return;
  }

  if (last_async_result_ >= 0) {
    FlushPending();
    return;
  }

  DVLOG(1) << __func__ << " socket not writeable";
}

void UDPSocketStarboard::LocalSendBuffers() {
  DVLOG(1) << __func__ << " queue " << pending_writes_.size() << " out of "
           << write_async_outstanding_ << " total";
  SbSocketAddress sb_address;
  int result = remote_address_.get()->ToSbSocketAddress(&sb_address);
  DCHECK(result);
  DidSendBuffers(
      sender_->SendBuffers(socket_, std::move(pending_writes_), sb_address));
}

// Sends all queued datagrams to the remote address on the sequenced task
// runner. The send result is delivered back to DidSendBuffers() through a
// weak pointer, so the reply is dropped if this socket has been destroyed in
// the meantime.
void UDPSocketStarboard::PostSendBuffers() {
  DVLOG(1) << __func__ << " queue " << pending_writes_.size() << " out of "
           << write_async_outstanding_ << " total";
  SbSocketAddress sb_address;
  // The conversion must not live inside DCHECK(): the DCHECK condition is not
  // evaluated in builds where DCHECKs are disabled, which would leave
  // |sb_address| uninitialized when it is bound into the task below.
  const bool converted = remote_address_->ToSbSocketAddress(&sb_address);
  DCHECK(converted);
  base::PostTaskAndReplyWithResult(
      GetTaskRunner(), FROM_HERE,
      base::BindOnce(&UDPSocketStarboardSender::SendBuffers, sender_, socket_,
                     std::move(pending_writes_), sb_address),
      base::BindOnce(&UDPSocketStarboard::DidSendBuffers,
                     weak_factory_.GetWeakPtr()));
}

// Processes the outcome of a batch send (local or posted).
//
// |send_result.buffers| holds the whole batch in order; the first
// |send_result.write_count| entries were sent and the rest were not. Sent
// buffers are logged, counted and returned to the buffer pool; unsent ones
// are put back at the front of |pending_writes_|. |send_result.rv| becomes
// |last_async_result_|, the write watch is started or stopped accordingly,
// and a stored write callback is run if there is an error to report or the
// number of outstanding buffers has dropped below the callback threshold.
void UDPSocketStarboard::DidSendBuffers(SendResult send_result) {
  DVLOG(3) << __func__;
  int write_count = send_result.write_count;
  DatagramBuffers& buffers = send_result.buffers;

  DCHECK(!buffers.empty());
  int num_buffers = buffers.size();

  // Dequeue buffers that have been written.
  if (write_count > 0) {
    write_async_outstanding_ -= write_count;

    DatagramBuffers::iterator it;
    // Generate logs for written buffers
    it = buffers.begin();
    for (int i = 0; i < write_count; i++, it++) {
      auto& buffer = *it;
      LogWrite(buffer->length(), buffer->data(), NULL);
      written_bytes_ += buffer->length();
    }
    // Return written buffers to pool
    DatagramBuffers written_buffers;
    // Position |it| one past the last written buffer.
    if (write_count == num_buffers) {
      it = buffers.end();
    } else {
      it = buffers.begin();
      for (int i = 0; i < write_count; i++) {
        it++;
      }
    }
    // Move the written prefix [begin, it) out of |buffers|.
    written_buffers.splice(written_buffers.end(), buffers, buffers.begin(), it);
    DCHECK(datagram_buffer_pool_ != nullptr);
    datagram_buffer_pool_->Dequeue(&written_buffers);
  }

  // Requeue left-over (unwritten) buffers.
  if (!buffers.empty()) {
    DVLOG(2) << __func__ << " requeue " << buffers.size() << " buffers";
    // Insert at the front so they are sent before anything queued since.
    pending_writes_.splice(pending_writes_.begin(), std::move(buffers));
  }

  last_async_result_ = send_result.rv;
  if (last_async_result_ == ERR_IO_PENDING) {
    // The socket is blocked: wait for it to become writable again.
    DVLOG(2) << __func__ << " WatchSocket start";
    if (!WatchSocket()) {
      last_async_result_ = MapLastSocketError(socket_);
      DVLOG(1) << "WatchSocket failed on write, error: " << last_async_result_;
      LogWrite(last_async_result_, NULL, NULL);
    } else {
      // Blocking is not an error once the watch is in place.
      last_async_result_ = 0;
    }
  } else if (last_async_result_ < 0 || pending_writes_.empty()) {
    DVLOG(2) << __func__ << " WatchSocket stop: result "
             << ErrorToShortString(last_async_result_) << " pending_writes "
             << pending_writes_.size();
    StopWatchingSocket();
  }
  DCHECK(last_async_result_ != ERR_IO_PENDING);

  // No writer is waiting on a callback, so there is nobody to notify.
  if (write_callback_.is_null())
    return;

  if (last_async_result_ < 0) {
    DVLOG(1) << last_async_result_;
    // Update the writer with the latest result.
    DoWriteCallback(ResetLastAsyncResult());
  } else if (write_async_outstanding_ < kWriteAsyncCallbackBuffersThreshold) {
    DVLOG(1) << write_async_outstanding_ << " < "
             << kWriteAsyncCallbackBuffersThreshold;
    DoWriteCallback(ResetWrittenBytes());
  }
}

// Always returns ERR_NOT_IMPLEMENTED; nothing is changed on the socket.
int UDPSocketStarboard::SetDoNotFragment() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(SbSocketIsValid(socket_));

  // Starboard does not support sending non-fragmented packets yet.
  // All QUIC streams call this function at initialization; setting sockets to
  // send non-fragmented packets may give a slight performance boost.
  return ERR_NOT_IMPLEMENTED;
}

// Not implemented; |confirm| is ignored.
void UDPSocketStarboard::SetMsgConfirm(bool confirm) {
  NOTIMPLEMENTED();
}

// Starts watching the socket for writability on behalf of the WriteAsync()
// path, unless it is already being watched. Returns true if the socket is
// being watched when the call returns.
bool UDPSocketStarboard::WatchSocket() {
  if (write_async_watcher_->watching())
    return true;

  if (!InternalWatchSocket())
    return false;

  write_async_watcher_->set_watching(true);
  return true;
}

// Stops the WriteAsync() writability watch if one is active.
void UDPSocketStarboard::StopWatchingSocket() {
  if (write_async_watcher_->watching()) {
    InternalStopWatchingSocket();
    write_async_watcher_->set_watching(false);
  }
}

// Registers |write_async_watcher_| with the current IO message loop to be
// notified when the socket becomes writable. Returns whether the
// registration succeeded. Note that this uses the same
// |write_socket_watcher_| controller as the SendToOrWrite() path.
bool UDPSocketStarboard::InternalWatchSocket() {
  return base::MessageLoopForIO::current()->Watch(
      socket_, true, base::MessageLoopCurrentForIO::WATCH_WRITE,
      &write_socket_watcher_, write_async_watcher_.get());
}

// Cancels the writability watch held by |write_socket_watcher_|.
void UDPSocketStarboard::InternalStopWatchingSocket() {
  const bool watch_stopped = write_socket_watcher_.StopWatchingSocket();
  DCHECK(watch_stopped);
}

// Creates the datagram buffer pool used by the WriteAsync() path, replacing
// any existing pool. |max_packet_size| is passed to the pool's constructor.
void UDPSocketStarboard::SetMaxPacketSize(size_t max_packet_size) {
  datagram_buffer_pool_ = std::make_unique<DatagramBufferPool>(max_packet_size);
}

// Returns the recorded async result and clears it back to 0.
int UDPSocketStarboard::ResetLastAsyncResult() {
  const int previous_result = last_async_result_;
  last_async_result_ = 0;
  return previous_result;
}

// Returns the number of bytes written since the last call and resets the
// counter to 0.
int UDPSocketStarboard::ResetWrittenBytes() {
  const int bytes_since_last_report = written_bytes_;
  written_bytes_ = 0;
  return bytes_since_last_report;
}

}  // namespace net
