blob: 3abdcef2824df3b70c7504fa8d4e6a0c880812a7 [file] [log] [blame]
Andrew Top0d1858f2019-05-15 22:01:47 -07001// Copyright 2015 Google Inc. All Rights Reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Adapted from udp_socket_libevent.cc
16
17#include "net/socket/udp_socket_starboard.h"
18
19#include "base/callback.h"
20#include "base/logging.h"
21#include "base/message_loop/message_loop.h"
22#include "base/rand_util.h"
23#include "base/task/post_task.h"
24#include "base/task_runner_util.h"
25#include "base/trace_event/trace_event.h"
26#include "net/base/io_buffer.h"
27#include "net/base/ip_endpoint.h"
28#include "net/base/net_errors.h"
29#include "net/base/network_activity_monitor.h"
30#include "net/log/net_log.h"
31#include "net/log/net_log_event_type.h"
32#include "net/log/net_log_source.h"
33#include "net/log/net_log_source_type.h"
34#include "net/socket/udp_net_log_parameters.h"
Chad Duffinac9ac062019-07-23 10:06:45 -070035#include "starboard/common/socket.h"
Andrew Top0d1858f2019-05-15 22:01:47 -070036#include "starboard/system.h"
37
Andrew Top0d1858f2019-05-15 22:01:47 -070038namespace net {
39
40UDPSocketStarboard::UDPSocketStarboard(DatagramSocket::BindType bind_type,
41 net::NetLog* net_log,
42 const net::NetLogSource& source)
43 : write_async_watcher_(std::make_unique<WriteAsyncWatcher>(this)),
44 sender_(new UDPSocketStarboardSender()),
45 socket_(kSbSocketInvalid),
46 socket_options_(0),
47 bind_type_(bind_type),
Andrew Top0d1858f2019-05-15 22:01:47 -070048 read_buf_len_(0),
49 recv_from_address_(NULL),
50 write_buf_len_(0),
51 net_log_(NetLogWithSource::Make(net_log, NetLogSourceType::UDP_SOCKET)),
52 weak_factory_(this) {
53 net_log_.BeginEvent(NetLogEventType::SOCKET_ALIVE,
54 source.ToEventParametersCallback());
55}
56
57UDPSocketStarboard::~UDPSocketStarboard() {
58 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
59 Close();
60 net_log_.EndEvent(NetLogEventType::SOCKET_ALIVE);
61}
62
63int UDPSocketStarboard::Open(AddressFamily address_family) {
64 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
65 DCHECK(!SbSocketIsValid(socket_));
66
67 address_type_ =
68 (address_family == ADDRESS_FAMILY_IPV6 ? kSbSocketAddressTypeIpv6
69 : kSbSocketAddressTypeIpv4);
70 socket_ = SbSocketCreate(address_type_, kSbSocketProtocolUdp);
71 if (!SbSocketIsValid(socket_)) {
72 return MapLastSystemError();
73 }
74
75 return OK;
76}
77
78void UDPSocketStarboard::Close() {
79 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
80
81 if (socket_ == kSbSocketInvalid)
82 return;
83
84 // Zero out any pending read/write callback state.
85 read_buf_ = NULL;
86 read_buf_len_ = 0;
87 read_callback_.Reset();
88 recv_from_address_ = NULL;
89 write_buf_ = NULL;
90 write_buf_len_ = 0;
91 write_callback_.Reset();
92 send_to_address_.reset();
93
Kaido Kert03affbc2020-06-29 16:29:13 -070094 bool ok = socket_watcher_.StopWatchingSocket();
Andrew Top0d1858f2019-05-15 22:01:47 -070095 DCHECK(ok);
96
97 is_connected_ = false;
98 if (!SbSocketDestroy(socket_)) {
99 DPLOG(ERROR) << "SbSocketDestroy";
100 }
101
102 socket_ = kSbSocketInvalid;
103}
104
105int UDPSocketStarboard::GetPeerAddress(IPEndPoint* address) const {
106 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
107 DCHECK(address);
108 if (!is_connected())
109 return ERR_SOCKET_NOT_CONNECTED;
110
111 DCHECK(remote_address_);
112 *address = *remote_address_;
113 return OK;
114}
115
116int UDPSocketStarboard::GetLocalAddress(IPEndPoint* address) const {
117 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
118 DCHECK(address);
119 if (!is_connected())
120 return ERR_SOCKET_NOT_CONNECTED;
121
122 if (!local_address_.get()) {
123 SbSocketAddress address;
124 if (!SbSocketGetLocalAddress(socket_, &address))
125 return MapLastSocketError(socket_);
126 std::unique_ptr<IPEndPoint> endpoint(new IPEndPoint());
127 if (!endpoint->FromSbSocketAddress(&address))
128 return ERR_FAILED;
129 local_address_.reset(endpoint.release());
130 }
131
132 *address = *local_address_;
133 return OK;
134}
135
136int UDPSocketStarboard::Read(IOBuffer* buf,
137 int buf_len,
138 CompletionOnceCallback callback) {
139 return RecvFrom(buf, buf_len, NULL, std::move(callback));
140}
141
142int UDPSocketStarboard::RecvFrom(IOBuffer* buf,
143 int buf_len,
144 IPEndPoint* address,
145 CompletionOnceCallback callback) {
146 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
147 DCHECK_NE(kSbSocketInvalid, socket_);
148 DCHECK(read_callback_.is_null());
149 DCHECK(!recv_from_address_);
150 DCHECK(!callback.is_null()); // Synchronous operation not supported
151 DCHECK_GT(buf_len, 0);
152
153 int nread = InternalRecvFrom(buf, buf_len, address);
154 if (nread != ERR_IO_PENDING)
155 return nread;
156
157 if (!base::MessageLoopForIO::current()->Watch(
158 socket_, true, base::MessageLoopCurrentForIO::WATCH_READ,
Kaido Kert03affbc2020-06-29 16:29:13 -0700159 &socket_watcher_, this)) {
Andrew Top0d1858f2019-05-15 22:01:47 -0700160 PLOG(ERROR) << "WatchSocket failed on read";
161 Error result = MapLastSocketError(socket_);
Andrew Topf058e8f2019-08-19 15:14:57 -0700162 if (result == ERR_IO_PENDING) {
163 // Watch(...) might call SbSocketWaiterAdd() which does not guarantee
164 // setting system error on failure, but we need to treat this as an
165 // error since watching the socket failed.
166 result = ERR_FAILED;
167 }
Andrew Top0d1858f2019-05-15 22:01:47 -0700168 LogRead(result, NULL, NULL);
169 return result;
170 }
171
172 read_buf_ = buf;
173 read_buf_len_ = buf_len;
174 recv_from_address_ = address;
175 read_callback_ = std::move(callback);
176 return ERR_IO_PENDING;
177}
178
179int UDPSocketStarboard::Write(IOBuffer* buf,
180 int buf_len,
181 CompletionOnceCallback callback,
182 const NetworkTrafficAnnotationTag&) {
Chad Duffinac9ac062019-07-23 10:06:45 -0700183 DCHECK(remote_address_);
184 return SendToOrWrite(buf, buf_len, remote_address_.get(),
185 std::move(callback));
Andrew Top0d1858f2019-05-15 22:01:47 -0700186}
187
188int UDPSocketStarboard::SendTo(IOBuffer* buf,
189 int buf_len,
190 const IPEndPoint& address,
191 CompletionOnceCallback callback) {
192 return SendToOrWrite(buf, buf_len, &address, std::move(callback));
193}
194
195int UDPSocketStarboard::SendToOrWrite(IOBuffer* buf,
196 int buf_len,
197 const IPEndPoint* address,
198 CompletionOnceCallback callback) {
199 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
200 DCHECK(SbSocketIsValid(socket_));
201 DCHECK(write_callback_.is_null());
202 DCHECK(!callback.is_null()); // Synchronous operation not supported
203 DCHECK_GT(buf_len, 0);
204
205 int result = InternalSendTo(buf, buf_len, address);
206 if (result != ERR_IO_PENDING)
207 return result;
208
209 if (!base::MessageLoopForIO::current()->Watch(
210 socket_, true, base::MessageLoopCurrentForIO::WATCH_WRITE,
Kaido Kert03affbc2020-06-29 16:29:13 -0700211 &socket_watcher_, this)) {
Andrew Top0d1858f2019-05-15 22:01:47 -0700212 DVLOG(1) << "Watch failed on write, error "
213 << SbSocketGetLastError(socket_);
214 Error result = MapLastSocketError(socket_);
215 LogWrite(result, NULL, NULL);
216 return result;
217 }
218
219 write_buf_ = buf;
220 write_buf_len_ = buf_len;
221 DCHECK(!send_to_address_.get());
222 if (address) {
223 send_to_address_.reset(new IPEndPoint(*address));
224 }
225 write_callback_ = std::move(callback);
226 return ERR_IO_PENDING;
227}
228
229int UDPSocketStarboard::Connect(const IPEndPoint& address) {
230 DCHECK(SbSocketIsValid(socket_));
231 net_log_.BeginEvent(
232 NetLogEventType::UDP_CONNECT,
233 CreateNetLogUDPConnectCallback(
234 &address, NetworkChangeNotifier::kInvalidNetworkHandle));
235 int rv = InternalConnect(address);
236 is_connected_ = (rv == OK);
237 net_log_.EndEventWithNetErrorCode(NetLogEventType::UDP_CONNECT, rv);
238 return rv;
239}
240
241int UDPSocketStarboard::InternalConnect(const IPEndPoint& address) {
242 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
243 DCHECK(SbSocketIsValid(socket_));
244 DCHECK(!is_connected());
245 DCHECK(!remote_address_.get());
246
247 int rv = 0;
Chad Duffinac9ac062019-07-23 10:06:45 -0700248 // Cobalt does random bind despite bind_type_ because we do not connect
249 // UDP sockets but Chromium does. And if a socket does recvfrom() without
250 // any sendto() before, it needs to be bound to have a local port.
251 rv = RandomBind(address.GetFamily() == ADDRESS_FAMILY_IPV4 ?
252 IPAddress::IPv4AllZeros() : IPAddress::IPv6AllZeros());
Andrew Top0d1858f2019-05-15 22:01:47 -0700253
254 if (rv != OK)
255 return rv;
256
Andrew Top0d1858f2019-05-15 22:01:47 -0700257 remote_address_.reset(new IPEndPoint(address));
258
259 return OK;
260}
261
262int UDPSocketStarboard::Bind(const IPEndPoint& address) {
263 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
264 DCHECK(SbSocketIsValid(socket_));
265 DCHECK(!is_connected());
266
267 int rv = DoBind(address);
268 if (rv != OK)
269 return rv;
270 local_address_.reset();
271 is_connected_ = true;
272 return OK;
273}
274
275int UDPSocketStarboard::BindToNetwork(
276 NetworkChangeNotifier::NetworkHandle network) {
277 NOTIMPLEMENTED();
278 return ERR_NOT_IMPLEMENTED;
279}
280
281int UDPSocketStarboard::SetReceiveBufferSize(int32_t size) {
282 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
283 DCHECK(SbSocketIsValid(socket_));
284
285 int result = OK;
286 if (!SbSocketSetReceiveBufferSize(socket_, size)) {
287 result = MapLastSocketError(socket_);
288 }
289 DCHECK_EQ(result, OK) << "Could not " << __FUNCTION__ << ": "
290 << SbSocketGetLastError(socket_);
291 return result;
292}
293
294int UDPSocketStarboard::SetSendBufferSize(int32_t size) {
295 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
296 DCHECK(SbSocketIsValid(socket_));
297
298 int result = OK;
299 if (!SbSocketSetSendBufferSize(socket_, size)) {
300 result = MapLastSocketError(socket_);
301 }
302 DCHECK_EQ(result, OK) << "Could not " << __FUNCTION__ << ": "
303 << SbSocketGetLastError(socket_);
304 return result;
305}
306
307int UDPSocketStarboard::AllowAddressReuse() {
308 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
309 DCHECK(!is_connected());
310 DCHECK(SbSocketIsValid(socket_));
311
Chad Duffinac9ac062019-07-23 10:06:45 -0700312 return SbSocketSetReuseAddress(socket_, true) ? OK : ERR_FAILED;
Andrew Top0d1858f2019-05-15 22:01:47 -0700313}
314
315int UDPSocketStarboard::SetBroadcast(bool broadcast) {
316 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
317 DCHECK(!is_connected());
318 DCHECK(SbSocketIsValid(socket_));
319
Chad Duffinac9ac062019-07-23 10:06:45 -0700320 return SbSocketSetBroadcast(socket_, broadcast) ? OK : ERR_FAILED;
Andrew Top0d1858f2019-05-15 22:01:47 -0700321}
322
Kaido Kert03affbc2020-06-29 16:29:13 -0700323void UDPSocketStarboard::OnSocketReadyToRead(SbSocket /*socket*/) {
324 if (!read_callback_.is_null())
325 DidCompleteRead();
Andrew Top0d1858f2019-05-15 22:01:47 -0700326}
327
Kaido Kert03affbc2020-06-29 16:29:13 -0700328void UDPSocketStarboard::OnSocketReadyToWrite(SbSocket socket) {
329 if (write_async_watcher_->watching()) {
330 write_async_watcher_->OnSocketReadyToWrite(socket);
331 return;
332 }
333
334 if (!write_callback_.is_null())
335 DidCompleteWrite();
Andrew Top0d1858f2019-05-15 22:01:47 -0700336}
337
338void UDPSocketStarboard::WriteAsyncWatcher::OnSocketReadyToWrite(
339 SbSocket /*socket*/) {
340 DVLOG(1) << __func__ << " queue " << socket_->pending_writes_.size()
341 << " out of " << socket_->write_async_outstanding_ << " total";
342 socket_->StopWatchingSocket();
343 socket_->FlushPending();
344}
345
346void UDPSocketStarboard::DoReadCallback(int rv) {
347 DCHECK_NE(rv, ERR_IO_PENDING);
348 DCHECK(!read_callback_.is_null());
349
350 // since Run may result in Read being called, clear read_callback_ up front.
351 CompletionOnceCallback c = std::move(read_callback_);
352 read_callback_.Reset();
353 std::move(c).Run(rv);
354}
355
356void UDPSocketStarboard::DoWriteCallback(int rv) {
357 DCHECK_NE(rv, ERR_IO_PENDING);
358 DCHECK(!write_callback_.is_null());
359
360 // Run may result in Write being called.
361 base::ResetAndReturn(&write_callback_).Run(rv);
362}
363
364void UDPSocketStarboard::DidCompleteRead() {
365 int result = InternalRecvFrom(read_buf_, read_buf_len_, recv_from_address_);
366 if (result != ERR_IO_PENDING) {
367 read_buf_ = NULL;
368 read_buf_len_ = 0;
369 recv_from_address_ = NULL;
Kaido Kert03affbc2020-06-29 16:29:13 -0700370 InternalStopWatchingSocket();
Andrew Top0d1858f2019-05-15 22:01:47 -0700371 DoReadCallback(result);
372 }
373}
374
375void UDPSocketStarboard::LogRead(int result,
376 const char* bytes,
377 const IPEndPoint* address) const {
378 if (result < 0) {
379 net_log_.AddEventWithNetErrorCode(NetLogEventType::UDP_RECEIVE_ERROR,
380 result);
381 return;
382 }
383
384 if (net_log_.IsCapturing()) {
385 net_log_.AddEvent(
386 NetLogEventType::UDP_BYTES_RECEIVED,
387 CreateNetLogUDPDataTranferCallback(result, bytes, address));
388 }
389
390 NetworkActivityMonitor::GetInstance()->IncrementBytesReceived(result);
391}
392
393void UDPSocketStarboard::DidCompleteWrite() {
394 int result =
395 InternalSendTo(write_buf_, write_buf_len_, send_to_address_.get());
396
397 if (result != ERR_IO_PENDING) {
398 write_buf_ = NULL;
399 write_buf_len_ = 0;
400 send_to_address_.reset();
Kaido Kert03affbc2020-06-29 16:29:13 -0700401 InternalStopWatchingSocket();
Andrew Top0d1858f2019-05-15 22:01:47 -0700402 DoWriteCallback(result);
403 }
404}
405
406void UDPSocketStarboard::LogWrite(int result,
407 const char* bytes,
408 const IPEndPoint* address) const {
409 if (result < 0) {
410 net_log_.AddEventWithNetErrorCode(NetLogEventType::UDP_SEND_ERROR, result);
411 return;
412 }
413
414 if (net_log_.IsCapturing()) {
415 net_log_.AddEvent(
416 NetLogEventType::UDP_BYTES_SENT,
417 CreateNetLogUDPDataTranferCallback(result, bytes, address));
418 }
419
420 NetworkActivityMonitor::GetInstance()->IncrementBytesSent(result);
421}
422
423int UDPSocketStarboard::InternalRecvFrom(IOBuffer* buf,
424 int buf_len,
425 IPEndPoint* address) {
426 SbSocketAddress sb_address;
427 int bytes_transferred =
428 SbSocketReceiveFrom(socket_, buf->data(), buf_len, &sb_address);
429 int result;
430 if (bytes_transferred >= 0) {
431 result = bytes_transferred;
Chad Duffinac9ac062019-07-23 10:06:45 -0700432 // Passing in NULL address is allowed. This is only to align with other
433 // platform's implementation.
434 if (address && !address->FromSbSocketAddress(&sb_address)) {
435 result = ERR_ADDRESS_INVALID;
Andrew Top0d1858f2019-05-15 22:01:47 -0700436 }
437 } else {
438 result = MapLastSocketError(socket_);
439 }
440
441 if (result != ERR_IO_PENDING) {
442 IPEndPoint log_address;
Andrew Topa7b1cfa2019-12-18 19:15:07 -0800443 if (result < 0 || !log_address.FromSbSocketAddress(&sb_address)) {
Andrew Top0d1858f2019-05-15 22:01:47 -0700444 LogRead(result, buf->data(), NULL);
445 } else {
446 LogRead(result, buf->data(), &log_address);
447 }
448 }
449
450 return result;
451}
452
453int UDPSocketStarboard::InternalSendTo(IOBuffer* buf,
454 int buf_len,
455 const IPEndPoint* address) {
456 SbSocketAddress sb_address;
457 if (!address || !address->ToSbSocketAddress(&sb_address)) {
458 int result = ERR_FAILED;
459 LogWrite(result, NULL, NULL);
460 return result;
461 }
462
463 int result = SbSocketSendTo(socket_, buf->data(), buf_len, &sb_address);
464
465 if (result < 0)
466 result = MapLastSocketError(socket_);
467
468 if (result != ERR_IO_PENDING)
469 LogWrite(result, buf->data(), address);
470
471 return result;
472}
473
474int UDPSocketStarboard::DoBind(const IPEndPoint& address) {
475 SbSocketAddress sb_address;
476 if (!address.ToSbSocketAddress(&sb_address)) {
477 return ERR_UNEXPECTED;
478 }
479
480 SbSocketError rv = SbSocketBind(socket_, &sb_address);
481 return rv != kSbSocketOk ? MapLastSystemError() : OK;
482}
483
484int UDPSocketStarboard::RandomBind(const IPAddress& address) {
Andrew Top0d1858f2019-05-15 22:01:47 -0700485 return DoBind(IPEndPoint(address, 0));
486}
487
488int UDPSocketStarboard::JoinGroup(const IPAddress& group_address) const {
489 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
490 if (!is_connected())
491 return ERR_SOCKET_NOT_CONNECTED;
492
493 SbSocketAddress sb_address = {0};
494 if (!IPEndPoint(group_address, 0).ToSbSocketAddress(&sb_address)) {
495 return ERR_ADDRESS_INVALID;
496 }
497
498 if (!SbSocketJoinMulticastGroup(socket_, &sb_address)) {
499 LOG(WARNING) << "SbSocketJoinMulticastGroup failed on UDP socket.";
500 return MapLastSocketError(socket_);
501 }
502 return OK;
503}
504
505int UDPSocketStarboard::LeaveGroup(const IPAddress& group_address) const {
506 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
507 if (!is_connected())
508 return ERR_SOCKET_NOT_CONNECTED;
509
510 DCHECK(false) << "Not supported on Starboard.";
511 return ERR_FAILED;
512}
513
514int UDPSocketStarboard::SetMulticastInterface(uint32_t interface_index) {
515 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
516 if (is_connected())
517 return ERR_SOCKET_IS_CONNECTED;
518 DCHECK_EQ(0, interface_index)
519 << "Only the default multicast interface is supported on Starboard.";
520 return interface_index == 0 ? OK : ERR_FAILED;
521}
522
523int UDPSocketStarboard::SetMulticastTimeToLive(int time_to_live) {
524 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
525 if (is_connected())
526 return ERR_SOCKET_IS_CONNECTED;
527
528 DCHECK(false) << "Not supported on Starboard.";
529 return ERR_FAILED;
530}
531
532int UDPSocketStarboard::SetMulticastLoopbackMode(bool loopback) {
533 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
534 if (is_connected())
535 return ERR_SOCKET_IS_CONNECTED;
536
537 DCHECK(false) << "Not supported on Starboard.";
538 return ERR_FAILED;
539}
540
541int UDPSocketStarboard::SetDiffServCodePoint(DiffServCodePoint dscp) {
542 NOTIMPLEMENTED();
543 return OK;
544}
545
546void UDPSocketStarboard::DetachFromThread() {
547 DETACH_FROM_THREAD(thread_checker_);
548}
549
550void UDPSocketStarboard::ApplySocketTag(const SocketTag&) {
551 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
Chad Duffinac9ac062019-07-23 10:06:45 -0700552 // SocketTag is not applicable to Starboard, see socket_tag.h for more info.
553 NOTIMPLEMENTED_LOG_ONCE();
Andrew Top0d1858f2019-05-15 22:01:47 -0700554}
555
556UDPSocketStarboardSender::UDPSocketStarboardSender() {}
557UDPSocketStarboardSender::~UDPSocketStarboardSender() {}
558
559SendResult::SendResult() : rv(0), write_count(0) {}
560SendResult::~SendResult() {}
561SendResult::SendResult(int _rv, int _write_count, DatagramBuffers _buffers)
562 : rv(_rv), write_count(_write_count), buffers(std::move(_buffers)) {}
563SendResult::SendResult(SendResult&& other) = default;
564
565SendResult UDPSocketStarboardSender::InternalSendBuffers(
566 const SbSocket& socket,
567 DatagramBuffers buffers,
568 SbSocketAddress address) const {
569 int rv = 0;
570 int write_count = 0;
571 for (auto& buffer : buffers) {
572 int result = Send(socket, buffer->data(), buffer->length(), address);
573 if (result < 0) {
574 rv = MapLastSocketError(socket);
575 break;
576 }
577 write_count++;
578 }
579 return SendResult(rv, write_count, std::move(buffers));
580}
581
582SendResult UDPSocketStarboardSender::SendBuffers(const SbSocket& socket,
583 DatagramBuffers buffers,
584 SbSocketAddress address) {
585 return InternalSendBuffers(socket, std::move(buffers), address);
586}
587
588int UDPSocketStarboardSender::Send(const SbSocket& socket,
589 const char* buf,
590 size_t len,
591 SbSocketAddress address) const {
592 return SbSocketSendTo(socket, buf, len, &address);
593}
594
595int UDPSocketStarboard::WriteAsync(
596 const char* buffer,
597 size_t buf_len,
598 CompletionOnceCallback callback,
599 const NetworkTrafficAnnotationTag& traffic_annotation) {
600 DCHECK(datagram_buffer_pool_ != nullptr);
601 IncreaseWriteAsyncOutstanding(1);
602 datagram_buffer_pool_->Enqueue(buffer, buf_len, &pending_writes_);
603 return InternalWriteAsync(std::move(callback), traffic_annotation);
604}
605
606int UDPSocketStarboard::WriteAsync(
607 DatagramBuffers buffers,
608 CompletionOnceCallback callback,
609 const NetworkTrafficAnnotationTag& traffic_annotation) {
610 IncreaseWriteAsyncOutstanding(buffers.size());
611 pending_writes_.splice(pending_writes_.end(), std::move(buffers));
612 return InternalWriteAsync(std::move(callback), traffic_annotation);
613}
614
615int UDPSocketStarboard::InternalWriteAsync(
616 CompletionOnceCallback callback,
617 const NetworkTrafficAnnotationTag& traffic_annotation) {
618 CHECK(write_callback_.is_null());
619
620 // Surface error immediately if one is pending.
621 if (last_async_result_ < 0) {
622 return ResetLastAsyncResult();
623 }
624
625 size_t flush_threshold =
626 write_batching_active_ ? kWriteAsyncPostBuffersThreshold : 1;
627 if (pending_writes_.size() >= flush_threshold) {
628 FlushPending();
629 // Surface error immediately if one is pending.
630 if (last_async_result_ < 0) {
631 return ResetLastAsyncResult();
632 }
633 }
634
635 if (!write_async_timer_running_) {
636 write_async_timer_running_ = true;
637 write_async_timer_.Start(FROM_HERE, kWriteAsyncMsThreshold, this,
638 &UDPSocketStarboard::OnWriteAsyncTimerFired);
639 }
640
641 int blocking_threshold =
642 write_batching_active_ ? kWriteAsyncMaxBuffersThreshold : 1;
643 if (write_async_outstanding_ >= blocking_threshold) {
644 write_callback_ = std::move(callback);
645 return ERR_IO_PENDING;
646 }
647
648 DVLOG(2) << __func__ << " pending " << pending_writes_.size()
649 << " outstanding " << write_async_outstanding_;
650 return ResetWrittenBytes();
651}
652
653DatagramBuffers UDPSocketStarboard::GetUnwrittenBuffers() {
654 write_async_outstanding_ -= pending_writes_.size();
655 return std::move(pending_writes_);
656}
657
658void UDPSocketStarboard::FlushPending() {
659 // Nothing to do if socket is blocked.
660 if (write_async_watcher_->watching())
661 return;
662
663 if (pending_writes_.empty())
664 return;
665
666 if (write_async_timer_running_)
667 write_async_timer_.Reset();
668
669 int num_pending_writes = static_cast<int>(pending_writes_.size());
670 if (!write_multi_core_enabled_ ||
671 // Don't bother with post if not enough buffers
672 (num_pending_writes <= kWriteAsyncMinBuffersThreshold &&
673 // but not if there is a previous post
674 // outstanding, to prevent out of order transmission.
675 (num_pending_writes == write_async_outstanding_))) {
676 LocalSendBuffers();
677 } else {
678 PostSendBuffers();
679 }
680}
681
682// TODO(ckrasic) Sad face. Do this lazily because many tests exploded
683// otherwise. |threading_and_tasks.md| advises to instantiate a
684// |base::test::ScopedTaskEnvironment| in the test, implementing that
685// for all tests that might exercise QUIC is too daunting. Also, in
686// some tests it seemed like following the advice just broke in other
687// ways.
688base::SequencedTaskRunner* UDPSocketStarboard::GetTaskRunner() {
689 if (task_runner_ == nullptr) {
690 task_runner_ = CreateSequencedTaskRunnerWithTraits(base::TaskTraits());
691 }
692 return task_runner_.get();
693}
694
695void UDPSocketStarboard::OnWriteAsyncTimerFired() {
696 DVLOG(2) << __func__ << " pending writes " << pending_writes_.size();
697 if (pending_writes_.empty()) {
698 write_async_timer_.Stop();
699 write_async_timer_running_ = false;
700 return;
701 }
702 if (last_async_result_ < 0) {
703 DVLOG(1) << __func__ << " socket not writeable";
704 return;
705 }
706 FlushPending();
707}
708
709void UDPSocketStarboard::LocalSendBuffers() {
710 DVLOG(1) << __func__ << " queue " << pending_writes_.size() << " out of "
711 << write_async_outstanding_ << " total";
712 SbSocketAddress sb_address;
713 int result = remote_address_.get()->ToSbSocketAddress(&sb_address);
714 DCHECK(result);
715 DidSendBuffers(
716 sender_->SendBuffers(socket_, std::move(pending_writes_), sb_address));
717}
718
719void UDPSocketStarboard::PostSendBuffers() {
720 DVLOG(1) << __func__ << " queue " << pending_writes_.size() << " out of "
721 << write_async_outstanding_ << " total";
722 SbSocketAddress sb_address;
723 DCHECK(remote_address_.get()->ToSbSocketAddress(&sb_address));
724 base::PostTaskAndReplyWithResult(
725 GetTaskRunner(), FROM_HERE,
726 base::BindOnce(&UDPSocketStarboardSender::SendBuffers, sender_, socket_,
727 std::move(pending_writes_), sb_address),
728 base::BindOnce(&UDPSocketStarboard::DidSendBuffers,
729 weak_factory_.GetWeakPtr()));
730}
731
732void UDPSocketStarboard::DidSendBuffers(SendResult send_result) {
733 DVLOG(3) << __func__;
734 int write_count = send_result.write_count;
735 DatagramBuffers& buffers = send_result.buffers;
736
737 DCHECK(!buffers.empty());
738 int num_buffers = buffers.size();
739
740 // Dequeue buffers that have been written.
741 if (write_count > 0) {
742 write_async_outstanding_ -= write_count;
743
744 DatagramBuffers::iterator it;
745 // Generate logs for written buffers
746 it = buffers.begin();
747 for (int i = 0; i < write_count; i++, it++) {
748 auto& buffer = *it;
749 LogWrite(buffer->length(), buffer->data(), NULL);
750 written_bytes_ += buffer->length();
751 }
752 // Return written buffers to pool
753 DatagramBuffers written_buffers;
754 if (write_count == num_buffers) {
755 it = buffers.end();
756 } else {
757 it = buffers.begin();
758 for (int i = 0; i < write_count; i++) {
759 it++;
760 }
761 }
762 written_buffers.splice(written_buffers.end(), buffers, buffers.begin(), it);
763 DCHECK(datagram_buffer_pool_ != nullptr);
764 datagram_buffer_pool_->Dequeue(&written_buffers);
765 }
766
767 // Requeue left-over (unwritten) buffers.
768 if (!buffers.empty()) {
769 DVLOG(2) << __func__ << " requeue " << buffers.size() << " buffers";
770 pending_writes_.splice(pending_writes_.begin(), std::move(buffers));
771 }
772
773 last_async_result_ = send_result.rv;
774 if (last_async_result_ == ERR_IO_PENDING) {
775 DVLOG(2) << __func__ << " WatchSocket start";
776 if (!WatchSocket()) {
777 last_async_result_ = MapLastSocketError(socket_);
778 DVLOG(1) << "WatchSocket failed on write, error: " << last_async_result_;
779 LogWrite(last_async_result_, NULL, NULL);
780 } else {
781 last_async_result_ = 0;
782 }
783 } else if (last_async_result_ < 0 || pending_writes_.empty()) {
784 DVLOG(2) << __func__ << " WatchSocket stop: result "
785 << ErrorToShortString(last_async_result_) << " pending_writes "
786 << pending_writes_.size();
787 StopWatchingSocket();
788 }
789 DCHECK(last_async_result_ != ERR_IO_PENDING);
790
791 if (write_callback_.is_null())
792 return;
793
794 if (last_async_result_ < 0) {
795 DVLOG(1) << last_async_result_;
796 // Update the writer with the latest result.
797 DoWriteCallback(ResetLastAsyncResult());
798 } else if (write_async_outstanding_ < kWriteAsyncCallbackBuffersThreshold) {
799 DVLOG(1) << write_async_outstanding_ << " < "
800 << kWriteAsyncCallbackBuffersThreshold;
801 DoWriteCallback(ResetWrittenBytes());
802 }
803}
804
805int UDPSocketStarboard::SetDoNotFragment() {
806 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
807 DCHECK(SbSocketIsValid(socket_));
808
Andrew Topf058e8f2019-08-19 15:14:57 -0700809 // Starboard does not supported sending non-fragmented packets yet.
810 // All QUIC Streams call this function at initialization, setting sockets to
811 // send non-fragmented packets may have a slight performance boost.
Andrew Top0d1858f2019-05-15 22:01:47 -0700812 return ERR_NOT_IMPLEMENTED;
813}
814
815void UDPSocketStarboard::SetMsgConfirm(bool confirm) {
816 NOTIMPLEMENTED();
817}
818
819bool UDPSocketStarboard::WatchSocket() {
820 if (write_async_watcher_->watching())
821 return true;
822 bool result = InternalWatchSocket();
823 if (result) {
824 write_async_watcher_->set_watching(true);
825 }
826 return result;
827}
828
829void UDPSocketStarboard::StopWatchingSocket() {
Kaido Kert6f4c7af2020-07-31 15:06:27 -0700830 if (!write_async_watcher_->watching())
Andrew Top0d1858f2019-05-15 22:01:47 -0700831 return;
Andrew Top0d1858f2019-05-15 22:01:47 -0700832 write_async_watcher_->set_watching(false);
Kaido Kert6f4c7af2020-07-31 15:06:27 -0700833 InternalStopWatchingSocket();
Andrew Top0d1858f2019-05-15 22:01:47 -0700834}
835
836bool UDPSocketStarboard::InternalWatchSocket() {
837 return base::MessageLoopForIO::current()->Watch(
838 socket_, true, base::MessageLoopCurrentForIO::WATCH_WRITE,
Kaido Kert03affbc2020-06-29 16:29:13 -0700839 &socket_watcher_, this);
Andrew Top0d1858f2019-05-15 22:01:47 -0700840}
841
842void UDPSocketStarboard::InternalStopWatchingSocket() {
Kaido Kert6f4c7af2020-07-31 15:06:27 -0700843 if (!read_buf_ && !write_buf_ && !write_async_watcher_->watching()) {
844 bool ok = socket_watcher_.StopWatchingSocket();
845 DCHECK(ok);
846 }
Andrew Top0d1858f2019-05-15 22:01:47 -0700847}
848
849void UDPSocketStarboard::SetMaxPacketSize(size_t max_packet_size) {
850 datagram_buffer_pool_ = std::make_unique<DatagramBufferPool>(max_packet_size);
851}
852
853int UDPSocketStarboard::ResetLastAsyncResult() {
854 int result = last_async_result_;
855 last_async_result_ = 0;
856 return result;
857}
858
859int UDPSocketStarboard::ResetWrittenBytes() {
860 int bytes = written_bytes_;
861 written_bytes_ = 0;
862 return bytes;
863}
864
865} // namespace net