Fix possible frame re-ordering during seek

Change-Id: Ic79df3e8aaa189494c0b454e3a1882d2f8568b3f
diff --git a/src/third_party/starboard/rdk/shared/media/gst_media_utils.cc b/src/third_party/starboard/rdk/shared/media/gst_media_utils.cc
index 58e4568..4f19891 100644
--- a/src/third_party/starboard/rdk/shared/media/gst_media_utils.cc
+++ b/src/third_party/starboard/rdk/shared/media/gst_media_utils.cc
@@ -66,8 +66,8 @@
       GST_ELEMENT_FACTORY_TYPE_PARSER | type, GST_RANK_MARGINAL)};
   UniqueFeatureList decoder_factories{gst_element_factory_list_get_elements(
       GST_ELEMENT_FACTORY_TYPE_DECODER | type, GST_RANK_MARGINAL)};
-  UniqueFeatureList demuxer_factories{gst_element_factory_list_get_elements(
-      GST_ELEMENT_FACTORY_TYPE_DEMUXER, GST_RANK_MARGINAL)};
+  // UniqueFeatureList demuxer_factories{gst_element_factory_list_get_elements(
+  //     GST_ELEMENT_FACTORY_TYPE_DEMUXER, GST_RANK_MARGINAL)};
 
   UniqueFeatureList elements;
   std::vector<std::string> caps;
diff --git a/src/third_party/starboard/rdk/shared/player/player_internal.cc b/src/third_party/starboard/rdk/shared/player/player_internal.cc
index ae9998c..4640a2f 100644
--- a/src/third_party/starboard/rdk/shared/player/player_internal.cc
+++ b/src/third_party/starboard/rdk/shared/player/player_internal.cc
@@ -1012,10 +1012,11 @@
   bool WriteSample(SbMediaType sample_type,
                    GstBuffer* buffer,
                    const std::string& session_id,
-                   GstBuffer* subsample = nullptr,
-                   int32_t subsamples_count = 0,
-                   GstBuffer* iv = nullptr,
-                   GstBuffer* key = nullptr);
+                   GstBuffer* subsample,
+                   int32_t subsamples_count,
+                   GstBuffer* iv,
+                   GstBuffer* key,
+                   uint64_t serial_id);
   MediaType GetBothMediaTypeTakingCodecsIntoAccount() const;
   void RecordTimestamp(SbMediaType type, SbTime timestamp);
   SbTime MinTimestamp(MediaType* origin) const;
@@ -1038,6 +1039,7 @@
 
   void HandleApplicationMessage(GstBus* bus, GstMessage* message);
   void WritePendingSamples(const uint8_t* key, size_t key_len);
+  void CheckBuffering(gint64 position);
 
   SbPlayer player_;
   SbWindow window_;
@@ -1086,7 +1088,7 @@
   PendingBounds pending_bounds_;
   SbMediaColorMetadata color_metadata_{};
   bool force_stop_ { false };
-  uint64_t samples_serial_ { 0 };
+  uint64_t samples_serial_[kMediaNumber] { 0 };
 
   bool has_oob_write_pending_{false};
   ::starboard::ConditionVariable pending_oob_write_condition_ { mutex_ };
@@ -1641,7 +1643,8 @@
                              GstBuffer* subsample,
                              int32_t subsample_count,
                              GstBuffer* iv,
-                             GstBuffer* key) {
+                             GstBuffer* key,
+                             uint64_t serial_id) {
   GstElement* src = nullptr;
   if (sample_type == kSbMediaTypeVideo) {
     src = video_appsrc_;
@@ -1662,8 +1665,8 @@
   }
 
   GST_CAT_LEVEL_LOG (GST_CAT_DEFAULT, log_level, src,
-                   "SampleType:%d %" GST_TIME_FORMAT " b:%p, s:%p, iv:%p, k:%p",
-                   sample_type, GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(buffer)),
+                   "SampleType:%d %" GST_TIME_FORMAT " id:%llu b:%p, s:%p, iv:%p, k:%p",
+                   sample_type, GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(buffer)), serial_id,
                    buffer, subsample, iv, key);
 
   bool decrypted = true;
@@ -1724,8 +1727,8 @@
   GstClockTime timestamp = sample_infos[0].timestamp * kSbTimeNanosecondsPerMicrosecond;
   GstBuffer* buffer =
       gst_buffer_new_allocate(nullptr, sample_infos[0].buffer_size, nullptr);
-  gst_buffer_fill(buffer, 0, sample_infos[0].buffer,
-                  sample_infos[0].buffer_size);
+  gsize sz = gst_buffer_fill(buffer, 0, sample_infos[0].buffer, sample_infos[0].buffer_size);
+  SB_DCHECK(sz == sample_infos[0].buffer_size);
   GST_BUFFER_TIMESTAMP(buffer) = timestamp;
   sample_deallocate_func_(player_, context_, sample_infos[0].buffer);
 
