Improve source initialization

The initial state change to pause should be done on worker
thread. This way it avoids a possible race between the uridecodebin
analysing the source pads on sbplayer calling thread and
FinishSourceSetup adding pads on worker thread.

Change-Id: Ifa224b1ebeb81008a5dbee9fb4418965f8d2bab6
Signed-off-by: Eugene Mutavchi <Ievgen_Mutavchi@comcast.com>
diff --git a/src/third_party/starboard/rdk/shared/player/player_internal.cc b/src/third_party/starboard/rdk/shared/player/player_internal.cc
index 4961e1d..3eac891 100644
--- a/src/third_party/starboard/rdk/shared/player/player_internal.cc
+++ b/src/third_party/starboard/rdk/shared/player/player_internal.cc
@@ -964,7 +964,7 @@
   void Do() override { func_(player_, ctx_, state_, ticket_); }
 
   void PrintInfo() override {
-    GST_DEBUG("PlayerStatusTask state:%d (%s), ticket:%d", state_, PlayerStateToStr(state_), ticket_);
+    GST_INFO("PlayerStatusTask state:%d (%s), ticket:%d", state_, PlayerStateToStr(state_), ticket_);
   }
 
  private:
@@ -1029,8 +1029,8 @@
   }
 
   void PrintInfo() override {
-    GST_TRACE("DecoderStatusTask state:%d (%s), ticket:%d, media:%d", state_,
-              DecoderStateToStr(state_), ticket_, static_cast<int>(media_));
+    GST_LOG("DecoderStatusTask state:%d (%s), ticket:%d, media:%d", state_,
+            DecoderStateToStr(state_), ticket_, static_cast<int>(media_));
   }
 
  private:
@@ -1232,6 +1232,7 @@
 
   bool ChangePipelineState(GstState state) const;
   guint DispatchOnWorkerThread(Task* task) const;
+  void InvokeOnWorkerThreadAndWait(Task* task);
   GstClockTime GetPosition() const;
   bool WriteSample(SbMediaType sample_type,
                    GstBuffer* buffer,
@@ -1304,7 +1305,7 @@
   ::starboard::Mutex mutex_;
   ::starboard::Mutex source_setup_mutex_;
   ::starboard::Mutex seek_mutex_;
-  double rate_{1.0};
+  double rate_{0.0};
   int ticket_{SB_PLAYER_INITIAL_TICKET};
   mutable GstClockTime seek_position_{GST_CLOCK_TIME_NONE};
   GstClockTime max_sample_timestamps_[kMediaNumber]{0};
@@ -1543,6 +1544,9 @@
     while(!g_main_loop_is_running(main_loop_))
       g_usleep(1);
   }
+  else {
+    SB_NOTREACHED();
+  }
   GetPlayerRegistry()->Add(this);
 }
 
@@ -1831,6 +1835,43 @@
   return id;
 }
 
