// Copyright 2017 The Cobalt Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "cobalt/websocket/web_socket_impl.h"

#include <algorithm>
#include <cstdint>
#include <memory>

#include "base/basictypes.h"
#include "base/bind_helpers.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/strings/string_number_conversions.h"
#include "base/strings/string_util.h"
#include "cobalt/base/polymorphic_downcast.h"
#include "cobalt/websocket/web_socket.h"
#include "net/http/http_util.h"
#include "starboard/memory.h"

namespace cobalt {
namespace websocket {

WebSocketImpl::WebSocketImpl(cobalt::network::NetworkModule *network_module,
                             WebSocket *delegate)
    : network_module_(network_module), delegate_(delegate) {
  DCHECK(base::MessageLoop::current());
  owner_task_runner_ = base::MessageLoop::current()->task_runner();
}

void WebSocketImpl::ResetWebSocketEventDelegate() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  delegate_ = NULL;

  delegate_task_runner_->PostTask(
      FROM_HERE, base::Bind(&WebSocketImpl::DoClose, this,
                            CloseInfo(net::kWebSocketErrorGoingAway)));
}

void WebSocketImpl::Connect(const std::string &origin, const GURL &url,
                            const std::vector<std::string> &sub_protocols) {
  if (!network_module_) {
    DLOG(WARNING) << "Trying to make web socket connection without network "
                     "module, aborting.";
    return;
  }
  DCHECK(network_module_->url_request_context_getter());
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  origin_ = origin;

  DLOG(INFO) << "Connecting to websocket at " << url.spec();

  connect_url_ = url;
  desired_sub_protocols_ = sub_protocols;

  // TODO: Network Task Runner might not be the optimal task runner.
  // To achieve low latency communication via websockets, a dedicated high
  // priority thread might be required.  Investigation is needed.
  delegate_task_runner_ =
      network_module_->url_request_context_getter()->GetNetworkTaskRunner();
  DCHECK(delegate_task_runner_);
  base::WaitableEvent channel_created_event(
      base::WaitableEvent::ResetPolicy::MANUAL,
      base::WaitableEvent::InitialState::NOT_SIGNALED);
  base::Closure create_socket_stream_closure(
      base::Bind(&WebSocketImpl::DoConnect, this,
                 network_module_->url_request_context_getter(), url,
                 base::Unretained(&channel_created_event)));
  delegate_task_runner_->PostTask(FROM_HERE, create_socket_stream_closure);

  // Wait for the channel to be completed.  This event is signaled after
  // websocket_channel_ has been assigned.
  channel_created_event.Wait();
}

void WebSocketImpl::DoConnect(
    scoped_refptr<cobalt::network::URLRequestContextGetter> context,
    const GURL &url, base::WaitableEvent *channel_created_event) {
  DCHECK(delegate_task_runner_->BelongsToCurrentThread());
  DCHECK(url.is_valid());
  DCHECK(channel_created_event);
  DCHECK(context->GetURLRequestContext());

  websocket_channel_ = std::make_unique<net::WebSocketChannel>(
      std::unique_ptr<net::WebSocketEventInterface>(
          new CobaltWebSocketEventHandler(this)),
      context->GetURLRequestContext());

  websocket_channel_->SendAddChannelRequest(
      url, desired_sub_protocols_, url::Origin::Create(GURL(origin_)),
      connect_url_ /*site_for_cookies*/,
      net::HttpRequestHeaders() /*additional_headers*/);
  channel_created_event
      ->Signal();  // Signal that this->websocket_channel_ has been assigned.

  // On Cobalt we do not support flow control.
  auto flow_control_result = websocket_channel_->SendFlowControl(INT_MAX);
  DCHECK_EQ(net::WebSocketChannel::CHANNEL_ALIVE, flow_control_result);
}

void WebSocketImpl::Close(const net::WebSocketError code,
                          const std::string &reason) {
  CloseInfo close_info(code, reason);
  delegate_task_runner_->PostTask(
      FROM_HERE, base::Bind(&WebSocketImpl::DoClose, this, close_info));
}

void WebSocketImpl::DoClose(const CloseInfo &close_info) {
  if (!websocket_channel_) {
    return;
  }
  DCHECK(delegate_task_runner_->BelongsToCurrentThread());

  auto channel_state = websocket_channel_->StartClosingHandshake(
      close_info.code, close_info.reason);
  if (channel_state == net::WebSocketChannel::CHANNEL_DELETED ||
      close_info.code == net::kWebSocketErrorGoingAway) {
    websocket_channel_.reset();
  }
}

void WebSocketImpl::ResetChannel() {
  DCHECK(delegate_task_runner_->BelongsToCurrentThread());
  websocket_channel_.reset();
}

WebSocketImpl::~WebSocketImpl() {
  // The channel must have been destroyed already, in order to ensure that it
  // is destroyed on the correct thread.
  DCHECK(!websocket_channel_);
}

// The main reason to call TrampolineClose is to ensure messages that are posted
// from this thread prior to this function call are processed before the
// connection is closed.
void WebSocketImpl::TrampolineClose(const CloseInfo &close_info) {
  base::Closure no_op_closure =
      base::Closure(base::Bind([]() {} /*Do nothing*/));

  base::Closure do_close_closure(
      base::Bind(&WebSocketImpl::DoClose, this, close_info));
  owner_task_runner_->PostTaskAndReply(FROM_HERE, no_op_closure,
                                       do_close_closure);
}

void WebSocketImpl::OnHandshakeComplete(
    const std::string &selected_subprotocol) {
  owner_task_runner_->PostTask(
      FROM_HERE, base::Bind(&WebSocketImpl::OnWebSocketConnected, this,
                            selected_subprotocol));
}

void WebSocketImpl::OnFlowControl(int64_t quota) {
  DCHECK(current_quota_ >= 0);
  current_quota_ += quota;
  ProcessSendQueue();
}

void WebSocketImpl::OnWebSocketConnected(
    const std::string &selected_subprotocol) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  if (delegate_) {
    delegate_->OnConnected(selected_subprotocol);
  }
}

