blob: db625adcd110b398d2e3cb84a74410f75b69cb99 [file] [log] [blame]
// Copyright 2022 The Cobalt Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef COBALT_DOM_SERIALIZED_ALGORITHM_RUNNER_H_
#define COBALT_DOM_SERIALIZED_ALGORITHM_RUNNER_H_
#include <memory>
#include <utility>
#include "base/bind.h"
#include "base/callback.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop.h"
#include "base/single_thread_task_runner.h"
#include "base/trace_event/trace_event.h"
#include "starboard/common/mutex.h"
namespace cobalt {
namespace dom {
// This class defines a common interface to run algorithms, and an algorithm is
// any class with the following methods:
// void Process(bool* finished);
// void Abort();
// void Finalize();
//
// For example, with a class:
// class Download {
// void Process(bool* finished) {
// // Download some data
// // *finished = whether download is finished.
// }
// void Abort() {
// // Cancel the download, and neither Process() nor Finalize() will be
// // called again.
// }
// void Finalize() {
// // Queue an event to notify that the download has finished.
// }
// };
//
// This class will keep calling Process() until |finished| becomes false, and
// then call Finalize(). It guarantees that all calls won't be overlapped and
// the member functions of the algorithm don't have to synchronize between them.
template <typename SerializedAlgorithm>
class SerializedAlgorithmRunner {
public:
// A handle object for a running algorithm instance, to allow for aborting and
// access the algorithm.
class Handle : public base::RefCountedThreadSafe<Handle> {
public:
// Abort the algorithm and no more processing will happen on return. It is
// possible that Process() has already finished asynchronously, in which
// case this function will call Finalize() instead (if it hasn't been called
// yet).
void Abort();
void Process(bool* finished);
void FinalizeIfNotAborted();
SerializedAlgorithm* algorithm() const { return algorithm_.get(); }
private:
friend class SerializedAlgorithmRunner;
// Provide synchronization only when |synchronization_required| is true.
// This allows bypassing of synchronization for algorithm runners that
// operate on a single thread, where a mutex could be reentrant if acquired
// due to nested calls.
class ScopedLockWhenRequired {
public:
ScopedLockWhenRequired(bool synchronization_required,
const starboard::Mutex& mutex)
: synchronization_required_(synchronization_required), mutex_(mutex) {
if (synchronization_required_) {
mutex_.Acquire();
}
}
~ScopedLockWhenRequired() {
if (synchronization_required_) {
mutex_.Release();
}
}
private:
const bool synchronization_required_;
const starboard::Mutex& mutex_;
};
Handle(bool synchronization_required,
std::unique_ptr<SerializedAlgorithm> algorithm);
// The |mutex_| is necessary for algorithm runners operate on multiple
// threads as `Abort()` can be called from any thread.
const bool synchronization_required_;
starboard::Mutex mutex_;
std::unique_ptr<SerializedAlgorithm> algorithm_;
bool aborted_ = false;
bool finished_ = false;
bool finalized_ = false;
};
virtual ~SerializedAlgorithmRunner() {}
virtual scoped_refptr<Handle> CreateHandle(
std::unique_ptr<SerializedAlgorithm> algorithm) = 0;
virtual void Start(scoped_refptr<Handle> handle) = 0;
protected:
scoped_refptr<Handle> CreateHandle(
bool synchronization_required,
std::unique_ptr<SerializedAlgorithm> algorithm) {
return new Handle(synchronization_required, std::move(algorithm));
}
};
template <typename SerializedAlgorithm>
SerializedAlgorithmRunner<SerializedAlgorithm>::Handle::Handle(
bool synchronization_required,
std::unique_ptr<SerializedAlgorithm> algorithm)
: synchronization_required_(synchronization_required),
algorithm_(std::move(algorithm)) {
DCHECK(algorithm_);
}
template <typename SerializedAlgorithm>
void SerializedAlgorithmRunner<SerializedAlgorithm>::Handle::Abort() {
TRACE_EVENT0("cobalt::dom", "SerializedAlgorithmRunner::Handle::Abort()");
ScopedLockWhenRequired scoped_lock(synchronization_required_, mutex_);
DCHECK(!aborted_); // Abort() cannot be called twice.
if (finished_) {
// If the algorithm has finished, just call Finalize() to treat it as
// finished instead of aborted.
if (!finalized_) {
algorithm_->Finalize();
}
} else {
algorithm_->Abort();
}
algorithm_.reset();
aborted_ = true;
}
template <typename SerializedAlgorithm>
void SerializedAlgorithmRunner<SerializedAlgorithm>::Handle::Process(
bool* finished) {
TRACE_EVENT0("cobalt::dom", "SerializedAlgorithmRunner::Handle::Process()");
DCHECK(finished);
ScopedLockWhenRequired scoped_lock(synchronization_required_, mutex_);
DCHECK(!finished_);
DCHECK(!finalized_);
if (aborted_) {
*finished = true;
return;
}
DCHECK(algorithm_);
algorithm_->Process(&finished_);
*finished = finished_;
}
template <typename SerializedAlgorithm>
void SerializedAlgorithmRunner<
SerializedAlgorithm>::Handle::FinalizeIfNotAborted() {
TRACE_EVENT0("cobalt::dom", "SerializedAlgorithmRunner::Handle::Finalize()");
ScopedLockWhenRequired scoped_lock(synchronization_required_, mutex_);
DCHECK(!finalized_);
if (aborted_) {
return;
}
DCHECK(finished_);
DCHECK(algorithm_);
algorithm_->Finalize();
algorithm_.reset();
finalized_ = true;
}
// This class runs algorithm on the task runner associated with the thread where
// Start() is called.
template <typename SerializedAlgorithm>
class DefaultAlgorithmRunner
: public SerializedAlgorithmRunner<SerializedAlgorithm> {
public:
explicit DefaultAlgorithmRunner(bool asynchronous_reduction_enabled)
: asynchronous_reduction_enabled_(asynchronous_reduction_enabled) {}
private:
typedef
typename SerializedAlgorithmRunner<SerializedAlgorithm>::Handle Handle;
scoped_refptr<Handle> CreateHandle(
std::unique_ptr<SerializedAlgorithm> algorithm) override;
void Start(scoped_refptr<Handle> handle) override;
void Process(scoped_refptr<Handle> handle);
const bool asynchronous_reduction_enabled_;
};
// This class runs algorithms on two task runners, it can be used to offload
// processing to another task runner.
//
// This class will keep calling the Process() member function of the algorithm
// on the process task runner, and then call Finalize() on the finalize task
// runner when |finished| becomes true.
template <typename SerializedAlgorithm>
class OffloadAlgorithmRunner
: public SerializedAlgorithmRunner<SerializedAlgorithm> {
public:
typedef base::SingleThreadTaskRunner TaskRunner;
OffloadAlgorithmRunner(const scoped_refptr<TaskRunner>& process_task_runner,
const scoped_refptr<TaskRunner>& finalize_task_runner);
private:
typedef
typename SerializedAlgorithmRunner<SerializedAlgorithm>::Handle Handle;
scoped_refptr<Handle> CreateHandle(
std::unique_ptr<SerializedAlgorithm> algorithm) override;
void Start(scoped_refptr<Handle> handle) override;
void Process(scoped_refptr<Handle> handle);
scoped_refptr<TaskRunner> process_task_runner_;
scoped_refptr<TaskRunner> finalize_task_runner_;
};
template <typename SerializedAlgorithm>
scoped_refptr<typename SerializedAlgorithmRunner<SerializedAlgorithm>::Handle>
DefaultAlgorithmRunner<SerializedAlgorithm>::CreateHandle(
std::unique_ptr<SerializedAlgorithm> algorithm) {
TRACE_EVENT0("cobalt::dom", "DefaultAlgorithmRunner::CreateHandle()");
const bool kSynchronizationRequired = false;
return SerializedAlgorithmRunner<SerializedAlgorithm>::CreateHandle(
kSynchronizationRequired, std::move(algorithm));
}
template <typename SerializedAlgorithm>
void DefaultAlgorithmRunner<SerializedAlgorithm>::Start(
scoped_refptr<Handle> handle) {
DCHECK(handle);
TRACE_EVENT0("cobalt::dom", "DefaultAlgorithmRunner::Start()");
if (asynchronous_reduction_enabled_) {
Process(handle);
return;
}
auto task_runner = base::MessageLoop::current()->task_runner();
task_runner->PostTask(FROM_HERE,
base::BindOnce(&DefaultAlgorithmRunner::Process,
base::Unretained(this), handle));
}
template <typename SerializedAlgorithm>
void DefaultAlgorithmRunner<SerializedAlgorithm>::Process(
scoped_refptr<Handle> handle) {
DCHECK(handle);
TRACE_EVENT0("cobalt::dom", "DefaultAlgorithmRunner::Process()");
auto task_runner = base::MessageLoop::current()->task_runner();
bool finished = false;
handle->Process(&finished);
if (finished) {
handle->FinalizeIfNotAborted();
return;
}
task_runner->PostTask(FROM_HERE,
base::BindOnce(&DefaultAlgorithmRunner::Process,
base::Unretained(this), handle));
}
template <typename SerializedAlgorithm>
OffloadAlgorithmRunner<SerializedAlgorithm>::OffloadAlgorithmRunner(
const scoped_refptr<TaskRunner>& process_task_runner,
const scoped_refptr<TaskRunner>& finalize_task_runner)
: process_task_runner_(process_task_runner),
finalize_task_runner_(finalize_task_runner) {
DCHECK(process_task_runner_);
DCHECK(finalize_task_runner_);
DCHECK_NE(process_task_runner_, finalize_task_runner_);
}
template <typename SerializedAlgorithm>
scoped_refptr<typename SerializedAlgorithmRunner<SerializedAlgorithm>::Handle>
OffloadAlgorithmRunner<SerializedAlgorithm>::CreateHandle(
std::unique_ptr<SerializedAlgorithm> algorithm) {
TRACE_EVENT0("cobalt::dom", "OffloadAlgorithmRunner::CreateHandle()");
const bool kSynchronizationRequired = true;
return SerializedAlgorithmRunner<SerializedAlgorithm>::CreateHandle(
kSynchronizationRequired, std::move(algorithm));
}
template <typename SerializedAlgorithm>
void OffloadAlgorithmRunner<SerializedAlgorithm>::Start(
scoped_refptr<Handle> handle) {
DCHECK(handle);
TRACE_EVENT0("cobalt::dom", "OffloadAlgorithmRunner::Start()");
process_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&OffloadAlgorithmRunner::Process,
base::Unretained(this), handle));
}
template <typename SerializedAlgorithm>
void OffloadAlgorithmRunner<SerializedAlgorithm>::Process(
scoped_refptr<Handle> handle) {
DCHECK(handle);
DCHECK(process_task_runner_->BelongsToCurrentThread());
TRACE_EVENT0("cobalt::dom", "OffloadAlgorithmRunner::Process()");
bool finished = false;
handle->Process(&finished);
if (finished) {
finalize_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&Handle::FinalizeIfNotAborted, handle));
return;
}
process_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&OffloadAlgorithmRunner::Process,
base::Unretained(this), handle));
}
} // namespace dom
} // namespace cobalt
#endif // COBALT_DOM_SERIALIZED_ALGORITHM_RUNNER_H_