// blob: 369cf684df6346515039023f410c7599e37f1f37 [file] [log] [blame]
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Adapted from udp_socket_libevent.cc
#include "net/socket/udp_socket_starboard.h"
#include "base/callback.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/rand_util.h"
#include "base/task/post_task.h"
#include "base/task_runner_util.h"
#include "base/trace_event/trace_event.h"
#include "net/base/io_buffer.h"
#include "net/base/ip_endpoint.h"
#include "net/base/net_errors.h"
#include "net/base/network_activity_monitor.h"
#include "net/log/net_log.h"
#include "net/log/net_log_event_type.h"
#include "net/log/net_log_source.h"
#include "net/log/net_log_source_type.h"
#include "net/socket/udp_net_log_parameters.h"
#include "starboard/common/socket.h"
#include "starboard/system.h"
namespace net {
// Builds a socket object that does not yet own a Starboard socket: the handle
// starts out as kSbSocketInvalid and the read/write bookkeeping is zeroed.
// A SOCKET_ALIVE begin event is emitted on this socket's net log, with
// parameters taken from |source|.
UDPSocketStarboard::UDPSocketStarboard(DatagramSocket::BindType bind_type,
                                       net::NetLog* net_log,
                                       const net::NetLogSource& source)
    : write_async_watcher_(std::make_unique<WriteAsyncWatcher>(this)),
      sender_(new UDPSocketStarboardSender()),
      socket_(kSbSocketInvalid),
      socket_options_(0),
      bind_type_(bind_type),
      read_watcher_(this),
      write_watcher_(this),
      read_buf_len_(0),
      recv_from_address_(nullptr),
      write_buf_len_(0),
      net_log_(NetLogWithSource::Make(net_log, NetLogSourceType::UDP_SOCKET)),
      weak_factory_(this) {
  net_log_.BeginEvent(NetLogEventType::SOCKET_ALIVE,
                      source.ToEventParametersCallback());
}
// Closes the socket (a no-op if it is already closed) and ends the
// SOCKET_ALIVE net-log event started by the constructor.
UDPSocketStarboard::~UDPSocketStarboard() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  Close();

