Improve source initialization
The initial state change to pause should be done on worker
thread. This way it avoids a possible race between the uridecodebin
analysing the source pads on sbplayer calling thread and
FinishSourceSetup adding pads on worker thread.
Change-Id: Ifa224b1ebeb81008a5dbee9fb4418965f8d2bab6
Signed-off-by: Eugene Mutavchi <Ievgen_Mutavchi@comcast.com>
diff --git a/src/third_party/starboard/rdk/shared/player/player_internal.cc b/src/third_party/starboard/rdk/shared/player/player_internal.cc
index 4961e1d..3eac891 100644
--- a/src/third_party/starboard/rdk/shared/player/player_internal.cc
+++ b/src/third_party/starboard/rdk/shared/player/player_internal.cc
@@ -964,7 +964,7 @@
void Do() override { func_(player_, ctx_, state_, ticket_); }
void PrintInfo() override {
- GST_DEBUG("PlayerStatusTask state:%d (%s), ticket:%d", state_, PlayerStateToStr(state_), ticket_);
+ GST_INFO("PlayerStatusTask state:%d (%s), ticket:%d", state_, PlayerStateToStr(state_), ticket_);
}
private:
@@ -1029,8 +1029,8 @@
}
void PrintInfo() override {
- GST_TRACE("DecoderStatusTask state:%d (%s), ticket:%d, media:%d", state_,
- DecoderStateToStr(state_), ticket_, static_cast<int>(media_));
+ GST_LOG("DecoderStatusTask state:%d (%s), ticket:%d, media:%d", state_,
+ DecoderStateToStr(state_), ticket_, static_cast<int>(media_));
}
private:
@@ -1232,6 +1232,7 @@
bool ChangePipelineState(GstState state) const;
guint DispatchOnWorkerThread(Task* task) const;
+ void InvokeOnWorkerThreadAndWait(Task* task);
GstClockTime GetPosition() const;
bool WriteSample(SbMediaType sample_type,
GstBuffer* buffer,
@@ -1304,7 +1305,7 @@
::starboard::Mutex mutex_;
::starboard::Mutex source_setup_mutex_;
::starboard::Mutex seek_mutex_;
- double rate_{1.0};
+ double rate_{0.0};
int ticket_{SB_PLAYER_INITIAL_TICKET};
mutable GstClockTime seek_position_{GST_CLOCK_TIME_NONE};
GstClockTime max_sample_timestamps_[kMediaNumber]{0};
@@ -1543,6 +1544,9 @@
while(!g_main_loop_is_running(main_loop_))
g_usleep(1);
}
+ else {
+ SB_NOTREACHED();
+ }
GetPlayerRegistry()->Add(this);
}
@@ -1831,6 +1835,43 @@
return id;
}
+void PlayerImpl::InvokeOnWorkerThreadAndWait(Task* task) {
+ struct InvokeContext {
+ ::starboard::Mutex mutex;
+ ::starboard::ConditionVariable cv { mutex };
+ Task* task;
+ bool done;
+ } ctx;
+
+ ctx.task = task;
+ ctx.done = false;
+
+ g_main_context_invoke_full(
+ main_loop_context_,
+ G_PRIORITY_HIGH,
+ [](gpointer data) -> gboolean {
+ auto* ctx = static_cast<InvokeContext*>(data);
+ GST_TRACE("%d", SbThreadGetId());
+ ctx->task->PrintInfo();
+ ctx->task->Do();
+ ctx->mutex.Acquire();
+ ctx->done = true;
+ ctx->cv.Signal();
+ ctx->mutex.Release();
+ return G_SOURCE_REMOVE;
+ },
+ &ctx,
+ nullptr);
+
+ // Wait for completion
+ ctx.mutex.Acquire();
+ while (!ctx.done)
+ ctx.cv.Wait();
+ ctx.mutex.Release();
+
+ delete task;
+}
+
// static
gboolean PlayerImpl::FinishSourceSetup(gpointer user_data) {
PlayerImpl* self = static_cast<PlayerImpl*>(user_data);
@@ -1932,7 +1973,7 @@
if (self->source_)
return;
self->source_ = source;
- static constexpr int kAsyncSourceFinishTimeMs = 50;
+ static constexpr int kAsyncSourceFinishTimeMs = 0;
GSource* src = g_timeout_source_new(kAsyncSourceFinishTimeMs);
g_source_set_callback(src, &PlayerImpl::FinishSourceSetup, self, nullptr);
self->source_setup_id_ = g_source_attach(src, self->main_loop_context_);
@@ -1975,6 +2016,8 @@
if (g_str_has_prefix(GST_ELEMENT_NAME(element), "brcmaudiosink")) {
g_object_set(G_OBJECT(element), "async", TRUE, nullptr);
}
+
+ gst_base_sink_set_last_sample_enabled(GST_BASE_SINK(element), FALSE);
}
if (GST_IS_BASE_PARSE(element)) {
@@ -2009,7 +2052,7 @@
src = audio_appsrc_;
}
- GST_INFO_OBJECT(src, "===> %d", SbThreadGetId());
+ GST_INFO_OBJECT(src, "===> ticket: %d", ticket_);
::starboard::ScopedLock lock(mutex_);
if (state_ == State::kPrerollAfterSeek)
GST_DEBUG_OBJECT(src, "===> Mark EOS with State::kPrerollAfterSeek");
@@ -2388,24 +2431,27 @@
if (state_ == State::kInitial) {
// This is the initial seek to 0 which will trigger data pumping.
SB_DCHECK(seek_position_ == .0);
- AddBufferingProbe(0, ticket_);
state_ = State::kInitialPreroll;
- DispatchOnWorkerThread(
- new PlayerStatusTask(player_status_func_, player_,
- ticket_, context_,
- kSbPlayerStatePrerolling));
seek_position_ = GST_CLOCK_TIME_NONE;
if (GST_STATE(pipeline_) < GST_STATE_PAUSED &&
GST_STATE_PENDING(pipeline_) < GST_STATE_PAUSED) {
mutex_.Release();
- ChangePipelineState(GST_STATE_PAUSED);
+ // Trigger initial state change to pause on worker thread to serialize source setup
+ InvokeOnWorkerThreadAndWait(new FunctionTask([this]{
+ ChangePipelineState(GST_STATE_PAUSED);
+ }, __func__));
mutex_.Acquire();
}
+ // Notify player state change
+ DispatchOnWorkerThread(
+ new PlayerStatusTask(player_status_func_, player_,
+ ticket_, context_,
+ kSbPlayerStatePrerolling));
return;
}
- // Ask for data.
if (state_ == State::kInitialPreroll) {
+ // Ask for data.
MediaType need_data = GetBothMediaTypeTakingCodecsIntoAccount();
DecoderNeedsData(lock, need_data);
}
@@ -2483,7 +2529,6 @@
decoder_state_data_ = 0;
eos_data_ = 0;
is_seek_pending_ = false;
-
rate = rate_;
if (state_ == State::kInitial || GST_STATE(pipeline_) < GST_STATE_PAUSED) {
@@ -2532,6 +2577,14 @@
rate_ = rate;
pending_rate_ = .0;
+ if (state_ == State::kInitial) {
+ mutex_.Release();
+ SB_DCHECK(rate == .0);
+ SB_DCHECK(GST_STATE(pipeline_) < GST_STATE_PAUSED);
+ GST_DEBUG_OBJECT(pipeline_, "Ignore SetRate(%f) before initial seek", rate);
+ return true;
+ }
+
if (rate == .0) {
mutex_.Release();
ChangePipelineState(GST_STATE_PAUSED);