@@ -1760,9 +1763,19 @@
       (GST_STATE_PENDING(pipeline_) == GST_STATE_VOID_PENDING ||
        GST_STATE_PENDING(pipeline_) == GST_STATE_PAUSED) &&
       rate_ > .0) {
-    GST_TRACE("Moving to playing for %" GST_TIME_FORMAT,
-              GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(buffer)));
-    ChangePipelineState(GST_STATE_PLAYING);
+
+    gint64 seek_pos_ns = GST_CLOCK_TIME_NONE;
+    {
+      ::starboard::ScopedLock lock(mutex_);
+      if (seek_position_ != kSbTimeMax)
+        seek_pos_ns =  seek_position_ * kSbTimeNanosecondsPerMicrosecond;
+    }
+
+    if (!GST_CLOCK_TIME_IS_VALID(seek_pos_ns) || GST_BUFFER_TIMESTAMP(buffer) >= seek_pos_ns) {
+      GST_TRACE("Moving to playing for %" GST_TIME_FORMAT,
+                GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(buffer)));
+      ChangePipelineState(GST_STATE_PLAYING);
+    }
   }
 
   uint64_t serial = 0;
@@ -1771,7 +1784,7 @@
   {
     ::starboard::ScopedLock lock(mutex_);
     keep_samples = is_seek_pending_ || pending_rate_ != .0;
-    serial = ++samples_serial_;
+    serial = samples_serial_[ (sample_type == kSbMediaTypeVideo ? kVideoIndex : kAudioIndex) ]++;
   }
   if (sample_infos[0].drm_info) {
     GST_LOG("Encounterd encrypted %s sample",
@@ -1828,8 +1841,8 @@
       #endif
 
       GST_INFO("No session/pending flushing operation. Storing sample");
-      GST_INFO("SampleType:%d %" GST_TIME_FORMAT " b:%p, s:%p, iv:%p, k:%p(%s)",
-               sample_type, GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(buffer)), buffer,
+      GST_INFO("SampleType:%d %" GST_TIME_FORMAT " id:%llu b:%p, s:%p, iv:%p, k:%p(%s)",
+               sample_type, GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(buffer)), serial, buffer,
                subsamples, iv, key, md5sum);
 
       if (md5sum)
@@ -1849,16 +1862,29 @@
     GST_TRACE("Encountered clear sample");
     if (keep_samples) {
       GST_INFO("Pending flushing operation. Storing sample");
-      GST_INFO("SampleType:%d %" GST_TIME_FORMAT " b:%p, s:%p, iv:%p, k:%p",
-               sample_type, GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(buffer)), buffer,
+      GST_INFO("SampleType:%d %" GST_TIME_FORMAT " id:%llu b:%p, s:%p, iv:%p, k:%p",
+               sample_type, GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(buffer)), serial, buffer,
                subsamples, iv, key);
-      ::starboard::ScopedLock lock(mutex_);
-      PendingSample sample(sample_type, buffer, nullptr, nullptr, 0, nullptr, serial);
       key_str = {kClearSamplesKey};
+      PendingSample sample(sample_type, buffer, nullptr, nullptr, 0, nullptr, serial);
+      ::starboard::ScopedLock lock(mutex_);
       pending_samples_[key_str].emplace_back(std::move(sample));
     }
   }
 
+  {
+    // Let other thread finish writing
+    ::starboard::ScopedLock lock(mutex_);
+    while(has_oob_write_pending_) {
+      const auto kWaitTime = 10 * kSbTimeSecond;
+      if (!pending_oob_write_condition_.WaitTimed(kWaitTime)) {
+        GST_ERROR("Pending write took too long, give up");
+        has_oob_write_pending_ = false;
+        break;
+      }
+    }
+  }
+
   if (keep_samples) {
     PendingSamples local_samples;
     {
@@ -1866,10 +1892,19 @@
       local_samples.swap(pending_samples_[key_str]);
     }
 
+    if(local_samples.empty()) {
+      GST_WARNING("No pending samples");
+      return;
+    }
+
     auto& sample = local_samples.back();
+
+    SB_CHECK(sample.Type() == sample_type);
+    SB_CHECK(serial == sample.SerialID());
+
     if (WriteSample(sample.Type(), sample.Buffer(), session_id,
                     sample.Subsamples(), sample.SubsamplesCount(), sample.Iv(),
-                    sample.Key())) {
+                    sample.Key(), sample.SerialID())) {
       sample.Written();
     }
 
