// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "media/cast/receiver/frame_receiver.h"

#include <algorithm>
#include <string>

#include "base/big_endian.h"
#include "base/bind.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/numerics/safe_conversions.h"
#include "media/cast/cast_config.h"
#include "media/cast/cast_environment.h"
#include "media/cast/constants.h"
#include "media/cast/net/rtcp/rtcp_utility.h"

namespace {

const int kMinSchedulingDelayMs = 1;

media::cast::RtcpTimeData CreateRtcpTimeData(base::TimeTicks now) {
  media::cast::RtcpTimeData ret;
  ret.timestamp = now;
  media::cast::ConvertTimeTicksToNtp(now, &ret.ntp_seconds, &ret.ntp_fraction);
  return ret;
}

}  // namespace

namespace cobalt {
namespace media {
namespace cast {

FrameReceiver::FrameReceiver(
    const scoped_refptr<CastEnvironment>& cast_environment,
    const FrameReceiverConfig& config, EventMediaType event_media_type,
    CastTransport* const transport)
    : cast_environment_(cast_environment),
      transport_(transport),
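      // The RTP parser is configured with payload type 127 for audio streams
      // and 96 for video streams.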
      packet_parser_(
          config.sender_ssrc,
          config.rtp_payload_type <= RtpPayloadType::AUDIO_LAST ? 127 : 96),
      stats_(cast_environment->Clock()),
      event_media_type_(event_media_type),
      event_subscriber_(kReceiverRtcpEventHistorySize, event_media_type),
      rtp_timebase_(config.rtp_timebase),
      target_playout_delay_(
          base::TimeDelta::FromMilliseconds(config.rtp_max_delay_ms)),
      expected_frame_duration_(
          base::TimeDelta::FromSecondsD(1.0 / config.target_frame_rate)),
      reports_are_scheduled_(false),
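      // The framer holds at most the number of frames that fit within the
      // maximum playout delay at the target frame rate.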
      framer_(cast_environment->Clock(), this, config.sender_ssrc, true,
              static_cast<int>(config.rtp_max_delay_ms *
                               config.target_frame_rate / 1000)),
      rtcp_(cast_environment_->Clock(), config.receiver_ssrc,
            config.sender_ssrc),
      is_waiting_for_consecutive_frame_(false),
      lip_sync_drift_(ClockDriftSmoother::GetDefaultTimeConstant()),
      weak_factory_(this) {
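  // Tell the transport which sender/receiver SSRC pair this receiver handles,
  // so that its incoming packets are accepted and routed here.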
  transport_->AddValidRtpReceiver(config.sender_ssrc, config.receiver_ssrc);
  DCHECK_GT(config.rtp_max_delay_ms, 0);
  DCHECK_GT(config.target_frame_rate, 0);
  decryptor_.Initialize(config.aes_key, config.aes_iv_mask);
  cast_environment_->logger()->Subscribe(&event_subscriber_);
}

FrameReceiver::~FrameReceiver() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  cast_environment_->logger()->Unsubscribe(&event_subscriber_);
}

void FrameReceiver::RequestEncodedFrame(
    const ReceiveEncodedFrameCallback& callback) {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  frame_request_queue_.push_back(callback);
  EmitAvailableEncodedFrames();
}

bool FrameReceiver::ProcessPacket(std::unique_ptr<Packet> packet) {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));

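  // RTCP and RTP packets arrive over the same transport; hand RTCP packets to
  // the RTCP module and parse everything else as RTP.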
  if (IsRtcpPacket(&packet->front(), packet->size())) {
    rtcp_.IncomingRtcpPacket(&packet->front(), packet->size());
  } else {
    RtpCastHeader rtp_header;
    const uint8_t* payload_data;
    size_t payload_size;
    if (!packet_parser_.ParsePacket(&packet->front(), packet->size(),
                                    &rtp_header, &payload_data,
                                    &payload_size)) {
      return false;
    }

    ProcessParsedPacket(rtp_header, payload_data, payload_size);
    stats_.UpdateStatistics(rtp_header, rtp_timebase_);
  }

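  // Kick off the periodic RTCP reports and Cast feedback messages once the
  // first packet has been seen.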
  if (!reports_are_scheduled_) {
    ScheduleNextRtcpReport();
    ScheduleNextCastMessage();
    reports_are_scheduled_ = true;
  }

  return true;
}

void FrameReceiver::ProcessParsedPacket(const RtpCastHeader& rtp_header,
                                        const uint8_t* payload_data,
                                        size_t payload_size) {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));

  const base::TimeTicks now = cast_environment_->Clock()->NowTicks();

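  // Remember the RTP timestamp of this frame, keyed by the low 8 bits of its
  // frame ID, so later ACKs and log events can be tagged with it.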
  frame_id_to_rtp_timestamp_[rtp_header.frame_id.lower_8_bits()] =
      rtp_header.rtp_timestamp;

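  // Log a PACKET_RECEIVED event for this packet.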
  std::unique_ptr<PacketEvent> receive_event(new PacketEvent());
  receive_event->timestamp = now;
  receive_event->type = PACKET_RECEIVED;
  receive_event->media_type = event_media_type_;
  receive_event->rtp_timestamp = rtp_header.rtp_timestamp;
  receive_event->frame_id = rtp_header.frame_id;
  receive_event->packet_id = rtp_header.packet_id;
  receive_event->max_packet_id = rtp_header.max_packet_id;
  receive_event->size = base::checked_cast<uint32_t>(payload_size);
  cast_environment_->logger()->DispatchPacketEvent(std::move(receive_event));

  bool duplicate = false;
  const bool complete =
      framer_.InsertPacket(payload_data, payload_size, rtp_header, &duplicate);

  // Duplicate packets are ignored.
  if (duplicate) return;

  // Update lip-sync values upon receiving the first packet of each frame, or
  // if they have never been set yet.
  if (rtp_header.packet_id == 0 || lip_sync_reference_time_.is_null()) {
    RtpTimeTicks fresh_sync_rtp;
    base::TimeTicks fresh_sync_reference;
    if (!rtcp_.GetLatestLipSyncTimes(&fresh_sync_rtp, &fresh_sync_reference)) {
      // HACK: The sender should have provided Sender Reports before the first
      // frame was sent. However, the spec does not currently require this.
      // Therefore, when the data is missing, the local clock is used to
      // generate reference timestamps.
      VLOG(2) << "Lip sync info missing. Falling back to local clock.";
      fresh_sync_rtp = rtp_header.rtp_timestamp;
      fresh_sync_reference = now;
    }
    // |lip_sync_reference_time_| is always incremented according to the time
    // delta computed from the difference in RTP timestamps. Then,
    // |lip_sync_drift_| accounts for clock drift and also smooths out any
    // sudden/discontinuous shifts in the series of reference time values.
    if (lip_sync_reference_time_.is_null()) {
      lip_sync_reference_time_ = fresh_sync_reference;
    } else {
      // Note: It's okay for the conversion ToTimeDelta() to be approximate
      // because |lip_sync_drift_| will account for accumulated errors.
      lip_sync_reference_time_ +=
          (fresh_sync_rtp - lip_sync_rtp_timestamp_).ToTimeDelta(rtp_timebase_);
    }
    lip_sync_rtp_timestamp_ = fresh_sync_rtp;
    lip_sync_drift_.Update(now,
                           fresh_sync_reference - lip_sync_reference_time_);
  }

  // Another frame is complete from a non-duplicate packet. Attempt to emit
  // more frames to satisfy enqueued requests.
  if (complete) EmitAvailableEncodedFrames();
}

void FrameReceiver::CastFeedback(const RtcpCastMessage& cast_message) {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));

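  // Look up the RTP timestamp of the frame being ACKed and log a
  // FRAME_ACK_SENT event for it.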
  base::TimeTicks now = cast_environment_->Clock()->NowTicks();
  RtpTimeTicks rtp_timestamp =
      frame_id_to_rtp_timestamp_[cast_message.ack_frame_id.lower_8_bits()];

  std::unique_ptr<FrameEvent> ack_sent_event(new FrameEvent());
  ack_sent_event->timestamp = now;
  ack_sent_event->type = FRAME_ACK_SENT;
  ack_sent_event->media_type = event_media_type_;
  ack_sent_event->rtp_timestamp = rtp_timestamp;
  ack_sent_event->frame_id = cast_message.ack_frame_id;
  cast_environment_->logger()->DispatchFrameEvent(std::move(ack_sent_event));

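  // Attach recently captured receiver events (reported with redundancy) and
  // send the Cast feedback to the sender in an RTCP packet.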
  ReceiverRtcpEventSubscriber::RtcpEvents rtcp_events;
  event_subscriber_.GetRtcpEventsWithRedundancy(&rtcp_events);
  SendRtcpReport(rtcp_.local_ssrc(), rtcp_.remote_ssrc(),
                 CreateRtcpTimeData(now), &cast_message, nullptr,
                 target_playout_delay_, &rtcp_events, nullptr);
}

void FrameReceiver::EmitAvailableEncodedFrames() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));

  while (!frame_request_queue_.empty()) {
    // Attempt to peek at the next completed frame from the |framer_|.
    // TODO(miu): We should only be peeking at the metadata, and not copying
    // the payload yet! Or, at least, peek using a StringPiece instead of a
    // copy.
    std::unique_ptr<EncodedFrame> encoded_frame(new EncodedFrame());
    bool is_consecutively_next_frame = false;
    bool have_multiple_complete_frames = false;
    if (!framer_.GetEncodedFrame(encoded_frame.get(),
                                 &is_consecutively_next_frame,
                                 &have_multiple_complete_frames)) {
      VLOG(1) << "Wait for more packets to produce a completed frame.";
      return;  // ProcessParsedPacket() will invoke this method in the future.
    }

    const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
    const base::TimeTicks playout_time = GetPlayoutTime(*encoded_frame);

    // If we have multiple decodable frames, and the current frame is too old,
    // then skip it and decode the next frame instead.
    if (have_multiple_complete_frames && now > playout_time) {
      framer_.ReleaseFrame(encoded_frame->frame_id);
      continue;
    }

    // If |framer_| has a frame ready that is out of sequence, examine the
    // playout time to determine whether it's acceptable to continue, thereby
    // skipping one or more frames. Skip if the missing frame wouldn't complete
    // playing before the start of playback of the available frame.
    if (!is_consecutively_next_frame) {
      // This assumes that decoding takes as long as playing, which might
      // not be true.
      const base::TimeTicks earliest_possible_end_time_of_missing_frame =
          now + expected_frame_duration_ * 2;
      if (earliest_possible_end_time_of_missing_frame < playout_time) {
        VLOG(1) << "Wait for next consecutive frame instead of skipping.";
        if (!is_waiting_for_consecutive_frame_) {
          is_waiting_for_consecutive_frame_ = true;
          cast_environment_->PostDelayedTask(
              CastEnvironment::MAIN, FROM_HERE,
              base::Bind(
                  &FrameReceiver::EmitAvailableEncodedFramesAfterWaiting,
                  weak_factory_.GetWeakPtr()),
              playout_time - now);
        }
        return;
      }
    }

    // At this point, we have the complete next frame, or a decodable frame
    // from somewhere later in the stream, AND we have given up on waiting
    // for any frames in between, so now we can ACK the frame.
    framer_.AckFrame(encoded_frame->frame_id);

    // Decrypt the payload data in the frame, if crypto is being used.
    if (decryptor_.is_activated()) {
      std::string decrypted_data;
      if (!decryptor_.Decrypt(encoded_frame->frame_id, encoded_frame->data,
                              &decrypted_data)) {
        // Decryption failed. Give up on this frame.
        framer_.ReleaseFrame(encoded_frame->frame_id);
        continue;
      }
      encoded_frame->data.swap(decrypted_data);
    }

    // At this point, we have a decrypted EncodedFrame ready to be emitted.
    encoded_frame->reference_time = playout_time;
    framer_.ReleaseFrame(encoded_frame->frame_id);
    if (encoded_frame->new_playout_delay_ms) {
      target_playout_delay_ = base::TimeDelta::FromMilliseconds(
          encoded_frame->new_playout_delay_ms);
    }
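    // Hand the frame to the oldest pending request, on the MAIN thread.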
    cast_environment_->PostTask(
        CastEnvironment::MAIN, FROM_HERE,
        base::Bind(&FrameReceiver::EmitOneFrame, weak_factory_.GetWeakPtr(),
                   frame_request_queue_.front(), base::Passed(&encoded_frame)));
    frame_request_queue_.pop_front();
  }
}

void FrameReceiver::EmitAvailableEncodedFramesAfterWaiting() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  DCHECK(is_waiting_for_consecutive_frame_);
  is_waiting_for_consecutive_frame_ = false;
  EmitAvailableEncodedFrames();
}

void FrameReceiver::EmitOneFrame(
    const ReceiveEncodedFrameCallback& callback,
    std::unique_ptr<EncodedFrame> encoded_frame) const {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  if (!callback.is_null()) callback.Run(std::move(encoded_frame));
}

base::TimeTicks FrameReceiver::GetPlayoutTime(const EncodedFrame& frame) const {
  base::TimeDelta target_playout_delay = target_playout_delay_;
  if (frame.new_playout_delay_ms) {
    target_playout_delay =
        base::TimeDelta::FromMilliseconds(frame.new_playout_delay_ms);
  }
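  // Playout time is the lip-sync reference time, corrected for measured clock
  // drift, plus the RTP time elapsed since the lip-sync timestamp, plus the
  // target playout delay.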
  return lip_sync_reference_time_ + lip_sync_drift_.Current() +
         (frame.rtp_timestamp - lip_sync_rtp_timestamp_)
             .ToTimeDelta(rtp_timebase_) +
         target_playout_delay;
}

void FrameReceiver::ScheduleNextCastMessage() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
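  // Ask the framer when the next Cast feedback message is due and schedule a
  // task for that time, but no sooner than kMinSchedulingDelayMs from now.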
  base::TimeTicks send_time;
  framer_.TimeToSendNextCastMessage(&send_time);
  base::TimeDelta time_to_send =
      send_time - cast_environment_->Clock()->NowTicks();
  time_to_send = std::max(
      time_to_send, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs));
  cast_environment_->PostDelayedTask(
      CastEnvironment::MAIN, FROM_HERE,
      base::Bind(&FrameReceiver::SendNextCastMessage,
                 weak_factory_.GetWeakPtr()),
      time_to_send);
}

void FrameReceiver::SendNextCastMessage() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  framer_.SendCastMessage();  // Will only send a message if it is time.
  ScheduleNextCastMessage();
}

void FrameReceiver::ScheduleNextRtcpReport() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));

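  // Receiver reports are sent on a fixed interval of kRtcpReportIntervalMs.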
  cast_environment_->PostDelayedTask(
      CastEnvironment::MAIN, FROM_HERE,
      base::Bind(&FrameReceiver::SendNextRtcpReport,
                 weak_factory_.GetWeakPtr()),
      base::TimeDelta::FromMilliseconds(kRtcpReportIntervalMs));
}

void FrameReceiver::SendNextRtcpReport() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
  RtpReceiverStatistics stats = stats_.GetStatistics();
  SendRtcpReport(rtcp_.local_ssrc(), rtcp_.remote_ssrc(),
                 CreateRtcpTimeData(now), nullptr, nullptr, base::TimeDelta(),
                 nullptr, &stats);
  ScheduleNextRtcpReport();
}

void FrameReceiver::SendRtcpReport(
    uint32_t rtp_receiver_ssrc, uint32_t rtp_sender_ssrc,
    const RtcpTimeData& time_data, const RtcpCastMessage* cast_message,
    const RtcpPliMessage* pli_message, base::TimeDelta target_delay,
    const ReceiverRtcpEventSubscriber::RtcpEvents* rtcp_events,
    const RtpReceiverStatistics* rtp_receiver_statistics) {
  transport_->InitializeRtpReceiverRtcpBuilder(rtp_receiver_ssrc, time_data);
  RtcpReportBlock report_block;
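  // When RTP statistics are available, include a standard RTCP receiver
  // report block covering packet loss, jitter, and sender-report timing.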
  if (rtp_receiver_statistics) {
    report_block.remote_ssrc = 0;  // Not needed to set send side.
    report_block.media_ssrc =
        rtp_sender_ssrc;  // SSRC of the RTP packet sender.
    report_block.fraction_lost = rtp_receiver_statistics->fraction_lost;
    report_block.cumulative_lost = rtp_receiver_statistics->cumulative_lost;
    report_block.extended_high_sequence_number =
        rtp_receiver_statistics->extended_high_sequence_number;
    report_block.jitter = rtp_receiver_statistics->jitter;
    report_block.last_sr = rtcp_.last_report_truncated_ntp();
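    // DLSR: the delay since the last sender report was received, in the
    // compact NTP format that RTCP uses for this field.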
    base::TimeTicks last_report_received_time =
        rtcp_.time_last_report_received();
    if (!last_report_received_time.is_null()) {
      uint32_t delay_seconds = 0;
      uint32_t delay_fraction = 0;
      base::TimeDelta delta = time_data.timestamp - last_report_received_time;
      ConvertTimeToFractions(delta.InMicroseconds(), &delay_seconds,
                             &delay_fraction);
      report_block.delay_since_last_sr =
          ConvertToNtpDiff(delay_seconds, delay_fraction);
    } else {
      report_block.delay_since_last_sr = 0;
    }
    transport_->AddRtpReceiverReport(report_block);
  }
  if (cast_message) transport_->AddCastFeedback(*cast_message, target_delay);
  if (pli_message) transport_->AddPli(*pli_message);
  if (rtcp_events) transport_->AddRtcpEvents(*rtcp_events);
  transport_->SendRtcpFromRtpReceiver();
}

}  // namespace cast
}  // namespace media
}  // namespace cobalt