Initial import of Cobalt 2.8885 2016-07-27
diff --git a/src/net/proxy/multi_threaded_proxy_resolver.cc b/src/net/proxy/multi_threaded_proxy_resolver.cc
new file mode 100644
index 0000000..bd9890c
--- /dev/null
+++ b/src/net/proxy/multi_threaded_proxy_resolver.cc
@@ -0,0 +1,611 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/proxy/multi_threaded_proxy_resolver.h"
+
+#include "base/bind.h"
+#include "base/message_loop_proxy.h"
+#include "base/metrics/histogram.h"
+#include "base/string_util.h"
+#include "base/stringprintf.h"
+#include "base/threading/thread.h"
+#include "base/threading/thread_restrictions.h"
+#include "net/base/net_errors.h"
+#include "net/base/net_log.h"
+#include "net/proxy/proxy_info.h"
+
+// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
+//               data when SetPacScript fails. That will reclaim memory when
+//               testing bogus scripts.
+
+namespace net {
+
+namespace {
+
+class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> {
+ public:
+  explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {}
+  void PurgeMemory() { resolver_->PurgeMemory(); }  // Forwards to |resolver_|.
+ private:
+  friend class base::RefCountedThreadSafe<PurgeMemoryTask>;
+  ~PurgeMemoryTask() {}  // Private; the ref-count base class is a friend.
+  ProxyResolver* resolver_;  // Raw pointer; never deleted by this class.
+};
+
+}  // namespace
+
+// An "executor" is a job-runner for PAC requests. It pairs one worker thread
+// with one synchronous ProxyResolver (which is operated on said thread), and
+// runs at most one Job at a time.
+class MultiThreadedProxyResolver::Executor
+    : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
+ public:
+  // |coordinator| must remain valid throughout our lifetime. It is used to
+  // signal when the executor is ready to receive work by calling
+  // |coordinator->OnExecutorReady()|.
+  // The constructor takes ownership of |resolver| and starts the worker thread.
+  // |thread_number| is an identifier used when naming the worker thread.
+  Executor(MultiThreadedProxyResolver* coordinator,
+           ProxyResolver* resolver,
+           int thread_number);
+
+  // Submits |job| to this executor, which must not have an outstanding job.
+  void StartJob(Job* job);
+
+  // Callback for when a job has completed running on the executor's thread.
+  void OnJobCompleted(Job* job);
+
+  // Cleans up the executor: joins the worker thread, cancels the outstanding
+  // job, and frees the resolver. Must be called before the destructor runs.
+  void Destroy();
+
+  void PurgeMemory();  // Runs ProxyResolver::PurgeMemory() on the worker.
+
+  // Returns the outstanding job, or NULL.
+  Job* outstanding_job() const { return outstanding_job_.get(); }
+
+  ProxyResolver* resolver() { return resolver_.get(); }
+
+  int thread_number() const { return thread_number_; }
+
+ private:
+  friend class base::RefCountedThreadSafe<Executor>;
+  ~Executor();
+
+  MultiThreadedProxyResolver* coordinator_;  // Set to NULL by Destroy().
+  const int thread_number_;
+
+  // The currently active job for this executor (either a SetPacScript or
+  // GetProxyForURL task).
+  scoped_refptr<Job> outstanding_job_;
+
+  // The synchronous resolver implementation.
+  scoped_ptr<ProxyResolver> resolver_;
+
+  // The thread where |resolver_| is run on.
+  // Note that declaration ordering is important here. |thread_| needs to be
+  // destroyed *before* |resolver_|, in case |resolver_| is currently
+  // executing on |thread_|.
+  scoped_ptr<base::Thread> thread_;
+};
+
+// MultiThreadedProxyResolver::Job ---------------------------------------------
+
+class MultiThreadedProxyResolver::Job
+    : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
+ public:
+  // Identifies the subclass of Job (only used for debugging purposes).
+  enum Type {
+    TYPE_GET_PROXY_FOR_URL,
+    TYPE_SET_PAC_SCRIPT,  // SetPacScriptJob with a user callback.
+    TYPE_SET_PAC_SCRIPT_INTERNAL,  // SetPacScriptJob without a user callback.
+  };
+
+  Job(Type type, const CompletionCallback& callback)
+      : type_(type),
+        callback_(callback),
+        executor_(NULL),
+        was_cancelled_(false) {
+  }
+
+  void set_executor(Executor* executor) {
+    executor_ = executor;
+  }
+
+  // The "executor" is the job runner that is scheduling this job. It is NULL
+  // while the job has not been submitted to an executor yet (so we know it
+  // hasn't started), and again after the executor orphaned it in Destroy().
+  Executor* executor() {
+    return executor_;
+  }
+
+  // Marks the job as cancelled, so its user callback will not be run.
+  void Cancel() {
+    was_cancelled_ = true;
+  }
+
+  // Returns true if Cancel() has been called.
+  bool was_cancelled() const { return was_cancelled_; }
+
+  Type type() const { return type_; }
+
+  // Returns true if this job still has a user callback. Some jobs
+  // do not have a user callback, because they were helper jobs
+  // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
+  //
+  // Otherwise jobs that correspond with user-initiated work will
+  // have a non-null callback up until the callback is run.
+  bool has_user_callback() const { return !callback_.is_null(); }
+
+  // This method is called when the job is inserted into a wait queue
+  // because no executors were ready to accept it.
+  virtual void WaitingForThread() {}
+
+  // This method is called just before the job is posted to the work thread.
+  virtual void FinishedWaitingForThread() {}
+
+  // This method is called on the worker thread to do the job's work. On
+  // completion, implementors are expected to call OnJobCompleted() on
+  // |origin_loop|.
+  virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) = 0;
+
+ protected:
+  void OnJobCompleted() {
+    // |executor_| will be NULL if the executor has already been destroyed.
+    if (executor_)
+      executor_->OnJobCompleted(this);
+  }
+
+  void RunUserCallback(int result) {
+    DCHECK(has_user_callback());
+    CompletionCallback callback = callback_;
+    // Reset first so has_user_callback() is false while the callback runs.
+    callback_.Reset();
+    callback.Run(result);
+  }
+
+  friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
+
+  virtual ~Job() {}
+
+ private:
+  const Type type_;
+  CompletionCallback callback_;  // Null for internal jobs, or once it was run.
+  Executor* executor_;  // NULL until submitted, and again once orphaned.
+  bool was_cancelled_;
+};
+
+// MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
+
+// Job that runs ProxyResolver::SetPacScript() on an executor's worker thread.
+class MultiThreadedProxyResolver::SetPacScriptJob
+    : public MultiThreadedProxyResolver::Job {
+ public:
+  SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
+                  const CompletionCallback& callback)
+    : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT :
+                                TYPE_SET_PAC_SCRIPT_INTERNAL,
+          callback),
+      script_data_(script_data) {
+  }
+
+  // Runs on the worker thread; posts the result back to |origin_loop|.
+  virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
+    ProxyResolver* resolver = executor()->resolver();
+    int rv = resolver->SetPacScript(script_data_, CompletionCallback());
+
+    DCHECK_NE(rv, ERR_IO_PENDING);  // The resolver must complete synchronously.
+    origin_loop->PostTask(
+        FROM_HERE,
+        base::Bind(&SetPacScriptJob::RequestComplete, this, rv));
+  }
+
+ protected:
+  virtual ~SetPacScriptJob() {}
+
+ private:
+  // Runs on the origin thread: notifies the user, then frees up the executor.
+  void RequestComplete(int result_code) {
+    // Skip the callback if cancelled meanwhile, or if this is an internal job.
+    if (!was_cancelled() && has_user_callback()) {
+      RunUserCallback(result_code);
+    }
+    OnJobCompleted();
+  }
+
+  const scoped_refptr<ProxyResolverScriptData> script_data_;
+};
+
+// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
+
+class MultiThreadedProxyResolver::GetProxyForURLJob
+    : public MultiThreadedProxyResolver::Job {
+ public:
+  // |url|         -- the URL of the query.
+  // |results|     -- receives the result on success (on the origin thread).
+  GetProxyForURLJob(const GURL& url,
+                    ProxyInfo* results,
+                    const CompletionCallback& callback,
+                    const BoundNetLog& net_log)
+      : Job(TYPE_GET_PROXY_FOR_URL, callback),
+        results_(results),
+        net_log_(net_log),
+        url_(url),
+        was_waiting_for_thread_(false) {
+    DCHECK(!callback.is_null());
+    start_time_ = base::TimeTicks::Now();
+  }
+
+  BoundNetLog* net_log() { return &net_log_; }
+
+  virtual void WaitingForThread() OVERRIDE {
+    was_waiting_for_thread_ = true;  // Pairs the BeginEvent with an EndEvent.
+    net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
+  }
+
+  virtual void FinishedWaitingForThread() OVERRIDE {
+    DCHECK(executor());
+
+    submitted_to_thread_time_ = base::TimeTicks::Now();
+
+    if (was_waiting_for_thread_) {
+      net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
+    }
+
+    net_log_.AddEvent(
+        NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
+        NetLog::IntegerCallback("thread_number", executor()->thread_number()));
+  }
+
+  // Runs on the worker thread. Resolves into |results_buf_|, not |results_|.
+  virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
+    ProxyResolver* resolver = executor()->resolver();
+    int rv = resolver->GetProxyForURL(
+        url_, &results_buf_, CompletionCallback(), NULL, net_log_);
+    DCHECK_NE(rv, ERR_IO_PENDING);
+
+    origin_loop->PostTask(
+        FROM_HERE,
+        base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
+  }
+
+ protected:
+  virtual ~GetProxyForURLJob() {}
+
+ private:
+  // Runs on the origin thread: notifies the user, then frees up the executor.
+  void QueryComplete(int result_code) {
+    // The Job may have been cancelled after it was started.
+    if (!was_cancelled()) {
+      RecordPerformanceMetrics();
+      if (result_code >= OK) {  // Note: unit-tests use values > 0.
+        results_->Use(results_buf_);
+      }
+      RunUserCallback(result_code);
+    }
+    OnJobCompleted();
+  }
+
+  void RecordPerformanceMetrics() {
+    DCHECK(!was_cancelled());
+
+    base::TimeTicks now = base::TimeTicks::Now();
+
+    // Log the total time the request took to complete.
+    UMA_HISTOGRAM_MEDIUM_TIMES("Net.MTPR_GetProxyForUrl_Time",
+                               now - start_time_);
+
+    // Log the time the request was stalled waiting for a thread to free up.
+    UMA_HISTOGRAM_MEDIUM_TIMES("Net.MTPR_GetProxyForUrl_Thread_Wait_Time",
+                               submitted_to_thread_time_ - start_time_);
+  }
+
+  // Must only be used on the "origin" thread.
+  ProxyInfo* results_;
+
+  // Can be used on either "origin" or worker thread.
+  BoundNetLog net_log_;
+  const GURL url_;
+
+  // Filled in by Run() on the worker thread; copied to |results_| afterwards.
+  ProxyInfo results_buf_;
+
+  base::TimeTicks start_time_;  // Set at construction.
+  base::TimeTicks submitted_to_thread_time_;  // Set on submission.
+
+  bool was_waiting_for_thread_;  // True if the job was ever queued.
+};
+
+// MultiThreadedProxyResolver::Executor ----------------------------------------
+
+MultiThreadedProxyResolver::Executor::Executor(
+    MultiThreadedProxyResolver* coordinator,
+    ProxyResolver* resolver,
+    int thread_number)
+    : coordinator_(coordinator),
+      thread_number_(thread_number),
+      resolver_(resolver) {
+  DCHECK(coordinator);
+  DCHECK(resolver);
+  // Start up the worker thread, named after |thread_number|.
+  // Note that it is safe to pass a temporary C-String to Thread(), as it will
+  // make a copy.
+  std::string thread_name =
+      base::StringPrintf("PAC thread #%d", thread_number);
+  thread_.reset(new base::Thread(thread_name.c_str()));
+  CHECK(thread_->Start());  // Failing to start the thread is fatal.
+}
+
+void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
+  DCHECK(!outstanding_job_);  // An executor runs one job at a time.
+  outstanding_job_ = job;
+
+  // Run the job on the worker thread. Once it has completed (regardless of
+  // whether it was cancelled), it will invoke OnJobCompleted() on this thread.
+  job->set_executor(this);  // Must precede FinishedWaitingForThread().
+  job->FinishedWaitingForThread();
+  thread_->message_loop()->PostTask(
+      FROM_HERE,
+      base::Bind(&Job::Run, job, base::MessageLoopProxy::current()));
+}
+
+void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
+  DCHECK_EQ(job, outstanding_job_.get());
+  outstanding_job_ = NULL;  // This executor is idle again.
+  coordinator_->OnExecutorReady(this);  // May immediately start a queued job.
+}
+
+void MultiThreadedProxyResolver::Executor::Destroy() {
+  DCHECK(coordinator_);  // Cleared below, so this also catches a second call.
+
+  // Give the resolver an opportunity to shutdown from THIS THREAD before
+  // joining on the resolver thread. This allows certain implementations
+  // to avoid deadlocks.
+  resolver_->Shutdown();
+
+  {
+    // Presumably needed because joining blocks; see http://crbug.com/69710.
+    base::ThreadRestrictions::ScopedAllowIO allow_io;
+
+    // Join the worker thread.
+    thread_.reset();
+  }
+
+  // Cancel any outstanding job, so that its user callback never runs.
+  if (outstanding_job_) {
+    outstanding_job_->Cancel();
+    // Orphan the job: its completion must not call into this executor anymore.
+    outstanding_job_->set_executor(NULL);
+  }
+
+  // It is now safe to free the ProxyResolver, since all the tasks that
+  // were using it on the resolver thread have completed.
+  resolver_.reset();
+
+  // Null the remaining members; the destructor DCHECKs that this happened.
+  coordinator_ = NULL;
+  outstanding_job_ = NULL;
+}
+
+void MultiThreadedProxyResolver::Executor::PurgeMemory() {
+  // Relay the purge to the worker thread, where the resolver does its work.
+  scoped_refptr<PurgeMemoryTask> task(new PurgeMemoryTask(resolver_.get()));
+  thread_->message_loop()->PostTask(
+      FROM_HERE, base::Bind(&PurgeMemoryTask::PurgeMemory, task.get()));
+}
+
+MultiThreadedProxyResolver::Executor::~Executor() {
+  // All real cleanup happens in Destroy(), which must always be called first.
+  // The destructor merely verifies that it was.
+  DCHECK(!coordinator_) << "Destroy() was not called";
+  DCHECK(!thread_.get());
+  DCHECK(!resolver_.get());
+  DCHECK(!outstanding_job_);
+}
+
+// MultiThreadedProxyResolver --------------------------------------------------
+
+MultiThreadedProxyResolver::MultiThreadedProxyResolver(
+    ProxyResolverFactory* resolver_factory,
+    size_t max_num_threads)
+    : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
+      resolver_factory_(resolver_factory),
+      max_num_threads_(max_num_threads) {
+  DCHECK_GE(max_num_threads, 1u);  // At least one worker thread is required.
+}
+
+MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
+  // Cancel everything: drop the queued jobs, then destroy the executors.
+  pending_jobs_.clear();
+  ReleaseAllExecutors();
+}
+
+int MultiThreadedProxyResolver::GetProxyForURL(
+    const GURL& url, ProxyInfo* results, const CompletionCallback& callback,
+    RequestHandle* request, const BoundNetLog& net_log) {
+  DCHECK(CalledOnValidThread());
+  DCHECK(!callback.is_null());
+  DCHECK(current_script_data_.get())
+      << "Resolver is un-initialized. Must call SetPacScript() first!";
+
+  scoped_refptr<GetProxyForURLJob> query(
+      new GetProxyForURLJob(url, results, callback, net_log));
+
+  // The job pointer doubles as the opaque handle that the caller can later
+  // pass to CancelRequest() or GetLoadState().
+  if (request)
+    *request = reinterpret_cast<RequestHandle>(query.get());
+
+  // Fast path: an executor is sitting idle, so hand the query straight to it.
+  if (Executor* idle_executor = FindIdleExecutor()) {
+    DCHECK_EQ(0u, pending_jobs_.size());
+    idle_executor->StartJob(query);
+    return ERR_IO_PENDING;
+  }
+
+  // Slow path: no executor is idle. Queue the query (FIFO); it is started by
+  // OnExecutorReady() as soon as an executor frees up.
+  query->WaitingForThread();
+  pending_jobs_.push_back(query);
+
+  // While still below the thread limit, provision one more executor. Its
+  // first job (internal, no callback) loads the current PAC script.
+  if (executors_.size() < max_num_threads_) {
+    Executor* fresh_executor = AddNewExecutor();
+    fresh_executor->StartJob(
+        new SetPacScriptJob(current_script_data_, CompletionCallback()));
+  }
+
+  // Either way the result is delivered asynchronously through |callback|.
+  return ERR_IO_PENDING;
+}
+
+void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
+  DCHECK(CalledOnValidThread());
+  DCHECK(req);
+
+  // Request handles are really Job pointers (see GetProxyForURL()).
+  Job* job = reinterpret_cast<Job*>(req);
+  DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
+
+  if (!job->executor()) {
+    // Never handed to an executor, so it must still be in the wait queue.
+    // Removing it from there is all that is needed.
+    PendingJobsQueue::iterator it =
+        std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
+    DCHECK(it != pending_jobs_.end());
+    pending_jobs_.erase(it);
+    return;
+  }
+  job->Cancel();  // Already submitted: just suppress the user callback.
+}
+
+LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
+  DCHECK(CalledOnValidThread());
+  DCHECK(req);
+
+  // A job without an executor is still queued: report the generic state.
+  Executor* owner = reinterpret_cast<Job*>(req)->executor();
+  return owner ? owner->resolver()->GetLoadStateThreadSafe(NULL)
+               : LOAD_STATE_RESOLVING_PROXY_FOR_URL;
+}
+
+LoadState MultiThreadedProxyResolver::GetLoadStateThreadSafe(
+    RequestHandle req) const {
+  NOTIMPLEMENTED();  // Not supported here; |req| is ignored.
+  return LOAD_STATE_IDLE;
+}
+
+void MultiThreadedProxyResolver::CancelSetPacScript() {
+  DCHECK(CalledOnValidThread());
+  DCHECK_EQ(0u, pending_jobs_.size());
+  DCHECK_EQ(1u, executors_.size());  // SetPacScript() leaves exactly one.
+  DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
+            executors_[0]->outstanding_job()->type());
+
+  // Defensively drop the script data, which should no longer be used, then
+  // destroy the lone executor (which cancels its SetPacScript job).
+  current_script_data_ = NULL;
+
+  ReleaseAllExecutors();
+}
+
+void MultiThreadedProxyResolver::PurgeMemory() {
+  DCHECK(CalledOnValidThread());
+  // Forward the request to every executor; each one relays it to its own
+  // worker thread.
+  for (size_t i = 0; i < executors_.size(); ++i) {
+    executors_[i]->PurgeMemory();
+  }
+}
+
+int MultiThreadedProxyResolver::SetPacScript(
+    const scoped_refptr<ProxyResolverScriptData>& script_data,
+    const CompletionCallback& callback) {
+  DCHECK(CalledOnValidThread());
+  DCHECK(!callback.is_null());
+
+  // Remember the script: executors that are provisioned lazily later on have
+  // to be initialized with it too.
+  current_script_data_ = script_data;
+
+  // Callers are required to have no requests in flight at this point.
+  CheckNoOutstandingUserRequests();
+
+  // Throw away every existing worker thread together with its resolver.
+  ReleaseAllExecutors();
+
+  // Start over with a single executor whose first job loads the new script.
+  // |callback| is notified once that job completes.
+  Executor* first_executor = AddNewExecutor();
+  first_executor->StartJob(new SetPacScriptJob(script_data, callback));
+  return ERR_IO_PENDING;
+}
+
+void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
+  DCHECK(CalledOnValidThread());
+
+  // No request may be waiting in the queue for a thread.
+  CHECK_EQ(0u, pending_jobs_.size());
+
+  // An executor's outstanding job is acceptable only if it can no longer
+  // reach the user: it was cancelled, or it holds no user callback. The
+  // latter covers internal jobs (lazy thread provisioning) as well as user
+  // jobs whose callback has already run -- the outstanding job is cleared
+  // only AFTER the callback has been invoked, so a new request may
+  // legitimately be started from within that callback.
+  for (size_t i = 0; i < executors_.size(); ++i) {
+    Job* job = executors_[i]->outstanding_job();
+    CHECK(!job || job->was_cancelled() || !job->has_user_callback());
+  }
+}
+
+void MultiThreadedProxyResolver::ReleaseAllExecutors() {
+  DCHECK(CalledOnValidThread());
+  // Destroy() must run on each executor before we drop our reference to it:
+  // it joins the worker thread and frees the resolver, which the executor's
+  // destructor expects to have happened already.
+  for (size_t i = 0; i < executors_.size(); ++i)
+    executors_[i]->Destroy();
+  executors_.clear();
+}
+
+MultiThreadedProxyResolver::Executor*
+MultiThreadedProxyResolver::FindIdleExecutor() {
+  DCHECK(CalledOnValidThread());
+  // Pick the first executor (in list order) that has nothing in flight.
+  Executor* idle = NULL;
+  for (size_t i = 0; i < executors_.size() && !idle; ++i) {
+    if (!executors_[i]->outstanding_job())
+      idle = executors_[i].get();
+  }
+  return idle;
+}
+
+MultiThreadedProxyResolver::Executor*
+MultiThreadedProxyResolver::AddNewExecutor() {
+  DCHECK(CalledOnValidThread());
+  DCHECK_LT(executors_.size(), max_num_threads_);
+  // The "thread number" only serves to give the worker thread a unique name.
+  // size() is a size_t; convert explicitly rather than narrowing implicitly.
+  const int thread_number = static_cast<int>(executors_.size());
+  ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
+  Executor* executor = new Executor(this, resolver, thread_number);
+  executors_.push_back(make_scoped_refptr(executor));
+  return executor;
+}
+
+void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
+  DCHECK(CalledOnValidThread());
+  if (!pending_jobs_.empty()) {
+    // Hand the oldest queued job (FIFO) to the executor that just became
+    // free. Keep a reference across pop_front() so the job stays alive
+    // while it moves from the queue to the executor.
+    scoped_refptr<Job> next_job = pending_jobs_.front();
+    pending_jobs_.pop_front();
+    executor->StartJob(next_job);
+  }
+}
+
+}  // namespace net