  net_log_.EndEvent(NetLogEventType::SOCKET_ALIVE);
}
// Creates the underlying Starboard UDP socket for |address_family|.
// Returns OK on success, or the mapped last system error if the socket could
// not be created. Must only be called while no socket is open.
int UDPSocketStarboard::Open(AddressFamily address_family) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(!SbSocketIsValid(socket_));

  // Every family other than IPv6 is opened as an IPv4 socket.
  if (address_family == ADDRESS_FAMILY_IPV6) {
    address_type_ = kSbSocketAddressTypeIpv6;
  } else {
    address_type_ = kSbSocketAddressTypeIpv4;
  }

  socket_ = SbSocketCreate(address_type_, kSbSocketProtocolUdp);
  if (SbSocketIsValid(socket_)) {
    return OK;
  }
  return MapLastSystemError();
}
// Tears down the socket: forgets any pending read/write (their callbacks are
// dropped, not run), stops both socket watchers, marks the socket as not
// connected and destroys the Starboard handle. Does nothing if no socket is
// open.
void UDPSocketStarboard::Close() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  if (socket_ == kSbSocketInvalid) {
    return;
  }

  // Zero out any pending read callback state.
  read_buf_ = nullptr;
  read_buf_len_ = 0;
  read_callback_.Reset();
  recv_from_address_ = nullptr;

  // Zero out any pending write callback state.
  write_buf_ = nullptr;
  write_buf_len_ = 0;
  write_callback_.Reset();
  send_to_address_.reset();

  bool stopped = read_socket_watcher_.StopWatchingSocket();
  DCHECK(stopped);
  stopped = write_socket_watcher_.StopWatchingSocket();
  DCHECK(stopped);

  is_connected_ = false;

  // A failed destroy is only logged; the handle is invalidated regardless.
  if (!SbSocketDestroy(socket_)) {
    DPLOG(ERROR) << "SbSocketDestroy";
  }
  socket_ = kSbSocketInvalid;
}
// Copies the stored remote endpoint into |address|.
// Returns ERR_SOCKET_NOT_CONNECTED if the socket is not connected, else OK.
int UDPSocketStarboard::GetPeerAddress(IPEndPoint* address) const {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(address);

  if (!is_connected()) {
    return ERR_SOCKET_NOT_CONNECTED;
  }

  DCHECK(remote_address_);
  *address = *remote_address_;
  return OK;
}
// Copies the socket's local endpoint into |address|.
// The endpoint is fetched from Starboard the first time it is needed and then
// kept in |local_address_|. Returns ERR_SOCKET_NOT_CONNECTED if the socket is
// not connected, the mapped socket error if the lookup fails, ERR_FAILED if
// the Starboard address cannot be converted, and OK otherwise.
int UDPSocketStarboard::GetLocalAddress(IPEndPoint* address) const {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(address);

  if (!is_connected()) {
    return ERR_SOCKET_NOT_CONNECTED;
  }

  if (!local_address_.get()) {
    SbSocketAddress sb_local_address;
    if (!SbSocketGetLocalAddress(socket_, &sb_local_address)) {
      return MapLastSocketError(socket_);
    }

    std::unique_ptr<IPEndPoint> converted(new IPEndPoint());
    if (!converted->FromSbSocketAddress(&sb_local_address)) {
      return ERR_FAILED;
    }
    local_address_.reset(converted.release());
  }

  *address = *local_address_;
  return OK;
}
// Reads one datagram into |buf|; equivalent to RecvFrom() without asking for
// the sender's address.
int UDPSocketStarboard::Read(IOBuffer* buf,
                             int buf_len,
                             CompletionOnceCallback callback) {
  IPEndPoint* const no_sender_address = nullptr;
  return RecvFrom(buf, buf_len, no_sender_address, std::move(callback));
}
// Receives one datagram into |buf| (at most |buf_len| bytes) and, if |address|
// is non-null, stores the sender's endpoint there.
//
// First attempts the receive immediately. If that does not yield
// ERR_IO_PENDING the result is returned directly and |callback| is not used.
// Otherwise a read watch is registered on the socket, the request is stored,
// and ERR_IO_PENDING is returned; |callback| is run later with the result.
// If the watch cannot be registered an error is logged and returned.
int UDPSocketStarboard::RecvFrom(IOBuffer* buf,
                                 int buf_len,
                                 IPEndPoint* address,
                                 CompletionOnceCallback callback) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK_NE(kSbSocketInvalid, socket_);
  DCHECK(read_callback_.is_null());
  DCHECK(!recv_from_address_);
  DCHECK(!callback.is_null());  // Synchronous operation not supported
  DCHECK_GT(buf_len, 0);

  const int immediate_result = InternalRecvFrom(buf, buf_len, address);
  if (immediate_result != ERR_IO_PENDING) {
    return immediate_result;
  }

  const bool watching = base::MessageLoopForIO::current()->Watch(
      socket_, true, base::MessageLoopCurrentForIO::WATCH_READ,
      &read_socket_watcher_, &read_watcher_);
  if (!watching) {
    PLOG(ERROR) << "WatchSocket failed on read";
    Error watch_error = MapLastSocketError(socket_);
    // Watch(...) might call SbSocketWaiterAdd() which does not guarantee
    // setting system error on failure, but we need to treat this as an
    // error since watching the socket failed.
    if (watch_error == ERR_IO_PENDING) {
      watch_error = ERR_FAILED;
    }
    LogRead(watch_error, nullptr, nullptr);
    return watch_error;
  }

  // Remember the request so DidCompleteRead() can retry it when the socket
  // becomes readable.
  read_buf_ = buf;
  read_buf_len_ = buf_len;
  recv_from_address_ = address;
  read_callback_ = std::move(callback);
  return ERR_IO_PENDING;
}
// Sends |buf| to the remote endpoint recorded by Connect(). The traffic
// annotation is accepted but not used here.
int UDPSocketStarboard::Write(IOBuffer* buf,
                              int buf_len,
                              CompletionOnceCallback callback,
                              const NetworkTrafficAnnotationTag&) {
  DCHECK(remote_address_);
  const IPEndPoint* const destination = remote_address_.get();
  return SendToOrWrite(buf, buf_len, destination, std::move(callback));
}
// Sends |buf| to the explicitly supplied |address|.
int UDPSocketStarboard::SendTo(IOBuffer* buf,
                               int buf_len,
                               const IPEndPoint& address,
                               CompletionOnceCallback callback) {
  const IPEndPoint* const destination = &address;
  return SendToOrWrite(buf, buf_len, destination, std::move(callback));
}
// Shared implementation of Write() and SendTo(): sends |buf_len| bytes from
// |buf| to |address|.
//
// First attempts the send immediately. If that does not yield ERR_IO_PENDING
// the result is returned directly and |callback| is not used. Otherwise a
// write watch is registered on the socket, the request (including a copy of
// |address|) is stored, and ERR_IO_PENDING is returned; |callback| is run
// later with the result.
//
// If the watch cannot be registered, a real error is returned. It is never
// ERR_IO_PENDING, because in that case |callback| has not been stored and
// would never be run.
int UDPSocketStarboard::SendToOrWrite(IOBuffer* buf,
                                      int buf_len,
                                      const IPEndPoint* address,
                                      CompletionOnceCallback callback) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(SbSocketIsValid(socket_));
  DCHECK(write_callback_.is_null());
  DCHECK(!callback.is_null());  // Synchronous operation not supported
  DCHECK_GT(buf_len, 0);

  int result = InternalSendTo(buf, buf_len, address);
  if (result != ERR_IO_PENDING)
    return result;

  if (!base::MessageLoopForIO::current()->Watch(
          socket_, true, base::MessageLoopCurrentForIO::WATCH_WRITE,
          &write_socket_watcher_, &write_watcher_)) {
    DVLOG(1) << "Watch failed on write, error "
             << SbSocketGetLastError(socket_);
    Error watch_error = MapLastSocketError(socket_);
    if (watch_error == ERR_IO_PENDING) {
      // Watch(...) might call SbSocketWaiterAdd() which does not guarantee
      // setting system error on failure, so the last socket error can still
      // be "pending" from the send attempt above. Watching failed, so this
      // must be reported as an error, exactly as RecvFrom() does.
      watch_error = ERR_FAILED;
    }
    LogWrite(watch_error, nullptr, nullptr);
    return watch_error;
  }

  // Remember the request so DidCompleteWrite() can retry it when the socket
  // becomes writable.
  write_buf_ = buf;
  write_buf_len_ = buf_len;
  DCHECK(!send_to_address_.get());
  if (address) {
    send_to_address_.reset(new IPEndPoint(*address));
  }
  write_callback_ = std::move(callback);
  return ERR_IO_PENDING;
}
// Connects the socket to |address|, wrapping InternalConnect() in a
// UDP_CONNECT net-log event. |is_connected_| is set to whether the attempt
// returned OK. Returns the result of InternalConnect().
int UDPSocketStarboard::Connect(const IPEndPoint& address) {
  DCHECK(SbSocketIsValid(socket_));

  net_log_.BeginEvent(
      NetLogEventType::UDP_CONNECT,
      CreateNetLogUDPConnectCallback(
          &address, NetworkChangeNotifier::kInvalidNetworkHandle));

  const int connect_result = InternalConnect(address);
  is_connected_ = (connect_result == OK);

  net_log_.EndEventWithNetErrorCode(NetLogEventType::UDP_CONNECT,
                                    connect_result);
  return connect_result;
}
// Binds the socket to the all-zeros address of |address|'s family on port 0
// and, if that succeeds, records |address| as the remote endpoint.
// Returns OK or the bind error.
int UDPSocketStarboard::InternalConnect(const IPEndPoint& address) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(SbSocketIsValid(socket_));
  DCHECK(!is_connected());
  DCHECK(!remote_address_.get());

  // Cobalt does random bind despite bind_type_ because we do not connect
  // UDP sockets but Chromium does. And if a socket does recvfrom() without
  // any sendto() before, it needs to be bound to have a local port.
  const bool is_ipv4 = address.GetFamily() == ADDRESS_FAMILY_IPV4;
  const int bind_result = RandomBind(is_ipv4 ? IPAddress::IPv4AllZeros()
                                             : IPAddress::IPv6AllZeros());
  if (bind_result != OK) {
    return bind_result;
  }

  remote_address_.reset(new IPEndPoint(address));
  return OK;
}
// Binds the socket to |address|. On success the cached local address is
// discarded and the socket is marked as connected. Returns OK or the bind
// error.
int UDPSocketStarboard::Bind(const IPEndPoint& address) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(SbSocketIsValid(socket_));
  DCHECK(!is_connected());

  const int bind_result = DoBind(address);
  if (bind_result != OK) {
    return bind_result;
  }

  // Force GetLocalAddress() to query the socket again.
  local_address_.reset();
  is_connected_ = true;
  return OK;
}
// Binding to a specific network is not implemented; always returns
// ERR_NOT_IMPLEMENTED.
int UDPSocketStarboard::BindToNetwork(
    NetworkChangeNotifier::NetworkHandle network) {
  NOTIMPLEMENTED();
  return ERR_NOT_IMPLEMENTED;
}
// Requests a receive buffer of |size| bytes from Starboard.
// Returns OK, or the mapped socket error on failure (which also trips a
// DCHECK).
int UDPSocketStarboard::SetReceiveBufferSize(int32_t size) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(SbSocketIsValid(socket_));

  int result = OK;
  const bool applied = SbSocketSetReceiveBufferSize(socket_, size);
  if (!applied) {
    result = MapLastSocketError(socket_);
  }

  DCHECK_EQ(result, OK) << "Could not " << __FUNCTION__ << ": "
                        << SbSocketGetLastError(socket_);
  return result;
}
// Requests a send buffer of |size| bytes from Starboard.
// Returns OK, or the mapped socket error on failure (which also trips a
// DCHECK).
int UDPSocketStarboard::SetSendBufferSize(int32_t size) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(SbSocketIsValid(socket_));

  int result = OK;
  const bool applied = SbSocketSetSendBufferSize(socket_, size);
  if (!applied) {
    result = MapLastSocketError(socket_);
  }

  DCHECK_EQ(result, OK) << "Could not " << __FUNCTION__ << ": "
                        << SbSocketGetLastError(socket_);
  return result;
}
// Enables address reuse on the socket. Must be called before the socket is
// connected. Returns OK on success and ERR_FAILED otherwise.
int UDPSocketStarboard::AllowAddressReuse() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(!is_connected());
  DCHECK(SbSocketIsValid(socket_));

  if (SbSocketSetReuseAddress(socket_, true)) {
    return OK;
  }
  return ERR_FAILED;
}
// Turns the socket's broadcast option on or off. Must be called before the
// socket is connected. Returns OK on success and ERR_FAILED otherwise.
int UDPSocketStarboard::SetBroadcast(bool broadcast) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(!is_connected());
  DCHECK(SbSocketIsValid(socket_));

  if (SbSocketSetBroadcast(socket_, broadcast)) {
    return OK;
  }
  return ERR_FAILED;
}
// Read-readiness notification. Ignored unless a read callback is pending.
void UDPSocketStarboard::ReadWatcher::OnSocketReadyToRead(SbSocket /*socket*/) {
  if (socket_->read_callback_.is_null()) {
    return;
  }
  socket_->DidCompleteRead();
}
// Write-readiness notification. Ignored unless a write callback is pending.
void UDPSocketStarboard::WriteWatcher::OnSocketReadyToWrite(
    SbSocket /*socket*/) {
  if (socket_->write_callback_.is_null()) {
    return;
  }
  socket_->DidCompleteWrite();
}
// Write-readiness notification for the WriteAsync() path: stops watching the
// socket and then flushes the queued buffers.
void UDPSocketStarboard::WriteAsyncWatcher::OnSocketReadyToWrite(
    SbSocket /*socket*/) {
  DVLOG(1) << __func__ << " queue " << socket_->pending_writes_.size()
           << " out of " << socket_->write_async_outstanding_ << " total";

  // The watch must be cleared first: FlushPending() does nothing while the
  // watcher is still marked as watching.
  socket_->StopWatchingSocket();
  socket_->FlushPending();
}
void UDPSocketStarboard::DoReadCallback(int rv) {
DCHECK_NE(rv, ERR_IO_PENDING);
DCHECK(!read_callback_.is_null());
// since Run may result in Read being called, clear read_callback_ up front.
CompletionOnceCallback c = std::move(read_callback_);
read_callback_.Reset();
std::move(c).Run(rv);
}
void UDPSocketStarboard::DoWriteCallback(int rv) {
DCHECK_NE(rv, ERR_IO_PENDING);
DCHECK(!write_callback_.is_null());
// Run may result in Write being called.
base::ResetAndReturn(&write_callback_).Run(rv);
}
// Retries the stored read. While the receive still reports ERR_IO_PENDING the
// request stays registered; otherwise the stored state is cleared, the read
// watch is stopped and the read callback is run with the result.
void UDPSocketStarboard::DidCompleteRead() {
  const int result =
      InternalRecvFrom(read_buf_, read_buf_len_, recv_from_address_);
  if (result == ERR_IO_PENDING) {
    return;
  }

  read_buf_ = nullptr;
  read_buf_len_ = 0;
  recv_from_address_ = nullptr;

  const bool stopped = read_socket_watcher_.StopWatchingSocket();
  DCHECK(stopped);

  DoReadCallback(result);
}
// Records the outcome of a receive.
// A negative |result| is logged as UDP_RECEIVE_ERROR and nothing else happens.
// Otherwise |result| is treated as a byte count: a UDP_BYTES_RECEIVED event is
// added when the net log is capturing, and the count is added to the
// NetworkActivityMonitor's received-bytes total.
void UDPSocketStarboard::LogRead(int result,
                                 const char* bytes,
                                 const IPEndPoint* address) const {
  const bool is_error = result < 0;
  if (is_error) {
    net_log_.AddEventWithNetErrorCode(NetLogEventType::UDP_RECEIVE_ERROR,
                                      result);
    return;
  }

  if (net_log_.IsCapturing()) {
    net_log_.AddEvent(
        NetLogEventType::UDP_BYTES_RECEIVED,
        CreateNetLogUDPDataTranferCallback(result, bytes, address));
  }

  NetworkActivityMonitor::GetInstance()->IncrementBytesReceived(result);
}
// Retries the stored write. While the send still reports ERR_IO_PENDING the
// request stays registered; otherwise the stored state is cleared, the write
// watch is stopped and the write callback is run with the result.
void UDPSocketStarboard::DidCompleteWrite() {
  const int result =
      InternalSendTo(write_buf_, write_buf_len_, send_to_address_.get());
  if (result == ERR_IO_PENDING) {
    return;
  }

  write_buf_ = nullptr;
  write_buf_len_ = 0;
  send_to_address_.reset();
  write_socket_watcher_.StopWatchingSocket();

  DoWriteCallback(result);
}
// Records the outcome of a send.
// A negative |result| is logged as UDP_SEND_ERROR and nothing else happens.
// Otherwise |result| is treated as a byte count: a UDP_BYTES_SENT event is
// added when the net log is capturing, and the count is added to the
// NetworkActivityMonitor's sent-bytes total.
void UDPSocketStarboard::LogWrite(int result,
                                  const char* bytes,
                                  const IPEndPoint* address) const {
  const bool is_error = result < 0;
  if (is_error) {
    net_log_.AddEventWithNetErrorCode(NetLogEventType::UDP_SEND_ERROR, result);
    return;
  }

  if (net_log_.IsCapturing()) {
    net_log_.AddEvent(
        NetLogEventType::UDP_BYTES_SENT,
        CreateNetLogUDPDataTranferCallback(result, bytes, address));
  }

  NetworkActivityMonitor::GetInstance()->IncrementBytesSent(result);
}
// Performs one receive on the socket.
// Returns the number of bytes received, ERR_ADDRESS_INVALID if |address| was
// requested but the sender's address could not be converted, or the mapped
// socket error when the receive itself fails. Every outcome except
// ERR_IO_PENDING is reported through LogRead().
int UDPSocketStarboard::InternalRecvFrom(IOBuffer* buf,
                                         int buf_len,
                                         IPEndPoint* address) {
  SbSocketAddress sender;
  const int bytes_transferred =
      SbSocketReceiveFrom(socket_, buf->data(), buf_len, &sender);

  int result;
  if (bytes_transferred < 0) {
    result = MapLastSocketError(socket_);
  } else if (address && !address->FromSbSocketAddress(&sender)) {
    // Passing in NULL address is allowed. This is only to align with other
    // platform's implementation.
    result = ERR_ADDRESS_INVALID;
  } else {
    result = bytes_transferred;
  }

  if (result == ERR_IO_PENDING) {
    return result;
  }

  // The sender is only attached to the log entry for successful receives
  // whose address converts cleanly.
  IPEndPoint log_address;
  const bool can_log_sender =
      result >= 0 && log_address.FromSbSocketAddress(&sender);
  if (can_log_sender) {
    LogRead(result, buf->data(), &log_address);
  } else {
    LogRead(result, buf->data(), nullptr);
  }
  return result;
}
// Performs one send of |buf_len| bytes from |buf| to |address|.
// Returns ERR_FAILED if |address| is null or cannot be converted to a
// Starboard address, the mapped socket error if the send fails, and otherwise
// the value returned by SbSocketSendTo(). Every outcome except ERR_IO_PENDING
// is reported through LogWrite().
int UDPSocketStarboard::InternalSendTo(IOBuffer* buf,
                                       int buf_len,
                                       const IPEndPoint* address) {
  SbSocketAddress destination;
  const bool have_destination =
      address && address->ToSbSocketAddress(&destination);
  if (!have_destination) {
    const int failure = ERR_FAILED;
    LogWrite(failure, nullptr, nullptr);
    return failure;
  }

  int result = SbSocketSendTo(socket_, buf->data(), buf_len, &destination);
  if (result < 0) {
    result = MapLastSocketError(socket_);
  }

  if (result != ERR_IO_PENDING) {
    LogWrite(result, buf->data(), address);
  }
  return result;
}
int UDPSocketStarboard::DoBind(const IPEndPoint& address) {
SbSocketAddress sb_address;
if (!address.ToSbSocketAddress(&sb_address)) {
return ERR_UNEXPECTED;
}
SbSocketError rv = SbSocketBind(socket_, &sb_address);
return rv != kSbSocketOk ? MapLastSystemError() : OK;
}
int UDPSocketStarboard::RandomBind(const IPAddress& address) {
return DoBind(IPEndPoint(address, 0));
}
int UDPSocketStarboard::JoinGroup(const IPAddress& group_address) const {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (!is_connected())
return ERR_SOCKET_NOT_CONNECTED;
SbSocketAddress sb_address = {0};
if (!IPEndPoint(group_address, 0).ToSbSocketAddress(&sb_address)) {
return ERR_ADDRESS_INVALID;
}
if (!SbSocketJoinMulticastGroup(socket_, &sb_address)) {
LOG(WARNING) << "SbSocketJoinMulticastGroup failed on UDP socket.";
return MapLastSocketError(socket_);
}
return OK;
}
// Leaving a multicast group is not supported on Starboard.
// Returns ERR_SOCKET_NOT_CONNECTED if the socket is not connected; otherwise
// trips a DCHECK and returns ERR_FAILED.
int UDPSocketStarboard::LeaveGroup(const IPAddress& group_address) const {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  if (!is_connected()) {
    return ERR_SOCKET_NOT_CONNECTED;
  }

  DCHECK(false) << "Not supported on Starboard.";
  return ERR_FAILED;
}
// Only interface index 0 (the default multicast interface) is accepted.
// Returns ERR_SOCKET_IS_CONNECTED if the socket is already connected, OK for
// index 0, and ERR_FAILED (after tripping a DCHECK) for any other index.
int UDPSocketStarboard::SetMulticastInterface(uint32_t interface_index) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  if (is_connected()) {
    return ERR_SOCKET_IS_CONNECTED;
  }

  DCHECK_EQ(0, interface_index)
      << "Only the default multicast interface is supported on Starboard.";
  if (interface_index != 0) {
    return ERR_FAILED;
  }
  return OK;
}
// Setting the multicast TTL is not supported on Starboard.
// Returns ERR_SOCKET_IS_CONNECTED if the socket is already connected;
// otherwise trips a DCHECK and returns ERR_FAILED.
int UDPSocketStarboard::SetMulticastTimeToLive(int time_to_live) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  if (is_connected()) {
    return ERR_SOCKET_IS_CONNECTED;
  }

  DCHECK(false) << "Not supported on Starboard.";
  return ERR_FAILED;
}
// Setting the multicast loopback mode is not supported on Starboard.
// Returns ERR_SOCKET_IS_CONNECTED if the socket is already connected;
// otherwise trips a DCHECK and returns ERR_FAILED.
int UDPSocketStarboard::SetMulticastLoopbackMode(bool loopback) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  if (is_connected()) {
    return ERR_SOCKET_IS_CONNECTED;
  }

  DCHECK(false) << "Not supported on Starboard.";
  return ERR_FAILED;
}
// Not implemented: |dscp| is ignored, yet OK is still reported to the caller.
int UDPSocketStarboard::SetDiffServCodePoint(DiffServCodePoint dscp) {
  NOTIMPLEMENTED();

  return OK;
}
// Detaches the thread checker from its current thread.
void UDPSocketStarboard::DetachFromThread() {
  DETACH_FROM_THREAD(thread_checker_);
}
// Socket tags are ignored on this platform; the call only logs (once) that it
// is not implemented.
void UDPSocketStarboard::ApplySocketTag(const SocketTag&) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  // SocketTag is not applicable to Starboard, see socket_tag.h for more info.
  NOTIMPLEMENTED_LOG_ONCE();
}
// The sender has no work to do on construction or destruction.
UDPSocketStarboardSender::UDPSocketStarboardSender() {
}

