| // Copyright 2016 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "media/remoting/courier_renderer.h" |
| |
| #include <algorithm> |
| #include <limits> |
| #include <memory> |
| #include <utility> |
| |
| #include "base/bind.h" |
| #include "base/callback_helpers.h" |
| #include "base/memory/ptr_util.h" |
| #include "base/numerics/safe_math.h" |
| #include "base/threading/thread_task_runner_handle.h" |
| #include "base/time/default_tick_clock.h" |
| #include "base/time/time.h" |
| #include "media/base/bind_to_current_loop.h" |
| #include "media/base/buffering_state.h" |
| #include "media/base/media_resource.h" |
| #include "media/base/renderer_client.h" |
| #include "media/base/video_renderer_sink.h" |
| #include "media/base/waiting.h" |
| #include "media/remoting/demuxer_stream_adapter.h" |
| #include "media/remoting/proto_enum_utils.h" |
| #include "media/remoting/proto_utils.h" |
| #include "media/remoting/renderer_controller.h" |
| |
| using openscreen::cast::RpcMessenger; |
| |
| namespace media { |
| namespace remoting { |
| |
| namespace { |
| |
| // The moving time window to track the media time and statistics updates. |
| constexpr base::TimeDelta kTrackingWindow = base::Seconds(5); |
| |
| // The allowed delay for the remoting playback. When continuously exceeds this |
| // limit for |kPlaybackDelayCountThreshold| times, the user experience is likely |
| // poor and the controller is notified. |
| constexpr base::TimeDelta kMediaPlaybackDelayThreshold = |
| base::Milliseconds(750); |
| constexpr int kPlaybackDelayCountThreshold = 10; |
| |
| // The allowed percentage of the number of video frames dropped vs. the number |
| // of the video frames decoded. When exceeds this limit, the user experience is |
| // likely poor and the controller is notified. |
| constexpr int kMaxNumVideoFramesDroppedPercentage = 3; |
| |
| // The time period to allow receiver get stable after playback rate change or |
| // Flush(). |
| constexpr base::TimeDelta kStabilizationPeriod = base::Seconds(2); |
| |
| // The amount of time between polling the DemuxerStreamAdapters to measure their |
| // data flow rates for metrics. |
| constexpr base::TimeDelta kDataFlowPollPeriod = base::Seconds(10); |
| |
| } // namespace |
| |
| CourierRenderer::CourierRenderer( |
| scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, |
| const base::WeakPtr<RendererController>& controller, |
| VideoRendererSink* video_renderer_sink) |
| : state_(STATE_UNINITIALIZED), |
| main_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| media_task_runner_(std::move(media_task_runner)), |
| media_resource_(nullptr), |
| client_(nullptr), |
| controller_(controller), |
| rpc_messenger_(controller_->GetRpcMessenger()), |
| rpc_handle_(rpc_messenger_->GetUniqueHandle()), |
| remote_renderer_handle_(RpcMessenger::kInvalidHandle), |
| video_renderer_sink_(video_renderer_sink), |
| clock_(base::DefaultTickClock::GetInstance()) { |
| // Note: The constructor is running on the main thread, but will be destroyed |
| // on the media thread. Therefore, all weak pointers must be dereferenced on |
| // the media thread. |
| rpc_messenger_->RegisterMessageReceiverCallback( |
| rpc_handle_, |
| [runner = media_task_runner_, ptr = weak_factory_.GetWeakPtr()]( |
| std::unique_ptr<openscreen::cast::RpcMessage> message) { |
| if (ptr) { |
| CourierRenderer::OnMessageReceivedOnMainThread(runner, ptr, |
| std::move(message)); |
| } else { |
| LOG(WARNING) << "Invalid weak factory pointer."; |
| } |
| }); |
| } |
| |
| CourierRenderer::~CourierRenderer() { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| |
| // Post task on main thread to unregister message receiver. |
| main_task_runner_->PostTask( |
| FROM_HERE, base::BindOnce(&CourierRenderer::DeregisterFromRpcMessaging, |
| weak_factory_.GetWeakPtr())); |
| |
| if (video_renderer_sink_) { |
| video_renderer_sink_->PaintSingleFrame( |
| VideoFrame::CreateBlackFrame(gfx::Size(1280, 720))); |
| } |
| } |
| |
| void CourierRenderer::Initialize(MediaResource* media_resource, |
| RendererClient* client, |
| PipelineStatusCallback init_cb) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| DCHECK(media_resource); |
| DCHECK(client); |
| |
| if (state_ != STATE_UNINITIALIZED) { |
| media_task_runner_->PostTask( |
| FROM_HERE, |
| base::BindOnce(std::move(init_cb), PIPELINE_ERROR_INVALID_STATE)); |
| return; |
| } |
| |
| media_resource_ = media_resource; |
| client_ = client; |
| init_workflow_done_callback_ = std::move(init_cb); |
| |
| state_ = STATE_CREATE_PIPE; |
| |
| // TODO(servolk): Add support for multiple streams. For now use the first |
| // enabled audio and video streams to preserve the existing behavior. |
| ::media::DemuxerStream* audio_demuxer_stream = |
| media_resource_->GetFirstStream(DemuxerStream::AUDIO); |
| ::media::DemuxerStream* video_demuxer_stream = |
| media_resource_->GetFirstStream(DemuxerStream::VIDEO); |
| |
| // Establish remoting data pipe connection using main thread. |
| uint32_t data_pipe_capacity = |
| DemuxerStreamAdapter::kMojoDataPipeCapacityInBytes; |
| main_task_runner_->PostTask( |
| FROM_HERE, |
| base::BindOnce( |
| &RendererController::StartDataPipe, controller_, data_pipe_capacity, |
| audio_demuxer_stream, video_demuxer_stream, |
| base::BindOnce(&CourierRenderer::OnDataPipeCreatedOnMainThread, |
| media_task_runner_, weak_factory_.GetWeakPtr(), |
| rpc_messenger_))); |
| } |
| |
| void CourierRenderer::SetLatencyHint( |
| absl::optional<base::TimeDelta> latency_hint) {} |
| |
| void CourierRenderer::Flush(base::OnceClosure flush_cb) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| DCHECK(!flush_cb_); |
| |
| if (state_ != STATE_PLAYING) { |
| DCHECK_EQ(state_, STATE_ERROR); |
| // In the error state, this renderer will be shut down shortly. To prevent |
| // breaking the pipeline impl, just run the done callback (interface |
| // requirement). |
| media_task_runner_->PostTask(FROM_HERE, std::move(flush_cb)); |
| return; |
| } |
| |
| state_ = STATE_FLUSHING; |
| absl::optional<uint32_t> flush_audio_count; |
| if (audio_demuxer_stream_adapter_) |
| flush_audio_count = audio_demuxer_stream_adapter_->SignalFlush(true); |
| absl::optional<uint32_t> flush_video_count; |
| if (video_demuxer_stream_adapter_) |
| flush_video_count = video_demuxer_stream_adapter_->SignalFlush(true); |
| // Makes sure flush count is valid if stream is available or both audio and |
| // video agrees on the same flushing state. |
| if ((audio_demuxer_stream_adapter_ && !flush_audio_count.has_value()) || |
| (video_demuxer_stream_adapter_ && !flush_video_count.has_value()) || |
| (audio_demuxer_stream_adapter_ && video_demuxer_stream_adapter_ && |
| flush_audio_count.has_value() != flush_video_count.has_value())) { |
| return; |
| } |
| |
| flush_cb_ = std::move(flush_cb); |
| |
| // Issues RPC_R_FLUSHUNTIL RPC message. |
| auto rpc = std::make_unique<openscreen::cast::RpcMessage>(); |
| rpc->set_handle(remote_renderer_handle_); |
| rpc->set_proc(openscreen::cast::RpcMessage::RPC_R_FLUSHUNTIL); |
| openscreen::cast::RendererFlushUntil* message = |
| rpc->mutable_renderer_flushuntil_rpc(); |
| if (flush_audio_count.has_value()) |
| message->set_audio_count(*flush_audio_count); |
| if (flush_video_count.has_value()) |
| message->set_video_count(*flush_video_count); |
| message->set_callback_handle(rpc_handle_); |
| SendRpcToRemote(std::move(rpc)); |
| } |
| |
| void CourierRenderer::StartPlayingFrom(base::TimeDelta time) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| |
| if (state_ != STATE_PLAYING) { |
| DCHECK_EQ(state_, STATE_ERROR); |
| return; |
| } |
| |
| // Issues RPC_R_STARTPLAYINGFROM RPC message. |
| auto rpc = std::make_unique<openscreen::cast::RpcMessage>(); |
| rpc->set_handle(remote_renderer_handle_); |
| rpc->set_proc(openscreen::cast::RpcMessage::RPC_R_STARTPLAYINGFROM); |
| rpc->set_integer64_value(time.InMicroseconds()); |
| SendRpcToRemote(std::move(rpc)); |
| |
| { |
| base::AutoLock auto_lock(time_lock_); |
| current_media_time_ = time; |
| } |
| ResetMeasurements(); |
| } |
| |
| void CourierRenderer::SetPlaybackRate(double playback_rate) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| |
| if (state_ != STATE_FLUSHING && state_ != STATE_PLAYING) { |
| DCHECK_EQ(state_, STATE_ERROR); |
| return; |
| } |
| |
| // Issues RPC_R_SETPLAYBACKRATE RPC message. |
| auto rpc = std::make_unique<openscreen::cast::RpcMessage>(); |
| rpc->set_handle(remote_renderer_handle_); |
| rpc->set_proc(openscreen::cast::RpcMessage::RPC_R_SETPLAYBACKRATE); |
| rpc->set_double_value(playback_rate); |
| SendRpcToRemote(std::move(rpc)); |
| playback_rate_ = playback_rate; |
| ResetMeasurements(); |
| } |
| |
| void CourierRenderer::SetVolume(float volume) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| |
| volume_ = volume; |
| if (state_ != STATE_FLUSHING && state_ != STATE_PLAYING) |
| return; |
| |
| // Issues RPC_R_SETVOLUME RPC message. |
| auto rpc = std::make_unique<openscreen::cast::RpcMessage>(); |
| rpc->set_handle(remote_renderer_handle_); |
| rpc->set_proc(openscreen::cast::RpcMessage::RPC_R_SETVOLUME); |
| rpc->set_double_value(volume); |
| SendRpcToRemote(std::move(rpc)); |
| } |
| |
| base::TimeDelta CourierRenderer::GetMediaTime() { |
| // No BelongsToCurrentThread() checking because this can be called from other |
| // threads. |
| // TODO(erickung): Interpolate current media time using local system time. |
| // Current receiver is to update |current_media_time_| every 250ms. But it |
| // needs to lower the update frequency in order to reduce network usage. Hence |
| // the interpolation is needed after receiver implementation is changed. |
| base::AutoLock auto_lock(time_lock_); |
| return current_media_time_; |
| } |
| |
| // static |
| void CourierRenderer::OnDataPipeCreatedOnMainThread( |
| scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, |
| base::WeakPtr<CourierRenderer> self, |
| openscreen::WeakPtr<RpcMessenger> rpc_messenger, |
| mojo::PendingRemote<mojom::RemotingDataStreamSender> audio, |
| mojo::PendingRemote<mojom::RemotingDataStreamSender> video, |
| mojo::ScopedDataPipeProducerHandle audio_handle, |
| mojo::ScopedDataPipeProducerHandle video_handle) { |
| media_task_runner->PostTask( |
| FROM_HERE, |
| base::BindOnce(&CourierRenderer::OnDataPipeCreated, self, |
| std::move(audio), std::move(video), |
| std::move(audio_handle), std::move(video_handle), |
| rpc_messenger ? rpc_messenger->GetUniqueHandle() |
| : RpcMessenger::kInvalidHandle, |
| rpc_messenger ? rpc_messenger->GetUniqueHandle() |
| : RpcMessenger::kInvalidHandle)); |
| } |
| |
| void CourierRenderer::OnDataPipeCreated( |
| mojo::PendingRemote<mojom::RemotingDataStreamSender> audio, |
| mojo::PendingRemote<mojom::RemotingDataStreamSender> video, |
| mojo::ScopedDataPipeProducerHandle audio_handle, |
| mojo::ScopedDataPipeProducerHandle video_handle, |
| int audio_rpc_handle, |
| int video_rpc_handle) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| |
| if (state_ == STATE_ERROR) |
| return; // Abort because something went wrong in the meantime. |
| DCHECK_EQ(state_, STATE_CREATE_PIPE); |
| DCHECK(!init_workflow_done_callback_.is_null()); |
| |
| // TODO(servolk): Add support for multiple streams. For now use the first |
| // enabled audio and video streams to preserve the existing behavior. |
| ::media::DemuxerStream* audio_demuxer_stream = |
| media_resource_->GetFirstStream(DemuxerStream::AUDIO); |
| ::media::DemuxerStream* video_demuxer_stream = |
| media_resource_->GetFirstStream(DemuxerStream::VIDEO); |
| |
| // Create audio demuxer stream adapter if audio is available. |
| if (audio_demuxer_stream && audio.is_valid() && audio_handle.is_valid() && |
| audio_rpc_handle != RpcMessenger::kInvalidHandle) { |
| audio_demuxer_stream_adapter_ = std::make_unique<DemuxerStreamAdapter>( |
| main_task_runner_, media_task_runner_, "audio", audio_demuxer_stream, |
| rpc_messenger_, audio_rpc_handle, std::move(audio), |
| std::move(audio_handle), |
| base::BindOnce(&CourierRenderer::OnFatalError, base::Unretained(this))); |
| } |
| |
| // Create video demuxer stream adapter if video is available. |
| if (video_demuxer_stream && video.is_valid() && video_handle.is_valid() && |
| video_rpc_handle != RpcMessenger::kInvalidHandle) { |
| video_demuxer_stream_adapter_ = std::make_unique<DemuxerStreamAdapter>( |
| main_task_runner_, media_task_runner_, "video", video_demuxer_stream, |
| rpc_messenger_, video_rpc_handle, std::move(video), |
| std::move(video_handle), |
| base::BindOnce(&CourierRenderer::OnFatalError, base::Unretained(this))); |
| } |
| |
| // Checks if data pipe is created successfully. |
| if (!audio_demuxer_stream_adapter_ && !video_demuxer_stream_adapter_) { |
| OnFatalError(DATA_PIPE_CREATE_ERROR); |
| return; |
| } |
| |
| state_ = STATE_ACQUIRING; |
| |
| // Issues RPC_ACQUIRE_DEMUXER RPC message. |
| auto rpc = std::make_unique<openscreen::cast::RpcMessage>(); |
| rpc->set_handle(RpcMessenger::kAcquireDemuxerHandle); |
| rpc->set_proc(openscreen::cast::RpcMessage::RPC_ACQUIRE_DEMUXER); |
| openscreen::cast::AcquireDemuxer* message = |
| rpc->mutable_acquire_demuxer_rpc(); |
| message->set_audio_demuxer_handle( |
| audio_demuxer_stream_adapter_ |
| ? audio_demuxer_stream_adapter_->rpc_handle() |
| : RpcMessenger::kInvalidHandle); |
| message->set_video_demuxer_handle( |
| video_demuxer_stream_adapter_ |
| ? video_demuxer_stream_adapter_->rpc_handle() |
| : RpcMessenger::kInvalidHandle); |
| SendRpcToRemote(std::move(rpc)); |
| |
| // Issues RPC_ACQUIRE_RENDERER RPC message. |
| rpc = std::make_unique<openscreen::cast::RpcMessage>(); |
| rpc->set_handle(RpcMessenger::kAcquireRendererHandle); |
| rpc->set_proc(openscreen::cast::RpcMessage::RPC_ACQUIRE_RENDERER); |
| rpc->set_integer_value(rpc_handle_); |
| SendRpcToRemote(std::move(rpc)); |
| } |
| |
| // static |
| void CourierRenderer::OnMessageReceivedOnMainThread( |
| scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, |
| base::WeakPtr<CourierRenderer> self, |
| std::unique_ptr<openscreen::cast::RpcMessage> message) { |
| if (media_task_runner) { |
| media_task_runner->PostTask( |
| FROM_HERE, base::BindOnce(&CourierRenderer::OnReceivedRpc, self, |
| std::move(message))); |
| } else { |
| LOG(WARNING) << "No valid task runner."; |
| } |
| } |
| |
| void CourierRenderer::OnReceivedRpc( |
| std::unique_ptr<openscreen::cast::RpcMessage> message) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| DCHECK(message); |
| switch (message->proc()) { |
| case openscreen::cast::RpcMessage::RPC_ACQUIRE_RENDERER_DONE: |
| AcquireRendererDone(std::move(message)); |
| break; |
| case openscreen::cast::RpcMessage::RPC_R_INITIALIZE_CALLBACK: |
| InitializeCallback(std::move(message)); |
| break; |
| case openscreen::cast::RpcMessage::RPC_R_FLUSHUNTIL_CALLBACK: |
| FlushUntilCallback(); |
| break; |
| case openscreen::cast::RpcMessage::RPC_RC_ONTIMEUPDATE: |
| OnTimeUpdate(std::move(message)); |
| break; |
| case openscreen::cast::RpcMessage::RPC_RC_ONBUFFERINGSTATECHANGE: |
| OnBufferingStateChange(std::move(message)); |
| break; |
| case openscreen::cast::RpcMessage::RPC_RC_ONENDED: |
| client_->OnEnded(); |
| break; |
| case openscreen::cast::RpcMessage::RPC_RC_ONERROR: |
| OnFatalError(RECEIVER_PIPELINE_ERROR); |
| break; |
| case openscreen::cast::RpcMessage::RPC_RC_ONAUDIOCONFIGCHANGE: |
| OnAudioConfigChange(std::move(message)); |
| break; |
| case openscreen::cast::RpcMessage::RPC_RC_ONVIDEOCONFIGCHANGE: |
| OnVideoConfigChange(std::move(message)); |
| break; |
| case openscreen::cast::RpcMessage::RPC_RC_ONVIDEONATURALSIZECHANGE: |
| OnVideoNaturalSizeChange(std::move(message)); |
| break; |
| case openscreen::cast::RpcMessage::RPC_RC_ONVIDEOOPACITYCHANGE: |
| OnVideoOpacityChange(std::move(message)); |
| break; |
| case openscreen::cast::RpcMessage::RPC_RC_ONSTATISTICSUPDATE: |
| OnStatisticsUpdate(std::move(message)); |
| break; |
| |
| default: |
| DVLOG(1) << "Unknown RPC: " << message->proc(); |
| } |
| } |
| |
| void CourierRenderer::SendRpcToRemote( |
| std::unique_ptr<openscreen::cast::RpcMessage> message) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| DCHECK(main_task_runner_); |
| main_task_runner_->PostTask(FROM_HERE, |
| base::BindOnce(&RpcMessenger::SendMessageToRemote, |
| rpc_messenger_, *message)); |
| } |
| |
| void CourierRenderer::AcquireRendererDone( |
| std::unique_ptr<openscreen::cast::RpcMessage> message) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| DCHECK(message); |
| |
| remote_renderer_handle_ = message->integer_value(); |
| |
| if (state_ != STATE_ACQUIRING || init_workflow_done_callback_.is_null()) { |
| OnFatalError(PEERS_OUT_OF_SYNC); |
| return; |
| } |
| state_ = STATE_INITIALIZING; |
| |
| // Issues RPC_R_INITIALIZE RPC message to initialize renderer. |
| auto rpc = std::make_unique<openscreen::cast::RpcMessage>(); |
| rpc->set_handle(remote_renderer_handle_); |
| rpc->set_proc(openscreen::cast::RpcMessage::RPC_R_INITIALIZE); |
| openscreen::cast::RendererInitialize* init = |
| rpc->mutable_renderer_initialize_rpc(); |
| init->set_client_handle(rpc_handle_); |
| init->set_audio_demuxer_handle( |
| audio_demuxer_stream_adapter_ |
| ? audio_demuxer_stream_adapter_->rpc_handle() |
| : RpcMessenger::kInvalidHandle); |
| init->set_video_demuxer_handle( |
| video_demuxer_stream_adapter_ |
| ? video_demuxer_stream_adapter_->rpc_handle() |
| : RpcMessenger::kInvalidHandle); |
| init->set_callback_handle(rpc_handle_); |
| SendRpcToRemote(std::move(rpc)); |
| } |
| |
| void CourierRenderer::InitializeCallback( |
| std::unique_ptr<openscreen::cast::RpcMessage> message) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| DCHECK(message); |
| |
| const bool success = message->boolean_value(); |
| if (state_ != STATE_INITIALIZING || init_workflow_done_callback_.is_null()) { |
| OnFatalError(PEERS_OUT_OF_SYNC); |
| return; |
| } |
| |
| if (!success) { |
| OnFatalError(RECEIVER_INITIALIZE_FAILED); |
| return; |
| } |
| |
| metrics_recorder_.OnRendererInitialized(); |
| |
| state_ = STATE_PLAYING; |
| |
| SetVolume(volume_); |
| std::move(init_workflow_done_callback_).Run(PIPELINE_OK); |
| } |
| |
| void CourierRenderer::FlushUntilCallback() { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| |
| if (state_ != STATE_FLUSHING || !flush_cb_) { |
| OnFatalError(PEERS_OUT_OF_SYNC); |
| return; |
| } |
| |
| state_ = STATE_PLAYING; |
| if (audio_demuxer_stream_adapter_) |
| audio_demuxer_stream_adapter_->SignalFlush(false); |
| if (video_demuxer_stream_adapter_) |
| video_demuxer_stream_adapter_->SignalFlush(false); |
| std::move(flush_cb_).Run(); |
| ResetMeasurements(); |
| } |
| |
| void CourierRenderer::OnTimeUpdate( |
| std::unique_ptr<openscreen::cast::RpcMessage> message) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| DCHECK(message); |
| // Shutdown remoting session if receiving malformed RPC message. |
| if (!message->has_rendererclient_ontimeupdate_rpc()) { |
| OnFatalError(RPC_INVALID); |
| return; |
| } |
| const int64_t time_usec = |
| message->rendererclient_ontimeupdate_rpc().time_usec(); |
| const int64_t max_time_usec = |
| message->rendererclient_ontimeupdate_rpc().max_time_usec(); |
| // Ignores invalid time, such as negative value, or time larger than max value |
| // (usually the time stamp that all streams are pushed into AV pipeline). |
| if (time_usec < 0 || max_time_usec < 0 || time_usec > max_time_usec) |
| return; |
| |
| { |
| // Updates current time information. |
| base::AutoLock auto_lock(time_lock_); |
| current_media_time_ = base::Microseconds(time_usec); |
| current_max_time_ = base::Microseconds(max_time_usec); |
| } |
| |
| metrics_recorder_.OnEvidenceOfPlayoutAtReceiver(); |
| OnMediaTimeUpdated(); |
| } |
| |
| void CourierRenderer::OnBufferingStateChange( |
| std::unique_ptr<openscreen::cast::RpcMessage> message) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| DCHECK(message); |
| if (!message->has_rendererclient_onbufferingstatechange_rpc()) { |
| OnFatalError(RPC_INVALID); |
| return; |
| } |
| absl::optional<BufferingState> state = ToMediaBufferingState( |
| message->rendererclient_onbufferingstatechange_rpc().state()); |
| BufferingStateChangeReason reason = BUFFERING_CHANGE_REASON_UNKNOWN; |
| if (!state.has_value()) |
| return; |
| if (state == BufferingState::BUFFERING_HAVE_NOTHING) { |
| receiver_is_blocked_on_local_demuxers_ = IsWaitingForDataFromDemuxers(); |
| reason = receiver_is_blocked_on_local_demuxers_ |
| ? DEMUXER_UNDERFLOW |
| : REMOTING_NETWORK_CONGESTION; |
| } else if (receiver_is_blocked_on_local_demuxers_) { |
| receiver_is_blocked_on_local_demuxers_ = false; |
| ResetMeasurements(); |
| } |
| |
| client_->OnBufferingStateChange(state.value(), reason); |
| } |
| |
| void CourierRenderer::OnAudioConfigChange( |
| std::unique_ptr<openscreen::cast::RpcMessage> message) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| DCHECK(message); |
| // Shutdown remoting session if receiving malformed RPC message. |
| if (!message->has_rendererclient_onaudioconfigchange_rpc()) { |
| OnFatalError(RPC_INVALID); |
| return; |
| } |
| |
| const auto* audio_config_message = |
| message->mutable_rendererclient_onaudioconfigchange_rpc(); |
| const openscreen::cast::AudioDecoderConfig pb_audio_config = |
| audio_config_message->audio_decoder_config(); |
| AudioDecoderConfig out_audio_config; |
| ConvertProtoToAudioDecoderConfig(pb_audio_config, &out_audio_config); |
| DCHECK(out_audio_config.IsValidConfig()); |
| |
| client_->OnAudioConfigChange(out_audio_config); |
| } |
| |
| void CourierRenderer::OnVideoConfigChange( |
| std::unique_ptr<openscreen::cast::RpcMessage> message) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| DCHECK(message); |
| // Shutdown remoting session if receiving malformed RPC message. |
| if (!message->has_rendererclient_onvideoconfigchange_rpc()) { |
| OnFatalError(RPC_INVALID); |
| return; |
| } |
| |
| const auto* video_config_message = |
| message->mutable_rendererclient_onvideoconfigchange_rpc(); |
| const openscreen::cast::VideoDecoderConfig pb_video_config = |
| video_config_message->video_decoder_config(); |
| VideoDecoderConfig out_video_config; |
| ConvertProtoToVideoDecoderConfig(pb_video_config, &out_video_config); |
| DCHECK(out_video_config.IsValidConfig()); |
| |
| client_->OnVideoConfigChange(out_video_config); |
| } |
| |
| void CourierRenderer::OnVideoNaturalSizeChange( |
| std::unique_ptr<openscreen::cast::RpcMessage> message) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| DCHECK(message); |
| // Shutdown remoting session if receiving malformed RPC message. |
| if (!message->has_rendererclient_onvideonatualsizechange_rpc()) { |
| OnFatalError(RPC_INVALID); |
| return; |
| } |
| const auto& size_change = |
| message->rendererclient_onvideonatualsizechange_rpc(); |
| if (size_change.width() <= 0 || size_change.height() <= 0) |
| return; |
| client_->OnVideoNaturalSizeChange( |
| gfx::Size(size_change.width(), size_change.height())); |
| } |
| |
| void CourierRenderer::OnVideoOpacityChange( |
| std::unique_ptr<openscreen::cast::RpcMessage> message) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| DCHECK(message); |
| const bool opaque = message->boolean_value(); |
| client_->OnVideoOpacityChange(opaque); |
| } |
| |
| void CourierRenderer::OnStatisticsUpdate( |
| std::unique_ptr<openscreen::cast::RpcMessage> message) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| DCHECK(message); |
| // Shutdown remoting session if receiving malformed RPC message. |
| if (!message->has_rendererclient_onstatisticsupdate_rpc()) { |
| OnFatalError(RPC_INVALID); |
| return; |
| } |
| PipelineStatistics stats; |
| ConvertProtoToPipelineStatistics( |
| message->rendererclient_onstatisticsupdate_rpc(), &stats); |
| // Note: Each field in |stats| is a delta, not the aggregate amount. |
| if (stats.audio_bytes_decoded > 0 || stats.video_frames_decoded > 0 || |
| stats.video_frames_dropped > 0) { |
| metrics_recorder_.OnEvidenceOfPlayoutAtReceiver(); |
| } |
| UpdateVideoStatsQueue(stats.video_frames_decoded, stats.video_frames_dropped); |
| client_->OnStatisticsUpdate(stats); |
| } |
| |
| void CourierRenderer::OnFatalError(StopTrigger stop_trigger) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| DCHECK_NE(UNKNOWN_STOP_TRIGGER, stop_trigger); |
| |
| // If this is the first error, notify the controller. It is expected the |
| // controller will cause this renderer to shut down shortly. |
| if (state_ != STATE_ERROR) { |
| state_ = STATE_ERROR; |
| main_task_runner_->PostTask( |
| FROM_HERE, base::BindOnce(&RendererController::OnRendererFatalError, |
| controller_, stop_trigger)); |
| } |
| |
| data_flow_poll_timer_.Stop(); |
| |
| // This renderer will be shut down shortly. To prevent breaking the pipeline, |
| // just run the callback without reporting error. |
| if (!init_workflow_done_callback_.is_null()) { |
| std::move(init_workflow_done_callback_).Run(PIPELINE_OK); |
| return; |
| } |
| |
| if (flush_cb_) |
| std::move(flush_cb_).Run(); |
| } |
| |
| void CourierRenderer::OnMediaTimeUpdated() { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| if (flush_cb_) |
| return; // Don't manage and check the queue when Flush() is on-going. |
| if (receiver_is_blocked_on_local_demuxers_) |
| return; // Don't manage and check the queue when buffering is on-going. |
| |
| base::TimeTicks current_time = clock_->NowTicks(); |
| if (current_time < ignore_updates_until_time_) |
| return; // Not stable yet. |
| |
| media_time_queue_.push_back( |
| std::make_pair(current_time, current_media_time_)); |
| base::TimeDelta window_duration = |
| current_time - media_time_queue_.front().first; |
| if (window_duration < kTrackingWindow) |
| return; // Not enough data to make a reliable decision. |
| |
| base::TimeDelta media_duration = |
| media_time_queue_.back().second - media_time_queue_.front().second; |
| base::TimeDelta update_duration = |
| (media_time_queue_.back().first - media_time_queue_.front().first) * |
| playback_rate_; |
| if ((media_duration - update_duration).magnitude() >= |
| kMediaPlaybackDelayThreshold) { |
| ++times_playback_delayed_; |
| if (times_playback_delayed_ == kPlaybackDelayCountThreshold) |
| OnFatalError(PACING_TOO_SLOWLY); |
| } else { |
| times_playback_delayed_ = 0; |
| } |
| |
| // Prune |media_time_queue_|. |
| while (media_time_queue_.back().first - media_time_queue_.front().first >= |
| kTrackingWindow) |
| media_time_queue_.pop_front(); |
| } |
| |
| void CourierRenderer::UpdateVideoStatsQueue(int video_frames_decoded, |
| int video_frames_dropped) { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| if (flush_cb_) |
| return; // Don't manage and check the queue when Flush() is on-going. |
| |
| if (!stats_updated_) { |
| if (video_frames_decoded) |
| stats_updated_ = true; |
| // Ignore the first stats since it may include the information during |
| // unstable period. |
| return; |
| } |
| |
| base::TimeTicks current_time = clock_->NowTicks(); |
| if (current_time < ignore_updates_until_time_) |
| return; // Not stable yet. |
| |
| video_stats_queue_.push_back(std::make_tuple( |
| current_time, video_frames_decoded, video_frames_dropped)); |
| sum_video_frames_decoded_ += video_frames_decoded; |
| sum_video_frames_dropped_ += video_frames_dropped; |
| base::TimeDelta window_duration = |
| current_time - std::get<0>(video_stats_queue_.front()); |
| if (window_duration < kTrackingWindow) |
| return; // Not enough data to make a reliable decision. |
| |
| if (sum_video_frames_decoded_ && |
| sum_video_frames_dropped_ * 100 > |
| sum_video_frames_decoded_ * kMaxNumVideoFramesDroppedPercentage) { |
| OnFatalError(FRAME_DROP_RATE_HIGH); |
| } |
| // Prune |video_stats_queue_|. |
| while (std::get<0>(video_stats_queue_.back()) - |
| std::get<0>(video_stats_queue_.front()) >= |
| kTrackingWindow) { |
| sum_video_frames_decoded_ -= std::get<1>(video_stats_queue_.front()); |
| sum_video_frames_dropped_ -= std::get<2>(video_stats_queue_.front()); |
| video_stats_queue_.pop_front(); |
| } |
| } |
| |
| void CourierRenderer::ResetMeasurements() { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| media_time_queue_.clear(); |
| video_stats_queue_.clear(); |
| sum_video_frames_dropped_ = 0; |
| sum_video_frames_decoded_ = 0; |
| stats_updated_ = false; |
| ignore_updates_until_time_ = clock_->NowTicks() + kStabilizationPeriod; |
| |
| if (state_ != STATE_ERROR && |
| (audio_demuxer_stream_adapter_ || video_demuxer_stream_adapter_)) { |
| data_flow_poll_timer_.Start(FROM_HERE, kDataFlowPollPeriod, this, |
| &CourierRenderer::MeasureAndRecordDataRates); |
| } |
| } |
| |
| void CourierRenderer::MeasureAndRecordDataRates() { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| |
| // Whenever media is first started or flushed/seeked, there is a "burst |
| // bufferring" period as the remote device rapidly fills its buffer before |
| // resuming playback. Since the goal here is to measure the sustained content |
| // bitrates, ignore the byte counts the first time since the last |
| // ResetMeasurements() call. |
| const base::TimeTicks current_time = clock_->NowTicks(); |
| if (current_time < ignore_updates_until_time_ + kDataFlowPollPeriod) { |
| if (audio_demuxer_stream_adapter_) |
| audio_demuxer_stream_adapter_->GetBytesWrittenAndReset(); |
| if (video_demuxer_stream_adapter_) |
| video_demuxer_stream_adapter_->GetBytesWrittenAndReset(); |
| return; |
| } |
| |
| const int kBytesPerKilobit = 1024 / 8; |
| if (audio_demuxer_stream_adapter_) { |
| const double kilobits_per_second = |
| (audio_demuxer_stream_adapter_->GetBytesWrittenAndReset() / |
| kDataFlowPollPeriod.InSecondsF()) / |
| kBytesPerKilobit; |
| DCHECK_GE(kilobits_per_second, 0); |
| const base::CheckedNumeric<int> checked_kbps = kilobits_per_second; |
| metrics_recorder_.OnAudioRateEstimate( |
| checked_kbps.ValueOrDefault(std::numeric_limits<int>::max())); |
| } |
| if (video_demuxer_stream_adapter_) { |
| const double kilobits_per_second = |
| (video_demuxer_stream_adapter_->GetBytesWrittenAndReset() / |
| kDataFlowPollPeriod.InSecondsF()) / |
| kBytesPerKilobit; |
| DCHECK_GE(kilobits_per_second, 0); |
| const base::CheckedNumeric<int> checked_kbps = kilobits_per_second; |
| metrics_recorder_.OnVideoRateEstimate( |
| checked_kbps.ValueOrDefault(std::numeric_limits<int>::max())); |
| } |
| } |
| |
| bool CourierRenderer::IsWaitingForDataFromDemuxers() const { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| return ((video_demuxer_stream_adapter_ && |
| video_demuxer_stream_adapter_->is_processing_read_request() && |
| !video_demuxer_stream_adapter_->is_data_pending()) || |
| (audio_demuxer_stream_adapter_ && |
| audio_demuxer_stream_adapter_->is_processing_read_request() && |
| !audio_demuxer_stream_adapter_->is_data_pending())); |
| } |
| |
| void CourierRenderer::DeregisterFromRpcMessaging() { |
| DCHECK(media_task_runner_->BelongsToCurrentThread()); |
| if (rpc_messenger_) { |
| rpc_messenger_->UnregisterMessageReceiverCallback(rpc_handle_); |
| } |
| } |
| |
| } // namespace remoting |
| } // namespace media |