blob: 3d188886c959b12d96703bed168872609ba305e9 [file] [log] [blame]
/*
* Copyright 2012 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "media/filters/shell_demuxer.h"
#include "base/bind.h"
#include "base/callback.h"
#include "base/callback_helpers.h"
#include "base/debug/trace_event.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop.h"
#include "base/stringprintf.h"
#include "base/task_runner_util.h"
#include "base/time.h"
#include "media/base/bind_to_loop.h"
#include "media/base/data_source.h"
#include "media/base/shell_media_platform.h"
#include <inttypes.h>
namespace media {
ShellDemuxerStream::ShellDemuxerStream(ShellDemuxer* demuxer, Type type)
: demuxer_(demuxer),
type_(type),
last_buffer_timestamp_(kNoTimestamp()),
stopped_(false) {
TRACE_EVENT0("media_stack", "ShellDemuxerStream::ShellDemuxerStream()");
DCHECK(demuxer_);
}
bool ShellDemuxerStream::StreamWasEncrypted() const {
if (type_ == VIDEO)
return demuxer_->VideoConfig().is_encrypted();
else if (type_ == AUDIO)
return demuxer_->AudioConfig().is_encrypted();
NOTREACHED();
return false;
}
void ShellDemuxerStream::Read(const ReadCB& read_cb) {
TRACE_EVENT0("media_stack", "ShellDemuxerStream::Read()");
DCHECK(!read_cb.is_null());
base::AutoLock auto_lock(lock_);
// Don't accept any additional reads if we've been told to stop.
// The demuxer_ may have been destroyed in the pipleine thread.
if (stopped_) {
TRACE_EVENT0("media_stack", "ShellDemuxerStream::Read() EOS sent.");
read_cb.Run(DemuxerStream::kOk,
scoped_refptr<DecoderBuffer>(
DecoderBuffer::CreateEOSBuffer(kNoTimestamp())));
return;
}
// Buffers are only queued when there are no pending reads.
DCHECK(buffer_queue_.empty() || read_queue_.empty());
if (!buffer_queue_.empty()) {
// Send the oldest buffer back.
scoped_refptr<DecoderBuffer> buffer = buffer_queue_.front();
if (buffer->IsEndOfStream()) {
TRACE_EVENT0("media_stack", "ShellDemuxerStream::Read() EOS sent.");
} else {
// Do not pop EOS buffers, so that subsequent read requests also get EOS
buffer_queue_.pop_front();
}
read_cb.Run(
DemuxerStream::kOk,
ShellMediaPlatform::Instance()->ProcessBeforeLeavingDemuxer(buffer));
} else {
TRACE_EVENT0("media_stack", "ShellDemuxerStream::Read() request queued.");
read_queue_.push_back(read_cb);
}
}
const AudioDecoderConfig& ShellDemuxerStream::audio_decoder_config() {
return demuxer_->AudioConfig();
}
const VideoDecoderConfig& ShellDemuxerStream::video_decoder_config() {
return demuxer_->VideoConfig();
}
Ranges<base::TimeDelta> ShellDemuxerStream::GetBufferedRanges() {
base::AutoLock auto_lock(lock_);
return buffered_ranges_;
}
DemuxerStream::Type ShellDemuxerStream::type() {
return type_;
}
void ShellDemuxerStream::EnableBitstreamConverter() {
NOTIMPLEMENTED();
}
void ShellDemuxerStream::EnqueueBuffer(scoped_refptr<DecoderBuffer> buffer) {
TRACE_EVENT1("media_stack", "ShellDemuxerStream::EnqueueBuffer()",
"timestamp", buffer->GetTimestamp().InMicroseconds());
base::AutoLock auto_lock(lock_);
if (stopped_) {
// it's possible due to pipelining both downstream and within the
// demuxer that several pipelined reads will be enqueuing packets
// on a stopped stream. Drop them after complaining.
DLOG(WARNING) << "attempted to enqueue packet on stopped stream";
return;
}
if (buffer->IsEndOfStream()) {
TRACE_EVENT0("media_stack",
"ShellDemuxerStream::EnqueueBuffer() EOS received.");
} else if (buffer->GetTimestamp() != kNoTimestamp()) {
if (last_buffer_timestamp_ != kNoTimestamp() &&
last_buffer_timestamp_ < buffer->GetTimestamp()) {
buffered_ranges_.Add(last_buffer_timestamp_, buffer->GetTimestamp());
}
last_buffer_timestamp_ = buffer->GetTimestamp();
} else {
DLOG(WARNING) << "bad timestamp info on enqueued buffer.";
}
// Check for any already waiting reads, service oldest read if there
if (read_queue_.size()) {
// assumption here is that buffer queue is empty
DCHECK_EQ(buffer_queue_.size(), 0);
ReadCB read_cb(read_queue_.front());
read_queue_.pop_front();
read_cb.Run(
DemuxerStream::kOk,
ShellMediaPlatform::Instance()->ProcessBeforeLeavingDemuxer(buffer));
} else {
// save the buffer for next read request
buffer_queue_.push_back(buffer);
}
}
base::TimeDelta ShellDemuxerStream::GetLastBufferTimestamp() const {
base::AutoLock auto_lock(lock_);
return last_buffer_timestamp_;
}
void ShellDemuxerStream::FlushBuffers() {
TRACE_EVENT0("media_stack", "ShellDemuxerStream::FlushBuffers()");
base::AutoLock auto_lock(lock_);
// TODO: Investigate if the following warning is valid.
DLOG_IF(WARNING, !read_queue_.empty()) << "Read requests should be empty";
buffer_queue_.clear();
last_buffer_timestamp_ = kNoTimestamp();
}
void ShellDemuxerStream::Stop() {
TRACE_EVENT0("media_stack", "ShellDemuxerStream::Stop()");
DCHECK(demuxer_->MessageLoopBelongsToCurrentThread());
base::AutoLock auto_lock(lock_);
buffer_queue_.clear();
last_buffer_timestamp_ = kNoTimestamp();
// fulfill any pending callbacks with EOS buffers set to end timestamp
for (ReadQueue::iterator it = read_queue_.begin(); it != read_queue_.end();
++it) {
TRACE_EVENT0("media_stack", "ShellDemuxerStream::Stop() EOS sent.");
it->Run(DemuxerStream::kOk,
scoped_refptr<DecoderBuffer>(
DecoderBuffer::CreateEOSBuffer(kNoTimestamp())));
}
read_queue_.clear();
stopped_ = true;
}
//
// ShellDemuxer
//
ShellDemuxer::ShellDemuxer(
const scoped_refptr<base::MessageLoopProxy>& message_loop,
const scoped_refptr<DataSource>& data_source)
: message_loop_(message_loop),
host_(NULL),
blocking_thread_("ShellDemuxerBlockingThread"),
data_source_(data_source),
stopped_(false),
flushing_(false),
audio_reached_eos_(false),
video_reached_eos_(false) {
DCHECK(data_source_);
DCHECK(message_loop_);
reader_ = new ShellDataSourceReader();
reader_->SetDataSource(data_source_);
}
ShellDemuxer::~ShellDemuxer() {}
void ShellDemuxer::Initialize(DemuxerHost* host,
const PipelineStatusCB& status_cb) {
TRACE_EVENT0("media_stack", "ShellDemuxer::Initialize()");
DCHECK(MessageLoopBelongsToCurrentThread());
DCHECK(reader_);
DCHECK(!parser_);
DLOG(INFO) << "this is a PROGRESSIVE playback.";
host_ = host;
data_source_->set_host(host);
// create audio and video demuxer stream objects
audio_demuxer_stream_ = new ShellDemuxerStream(this, DemuxerStream::AUDIO);
video_demuxer_stream_ = new ShellDemuxerStream(this, DemuxerStream::VIDEO);
// start the blocking thread and have it download and parse the media config
if (!blocking_thread_.Start()) {
status_cb.Run(DEMUXER_ERROR_COULD_NOT_PARSE);
return;
}
base::PostTaskAndReplyWithResult(
blocking_thread_.message_loop_proxy(), FROM_HERE,
base::Bind(&ShellDemuxer::ParseConfigBlocking, this, status_cb),
base::Bind(&ShellDemuxer::ParseConfigDone, this, status_cb));
}
PipelineStatus ShellDemuxer::ParseConfigBlocking(
const PipelineStatusCB& status_cb) {
DCHECK(blocking_thread_.message_loop_proxy()->BelongsToCurrentThread());
DCHECK(!parser_);
// construct stream parser with error callback
PipelineStatus status = ShellParser::Construct(reader_, &parser_);
// if we can't construct a parser for this stream it's a fatal error, return
// false so ParseConfigDone will notify the caller to Initialize() via
// status_cb.
if (!parser_ || status != PIPELINE_OK) {
DCHECK(!parser_);
DCHECK_NE(status, PIPELINE_OK);
if (status == PIPELINE_OK) {
status = DEMUXER_ERROR_COULD_NOT_PARSE;
}
return status;
}
// instruct the parser to extract audio and video config from the file
if (!parser_->ParseConfig()) {
return DEMUXER_ERROR_COULD_NOT_PARSE;
}
// make sure we got a valid and complete configuration
if (!parser_->IsConfigComplete()) {
return DEMUXER_ERROR_COULD_NOT_PARSE;
}
// IsConfigComplete() should guarantee we know the duration
DCHECK(parser_->Duration() != kInfiniteDuration());
host_->SetDuration(parser_->Duration());
// Bitrate may not be known, however
uint32 bitrate = parser_->BitsPerSecond();
if (bitrate > 0) {
data_source_->SetBitrate(bitrate);
}
// successful parse of config data, inform the nonblocking demuxer thread
DCHECK_EQ(status, PIPELINE_OK);
return PIPELINE_OK;
}
void ShellDemuxer::ParseConfigDone(const PipelineStatusCB& status_cb,
PipelineStatus status) {
DCHECK(MessageLoopBelongsToCurrentThread());
// if the blocking parser thread cannot parse config we're done.
if (status != PIPELINE_OK) {
status_cb.Run(status);
return;
}
DCHECK(parser_);
// start downloading data
Request(DemuxerStream::AUDIO);
status_cb.Run(PIPELINE_OK);
}
void ShellDemuxer::Request(DemuxerStream::Type type) {
// post task to our blocking thread
blocking_thread_.message_loop_proxy()->PostTask(
FROM_HERE, base::Bind(&ShellDemuxer::RequestTask, this, type));
}
void ShellDemuxer::RequestTask(DemuxerStream::Type type) {
DCHECK(blocking_thread_.message_loop_proxy()->BelongsToCurrentThread());
DCHECK(!requested_au_) << "overlapping requests not supported!";
flushing_ = false;
// Ask parser for next AU
scoped_refptr<ShellAU> au = parser_->GetNextAU(type);
// fatal parsing error returns NULL or malformed AU
if (!au || !au->IsValid()) {
if (!stopped_) {
DLOG(ERROR) << "got back bad AU from parser";
host_->OnDemuxerError(DEMUXER_ERROR_COULD_NOT_PARSE);
}
return;
}
// make sure we got back an AU of the correct type
DCHECK(au->GetType() == type);
const char* ALLOW_UNUSED event_type =
type == DemuxerStream::AUDIO ? "audio" : "video";
TRACE_EVENT2("media_stack", "ShellDemuxer::RequestTask()", "type", event_type,
"timestamp", au->GetTimestamp().InMicroseconds());
// don't issue allocation requests for EOS AUs
if (au->IsEndOfStream()) {
TRACE_EVENT0("media_stack", "ShellDemuxer::RequestTask() EOS sent");
// enqueue EOS buffer with correct stream
scoped_refptr<DecoderBuffer> eos_buffer =
DecoderBuffer::CreateEOSBuffer(au->GetTimestamp());
if (type == DemuxerStream::AUDIO) {
audio_reached_eos_ = true;
audio_demuxer_stream_->EnqueueBuffer(eos_buffer);
} else if (type == DemuxerStream::VIDEO) {
video_reached_eos_ = true;
video_demuxer_stream_->EnqueueBuffer(eos_buffer);
}
IssueNextRequestTask();
return;
}
// enqueue the request
requested_au_ = au;
// AllocateBuffer will return false if the requested size is larger
// than the maximum limit for a single buffer.
if (!ShellBufferFactory::Instance()->AllocateBuffer(
au->GetMaxSize(), au->IsKeyframe(),
base::Bind(&ShellDemuxer::BufferAllocated, this))) {
DLOG(ERROR) << "buffer allocation failed.";
host_->OnDemuxerError(PIPELINE_ERROR_COULD_NOT_RENDER);
return;
}
}
// callback from ShellBufferAllocated, post a task to the blocking thread
void ShellDemuxer::BufferAllocated(scoped_refptr<DecoderBuffer> buffer) {
if (!stopped_) {
blocking_thread_.message_loop_proxy()->PostTask(
FROM_HERE, base::Bind(&ShellDemuxer::DownloadTask, this, buffer));
}
}
void ShellDemuxer::DownloadTask(scoped_refptr<DecoderBuffer> buffer) {
DCHECK(blocking_thread_.message_loop_proxy()->BelongsToCurrentThread());
// We need a requested_au_ or to have canceled this request and
// are buffering to a new location for this to make sense
DCHECK(requested_au_);
const char* ALLOW_UNUSED event_type =
requested_au_->GetType() == DemuxerStream::AUDIO ? "audio" : "video";
TRACE_EVENT2("media_stack", "ShellDemuxer::DownloadTask()", "type",
event_type, "timestamp",
requested_au_->GetTimestamp().InMicroseconds());
// do nothing if stopped
if (stopped_) {
DLOG(INFO) << "aborting download task, stopped";
return;
}
// Flushing is a signal to restart the request->download cycle with
// a new request. Drop current request and issue a new one.
// flushing_ will be reset by the next call to RequestTask()
if (flushing_) {
DLOG(INFO) << "skipped AU download due to flush";
requested_au_ = NULL;
IssueNextRequestTask();
return;
}
if (!requested_au_->Read(reader_, buffer)) {
DLOG(ERROR) << "au read failed";
host_->OnDemuxerError(PIPELINE_ERROR_READ);
return;
}
// copy timestamp and duration values
buffer->SetTimestamp(requested_au_->GetTimestamp());
buffer->SetDuration(requested_au_->GetDuration());
// enqueue buffer into appropriate stream
if (requested_au_->GetType() == DemuxerStream::AUDIO) {
audio_demuxer_stream_->EnqueueBuffer(buffer);
} else if (requested_au_->GetType() == DemuxerStream::VIDEO) {
video_demuxer_stream_->EnqueueBuffer(buffer);
} else {
NOTREACHED() << "invalid buffer type enqueued";
}
// finished with this au, deref
requested_au_ = NULL;
// Calculate total range of buffered data for both audio and video.
Ranges<base::TimeDelta> buffered(
audio_demuxer_stream_->GetBufferedRanges().IntersectionWith(
video_demuxer_stream_->GetBufferedRanges()));
// Notify host of each disjoint range.
for (size_t i = 0; i < buffered.size(); ++i) {
host_->AddBufferedTimeRange(buffered.start(i), buffered.end(i));
}
IssueNextRequestTask();
}
void ShellDemuxer::IssueNextRequestTask() {
DCHECK(!requested_au_);
// if we're stopped don't download anymore
if (stopped_) {
DLOG(INFO) << "stopped so request loop is stopping";
return;
}
// if we have eos in one or both buffers the decision is easy
if (audio_reached_eos_ || video_reached_eos_) {
if (audio_reached_eos_) {
if (video_reached_eos_) {
// both are true, issue no more requests!
DLOG(INFO) << "both streams at EOS, request loop stopping";
return;
} else {
// audio is at eos, video isn't, get more video
Request(DemuxerStream::VIDEO);
}
} else {
// audio is not at eos, video is, get more audio
Request(DemuxerStream::AUDIO);
}
return;
}
// priority order for figuring out what to download next
base::TimeDelta audio_stamp = audio_demuxer_stream_->GetLastBufferTimestamp();
base::TimeDelta video_stamp = video_demuxer_stream_->GetLastBufferTimestamp();
// if the audio demuxer stream is empty, always fill it first
if (audio_stamp == kNoTimestamp()) {
Request(DemuxerStream::AUDIO);
} else if (video_stamp == kNoTimestamp()) {
// the video demuxer stream is empty, we need data for it
Request(DemuxerStream::VIDEO);
} else if (video_stamp < audio_stamp) {
// video is earlier, fill it first
Request(DemuxerStream::VIDEO);
} else {
Request(DemuxerStream::AUDIO);
}
}
void ShellDemuxer::Stop(const base::Closure& callback) {
DCHECK(MessageLoopBelongsToCurrentThread());
// set our internal stop flag, to not treat read failures as
// errors anymore but as a natural part of stopping
stopped_ = true;
// stop the reader, which will stop the datasource and call back
reader_->Stop(base::Bind(&ShellDemuxer::DataSourceStopped, this, callback));
}
void ShellDemuxer::DataSourceStopped(const base::Closure& callback) {
TRACE_EVENT0("media_stack", "ShellDemuxer::DataSourceStopped()");
DCHECK(MessageLoopBelongsToCurrentThread());
// stop the download thread
blocking_thread_.Stop();
// tell downstream we've stopped
if (audio_demuxer_stream_)
audio_demuxer_stream_->Stop();
if (video_demuxer_stream_)
video_demuxer_stream_->Stop();
callback.Run();
}
void ShellDemuxer::Seek(base::TimeDelta time, const PipelineStatusCB& cb) {
blocking_thread_.message_loop()->PostTask(
FROM_HERE,
base::Bind(&ShellDemuxer::SeekTask, this, time, BindToCurrentLoop(cb)));
}
// runs on blocking thread
void ShellDemuxer::SeekTask(base::TimeDelta time, const PipelineStatusCB& cb) {
TRACE_EVENT1("media_stack", "ShellDemuxer::SeekTask()", "timestamp",
time.InMicroseconds());
DLOG(INFO) << base::StringPrintf("seek to: %" PRId64 " ms",
time.InMilliseconds());
// clear any enqueued buffers on demuxer streams
audio_demuxer_stream_->FlushBuffers();
video_demuxer_stream_->FlushBuffers();
// advance parser to new timestamp
if (!parser_->SeekTo(time)) {
DLOG(ERROR) << "parser seek failed.";
cb.Run(PIPELINE_ERROR_READ);
return;
}
// if both streams had finished downloading, we need to restart the request
if (audio_reached_eos_ && video_reached_eos_) {
DLOG(INFO) << "restarting stopped request loop";
Request(DemuxerStream::AUDIO);
}
audio_reached_eos_ = false;
video_reached_eos_ = false;
flushing_ = true;
cb.Run(PIPELINE_OK);
}
void ShellDemuxer::OnAudioRendererDisabled() {
NOTIMPLEMENTED();
}
void ShellDemuxer::SetPlaybackRate(float playback_rate) {
data_source_->SetPlaybackRate(playback_rate);
}
scoped_refptr<DemuxerStream> ShellDemuxer::GetStream(
media::DemuxerStream::Type type) {
if (type == DemuxerStream::AUDIO) {
return audio_demuxer_stream_;
} else if (type == DemuxerStream::VIDEO) {
return video_demuxer_stream_;
} else {
DLOG(WARNING) << "unsupported stream type requested";
}
return NULL;
}
base::TimeDelta ShellDemuxer::GetStartTime() const {
// we always assume a start time of 0
return base::TimeDelta();
}
const AudioDecoderConfig& ShellDemuxer::AudioConfig() {
return parser_->AudioConfig();
}
const VideoDecoderConfig& ShellDemuxer::VideoConfig() {
return parser_->VideoConfig();
}
bool ShellDemuxer::MessageLoopBelongsToCurrentThread() const {
return message_loop_->BelongsToCurrentThread();
}
} // namespace media