blob: 4eed2702307374478b14f852d7204d8b54d230db [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "media/cast/test/utility/udp_proxy.h"
#include <math.h>
#include <stdlib.h>
#include <deque>
#include <utility>
#include <vector>
#include "base/logging.h"
#include "base/macros.h"
#include "base/rand_util.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/thread.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/time/default_tick_clock.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "net/log/net_log_source.h"
#include "net/udp/udp_server_socket.h"
namespace media {
namespace cast {
namespace test {
// Size, in bytes, of the buffer allocated for every incoming datagram
// (64 KiB).
const size_t kMaxPacketSize = 64 * 1024;
// Construction and destruction perform no explicit work of their own.
PacketPipe::PacketPipe() = default;
PacketPipe::~PacketPipe() = default;
// Records the task runner and clock for this stage, then forwards the same
// pair to the next stage of the pipe (if one has been appended).
void PacketPipe::InitOnIOThread(
    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
    base::TickClock* clock) {
  task_runner_ = task_runner;
  clock_ = clock;
  if (!pipe_)
    return;
  pipe_->InitOnIOThread(task_runner, clock);
}
// Attaches |pipe| at the tail of the chain: if this stage has no successor
// yet it takes ownership directly, otherwise the request is delegated to the
// existing successor.
void PacketPipe::AppendToPipe(std::unique_ptr<PacketPipe> pipe) {
  if (!pipe_) {
    pipe_ = std::move(pipe);
    return;
  }
  pipe_->AppendToPipe(std::move(pipe));
}
// Roughly emulates a buffer inside a device.
// If the buffer is full, packets are dropped.
// Packets are output at a maximum bandwidth.
class Buffer : public PacketPipe {
public:
Buffer(size_t buffer_size, double max_megabits_per_second)
: buffer_size_(0),
max_buffer_size_(buffer_size),
max_megabits_per_second_(max_megabits_per_second),
weak_factory_(this) {
CHECK_GT(max_buffer_size_, 0UL);
CHECK_GT(max_megabits_per_second, 0);
}
void Send(std::unique_ptr<Packet> packet) final {
if (packet->size() + buffer_size_ <= max_buffer_size_) {
buffer_size_ += packet->size();
buffer_.push_back(linked_ptr<Packet>(packet.release()));
if (buffer_.size() == 1) {
Schedule();
}
}
}
private:
void Schedule() {
last_schedule_ = clock_->NowTicks();
double megabits = buffer_.front()->size() * 8 / 1000000.0;
double seconds = megabits / max_megabits_per_second_;
int64_t microseconds = static_cast<int64_t>(seconds * 1E6);
task_runner_->PostDelayedTask(
FROM_HERE,
base::Bind(&Buffer::ProcessBuffer, weak_factory_.GetWeakPtr()),
base::TimeDelta::FromMicroseconds(microseconds));
}
void ProcessBuffer() {
int64_t bytes_to_send = static_cast<int64_t>(
(clock_->NowTicks() - last_schedule_).InSecondsF() *
max_megabits_per_second_ * 1E6 / 8);
if (bytes_to_send < static_cast<int64_t>(buffer_.front()->size())) {
bytes_to_send = buffer_.front()->size();
}
while (!buffer_.empty() &&
static_cast<int64_t>(buffer_.front()->size()) <= bytes_to_send) {
CHECK(!buffer_.empty());
std::unique_ptr<Packet> packet(buffer_.front().release());
bytes_to_send -= packet->size();
buffer_size_ -= packet->size();
buffer_.pop_front();
pipe_->Send(std::move(packet));
}
if (!buffer_.empty()) {
Schedule();
}
}
std::deque<linked_ptr<Packet> > buffer_;
base::TimeTicks last_schedule_;
size_t buffer_size_;
size_t max_buffer_size_;
double max_megabits_per_second_; // megabits per second
base::WeakPtrFactory<Buffer> weak_factory_;
};
// Creates a Buffer stage holding up to |buffer_size| bytes and draining at
// |bandwidth| megabits per second.
std::unique_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) {
  std::unique_ptr<PacketPipe> stage(new Buffer(buffer_size, bandwidth));
  return stage;
}
// Pipe stage that discards a fraction of the packets passing through it.
class RandomDrop : public PacketPipe {
 public:
  // |drop_fraction| is scaled onto the rand() range [0, RAND_MAX] and stored
  // as an integer threshold.
  explicit RandomDrop(double drop_fraction)
      : drop_fraction_(static_cast<int>(drop_fraction * RAND_MAX)) {}

  // Forwards |packet| only when the rand() draw lands strictly above the
  // threshold; otherwise the packet is dropped.
  void Send(std::unique_ptr<Packet> packet) final {
    const bool dropped = rand() <= drop_fraction_;
    if (dropped)
      return;
    pipe_->Send(std::move(packet));
  }

 private:
  int drop_fraction_;  // Threshold on the rand() scale, not a fraction.
};
// Creates a RandomDrop stage configured with |drop_fraction|.
std::unique_ptr<PacketPipe> NewRandomDrop(double drop_fraction) {
  std::unique_ptr<PacketPipe> stage(new RandomDrop(drop_fraction));
  return stage;
}
class SimpleDelayBase : public PacketPipe {
public:
SimpleDelayBase() : weak_factory_(this) {}
~SimpleDelayBase() override {}
void Send(std::unique_ptr<Packet> packet) override {
double seconds = GetDelay();
task_runner_->PostDelayedTask(
FROM_HERE,
base::Bind(&SimpleDelayBase::SendInternal, weak_factory_.GetWeakPtr(),
base::Passed(&packet)),
base::TimeDelta::FromMicroseconds(static_cast<int64_t>(seconds * 1E6)));
}
protected:
virtual double GetDelay() = 0;
private:
virtual void SendInternal(std::unique_ptr<Packet> packet) {
pipe_->Send(std::move(packet));
}
base::WeakPtrFactory<SimpleDelayBase> weak_factory_;
};
// Delays every packet by the same fixed number of seconds.
class ConstantDelay : public SimpleDelayBase {
 public:
  explicit ConstantDelay(double delay_seconds)
      : delay_seconds_(delay_seconds) {}

  // The delay never varies.
  double GetDelay() final {
    return delay_seconds_;
  }

 private:
  double delay_seconds_;  // Fixed delay, in seconds.
};
// Creates a ConstantDelay stage that holds each packet for |delay_seconds|.
std::unique_ptr<PacketPipe> NewConstantDelay(double delay_seconds) {
  std::unique_ptr<PacketPipe> stage(new ConstantDelay(delay_seconds));
  return stage;
}
// Delays each packet independently by a random fraction of |random_delay|
// seconds. Because every packet draws its own delay, packets may leave in a
// different order than they arrived.
class RandomUnsortedDelay : public SimpleDelayBase {
 public:
  explicit RandomUnsortedDelay(double random_delay)
      : random_delay_(random_delay) {}

  // Scales the configured maximum by a fresh base::RandDouble() draw.
  double GetDelay() override {
    const double fraction = base::RandDouble();
    return random_delay_ * fraction;
  }

 private:
  double random_delay_;  // Scale applied to the random draw, in seconds.
};
// Creates a RandomUnsortedDelay stage scaled by |random_delay| seconds.
std::unique_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) {
  std::unique_ptr<PacketPipe> stage(new RandomUnsortedDelay(random_delay));
  return stage;
}
// Forwards every packet twice: a copy immediately, and the original after
// |delay_min| seconds plus a random extra delay.
class DuplicateAndDelay : public RandomUnsortedDelay {
 public:
  DuplicateAndDelay(double delay_min, double random_delay)
      : RandomUnsortedDelay(random_delay), delay_min_(delay_min) {}

  // Sends a copy downstream right away, then routes the original through the
  // delayed path of the base class.
  void Send(std::unique_ptr<Packet> packet) final {
    std::unique_ptr<Packet> immediate_copy(new Packet(*packet));
    pipe_->Send(std::move(immediate_copy));
    RandomUnsortedDelay::Send(std::move(packet));
  }

  // Random component from the base class, shifted by the fixed minimum.
  double GetDelay() final {
    const double random_part = RandomUnsortedDelay::GetDelay();
    return random_part + delay_min_;
  }

 private:
  double delay_min_;  // Minimum delay of the late copy, in seconds.
};
// Creates a DuplicateAndDelay stage with the given minimum and random delay
// components (both in seconds).
std::unique_ptr<PacketPipe> NewDuplicateAndDelay(double delay_min,
                                                 double random_delay) {
  std::unique_ptr<PacketPipe> stage(
      new DuplicateAndDelay(delay_min, random_delay));
  return stage;
}
// Delays packets by a random amount while preserving their order: packets
// are queued and released one after another, each release pushing the
// earliest time of the next one further out by a random step. In addition,
// at random intervals an "extra delay" stalls the whole queue.
class RandomSortedDelay : public PacketPipe {
 public:
  // |random_delay|: scale, in seconds, of the random gap between packets.
  // |extra_delay|: length, in seconds, of each extra stall.
  // |seconds_between_extra_delay|: scale, in seconds, of the random interval
  // between stalls.
  RandomSortedDelay(double random_delay, double extra_delay,
                    double seconds_between_extra_delay)
      : random_delay_(random_delay),
        extra_delay_(extra_delay),
        seconds_between_extra_delay_(seconds_between_extra_delay),
        weak_factory_(this) {}

  // Queues |packet|. When the queue was empty, a release time is chosen
  // (never earlier than an already pending one) and processing is started.
  void Send(std::unique_ptr<Packet> packet) final {
    buffer_.push_back(linked_ptr<Packet>(packet.release()));
    if (buffer_.size() == 1) {
      next_send_ = std::max(
          clock_->NowTicks() +
              base::TimeDelta::FromSecondsD(base::RandDouble() * random_delay_),
          next_send_);
      ProcessBuffer();
    }
  }

  void InitOnIOThread(
      const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
      base::TickClock* clock) final {
    PacketPipe::InitOnIOThread(task_runner, clock);
    // As we start the stream, assume that we are in a random
    // place between two extra delays, thus multiplier = 1.0;
    ScheduleExtraDelay(1.0);
  }

 private:
  // Posts CauseExtraDelay() after a random time of up to
  // |seconds_between_extra_delay_| * |mult| seconds.
  void ScheduleExtraDelay(double mult) {
    double seconds = seconds_between_extra_delay_ * mult * base::RandDouble();
    int64_t microseconds = static_cast<int64_t>(seconds * 1E6);
    task_runner_->PostDelayedTask(
        FROM_HERE, base::Bind(&RandomSortedDelay::CauseExtraDelay,
                              weak_factory_.GetWeakPtr()),
        base::TimeDelta::FromMicroseconds(microseconds));
  }

  // Stalls the queue: nothing is released before now + |extra_delay_|.
  void CauseExtraDelay() {
    next_send_ = std::max<base::TimeTicks>(
        clock_->NowTicks() + base::TimeDelta::FromMicroseconds(
                                 static_cast<int64_t>(extra_delay_ * 1E6)),
        next_send_);
    // An extra delay just happened, wait up to seconds_between_extra_delay_*2
    // before scheduling another one to make the average equal to
    // seconds_between_extra_delay_.
    ScheduleExtraDelay(2.0);
  }

  // Releases every packet whose release time has passed, then, if packets
  // remain, re-posts itself for the next release time.
  void ProcessBuffer() {
    base::TimeTicks now = clock_->NowTicks();
    while (!buffer_.empty() && next_send_ <= now) {
      std::unique_ptr<Packet> packet(buffer_.front().release());
      // Remove the entry before forwarding so the queue never holds a
      // released (null) element while downstream code runs.
      buffer_.pop_front();
      pipe_->Send(std::move(packet));
      next_send_ +=
          base::TimeDelta::FromSecondsD(base::RandDouble() * random_delay_);
    }
    if (!buffer_.empty()) {
      task_runner_->PostDelayedTask(
          FROM_HERE, base::Bind(&RandomSortedDelay::ProcessBuffer,
                                weak_factory_.GetWeakPtr()),
          next_send_ - now);
    }
  }

  std::deque<linked_ptr<Packet> > buffer_;  // Pending packets, oldest first.
  double random_delay_;
  double extra_delay_;
  double seconds_between_extra_delay_;
  base::TimeTicks next_send_;  // Earliest time the head packet may leave.
  base::WeakPtrFactory<RandomSortedDelay> weak_factory_;
};
// Creates a RandomSortedDelay stage; all three arguments are in seconds.
std::unique_ptr<PacketPipe> NewRandomSortedDelay(
    double random_delay, double extra_delay,
    double seconds_between_extra_delay) {
  std::unique_ptr<PacketPipe> stage(new RandomSortedDelay(
      random_delay, extra_delay, seconds_between_extra_delay));
  return stage;
}
// Alternates between a working state, in which packets pass through, and an
// outage state, in which every packet is dropped. Each state lasts a random
// fraction of twice the requested average time.
class NetworkGlitchPipe : public PacketPipe {
 public:
  NetworkGlitchPipe(double average_work_time, double average_outage_time)
      : works_(false),
        max_work_time_(average_work_time * 2),
        max_outage_time_(average_outage_time * 2),
        weak_factory_(this) {}

  // Initializes the chain and performs the first flip, which moves the pipe
  // from its initial outage state into the working state.
  void InitOnIOThread(
      const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
      base::TickClock* clock) final {
    PacketPipe::InitOnIOThread(task_runner, clock);
    Flip();
  }

  // Forwards |packet| while working; drops it during an outage.
  void Send(std::unique_ptr<Packet> packet) final {
    if (!works_)
      return;
    pipe_->Send(std::move(packet));
  }

 private:
  // Toggles the state and schedules the next toggle after a random duration
  // bounded by the maximum for the state just entered.
  void Flip() {
    works_ = !works_;
    const double max_duration = works_ ? max_work_time_ : max_outage_time_;
    const double duration_seconds = base::RandDouble() * max_duration;
    const int64_t duration_us = static_cast<int64_t>(duration_seconds * 1E6);
    task_runner_->PostDelayedTask(
        FROM_HERE,
        base::Bind(&NetworkGlitchPipe::Flip, weak_factory_.GetWeakPtr()),
        base::TimeDelta::FromMicroseconds(duration_us));
  }

  bool works_;              // True while packets are being let through.
  double max_work_time_;    // Seconds; twice the average working time.
  double max_outage_time_;  // Seconds; twice the average outage time.
  base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
};
// Creates a NetworkGlitchPipe stage with the given average working and
// outage durations (seconds).
std::unique_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
                                                 double average_outage_time) {
  std::unique_ptr<PacketPipe> stage(
      new NetworkGlitchPipe(average_work_time, average_outage_time));
  return stage;
}
// Internal buffer object for a client of the IPP model.
// Packets are queued here together with their arrival time; the owning
// InterruptedPoissonProcess decides when SendOnePacket() is called.
class InterruptedPoissonProcess::InternalBuffer : public PacketPipe {
 public:
  // |ipp| is the process that drains this buffer; |size| is the byte limit
  // used by the admission check in Send().
  InternalBuffer(base::WeakPtr<InterruptedPoissonProcess> ipp, size_t size)
      : ipp_(ipp),
        stored_size_(0),
        stored_limit_(size),
        clock_(nullptr),
        weak_factory_(this) {}

  // Queues |packet| and its arrival time. The packet is dropped when the
  // stored byte count has already reached the limit; the check is made
  // before adding, so the last accepted packet may overshoot the limit.
  void Send(std::unique_ptr<Packet> packet) final {
    const bool full = stored_size_ >= stored_limit_;
    if (full)
      return;
    stored_size_ += packet->size();
    buffer_.push_back(linked_ptr<Packet>(packet.release()));
    buffer_time_.push_back(clock_->NowTicks());
    DCHECK(buffer_.size() == buffer_time_.size());
  }

  // Remembers the clock locally, initializes the owning process if it is
  // still alive, and then initializes the rest of the pipe.
  void InitOnIOThread(
      const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
      base::TickClock* clock) final {
    clock_ = clock;
    if (ipp_) {
      ipp_->InitOnIOThread(task_runner, clock);
    }
    PacketPipe::InitOnIOThread(task_runner, clock);
  }

  // Pops the oldest packet (and its timestamp) and forwards it downstream.
  // The buffer must not be empty.
  void SendOnePacket() {
    std::unique_ptr<Packet> oldest(buffer_.front().release());
    stored_size_ -= oldest->size();
    buffer_.pop_front();
    buffer_time_.pop_front();
    pipe_->Send(std::move(oldest));
    DCHECK(buffer_.size() == buffer_time_.size());
  }

  bool Empty() const {
    return buffer_.empty();
  }

  // Arrival time of the oldest queued packet; the buffer must not be empty.
  base::TimeTicks FirstPacketTime() const {
    DCHECK(!buffer_time_.empty());
    return buffer_time_.front();
  }

  base::WeakPtr<InternalBuffer> GetWeakPtr() {
    return weak_factory_.GetWeakPtr();
  }

 private:
  const base::WeakPtr<InterruptedPoissonProcess> ipp_;
  size_t stored_size_;         // Bytes currently queued.
  const size_t stored_limit_;  // Admission threshold in bytes.
  std::deque<linked_ptr<Packet> > buffer_;
  std::deque<base::TimeTicks> buffer_time_;  // Parallel to |buffer_|.
  base::TickClock* clock_;
  base::WeakPtrFactory<InternalBuffer> weak_factory_;
  DISALLOW_COPY_AND_ASSIGN(InternalBuffer);
};
// Stores the model parameters, seeds the random generator with |rand_seed|
// and computes the initial rates from the first entry of |average_rates|,
// which must not be empty. The process starts in the ON state.
InterruptedPoissonProcess::InterruptedPoissonProcess(
    const std::vector<double>& average_rates, double coef_burstiness,
    double coef_variance, uint32_t rand_seed)
    : clock_(nullptr),
      average_rates_(average_rates),
      coef_burstiness_(coef_burstiness),
      coef_variance_(coef_variance),
      rate_index_(0),
      on_state_(true),
      weak_factory_(this) {
  mt_rand_.init_genrand(rand_seed);
  DCHECK(!average_rates.empty());
  ComputeRates();
}
// No explicit teardown is performed here.
InterruptedPoissonProcess::~InterruptedPoissonProcess() = default;
// Starts the process on first use: stores the task runner and clock, then
// kicks off the three self-rescheduling activities (rate rotation, ON/OFF
// switching and packet sending). Subsequent calls are no-ops.
void InterruptedPoissonProcess::InitOnIOThread(
    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
    base::TickClock* clock) {
  const bool already_started = task_runner_.get() && clock_;
  if (already_started)
    return;

  task_runner_ = task_runner;
  clock_ = clock;

  UpdateRates();
  SwitchOn();
  SendPacket();
}
// Creates a client buffer limited to |size| bytes. The process keeps only a
// weak pointer to it; ownership goes to the caller.
std::unique_ptr<PacketPipe> InterruptedPoissonProcess::NewBuffer(size_t size) {
  std::unique_ptr<InternalBuffer> client_buffer(
      new InternalBuffer(weak_factory_.GetWeakPtr(), size));
  send_buffers_.push_back(client_buffer->GetWeakPtr());
  return std::unique_ptr<PacketPipe>(std::move(client_buffer));
}
// Draws the waiting time until the next event of a process with the given
// |rate| (events per millisecond). The draw is -log(1 - U) / rate with U
// taken from RandDouble(), i.e. an exponentially distributed interval.
base::TimeDelta InterruptedPoissonProcess::NextEvent(double rate) {
  const double uniform = RandDouble();
  const double wait_ms = fabs(-log(1.0 - uniform) / rate);
  return base::TimeDelta::FromMillisecondsD(wait_ms);
}
// Builds 64 random bits from two consecutive MT19937 draws (the first draw
// becomes the upper 32 bits) and converts them to a double with
// base::BitsToOpenEndedUnitInterval().
double InterruptedPoissonProcess::RandDouble() {
  const uint64_t upper = mt_rand_.genrand_int32();
  const uint64_t lower = mt_rand_.genrand_int32();
  const uint64_t random_bits = (upper << 32) | lower;
  return base::BitsToOpenEndedUnitInterval(random_bits);
}
// Derives the three event rates of the model (packet send, ON->OFF switch
// and OFF->ON switch) from the currently selected average rate and the
// burstiness / variance coefficients.
void InterruptedPoissonProcess::ComputeRates() {
  const double avg_rate = average_rates_[rate_index_];
  const double off_fraction = 1 - coef_burstiness_;
  const double variance_excess = coef_variance_ - 1;

  send_rate_ = avg_rate / coef_burstiness_;
  switch_off_rate_ = 2 * avg_rate * off_fraction * off_fraction /
                     coef_burstiness_ / variance_excess;
  switch_on_rate_ = 2 * avg_rate * off_fraction / variance_excess;
}
// Recomputes the rates for the current average, advances (with wrap-around)
// to the next average, and reschedules itself.
void InterruptedPoissonProcess::UpdateRates() {
  ComputeRates();

  // Rates are updated once per second.
  const size_t rate_count = average_rates_.size();
  rate_index_ = (rate_index_ + 1) % rate_count;
  task_runner_->PostDelayedTask(
      FROM_HERE,
      base::Bind(&InterruptedPoissonProcess::UpdateRates,
                 weak_factory_.GetWeakPtr()),
      base::TimeDelta::FromSeconds(1));
}
void InterruptedPoissonProcess::SwitchOff() {
on_state_ = false;
task_runner_->PostDelayedTask(FROM_HERE,
base::Bind(&InterruptedPoissonProcess::SwitchOn,
weak_factory_.GetWeakPtr()),
NextEvent(switch_on_rate_));
}
void InterruptedPoissonProcess::SwitchOn() {
on_state_ = true;
task_runner_->PostDelayedTask(
FROM_HERE, base::Bind(&InterruptedPoissonProcess::SwitchOff,
weak_factory_.GetWeakPtr()),
NextEvent(switch_off_rate_));
}
void InterruptedPoissonProcess::SendPacket() {
task_runner_->PostDelayedTask(
FROM_HERE, base::Bind(&InterruptedPoissonProcess::SendPacket,
weak_factory_.GetWeakPtr()),
NextEvent(send_rate_));
// If OFF then don't send.
if (!on_state_) return;
// Find the earliest packet to send.
base::TimeTicks earliest_time;
for (size_t i = 0; i < send_buffers_.size(); ++i) {
if (!send_buffers_[i]) continue;
if (send_buffers_[i]->Empty()) continue;
if (earliest_time.is_null() ||
send_buffers_[i]->FirstPacketTime() < earliest_time)
earliest_time = send_buffers_[i]->FirstPacketTime();
}
for (size_t i = 0; i < send_buffers_.size(); ++i) {
if (!send_buffers_[i]) continue;
if (send_buffers_[i]->Empty()) continue;
if (send_buffers_[i]->FirstPacketTime() != earliest_time) continue;
send_buffers_[i]->SendOnePacket();
break;
}
}
class UDPProxyImpl;

// Terminal pipe stage: hands each packet to the owning UDPProxyImpl together
// with the destination endpoint it was configured with.
class PacketSender : public PacketPipe {
 public:
  // Neither pointer is owned. |destination| is dereferenced on every Send(),
  // so changes made to the pointed-to endpoint are picked up.
  PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination)
      : udp_proxy_(udp_proxy), destination_(destination) {}

  void Send(std::unique_ptr<Packet> packet) final;

  // This stage is always last in a pipe; nothing may be appended after it.
  void AppendToPipe(std::unique_ptr<PacketPipe> pipe) final { NOTREACHED(); }

 private:
  UDPProxyImpl* udp_proxy_;
  const net::IPEndPoint* destination_;  // not owned
};
namespace {

// Appends |next| to the chain held by |*pipe|, or makes it the head when the
// chain is still empty. Takes ownership of |next| in both cases.
void BuildPipe(std::unique_ptr<PacketPipe>* pipe, PacketPipe* next) {
  if (!*pipe) {
    pipe->reset(next);
    return;
  }
  (*pipe)->AppendToPipe(std::unique_ptr<PacketPipe>(next));
}

}  // namespace
// Builds a pipe emulating a good network: large, fast buffers at both ends
// with about a millisecond of delay and small jitter in between.
std::unique_ptr<PacketPipe> GoodNetwork() {
  std::unique_ptr<PacketPipe> network;
  // Sender-side buffer: 2 MiB, 50 Mbit/s.
  BuildPipe(&network, new Buffer(2 << 20, 50));
  // 1 ms fixed delay.
  BuildPipe(&network, new ConstantDelay(1E-3));
  // Order-preserving jitter scaled by 1 ms, with 2 ms stalls scheduled on a
  // 3 second scale.
  BuildPipe(&network, new RandomSortedDelay(1E-3, 2E-3, 3));
  // Receiver-side buffer: 2 MiB, 50 Mbit/s.
  BuildPipe(&network, new Buffer(2 << 20, 50));
  return network;
}
// Builds a pipe emulating a Wi-Fi link: three 256 KiB / 20 Mbit/s buffers
// (sender, router, receiver) separated by light packet loss, fixed delay and
// order-preserving jitter.
std::unique_ptr<PacketPipe> WifiNetwork() {
  std::unique_ptr<PacketPipe> network;
  // Sender-side buffer, followed by 0.5% packet loss.
  BuildPipe(&network, new Buffer(256 << 10, 20));
  BuildPipe(&network, new RandomDrop(0.005));
  // Hop to the router: 1 ms fixed delay plus jitter with 20 ms stalls, then
  // the router's own buffer.
  BuildPipe(&network, new ConstantDelay(1E-3));
  BuildPipe(&network, new RandomSortedDelay(1E-3, 20E-3, 3));
  BuildPipe(&network, new Buffer(256 << 10, 20));
  // Hop to the receiver: same delay and jitter, then 0.5% packet loss.
  BuildPipe(&network, new ConstantDelay(1E-3));
  BuildPipe(&network, new RandomSortedDelay(1E-3, 20E-3, 3));
  BuildPipe(&network, new RandomDrop(0.005));
  // Receiver-side buffer.
  BuildPipe(&network, new Buffer(256 << 10, 20));
  return network;
}
// Builds a pipe emulating a bad network: small, slow buffers, 5% packet
// loss, jitter and packet reordering.
std::unique_ptr<PacketPipe> BadNetwork() {
  std::unique_ptr<PacketPipe> network;
  // Sender-side buffer: 64 KiB, 5 Mbit/s.
  BuildPipe(&network, new Buffer(64 << 10, 5));
  // 5% packet drop.
  BuildPipe(&network, new RandomDrop(0.05));
  // Order-preserving jitter scaled by 2 ms, with 20 ms stalls scheduled on a
  // 1 second scale.
  BuildPipe(&network, new RandomSortedDelay(2E-3, 20E-3, 1));
  // Router buffer: 64 KiB, 5 Mbit/s.
  BuildPipe(&network, new Buffer(64 << 10, 5));
  // 1 ms fixed delay.
  BuildPipe(&network, new ConstantDelay(1E-3));
  // Disabled glitch stage (originally annotated "Random 40ms every other
  // second"; note that 40E-1 is 4 seconds):
  // BuildPipe(&network, new NetworkGlitchPipe(2, 40E-1));
  // Per-packet random delay scaled by 5 ms; may reorder packets.
  BuildPipe(&network, new RandomUnsortedDelay(5E-3));
  // Receiver-side buffer: 64 KiB, 5 Mbit/s.
  BuildPipe(&network, new Buffer(64 << 10, 5));
  return network;
}
// Builds a pipe emulating a very hostile network: tiny buffers, 10% packet
// loss applied twice, heavy jitter, outages and packet reordering.
std::unique_ptr<PacketPipe> EvilNetwork() {
  std::unique_ptr<PacketPipe> network;
  // Sender-side buffer: 4 KiB, 5 Mbit/s.
  BuildPipe(&network, new Buffer(4 << 10, 5));
  // Towards the router: 10% packet drop, then order-preserving jitter scaled
  // by 20 ms with 60 ms stalls scheduled on a 1 second scale.
  BuildPipe(&network, new RandomDrop(0.1));
  BuildPipe(&network, new RandomSortedDelay(20E-3, 60E-3, 1));
  // Router buffer: 4 KiB, 2 Mbit/s.
  BuildPipe(&network, new Buffer(4 << 10, 2));
  // Another 10% packet drop and 1 ms fixed delay.
  BuildPipe(&network, new RandomDrop(0.1));
  BuildPipe(&network, new ConstantDelay(1E-3));
  // Outages: 2 s average working time, 0.3 s average outage time.
  BuildPipe(&network, new NetworkGlitchPipe(2.0, 0.3));
  // Per-packet random delay scaled by 20 ms; may reorder packets.
  BuildPipe(&network, new RandomUnsortedDelay(20E-3));
  // Receiver-side buffer: 4 KiB, 2 Mbit/s.
  BuildPipe(&network, new Buffer(4 << 10, 2));
  return network;
}
// Creates an InterruptedPoissonProcess with a fixed parameter set and a
// random seed of 0.
std::unique_ptr<InterruptedPoissonProcess> DefaultInterruptedPoissonProcess() {
  // The following values are taken from a session reported from a user.
  // They are experimentally tested to demonstrate challenging network
  // conditions. The average bitrate is about 2mbits/s.
  // Each element is the average number of packets sent per millisecond.
  // The average changes and rotates every second.
  const std::vector<double> average_rates = {
      0.609, 0.495, 0.561, 0.458, 0.538, 0.513, 0.585, 0.592,
      0.658, 0.556, 0.371, 0.595, 0.490, 0.980, 0.781, 0.463,
  };
  const double burstiness = 0.609;
  const double variance = 4.1;
  return std::unique_ptr<InterruptedPoissonProcess>(
      new InterruptedPoissonProcess(average_rates, burstiness, variance, 0));
}
// UDP proxy implementation. It owns a dedicated IO thread on which a UDP
// server socket listens on |local_port|. Datagrams received from the
// destination are pushed through |from_dest_pipe| and sent to the last seen
// return address; all other datagrams are pushed through |to_dest_pipe| and
// sent to the destination. When the destination passed to the constructor
// has an empty address, it is learned from incoming traffic instead.
class UDPProxyImpl : public UDPProxy {
 public:
  // Starts the proxy thread and blocks until the socket is listening.
  // |net_log| is handed to the socket as-is.
  UDPProxyImpl(const net::IPEndPoint& local_port,
               const net::IPEndPoint& destination,
               std::unique_ptr<PacketPipe> to_dest_pipe,
               std::unique_ptr<PacketPipe> from_dest_pipe,
               net::NetLog* net_log)
      : local_port_(local_port),
        destination_(destination),
        destination_is_mutable_(destination.address().empty()),
        // Must start out false: the flag is read by ProcessPacket() for the
        // very first datagram, and the first sender should be treated as the
        // return address rather than adopted as the destination.
        set_destination_next_(false),
        proxy_thread_("media::cast::test::UdpProxy Thread"),
        to_dest_pipe_(std::move(to_dest_pipe)),
        from_dest_pipe_(std::move(from_dest_pipe)),
        blocked_(false),
        weak_factory_(this) {
    proxy_thread_.StartWithOptions(
        base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
    base::WaitableEvent start_event(
        base::WaitableEvent::ResetPolicy::AUTOMATIC,
        base::WaitableEvent::InitialState::NOT_SIGNALED);
    proxy_thread_.task_runner()->PostTask(
        FROM_HERE, base::Bind(&UDPProxyImpl::Start, base::Unretained(this),
                              base::Unretained(&start_event), net_log));
    start_event.Wait();
  }

  // Tears down the pipes and the socket on the proxy thread, waits for that
  // to finish, then stops the thread.
  ~UDPProxyImpl() final {
    base::WaitableEvent stop_event(
        base::WaitableEvent::ResetPolicy::AUTOMATIC,
        base::WaitableEvent::InitialState::NOT_SIGNALED);
    proxy_thread_.task_runner()->PostTask(
        FROM_HERE, base::Bind(&UDPProxyImpl::Stop, base::Unretained(this),
                              base::Unretained(&stop_event)));
    stop_event.Wait();
    proxy_thread_.Stop();
  }

  // Writes |packet| to |destination| on the socket. The packet is dropped
  // (with an error log) while a previous write is still pending, when
  // |destination| has no address yet, or when the write fails.
  void Send(std::unique_ptr<Packet> packet,
            const net::IPEndPoint& destination) {
    if (blocked_) {
      LOG(ERROR) << "Cannot write packet right now: blocked";
      return;
    }
    VLOG(1) << "Sending packet, len = " << packet->size();
    // We ignore all problems, callbacks and errors.
    // If it didn't work we just drop the packet and call it a day.
    scoped_refptr<net::IOBuffer> buf =
        new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
    // Read the size now: |packet| is moved into the callback below.
    size_t buf_size = packet->size();
    int result;
    if (destination.address().empty()) {
      VLOG(1) << "Destination has not been set yet.";
      result = net::ERR_INVALID_ARGUMENT;
    } else {
      VLOG(1) << "Destination:" << destination.ToString();
      // The callback owns both the IO buffer and the packet it wraps, which
      // keeps the bytes alive until an asynchronous write completes.
      result = socket_->SendTo(
          buf.get(), static_cast<int>(buf_size), destination,
          base::Bind(&UDPProxyImpl::AllowWrite, weak_factory_.GetWeakPtr(), buf,
                     base::Passed(&packet)));
    }
    if (result == net::ERR_IO_PENDING) {
      blocked_ = true;
    } else if (result < 0) {
      LOG(ERROR) << "Failed to write packet.";
    }
  }

 private:
  // Runs on the proxy thread: creates the socket, terminates both pipes with
  // a PacketSender, initializes them, starts listening, unblocks the
  // constructor and begins reading.
  void Start(base::WaitableEvent* start_event, net::NetLog* net_log) {
    socket_.reset(new net::UDPServerSocket(net_log, net::NetLogSource()));
    BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_));
    BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_));
    to_dest_pipe_->InitOnIOThread(base::ThreadTaskRunnerHandle::Get(),
                                  &tick_clock_);
    from_dest_pipe_->InitOnIOThread(base::ThreadTaskRunnerHandle::Get(),
                                    &tick_clock_);
    VLOG(0) << "From:" << local_port_.ToString();
    if (!destination_is_mutable_)
      VLOG(0) << "To:" << destination_.ToString();
    CHECK_GE(socket_->Listen(local_port_), 0);
    start_event->Signal();
    PollRead();
  }

  // Runs on the proxy thread: destroys the pipes and the socket, then
  // unblocks the destructor.
  void Stop(base::WaitableEvent* stop_event) {
    to_dest_pipe_.reset();
    from_dest_pipe_.reset();
    socket_.reset();
    stop_event->Signal();
  }

  // Routes the datagram currently held in |packet_| (|len| bytes received
  // from |recv_address_|). |recv_buf| wraps |packet_|'s storage and is not
  // otherwise used here. A negative |len| is a read error and is only logged.
  void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) {
    DCHECK_NE(len, net::ERR_IO_PENDING);
    VLOG(1) << "Got packet, len = " << len;
    if (len < 0) {
      LOG(WARNING) << "Socket read error: " << len;
      return;
    }
    packet_->resize(len);
    // With a mutable destination, the first datagram from an unknown address
    // that follows a datagram routed towards the destination is taken to be
    // the destination answering, and its address is adopted.
    if (destination_is_mutable_ && set_destination_next_ &&
        !(recv_address_ == return_address_) &&
        !(recv_address_ == destination_)) {
      destination_ = recv_address_;
    }
    if (recv_address_ == destination_) {
      set_destination_next_ = false;
      from_dest_pipe_->Send(std::move(packet_));
    } else {
      set_destination_next_ = true;
      VLOG(1) << "Return address = " << recv_address_.ToString();
      return_address_ = recv_address_;
      to_dest_pipe_->Send(std::move(packet_));
    }
  }

  // Completion callback for an asynchronous read: handles the datagram and
  // resumes reading.
  void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) {
    ProcessPacket(recv_buf, len);
    PollRead();
  }

  // Reads datagrams until a read goes asynchronous. Every read gets a fresh
  // kMaxPacketSize-byte packet to receive into.
  // NOTE(review): a read that keeps failing synchronously would keep this
  // loop spinning, since errors are only logged - confirm whether that can
  // happen in practice.
  void PollRead() {
    while (true) {
      packet_.reset(new Packet(kMaxPacketSize));
      scoped_refptr<net::IOBuffer> recv_buf =
          new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front()));
      int len =
          socket_->RecvFrom(recv_buf.get(), kMaxPacketSize, &recv_address_,
                            base::Bind(&UDPProxyImpl::ReadCallback,
                                       base::Unretained(this), recv_buf));
      if (len == net::ERR_IO_PENDING)
        break;
      ProcessPacket(recv_buf, len);
    }
  }

  // Completion callback for an asynchronous write. |buf| and |packet| are
  // parameters only so that they stay alive until the write has finished.
  void AllowWrite(scoped_refptr<net::IOBuffer> buf,
                  std::unique_ptr<Packet> packet, int unused_len) {
    DCHECK(blocked_);
    blocked_ = false;
  }

  // Input
  net::IPEndPoint local_port_;
  net::IPEndPoint destination_;
  // True when no destination address was given at construction.
  bool destination_is_mutable_;
  // Source of the most recent datagram that did not come from the
  // destination.
  net::IPEndPoint return_address_;
  // True after a datagram was routed towards the destination; cleared after
  // one was routed back.
  bool set_destination_next_;
  base::DefaultTickClock tick_clock_;
  base::Thread proxy_thread_;
  std::unique_ptr<net::UDPServerSocket> socket_;
  std::unique_ptr<PacketPipe> to_dest_pipe_;
  std::unique_ptr<PacketPipe> from_dest_pipe_;
  // For receiving.
  net::IPEndPoint recv_address_;
  std::unique_ptr<Packet> packet_;
  // For sending.
  bool blocked_;  // True while an asynchronous write is outstanding.
  base::WeakPtrFactory<UDPProxyImpl> weak_factory_;
};
void PacketSender::Send(std::unique_ptr<Packet> packet) {
udp_proxy_->Send(std::move(packet), *destination_);
}
// Factory for the concrete proxy. Ownership of both pipes is transferred to
// the returned object; the remaining arguments are passed through unchanged.
std::unique_ptr<UDPProxy> UDPProxy::Create(
    const net::IPEndPoint& local_port, const net::IPEndPoint& destination,
    std::unique_ptr<PacketPipe> to_dest_pipe,
    std::unique_ptr<PacketPipe> from_dest_pipe, net::NetLog* net_log) {
  return std::unique_ptr<UDPProxy>(
      new UDPProxyImpl(local_port, destination, std::move(to_dest_pipe),
                       std::move(from_dest_pipe), net_log));
}
} // namespace test
} // namespace cast
} // namespace media