// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "media/mojo/common/mojo_decoder_buffer_converter.h"

#include <memory>

#include "base/bind.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/single_thread_task_runner.h"
#include "base/threading/thread_task_runner_handle.h"
#include "media/base/audio_buffer.h"
#include "media/base/cdm_context.h"
#include "media/base/decoder_buffer.h"
#include "media/mojo/common/media_type_converters.h"
#include "media/mojo/common/mojo_pipe_read_write_util.h"

using media::mojo_pipe_read_write_util::IsPipeReadWriteError;

namespace media {

// Creates mojo::DataPipe and sets `producer_handle` and `consumer_handle`.
// Returns true on success. Otherwise returns false and reset the handles.
bool CreateDataPipe(uint32_t capacity,
                    mojo::ScopedDataPipeProducerHandle* producer_handle,
                    mojo::ScopedDataPipeConsumerHandle* consumer_handle) {
  MojoCreateDataPipeOptions options;
  options.struct_size = sizeof(MojoCreateDataPipeOptions);
  options.flags = MOJO_CREATE_DATA_PIPE_FLAG_NONE;
  options.element_num_bytes = 1;
  options.capacity_num_bytes = capacity;

  auto result =
      mojo::CreateDataPipe(&options, *producer_handle, *consumer_handle);

  if (result != MOJO_RESULT_OK) {
    DLOG(ERROR) << "DataPipe creation failed with " << result;
    producer_handle->reset();
    consumer_handle->reset();
    return false;
  }

  return true;
}

uint32_t GetDefaultDecoderBufferConverterCapacity(DemuxerStream::Type type) {
  uint32_t capacity = 0;

  if (type == DemuxerStream::AUDIO) {
    // TODO(timav): Consider capacity calculation based on AudioDecoderConfig.
    capacity = 512 * 1024;
  } else if (type == DemuxerStream::VIDEO) {
    // Video can get quite large; at 4K, VP9 delivers packets which are ~1MB in
    // size; so allow for some head room.
    // TODO(xhwang, sandersd): Provide a better way to customize this value.
    capacity = 2 * (1024 * 1024);
  } else {
    NOTREACHED() << "Unsupported type: " << type;
    // Choose an arbitrary size.
    capacity = 512 * 1024;
  }

  return capacity;
}

// MojoDecoderBufferReader

// static
std::unique_ptr<MojoDecoderBufferReader> MojoDecoderBufferReader::Create(
    uint32_t capacity,
    mojo::ScopedDataPipeProducerHandle* producer_handle) {
  DVLOG(1) << __func__;
  DCHECK_GT(capacity, 0u);

  // Create a MojoDecoderBufferReader even on the failure case and
  // `ReadDecoderBuffer()` below will fail.
  // TODO(xhwang): Update callers to handle failure so we can return null.
  mojo::ScopedDataPipeConsumerHandle consumer_handle;
  ignore_result(CreateDataPipe(capacity, producer_handle, &consumer_handle));
  return std::make_unique<MojoDecoderBufferReader>(std::move(consumer_handle));
}

MojoDecoderBufferReader::MojoDecoderBufferReader(
    mojo::ScopedDataPipeConsumerHandle consumer_handle)
    : consumer_handle_(std::move(consumer_handle)),
      pipe_watcher_(FROM_HERE,
                    mojo::SimpleWatcher::ArmingPolicy::MANUAL,
                    base::SequencedTaskRunnerHandle::Get()),
      armed_(false),
      bytes_read_(0) {
  DVLOG(1) << __func__;

  if (!consumer_handle_.is_valid()) {
    DLOG(ERROR) << __func__ << ": Invalid consumer handle";
    return;
  }

  MojoResult result = pipe_watcher_.Watch(
      consumer_handle_.get(), MOJO_HANDLE_SIGNAL_READABLE,
      MOJO_WATCH_CONDITION_SATISFIED,
      base::BindRepeating(&MojoDecoderBufferReader::OnPipeReadable,
                          base::Unretained(this)));
  if (result != MOJO_RESULT_OK) {
    DLOG(ERROR) << __func__
                << ": Failed to start watching the pipe. result=" << result;
    consumer_handle_.reset();
  }
}

MojoDecoderBufferReader::~MojoDecoderBufferReader() {
  DVLOG(1) << __func__;
  CancelAllPendingReadCBs();
  if (flush_cb_)
    std::move(flush_cb_).Run();
}

void MojoDecoderBufferReader::CancelReadCB(ReadCB read_cb) {
  DVLOG(1) << "Failed to read DecoderBuffer because the pipe is already closed";
  std::move(read_cb).Run(nullptr);
}

void MojoDecoderBufferReader::CancelAllPendingReadCBs() {
  while (!pending_read_cbs_.empty()) {
    ReadCB read_cb = std::move(pending_read_cbs_.front());
    pending_read_cbs_.pop_front();
    // TODO(sandersd): Make sure there are no possible re-entrancy issues
    // here. Perhaps these should be posted, or merged into a single error
    // callback?
    CancelReadCB(std::move(read_cb));
  }
}

void MojoDecoderBufferReader::CompleteCurrentRead() {
  DVLOG(4) << __func__;
  DCHECK(!pending_read_cbs_.empty());
  DCHECK_EQ(pending_read_cbs_.size(), pending_buffers_.size());

  ReadCB read_cb = std::move(pending_read_cbs_.front());
  pending_read_cbs_.pop_front();

  scoped_refptr<DecoderBuffer> buffer = std::move(pending_buffers_.front());
  pending_buffers_.pop_front();

  DCHECK(buffer->end_of_stream() || buffer->data_size() == bytes_read_);
  bytes_read_ = 0;

  std::move(read_cb).Run(std::move(buffer));

  if (pending_read_cbs_.empty() && flush_cb_)
    std::move(flush_cb_).Run();
}

void MojoDecoderBufferReader::ScheduleNextRead() {
  DVLOG(4) << __func__;
  DCHECK(!armed_);
  DCHECK(!pending_buffers_.empty());

  armed_ = true;
  pipe_watcher_.ArmOrNotify();
}

// TODO(xhwang): Move this up to match declaration order.
void MojoDecoderBufferReader::ReadDecoderBuffer(
    mojom::DecoderBufferPtr mojo_buffer,
    ReadCB read_cb) {
  DVLOG(3) << __func__;
  DCHECK(!flush_cb_);

  if (!consumer_handle_.is_valid()) {
    DCHECK(pending_read_cbs_.empty());
    CancelReadCB(std::move(read_cb));
    return;
  }

  scoped_refptr<DecoderBuffer> media_buffer(
      mojo_buffer.To<scoped_refptr<DecoderBuffer>>());
  DCHECK(media_buffer);

  // We don't want reads to complete out of order, so we queue them even if they
  // are zero-sized.
  pending_read_cbs_.push_back(std::move(read_cb));
  pending_buffers_.push_back(std::move(media_buffer));

  // Do nothing if a read is already scheduled.
  if (armed_)
    return;

  // To reduce latency, always process pending reads immediately.
  ProcessPendingReads();
}

void MojoDecoderBufferReader::Flush(base::OnceClosure flush_cb) {
  DVLOG(2) << __func__;
  DCHECK(!flush_cb_);

  if (pending_read_cbs_.empty()) {
    std::move(flush_cb).Run();
    return;
  }

  flush_cb_ = std::move(flush_cb);
}

bool MojoDecoderBufferReader::HasPendingReads() const {
  return !pending_read_cbs_.empty();
}

void MojoDecoderBufferReader::OnPipeReadable(
    MojoResult result,
    const mojo::HandleSignalsState& state) {
  DVLOG(4) << __func__ << "(" << result << ", " << state.readable() << ")";

  // |MOJO_RESULT_CANCELLED| may be dispatched even while the SimpleWatcher
  // is disarmed, and no further notifications will be dispatched after that.
  DCHECK(armed_ || result == MOJO_RESULT_CANCELLED);

  armed_ = false;

  if (result != MOJO_RESULT_OK) {
    OnPipeError(result);
    return;
  }

  DCHECK(state.readable());
  ProcessPendingReads();
}

void MojoDecoderBufferReader::ProcessPendingReads() {
  DVLOG(4) << __func__;
  DCHECK(!armed_);
  DCHECK(!pending_buffers_.empty());

  while (!pending_buffers_.empty()) {
    DecoderBuffer* buffer = pending_buffers_.front().get();

    uint32_t buffer_size = 0u;
    if (!pending_buffers_.front()->end_of_stream())
      buffer_size = base::checked_cast<uint32_t>(buffer->data_size());

    // Immediately complete empty reads.
    // A non-EOS buffer can have zero size. See http://crbug.com/663438
    if (buffer_size == 0) {
      // TODO(sandersd): Make sure there are no possible re-entrancy issues
      // here. Perhaps read callbacks should be posted?
      CompleteCurrentRead();
      continue;
    }

    // We may be starting to read a new buffer (|bytes_read_| == 0), or
    // recovering from a previous partial read (|bytes_read_| > 0).
    DCHECK_GT(buffer_size, bytes_read_);
    uint32_t num_bytes = buffer_size - bytes_read_;

    MojoResult result =
        consumer_handle_->ReadData(buffer->writable_data() + bytes_read_,
                                   &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);

    if (IsPipeReadWriteError(result)) {
      OnPipeError(result);
      return;
    }

    if (result == MOJO_RESULT_SHOULD_WAIT) {
      ScheduleNextRead();
      return;
    }

    DCHECK_EQ(result, MOJO_RESULT_OK);
    DVLOG(4) << __func__ << ": " << num_bytes << " bytes read.";
    DCHECK_GT(num_bytes, 0u);
    bytes_read_ += num_bytes;

    // TODO(sandersd): Make sure there are no possible re-entrancy issues
    // here.
    if (bytes_read_ == buffer_size)
      CompleteCurrentRead();

    // Since we can still read, try to read more.
  }
}

void MojoDecoderBufferReader::OnPipeError(MojoResult result) {
  DVLOG(1) << __func__ << "(" << result << ")";
  DCHECK(IsPipeReadWriteError(result));

  consumer_handle_.reset();

  if (!pending_buffers_.empty()) {
    DVLOG(1) << __func__ << ": reading from data pipe failed. result=" << result
             << ", buffer size=" << pending_buffers_.front()->data_size()
             << ", num_bytes(read)=" << bytes_read_;
    bytes_read_ = 0;
    pending_buffers_.clear();
    CancelAllPendingReadCBs();
  }
}

// MojoDecoderBufferWriter

// static
std::unique_ptr<MojoDecoderBufferWriter> MojoDecoderBufferWriter::Create(
    uint32_t capacity,
    mojo::ScopedDataPipeConsumerHandle* consumer_handle) {
  DVLOG(1) << __func__;
  DCHECK_GT(capacity, 0u);

  // Create a MojoDecoderBufferWriter even on the failure case and
  // `WriteDecoderBuffer()` below will fail.
  // TODO(xhwang): Update callers to handle failure so we can return null.
  mojo::ScopedDataPipeProducerHandle producer_handle;
  ignore_result(CreateDataPipe(capacity, &producer_handle, consumer_handle));
  return std::make_unique<MojoDecoderBufferWriter>(std::move(producer_handle));
}

MojoDecoderBufferWriter::MojoDecoderBufferWriter(
    mojo::ScopedDataPipeProducerHandle producer_handle)
    : producer_handle_(std::move(producer_handle)),
      pipe_watcher_(FROM_HERE,
                    mojo::SimpleWatcher::ArmingPolicy::MANUAL,
                    base::SequencedTaskRunnerHandle::Get()),
      armed_(false),
      bytes_written_(0) {
  DVLOG(1) << __func__;

  if (!producer_handle_.is_valid()) {
    DLOG(ERROR) << __func__ << ": Invalid producer handle";
    return;
  }

  MojoResult result = pipe_watcher_.Watch(
      producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
      MOJO_WATCH_CONDITION_SATISFIED,
      base::BindRepeating(&MojoDecoderBufferWriter::OnPipeWritable,
                          base::Unretained(this)));
  if (result != MOJO_RESULT_OK) {
    DLOG(ERROR) << __func__
                << ": Failed to start watching the pipe. result=" << result;
    producer_handle_.reset();
  }
}

MojoDecoderBufferWriter::~MojoDecoderBufferWriter() {
  DVLOG(1) << __func__;
}

void MojoDecoderBufferWriter::ScheduleNextWrite() {
  DVLOG(4) << __func__;
  DCHECK(!armed_);
  DCHECK(!pending_buffers_.empty());

  armed_ = true;
  pipe_watcher_.ArmOrNotify();
}

mojom::DecoderBufferPtr MojoDecoderBufferWriter::WriteDecoderBuffer(
    scoped_refptr<DecoderBuffer> media_buffer) {
  DVLOG(3) << __func__;

  // DecoderBuffer cannot be written if the pipe is already closed.
  if (!producer_handle_.is_valid()) {
    DVLOG(1)
        << __func__
        << ": Failed to write DecoderBuffer because the pipe is already closed";
    return nullptr;
  }

  mojom::DecoderBufferPtr mojo_buffer =
      mojom::DecoderBuffer::From(*media_buffer);

  // A non-EOS buffer can have zero size. See http://crbug.com/663438
  if (media_buffer->end_of_stream() || media_buffer->data_size() == 0)
    return mojo_buffer;

  // Queue writing the buffer's data into our DataPipe.
  pending_buffers_.push_back(std::move(media_buffer));

  // Do nothing if a write is already scheduled. Otherwise, to reduce latency,
  // always try to write data to the pipe first.
  if (!armed_)
    ProcessPendingWrites();

  return mojo_buffer;
}

void MojoDecoderBufferWriter::OnPipeWritable(
    MojoResult result,
    const mojo::HandleSignalsState& state) {
  DVLOG(4) << __func__ << "(" << result << ", " << state.writable() << ")";

  // |MOJO_RESULT_CANCELLED| may be dispatched even while the SimpleWatcher
  // is disarmed, and no further notifications will be dispatched after that.
  DCHECK(armed_ || result == MOJO_RESULT_CANCELLED);

  armed_ = false;

  if (result != MOJO_RESULT_OK) {
    OnPipeError(result);
    return;
  }

  DCHECK(state.writable());
  ProcessPendingWrites();
}

void MojoDecoderBufferWriter::ProcessPendingWrites() {
  DVLOG(4) << __func__;
  DCHECK(!armed_);
  DCHECK(!pending_buffers_.empty());

  while (!pending_buffers_.empty()) {
    DecoderBuffer* buffer = pending_buffers_.front().get();

    uint32_t buffer_size = base::checked_cast<uint32_t>(buffer->data_size());
    DCHECK_GT(buffer_size, 0u) << "Unexpected EOS or empty buffer";

    // We may be starting to write a new buffer (|bytes_written_| == 0), or
    // recovering from a previous partial write (|bytes_written_| > 0).
    uint32_t num_bytes = buffer_size - bytes_written_;
    DCHECK_GT(num_bytes, 0u);

    MojoResult result = producer_handle_->WriteData(
        buffer->data() + bytes_written_, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);

    if (IsPipeReadWriteError(result)) {
      OnPipeError(result);
      return;
    }

    if (result == MOJO_RESULT_SHOULD_WAIT) {
      ScheduleNextWrite();
      return;
    }

    DCHECK_EQ(MOJO_RESULT_OK, result);
    DVLOG(4) << __func__ << ": " << num_bytes << " bytes written.";
    DCHECK_GT(num_bytes, 0u);
    bytes_written_ += num_bytes;
    if (bytes_written_ == buffer_size) {
      pending_buffers_.pop_front();
      bytes_written_ = 0;
    }

    // Since we can still write, try to write more.
  }
}

void MojoDecoderBufferWriter::OnPipeError(MojoResult result) {
  DVLOG(1) << __func__ << "(" << result << ")";
  DCHECK(IsPipeReadWriteError(result));

  producer_handle_.reset();

  if (!pending_buffers_.empty()) {
    DVLOG(1) << __func__ << ": writing to data pipe failed. result=" << result
             << ", buffer size=" << pending_buffers_.front()->data_size()
             << ", num_bytes(written)=" << bytes_written_;
    pending_buffers_.clear();
    bytes_written_ = 0;
  }
}

}  // namespace media
