blob: c05969dff05b8a74bbc4029b0c8f099590816c13 [file] [log] [blame]
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/proxy_resolution/multi_threaded_proxy_resolver.h"
#include <utility>
#include <vector>
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/callback_helpers.h"
#include "base/containers/circular_deque.h"
#include "base/location.h"
#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
#include "base/threading/thread.h"
#include "base/threading/thread_checker.h"
#include "base/threading/thread_restrictions.h"
#include "base/threading/thread_task_runner_handle.h"
#include "net/base/net_errors.h"
#include "net/log/net_log.h"
#include "net/log/net_log_event_type.h"
#include "net/log/net_log_with_source.h"
#include "net/proxy_resolution/proxy_info.h"
#include "net/proxy_resolution/proxy_resolver.h"
namespace net {
namespace {
class Job;
// An "executor" is a job-runner for PAC requests. It encapsulates a worker
// thread and a synchronous ProxyResolver (which will be operated on said
// thread.)
class Executor : public base::RefCountedThreadSafe<Executor> {
public:
class Coordinator {
public:
virtual void OnExecutorReady(Executor* executor) = 0;
protected:
virtual ~Coordinator() = default;
};
// |coordinator| must remain valid throughout our lifetime. It is used to
// signal when the executor is ready to receive work by calling
// |coordinator->OnExecutorReady()|.
// |thread_number| is an identifier used when naming the worker thread.
Executor(Coordinator* coordinator, int thread_number);
// Submit a job to this executor.
void StartJob(Job* job);
// Callback for when a job has completed running on the executor's thread.
void OnJobCompleted(Job* job);
// Cleanup the executor. Cancels all outstanding work, and frees the thread
// and resolver.
void Destroy();
// Returns the outstanding job, or NULL.
Job* outstanding_job() const { return outstanding_job_.get(); }
ProxyResolver* resolver() { return resolver_.get(); }
int thread_number() const { return thread_number_; }
void set_resolver(std::unique_ptr<ProxyResolver> resolver) {
resolver_ = std::move(resolver);
}
void set_coordinator(Coordinator* coordinator) {
DCHECK(coordinator);
DCHECK(coordinator_);
coordinator_ = coordinator;
}
private:
friend class base::RefCountedThreadSafe<Executor>;
~Executor();
Coordinator* coordinator_;
const int thread_number_;
// The currently active job for this executor (either a CreateProxyResolver or
// GetProxyForURL task).
scoped_refptr<Job> outstanding_job_;
// The synchronous resolver implementation.
std::unique_ptr<ProxyResolver> resolver_;
// The thread where |resolver_| is run on.
// Note that declaration ordering is important here. |thread_| needs to be
// destroyed *before* |resolver_|, in case |resolver_| is currently
// executing on |thread_|.
std::unique_ptr<base::Thread> thread_;
};
class MultiThreadedProxyResolver : public ProxyResolver,
public Executor::Coordinator {
public:
// Creates an asynchronous ProxyResolver that runs requests on up to
// |max_num_threads|.
//
// For each thread that is created, an accompanying synchronous ProxyResolver
// will be provisioned using |resolver_factory|. All methods on these
// ProxyResolvers will be called on the one thread.
MultiThreadedProxyResolver(
std::unique_ptr<ProxyResolverFactory> resolver_factory,
size_t max_num_threads,
const scoped_refptr<PacFileData>& script_data,
scoped_refptr<Executor> executor);
~MultiThreadedProxyResolver() override;
// ProxyResolver implementation:
int GetProxyForURL(const GURL& url,
ProxyInfo* results,
CompletionOnceCallback callback,
std::unique_ptr<Request>* request,
const NetLogWithSource& net_log) override;
private:
class GetProxyForURLJob;
class RequestImpl;
// FIFO queue of pending jobs waiting to be started.
// TODO(eroman): Make this priority queue.
using PendingJobsQueue = base::circular_deque<scoped_refptr<Job>>;
using ExecutorList = std::vector<scoped_refptr<Executor>>;
// Returns an idle worker thread which is ready to receive GetProxyForURL()
// requests. If all threads are occupied, returns NULL.
Executor* FindIdleExecutor();
// Creates a new worker thread, and appends it to |executors_|.
void AddNewExecutor();
// Starts the next job from |pending_jobs_| if possible.
void OnExecutorReady(Executor* executor) override;
const std::unique_ptr<ProxyResolverFactory> resolver_factory_;
const size_t max_num_threads_;
PendingJobsQueue pending_jobs_;
ExecutorList executors_;
scoped_refptr<PacFileData> script_data_;
THREAD_CHECKER(thread_checker_);
};
// Job ---------------------------------------------
class Job : public base::RefCountedThreadSafe<Job> {
public:
Job() : executor_(NULL), was_cancelled_(false) {}
void set_executor(Executor* executor) {
executor_ = executor;
}
// The "executor" is the job runner that is scheduling this job. If
// this job has not been submitted to an executor yet, this will be
// NULL (and we know it hasn't started yet).
Executor* executor() {
return executor_;
}
// Mark the job as having been cancelled.
void Cancel() {
was_cancelled_ = true;
}
// Returns true if Cancel() has been called.
bool was_cancelled() const { return was_cancelled_; }
// This method is called when the job is inserted into a wait queue
// because no executors were ready to accept it.
virtual void WaitingForThread() {}
// This method is called just before the job is posted to the work thread.
virtual void FinishedWaitingForThread() {}
// This method is called on the worker thread to do the job's work. On
// completion, implementors are expected to call OnJobCompleted() on
// |origin_runner|.
virtual void Run(
scoped_refptr<base::SingleThreadTaskRunner> origin_runner) = 0;
protected:
void OnJobCompleted() {
// |executor_| will be NULL if the executor has already been deleted.
if (executor_)
executor_->OnJobCompleted(this);
}
friend class base::RefCountedThreadSafe<Job>;
virtual ~Job() = default;
private:
Executor* executor_;
bool was_cancelled_;
};
class MultiThreadedProxyResolver::RequestImpl : public ProxyResolver::Request {
public:
explicit RequestImpl(scoped_refptr<Job> job) : job_(std::move(job)) {}
~RequestImpl() override { job_->Cancel(); }
LoadState GetLoadState() override {
return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
}
private:
scoped_refptr<Job> job_;
};
// CreateResolverJob -----------------------------------------------------------
// Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver.
class CreateResolverJob : public Job {
public:
CreateResolverJob(const scoped_refptr<PacFileData>& script_data,
ProxyResolverFactory* factory)
: script_data_(script_data), factory_(factory) {}
// Runs on the worker thread.
void Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner) override {
std::unique_ptr<ProxyResolverFactory::Request> request;
int rv = factory_->CreateProxyResolver(script_data_, &resolver_,
CompletionOnceCallback(), &request);
DCHECK_NE(rv, ERR_IO_PENDING);
origin_runner->PostTask(
FROM_HERE, base::Bind(&CreateResolverJob::RequestComplete, this, rv));
}
protected:
~CreateResolverJob() override = default;
private:
// Runs the completion callback on the origin thread.
void RequestComplete(int result_code) {
// The task may have been cancelled after it was started.
if (!was_cancelled()) {
DCHECK(executor());
executor()->set_resolver(std::move(resolver_));
}
OnJobCompleted();
}
const scoped_refptr<PacFileData> script_data_;
ProxyResolverFactory* factory_;
std::unique_ptr<ProxyResolver> resolver_;
};
// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
class MultiThreadedProxyResolver::GetProxyForURLJob : public Job {
public:
// |url| -- the URL of the query.
// |results| -- the structure to fill with proxy resolve results.
GetProxyForURLJob(const GURL& url,
ProxyInfo* results,
CompletionOnceCallback callback,
const NetLogWithSource& net_log)
: callback_(std::move(callback)),
results_(results),
net_log_(net_log),
url_(url),
was_waiting_for_thread_(false) {
DCHECK(callback_);
}
NetLogWithSource* net_log() { return &net_log_; }
void WaitingForThread() override {
was_waiting_for_thread_ = true;
net_log_.BeginEvent(NetLogEventType::WAITING_FOR_PROXY_RESOLVER_THREAD);
}
void FinishedWaitingForThread() override {
DCHECK(executor());
if (was_waiting_for_thread_) {
net_log_.EndEvent(NetLogEventType::WAITING_FOR_PROXY_RESOLVER_THREAD);
}
net_log_.AddEvent(
NetLogEventType::SUBMITTED_TO_RESOLVER_THREAD,
NetLog::IntCallback("thread_number", executor()->thread_number()));
}
// Runs on the worker thread.
void Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner) override {
ProxyResolver* resolver = executor()->resolver();
DCHECK(resolver);
int rv = resolver->GetProxyForURL(url_, &results_buf_,
CompletionOnceCallback(), NULL, net_log_);
DCHECK_NE(rv, ERR_IO_PENDING);
origin_runner->PostTask(
FROM_HERE, base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
}
protected:
~GetProxyForURLJob() override = default;
private:
// Runs the completion callback on the origin thread.
void QueryComplete(int result_code) {
// The Job may have been cancelled after it was started.
if (!was_cancelled()) {
if (result_code >= OK) { // Note: unit-tests use values > 0.
results_->Use(results_buf_);
}
std::move(callback_).Run(result_code);
}
OnJobCompleted();
}
CompletionOnceCallback callback_;
// Must only be used on the "origin" thread.
ProxyInfo* results_;
// Can be used on either "origin" or worker thread.
NetLogWithSource net_log_;
const GURL url_;
// Usable from within DoQuery on the worker thread.
ProxyInfo results_buf_;
bool was_waiting_for_thread_;
};
// Executor ----------------------------------------
Executor::Executor(Executor::Coordinator* coordinator, int thread_number)
: coordinator_(coordinator), thread_number_(thread_number) {
DCHECK(coordinator);
// Start up the thread.
thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
thread_number)));
CHECK(thread_->Start());
}
void Executor::StartJob(Job* job) {
DCHECK(!outstanding_job_.get());
outstanding_job_ = job;
// Run the job. Once it has completed (regardless of whether it was
// cancelled), it will invoke OnJobCompleted() on this thread.
job->set_executor(this);
job->FinishedWaitingForThread();
thread_->task_runner()->PostTask(
FROM_HERE,
base::Bind(&Job::Run, job, base::ThreadTaskRunnerHandle::Get()));
}
void Executor::OnJobCompleted(Job* job) {
DCHECK_EQ(job, outstanding_job_.get());
outstanding_job_ = NULL;
coordinator_->OnExecutorReady(this);
}
void Executor::Destroy() {
DCHECK(coordinator_);
{
// See http://crbug.com/69710.
base::ThreadRestrictions::ScopedAllowIO allow_io;
// Join the worker thread.
thread_.reset();
}
// Cancel any outstanding job.
if (outstanding_job_.get()) {
outstanding_job_->Cancel();
// Orphan the job (since this executor may be deleted soon).
outstanding_job_->set_executor(NULL);
}
// It is now safe to free the ProxyResolver, since all the tasks that
// were using it on the resolver thread have completed.
resolver_.reset();
// Null some stuff as a precaution.
coordinator_ = NULL;
outstanding_job_ = NULL;
}
Executor::~Executor() {
// The important cleanup happens as part of Destroy(), which should always be
// called first.
DCHECK(!coordinator_) << "Destroy() was not called";
DCHECK(!thread_.get());
DCHECK(!resolver_.get());
DCHECK(!outstanding_job_.get());
}
// MultiThreadedProxyResolver --------------------------------------------------
MultiThreadedProxyResolver::MultiThreadedProxyResolver(
std::unique_ptr<ProxyResolverFactory> resolver_factory,
size_t max_num_threads,
const scoped_refptr<PacFileData>& script_data,
scoped_refptr<Executor> executor)
: resolver_factory_(std::move(resolver_factory)),
max_num_threads_(max_num_threads),
script_data_(script_data) {
DCHECK(script_data_);
executor->set_coordinator(this);
executors_.push_back(executor);
}
MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
// We will cancel all outstanding requests.
pending_jobs_.clear();
for (auto& executor : executors_) {
executor->Destroy();
}
}
int MultiThreadedProxyResolver::GetProxyForURL(
const GURL& url,
ProxyInfo* results,
CompletionOnceCallback callback,
std::unique_ptr<Request>* request,
const NetLogWithSource& net_log) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(!callback.is_null());
scoped_refptr<GetProxyForURLJob> job(
new GetProxyForURLJob(url, results, std::move(callback), net_log));
// Completion will be notified through |callback|, unless the caller cancels
// the request using |request|.
if (request)
request->reset(new RequestImpl(job));
// If there is an executor that is ready to run this request, submit it!
Executor* executor = FindIdleExecutor();
if (executor) {
DCHECK_EQ(0u, pending_jobs_.size());
executor->StartJob(job.get());
return ERR_IO_PENDING;
}
// Otherwise queue this request. (We will schedule it to a thread once one
// becomes available).
job->WaitingForThread();
pending_jobs_.push_back(job);
// If we haven't already reached the thread limit, provision a new thread to
// drain the requests more quickly.
if (executors_.size() < max_num_threads_)
AddNewExecutor();
return ERR_IO_PENDING;
}
Executor* MultiThreadedProxyResolver::FindIdleExecutor() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
for (auto it = executors_.begin(); it != executors_.end(); ++it) {
Executor* executor = it->get();
if (!executor->outstanding_job())
return executor;
}
return NULL;
}
void MultiThreadedProxyResolver::AddNewExecutor() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK_LT(executors_.size(), max_num_threads_);
// The "thread number" is used to give the thread a unique name.
int thread_number = executors_.size();
Executor* executor = new Executor(this, thread_number);
executor->StartJob(
new CreateResolverJob(script_data_, resolver_factory_.get()));
executors_.push_back(base::WrapRefCounted(executor));
}
void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
while (!pending_jobs_.empty()) {
scoped_refptr<Job> job = pending_jobs_.front();
pending_jobs_.pop_front();
if (!job->was_cancelled()) {
executor->StartJob(job.get());
return;
}
}
}
} // namespace
class MultiThreadedProxyResolverFactory::Job
: public ProxyResolverFactory::Request,
public Executor::Coordinator {
public:
Job(MultiThreadedProxyResolverFactory* factory,
const scoped_refptr<PacFileData>& script_data,
std::unique_ptr<ProxyResolver>* resolver,
std::unique_ptr<ProxyResolverFactory> resolver_factory,
size_t max_num_threads,
CompletionOnceCallback callback)
: factory_(factory),
resolver_out_(resolver),
resolver_factory_(std::move(resolver_factory)),
max_num_threads_(max_num_threads),
script_data_(script_data),
executor_(new Executor(this, 0)),
callback_(std::move(callback)) {
executor_->StartJob(
new CreateResolverJob(script_data_, resolver_factory_.get()));
}
~Job() override {
if (factory_) {
executor_->Destroy();
factory_->RemoveJob(this);
}
}
void FactoryDestroyed() {
executor_->Destroy();
executor_ = nullptr;
factory_ = nullptr;
}
private:
void OnExecutorReady(Executor* executor) override {
int error = OK;
if (executor->resolver()) {
resolver_out_->reset(new MultiThreadedProxyResolver(
std::move(resolver_factory_), max_num_threads_,
std::move(script_data_), executor_));
} else {
error = ERR_PAC_SCRIPT_FAILED;
executor_->Destroy();
}
factory_->RemoveJob(this);
factory_ = nullptr;
std::move(callback_).Run(error);
}
MultiThreadedProxyResolverFactory* factory_;
std::unique_ptr<ProxyResolver>* const resolver_out_;
std::unique_ptr<ProxyResolverFactory> resolver_factory_;
const size_t max_num_threads_;
scoped_refptr<PacFileData> script_data_;
scoped_refptr<Executor> executor_;
CompletionOnceCallback callback_;
};
MultiThreadedProxyResolverFactory::MultiThreadedProxyResolverFactory(
size_t max_num_threads,
bool factory_expects_bytes)
: ProxyResolverFactory(factory_expects_bytes),
max_num_threads_(max_num_threads) {
DCHECK_GE(max_num_threads, 1u);
}
MultiThreadedProxyResolverFactory::~MultiThreadedProxyResolverFactory() {
for (auto* job : jobs_) {
job->FactoryDestroyed();
}
}
int MultiThreadedProxyResolverFactory::CreateProxyResolver(
const scoped_refptr<PacFileData>& pac_script,
std::unique_ptr<ProxyResolver>* resolver,
CompletionOnceCallback callback,
std::unique_ptr<Request>* request) {
std::unique_ptr<Job> job(new Job(this, pac_script, resolver,
CreateProxyResolverFactory(),
max_num_threads_, std::move(callback)));
jobs_.insert(job.get());
*request = std::move(job);
return ERR_IO_PENDING;
}
void MultiThreadedProxyResolverFactory::RemoveJob(
MultiThreadedProxyResolverFactory::Job* job) {
size_t erased = jobs_.erase(job);
DCHECK_EQ(1u, erased);
}
} // namespace net