blob: eaa9b33ec2394646e133e88f5d6512188d7c8efb [file] [log] [blame]
// Copyright 2017 The Cobalt Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cobalt/websocket/web_socket_impl.h"
#include <algorithm>
#include <cstdint>
#include <memory>
#include "base/basictypes.h"
#include "base/bind_helpers.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/strings/string_number_conversions.h"
#include "base/strings/string_util.h"
#include "cobalt/base/polymorphic_downcast.h"
#include "cobalt/websocket/web_socket.h"
#include "net/http/http_util.h"
#include "starboard/memory.h"
namespace cobalt {
namespace websocket {
WebSocketImpl::WebSocketImpl(cobalt::network::NetworkModule *network_module,
WebSocket *delegate)
: network_module_(network_module), delegate_(delegate) {
DCHECK(base::MessageLoop::current());
owner_task_runner_ = base::MessageLoop::current()->task_runner();
}
void WebSocketImpl::ResetWebSocketEventDelegate() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
delegate_ = NULL;
delegate_task_runner_->PostTask(
FROM_HERE, base::Bind(&WebSocketImpl::DoClose, this,
CloseInfo(net::kWebSocketErrorGoingAway)));
}
void WebSocketImpl::Connect(const std::string &origin, const GURL &url,
const std::vector<std::string> &sub_protocols) {
if (!network_module_) {
DLOG(WARNING) << "Trying to make web socket connection without network "
"module, aborting.";
return;
}
DCHECK(network_module_->url_request_context_getter());
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
origin_ = origin;
DLOG(INFO) << "Connecting to websocket at " << url.spec();
connect_url_ = url;
desired_sub_protocols_ = sub_protocols;
// TODO: Network Task Runner might not be the optimal task runner.
// To achieve low latency communication via websockets, a dedicated high
// priority thread might be required. Investigation is needed.
delegate_task_runner_ =
network_module_->url_request_context_getter()->GetNetworkTaskRunner();
DCHECK(delegate_task_runner_);
base::WaitableEvent channel_created_event(
base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED);
base::Closure create_socket_stream_closure(
base::Bind(&WebSocketImpl::DoConnect, this,
network_module_->url_request_context_getter(), url,
base::Unretained(&channel_created_event)));
delegate_task_runner_->PostTask(FROM_HERE, create_socket_stream_closure);
// Wait for the channel to be completed. This event is signaled after
// websocket_channel_ has been assigned.
channel_created_event.Wait();
}
void WebSocketImpl::DoConnect(
scoped_refptr<cobalt::network::URLRequestContextGetter> context,
const GURL &url, base::WaitableEvent *channel_created_event) {
DCHECK(delegate_task_runner_->BelongsToCurrentThread());
DCHECK(url.is_valid());
DCHECK(channel_created_event);
DCHECK(context->GetURLRequestContext());
websocket_channel_ = std::make_unique<net::WebSocketChannel>(
std::unique_ptr<net::WebSocketEventInterface>(
new CobaltWebSocketEventHandler(this)),
context->GetURLRequestContext());
websocket_channel_->SendAddChannelRequest(
url, desired_sub_protocols_, url::Origin::Create(GURL(origin_)),
connect_url_ /*site_for_cookies*/,
net::HttpRequestHeaders() /*additional_headers*/);
channel_created_event
->Signal(); // Signal that this->websocket_channel_ has been assigned.
// On Cobalt we do not support flow control.
auto flow_control_result = websocket_channel_->SendFlowControl(INT_MAX);
DCHECK_EQ(net::WebSocketChannel::CHANNEL_ALIVE, flow_control_result);
}
void WebSocketImpl::Close(const net::WebSocketError code,
const std::string &reason) {
CloseInfo close_info(code, reason);
delegate_task_runner_->PostTask(
FROM_HERE, base::Bind(&WebSocketImpl::DoClose, this, close_info));
}
void WebSocketImpl::DoClose(const CloseInfo &close_info) {
if (!websocket_channel_) {
return;
}
DCHECK(delegate_task_runner_->BelongsToCurrentThread());
auto channel_state = websocket_channel_->StartClosingHandshake(
close_info.code, close_info.reason);
if (channel_state == net::WebSocketChannel::CHANNEL_DELETED ||
close_info.code == net::kWebSocketErrorGoingAway) {
websocket_channel_.reset();
}
}
void WebSocketImpl::ResetChannel() {
DCHECK(delegate_task_runner_->BelongsToCurrentThread());
websocket_channel_.reset();
}
WebSocketImpl::~WebSocketImpl() {
// The channel must have been destroyed already, in order to ensure that it
// is destroyed on the correct thread.
DCHECK(!websocket_channel_);
}
// The main reason to call TrampolineClose is to ensure messages that are posted
// from this thread prior to this function call are processed before the
// connection is closed.
void WebSocketImpl::TrampolineClose(const CloseInfo &close_info) {
base::Closure no_op_closure =
base::Closure(base::Bind([]() {} /*Do nothing*/));
base::Closure do_close_closure(
base::Bind(&WebSocketImpl::DoClose, this, close_info));
owner_task_runner_->PostTaskAndReply(FROM_HERE, no_op_closure,
do_close_closure);
}
void WebSocketImpl::OnHandshakeComplete(
const std::string &selected_subprotocol) {
owner_task_runner_->PostTask(
FROM_HERE, base::Bind(&WebSocketImpl::OnWebSocketConnected, this,
selected_subprotocol));
}
void WebSocketImpl::OnFlowControl(int64_t quota) {
DCHECK_GE(current_quota_, 0);
current_quota_ += quota;
ProcessSendQueue();
}
void WebSocketImpl::OnWebSocketConnected(
const std::string &selected_subprotocol) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (delegate_) {
delegate_->OnConnected(selected_subprotocol);
}
}
void WebSocketImpl::OnWebSocketDisconnected(bool was_clean, uint16 code,
const std::string &reason) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (delegate_) {
delegate_->OnDisconnected(was_clean, code, reason);
}
}
void WebSocketImpl::OnWebSocketReceivedData(
bool is_text_frame, scoped_refptr<net::IOBufferWithSize> data) {
if (!owner_task_runner_->BelongsToCurrentThread()) {
owner_task_runner_->PostTask(
FROM_HERE, base::Bind(&WebSocketImpl::OnWebSocketReceivedData, this,
is_text_frame, data));
return;
}
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (delegate_) {
delegate_->OnReceivedData(is_text_frame, data);
}
}
void WebSocketImpl::OnClose(bool was_clean, int error_code,
const std::string &close_reason) {
DCHECK(delegate_task_runner_->BelongsToCurrentThread());
std::uint16_t close_code = static_cast<std::uint16_t>(error_code);
DLOG(INFO) << "WebSocket is closing."
<< " code[" << close_code << "] reason[" << close_reason << "]"
<< " was_clean: " << was_clean;
// Queue the deletion of |websocket_channel_|. We would do it here, but this
// function may be called as a callback *by* |websocket_channel_|;
delegate_task_runner_->PostTask(
FROM_HERE, base::Bind(&WebSocketImpl::ResetChannel, this));
owner_task_runner_->PostTask(
FROM_HERE, base::Bind(&WebSocketImpl::OnWebSocketDisconnected, this,
was_clean, close_code, close_reason));
}
void WebSocketImpl::OnWriteDone(uint64_t bytes_written) {
owner_task_runner_->PostTask(
FROM_HERE,
base::Bind(&WebSocketImpl::OnWebSocketWriteDone, this, bytes_written));
}
void WebSocketImpl::OnWebSocketWriteDone(uint64_t bytes_written) {
if (delegate_) {
delegate_->OnWriteDone(bytes_written);
}
}
// Currently only called in SocketStream::Finish(), so it is meant
// as an informative message.
// SocketStream code will soon call OnClose after this.
// Note: SocketStream will not call OnClose in some cases with SPDY, but that
// is legacy code, as SPDY is not used in Cobalt.
bool WebSocketImpl::SendHelper(const net::WebSocketFrameHeader::OpCode op_code,
const char *data, std::size_t length,
std::string *error_message) {
scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(length));
SbMemoryCopy(io_buffer->data(), data, length);
if (delegate_task_runner_->BelongsToCurrentThread()) {
SendOnDelegateThread(op_code, std::move(io_buffer), length);
} else {
base::Closure do_send_closure(
base::Bind(&WebSocketImpl::SendOnDelegateThread, this, op_code,
std::move(io_buffer), length));
delegate_task_runner_->PostTask(FROM_HERE, do_send_closure);
}
// TODO: Change to void function?
return true;
}
void WebSocketImpl::SendOnDelegateThread(
const net::WebSocketFrameHeader::OpCode op_code,
scoped_refptr<net::IOBuffer> io_buffer, std::size_t length) {
DCHECK(delegate_task_runner_->BelongsToCurrentThread());
if (!websocket_channel_) {
DLOG(WARNING) << "Attempt to send over a closed channel.";
return;
}
SendQueueMessage new_message = {io_buffer, length, op_code};
send_queue_.push(std::move(new_message));
ProcessSendQueue();
}
void WebSocketImpl::ProcessSendQueue() {
DCHECK(delegate_task_runner_->BelongsToCurrentThread());
while (current_quota_ > 0 && !send_queue_.empty()) {
SendQueueMessage message = send_queue_.front();
size_t current_message_length = message.length - sent_size_of_top_message_;
bool final = false;
if (current_quota_ < static_cast<int64_t>(current_message_length)) {
// quota is not enough to send the top message.
scoped_refptr<net::IOBuffer> new_io_buffer(
new net::IOBuffer(static_cast<size_t>(current_quota_)));
SbMemoryCopy(new_io_buffer->data(),
message.io_buffer->data() + sent_size_of_top_message_,
current_quota_);
sent_size_of_top_message_ += current_quota_;
message.io_buffer = new_io_buffer;
current_message_length = current_quota_;
current_quota_ = 0;
} else {
// Sent all of the remaining top message.
final = true;
send_queue_.pop();
sent_size_of_top_message_ = 0;
current_quota_ -= current_message_length;
}
auto channel_state = websocket_channel_->SendFrame(
final, message.op_code, message.io_buffer, current_message_length);
if (channel_state == net::WebSocketChannel::CHANNEL_DELETED) {
websocket_channel_.reset();
}
}
}
bool WebSocketImpl::SendText(const char *data, std::size_t length,
int *buffered_amount, std::string *error_message) {
DCHECK(error_message);
DCHECK(error_message->empty());
error_message->clear();
DCHECK(buffered_amount);
*buffered_amount += static_cast<int>(length);
return SendHelper(net::WebSocketFrameHeader::kOpCodeText, data, length,
error_message);
}
bool WebSocketImpl::SendBinary(const char *data, std::size_t length,
int *buffered_amount,
std::string *error_message) {
DCHECK(error_message);
DCHECK(error_message->empty());
error_message->clear();
DCHECK(buffered_amount);
*buffered_amount += static_cast<int>(length);
return SendHelper(net::WebSocketFrameHeader::kOpCodeBinary, data, length,
error_message);
}
} // namespace websocket
} // namespace cobalt