blob: c404fa20f6a73696f2f898636db11454509a5bfb [file] [log] [blame]
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "media/remoting/demuxer_stream_adapter.h"
#include <utility>
#include "base/base64.h"
#include "base/bind.h"
#include "base/callback_helpers.h"
#include "base/single_thread_task_runner.h"
#include "media/base/bind_to_current_loop.h"
#include "media/base/decoder_buffer.h"
#include "media/base/timestamp_constants.h"
#include "media/remoting/proto_enum_utils.h"
#include "media/remoting/proto_utils.h"
// Convenience logging macro used throughout this file.
#define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: "
using openscreen::cast::RpcMessenger;
namespace media {
namespace remoting {
DemuxerStreamAdapter::DemuxerStreamAdapter(
scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
scoped_refptr<base::SingleThreadTaskRunner> media_task_runner,
const std::string& name,
DemuxerStream* demuxer_stream,
const openscreen::WeakPtr<RpcMessenger>& rpc_messenger,
int rpc_handle,
mojo::PendingRemote<mojom::RemotingDataStreamSender> stream_sender_remote,
mojo::ScopedDataPipeProducerHandle producer_handle,
ErrorCallback error_callback)
: main_task_runner_(std::move(main_task_runner)),
media_task_runner_(std::move(media_task_runner)),
name_(name),
rpc_messenger_(rpc_messenger),
rpc_handle_(rpc_handle),
demuxer_stream_(demuxer_stream),
type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN),
error_callback_(std::move(error_callback)),
remote_callback_handle_(RpcMessenger::kInvalidHandle),
read_until_callback_handle_(RpcMessenger::kInvalidHandle),
read_until_count_(0),
last_count_(0),
pending_flush_(false),
pending_frame_is_eos_(false),
media_status_(DemuxerStream::kOk),
data_pipe_writer_(std::move(producer_handle)),
bytes_written_to_pipe_(0) {
DCHECK(main_task_runner_);
DCHECK(media_task_runner_);
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(demuxer_stream);
DCHECK(!error_callback_.is_null());
auto receive_callback = BindToCurrentLoop(base::BindRepeating(
&DemuxerStreamAdapter::OnReceivedRpc, weak_factory_.GetWeakPtr()));
main_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(
&RpcMessenger::RegisterMessageReceiverCallback, rpc_messenger_,
rpc_handle_,
[cb = receive_callback](
std::unique_ptr<openscreen::cast::RpcMessage> message) {
cb.Run(std::move(message));
}));
stream_sender_.Bind(std::move(stream_sender_remote));
stream_sender_.set_disconnect_handler(
base::BindOnce(&DemuxerStreamAdapter::OnFatalError,
weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR));
}
DemuxerStreamAdapter::~DemuxerStreamAdapter() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
main_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&DemuxerStreamAdapter::DeregisterFromRpcMessaging,
weak_factory_.GetWeakPtr()));
}
int64_t DemuxerStreamAdapter::GetBytesWrittenAndReset() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
const int64_t current_count = bytes_written_to_pipe_;
bytes_written_to_pipe_ = 0;
return current_count;
}
absl::optional<uint32_t> DemuxerStreamAdapter::SignalFlush(bool flushing) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DEMUXER_VLOG(2) << "flushing=" << flushing;
// Ignores if |pending_flush_| states is same.
if (pending_flush_ == flushing)
return absl::nullopt;
// Cleans up pending frame data.
pending_frame_is_eos_ = false;
// Invalidates pending Read() tasks.
request_buffer_weak_factory_.InvalidateWeakPtrs();
// Cancels in flight data in browser process.
pending_flush_ = flushing;
if (flushing) {
stream_sender_->CancelInFlightData();
} else {
// Sets callback handle invalid to abort ongoing read request.
read_until_callback_handle_ = RpcMessenger::kInvalidHandle;
}
return last_count_;
}
void DemuxerStreamAdapter::OnReceivedRpc(
std::unique_ptr<openscreen::cast::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
DCHECK(rpc_handle_ == message->handle());
switch (message->proc()) {
case openscreen::cast::RpcMessage::RPC_DS_INITIALIZE:
Initialize(message->integer_value());
break;
case openscreen::cast::RpcMessage::RPC_DS_READUNTIL:
ReadUntil(std::move(message));
break;
case openscreen::cast::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER:
EnableBitstreamConverter();
break;
case openscreen::cast::RpcMessage::RPC_DS_ONERROR:
OnFatalError(UNEXPECTED_FAILURE);
break;
default:
DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc();
}
}
void DemuxerStreamAdapter::Initialize(int remote_callback_handle) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(!pending_flush_);
DEMUXER_VLOG(2) << "Received RPC_DS_INITIALIZE with remote_callback_handle="
<< remote_callback_handle;
// Checks if initialization had been called or not.
if (remote_callback_handle_ != RpcMessenger::kInvalidHandle) {
DEMUXER_VLOG(1) << "Duplicated initialization. Have: "
<< remote_callback_handle_
<< ", Given: " << remote_callback_handle;
// Shuts down data pipe if available if providing different remote callback
// handle for initialization. Otherwise, just silently ignore the duplicated
// request.
if (remote_callback_handle_ != remote_callback_handle) {
OnFatalError(PEERS_OUT_OF_SYNC);
}
return;
}
remote_callback_handle_ = remote_callback_handle;
// Issues RPC_DS_INITIALIZE_CALLBACK RPC message.
std::unique_ptr<openscreen::cast::RpcMessage> rpc(
new openscreen::cast::RpcMessage());
rpc->set_handle(remote_callback_handle_);
rpc->set_proc(openscreen::cast::RpcMessage::RPC_DS_INITIALIZE_CALLBACK);
auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc();
init_cb_message->set_type(type_);
switch (type_) {
case DemuxerStream::Type::AUDIO: {
audio_config_ = demuxer_stream_->audio_decoder_config();
openscreen::cast::AudioDecoderConfig* audio_message =
init_cb_message->mutable_audio_decoder_config();
ConvertAudioDecoderConfigToProto(audio_config_, audio_message);
break;
}
case DemuxerStream::Type::VIDEO: {
video_config_ = demuxer_stream_->video_decoder_config();
openscreen::cast::VideoDecoderConfig* video_message =
init_cb_message->mutable_video_decoder_config();
ConvertVideoDecoderConfigToProto(video_config_, video_message);
break;
}
default:
NOTREACHED();
}
DEMUXER_VLOG(2) << "Sending RPC_DS_INITIALIZE_CALLBACK to " << rpc->handle()
<< " with decoder_config={"
<< (type_ == DemuxerStream::Type::AUDIO
? audio_config_.AsHumanReadableString()
: video_config_.AsHumanReadableString())
<< '}';
main_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&RpcMessenger::SendMessageToRemote, rpc_messenger_, *rpc));
}
void DemuxerStreamAdapter::ReadUntil(
std::unique_ptr<openscreen::cast::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
if (!message->has_demuxerstream_readuntil_rpc()) {
DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC";
OnFatalError(RPC_INVALID);
return;
}
const openscreen::cast::DemuxerStreamReadUntil& rpc_message =
message->demuxerstream_readuntil_rpc();
DEMUXER_VLOG(2) << "Received RPC_DS_READUNTIL with callback_handle="
<< rpc_message.callback_handle()
<< ", count=" << rpc_message.count();
if (pending_flush_) {
DEMUXER_VLOG(2) << "Skip actions since it's in the flushing state";
return;
}
if (is_processing_read_request()) {
DEMUXER_VLOG(2) << "Ignore read request while it's in the reading state.";
return;
}
if (rpc_message.count() <= last_count_) {
DEMUXER_VLOG(1) << "Request count shouldn't be smaller than or equal to "
"current frame count";
return;
}
read_until_count_ = rpc_message.count();
read_until_callback_handle_ = rpc_message.callback_handle();
RequestBuffer();
}
void DemuxerStreamAdapter::EnableBitstreamConverter() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DEMUXER_VLOG(2) << "Received RPC_DS_ENABLEBITSTREAMCONVERTER";
#if BUILDFLAG(USE_PROPRIETARY_CODECS)
demuxer_stream_->EnableBitstreamConverter();
#else
DEMUXER_VLOG(1) << "Ignoring EnableBitstreamConverter() RPC: Proprietary "
"codecs not enabled in this Chromium build.";
#endif
}
void DemuxerStreamAdapter::RequestBuffer() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
if (!is_processing_read_request() || pending_flush_) {
DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state";
return;
}
demuxer_stream_->Read(
base::BindOnce(&DemuxerStreamAdapter::OnNewBuffer,
request_buffer_weak_factory_.GetWeakPtr()));
}
void DemuxerStreamAdapter::OnNewBuffer(DemuxerStream::Status status,
scoped_refptr<DecoderBuffer> input) {
DEMUXER_VLOG(3) << "status=" << status;
DCHECK(media_task_runner_->BelongsToCurrentThread());
if (!is_processing_read_request() || pending_flush_) {
DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state";
return;
}
switch (status) {
case DemuxerStream::kAborted:
DCHECK(!input);
SendReadAck();
return;
case DemuxerStream::kError:
// Currently kError can only happen because of DECRYPTION_ERROR.
OnFatalError(DECRYPTION_ERROR);
return;
case DemuxerStream::kConfigChanged:
// TODO(erickung): Notify controller of new decoder config, just in case
// that will require remoting to be shutdown (due to known
// lack-of-support).
// Stores available audio/video decoder config and issues
// RPC_DS_READUNTIL_CALLBACK RPC to notify receiver.
DCHECK(!input);
media_status_ = status;
if (demuxer_stream_->type() == DemuxerStream::VIDEO)
video_config_ = demuxer_stream_->video_decoder_config();
if (demuxer_stream_->type() == DemuxerStream::AUDIO)
audio_config_ = demuxer_stream_->audio_decoder_config();
SendReadAck();
return;
case DemuxerStream::kOk: {
media_status_ = status;
DCHECK(pending_frame_.empty());
if (!data_pipe_writer_.IsPipeValid())
return; // Do not start sending (due to previous fatal error).
pending_frame_ = DecoderBufferToByteArray(*input);
pending_frame_is_eos_ = input->end_of_stream();
WriteFrame();
} break;
}
}
void DemuxerStreamAdapter::WriteFrame() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(!pending_flush_);
DCHECK(is_processing_read_request());
DCHECK(!pending_frame_.empty());
if (!stream_sender_ || !data_pipe_writer_.IsPipeValid()) {
DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid";
return;
}
stream_sender_->SendFrame(pending_frame_.size());
data_pipe_writer_.Write(pending_frame_.data(), pending_frame_.size(),
base::BindOnce(&DemuxerStreamAdapter::OnFrameWritten,
base::Unretained(this)));
}
void DemuxerStreamAdapter::OnFrameWritten(bool success) {
if (!success) {
OnFatalError(MOJO_PIPE_ERROR);
return;
}
bytes_written_to_pipe_ += pending_frame_.size();
// Resets frame buffer variables.
bool pending_frame_is_eos = pending_frame_is_eos_;
++last_count_;
ResetPendingFrame();
// Checks if it needs to send RPC_DS_READUNTIL_CALLBACK RPC message.
if (read_until_count_ == last_count_ || pending_frame_is_eos) {
SendReadAck();
return;
}
// Contiune to read decoder buffer until reaching |read_until_count_| or
// end of stream.
media_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&DemuxerStreamAdapter::RequestBuffer,
weak_factory_.GetWeakPtr()));
}
void DemuxerStreamAdapter::SendReadAck() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DEMUXER_VLOG(3) << "last_count_=" << last_count_
<< ", remote_read_callback_handle="
<< read_until_callback_handle_
<< ", media_status=" << media_status_;
// Issues RPC_DS_READUNTIL_CALLBACK RPC message.
std::unique_ptr<openscreen::cast::RpcMessage> rpc(
new openscreen::cast::RpcMessage());
rpc->set_handle(read_until_callback_handle_);
rpc->set_proc(openscreen::cast::RpcMessage::RPC_DS_READUNTIL_CALLBACK);
auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc();
message->set_count(last_count_);
message->set_status(ToProtoDemuxerStreamStatus(media_status_).value());
if (media_status_ == DemuxerStream::kConfigChanged) {
if (audio_config_.IsValidConfig()) {
openscreen::cast::AudioDecoderConfig* audio_message =
message->mutable_audio_decoder_config();
ConvertAudioDecoderConfigToProto(audio_config_, audio_message);
} else if (video_config_.IsValidConfig()) {
openscreen::cast::VideoDecoderConfig* video_message =
message->mutable_video_decoder_config();
ConvertVideoDecoderConfigToProto(video_config_, video_message);
} else {
NOTREACHED();
}
}
DEMUXER_VLOG(2) << "Sending RPC_DS_READUNTIL_CALLBACK to " << rpc->handle()
<< " with count=" << message->count()
<< ", status=" << message->status() << ", decoder_config={"
<< (audio_config_.IsValidConfig()
? audio_config_.AsHumanReadableString()
: video_config_.IsValidConfig()
? video_config_.AsHumanReadableString()
: "DID NOT CHANGE")
<< '}';
main_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&RpcMessenger::SendMessageToRemote, rpc_messenger_, *rpc));
// Resets callback handle after completing the reading request.
read_until_callback_handle_ = RpcMessenger::kInvalidHandle;
// Resets audio/video decoder config since it only sends once.
if (audio_config_.IsValidConfig())
audio_config_ = AudioDecoderConfig();
if (video_config_.IsValidConfig())
video_config_ = VideoDecoderConfig();
}
void DemuxerStreamAdapter::ResetPendingFrame() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
pending_frame_.clear();
pending_frame_is_eos_ = false;
}
void DemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger;
if (error_callback_.is_null())
return;
data_pipe_writer_.Close();
std::move(error_callback_).Run(stop_trigger);
}
void DemuxerStreamAdapter::DeregisterFromRpcMessaging() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
if (rpc_messenger_) {
rpc_messenger_->UnregisterMessageReceiverCallback(rpc_handle_);
}
}
} // namespace remoting
} // namespace media