| // Copyright (c) 2013 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "net/spdy/spdy_write_queue.h" |
| |
| #include <cstddef> |
| #include <utility> |
| #include <vector> |
| |
| #include "base/containers/circular_deque.h" |
| #include "base/logging.h" |
| #include "base/trace_event/memory_usage_estimator.h" |
| #include "net/spdy/spdy_buffer.h" |
| #include "net/spdy/spdy_buffer_producer.h" |
| #include "net/spdy/spdy_stream.h" |
| |
| namespace net { |
| |
| SpdyWriteQueue::PendingWrite::PendingWrite() = default; |
| |
| SpdyWriteQueue::PendingWrite::PendingWrite( |
| spdy::SpdyFrameType frame_type, |
| std::unique_ptr<SpdyBufferProducer> frame_producer, |
| const base::WeakPtr<SpdyStream>& stream, |
| const MutableNetworkTrafficAnnotationTag& traffic_annotation) |
| : frame_type(frame_type), |
| frame_producer(std::move(frame_producer)), |
| stream(stream), |
| traffic_annotation(traffic_annotation), |
| has_stream(stream.get() != nullptr) {} |
| |
| SpdyWriteQueue::PendingWrite::~PendingWrite() = default; |
| |
| SpdyWriteQueue::PendingWrite::PendingWrite(PendingWrite&& other) = default; |
| SpdyWriteQueue::PendingWrite& SpdyWriteQueue::PendingWrite::operator=( |
| PendingWrite&& other) = default; |
| |
| size_t SpdyWriteQueue::PendingWrite::EstimateMemoryUsage() const { |
| return base::trace_event::EstimateMemoryUsage(frame_producer); |
| } |
| |
| SpdyWriteQueue::SpdyWriteQueue() : removing_writes_(false) {} |
| |
| SpdyWriteQueue::~SpdyWriteQueue() { |
| Clear(); |
| } |
| |
| bool SpdyWriteQueue::IsEmpty() const { |
| for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; i++) { |
| if (!queue_[i].empty()) |
| return false; |
| } |
| return true; |
| } |
| |
| void SpdyWriteQueue::Enqueue( |
| RequestPriority priority, |
| spdy::SpdyFrameType frame_type, |
| std::unique_ptr<SpdyBufferProducer> frame_producer, |
| const base::WeakPtr<SpdyStream>& stream, |
| const NetworkTrafficAnnotationTag& traffic_annotation) { |
| CHECK(!removing_writes_); |
| CHECK_GE(priority, MINIMUM_PRIORITY); |
| CHECK_LE(priority, MAXIMUM_PRIORITY); |
| if (stream.get()) |
| DCHECK_EQ(stream->priority(), priority); |
| queue_[priority].push_back( |
| {frame_type, std::move(frame_producer), stream, |
| MutableNetworkTrafficAnnotationTag(traffic_annotation)}); |
| } |
| |
| bool SpdyWriteQueue::Dequeue( |
| spdy::SpdyFrameType* frame_type, |
| std::unique_ptr<SpdyBufferProducer>* frame_producer, |
| base::WeakPtr<SpdyStream>* stream, |
| MutableNetworkTrafficAnnotationTag* traffic_annotation) { |
| CHECK(!removing_writes_); |
| for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) { |
| if (!queue_[i].empty()) { |
| PendingWrite pending_write = std::move(queue_[i].front()); |
| queue_[i].pop_front(); |
| *frame_type = pending_write.frame_type; |
| *frame_producer = std::move(pending_write.frame_producer); |
| *stream = pending_write.stream; |
| *traffic_annotation = pending_write.traffic_annotation; |
| if (pending_write.has_stream) |
| DCHECK(stream->get()); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| void SpdyWriteQueue::RemovePendingWritesForStream(SpdyStream* stream) { |
| CHECK(!removing_writes_); |
| removing_writes_ = true; |
| RequestPriority priority = stream->priority(); |
| CHECK_GE(priority, MINIMUM_PRIORITY); |
| CHECK_LE(priority, MAXIMUM_PRIORITY); |
| |
| #if DCHECK_IS_ON() |
| // |stream| should not have pending writes in a queue not matching |
| // its priority. |
| for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) { |
| if (priority == i) |
| continue; |
| for (auto it = queue_[i].begin(); it != queue_[i].end(); ++it) |
| DCHECK_NE(it->stream.get(), stream); |
| } |
| #endif |
| |
| // Defer deletion until queue iteration is complete, as |
| // SpdyBuffer::~SpdyBuffer() can result in callbacks into SpdyWriteQueue. |
| std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers; |
| base::circular_deque<PendingWrite>& queue = queue_[priority]; |
| for (auto it = queue.begin(); it != queue.end();) { |
| if (it->stream.get() == stream) { |
| erased_buffer_producers.push_back(std::move(it->frame_producer)); |
| it = queue.erase(it); |
| } else { |
| ++it; |
| } |
| } |
| removing_writes_ = false; |
| |
| // Iteration on |queue| is completed. Now |erased_buffer_producers| goes out |
| // of scope, SpdyBufferProducers are destroyed. |
| } |
| |
| void SpdyWriteQueue::RemovePendingWritesForStreamsAfter( |
| spdy::SpdyStreamId last_good_stream_id) { |
| CHECK(!removing_writes_); |
| removing_writes_ = true; |
| |
| // Defer deletion until queue iteration is complete, as |
| // SpdyBuffer::~SpdyBuffer() can result in callbacks into SpdyWriteQueue. |
| std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers; |
| for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) { |
| base::circular_deque<PendingWrite>& queue = queue_[i]; |
| for (auto it = queue.begin(); it != queue.end();) { |
| if (it->stream.get() && (it->stream->stream_id() > last_good_stream_id || |
| it->stream->stream_id() == 0)) { |
| erased_buffer_producers.push_back(std::move(it->frame_producer)); |
| it = queue.erase(it); |
| } else { |
| ++it; |
| } |
| } |
| } |
| removing_writes_ = false; |
| |
| // Iteration on each |queue| is completed. Now |erased_buffer_producers| goes |
| // out of scope, SpdyBufferProducers are destroyed. |
| } |
| |
| void SpdyWriteQueue::ChangePriorityOfWritesForStream( |
| SpdyStream* stream, |
| RequestPriority old_priority, |
| RequestPriority new_priority) { |
| CHECK(!removing_writes_); |
| DCHECK(stream); |
| |
| #if DCHECK_IS_ON() |
| // |stream| should not have pending writes in a queue not matching |
| // |old_priority|. |
| for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) { |
| if (i == old_priority) |
| continue; |
| for (auto it = queue_[i].begin(); it != queue_[i].end(); ++it) |
| DCHECK_NE(it->stream.get(), stream); |
| } |
| #endif |
| |
| base::circular_deque<PendingWrite>& old_queue = queue_[old_priority]; |
| base::circular_deque<PendingWrite>& new_queue = queue_[new_priority]; |
| for (auto it = old_queue.begin(); it != old_queue.end();) { |
| if (it->stream.get() == stream) { |
| new_queue.push_back(std::move(*it)); |
| it = old_queue.erase(it); |
| } else { |
| ++it; |
| } |
| } |
| } |
| |
| void SpdyWriteQueue::Clear() { |
| CHECK(!removing_writes_); |
| removing_writes_ = true; |
| std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers; |
| |
| for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) { |
| for (auto it = queue_[i].begin(); it != queue_[i].end(); ++it) { |
| erased_buffer_producers.push_back(std::move(it->frame_producer)); |
| } |
| queue_[i].clear(); |
| } |
| removing_writes_ = false; |
| } |
| |
| size_t SpdyWriteQueue::EstimateMemoryUsage() const { |
| return base::trace_event::EstimateMemoryUsage(queue_); |
| } |
| |
| } // namespace net |