blob: bd9890c8e56a714dd26c8d3e54f97b03941b2858 [file] [log] [blame]
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5#include "net/proxy/multi_threaded_proxy_resolver.h"
6
7#include "base/bind.h"
8#include "base/message_loop_proxy.h"
9#include "base/metrics/histogram.h"
10#include "base/string_util.h"
11#include "base/stringprintf.h"
12#include "base/threading/thread.h"
13#include "base/threading/thread_restrictions.h"
14#include "net/base/net_errors.h"
15#include "net/base/net_log.h"
16#include "net/proxy/proxy_info.h"
17
// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
//               data when SetPacScript fails. That will reclaim memory when
//               testing bogus scripts.
21
22namespace net {
23
24namespace {
25
26class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> {
27 public:
28 explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {}
29 void PurgeMemory() { resolver_->PurgeMemory(); }
30 private:
31 friend class base::RefCountedThreadSafe<PurgeMemoryTask>;
32 ~PurgeMemoryTask() {}
33 ProxyResolver* resolver_;
34};
35
36} // namespace
37
38// An "executor" is a job-runner for PAC requests. It encapsulates a worker
39// thread and a synchronous ProxyResolver (which will be operated on said
40// thread.)
41class MultiThreadedProxyResolver::Executor
42 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
43 public:
44 // |coordinator| must remain valid throughout our lifetime. It is used to
45 // signal when the executor is ready to receive work by calling
46 // |coordinator->OnExecutorReady()|.
47 // The constructor takes ownership of |resolver|.
48 // |thread_number| is an identifier used when naming the worker thread.
49 Executor(MultiThreadedProxyResolver* coordinator,
50 ProxyResolver* resolver,
51 int thread_number);
52
53 // Submit a job to this executor.
54 void StartJob(Job* job);
55
56 // Callback for when a job has completed running on the executor's thread.
57 void OnJobCompleted(Job* job);
58
59 // Cleanup the executor. Cancels all outstanding work, and frees the thread
60 // and resolver.
61 void Destroy();
62
63 void PurgeMemory();
64
65 // Returns the outstanding job, or NULL.
66 Job* outstanding_job() const { return outstanding_job_.get(); }
67
68 ProxyResolver* resolver() { return resolver_.get(); }
69
70 int thread_number() const { return thread_number_; }
71
72 private:
73 friend class base::RefCountedThreadSafe<Executor>;
74 ~Executor();
75
76 MultiThreadedProxyResolver* coordinator_;
77 const int thread_number_;
78
79 // The currently active job for this executor (either a SetPacScript or
80 // GetProxyForURL task).
81 scoped_refptr<Job> outstanding_job_;
82
83 // The synchronous resolver implementation.
84 scoped_ptr<ProxyResolver> resolver_;
85
86 // The thread where |resolver_| is run on.
87 // Note that declaration ordering is important here. |thread_| needs to be
88 // destroyed *before* |resolver_|, in case |resolver_| is currently
89 // executing on |thread_|.
90 scoped_ptr<base::Thread> thread_;
91};
92
// MultiThreadedProxyResolver::Job ---------------------------------------------
94
95class MultiThreadedProxyResolver::Job
96 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
97 public:
98 // Identifies the subclass of Job (only being used for debugging purposes).
99 enum Type {
100 TYPE_GET_PROXY_FOR_URL,
101 TYPE_SET_PAC_SCRIPT,
102 TYPE_SET_PAC_SCRIPT_INTERNAL,
103 };
104
105 Job(Type type, const CompletionCallback& callback)
106 : type_(type),
107 callback_(callback),
108 executor_(NULL),
109 was_cancelled_(false) {
110 }
111
112 void set_executor(Executor* executor) {
113 executor_ = executor;
114 }
115
116 // The "executor" is the job runner that is scheduling this job. If
117 // this job has not been submitted to an executor yet, this will be
118 // NULL (and we know it hasn't started yet).
119 Executor* executor() {
120 return executor_;
121 }
122
123 // Mark the job as having been cancelled.
124 void Cancel() {
125 was_cancelled_ = true;
126 }
127
128 // Returns true if Cancel() has been called.
129 bool was_cancelled() const { return was_cancelled_; }
130
131 Type type() const { return type_; }
132
133 // Returns true if this job still has a user callback. Some jobs
134 // do not have a user callback, because they were helper jobs
135 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
136 //
137 // Otherwise jobs that correspond with user-initiated work will
138 // have a non-null callback up until the callback is run.
139 bool has_user_callback() const { return !callback_.is_null(); }
140
141 // This method is called when the job is inserted into a wait queue
142 // because no executors were ready to accept it.
143 virtual void WaitingForThread() {}
144
145 // This method is called just before the job is posted to the work thread.
146 virtual void FinishedWaitingForThread() {}
147
148 // This method is called on the worker thread to do the job's work. On
149 // completion, implementors are expected to call OnJobCompleted() on
150 // |origin_loop|.
151 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) = 0;
152
153 protected:
154 void OnJobCompleted() {
155 // |executor_| will be NULL if the executor has already been deleted.
156 if (executor_)
157 executor_->OnJobCompleted(this);
158 }
159
160 void RunUserCallback(int result) {
161 DCHECK(has_user_callback());
162 CompletionCallback callback = callback_;
163 // Reset the callback so has_user_callback() will now return false.
164 callback_.Reset();
165 callback.Run(result);
166 }
167
168 friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
169
170 virtual ~Job() {}
171
172 private:
173 const Type type_;
174 CompletionCallback callback_;
175 Executor* executor_;
176 bool was_cancelled_;
177};
178
// MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
180
181// Runs on the worker thread to call ProxyResolver::SetPacScript.
182class MultiThreadedProxyResolver::SetPacScriptJob
183 : public MultiThreadedProxyResolver::Job {
184 public:
185 SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
186 const CompletionCallback& callback)
187 : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT :
188 TYPE_SET_PAC_SCRIPT_INTERNAL,
189 callback),
190 script_data_(script_data) {
191 }
192
193 // Runs on the worker thread.
194 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
195 ProxyResolver* resolver = executor()->resolver();
196 int rv = resolver->SetPacScript(script_data_, CompletionCallback());
197
198 DCHECK_NE(rv, ERR_IO_PENDING);
199 origin_loop->PostTask(
200 FROM_HERE,
201 base::Bind(&SetPacScriptJob::RequestComplete, this, rv));
202 }
203
204 protected:
205 virtual ~SetPacScriptJob() {}
206
207 private:
208 // Runs the completion callback on the origin thread.
209 void RequestComplete(int result_code) {
210 // The task may have been cancelled after it was started.
211 if (!was_cancelled() && has_user_callback()) {
212 RunUserCallback(result_code);
213 }
214 OnJobCompleted();
215 }
216
217 const scoped_refptr<ProxyResolverScriptData> script_data_;
218};
219
// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
221
222class MultiThreadedProxyResolver::GetProxyForURLJob
223 : public MultiThreadedProxyResolver::Job {
224 public:
225 // |url| -- the URL of the query.
226 // |results| -- the structure to fill with proxy resolve results.
227 GetProxyForURLJob(const GURL& url,
228 ProxyInfo* results,
229 const CompletionCallback& callback,
230 const BoundNetLog& net_log)
231 : Job(TYPE_GET_PROXY_FOR_URL, callback),
232 results_(results),
233 net_log_(net_log),
234 url_(url),
235 was_waiting_for_thread_(false) {
236 DCHECK(!callback.is_null());
237 start_time_ = base::TimeTicks::Now();
238 }
239
240 BoundNetLog* net_log() { return &net_log_; }
241
242 virtual void WaitingForThread() OVERRIDE {
243 was_waiting_for_thread_ = true;
244 net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
245 }
246
247 virtual void FinishedWaitingForThread() OVERRIDE {
248 DCHECK(executor());
249
250 submitted_to_thread_time_ = base::TimeTicks::Now();
251
252 if (was_waiting_for_thread_) {
253 net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
254 }
255
256 net_log_.AddEvent(
257 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
258 NetLog::IntegerCallback("thread_number", executor()->thread_number()));
259 }
260
261 // Runs on the worker thread.
262 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
263 ProxyResolver* resolver = executor()->resolver();
264 int rv = resolver->GetProxyForURL(
265 url_, &results_buf_, CompletionCallback(), NULL, net_log_);
266 DCHECK_NE(rv, ERR_IO_PENDING);
267
268 origin_loop->PostTask(
269 FROM_HERE,
270 base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
271 }
272
273 protected:
274 virtual ~GetProxyForURLJob() {}
275
276 private:
277 // Runs the completion callback on the origin thread.
278 void QueryComplete(int result_code) {
279 // The Job may have been cancelled after it was started.
280 if (!was_cancelled()) {
281 RecordPerformanceMetrics();
282 if (result_code >= OK) { // Note: unit-tests use values > 0.
283 results_->Use(results_buf_);
284 }
285 RunUserCallback(result_code);
286 }
287 OnJobCompleted();
288 }
289
290 void RecordPerformanceMetrics() {
291 DCHECK(!was_cancelled());
292
293 base::TimeTicks now = base::TimeTicks::Now();
294
295 // Log the total time the request took to complete.
296 UMA_HISTOGRAM_MEDIUM_TIMES("Net.MTPR_GetProxyForUrl_Time",
297 now - start_time_);
298
299 // Log the time the request was stalled waiting for a thread to free up.
300 UMA_HISTOGRAM_MEDIUM_TIMES("Net.MTPR_GetProxyForUrl_Thread_Wait_Time",
301 submitted_to_thread_time_ - start_time_);
302 }
303
304 // Must only be used on the "origin" thread.
305 ProxyInfo* results_;
306
307 // Can be used on either "origin" or worker thread.
308 BoundNetLog net_log_;
309 const GURL url_;
310
311 // Usable from within DoQuery on the worker thread.
312 ProxyInfo results_buf_;
313
314 base::TimeTicks start_time_;
315 base::TimeTicks submitted_to_thread_time_;
316
317 bool was_waiting_for_thread_;
318};
319
// MultiThreadedProxyResolver::Executor ----------------------------------------
321
322MultiThreadedProxyResolver::Executor::Executor(
323 MultiThreadedProxyResolver* coordinator,
324 ProxyResolver* resolver,
325 int thread_number)
326 : coordinator_(coordinator),
327 thread_number_(thread_number),
328 resolver_(resolver) {
329 DCHECK(coordinator);
330 DCHECK(resolver);
331 // Start up the thread.
332 // Note that it is safe to pass a temporary C-String to Thread(), as it will
333 // make a copy.
334 std::string thread_name =
335 base::StringPrintf("PAC thread #%d", thread_number);
336 thread_.reset(new base::Thread(thread_name.c_str()));
337 CHECK(thread_->Start());
338}
339
340void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
341 DCHECK(!outstanding_job_);
342 outstanding_job_ = job;
343
344 // Run the job. Once it has completed (regardless of whether it was
345 // cancelled), it will invoke OnJobCompleted() on this thread.
346 job->set_executor(this);
347 job->FinishedWaitingForThread();
348 thread_->message_loop()->PostTask(
349 FROM_HERE,
350 base::Bind(&Job::Run, job, base::MessageLoopProxy::current()));
351}
352
353void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
354 DCHECK_EQ(job, outstanding_job_.get());
355 outstanding_job_ = NULL;
356 coordinator_->OnExecutorReady(this);
357}
358
359void MultiThreadedProxyResolver::Executor::Destroy() {
360 DCHECK(coordinator_);
361
362 // Give the resolver an opportunity to shutdown from THIS THREAD before
363 // joining on the resolver thread. This allows certain implementations
364 // to avoid deadlocks.
365 resolver_->Shutdown();
366
367 {
368 // See http://crbug.com/69710.
369 base::ThreadRestrictions::ScopedAllowIO allow_io;
370
371 // Join the worker thread.
372 thread_.reset();
373 }
374
375 // Cancel any outstanding job.
376 if (outstanding_job_) {
377 outstanding_job_->Cancel();
378 // Orphan the job (since this executor may be deleted soon).
379 outstanding_job_->set_executor(NULL);
380 }
381
382 // It is now safe to free the ProxyResolver, since all the tasks that
383 // were using it on the resolver thread have completed.
384 resolver_.reset();
385
386 // Null some stuff as a precaution.
387 coordinator_ = NULL;
388 outstanding_job_ = NULL;
389}
390
391void MultiThreadedProxyResolver::Executor::PurgeMemory() {
392 scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
393 thread_->message_loop()->PostTask(
394 FROM_HERE,
395 base::Bind(&PurgeMemoryTask::PurgeMemory, helper.get()));
396}
397
398MultiThreadedProxyResolver::Executor::~Executor() {
399 // The important cleanup happens as part of Destroy(), which should always be
400 // called first.
401 DCHECK(!coordinator_) << "Destroy() was not called";
402 DCHECK(!thread_.get());
403 DCHECK(!resolver_.get());
404 DCHECK(!outstanding_job_);
405}
406
// MultiThreadedProxyResolver --------------------------------------------------
408
409MultiThreadedProxyResolver::MultiThreadedProxyResolver(
410 ProxyResolverFactory* resolver_factory,
411 size_t max_num_threads)
412 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
413 resolver_factory_(resolver_factory),
414 max_num_threads_(max_num_threads) {
415 DCHECK_GE(max_num_threads, 1u);
416}
417
418MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
419 // We will cancel all outstanding requests.
420 pending_jobs_.clear();
421 ReleaseAllExecutors();
422}
423
424int MultiThreadedProxyResolver::GetProxyForURL(
425 const GURL& url, ProxyInfo* results, const CompletionCallback& callback,
426 RequestHandle* request, const BoundNetLog& net_log) {
427 DCHECK(CalledOnValidThread());
428 DCHECK(!callback.is_null());
429 DCHECK(current_script_data_.get())
430 << "Resolver is un-initialized. Must call SetPacScript() first!";
431
432 scoped_refptr<GetProxyForURLJob> job(
433 new GetProxyForURLJob(url, results, callback, net_log));
434
435 // Completion will be notified through |callback|, unless the caller cancels
436 // the request using |request|.
437 if (request)
438 *request = reinterpret_cast<RequestHandle>(job.get());
439
440 // If there is an executor that is ready to run this request, submit it!
441 Executor* executor = FindIdleExecutor();
442 if (executor) {
443 DCHECK_EQ(0u, pending_jobs_.size());
444 executor->StartJob(job);
445 return ERR_IO_PENDING;
446 }
447
448 // Otherwise queue this request. (We will schedule it to a thread once one
449 // becomes available).
450 job->WaitingForThread();
451 pending_jobs_.push_back(job);
452
453 // If we haven't already reached the thread limit, provision a new thread to
454 // drain the requests more quickly.
455 if (executors_.size() < max_num_threads_) {
456 executor = AddNewExecutor();
457 executor->StartJob(
458 new SetPacScriptJob(current_script_data_, CompletionCallback()));
459 }
460
461 return ERR_IO_PENDING;
462}
463
464void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
465 DCHECK(CalledOnValidThread());
466 DCHECK(req);
467
468 Job* job = reinterpret_cast<Job*>(req);
469 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
470
471 if (job->executor()) {
472 // If the job was already submitted to the executor, just mark it
473 // as cancelled so the user callback isn't run on completion.
474 job->Cancel();
475 } else {
476 // Otherwise the job is just sitting in a queue.
477 PendingJobsQueue::iterator it =
478 std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
479 DCHECK(it != pending_jobs_.end());
480 pending_jobs_.erase(it);
481 }
482}
483
484LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
485 DCHECK(CalledOnValidThread());
486 DCHECK(req);
487
488 Job* job = reinterpret_cast<Job*>(req);
489 if (job->executor())
490 return job->executor()->resolver()->GetLoadStateThreadSafe(NULL);
491 return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
492}
493
494LoadState MultiThreadedProxyResolver::GetLoadStateThreadSafe(
495 RequestHandle req) const {
496 NOTIMPLEMENTED();
497 return LOAD_STATE_IDLE;
498}
499
500void MultiThreadedProxyResolver::CancelSetPacScript() {
501 DCHECK(CalledOnValidThread());
502 DCHECK_EQ(0u, pending_jobs_.size());
503 DCHECK_EQ(1u, executors_.size());
504 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
505 executors_[0]->outstanding_job()->type());
506
507 // Defensively clear some data which shouldn't be getting used
508 // anymore.
509 current_script_data_ = NULL;
510
511 ReleaseAllExecutors();
512}
513
514void MultiThreadedProxyResolver::PurgeMemory() {
515 DCHECK(CalledOnValidThread());
516 for (ExecutorList::iterator it = executors_.begin();
517 it != executors_.end(); ++it) {
518 Executor* executor = *it;
519 executor->PurgeMemory();
520 }
521}
522
523int MultiThreadedProxyResolver::SetPacScript(
524 const scoped_refptr<ProxyResolverScriptData>& script_data,
525 const CompletionCallback&callback) {
526 DCHECK(CalledOnValidThread());
527 DCHECK(!callback.is_null());
528
529 // Save the script details, so we can provision new executors later.
530 current_script_data_ = script_data;
531
532 // The user should not have any outstanding requests when they call
533 // SetPacScript().
534 CheckNoOutstandingUserRequests();
535
536 // Destroy all of the current threads and their proxy resolvers.
537 ReleaseAllExecutors();
538
539 // Provision a new executor, and run the SetPacScript request. On completion
540 // notification will be sent through |callback|.
541 Executor* executor = AddNewExecutor();
542 executor->StartJob(new SetPacScriptJob(script_data, callback));
543 return ERR_IO_PENDING;
544}
545
546void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
547 DCHECK(CalledOnValidThread());
548 CHECK_EQ(0u, pending_jobs_.size());
549
550 for (ExecutorList::const_iterator it = executors_.begin();
551 it != executors_.end(); ++it) {
552 const Executor* executor = *it;
553 Job* job = executor->outstanding_job();
554 // The "has_user_callback()" is to exclude jobs for which the callback
555 // has already been invoked, or was not user-initiated (as in the case of
556 // lazy thread provisions). User-initiated jobs may !has_user_callback()
557 // when the callback has already been run. (Since we only clear the
558 // outstanding job AFTER the callback has been invoked, it is possible
559 // for a new request to be started from within the callback).
560 CHECK(!job || job->was_cancelled() || !job->has_user_callback());
561 }
562}
563
564void MultiThreadedProxyResolver::ReleaseAllExecutors() {
565 DCHECK(CalledOnValidThread());
566 for (ExecutorList::iterator it = executors_.begin();
567 it != executors_.end(); ++it) {
568 Executor* executor = *it;
569 executor->Destroy();
570 }
571 executors_.clear();
572}
573
574MultiThreadedProxyResolver::Executor*
575MultiThreadedProxyResolver::FindIdleExecutor() {
576 DCHECK(CalledOnValidThread());
577 for (ExecutorList::iterator it = executors_.begin();
578 it != executors_.end(); ++it) {
579 Executor* executor = *it;
580 if (!executor->outstanding_job())
581 return executor;
582 }
583 return NULL;
584}
585
586MultiThreadedProxyResolver::Executor*
587MultiThreadedProxyResolver::AddNewExecutor() {
588 DCHECK(CalledOnValidThread());
589 DCHECK_LT(executors_.size(), max_num_threads_);
590 // The "thread number" is used to give the thread a unique name.
591 int thread_number = executors_.size();
592 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
593 Executor* executor = new Executor(
594 this, resolver, thread_number);
595 executors_.push_back(make_scoped_refptr(executor));
596 return executor;
597}
598
599void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
600 DCHECK(CalledOnValidThread());
601 if (pending_jobs_.empty())
602 return;
603
604 // Get the next job to process (FIFO). Transfer it from the pending queue
605 // to the executor.
606 scoped_refptr<Job> job = pending_jobs_.front();
607 pending_jobs_.pop_front();
608 executor->StartJob(job);
609}
610
611} // namespace net