blob: a9b7049eb23d3a620fa158001d8bc4e9e8b8f93e [file] [log] [blame]
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/spdy/spdy_session.h"
#include <map>
#include "base/basictypes.h"
#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/message_loop.h"
#include "base/metrics/field_trial.h"
#include "base/metrics/histogram.h"
#include "base/metrics/stats_counters.h"
#include "base/stl_util.h"
#include "base/string_number_conversions.h"
#include "base/string_util.h"
#include "base/stringprintf.h"
#include "base/time.h"
#include "base/utf_string_conversions.h"
#include "base/values.h"
#include "crypto/ec_private_key.h"
#include "crypto/ec_signature_creator.h"
#include "net/base/asn1_util.h"
#include "net/base/connection_type_histograms.h"
#include "net/base/net_log.h"
#include "net/base/net_util.h"
#include "net/base/server_bound_cert_service.h"
#include "net/http/http_network_session.h"
#include "net/http/http_server_properties.h"
#include "net/spdy/spdy_credential_builder.h"
#include "net/spdy/spdy_frame_builder.h"
#include "net/spdy/spdy_http_utils.h"
#include "net/spdy/spdy_protocol.h"
#include "net/spdy/spdy_session_pool.h"
#include "net/spdy/spdy_stream.h"
namespace net {
namespace {
const int kReadBufferSize = 8 * 1024;
const int kDefaultConnectionAtRiskOfLossSeconds = 10;
const int kHungIntervalSeconds = 10;
// Minimum seconds that unclaimed pushed streams will be kept in memory.
const int kMinPushedStreamLifetimeSeconds = 300;
Value* NetLogSpdySynCallback(const SpdyHeaderBlock* headers,
bool fin,
bool unidirectional,
SpdyStreamId stream_id,
SpdyStreamId associated_stream,
NetLog::LogLevel /* log_level */) {
DictionaryValue* dict = new DictionaryValue();
ListValue* headers_list = new ListValue();
for (SpdyHeaderBlock::const_iterator it = headers->begin();
it != headers->end(); ++it) {
headers_list->Append(new StringValue(base::StringPrintf(
"%s: %s", it->first.c_str(), it->second.c_str())));
}
dict->SetBoolean("fin", fin);
dict->SetBoolean("unidirectional", unidirectional);
dict->Set("headers", headers_list);
dict->SetInteger("stream_id", stream_id);
if (associated_stream)
dict->SetInteger("associated_stream", associated_stream);
return dict;
}
Value* NetLogSpdyCredentialCallback(size_t slot,
const std::string* origin,
NetLog::LogLevel /* log_level */) {
DictionaryValue* dict = new DictionaryValue();
dict->SetInteger("slot", slot);
dict->SetString("origin", *origin);
return dict;
}
Value* NetLogSpdySessionCloseCallback(int net_error,
const std::string* description,
NetLog::LogLevel /* log_level */) {
DictionaryValue* dict = new DictionaryValue();
dict->SetInteger("net_error", net_error);
dict->SetString("description", *description);
return dict;
}
Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
NetLog::LogLevel /* log_level */) {
DictionaryValue* dict = new DictionaryValue();
dict->SetString("host", host_pair->first.ToString());
dict->SetString("proxy", host_pair->second.ToPacString());
return dict;
}
Value* NetLogSpdySettingCallback(SpdySettingsIds id,
SpdySettingsFlags flags,
uint32 value,
NetLog::LogLevel /* log_level */) {
DictionaryValue* dict = new DictionaryValue();
dict->SetInteger("id", id);
dict->SetInteger("flags", flags);
dict->SetInteger("value", value);
return dict;
}
Value* NetLogSpdySettingsCallback(const SettingsMap* settings,
NetLog::LogLevel /* log_level */) {
DictionaryValue* dict = new DictionaryValue();
ListValue* settings_list = new ListValue();
for (SettingsMap::const_iterator it = settings->begin();
it != settings->end(); ++it) {
const SpdySettingsIds id = it->first;
const SpdySettingsFlags flags = it->second.first;
const uint32 value = it->second.second;
settings_list->Append(new StringValue(
base::StringPrintf("[id:%u flags:%u value:%u]", id, flags, value)));
}
dict->Set("settings", settings_list);
return dict;
}
Value* NetLogSpdyWindowUpdateCallback(SpdyStreamId stream_id,
int32 delta,
NetLog::LogLevel /* log_level */) {
DictionaryValue* dict = new DictionaryValue();
dict->SetInteger("stream_id", static_cast<int>(stream_id));
dict->SetInteger("delta", delta);
return dict;
}
Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
int size,
SpdyDataFlags flags,
NetLog::LogLevel /* log_level */) {
DictionaryValue* dict = new DictionaryValue();
dict->SetInteger("stream_id", static_cast<int>(stream_id));
dict->SetInteger("size", size);
dict->SetInteger("flags", static_cast<int>(flags));
return dict;
}
Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
int status,
const std::string* description,
NetLog::LogLevel /* log_level */) {
DictionaryValue* dict = new DictionaryValue();
dict->SetInteger("stream_id", static_cast<int>(stream_id));
dict->SetInteger("status", status);
dict->SetString("description", *description);
return dict;
}
Value* NetLogSpdyPingCallback(uint32 unique_id,
const char* type,
NetLog::LogLevel /* log_level */) {
DictionaryValue* dict = new DictionaryValue();
dict->SetInteger("unique_id", unique_id);
dict->SetString("type", type);
return dict;
}
Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
int active_streams,
int unclaimed_streams,
SpdyGoAwayStatus status,
NetLog::LogLevel /* log_level */) {
DictionaryValue* dict = new DictionaryValue();
dict->SetInteger("last_accepted_stream_id",
static_cast<int>(last_stream_id));
dict->SetInteger("active_streams", active_streams);
dict->SetInteger("unclaimed_streams", unclaimed_streams);
dict->SetInteger("status", static_cast<int>(status));
return dict;
}
// Maximum number of concurrent streams we will create, unless the server
// sends a SETTINGS frame with a different value.
const size_t kInitialMaxConcurrentStreams = 100;
// The maximum number of concurrent streams we will ever create. Even if
// the server permits more, we will never exceed this limit.
const size_t kMaxConcurrentStreamLimit = 256;
const size_t kDefaultInitialRecvWindowSize = 10 * 1024 * 1024; // 10MB
} // namespace
// static
void SpdySession::SpdyIOBufferProducer::ActivateStream(
SpdySession* spdy_session,
SpdyStream* spdy_stream) {
spdy_session->ActivateStream(spdy_stream);
}
// static
SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
SpdyFrame* frame,
RequestPriority priority,
SpdyStream* stream) {
size_t size = frame->length() + SpdyFrame::kHeaderSize;
DCHECK_GT(size, 0u);
// TODO(mbelshe): We have too much copying of data here.
IOBufferWithSize* buffer = new IOBufferWithSize(size);
memcpy(buffer->data(), frame->data(), size);
return new SpdyIOBuffer(buffer, size, priority, stream);
}
SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
SpdySessionPool* spdy_session_pool,
HttpServerProperties* http_server_properties,
bool verify_domain_authentication,
bool enable_sending_initial_settings,
bool enable_credential_frames,
bool enable_compression,
bool enable_ping_based_connection_checking,
NextProto default_protocol,
size_t initial_recv_window_size,
size_t initial_max_concurrent_streams,
size_t max_concurrent_streams_limit,
TimeFunc time_func,
const HostPortPair& trusted_spdy_proxy,
NetLog* net_log)
: ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)),
host_port_proxy_pair_(host_port_proxy_pair),
spdy_session_pool_(spdy_session_pool),
http_server_properties_(http_server_properties),
connection_(new ClientSocketHandle),
read_buffer_(new IOBuffer(kReadBufferSize)),
read_pending_(false),
stream_hi_water_mark_(1), // Always start at 1 for the first stream id.
write_pending_(false),
delayed_write_pending_(false),
is_secure_(false),
certificate_error_code_(OK),
error_(OK),
state_(IDLE),
max_concurrent_streams_(initial_max_concurrent_streams == 0 ?
kInitialMaxConcurrentStreams :
initial_max_concurrent_streams),
max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ?
kMaxConcurrentStreamLimit :
max_concurrent_streams_limit),
streams_initiated_count_(0),
streams_pushed_count_(0),
streams_pushed_and_claimed_count_(0),
streams_abandoned_count_(0),
bytes_received_(0),
sent_settings_(false),
received_settings_(false),
stalled_streams_(0),
pings_in_flight_(0),
next_ping_id_(1),
last_activity_time_(base::TimeTicks::Now()),
check_ping_status_pending_(false),
flow_control_(false),
initial_send_window_size_(kSpdyStreamInitialWindowSize),
initial_recv_window_size_(initial_recv_window_size == 0 ?
kDefaultInitialRecvWindowSize :
initial_recv_window_size),
net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
verify_domain_authentication_(verify_domain_authentication),
enable_sending_initial_settings_(enable_sending_initial_settings),
enable_credential_frames_(enable_credential_frames),
enable_compression_(enable_compression),
enable_ping_based_connection_checking_(
enable_ping_based_connection_checking),
default_protocol_(default_protocol),
credential_state_(SpdyCredentialState::kDefaultNumSlots),
connection_at_risk_of_loss_time_(
base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
hung_interval_(
base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
trusted_spdy_proxy_(trusted_spdy_proxy),
time_func_(time_func) {
DCHECK(HttpStreamFactory::spdy_enabled());
net_log_.BeginEvent(
NetLog::TYPE_SPDY_SESSION,
base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair_));
next_unclaimed_push_stream_sweep_time_ = time_func_() +
base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
// TODO(mbelshe): consider randomization of the stream_hi_water_mark.
}
SpdySession::PendingCreateStream::PendingCreateStream(
const GURL& url, RequestPriority priority,
scoped_refptr<SpdyStream>* spdy_stream,
const BoundNetLog& stream_net_log,
const CompletionCallback& callback)
: url(&url),
priority(priority),
spdy_stream(spdy_stream),
stream_net_log(&stream_net_log),
callback(callback) {
}
SpdySession::PendingCreateStream::~PendingCreateStream() {}
SpdySession::CallbackResultPair::CallbackResultPair(
const CompletionCallback& callback_in, int result_in)
: callback(callback_in),
result(result_in) {
}
SpdySession::CallbackResultPair::~CallbackResultPair() {}
SpdySession::~SpdySession() {
if (state_ != CLOSED) {
state_ = CLOSED;
// Cleanup all the streams.
CloseAllStreams(net::ERR_ABORTED);
}
if (connection_->is_initialized()) {
// With SPDY we can't recycle sockets.
connection_->socket()->Disconnect();
}
// Streams should all be gone now.
DCHECK_EQ(0u, num_active_streams());
DCHECK_EQ(0u, num_unclaimed_pushed_streams());
DCHECK(pending_callback_map_.empty());
RecordHistograms();
net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
}
net::Error SpdySession::InitializeWithSocket(
ClientSocketHandle* connection,
bool is_secure,
int certificate_error_code) {
base::StatsCounter spdy_sessions("spdy.sessions");
spdy_sessions.Increment();
state_ = CONNECTED;
connection_.reset(connection);
is_secure_ = is_secure;
certificate_error_code_ = certificate_error_code;
NextProto protocol = default_protocol_;
NextProto protocol_negotiated = connection->socket()->GetNegotiatedProtocol();
if (protocol_negotiated != kProtoUnknown) {
protocol = protocol_negotiated;
}
SSLClientSocket* ssl_socket = GetSSLClientSocket();
if (ssl_socket && ssl_socket->WasChannelIDSent()) {
// According to the SPDY spec, the credential associated with the TLS
// connection is stored in slot[1].
credential_state_.SetHasCredential(GURL("https://" +
host_port_pair().ToString()));
}
DCHECK(protocol >= kProtoSPDY2);
DCHECK(protocol <= kProtoSPDY3);
int version = (protocol == kProtoSPDY3) ? 3 : 2;
flow_control_ = (protocol >= kProtoSPDY3);
buffered_spdy_framer_.reset(new BufferedSpdyFramer(version,
enable_compression_));
buffered_spdy_framer_->set_visitor(this);
SendInitialSettings();
UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol, kProtoMaximumVersion);
// Write out any data that we might have to send, such as the settings frame.
WriteSocketLater();
net::Error error = ReadSocket();
if (error == ERR_IO_PENDING)
return OK;
return error;
}
bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
if (!verify_domain_authentication_)
return true;
if (state_ != CONNECTED)
return false;
SSLInfo ssl_info;
bool was_npn_negotiated;
NextProto protocol_negotiated = kProtoUnknown;
if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
return true; // This is not a secure session, so all domains are okay.
return !ssl_info.client_cert_sent &&
(enable_credential_frames_ || !ssl_info.channel_id_sent ||
ServerBoundCertService::GetDomainForHost(domain) ==
ServerBoundCertService::GetDomainForHost(
host_port_proxy_pair_.first.host())) &&
ssl_info.cert->VerifyNameMatch(domain);
}
void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream,
SpdyIOBufferProducer* producer) {
write_queue_.push(producer);
stream_producers_[producer] = stream;
WriteSocketLater();
}
int SpdySession::GetPushStream(
const GURL& url,
scoped_refptr<SpdyStream>* stream,
const BoundNetLog& stream_net_log) {
CHECK_NE(state_, CLOSED);
*stream = NULL;
// Don't allow access to secure push streams over an unauthenticated, but
// encrypted SSL socket.
if (is_secure_ && certificate_error_code_ != OK &&
(url.SchemeIs("https") || url.SchemeIs("wss"))) {
RecordProtocolErrorHistogram(
PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
CloseSessionOnError(
static_cast<net::Error>(certificate_error_code_),
true,
"Tried to get SPDY stream for secure content over an unauthenticated "
"session.");
return ERR_SPDY_PROTOCOL_ERROR;
}
*stream = GetActivePushStream(url.spec());
if (stream->get()) {
DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_);
streams_pushed_and_claimed_count_++;
return OK;
}
return 0;
}
int SpdySession::CreateStream(
const GURL& url,
RequestPriority priority,
scoped_refptr<SpdyStream>* spdy_stream,
const BoundNetLog& stream_net_log,
const CompletionCallback& callback) {
if (!max_concurrent_streams_ ||
(active_streams_.size() + created_streams_.size() <
max_concurrent_streams_)) {
return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
}
stalled_streams_++;
net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
pending_create_stream_queues_[priority].push(
PendingCreateStream(url, priority, spdy_stream,
stream_net_log, callback));
return ERR_IO_PENDING;
}
void SpdySession::ProcessPendingCreateStreams() {
while (!max_concurrent_streams_ ||
(active_streams_.size() + created_streams_.size() <
max_concurrent_streams_)) {
bool no_pending_create_streams = true;
for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) {
if (!pending_create_stream_queues_[i].empty()) {
PendingCreateStream pending_create =
pending_create_stream_queues_[i].front();
pending_create_stream_queues_[i].pop();
no_pending_create_streams = false;
int error = CreateStreamImpl(*pending_create.url,
pending_create.priority,
pending_create.spdy_stream,
*pending_create.stream_net_log);
scoped_refptr<SpdyStream>* stream = pending_create.spdy_stream;
DCHECK(!ContainsKey(pending_callback_map_, stream));
pending_callback_map_.insert(std::make_pair(stream,
CallbackResultPair(pending_create.callback, error)));
MessageLoop::current()->PostTask(
FROM_HERE,
base::Bind(&SpdySession::InvokeUserStreamCreationCallback,
weak_factory_.GetWeakPtr(), stream));
break;
}
}
if (no_pending_create_streams)
return; // there were no streams in any queue
}
}
void SpdySession::CancelPendingCreateStreams(
const scoped_refptr<SpdyStream>* spdy_stream) {
PendingCallbackMap::iterator it = pending_callback_map_.find(spdy_stream);
if (it != pending_callback_map_.end()) {
pending_callback_map_.erase(it);
return;
}
for (int i = 0; i < NUM_PRIORITIES; ++i) {
PendingCreateStreamQueue tmp;
// Make a copy removing this trans
while (!pending_create_stream_queues_[i].empty()) {
PendingCreateStream pending_create =
pending_create_stream_queues_[i].front();
pending_create_stream_queues_[i].pop();
if (pending_create.spdy_stream != spdy_stream)
tmp.push(pending_create);
}
// Now copy it back
while (!tmp.empty()) {
pending_create_stream_queues_[i].push(tmp.front());
tmp.pop();
}
}
}
int SpdySession::CreateStreamImpl(
const GURL& url,
RequestPriority priority,
scoped_refptr<SpdyStream>* spdy_stream,
const BoundNetLog& stream_net_log) {
DCHECK_GE(priority, MINIMUM_PRIORITY);
DCHECK_LT(priority, NUM_PRIORITIES);
// Make sure that we don't try to send https/wss over an unauthenticated, but
// encrypted SSL socket.
if (is_secure_ && certificate_error_code_ != OK &&
(url.SchemeIs("https") || url.SchemeIs("wss"))) {
RecordProtocolErrorHistogram(
PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
CloseSessionOnError(
static_cast<net::Error>(certificate_error_code_),
true,
"Tried to create SPDY stream for secure content over an "
"unauthenticated session.");
return ERR_SPDY_PROTOCOL_ERROR;
}
const std::string& path = url.PathForRequest();
*spdy_stream = new SpdyStream(this,
false,
stream_net_log);
const scoped_refptr<SpdyStream>& stream = *spdy_stream;
stream->set_priority(priority);
stream->set_path(path);
stream->set_send_window_size(initial_send_window_size_);
stream->set_recv_window_size(initial_recv_window_size_);
created_streams_.insert(stream);
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
static_cast<int>(priority), 0, 10, 11);
// TODO(mbelshe): Optimize memory allocations
return OK;
}
bool SpdySession::NeedsCredentials() const {
if (!is_secure_)
return false;
SSLClientSocket* ssl_socket = GetSSLClientSocket();
if (ssl_socket->GetNegotiatedProtocol() < kProtoSPDY3)
return false;
return ssl_socket->WasChannelIDSent();
}
void SpdySession::AddPooledAlias(const HostPortProxyPair& alias) {
pooled_aliases_.insert(alias);
}
int SpdySession::GetProtocolVersion() const {
DCHECK(buffered_spdy_framer_.get());
return buffered_spdy_framer_->protocol_version();
}
SpdySynStreamControlFrame* SpdySession::CreateSynStream(
SpdyStreamId stream_id,
RequestPriority priority,
uint8 credential_slot,
SpdyControlFlags flags,
const SpdyHeaderBlock& headers) {
CHECK(IsStreamActive(stream_id));
const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
SendPrefacePingIfNoneInFlight();
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdySynStreamControlFrame> syn_frame(
buffered_spdy_framer_->CreateSynStream(
stream_id, 0,
ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()),
credential_slot, flags, true, &headers));
base::StatsCounter spdy_requests("spdy.requests");
spdy_requests.Increment();
streams_initiated_count_++;
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
base::Bind(&NetLogSpdySynCallback, &headers,
(flags & CONTROL_FLAG_FIN) != 0,
(flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
stream_id, 0));
}
return syn_frame.release();
}
SpdyCredentialControlFrame* SpdySession::CreateCredentialFrame(
const std::string& origin,
SSLClientCertType type,
const std::string& key,
const std::string& cert,
RequestPriority priority) {
DCHECK(is_secure_);
SSLClientSocket* ssl_socket = GetSSLClientSocket();
DCHECK(ssl_socket);
DCHECK(ssl_socket->WasChannelIDSent());
SpdyCredential credential;
std::string tls_unique;
ssl_socket->GetTLSUniqueChannelBinding(&tls_unique);
size_t slot = credential_state_.SetHasCredential(GURL(origin));
int rv = SpdyCredentialBuilder::Build(tls_unique, type, key, cert, slot,
&credential);
DCHECK_EQ(OK, rv);
if (rv != OK)
return NULL;
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyCredentialControlFrame> credential_frame(
buffered_spdy_framer_->CreateCredentialFrame(credential));
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL,
base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin));
}
return credential_frame.release();
}
SpdyHeadersControlFrame* SpdySession::CreateHeadersFrame(
SpdyStreamId stream_id,
const SpdyHeaderBlock& headers,
SpdyControlFlags flags) {
// Find our stream
CHECK(IsStreamActive(stream_id));
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
// Create a HEADER frame.
scoped_ptr<SpdyHeadersControlFrame> frame(
buffered_spdy_framer_->CreateHeaders(stream_id, flags, true, &headers));
if (net_log().IsLoggingAllEvents()) {
bool fin = flags & CONTROL_FLAG_FIN;
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_SEND_HEADERS,
base::Bind(&NetLogSpdySynCallback,
&headers, fin, /*unidirectional=*/false,
stream_id, 0));
}
return frame.release();
}
SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
net::IOBuffer* data, int len,
SpdyDataFlags flags) {
// Find our stream
CHECK(IsStreamActive(stream_id));
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
if (len > kMaxSpdyFrameChunkSize) {
len = kMaxSpdyFrameChunkSize;
flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
}
// Obey send window size of the stream if flow control is enabled.
if (flow_control_) {
if (stream->send_window_size() <= 0) {
// Because we queue frames onto the session, it is possible that
// a stream was not flow controlled at the time it attempted the
// write, but when we go to fulfill the write, it is now flow
// controlled. This is why we need the session to mark the stream
// as stalled - because only the session knows for sure when the
// stall occurs.
stream->set_stalled_by_flow_control(true);
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
NetLog::IntegerCallback("stream_id", stream_id));
return NULL;
}
int new_len = std::min(len, stream->send_window_size());
if (new_len < len) {
len = new_len;
flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
}
stream->DecreaseSendWindowSize(len);
}
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_SEND_DATA,
base::Bind(&NetLogSpdyDataCallback, stream_id, len, flags));
}
// Send PrefacePing for DATA_FRAMEs with nonzero payload size.
if (len > 0)
SendPrefacePingIfNoneInFlight();
// TODO(mbelshe): reduce memory copies here.
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyDataFrame> frame(
buffered_spdy_framer_->CreateDataFrame(
stream_id, data->data(), len, flags));
return frame.release();
}
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
DCHECK_NE(0u, stream_id);
// TODO(mbelshe): We should send a RST_STREAM control frame here
// so that the server can cancel a large send.
DeleteStream(stream_id, status);
}
void SpdySession::CloseCreatedStream(SpdyStream* stream, int status) {
DCHECK_EQ(0u, stream->stream_id());
created_streams_.erase(scoped_refptr<SpdyStream>(stream));
ProcessPendingCreateStreams();
}
void SpdySession::ResetStream(SpdyStreamId stream_id,
SpdyStatusCodes status,
const std::string& description) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyRstStreamControlFrame> rst_frame(
buffered_spdy_framer_->CreateRstStream(stream_id, status));
// Default to lowest priority unless we know otherwise.
RequestPriority priority = net::IDLE;
if(IsStreamActive(stream_id)) {
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
priority = stream->priority();
}
QueueFrame(rst_frame.release(), priority);
RecordProtocolErrorHistogram(
static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
}
bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
return ContainsKey(active_streams_, stream_id);
}
LoadState SpdySession::GetLoadState() const {
// NOTE: The application only queries the LoadState via the
// SpdyNetworkTransaction, and details are only needed when
// we're in the process of connecting.
// If we're connecting, defer to the connection to give us the actual
// LoadState.
if (state_ == CONNECTING)
return connection_->GetLoadState();
// Just report that we're idle since the session could be doing
// many things concurrently.
return LOAD_STATE_IDLE;
}
void SpdySession::OnReadComplete(int bytes_read) {
// Parse a frame. For now this code requires that the frame fit into our
// buffer (32KB).
// TODO(mbelshe): support arbitrarily large frames!
read_pending_ = false;
if (bytes_read <= 0) {
// Session is tearing down.
net::Error error = static_cast<net::Error>(bytes_read);
if (bytes_read == 0)
error = ERR_CONNECTION_CLOSED;
CloseSessionOnError(error, true, "bytes_read is <= 0.");
return;
}
bytes_received_ += bytes_read;
last_activity_time_ = base::TimeTicks::Now();
// The SpdyFramer will use callbacks onto |this| as it parses frames.
// When errors occur, those callbacks can lead to teardown of all references
// to |this|, so maintain a reference to self during this call for safe
// cleanup.
scoped_refptr<SpdySession> self(this);
DCHECK(buffered_spdy_framer_.get());
char *data = read_buffer_->data();
while (bytes_read &&
buffered_spdy_framer_->error_code() ==
SpdyFramer::SPDY_NO_ERROR) {
uint32 bytes_processed =
buffered_spdy_framer_->ProcessInput(data, bytes_read);
bytes_read -= bytes_processed;
data += bytes_processed;
if (buffered_spdy_framer_->state() == SpdyFramer::SPDY_DONE)
buffered_spdy_framer_->Reset();
}
if (state_ != CLOSED)
ReadSocket();
}
void SpdySession::OnWriteComplete(int result) {
DCHECK(write_pending_);
DCHECK(in_flight_write_.size());
last_activity_time_ = base::TimeTicks::Now();
write_pending_ = false;
scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
if (result >= 0) {
// It should not be possible to have written more bytes than our
// in_flight_write_.
DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
in_flight_write_.buffer()->DidConsume(result);
// We only notify the stream when we've fully written the pending frame.
if (!in_flight_write_.buffer()->BytesRemaining()) {
if (stream) {
// Report the number of bytes written to the caller, but exclude the
// frame size overhead. NOTE: if this frame was compressed the
// reported bytes written is the compressed size, not the original
// size.
if (result > 0) {
result = in_flight_write_.buffer()->size();
DCHECK_GE(result, static_cast<int>(SpdyFrame::kHeaderSize));
result -= static_cast<int>(SpdyFrame::kHeaderSize);
}
// It is possible that the stream was cancelled while we were writing
// to the socket.
if (!stream->cancelled())
stream->OnWriteComplete(result);
}
// Cleanup the write which just completed.
in_flight_write_.release();
}
// Write more data. We're already in a continuation, so we can
// go ahead and write it immediately (without going back to the
// message loop).
WriteSocketLater();
} else {
in_flight_write_.release();
// The stream is now errored. Close it down.
CloseSessionOnError(
static_cast<net::Error>(result), true, "The stream has errored.");
}
}
net::Error SpdySession::ReadSocket() {
if (read_pending_)
return OK;
if (state_ == CLOSED) {
NOTREACHED();
return ERR_UNEXPECTED;
}
CHECK(connection_.get());
CHECK(connection_->socket());
int bytes_read = connection_->socket()->Read(
read_buffer_.get(),
kReadBufferSize,
base::Bind(&SpdySession::OnReadComplete, base::Unretained(this)));
switch (bytes_read) {
case 0:
// Socket is closed!
CloseSessionOnError(ERR_CONNECTION_CLOSED, true, "bytes_read is 0.");
return ERR_CONNECTION_CLOSED;
case net::ERR_IO_PENDING:
// Waiting for data. Nothing to do now.
read_pending_ = true;
return ERR_IO_PENDING;
default:
// Data was read, process it.
// Schedule the work through the message loop to avoid recursive
// callbacks.
read_pending_ = true;
MessageLoop::current()->PostTask(
FROM_HERE,
base::Bind(&SpdySession::OnReadComplete,
weak_factory_.GetWeakPtr(), bytes_read));
break;
}
return OK;
}
void SpdySession::WriteSocketLater() {
if (delayed_write_pending_)
return;
if (state_ < CONNECTED)
return;
delayed_write_pending_ = true;
MessageLoop::current()->PostTask(
FROM_HERE,
base::Bind(&SpdySession::WriteSocket, weak_factory_.GetWeakPtr()));
}
void SpdySession::WriteSocket() {
// This function should only be called via WriteSocketLater.
DCHECK(delayed_write_pending_);
delayed_write_pending_ = false;
// If the socket isn't connected yet, just wait; we'll get called
// again when the socket connection completes. If the socket is
// closed, just return.
if (state_ < CONNECTED || state_ == CLOSED)
return;
if (write_pending_) // Another write is in progress still.
return;
// Loop sending frames until we've sent everything or until the write
// returns error (or ERR_IO_PENDING).
DCHECK(buffered_spdy_framer_.get());
while (in_flight_write_.buffer() || !write_queue_.empty()) {
if (!in_flight_write_.buffer()) {
// Grab the next SpdyBuffer to send.
scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
write_queue_.pop();
scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this));
stream_producers_.erase(producer.get());
// It is possible that a stream had data to write, but a
// WINDOW_UPDATE frame has been received which made that
// stream no longer writable.
// TODO(rch): consider handling that case by removing the
// stream from the writable queue?
if (buffer == NULL)
continue;
in_flight_write_ = *buffer;
} else {
DCHECK(in_flight_write_.buffer()->BytesRemaining());
}
write_pending_ = true;
int rv = connection_->socket()->Write(
in_flight_write_.buffer(),
in_flight_write_.buffer()->BytesRemaining(),
base::Bind(&SpdySession::OnWriteComplete, base::Unretained(this)));
if (rv == net::ERR_IO_PENDING)
break;
// We sent the frame successfully.
OnWriteComplete(rv);
// TODO(mbelshe): Test this error case. Maybe we should mark the socket
// as in an error state.
if (rv < 0)
break;
}
}
void SpdySession::CloseAllStreams(net::Error status) {
base::StatsCounter abandoned_streams("spdy.abandoned_streams");
base::StatsCounter abandoned_push_streams(
"spdy.abandoned_push_streams");
if (!active_streams_.empty())
abandoned_streams.Add(active_streams_.size());
if (!unclaimed_pushed_streams_.empty()) {
streams_abandoned_count_ += unclaimed_pushed_streams_.size();
abandoned_push_streams.Add(unclaimed_pushed_streams_.size());
unclaimed_pushed_streams_.clear();
}
for (int i = 0; i < NUM_PRIORITIES; ++i) {
while (!pending_create_stream_queues_[i].empty()) {
PendingCreateStream pending_create =
pending_create_stream_queues_[i].front();
pending_create_stream_queues_[i].pop();
pending_create.callback.Run(ERR_ABORTED);
}
}
while (!active_streams_.empty()) {
ActiveStreamMap::iterator it = active_streams_.begin();
const scoped_refptr<SpdyStream>& stream = it->second;
LogAbandonedStream(stream, status);
DeleteStream(stream->stream_id(), status);
}
while (!created_streams_.empty()) {
CreatedStreamSet::iterator it = created_streams_.begin();
const scoped_refptr<SpdyStream> stream = *it;
created_streams_.erase(it);
LogAbandonedStream(stream, status);
stream->OnClose(status);
}
// We also need to drain the queue.
while (!write_queue_.empty()) {
scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
write_queue_.pop();
stream_producers_.erase(producer.get());
}
}
void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream,
net::Error status) {
DCHECK(stream);
std::string description = base::StringPrintf(
"ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
stream->LogStreamError(status, description);
}
int SpdySession::GetNewStreamId() {
int id = stream_hi_water_mark_;
stream_hi_water_mark_ += 2;
if (stream_hi_water_mark_ > 0x7fff)
stream_hi_water_mark_ = 1;
return id;
}
void SpdySession::CloseSessionOnError(net::Error err,
bool remove_from_pool,
const std::string& description) {
// Closing all streams can have a side-effect of dropping the last reference
// to |this|. Hold a reference through this function.
scoped_refptr<SpdySession> self(this);
DCHECK_LT(err, OK);
net_log_.AddEvent(
NetLog::TYPE_SPDY_SESSION_CLOSE,
base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
// Don't close twice. This can occur because we can have both
// a read and a write outstanding, and each can complete with
// an error.
if (state_ != CLOSED) {
state_ = CLOSED;
error_ = err;
if (remove_from_pool)
RemoveFromPool();
CloseAllStreams(err);
}
}
Value* SpdySession::GetInfoAsValue() const {
DictionaryValue* dict = new DictionaryValue();
dict->SetInteger("source_id", net_log_.source().id);
dict->SetString("host_port_pair", host_port_proxy_pair_.first.ToString());
if (!pooled_aliases_.empty()) {
ListValue* alias_list = new ListValue();
for (std::set<HostPortProxyPair>::const_iterator it =
pooled_aliases_.begin();
it != pooled_aliases_.end(); it++) {
alias_list->Append(Value::CreateStringValue(it->first.ToString()));
}
dict->Set("aliases", alias_list);
}
dict->SetString("proxy", host_port_proxy_pair_.second.ToURI());
dict->SetInteger("active_streams", active_streams_.size());
dict->SetInteger("unclaimed_pushed_streams",
unclaimed_pushed_streams_.size());
dict->SetBoolean("is_secure", is_secure_);
dict->SetString("protocol_negotiated",
SSLClientSocket::NextProtoToString(
connection_->socket()->GetNegotiatedProtocol()));
dict->SetInteger("error", error_);
dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
dict->SetInteger("streams_initiated_count", streams_initiated_count_);
dict->SetInteger("streams_pushed_count", streams_pushed_count_);
dict->SetInteger("streams_pushed_and_claimed_count",
streams_pushed_and_claimed_count_);
dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
DCHECK(buffered_spdy_framer_.get());
dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
dict->SetBoolean("sent_settings", sent_settings_);
dict->SetBoolean("received_settings", received_settings_);
return dict;
}
bool SpdySession::IsReused() const {
return buffered_spdy_framer_->frames_received() > 0;
}
int SpdySession::GetPeerAddress(IPEndPoint* address) const {
if (!connection_->socket())
return ERR_SOCKET_NOT_CONNECTED;
return connection_->socket()->GetPeerAddress(address);
}
int SpdySession::GetLocalAddress(IPEndPoint* address) const {
if (!connection_->socket())
return ERR_SOCKET_NOT_CONNECTED;
return connection_->socket()->GetLocalAddress(address);
}
class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer {
public:
SimpleSpdyIOBufferProducer(SpdyFrame* frame,
RequestPriority priority)
: frame_(frame),
priority_(priority) {
}
virtual RequestPriority GetPriority() const override {
return priority_;
}
virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) override {
return SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
frame_.get(), priority_, NULL);
}
private:
scoped_ptr<SpdyFrame> frame_;
RequestPriority priority_;
};
void SpdySession::QueueFrame(SpdyFrame* frame,
RequestPriority priority) {
SimpleSpdyIOBufferProducer* producer =
new SimpleSpdyIOBufferProducer(frame, priority);
write_queue_.push(producer);
WriteSocketLater();
}
void SpdySession::ActivateStream(SpdyStream* stream) {
if (stream->stream_id() == 0) {
stream->set_stream_id(GetNewStreamId());
created_streams_.erase(scoped_refptr<SpdyStream>(stream));
}
const SpdyStreamId id = stream->stream_id();
DCHECK(!IsStreamActive(id));
active_streams_[id] = stream;
}
void SpdySession::DeleteStream(SpdyStreamId id, int status) {
// For push streams, if they are being deleted normally, we leave
// the stream in the unclaimed_pushed_streams_ list. However, if
// the stream is errored out, clean it up entirely.
if (status != OK) {
PushedStreamMap::iterator it;
for (it = unclaimed_pushed_streams_.begin();
it != unclaimed_pushed_streams_.end(); ++it) {
scoped_refptr<SpdyStream> curr = it->second.first;
if (id == curr->stream_id()) {
unclaimed_pushed_streams_.erase(it);
break;
}
}
}
// The stream might have been deleted.
ActiveStreamMap::iterator it2 = active_streams_.find(id);
if (it2 == active_streams_.end())
return;
// Possibly remove from the write queue.
WriteQueue old = write_queue_;
write_queue_ = WriteQueue();
while (!old.empty()) {
scoped_ptr<SpdyIOBufferProducer> producer(old.top());
old.pop();
StreamProducerMap::iterator it = stream_producers_.find(producer.get());
if (it == stream_producers_.end() || it->second->stream_id() != id) {
write_queue_.push(producer.release());
} else {
stream_producers_.erase(producer.get());
producer.reset(NULL);
}
}
// If this is an active stream, call the callback.
const scoped_refptr<SpdyStream> stream(it2->second);
active_streams_.erase(it2);
if (stream)
stream->OnClose(status);
ProcessPendingCreateStreams();
}
void SpdySession::RemoveFromPool() {
if (spdy_session_pool_) {
SpdySessionPool* pool = spdy_session_pool_;
spdy_session_pool_ = NULL;
pool->Remove(make_scoped_refptr(this));
}
}
scoped_refptr<SpdyStream> SpdySession::GetActivePushStream(
const std::string& path) {
base::StatsCounter used_push_streams("spdy.claimed_push_streams");
PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(path);
if (it != unclaimed_pushed_streams_.end()) {
net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
scoped_refptr<SpdyStream> stream = it->second.first;
unclaimed_pushed_streams_.erase(it);
used_push_streams.Increment();
return stream;
}
return NULL;
}
bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
bool* was_npn_negotiated,
NextProto* protocol_negotiated) {
*was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
*protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
return connection_->socket()->GetSSLInfo(ssl_info);
}
bool SpdySession::GetSSLCertRequestInfo(
SSLCertRequestInfo* cert_request_info) {
if (!is_secure_)
return false;
GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
return true;
}
ServerBoundCertService* SpdySession::GetServerBoundCertService() const {
if (!is_secure_)
return NULL;
return GetSSLClientSocket()->GetServerBoundCertService();
}
void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
RecordProtocolErrorHistogram(
static_cast<SpdyProtocolErrorDetails>(error_code));
std::string description = base::StringPrintf(
"SPDY_ERROR error_code: %d.", error_code);
CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true, description);
}
void SpdySession::OnStreamError(SpdyStreamId stream_id,
const std::string& description) {
if (IsStreamActive(stream_id))
ResetStream(stream_id, PROTOCOL_ERROR, description);
}
void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
const char* data,
size_t len,
SpdyDataFlags flags) {
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_RECV_DATA,
base::Bind(&NetLogSpdyDataCallback, stream_id, len, flags));
}
if (!IsStreamActive(stream_id)) {
// NOTE: it may just be that the stream was cancelled.
return;
}
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
stream->OnDataReceived(data, len);
}
void SpdySession::OnSetting(SpdySettingsIds id,
uint8 flags,
uint32 value) {
HandleSetting(id, value);
http_server_properties_->SetSpdySetting(
host_port_pair(),
id,
static_cast<SpdySettingsFlags>(flags),
value);
received_settings_ = true;
// Log the setting.
net_log_.AddEvent(
NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
base::Bind(&NetLogSpdySettingCallback,
id, static_cast<SpdySettingsFlags>(flags), value));
}
void SpdySession::OnControlFrameCompressed(
const SpdyControlFrame& uncompressed_frame,
const SpdyControlFrame& compressed_frame) {
if (uncompressed_frame.type() == SYN_STREAM) {
int uncompressed_size = uncompressed_frame.length();
int compressed_size = compressed_frame.length();
// Make sure we avoid early decimal truncation.
int compression_pct = 100 - (100 * compressed_size) / uncompressed_size;
UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
compression_pct);
}
}
bool SpdySession::Respond(const SpdyHeaderBlock& headers,
const scoped_refptr<SpdyStream> stream) {
int rv = OK;
rv = stream->OnResponseReceived(headers);
if (rv < 0) {
DCHECK_NE(rv, ERR_IO_PENDING);
const SpdyStreamId stream_id = stream->stream_id();
DeleteStream(stream_id, rv);
return false;
}
return true;
}
void SpdySession::OnSynStream(SpdyStreamId stream_id,
SpdyStreamId associated_stream_id,
SpdyPriority priority,
uint8 credential_slot,
bool fin,
bool unidirectional,
const SpdyHeaderBlock& headers) {
if (net_log_.IsLoggingAllEvents()) {
net_log_.AddEvent(
NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
base::Bind(&NetLogSpdySynCallback,
&headers, fin, unidirectional,
stream_id, associated_stream_id));
}
// Server-initiated streams should have even sequence numbers.
if ((stream_id & 0x1) != 0) {
LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
return;
}
if (IsStreamActive(stream_id)) {
LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
return;
}
if (associated_stream_id == 0) {
std::string description = base::StringPrintf(
"Received invalid OnSyn associated stream id %d for stream %d",
associated_stream_id, stream_id);
ResetStream(stream_id, REFUSED_STREAM, description);
return;
}
streams_pushed_count_++;
// TODO(mbelshe): DCHECK that this is a GET method?
// Verify that the response had a URL for us.
GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
if (!gurl.is_valid()) {
ResetStream(stream_id, PROTOCOL_ERROR,
"Pushed stream url was invalid: " + gurl.spec());
return;
}
const std::string& url = gurl.spec();
// Verify we have a valid stream association.
if (!IsStreamActive(associated_stream_id)) {
ResetStream(stream_id, INVALID_STREAM,
base::StringPrintf(
"Received OnSyn with inactive associated stream %d",
associated_stream_id));
return;
}
// Check that the SYN advertises the same origin as its associated stream.
// Bypass this check if and only if this session is with a SPDY proxy that
// is trusted explicitly via the --trusted-spdy-proxy switch.
if (trusted_spdy_proxy_.Equals(host_port_pair())) {
// Disallow pushing of HTTPS content.
if (gurl.SchemeIs("https")) {
ResetStream(stream_id, REFUSED_STREAM,
base::StringPrintf(
"Rejected push of Cross Origin HTTPS content %d",
associated_stream_id));
}
} else {
scoped_refptr<SpdyStream> associated_stream =
active_streams_[associated_stream_id];
GURL associated_url(associated_stream->GetUrl());
if (associated_url.GetOrigin() != gurl.GetOrigin()) {
ResetStream(stream_id, REFUSED_STREAM,
base::StringPrintf(
"Rejected Cross Origin Push Stream %d",
associated_stream_id));
return;
}
}
// There should not be an existing pushed stream with the same path.
PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(url);
if (it != unclaimed_pushed_streams_.end()) {
ResetStream(stream_id, PROTOCOL_ERROR,
"Received duplicate pushed stream with url: " + url);
return;
}
scoped_refptr<SpdyStream> stream(new SpdyStream(this, true, net_log_));
stream->set_stream_id(stream_id);
stream->set_path(gurl.PathForRequest());
stream->set_send_window_size(initial_send_window_size_);
stream->set_recv_window_size(initial_recv_window_size_);
DeleteExpiredPushedStreams();
unclaimed_pushed_streams_[url] =
std::pair<scoped_refptr<SpdyStream>, base::TimeTicks> (
stream, time_func_());
ActivateStream(stream);
stream->set_response_received();
// Parse the headers.
if (!Respond(headers, stream))
return;
base::StatsCounter push_requests("spdy.pushed_streams");
push_requests.Increment();
}
void SpdySession::DeleteExpiredPushedStreams() {
if (unclaimed_pushed_streams_.empty())
return;
// Check that adequate time has elapsed since the last sweep.
if (time_func_() < next_unclaimed_push_stream_sweep_time_)
return;
// Delete old streams.
base::TimeTicks minimum_freshness = time_func_() -
base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
PushedStreamMap::iterator it;
for (it = unclaimed_pushed_streams_.begin();
it != unclaimed_pushed_streams_.end(); ) {
const scoped_refptr<SpdyStream>& stream = it->second.first;
base::TimeTicks creation_time = it->second.second;
// DeleteStream() will invalidate the current iterator, so move to next.
++it;
if (minimum_freshness > creation_time) {
DeleteStream(stream->stream_id(), ERR_INVALID_SPDY_STREAM);
base::StatsCounter abandoned_push_streams(
"spdy.abandoned_push_streams");
base::StatsCounter abandoned_streams("spdy.abandoned_streams");
abandoned_push_streams.Increment();
abandoned_streams.Increment();
streams_abandoned_count_++;
}
}
next_unclaimed_push_stream_sweep_time_ = time_func_() +
base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
}
void SpdySession::OnSynReply(SpdyStreamId stream_id,
bool fin,
const SpdyHeaderBlock& headers) {
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
base::Bind(&NetLogSpdySynCallback,
&headers, fin, false,// not unidirectional
stream_id, 0));
}
if (!IsStreamActive(stream_id)) {
// NOTE: it may just be that the stream was cancelled.
LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id;
return;
}
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
CHECK(!stream->cancelled());
if (stream->response_received()) {
stream->LogStreamError(ERR_SYN_REPLY_NOT_RECEIVED,
"Received duplicate SYN_REPLY for stream.");
RecordProtocolErrorHistogram(PROTOCOL_ERROR_SYN_REPLY_NOT_RECEIVED);
CloseStream(stream->stream_id(), ERR_SPDY_PROTOCOL_ERROR);
return;
}
stream->set_response_received();
Respond(headers, stream);
}
void SpdySession::OnHeaders(SpdyStreamId stream_id,
bool fin,
const SpdyHeaderBlock& headers) {
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
base::Bind(&NetLogSpdySynCallback,
&headers, fin, /*unidirectional=*/false,
stream_id, 0));
}
if (!IsStreamActive(stream_id)) {
// NOTE: it may just be that the stream was cancelled.
LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
return;
}
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
CHECK(!stream->cancelled());
int rv = stream->OnHeaders(headers);
if (rv < 0) {
DCHECK_NE(rv, ERR_IO_PENDING);
const SpdyStreamId stream_id = stream->stream_id();
DeleteStream(stream_id, rv);
}
}
void SpdySession::OnRstStream(SpdyStreamId stream_id, SpdyStatusCodes status) {
std::string description;
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_RST_STREAM,
base::Bind(&NetLogSpdyRstCallback,
stream_id, status, &description));
if (!IsStreamActive(stream_id)) {
// NOTE: it may just be that the stream was cancelled.
LOG(WARNING) << "Received RST for invalid stream" << stream_id;
return;
}
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
CHECK(!stream->cancelled());
if (status == 0) {
stream->OnDataReceived(NULL, 0);
} else if (status == REFUSED_STREAM) {
DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM);
} else {
RecordProtocolErrorHistogram(
PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
base::StringPrintf("SPDY stream closed: %d",
status));
// TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
// For now, it doesn't matter much - it is a protocol error.
DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
}
}
void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
SpdyGoAwayStatus status) {
net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
base::Bind(&NetLogSpdyGoAwayCallback,
last_accepted_stream_id,
active_streams_.size(),
unclaimed_pushed_streams_.size(),
status));
RemoveFromPool();
CloseAllStreams(net::ERR_ABORTED);
// TODO(willchan): Cancel any streams that are past the GoAway frame's
// |last_accepted_stream_id|.
// Don't bother killing any streams that are still reading. They'll either
// complete successfully or get an ERR_CONNECTION_CLOSED when the socket is
// closed.
}
void SpdySession::OnPing(uint32 unique_id) {
net_log_.AddEvent(
NetLog::TYPE_SPDY_SESSION_PING,
base::Bind(&NetLogSpdyPingCallback, unique_id, "received"));
// Send response to a PING from server.
if (unique_id % 2 == 0) {
WritePingFrame(unique_id);
return;
}
--pings_in_flight_;
if (pings_in_flight_ < 0) {
RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
CloseSessionOnError(
net::ERR_SPDY_PROTOCOL_ERROR, true, "pings_in_flight_ is < 0.");
pings_in_flight_ = 0;
return;
}
if (pings_in_flight_ > 0)
return;
// We will record RTT in histogram when there are no more client sent
// pings_in_flight_.
RecordPingRTTHistogram(base::TimeTicks::Now() - last_ping_sent_time_);
}
void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
int delta_window_size) {
net_log_.AddEvent(
NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE,
base::Bind(&NetLogSpdyWindowUpdateCallback,
stream_id, delta_window_size));
if (!IsStreamActive(stream_id)) {
LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
return;
}
if (delta_window_size < 1) {
ResetStream(stream_id, FLOW_CONTROL_ERROR,
base::StringPrintf(
"Received WINDOW_UPDATE with an invalid "
"delta_window_size %d", delta_window_size));
return;
}
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
CHECK(!stream->cancelled());
if (flow_control_)
stream->IncreaseSendWindowSize(delta_window_size);
}
void SpdySession::SendWindowUpdate(SpdyStreamId stream_id,
int32 delta_window_size) {
CHECK(IsStreamActive(stream_id));
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
net_log_.AddEvent(
NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE,
base::Bind(&NetLogSpdyWindowUpdateCallback,
stream_id, delta_window_size));
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyWindowUpdateControlFrame> window_update_frame(
buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
QueueFrame(window_update_frame.release(), stream->priority());
}
// Given a cwnd that we would have sent to the server, modify it based on the
// field trial policy.
uint32 ApplyCwndFieldTrialPolicy(int cwnd) {
#if defined(COBALT)
// Cobalt doesn't support field trials.
return cwnd;
#else
base::FieldTrial* trial = base::FieldTrialList::Find("SpdyCwnd");
if (!trial) {
LOG(WARNING) << "Could not find \"SpdyCwnd\" in FieldTrialList";
return cwnd;
}
if (trial->group_name() == "cwnd10")
return 10;
else if (trial->group_name() == "cwnd16")
return 16;
else if (trial->group_name() == "cwndMin16")
return std::max(cwnd, 16);
else if (trial->group_name() == "cwndMin10")
return std::max(cwnd, 10);
else if (trial->group_name() == "cwndDynamic")
return cwnd;
NOTREACHED();
return cwnd;
#endif
}
void SpdySession::SendInitialSettings() {
// First notify the server about the settings they should use when
// communicating with us.
if (GetProtocolVersion() >= 2 && enable_sending_initial_settings_) {
SettingsMap settings_map;
// Create a new settings frame notifying the sever of our
// max_concurrent_streams_ and initial window size.
settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
if (GetProtocolVersion() > 2 &&
initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
SettingsFlagsAndValue(SETTINGS_FLAG_NONE, initial_recv_window_size_);
}
SendSettings(settings_map);
}
// Next notify the server about the settings they have previously
// told us to use when communicating with them.
const SettingsMap& settings_map =
http_server_properties_->GetSpdySettings(host_port_pair());
if (settings_map.empty())
return;
// Record Histogram Data and Apply the SpdyCwnd FieldTrial if applicable.
const SpdySettingsIds id = SETTINGS_CURRENT_CWND;
SettingsMap::const_iterator it = settings_map.find(id);
uint32 value = 0;
if (it != settings_map.end())
value = it->second.second;
uint32 cwnd = ApplyCwndFieldTrialPolicy(value);
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
if (cwnd != value) {
http_server_properties_->SetSpdySetting(
host_port_pair(), id, SETTINGS_FLAG_PLEASE_PERSIST, cwnd);
}
const SettingsMap& settings_map_new =
http_server_properties_->GetSpdySettings(host_port_pair());
for (SettingsMap::const_iterator i = settings_map_new.begin(),
end = settings_map_new.end(); i != end; ++i) {
const SpdySettingsIds new_id = i->first;
const uint32 new_val = i->second.second;
HandleSetting(new_id, new_val);
}
SendSettings(settings_map_new);
}
void SpdySession::SendSettings(const SettingsMap& settings) {
net_log_.AddEvent(
NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
base::Bind(&NetLogSpdySettingsCallback, &settings));
// Create the SETTINGS frame and send it.
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdySettingsControlFrame> settings_frame(
buffered_spdy_framer_->CreateSettings(settings));
sent_settings_ = true;
QueueFrame(settings_frame.release(), HIGHEST);
}
void SpdySession::HandleSetting(uint32 id, uint32 value) {
switch (id) {
case SETTINGS_MAX_CONCURRENT_STREAMS:
max_concurrent_streams_ = std::min(static_cast<size_t>(value),
kMaxConcurrentStreamLimit);
ProcessPendingCreateStreams();
break;
case SETTINGS_INITIAL_WINDOW_SIZE:
if (static_cast<int32>(value) < 0) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_NEGATIVE_INITIAL_WINDOW_SIZE,
NetLog::IntegerCallback("initial_window_size", value));
} else {
// SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
int32 delta_window_size = value - initial_send_window_size_;
initial_send_window_size_ = value;
UpdateStreamsSendWindowSize(delta_window_size);
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
NetLog::IntegerCallback("delta_window_size", delta_window_size));
}
break;
}
}
void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
ActiveStreamMap::iterator it;
for (it = active_streams_.begin(); it != active_streams_.end(); ++it) {
const scoped_refptr<SpdyStream>& stream = it->second;
DCHECK(stream);
stream->AdjustSendWindowSize(delta_window_size);
}
CreatedStreamSet::iterator i;
for (i = created_streams_.begin(); i != created_streams_.end(); i++) {
const scoped_refptr<SpdyStream>& stream = *i;
stream->AdjustSendWindowSize(delta_window_size);
}
}
void SpdySession::SendPrefacePingIfNoneInFlight() {
if (pings_in_flight_ || !enable_ping_based_connection_checking_)
return;
base::TimeTicks now = base::TimeTicks::Now();
// If there is no activity in the session, then send a preface-PING.
if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
SendPrefacePing();
}
void SpdySession::SendPrefacePing() {
WritePingFrame(next_ping_id_);
}
void SpdySession::WritePingFrame(uint32 unique_id) {
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyPingControlFrame> ping_frame(
buffered_spdy_framer_->CreatePingFrame(unique_id));
QueueFrame(ping_frame.release(), HIGHEST);
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_PING,
base::Bind(&NetLogSpdyPingCallback, unique_id, "sent"));
}
if (unique_id % 2 != 0) {
next_ping_id_ += 2;
++pings_in_flight_;
PlanToCheckPingStatus();
last_ping_sent_time_ = base::TimeTicks::Now();
}
}
void SpdySession::PlanToCheckPingStatus() {
if (check_ping_status_pending_)
return;
check_ping_status_pending_ = true;
MessageLoop::current()->PostDelayedTask(
FROM_HERE,
base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
base::TimeTicks::Now()), hung_interval_);
}
void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
// Check if we got a response back for all PINGs we had sent.
if (pings_in_flight_ == 0) {
check_ping_status_pending_ = false;
return;
}
DCHECK(check_ping_status_pending_);
base::TimeTicks now = base::TimeTicks::Now();
base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
CloseSessionOnError(net::ERR_SPDY_PING_FAILED, true, "Failed ping.");
// Track all failed PING messages in a separate bucket.
const base::TimeDelta kFailedPing =
base::TimeDelta::FromInternalValue(INT_MAX);
RecordPingRTTHistogram(kFailedPing);
return;
}
// Check the status of connection after a delay.
MessageLoop::current()->PostDelayedTask(
FROM_HERE,
base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
now),
delay);
}
void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
}
void SpdySession::RecordProtocolErrorHistogram(
SpdyProtocolErrorDetails details) {
UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails", details,
NUM_SPDY_PROTOCOL_ERROR_DETAILS);
if (EndsWith(host_port_pair().host(), "google.com", false)) {
UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google", details,
NUM_SPDY_PROTOCOL_ERROR_DETAILS);
}
}
void SpdySession::RecordHistograms() {
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
streams_initiated_count_,
0, 300, 50);
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
streams_pushed_count_,
0, 300, 50);
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
streams_pushed_and_claimed_count_,
0, 300, 50);
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
streams_abandoned_count_,
0, 300, 50);
UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
sent_settings_ ? 1 : 0, 2);
UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
received_settings_ ? 1 : 0, 2);
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
stalled_streams_,
0, 300, 50);
UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
stalled_streams_ > 0 ? 1 : 0, 2);
if (received_settings_) {
// Enumerate the saved settings, and set histograms for it.
const SettingsMap& settings_map =
http_server_properties_->GetSpdySettings(host_port_pair());
SettingsMap::const_iterator it;
for (it = settings_map.begin(); it != settings_map.end(); ++it) {
const SpdySettingsIds id = it->first;
const uint32 val = it->second.second;
switch (id) {
case SETTINGS_CURRENT_CWND:
// Record several different histograms to see if cwnd converges
// for larger volumes of data being sent.
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
val, 1, 200, 100);
if (bytes_received_ > 10 * 1024) {
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
val, 1, 200, 100);
if (bytes_received_ > 25 * 1024) {
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
val, 1, 200, 100);
if (bytes_received_ > 50 * 1024) {
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
val, 1, 200, 100);
if (bytes_received_ > 100 * 1024) {
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
val, 1, 200, 100);
}
}
}
}
break;
case SETTINGS_ROUND_TRIP_TIME:
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
val, 1, 1200, 100);
break;
case SETTINGS_DOWNLOAD_RETRANS_RATE:
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
val, 1, 100, 50);
break;
default:
break;
}
}
}
}
void SpdySession::InvokeUserStreamCreationCallback(
scoped_refptr<SpdyStream>* stream) {
PendingCallbackMap::iterator it = pending_callback_map_.find(stream);
// Exit if the request has already been cancelled.
if (it == pending_callback_map_.end())
return;
CompletionCallback callback = it->second.callback;
int result = it->second.result;
pending_callback_map_.erase(it);
callback.Run(result);
}
SSLClientSocket* SpdySession::GetSSLClientSocket() const {
if (!is_secure_)
return NULL;
SSLClientSocket* ssl_socket =
reinterpret_cast<SSLClientSocket*>(connection_->socket());
DCHECK(ssl_socket);
return ssl_socket;
}
} // namespace net