| // Copyright 2014 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "media/cast/net/pacing/paced_sender.h" |
| |
| #include "base/big_endian.h" |
| #include "base/bind.h" |
| #include "base/logging.h" |
| #include "base/numerics/safe_conversions.h" |
| |
| namespace media { |
| namespace cast { |
| |
| namespace { |
| |
| static const int64_t kPacingIntervalMs = 10; |
| // Each frame will be split into no more than kPacingMaxBurstsPerFrame |
| // bursts of packets. |
| static const size_t kPacingMaxBurstsPerFrame = 3; |
| static const size_t kMaxDedupeWindowMs = 500; |
| |
| } // namespace |
| |
| DedupInfo::DedupInfo() : last_byte_acked_for_audio(0) {} |
| |
| PacketKey::PacketKey() : ssrc(0), packet_id(0) {} |
| |
| PacketKey::PacketKey(base::TimeTicks capture_time, |
| uint32_t ssrc, |
| FrameId frame_id, |
| uint16_t packet_id) |
| : capture_time(capture_time), |
| ssrc(ssrc), |
| frame_id(frame_id), |
| packet_id(packet_id) {} |
| |
| PacketKey::PacketKey(const PacketKey& other) = default; |
| |
| PacketKey::~PacketKey() = default; |
| |
| struct PacedSender::PacketSendRecord { |
| PacketSendRecord() |
| : last_byte_sent(0), last_byte_sent_for_audio(0), cancel_count(0) {} |
| |
| base::TimeTicks time; // Time when the packet was sent. |
| int64_t last_byte_sent; // Number of bytes sent to network just after this |
| // packet was sent. |
| int64_t last_byte_sent_for_audio; // Number of bytes sent to network from |
| // audio stream just before this packet. |
| int cancel_count; // Number of times the packet was canceled (debugging). |
| }; |
| |
| struct PacedSender::RtpSession { |
| explicit RtpSession(bool is_audio_stream) |
| : last_byte_sent(0), is_audio(is_audio_stream) {} |
| RtpSession() = default; |
| |
| // Tracks recently-logged RTP timestamps so that it can expand the truncated |
| // values found in packets. |
| RtpTimeTicks last_logged_rtp_timestamp_; |
| int64_t last_byte_sent; |
| bool is_audio; |
| }; |
| |
| PacedSender::PacedSender( |
| size_t target_burst_size, |
| size_t max_burst_size, |
| const base::TickClock* clock, |
| std::vector<PacketEvent>* recent_packet_events, |
| PacketTransport* transport, |
| const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner) |
| : clock_(clock), |
| recent_packet_events_(recent_packet_events), |
| transport_(transport), |
| transport_task_runner_(transport_task_runner), |
| last_byte_sent_for_audio_(0), |
| target_burst_size_(target_burst_size), |
| max_burst_size_(max_burst_size), |
| current_max_burst_size_(target_burst_size_), |
| next_max_burst_size_(target_burst_size_), |
| next_next_max_burst_size_(target_burst_size_), |
| current_burst_size_(0), |
| state_(State_Unblocked) {} |
| |
| PacedSender::~PacedSender() = default; |
| |
| void PacedSender::RegisterSsrc(uint32_t ssrc, bool is_audio) { |
| if (sessions_.find(ssrc) != sessions_.end()) |
| DVLOG(1) << "Re-register ssrc: " << ssrc; |
| |
| sessions_[ssrc] = RtpSession(is_audio); |
| } |
| |
| void PacedSender::RegisterPrioritySsrc(uint32_t ssrc) { |
| priority_ssrcs_.push_back(ssrc); |
| } |
| |
| int64_t PacedSender::GetLastByteSentForPacket(const PacketKey& packet_key) { |
| PacketSendHistory::const_iterator it = send_history_.find(packet_key); |
| if (it == send_history_.end()) |
| return 0; |
| return it->second.last_byte_sent; |
| } |
| |
| int64_t PacedSender::GetLastByteSentForSsrc(uint32_t ssrc) { |
| auto it = sessions_.find(ssrc); |
| // Return 0 for unknown session. |
| if (it == sessions_.end()) |
| return 0; |
| return it->second.last_byte_sent; |
| } |
| |
| bool PacedSender::SendPackets(const SendPacketVector& packets) { |
| if (packets.empty()) { |
| return true; |
| } |
| const bool high_priority = IsHighPriority(packets.begin()->first); |
| for (size_t i = 0; i < packets.size(); i++) { |
| if (VLOG_IS_ON(2)) { |
| PacketSendHistory::const_iterator history_it = |
| send_history_.find(packets[i].first); |
| if (history_it != send_history_.end() && |
| history_it->second.cancel_count > 0) { |
| VLOG(2) << "PacedSender::SendPackets() called for packet CANCELED " |
| << history_it->second.cancel_count << " times: " |
| << "ssrc=" << packets[i].first.ssrc |
| << ", frame_id=" << packets[i].first.frame_id |
| << ", packet_id=" << packets[i].first.packet_id; |
| } |
| } |
| |
| DCHECK(IsHighPriority(packets[i].first) == high_priority); |
| if (high_priority) { |
| priority_packet_list_[packets[i].first] = |
| make_pair(PacketType_Normal, packets[i].second); |
| } else { |
| packet_list_[packets[i].first] = |
| make_pair(PacketType_Normal, packets[i].second); |
| } |
| } |
| if (state_ == State_Unblocked) { |
| SendStoredPackets(); |
| } |
| return true; |
| } |
| |
| bool PacedSender::ShouldResend(const PacketKey& packet_key, |
| const DedupInfo& dedup_info, |
| const base::TimeTicks& now) { |
| PacketSendHistory::const_iterator it = send_history_.find(packet_key); |
| |
| // No history of previous transmission. It might be sent too long ago. |
| if (it == send_history_.end()) |
| return true; |
| |
| // Suppose there is request to retransmit X and there is an audio |
| // packet Y sent just before X. Reject retransmission of X if ACK for |
| // Y has not been received. |
| // Only do this for video packets. |
| // |
| // TODO(miu): This sounds wrong. Audio packets are always transmitted first |
| // (because they are put in |priority_packet_list_|, see PopNextPacket()). |
| auto session_it = sessions_.find(packet_key.ssrc); |
| // The session should always have been registered in |sessions_|. |
| DCHECK(session_it != sessions_.end()); |
| if (!session_it->second.is_audio) { |
| if (dedup_info.last_byte_acked_for_audio && |
| it->second.last_byte_sent_for_audio && |
| dedup_info.last_byte_acked_for_audio < |
| it->second.last_byte_sent_for_audio) { |
| return false; |
| } |
| } |
| // Retransmission interval has to be greater than |resend_interval|. |
| if (now - it->second.time < dedup_info.resend_interval) |
| return false; |
| return true; |
| } |
| |
| bool PacedSender::ResendPackets(const SendPacketVector& packets, |
| const DedupInfo& dedup_info) { |
| if (packets.empty()) { |
| return true; |
| } |
| const bool high_priority = IsHighPriority(packets.begin()->first); |
| const base::TimeTicks now = clock_->NowTicks(); |
| for (size_t i = 0; i < packets.size(); i++) { |
| if (VLOG_IS_ON(2)) { |
| PacketSendHistory::const_iterator history_it = |
| send_history_.find(packets[i].first); |
| if (history_it != send_history_.end() && |
| history_it->second.cancel_count > 0) { |
| VLOG(2) << "PacedSender::ReendPackets() called for packet CANCELED " |
| << history_it->second.cancel_count << " times: " |
| << "ssrc=" << packets[i].first.ssrc |
| << ", frame_id=" << packets[i].first.frame_id |
| << ", packet_id=" << packets[i].first.packet_id; |
| } |
| } |
| |
| if (!ShouldResend(packets[i].first, dedup_info, now)) { |
| LogPacketEvent(packets[i].second->data, PACKET_RTX_REJECTED); |
| continue; |
| } |
| |
| DCHECK(IsHighPriority(packets[i].first) == high_priority); |
| if (high_priority) { |
| priority_packet_list_[packets[i].first] = |
| make_pair(PacketType_Resend, packets[i].second); |
| } else { |
| packet_list_[packets[i].first] = |
| make_pair(PacketType_Resend, packets[i].second); |
| } |
| } |
| if (state_ == State_Unblocked) { |
| SendStoredPackets(); |
| } |
| return true; |
| } |
| |
| bool PacedSender::SendRtcpPacket(uint32_t ssrc, PacketRef packet) { |
| if (state_ == State_TransportBlocked) { |
| const PacketKey key(base::TimeTicks(), ssrc, FrameId::first(), 0); |
| priority_packet_list_[key] = make_pair(PacketType_RTCP, packet); |
| } else { |
| // We pass the RTCP packets straight through. |
| if (!transport_->SendPacket(packet, |
| base::BindOnce(&PacedSender::SendStoredPackets, |
| weak_factory_.GetWeakPtr()))) { |
| state_ = State_TransportBlocked; |
| } |
| } |
| return true; |
| } |
| |
| void PacedSender::CancelSendingPacket(const PacketKey& packet_key) { |
| packet_list_.erase(packet_key); |
| priority_packet_list_.erase(packet_key); |
| |
| if (VLOG_IS_ON(2)) { |
| auto history_it = send_history_.find(packet_key); |
| if (history_it != send_history_.end()) |
| ++history_it->second.cancel_count; |
| } |
| } |
| |
| PacketRef PacedSender::PopNextPacket(PacketType* packet_type, |
| PacketKey* packet_key) { |
| // Always pop from the priority list first. |
| PacketList* list = |
| !priority_packet_list_.empty() ? &priority_packet_list_ : &packet_list_; |
| DCHECK(!list->empty()); |
| |
| // Determine which packet in the frame should be popped by examining the |
| // |send_history_| for prior transmission attempts. Packets that have never |
| // been transmitted will be popped first. If all packets have transmitted |
| // before, pop the one that has not been re-attempted for the longest time. |
| auto it = list->begin(); |
| PacketKey last_key = it->first; |
| last_key.packet_id = UINT16_C(0xffff); |
| PacketSendHistory::const_iterator history_it = |
| send_history_.lower_bound(it->first); |
| base::TimeTicks earliest_send_time = |
| base::TimeTicks() + base::TimeDelta::Max(); |
| auto found_it = it; |
| while (true) { |
| if (history_it == send_history_.end() || it->first < history_it->first) { |
| // There is no send history for this packet, which means it has not been |
| // transmitted yet. |
| found_it = it; |
| break; |
| } |
| |
| DCHECK(it->first == history_it->first); |
| if (history_it->second.time < earliest_send_time) { |
| earliest_send_time = history_it->second.time; |
| found_it = it; |
| } |
| |
| // Advance to next packet for the current frame, or break if there are no |
| // more. |
| ++it; |
| if (it == list->end() || last_key < it->first) |
| break; |
| |
| // Advance to next history entry. Since there may be "holes" in the packet |
| // list (e.g., due to packets canceled for retransmission), it's possible |
| // |history_it| will have to be advanced more than once even though |it| was |
| // only advanced once. |
| do { |
| ++history_it; |
| } while (history_it != send_history_.end() && |
| history_it->first < it->first); |
| } |
| |
| *packet_type = found_it->second.first; |
| *packet_key = found_it->first; |
| PacketRef ret = found_it->second.second; |
| list->erase(found_it); |
| return ret; |
| } |
| |
| bool PacedSender::IsHighPriority(const PacketKey& packet_key) const { |
| return std::find(priority_ssrcs_.begin(), priority_ssrcs_.end(), |
| packet_key.ssrc) != priority_ssrcs_.end(); |
| } |
| |
| bool PacedSender::empty() const { |
| return packet_list_.empty() && priority_packet_list_.empty(); |
| } |
| |
| size_t PacedSender::size() const { |
| return packet_list_.size() + priority_packet_list_.size(); |
| } |
| |
| // This function can be called from three places: |
| // 1. User called one of the Send* functions and we were in an unblocked state. |
| // 2. state_ == State_TransportBlocked and the transport is calling us to |
| // let us know that it's ok to send again. |
| // 3. state_ == State_BurstFull and there are still packets to send. In this |
| // case we called PostDelayedTask on this function to start a new burst. |
| void PacedSender::SendStoredPackets() { |
| State previous_state = state_; |
| state_ = State_Unblocked; |
| if (empty()) { |
| return; |
| } |
| |
| base::TimeTicks now = clock_->NowTicks(); |
| // I don't actually trust that PostDelayTask(x - now) will mean that |
| // now >= x when the call happens, so check if the previous state was |
| // State_BurstFull too. |
| if (now >= burst_end_ || previous_state == State_BurstFull) { |
| // Start a new burst. |
| current_burst_size_ = 0; |
| burst_end_ = now + base::Milliseconds(kPacingIntervalMs); |
| |
| // The goal here is to try to send out the queued packets over the next |
| // three bursts, while trying to keep the burst size below 10 if possible. |
| // We have some evidence that sending more than 12 packets in a row doesn't |
| // work very well, but we don't actually know why yet. Sending out packets |
| // sooner is better than sending out packets later as that gives us more |
| // time to re-send them if needed. So if we have less than 30 packets, just |
| // send 10 at a time. If we have less than 60 packets, send n / 3 at a time. |
| // if we have more than 60, we send 20 at a time. 20 packets is ~24Mbit/s |
| // which is more bandwidth than the cast library should need, and sending |
| // out more data per second is unlikely to be helpful. |
| size_t max_burst_size = std::min( |
| max_burst_size_, |
| std::max(target_burst_size_, size() / kPacingMaxBurstsPerFrame)); |
| current_max_burst_size_ = std::max(next_max_burst_size_, max_burst_size); |
| next_max_burst_size_ = std::max(next_next_max_burst_size_, max_burst_size); |
| next_next_max_burst_size_ = max_burst_size; |
| } |
| |
| base::RepeatingClosure cb = base::BindRepeating( |
| &PacedSender::SendStoredPackets, weak_factory_.GetWeakPtr()); |
| while (!empty()) { |
| if (current_burst_size_ >= current_max_burst_size_) { |
| transport_task_runner_->PostDelayedTask(FROM_HERE, |
| cb, |
| burst_end_ - now); |
| state_ = State_BurstFull; |
| return; |
| } |
| PacketType packet_type; |
| PacketKey packet_key; |
| PacketRef packet = PopNextPacket(&packet_type, &packet_key); |
| PacketSendRecord* const send_record = &(send_history_[packet_key]); |
| send_record->time = now; |
| |
| if (send_record->cancel_count > 0 && packet_type != PacketType_RTCP) { |
| VLOG(2) << "PacedSender is sending a packet known to have been CANCELED " |
| << send_record->cancel_count << " times: " |
| << "ssrc=" << packet_key.ssrc |
| << ", frame_id=" << packet_key.frame_id |
| << ", packet_id=" << packet_key.packet_id; |
| } |
| |
| switch (packet_type) { |
| case PacketType_Resend: |
| LogPacketEvent(packet->data, PACKET_RETRANSMITTED); |
| break; |
| case PacketType_Normal: |
| LogPacketEvent(packet->data, PACKET_SENT_TO_NETWORK); |
| break; |
| case PacketType_RTCP: |
| break; |
| } |
| |
| const bool socket_blocked = !transport_->SendPacket(packet, cb); |
| |
| // Save the send record. |
| send_record->last_byte_sent = transport_->GetBytesSent(); |
| send_record->last_byte_sent_for_audio = last_byte_sent_for_audio_; |
| send_history_buffer_[packet_key] = *send_record; |
| |
| auto it = sessions_.find(packet_key.ssrc); |
| // The session should always have been registered in |sessions_|. |
| DCHECK(it != sessions_.end()); |
| it->second.last_byte_sent = send_record->last_byte_sent; |
| if (it->second.is_audio) |
| last_byte_sent_for_audio_ = send_record->last_byte_sent; |
| |
| if (socket_blocked) { |
| state_ = State_TransportBlocked; |
| return; |
| } |
| current_burst_size_++; |
| } |
| |
| // Keep ~0.5 seconds of data (1000 packets). |
| // |
| // TODO(miu): This has no relation to the actual size of the frames, and so |
| // there's no way to reason whether 1000 is enough or too much, or whatever. |
| if (send_history_buffer_.size() >= |
| max_burst_size_ * kMaxDedupeWindowMs / kPacingIntervalMs) { |
| send_history_.swap(send_history_buffer_); |
| send_history_buffer_.clear(); |
| } |
| DCHECK_LE(send_history_buffer_.size(), |
| max_burst_size_ * kMaxDedupeWindowMs / kPacingIntervalMs); |
| state_ = State_Unblocked; |
| } |
| |
| void PacedSender::LogPacketEvent(const Packet& packet, CastLoggingEvent type) { |
| if (!recent_packet_events_) |
| return; |
| |
| recent_packet_events_->push_back(PacketEvent()); |
| PacketEvent& event = recent_packet_events_->back(); |
| |
| // Populate the new PacketEvent by parsing the wire-format |packet|. |
| // |
| // TODO(miu): This parsing logic belongs in RtpParser. |
| event.timestamp = clock_->NowTicks(); |
| event.type = type; |
| base::BigEndianReader reader(reinterpret_cast<const char*>(&packet[0]), |
| packet.size()); |
| bool success = reader.Skip(4); |
| uint32_t truncated_rtp_timestamp; |
| success &= reader.ReadU32(&truncated_rtp_timestamp); |
| uint32_t ssrc; |
| success &= reader.ReadU32(&ssrc); |
| |
| auto it = sessions_.find(ssrc); |
| // The session should always have been registered in |sessions_|. |
| DCHECK(it != sessions_.end()); |
| event.rtp_timestamp = it->second.last_logged_rtp_timestamp_ = |
| it->second.last_logged_rtp_timestamp_.Expand(truncated_rtp_timestamp); |
| event.media_type = it->second.is_audio ? AUDIO_EVENT : VIDEO_EVENT; |
| success &= reader.Skip(2); |
| success &= reader.ReadU16(&event.packet_id); |
| success &= reader.ReadU16(&event.max_packet_id); |
| event.size = base::checked_cast<uint32_t>(packet.size()); |
| DCHECK(success); |
| } |
| |
| } // namespace cast |
| } // namespace media |