+void PlayerImpl::InvokeOnWorkerThreadAndWait(Task* task) {
+  struct InvokeContext {
+    ::starboard::Mutex mutex;
+    ::starboard::ConditionVariable cv { mutex };
+    Task* task;
+    bool done;
+  } ctx;
+
+  ctx.task = task;
+  ctx.done = false;
+
+  g_main_context_invoke_full(
+    main_loop_context_,
+    G_PRIORITY_HIGH,
+    [](gpointer data) -> gboolean {
+      auto* ctx = static_cast<InvokeContext*>(data);
+      GST_TRACE("%d", SbThreadGetId());
+      ctx->task->PrintInfo();
+      ctx->task->Do();
+      ctx->mutex.Acquire();
+      ctx->done = true;
+      ctx->cv.Signal();
+      ctx->mutex.Release();
+      return G_SOURCE_REMOVE;
+    },
+    &ctx,
+    nullptr);
+
+  // Wait for completion
+  ctx.mutex.Acquire();
+  while (!ctx.done)
+      ctx.cv.Wait();
+  ctx.mutex.Release();
+
+  delete task;
+}
+
 // static
 gboolean PlayerImpl::FinishSourceSetup(gpointer user_data) {
   PlayerImpl* self = static_cast<PlayerImpl*>(user_data);
@@ -1932,7 +1973,7 @@
   if (self->source_)
     return;
   self->source_ = source;
-  static constexpr int kAsyncSourceFinishTimeMs = 50;
+  static constexpr int kAsyncSourceFinishTimeMs = 0;
   GSource* src = g_timeout_source_new(kAsyncSourceFinishTimeMs);
   g_source_set_callback(src, &PlayerImpl::FinishSourceSetup, self, nullptr);
   self->source_setup_id_ = g_source_attach(src, self->main_loop_context_);
@@ -1975,6 +2016,8 @@
     if (g_str_has_prefix(GST_ELEMENT_NAME(element), "brcmaudiosink")) {
       g_object_set(G_OBJECT(element), "async", TRUE, nullptr);
     }
+
+    gst_base_sink_set_last_sample_enabled(GST_BASE_SINK(element), FALSE);
   }
 
   if (GST_IS_BASE_PARSE(element)) {
@@ -2009,7 +2052,7 @@
     src = audio_appsrc_;
   }
 
-  GST_INFO_OBJECT(src, "===> %d", SbThreadGetId());
+  GST_INFO_OBJECT(src, "===> ticket: %d", ticket_);
   ::starboard::ScopedLock lock(mutex_);
   if (state_ == State::kPrerollAfterSeek)
     GST_DEBUG_OBJECT(src, "===> Mark EOS with State::kPrerollAfterSeek");
@@ -2388,24 +2431,27 @@
   if (state_ == State::kInitial) {
     // This is the initial seek to 0 which will trigger data pumping.
     SB_DCHECK(seek_position_ == .0);
-    AddBufferingProbe(0, ticket_);
     state_ = State::kInitialPreroll;
-    DispatchOnWorkerThread(
-      new PlayerStatusTask(player_status_func_, player_,
-                           ticket_, context_,
-                           kSbPlayerStatePrerolling));
     seek_position_ = GST_CLOCK_TIME_NONE;
     if (GST_STATE(pipeline_) < GST_STATE_PAUSED &&
         GST_STATE_PENDING(pipeline_) < GST_STATE_PAUSED) {
       mutex_.Release();
-      ChangePipelineState(GST_STATE_PAUSED);
+      // Trigger initial state change to pause on worker thread to serialize source setup
+      InvokeOnWorkerThreadAndWait(new FunctionTask([this]{
+        ChangePipelineState(GST_STATE_PAUSED);
+      }, __func__));
       mutex_.Acquire();
     }
+    // Notify player state change
+    DispatchOnWorkerThread(
+      new PlayerStatusTask(player_status_func_, player_,
+                           ticket_, context_,
+                           kSbPlayerStatePrerolling));
     return;
   }
 
-  // Ask for data.
   if (state_ == State::kInitialPreroll) {
+    // Ask for data.
     MediaType need_data = GetBothMediaTypeTakingCodecsIntoAccount();
     DecoderNeedsData(lock, need_data);
   }
@@ -2483,7 +2529,6 @@
     decoder_state_data_ = 0;
     eos_data_ = 0;
     is_seek_pending_ = false;
-
     rate = rate_;
 
     if (state_ == State::kInitial || GST_STATE(pipeline_) < GST_STATE_PAUSED) {
@@ -2532,6 +2577,14 @@
   rate_ = rate;
   pending_rate_ = .0;
 
+  if (state_ == State::kInitial) {
+    mutex_.Release();
+    SB_DCHECK(rate == .0);
+    SB_DCHECK(GST_STATE(pipeline_) < GST_STATE_PAUSED);
+    GST_DEBUG_OBJECT(pipeline_, "Ignore SetRate(%f) before initial seek", rate);
+    return true;
+  }
+
   if (rate == .0) {
     mutex_.Release();
     ChangePipelineState(GST_STATE_PAUSED);