UDPSocketStarboardSender::~UDPSocketStarboardSender() {
}
// An empty result: return value 0, no writes, no buffers.
SendResult::SendResult() : rv(0), write_count(0) {
}

SendResult::~SendResult() {
}

// Captures the outcome of a batch send. Takes ownership of the buffers that
// were handed to the sender.
SendResult::SendResult(int result_code,
                       int buffers_written,
                       DatagramBuffers sent_buffers)
    : rv(result_code),
      write_count(buffers_written),
      buffers(std::move(sent_buffers)) {
}

SendResult::SendResult(SendResult&& other) = default;
// Sends |buffers| to |address| one at a time, in order, stopping at the first
// failure. The returned SendResult carries the mapped socket error of that
// failure (or 0 if every send succeeded), the number of buffers sent before
// it, and all of |buffers|.
SendResult UDPSocketStarboardSender::InternalSendBuffers(
    const SbSocket& socket,
    DatagramBuffers buffers,
    SbSocketAddress address) const {
  int status = 0;
  int sent_count = 0;

  for (auto& datagram : buffers) {
    const int send_rv =
        Send(socket, datagram->data(), datagram->length(), address);
    if (send_rv < 0) {
      status = MapLastSocketError(socket);
      break;
    }
    ++sent_count;
  }

  return SendResult(status, sent_count, std::move(buffers));
}
// Non-const entry point that forwards to InternalSendBuffers().
SendResult UDPSocketStarboardSender::SendBuffers(const SbSocket& socket,
                                                 DatagramBuffers buffers,
                                                 SbSocketAddress address) {
  return InternalSendBuffers(socket, std::move(buffers), address);
}
int UDPSocketStarboardSender::Send(const SbSocket& socket,
const char* buf,
size_t len,
SbSocketAddress address) const {
return SbSocketSendTo(socket, buf, len, &address);
}
// Queues |buf_len| bytes from |buffer| as one pending datagram, using the
// datagram buffer pool, and then runs the common async-write logic.
// Requires that SetMaxPacketSize() has created the pool.
int UDPSocketStarboard::WriteAsync(
    const char* buffer,
    size_t buf_len,
    CompletionOnceCallback callback,
    const NetworkTrafficAnnotationTag& traffic_annotation) {
  DCHECK(datagram_buffer_pool_ != nullptr);

  // One more datagram is now outstanding.
  IncreaseWriteAsyncOutstanding(1);
  datagram_buffer_pool_->Enqueue(buffer, buf_len, &pending_writes_);

  return InternalWriteAsync(std::move(callback), traffic_annotation);
}
// Appends already-filled |buffers| to the pending queue and then runs the
// common async-write logic.
int UDPSocketStarboard::WriteAsync(
    DatagramBuffers buffers,
    CompletionOnceCallback callback,
    const NetworkTrafficAnnotationTag& traffic_annotation) {
  // Count the new datagrams before they are spliced out of |buffers|.
  IncreaseWriteAsyncOutstanding(buffers.size());
  pending_writes_.splice(pending_writes_.end(), std::move(buffers));

  return InternalWriteAsync(std::move(callback), traffic_annotation);
}
// Common tail of both WriteAsync() overloads.
//
// Returns, in order of precedence:
//  - a stored error from an earlier async send (and clears it);
//  - an error produced by flushing now, if the queue reached the flush
//    threshold;
//  - ERR_IO_PENDING, after storing |callback|, when the number of outstanding
//    datagrams has reached the blocking threshold;
//  - otherwise the number of bytes written since the last report (and resets
//    that counter).
// Also starts the periodic flush timer if it is not already running.
int UDPSocketStarboard::InternalWriteAsync(
    CompletionOnceCallback callback,
    const NetworkTrafficAnnotationTag& traffic_annotation) {
  CHECK(write_callback_.is_null());

  // Surface error immediately if one is pending.
  if (last_async_result_ < 0) {
    return ResetLastAsyncResult();
  }

  // Without batching every queued datagram triggers a flush.
  size_t buffers_needed_to_flush =
      write_batching_active_ ? kWriteAsyncPostBuffersThreshold : 1;
  if (pending_writes_.size() >= buffers_needed_to_flush) {
    FlushPending();
    // Surface error immediately if one is pending.
    if (last_async_result_ < 0) {
      return ResetLastAsyncResult();
    }
  }

  if (!write_async_timer_running_) {
    write_async_timer_running_ = true;
    write_async_timer_.Start(FROM_HERE, kWriteAsyncMsThreshold, this,
                             &UDPSocketStarboard::OnWriteAsyncTimerFired);
  }

  // Without batching a single outstanding datagram blocks the writer.
  int outstanding_limit =
      write_batching_active_ ? kWriteAsyncMaxBuffersThreshold : 1;
  if (write_async_outstanding_ >= outstanding_limit) {
    write_callback_ = std::move(callback);
    return ERR_IO_PENDING;
  }

  DVLOG(2) << __func__ << " pending " << pending_writes_.size()
           << " outstanding " << write_async_outstanding_;
  return ResetWrittenBytes();
}
// Hands the still-queued buffers back to the caller and removes them from the
// outstanding count.
DatagramBuffers UDPSocketStarboard::GetUnwrittenBuffers() {
  // The count must be adjusted before |pending_writes_| is moved from.
  write_async_outstanding_ -= pending_writes_.size();

  return std::move(pending_writes_);
}
// Sends the queued datagrams, either on the calling thread or via a posted
// task. Does nothing while the socket is being watched for writability or
// when the queue is empty.
void UDPSocketStarboard::FlushPending() {
  // Nothing to do if socket is blocked, or if there is nothing queued.
  if (write_async_watcher_->watching() || pending_writes_.empty()) {
    return;
  }

  if (write_async_timer_running_) {
    write_async_timer_.Reset();
  }

  const int queued = static_cast<int>(pending_writes_.size());
  // Don't bother with post if not enough buffers...
  const bool too_few_to_post = queued <= kWriteAsyncMinBuffersThreshold;
  // ...but not if there is a previous post outstanding, to prevent out of
  // order transmission.
  const bool no_previous_post = queued == write_async_outstanding_;

  if (!write_multi_core_enabled_ || (too_few_to_post && no_previous_post)) {
    LocalSendBuffers();
  } else {
    PostSendBuffers();
  }
}
// TODO(ckrasic) The task runner is created lazily because many tests
// exploded otherwise. |threading_and_tasks.md| advises to instantiate a
// |base::test::ScopedTaskEnvironment| in the test, but implementing that for
// all tests that might exercise QUIC is too daunting. Also, in some tests it
// seemed like following the advice just broke in other ways.
//
// Returns the sequenced task runner used for posted sends, creating it on
// first use.
base::SequencedTaskRunner* UDPSocketStarboard::GetTaskRunner() {
  if (!task_runner_) {
    task_runner_ = CreateSequencedTaskRunnerWithTraits(base::TaskTraits());
  }
  return task_runner_.get();
}
// Periodic flush for the WriteAsync() path. Stops the timer once the queue is
// empty, skips the flush while an error is waiting to be reported, and
// otherwise flushes the queue.
void UDPSocketStarboard::OnWriteAsyncTimerFired() {
  DVLOG(2) << __func__ << " pending writes " << pending_writes_.size();

  const bool queue_is_empty = pending_writes_.empty();
  if (queue_is_empty) {
    write_async_timer_.Stop();
    write_async_timer_running_ = false;
    return;
  }

  const bool error_pending = last_async_result_ < 0;
  if (error_pending) {
    DVLOG(1) << __func__ << " socket not writeable";
    return;
  }

  FlushPending();
}
void UDPSocketStarboard::LocalSendBuffers() {
DVLOG(1) << __func__ << " queue " << pending_writes_.size() << " out of "
<< write_async_outstanding_ << " total";
SbSocketAddress sb_address;
int result = remote_address_.get()->ToSbSocketAddress(&sb_address);
DCHECK(result);
DidSendBuffers(
sender_->SendBuffers(socket_, std::move(pending_writes_), sb_address));
}
// Sends every queued datagram to the connected remote address from a task
// posted to GetTaskRunner(). The result is delivered back to
// DidSendBuffers() through a weak pointer, so a reply for a socket that has
// been destroyed in the meantime is dropped.
void UDPSocketStarboard::PostSendBuffers() {
  DVLOG(1) << __func__ << " queue " << pending_writes_.size() << " out of "
           << write_async_outstanding_ << " total";

  // The conversion must happen outside DCHECK(): the DCHECK condition is not
  // evaluated in builds with DCHECKs disabled, which would leave
  // |sb_address| uninitialized when it is handed to the sender below.
  SbSocketAddress sb_address;
  const bool converted = remote_address_.get()->ToSbSocketAddress(&sb_address);
  DCHECK(converted);

  base::PostTaskAndReplyWithResult(
      GetTaskRunner(), FROM_HERE,
      base::BindOnce(&UDPSocketStarboardSender::SendBuffers, sender_, socket_,
                     std::move(pending_writes_), sb_address),
      base::BindOnce(&UDPSocketStarboard::DidSendBuffers,
                     weak_factory_.GetWeakPtr()));
}
// Processes the outcome of a batch send (local or posted).
//
// - Buffers that were written are logged, counted towards |written_bytes_|,
//   removed from the outstanding count and returned to the buffer pool.
// - Buffers that were not written are put back at the front of the pending
//   queue so ordering is preserved.
// - If the send stopped with ERR_IO_PENDING the socket is watched for
//   writability; on any other error, or once the queue is empty, the watch is
//   stopped.
// - If a writer is waiting on |write_callback_| it is notified, either with
//   the error or, once few enough datagrams are outstanding, with the number
//   of bytes written.
void UDPSocketStarboard::DidSendBuffers(SendResult send_result) {
  DVLOG(3) << __func__;
  const int write_count = send_result.write_count;
  DatagramBuffers& buffers = send_result.buffers;
  DCHECK(!buffers.empty());
  DCHECK_LE(write_count, static_cast<int>(buffers.size()));

  // Dequeue buffers that have been written.
  if (write_count > 0) {
    write_async_outstanding_ -= write_count;

    // Generate logs for written buffers. When the loop finishes the iterator
    // rests on the first unwritten buffer (or end()), which is exactly the
    // split point needed below.
    DatagramBuffers::iterator first_unwritten = buffers.begin();
    for (int i = 0; i < write_count; ++i, ++first_unwritten) {
      auto& buffer = *first_unwritten;
      LogWrite(buffer->length(), buffer->data(), nullptr);
      written_bytes_ += buffer->length();
    }

    // Return written buffers to pool.
    DatagramBuffers written_buffers;
    written_buffers.splice(written_buffers.end(), buffers, buffers.begin(),
                           first_unwritten);
    DCHECK(datagram_buffer_pool_ != nullptr);
    datagram_buffer_pool_->Dequeue(&written_buffers);
  }

  // Requeue left-over (unwritten) buffers.
  if (!buffers.empty()) {
    DVLOG(2) << __func__ << " requeue " << buffers.size() << " buffers";
    pending_writes_.splice(pending_writes_.begin(), std::move(buffers));
  }

  last_async_result_ = send_result.rv;
  if (last_async_result_ == ERR_IO_PENDING) {
    DVLOG(2) << __func__ << " WatchSocket start";
    if (!WatchSocket()) {
      last_async_result_ = MapLastSocketError(socket_);
      if (last_async_result_ == ERR_IO_PENDING) {
        // Watching may fail without updating the socket's last error, which
        // is then still "pending" from the send. The failure must be
        // reported as a real error: ERR_IO_PENDING is never a valid stored
        // result (see the DCHECK below) and must not reach the writer.
        last_async_result_ = ERR_FAILED;
      }
      DVLOG(1) << "WatchSocket failed on write, error: " << last_async_result_;
      LogWrite(last_async_result_, nullptr, nullptr);
    } else {
      last_async_result_ = 0;
    }
  } else if (last_async_result_ < 0 || pending_writes_.empty()) {
    DVLOG(2) << __func__ << " WatchSocket stop: result "
             << ErrorToShortString(last_async_result_) << " pending_writes "
             << pending_writes_.size();
    StopWatchingSocket();
  }
  DCHECK(last_async_result_ != ERR_IO_PENDING);

  // Nobody is waiting; the result is picked up by the next WriteAsync().
  if (write_callback_.is_null())
    return;

  if (last_async_result_ < 0) {
    DVLOG(1) << last_async_result_;
    // Update the writer with the latest result.
    DoWriteCallback(ResetLastAsyncResult());
  } else if (write_async_outstanding_ < kWriteAsyncCallbackBuffersThreshold) {
    DVLOG(1) << write_async_outstanding_ << " < "
             << kWriteAsyncCallbackBuffersThreshold;
    DoWriteCallback(ResetWrittenBytes());
  }
}
// Always returns ERR_NOT_IMPLEMENTED.
int UDPSocketStarboard::SetDoNotFragment() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(SbSocketIsValid(socket_));

  // Starboard does not support sending non-fragmented packets yet.
  // All QUIC Streams call this function at initialization; setting sockets to
  // send non-fragmented packets may have a slight performance boost.
  return ERR_NOT_IMPLEMENTED;
}
// Not implemented; |confirm| is ignored.
void UDPSocketStarboard::SetMsgConfirm(bool confirm) {
  NOTIMPLEMENTED();
}
// Starts watching the socket for writability on behalf of the async write
// path, unless it is already being watched. Returns true if the socket is
// being watched when the call returns.
bool UDPSocketStarboard::WatchSocket() {
  if (write_async_watcher_->watching()) {
    return true;
  }

  if (!InternalWatchSocket()) {
    return false;
  }

  write_async_watcher_->set_watching(true);
  return true;
}
// Stops the async write path's writability watch, if one is active.
void UDPSocketStarboard::StopWatchingSocket() {
  if (write_async_watcher_->watching()) {
    InternalStopWatchingSocket();
    write_async_watcher_->set_watching(false);
  }
}
// Registers a write watch on the socket with the current IO message loop,
// delivering notifications to |write_async_watcher_|. Returns whether the
// registration succeeded.
bool UDPSocketStarboard::InternalWatchSocket() {
  const bool registered = base::MessageLoopForIO::current()->Watch(
      socket_, true, base::MessageLoopCurrentForIO::WATCH_WRITE,
      &write_socket_watcher_, write_async_watcher_.get());
  return registered;
}
// Cancels the write watch; failure to cancel trips a DCHECK.
void UDPSocketStarboard::InternalStopWatchingSocket() {
  const bool stopped = write_socket_watcher_.StopWatchingSocket();
  DCHECK(stopped);
}
// Creates a new datagram buffer pool for |max_packet_size|, replacing any
// existing pool.
void UDPSocketStarboard::SetMaxPacketSize(size_t max_packet_size) {
  datagram_buffer_pool_ =
      std::make_unique<DatagramBufferPool>(max_packet_size);
}
// Returns the stored async result and resets it to 0.
int UDPSocketStarboard::ResetLastAsyncResult() {
  const int previous_result = last_async_result_;
  last_async_result_ = 0;
  return previous_result;
}
// Returns the number of bytes written since the last call and resets the
// counter to 0.
int UDPSocketStarboard::ResetWrittenBytes() {
  const int previous_bytes = written_bytes_;
  written_bytes_ = 0;
  return previous_bytes;
}
} // namespace net