@@ -1879,19 +1914,8 @@
                 std::back_inserter(pending_samples_[key_str]));
     }
   } else {
-    {
-      // Let playback thread finish writing, but don't hang for too long
-      ::starboard::ScopedLock lock(mutex_);
-      while(has_oob_write_pending_) {
-        const auto kWaitTime = 10 * kSbTimeMillisecond;
-        if (!pending_oob_write_condition_.WaitTimed(kWaitTime)) {
-          has_oob_write_pending_ = false;
-          break;
-        }
-      }
-    }
     WriteSample(sample_type, buffer, session_id, subsamples, subsamples_count,
-                iv, key);
+                iv, key, serial);
   }
 
   if (!session_id.empty() && !keep_samples) {
@@ -1925,6 +1949,8 @@
       max_sample_timestamps_[kVideoIndex] = 0;
       max_sample_timestamps_[kAudioIndex] = 0;
       min_sample_timestamp_ = kSbTimeMax;
+      samples_serial_[kVideoIndex] = 0;
+      samples_serial_[kAudioIndex] = 0;
     }
 
     ticket_ = ticket;
@@ -2059,6 +2085,8 @@
 
   gint64 position = GetPosition();
 
+  CheckBuffering(position);
+
   GST_TRACE("Position: %" GST_TIME_FORMAT " (Seek to: %" GST_TIME_FORMAT
             ") Duration: %" GST_TIME_FORMAT,
             GST_TIME_ARGS(position),
@@ -2115,6 +2143,31 @@
   return gst_element_set_state(pipeline_, state) != GST_STATE_CHANGE_FAILURE;
 }
 
+void PlayerImpl::CheckBuffering(gint64 position) {
+  if (!GST_CLOCK_TIME_IS_VALID(position))
+    return;
+
+  constexpr SbTime kMarginNs =
+      350 * kSbTimeMillisecond * kSbTimeNanosecondsPerMicrosecond;
+  MediaType origin = MediaType::kNone;
+  SbTime min_ts = MinTimestamp(&origin);
+  if (min_ts != kSbTimeMax && min_ts + kMarginNs <= position &&
+      GST_STATE(pipeline_) == GST_STATE_PLAYING &&
+      GST_STATE_PENDING(pipeline_) != GST_STATE_PAUSED) {
+    {
+      ::starboard::ScopedLock lock(mutex_);
+      DecoderNeedsData(lock, origin);
+    }
+
+    PrintPositionPerSink(pipeline_);
+    GST_WARNING("Force setting to PAUSED. Pos: %" GST_TIME_FORMAT
+                " sample:%" GST_TIME_FORMAT,
+                GST_TIME_ARGS(position), GST_TIME_ARGS(min_ts + kMarginNs));
+
+    ChangePipelineState(GST_STATE_PAUSED);
+  }
+}
+
 gint64 PlayerImpl::GetPosition() const {
   gint64 position = GST_CLOCK_TIME_NONE;
 
@@ -2164,25 +2217,6 @@
                 GST_TIME_ARGS(position));
   }
 
-  MediaType origin = MediaType::kNone;
-  constexpr SbTime kMarginNs =
-      350 * kSbTimeMillisecond * kSbTimeNanosecondsPerMicrosecond;
-  SbTime min_ts = MinTimestamp(&origin);
-  if (min_ts != kSbTimeMax && min_ts + kMarginNs <= position &&
-      GST_STATE(pipeline_) == GST_STATE_PLAYING &&
-      GST_STATE_PENDING(pipeline_) != GST_STATE_PAUSED) {
-    {
-      ::starboard::ScopedLock lock(mutex_);
-      DecoderNeedsData(lock, origin);
-    }
-
-    PrintPositionPerSink(pipeline_);
-    GST_WARNING("Force setting to PAUSED. Pos: %" GST_TIME_FORMAT
-                " sample:%" GST_TIME_FORMAT,
-                GST_TIME_ARGS(position), GST_TIME_ARGS(min_ts + kMarginNs));
-    ChangePipelineState(GST_STATE_PAUSED);
-  }
-
   cached_position_ns_ = position;
   return position;
 }
@@ -2233,9 +2267,9 @@
     GstClockTime prev_timestamps[kMediaNumber] = {-1, -1};
     for (auto& sample : local_samples) {
       GST_INFO("Writing pending: SampleType:%d %" GST_TIME_FORMAT
-               " b:%p, s:%p, iv:%p, k:%p",
+               " id:%llu b:%p, s:%p, iv:%p, k:%p",
                sample.Type(),
-               GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(sample.Buffer())),
+               GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(sample.Buffer())), sample.SerialID(),
                sample.Buffer(), sample.Subsamples(), sample.Iv(), sample.Key());
       auto &prev_ts = prev_timestamps[sample.Type() == kSbMediaTypeVideo ? kVideoIndex : kAudioIndex];
       if (prev_ts == GST_BUFFER_TIMESTAMP(sample.Buffer())) {
@@ -2246,7 +2280,7 @@
       prev_ts = GST_BUFFER_TIMESTAMP(sample.Buffer());
       if (WriteSample(sample.Type(), sample.Buffer(), session_id,
                       sample.Subsamples(), sample.SubsamplesCount(),
-                      sample.Iv(), sample.Key())) {
+                      sample.Iv(), sample.Key(), sample.SerialID())) {
         GST_INFO("Pending sample was written.");
         sample.Written();
       }