void WebSocketImpl::OnWebSocketDisconnected(bool was_clean, uint16 code,
                                            const std::string &reason) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  if (delegate_) {
    delegate_->OnDisconnected(was_clean, code, reason);
  }
}

void WebSocketImpl::OnWebSocketReceivedData(
    bool is_text_frame, scoped_refptr<net::IOBufferWithSize> data) {
  if (!owner_task_runner_->BelongsToCurrentThread()) {
    owner_task_runner_->PostTask(
        FROM_HERE, base::Bind(&WebSocketImpl::OnWebSocketReceivedData, this,
                              is_text_frame, data));
    return;
  }

  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  if (delegate_) {
    delegate_->OnReceivedData(is_text_frame, data);
  }
}

void WebSocketImpl::OnClose(bool was_clean, int error_code,
                            const std::string &close_reason) {
  DCHECK(delegate_task_runner_->BelongsToCurrentThread());

  std::uint16_t close_code = static_cast<std::uint16_t>(error_code);

  DLOG(INFO) << "WebSocket is closing."
             << " code[" << close_code << "] reason[" << close_reason << "]"
             << " was_clean: " << was_clean;

  // Queue the deletion of |websocket_channel_|.  We would do it here, but this
  // function may be called as a callback *by* |websocket_channel_|;
  delegate_task_runner_->PostTask(
      FROM_HERE, base::Bind(&WebSocketImpl::ResetChannel, this));
  owner_task_runner_->PostTask(
      FROM_HERE, base::Bind(&WebSocketImpl::OnWebSocketDisconnected, this,
                            was_clean, close_code, close_reason));
}

void WebSocketImpl::OnWriteDone(uint64_t bytes_written) {
  owner_task_runner_->PostTask(
      FROM_HERE,
      base::Bind(&WebSocketImpl::OnWebSocketWriteDone, this, bytes_written));
}

