// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "cobalt/media/blink/multibuffer.h"

#include <utility>

#include "base/bind.h"
#include "base/location.h"

namespace media {

// Prune 80 blocks per 30 seconds.
// This means a full cache will go away in ~5 minutes.
enum {
  kBlockPruneInterval = 30,
  kBlocksPrunedPerInterval = 80,
};

// Returns the block ID closest to (but less or equal than) |pos| from |index|.
template <class T>
static MultiBuffer::BlockId ClosestPreviousEntry(
    const std::map<MultiBuffer::BlockId, T>& index, MultiBuffer::BlockId pos) {
  auto i = index.upper_bound(pos);
  DCHECK(i == index.end() || i->first > pos);
  if (i == index.begin()) {
    return std::numeric_limits<MultiBufferBlockId>::min();
  }
  --i;
  DCHECK_LE(i->first, pos);
  return i->first;
}

// Returns the block ID closest to (but greter than or equal to) |pos|
// from |index|.
template <class T>
static MultiBuffer::BlockId ClosestNextEntry(
    const std::map<MultiBuffer::BlockId, T>& index, MultiBuffer::BlockId pos) {
  auto i = index.lower_bound(pos);
  if (i == index.end()) {
    return std::numeric_limits<MultiBufferBlockId>::max();
  }
  DCHECK_GE(i->first, pos);
  return i->first;
}

//
// MultiBuffer::GlobalLRU
//
MultiBuffer::GlobalLRU::GlobalLRU(
    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner)
    : max_size_(0),
      data_size_(0),
      background_pruning_pending_(false),
      task_runner_(task_runner) {}

MultiBuffer::GlobalLRU::~GlobalLRU() {
  // By the time we're freed, all blocks should have been removed,
  // and our sums should be zero.
  DCHECK(lru_.Empty());
  DCHECK_EQ(max_size_, 0);
  DCHECK_EQ(data_size_, 0);
}

void MultiBuffer::GlobalLRU::Use(MultiBuffer* multibuffer,
                                 MultiBufferBlockId block_id) {
  GlobalBlockId id(multibuffer, block_id);
  lru_.Use(id);
  SchedulePrune();
}

void MultiBuffer::GlobalLRU::Insert(MultiBuffer* multibuffer,
                                    MultiBufferBlockId block_id) {
  GlobalBlockId id(multibuffer, block_id);
  lru_.Insert(id);
  SchedulePrune();
}

void MultiBuffer::GlobalLRU::Remove(MultiBuffer* multibuffer,
                                    MultiBufferBlockId block_id) {
  GlobalBlockId id(multibuffer, block_id);
  lru_.Remove(id);
}

bool MultiBuffer::GlobalLRU::Contains(MultiBuffer* multibuffer,
                                      MultiBufferBlockId block_id) {
  GlobalBlockId id(multibuffer, block_id);
  return lru_.Contains(id);
}

void MultiBuffer::GlobalLRU::IncrementDataSize(int64_t blocks) {
  data_size_ += blocks;
  DCHECK_GE(data_size_, 0);
  SchedulePrune();
}

void MultiBuffer::GlobalLRU::IncrementMaxSize(int64_t blocks) {
  max_size_ += blocks;
  DCHECK_GE(max_size_, 0);
  SchedulePrune();
}

bool MultiBuffer::GlobalLRU::Pruneable() const {
  return data_size_ > max_size_ && !lru_.Empty();
}

void MultiBuffer::GlobalLRU::SchedulePrune() {
  if (Pruneable() && !background_pruning_pending_) {
    task_runner_->PostDelayedTask(
        FROM_HERE, base::Bind(&MultiBuffer::GlobalLRU::PruneTask, this),
        base::TimeDelta::FromSeconds(kBlockPruneInterval));
    background_pruning_pending_ = true;
  }
}

void MultiBuffer::GlobalLRU::PruneTask() {
  background_pruning_pending_ = false;
  Prune(kBlocksPrunedPerInterval);
  SchedulePrune();
}

void MultiBuffer::GlobalLRU::Prune(int64_t max_to_free) {
  // We group the blocks by multibuffer so that we can free as many blocks as
  // possible in one call. This reduces the number of callbacks to clients
  // when their available ranges change.
  std::map<MultiBuffer*, std::vector<MultiBufferBlockId>> to_free;
  int64_t freed = 0;
  while (data_size_ - freed > max_size_ && !lru_.Empty() &&
         freed < max_to_free) {
    GlobalBlockId block_id = lru_.Pop();
    to_free[block_id.first].push_back(block_id.second);
    freed++;
  }
  for (const auto& to_free_pair : to_free) {
    to_free_pair.first->ReleaseBlocks(to_free_pair.second);
  }
}

int64_t MultiBuffer::GlobalLRU::Size() const { return lru_.Size(); }

//
// MultiBuffer
//
MultiBuffer::MultiBuffer(int32_t block_size_shift,
                         const scoped_refptr<GlobalLRU>& global_lru)
    : max_size_(0), block_size_shift_(block_size_shift), lru_(global_lru) {}

MultiBuffer::~MultiBuffer() {
  CHECK(pinned_.empty());
  DCHECK_EQ(max_size_, 0);
  // Remove all blocks from the LRU.
  for (const auto& i : data_) {
    lru_->Remove(this, i.first);
  }
  lru_->IncrementDataSize(-static_cast<int64_t>(data_.size()));
  lru_->IncrementMaxSize(-max_size_);
}

void MultiBuffer::AddReader(const BlockId& pos, Reader* reader) {
  std::set<Reader*>* set_of_readers = &readers_[pos];
  bool already_waited_for = !set_of_readers->empty();
  set_of_readers->insert(reader);

  if (already_waited_for || Contains(pos)) {
    return;
  }

  // We may need to create a new data provider to service this request.
  // Look for an existing data provider first.
  DataProvider* provider = NULL;
  BlockId closest_writer = ClosestPreviousEntry(writer_index_, pos);

  if (closest_writer > pos - kMaxWaitForWriterOffset) {
    auto i = present_.find(pos);
    BlockId closest_block;
    if (i.value()) {
      // Shouldn't happen, we already tested that Contains(pos) is true.
      NOTREACHED();
      closest_block = pos;
    } else if (i == present_.begin()) {
      closest_block = -1;
    } else {
      closest_block = i.interval_begin() - 1;
    }

    // Make sure that there are no present blocks between the writer and
    // the requested position, as that will cause the writer to quit.
    if (closest_writer > closest_block) {
      provider = writer_index_[closest_writer].get();
      DCHECK(provider);
    }
  }
  if (!provider) {
    DCHECK(writer_index_.find(pos) == writer_index_.end());
    writer_index_[pos] = CreateWriter(pos);
    provider = writer_index_[pos].get();
  }
  provider->SetDeferred(false);
}

void MultiBuffer::RemoveReader(const BlockId& pos, Reader* reader) {
  auto i = readers_.find(pos);
  if (i == readers_.end()) return;
  i->second.erase(reader);
  if (i->second.empty()) {
    readers_.erase(i);
  }
}

void MultiBuffer::CleanupWriters(const BlockId& pos) {
  BlockId p2 = pos + kMaxWaitForReaderOffset;
  BlockId closest_writer = ClosestPreviousEntry(writer_index_, p2);
  while (closest_writer > pos - kMaxWaitForWriterOffset) {
    DCHECK(writer_index_[closest_writer]);
    OnDataProviderEvent(writer_index_[closest_writer].get());
    closest_writer = ClosestPreviousEntry(writer_index_, closest_writer - 1);
  }
}

bool MultiBuffer::Contains(const BlockId& pos) const {
  DCHECK(present_[pos] == 0 || present_[pos] == 1)
      << " pos = " << pos << " present_[pos] " << present_[pos];
  DCHECK_EQ(present_[pos], data_.find(pos) != data_.end() ? 1 : 0);
  return !!present_[pos];
}

MultiBufferBlockId MultiBuffer::FindNextUnavailable(const BlockId& pos) const {
  auto i = present_.find(pos);
  if (i.value()) return i.interval_end();
  return pos;
}

void MultiBuffer::NotifyAvailableRange(
    const Interval<MultiBufferBlockId>& observer_range,
    const Interval<MultiBufferBlockId>& new_range) {
  std::set<Reader*> tmp;
  for (auto i = readers_.lower_bound(observer_range.begin);
       i != readers_.end() && i->first < observer_range.end; ++i) {
    tmp.insert(i->second.begin(), i->second.end());
  }
  for (Reader* reader : tmp) {
    reader->NotifyAvailableRange(new_range);
  }
}

void MultiBuffer::ReleaseBlocks(const std::vector<MultiBufferBlockId>& blocks) {
  IntervalMap<BlockId, int32_t> freed;
  for (MultiBufferBlockId to_free : blocks) {
    DCHECK(data_[to_free]);
    DCHECK_EQ(pinned_[to_free], 0);
    DCHECK_EQ(present_[to_free], 1);
    data_.erase(to_free);
    freed.IncrementInterval(to_free, to_free + 1, 1);
    present_.IncrementInterval(to_free, to_free + 1, -1);
  }
  lru_->IncrementDataSize(-static_cast<int64_t>(blocks.size()));

  for (const auto& freed_range : freed) {
    if (freed_range.second) {
      // Technically, there shouldn't be any observers in this range
      // as all observers really should be pinning the range where it's
      // actually observing.
      NotifyAvailableRange(
          freed_range.first,
          // Empty range.
          Interval<BlockId>(freed_range.first.begin, freed_range.first.begin));

      auto i = present_.find(freed_range.first.begin);
      DCHECK_EQ(i.value(), 0);
      DCHECK_LE(i.interval_begin(), freed_range.first.begin);
      DCHECK_LE(freed_range.first.end, i.interval_end());

      if (i.interval_begin() == freed_range.first.begin) {
        // Notify the previous range that it contains fewer blocks.
        auto j = i;
        --j;
        DCHECK_EQ(j.value(), 1);
        NotifyAvailableRange(j.interval(), j.interval());
      }
      if (i.interval_end() == freed_range.first.end) {
        // Notify the following range that it contains fewer blocks.
        auto j = i;
        ++j;
        DCHECK_EQ(j.value(), 1);
        NotifyAvailableRange(j.interval(), j.interval());
      }
    }
  }
  if (data_.empty()) OnEmpty();
}

void MultiBuffer::OnEmpty() {}

void MultiBuffer::AddProvider(std::unique_ptr<DataProvider> provider) {
  // If there is already a provider in the same location, we delete it.
  DCHECK(!provider->Available());
  BlockId pos = provider->Tell();
  writer_index_[pos] = std::move(provider);
}

std::unique_ptr<MultiBuffer::DataProvider> MultiBuffer::RemoveProvider(
    DataProvider* provider) {
  BlockId pos = provider->Tell();
  auto iter = writer_index_.find(pos);
  DCHECK(iter != writer_index_.end());
  DCHECK_EQ(iter->second.get(), provider);
  std::unique_ptr<DataProvider> ret = std::move(iter->second);
  writer_index_.erase(iter);
  return ret;
}

MultiBuffer::ProviderState MultiBuffer::SuggestProviderState(
    const BlockId& pos) const {
  MultiBufferBlockId next_reader_pos = ClosestNextEntry(readers_, pos);
  if (next_reader_pos != std::numeric_limits<MultiBufferBlockId>::max() &&
      (next_reader_pos - pos <= kMaxWaitForWriterOffset || !RangeSupported())) {
    // Check if there is another writer between us and the next reader.
    MultiBufferBlockId next_writer_pos =
        ClosestNextEntry(writer_index_, pos + 1);
    if (next_writer_pos > next_reader_pos) {
      return ProviderStateLoad;
    }
  }

  MultiBufferBlockId previous_reader_pos =
      ClosestPreviousEntry(readers_, pos - 1);
  if (previous_reader_pos != std::numeric_limits<MultiBufferBlockId>::min() &&
      (pos - previous_reader_pos <= kMaxWaitForReaderOffset ||
       !RangeSupported())) {
    MultiBufferBlockId previous_writer_pos =
        ClosestPreviousEntry(writer_index_, pos - 1);
    if (previous_writer_pos < previous_reader_pos) {
      return ProviderStateDefer;
    }
  }

  return ProviderStateDead;
}

bool MultiBuffer::ProviderCollision(const BlockId& id) const {
  // If there is a writer at the same location, it is always a collision.
  if (writer_index_.find(id) != writer_index_.end()) return true;

  // Data already exists at providers current position,
  // if the URL supports ranges, we can kill the data provider.
  if (RangeSupported() && Contains(id)) return true;

  return false;
}

void MultiBuffer::Prune(size_t max_to_free) { lru_->Prune(max_to_free); }

void MultiBuffer::OnDataProviderEvent(DataProvider* provider_tmp) {
  std::unique_ptr<DataProvider> provider(RemoveProvider(provider_tmp));
  BlockId start_pos = provider->Tell();
  BlockId pos = start_pos;
  bool eof = false;
  int64_t blocks_before = data_.size();

  while (!ProviderCollision(pos) && !eof) {
    if (!provider->Available()) {
      AddProvider(std::move(provider));
      break;
    }
    DCHECK_GE(pos, 0);
    scoped_refptr<DataBuffer> data = provider->Read();
    data_[pos] = data;
    eof = data->end_of_stream();
    if (!pinned_[pos]) lru_->Use(this, pos);
    ++pos;
  }
  int64_t blocks_after = data_.size();
  int64_t blocks_added = blocks_after - blocks_before;

  if (pos > start_pos) {
    present_.SetInterval(start_pos, pos, 1);
    Interval<BlockId> expanded_range = present_.find(start_pos).interval();
    NotifyAvailableRange(expanded_range, expanded_range);
    lru_->IncrementDataSize(blocks_added);
    Prune(blocks_added * kMaxFreesPerAdd + 1);
  } else {
    // Make sure to give progress reports even when there
    // aren't any new blocks yet.
    NotifyAvailableRange(Interval<BlockId>(start_pos, start_pos + 1),
                         Interval<BlockId>(start_pos, start_pos));
  }

  // Check that it's still there before we try to delete it.
  // In case of EOF or a collision, we might not have called AddProvider above.
  // Even if we did call AddProvider, calling NotifyAvailableRange can cause
  // readers to seek or self-destruct and clean up any associated writers.
  auto i = writer_index_.find(pos);
  if (i != writer_index_.end() && i->second.get() == provider_tmp) {
    switch (SuggestProviderState(pos)) {
      case ProviderStateLoad:
        // Not sure we actually need to do this
        provider_tmp->SetDeferred(false);
        break;
      case ProviderStateDefer:
        provider_tmp->SetDeferred(true);
        break;
      case ProviderStateDead:
        RemoveProvider(provider_tmp);
        break;
    }
  }
}

void MultiBuffer::MergeFrom(MultiBuffer* other) {
  // Import data and update LRU.
  size_t data_size = data_.size();
  for (const auto& data : other->data_) {
    if (data_.insert(std::make_pair(data.first, data.second)).second) {
      if (!pinned_[data.first]) {
        lru_->Insert(this, data.first);
      }
    }
  }
  lru_->IncrementDataSize(static_cast<int64_t>(data_.size() - data_size));
  // Update present_
  for (const auto& r : other->present_) {
    if (r.second) {
      present_.SetInterval(r.first.begin, r.first.end, 1);
    }
  }
  // Notify existing readers.
  auto last = present_.begin();
  for (const auto& r : other->present_) {
    if (r.second) {
      auto i = present_.find(r.first.begin);
      if (i != last) {
        NotifyAvailableRange(i.interval(), i.interval());
        last = i;
      }
    }
  }
}

void MultiBuffer::PinRange(const BlockId& from, const BlockId& to,
                           int32_t how_much) {
  DCHECK_NE(how_much, 0);
  DVLOG(3) << "PINRANGE [" << from << " - " << to << ") += " << how_much;
  pinned_.IncrementInterval(from, to, how_much);
  Interval<BlockId> modified_range(from, to);

  // Iterate over all the modified ranges and check if any of them have
  // transitioned in or out of the unlocked state. If so, we iterate over
  // all buffers in that range and add/remove them from the LRU as approperiate.
  // We iterate *backwards* through the ranges, with the idea that data in a
  // continous range should be freed from the end first.

  if (data_.empty()) return;

  auto range = pinned_.find(to - 1);
  while (1) {
    DCHECK_GE(range.value(), 0);
    if (range.value() == 0 || range.value() == how_much) {
      bool pin = range.value() == how_much;
      Interval<BlockId> transition_range =
          modified_range.Intersect(range.interval());
      if (transition_range.Empty()) break;

      // For each range that has transitioned to/from a pinned state,
      // we iterate over the corresponding ranges in |present_| to find
      // the blocks that are actually in the multibuffer.
      for (auto present_block_range = present_.find(transition_range.end - 1);
           present_block_range != present_.begin(); --present_block_range) {
        if (!present_block_range.value()) continue;
        Interval<BlockId> present_transitioned_range =
            transition_range.Intersect(present_block_range.interval());
        if (present_transitioned_range.Empty()) break;
        for (BlockId block = present_transitioned_range.end - 1;
             block >= present_transitioned_range.begin; --block) {
          DCHECK_GE(block, 0);
          DCHECK(data_.find(block) != data_.end());
          if (pin) {
            DCHECK(pinned_[block]);
            lru_->Remove(this, block);
          } else {
            DCHECK(!pinned_[block]);
            lru_->Insert(this, block);
          }
        }
      }
    }
    if (range == pinned_.begin()) break;
    --range;
  }
}

void MultiBuffer::PinRanges(const IntervalMap<BlockId, int32_t>& ranges) {
  for (const auto& r : ranges) {
    if (r.second != 0) {
      PinRange(r.first.begin, r.first.end, r.second);
    }
  }
}

void MultiBuffer::IncrementMaxSize(int32_t size) {
  max_size_ += size;
  lru_->IncrementMaxSize(size);
  DCHECK_GE(max_size_, 0);
  // Pruning only happens when blocks are added.
}

int64_t MultiBuffer::UncommittedBytesAt(const MultiBuffer::BlockId& block) {
  auto i = writer_index_.find(block);
  if (writer_index_.end() == i) return 0;
  return i->second->AvailableBytes();
}

}  // namespace media
