// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
| |
| #include "net/base/upload_data_stream.h" |
| |
| #include "base/logging.h" |
| #include "net/base/io_buffer.h" |
| #include "net/base/net_errors.h" |
| #include "net/base/upload_bytes_element_reader.h" |
| #include "net/base/upload_element_reader.h" |
| |
| namespace net { |
| |
| bool UploadDataStream::merge_chunks_ = true; |
| |
| // static |
| void UploadDataStream::ResetMergeChunks() { |
| // WARNING: merge_chunks_ must match the above initializer. |
| merge_chunks_ = true; |
| } |
| |
| UploadDataStream::UploadDataStream( |
| ScopedVector<UploadElementReader>* element_readers, |
| int64 identifier) |
| : element_index_(0), |
| total_size_(0), |
| current_position_(0), |
| identifier_(identifier), |
| is_chunked_(false), |
| last_chunk_appended_(false), |
| initialized_successfully_(false), |
| weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { |
| element_readers_.swap(*element_readers); |
| } |
| |
| UploadDataStream::UploadDataStream(Chunked /*chunked*/, int64 identifier) |
| : element_index_(0), |
| total_size_(0), |
| current_position_(0), |
| identifier_(identifier), |
| is_chunked_(true), |
| last_chunk_appended_(false), |
| initialized_successfully_(false), |
| weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { |
| } |
| |
| UploadDataStream::~UploadDataStream() { |
| } |
| |
| UploadDataStream* UploadDataStream::CreateWithReader( |
| scoped_ptr<UploadElementReader> reader, |
| int64 identifier) { |
| ScopedVector<UploadElementReader> readers; |
| readers.push_back(reader.release()); |
| return new UploadDataStream(&readers, identifier); |
| } |
| |
| int UploadDataStream::Init(const CompletionCallback& callback) { |
| Reset(); |
| // Use fast path when initialization can be done synchronously. |
| if (IsInMemory()) |
| return InitSync(); |
| |
| return InitInternal(0, callback); |
| } |
| |
| int UploadDataStream::InitSync() { |
| Reset(); |
| // Initialize all readers synchronously. |
| for (size_t i = 0; i < element_readers_.size(); ++i) { |
| UploadElementReader* reader = element_readers_[i]; |
| const int result = reader->InitSync(); |
| if (result != OK) |
| return result; |
| } |
| |
| FinalizeInitialization(); |
| return OK; |
| } |
| |
| int UploadDataStream::Read(IOBuffer* buf, |
| int buf_len, |
| const CompletionCallback& callback) { |
| DCHECK(initialized_successfully_); |
| DCHECK(!callback.is_null()); |
| DCHECK_GT(buf_len, 0); |
| return ReadInternal(new DrainableIOBuffer(buf, buf_len), callback); |
| } |
| |
| int UploadDataStream::ReadSync(IOBuffer* buf, int buf_len) { |
| DCHECK(initialized_successfully_); |
| DCHECK_GT(buf_len, 0); |
| return ReadInternal(new DrainableIOBuffer(buf, buf_len), |
| CompletionCallback()); |
| } |
| |
| bool UploadDataStream::IsEOF() const { |
| DCHECK(initialized_successfully_); |
| // Check if all elements are consumed. |
| if (element_index_ == element_readers_.size()) { |
| // If the upload data is chunked, check if the last chunk is appended. |
| if (!is_chunked_ || last_chunk_appended_) |
| return true; |
| } |
| return false; |
| } |
| |
| bool UploadDataStream::IsInMemory() const { |
| // Chunks are in memory, but UploadData does not have all the chunks at |
| // once. Chunks are provided progressively with AppendChunk() as chunks |
| // are ready. Check is_chunked_ here, rather than relying on the loop |
| // below, as there is a case that is_chunked_ is set to true, but the |
| // first chunk is not yet delivered. |
| if (is_chunked_) |
| return false; |
| |
| for (size_t i = 0; i < element_readers_.size(); ++i) { |
| if (!element_readers_[i]->IsInMemory()) |
| return false; |
| } |
| return true; |
| } |
| |
| void UploadDataStream::AppendChunk(const char* bytes, |
| int bytes_len, |
| bool is_last_chunk) { |
| DCHECK(is_chunked_); |
| DCHECK(!last_chunk_appended_); |
| last_chunk_appended_ = is_last_chunk; |
| |
| // Initialize a reader for the newly appended chunk. We leave |total_size_| at |
| // zero, since for chunked uploads, we may not know the total size. |
| std::vector<char> data(bytes, bytes + bytes_len); |
| UploadElementReader* reader = new UploadOwnedBytesElementReader(&data); |
| const int rv = reader->InitSync(); |
| DCHECK_EQ(OK, rv); |
| element_readers_.push_back(reader); |
| |
| // Resume pending read. |
| if (!pending_chunked_read_callback_.is_null()) { |
| base::Closure callback = pending_chunked_read_callback_; |
| pending_chunked_read_callback_.Reset(); |
| callback.Run(); |
| } |
| } |
| |
| void UploadDataStream::Reset() { |
| weak_ptr_factory_.InvalidateWeakPtrs(); |
| pending_chunked_read_callback_.Reset(); |
| initialized_successfully_ = false; |
| current_position_ = 0; |
| total_size_ = 0; |
| element_index_ = 0; |
| } |
| |
| int UploadDataStream::InitInternal(int start_index, |
| const CompletionCallback& callback) { |
| DCHECK(!initialized_successfully_); |
| |
| // Call Init() for all elements. |
| for (size_t i = start_index; i < element_readers_.size(); ++i) { |
| UploadElementReader* reader = element_readers_[i]; |
| // When new_result is ERR_IO_PENDING, InitInternal() will be called |
| // with start_index == i + 1 when reader->Init() finishes. |
| const int result = reader->Init( |
| base::Bind(&UploadDataStream::ResumePendingInit, |
| weak_ptr_factory_.GetWeakPtr(), |
| i + 1, |
| callback)); |
| if (result != OK) |
| return result; |
| } |
| |
| // Finalize initialization. |
| FinalizeInitialization(); |
| return OK; |
| } |
| |
| void UploadDataStream::ResumePendingInit(int start_index, |
| const CompletionCallback& callback, |
| int previous_result) { |
| DCHECK(!initialized_successfully_); |
| DCHECK(!callback.is_null()); |
| DCHECK_NE(ERR_IO_PENDING, previous_result); |
| |
| // Check the last result. |
| if (previous_result != OK) { |
| callback.Run(previous_result); |
| return; |
| } |
| |
| const int result = InitInternal(start_index, callback); |
| if (result != ERR_IO_PENDING) |
| callback.Run(result); |
| } |
| |
| void UploadDataStream::FinalizeInitialization() { |
| DCHECK(!initialized_successfully_); |
| if (!is_chunked_) { |
| uint64 total_size = 0; |
| for (size_t i = 0; i < element_readers_.size(); ++i) { |
| UploadElementReader* reader = element_readers_[i]; |
| total_size += reader->GetContentLength(); |
| } |
| total_size_ = total_size; |
| } |
| initialized_successfully_ = true; |
| } |
| |
| int UploadDataStream::ReadInternal(scoped_refptr<DrainableIOBuffer> buf, |
| const CompletionCallback& callback) { |
| DCHECK(initialized_successfully_); |
| |
| while (element_index_ < element_readers_.size()) { |
| UploadElementReader* reader = element_readers_[element_index_]; |
| |
| if (reader->BytesRemaining() == 0) { |
| ++element_index_; |
| continue; |
| } |
| |
| if (buf->BytesRemaining() == 0) |
| break; |
| |
| // Some tests need chunks to be kept unmerged. |
| if (!merge_chunks_ && is_chunked_ && buf->BytesConsumed()) |
| break; |
| |
| int result = OK; |
| if (!callback.is_null()) { |
| result = reader->Read( |
| buf, |
| buf->BytesRemaining(), |
| base::Bind(base::IgnoreResult(&UploadDataStream::ResumePendingRead), |
| weak_ptr_factory_.GetWeakPtr(), |
| buf, |
| callback)); |
| } else { |
| result = reader->ReadSync(buf, buf->BytesRemaining()); |
| DCHECK_NE(ERR_IO_PENDING, result); |
| } |
| if (result == ERR_IO_PENDING) |
| return ERR_IO_PENDING; |
| DCHECK_GE(result, 0); |
| buf->DidConsume(result); |
| } |
| |
| const int bytes_copied = buf->BytesConsumed(); |
| current_position_ += bytes_copied; |
| |
| if (is_chunked_ && !IsEOF() && bytes_copied == 0) { |
| DCHECK(!callback.is_null()); |
| DCHECK(pending_chunked_read_callback_.is_null()); |
| pending_chunked_read_callback_ = |
| base::Bind(&UploadDataStream::ResumePendingRead, |
| weak_ptr_factory_.GetWeakPtr(), |
| buf, |
| callback, |
| OK); |
| return ERR_IO_PENDING; |
| } |
| return bytes_copied; |
| } |
| |
| void UploadDataStream::ResumePendingRead(scoped_refptr<DrainableIOBuffer> buf, |
| const CompletionCallback& callback, |
| int previous_result) { |
| DCHECK_GE(previous_result, 0); |
| |
| // Add the last result. |
| buf->DidConsume(previous_result); |
| |
| DCHECK(!callback.is_null()); |
| const int result = ReadInternal(buf, callback); |
| if (result != ERR_IO_PENDING) |
| callback.Run(result); |
| } |
| |
| } // namespace net |