void WebSocketImpl::OnWebSocketWriteDone(uint64_t bytes_written) {
  if (delegate_) {
    delegate_->OnWriteDone(bytes_written);
  }
}

// Currently only called in SocketStream::Finish(), so it is meant
// as an informative message.
// SocketStream code will soon call OnClose after this.
// Note: SocketStream will not call OnClose in some cases with SPDY, but that
// is legacy code, as SPDY is not used in Cobalt.

bool WebSocketImpl::SendHelper(const net::WebSocketFrameHeader::OpCode op_code,
                               const char *data, std::size_t length,
                               std::string * /*error_message*/) {
  scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(length));
  SbMemoryCopy(io_buffer->data(), data, length);

  if (delegate_task_runner_->BelongsToCurrentThread()) {
    SendOnDelegateThread(op_code, std::move(io_buffer), length);
  } else {
    base::Closure do_send_closure(
        base::Bind(&WebSocketImpl::SendOnDelegateThread, this, op_code,
                   std::move(io_buffer), length));
    delegate_task_runner_->PostTask(FROM_HERE, do_send_closure);
  }

  // TODO: Change to void function?
  return true;
}

void WebSocketImpl::SendOnDelegateThread(
    const net::WebSocketFrameHeader::OpCode op_code,
    scoped_refptr<net::IOBuffer> io_buffer, std::size_t length) {
  DCHECK(delegate_task_runner_->BelongsToCurrentThread());

  if (!websocket_channel_) {
    DLOG(WARNING) << "Attempt to send over a closed channel.";
    return;
  }
  SendQueueMessage new_message = {io_buffer, length, op_code};
  send_queue_.push(std::move(new_message));
  ProcessSendQueue();
}

void WebSocketImpl::ProcessSendQueue() {
  DCHECK(delegate_task_runner_->BelongsToCurrentThread());
  while (current_quota_ > 0 && !send_queue_.empty()) {
    SendQueueMessage message = send_queue_.front();
    size_t current_message_length = message.length - sent_size_of_top_message_;
    bool final = false;
    if (current_quota_ < static_cast<int64_t>(current_message_length)) {
      // quota is not enough to send the top message.
      scoped_refptr<net::IOBuffer> new_io_buffer(
          new net::IOBuffer(static_cast<size_t>(current_quota_)));
      SbMemoryCopy(new_io_buffer->data(),
                   message.io_buffer->data() + sent_size_of_top_message_,
                   current_quota_);
      sent_size_of_top_message_ += current_quota_;
      message.io_buffer = new_io_buffer;
      current_message_length = current_quota_;
      current_quota_ = 0;
    } else {
      // Sent all of the remaining top message.
      final = true;
      send_queue_.pop();
      sent_size_of_top_message_ = 0;
      current_quota_ -= current_message_length;
    }
    auto channel_state = websocket_channel_->SendFrame(
        final, message.op_code, message.io_buffer, current_message_length);
    if (channel_state == net::WebSocketChannel::CHANNEL_DELETED) {
      websocket_channel_.reset();
    }
  }
}

bool WebSocketImpl::SendText(const char *data, std::size_t length,
                             int *buffered_amount, std::string *error_message) {
  DCHECK(error_message);
  DCHECK(error_message->empty());
  error_message->clear();
  DCHECK(buffered_amount);
  *buffered_amount += static_cast<int>(length);

  return SendHelper(net::WebSocketFrameHeader::kOpCodeText, data, length,
                    error_message);
}

bool WebSocketImpl::SendBinary(const char *data, std::size_t length,
                               int *buffered_amount,
                               std::string *error_message) {
  DCHECK(error_message);
  DCHECK(error_message->empty());
  error_message->clear();
  DCHECK(buffered_amount);
  *buffered_amount += static_cast<int>(length);

  return SendHelper(net::WebSocketFrameHeader::kOpCodeBinary, data, length,
                    error_message);
}

}  // namespace websocket
}  // namespace cobalt
