blob: 23c709f87b68ca7cced1298ef9939ef1e3ff3c8b [file] [log] [blame]
// Copyright 2017 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef V8_HEAP_ITEM_PARALLEL_JOB_
#define V8_HEAP_ITEM_PARALLEL_JOB_
#include <vector>
#include "src/base/platform/semaphore.h"
#include "src/cancelable-task.h"
#include "src/v8.h"
namespace v8 {
namespace internal {
class Isolate;
// This class manages background tasks that process a set of items in parallel.
// The first task added is executed on the same thread as |job.Run()| is called.
// All other tasks are scheduled in the background.
//
// - Items need to inherit from ItemParallelJob::Item.
// - Tasks need to inherit from ItemParallelJob::Task.
//
// Items need to be marked as finished after processing them. Task and Item
// ownership is transferred to the job.
class ItemParallelJob {
public:
class Task;
class Item {
public:
Item() : state_(kAvailable) {}
virtual ~Item() {}
// Marks an item as being finished.
void MarkFinished() { CHECK(state_.TrySetValue(kProcessing, kFinished)); }
private:
enum ProcessingState { kAvailable, kProcessing, kFinished };
bool TryMarkingAsProcessing() {
return state_.TrySetValue(kAvailable, kProcessing);
}
bool IsFinished() { return state_.Value() == kFinished; }
base::AtomicValue<ProcessingState> state_;
friend class ItemParallelJob;
friend class ItemParallelJob::Task;
DISALLOW_COPY_AND_ASSIGN(Item);
};
class Task : public CancelableTask {
public:
explicit Task(Isolate* isolate)
: CancelableTask(isolate),
items_(nullptr),
cur_index_(0),
items_considered_(0),
on_finish_(nullptr) {}
virtual ~Task() {}
virtual void RunInParallel() = 0;
protected:
// Retrieves a new item that needs to be processed. Returns |nullptr| if
// all items are processed. Upon returning an item, the task is required
// to process the item and mark the item as finished after doing so.
template <class ItemType>
ItemType* GetItem() {
while (items_considered_++ != items_->size()) {
// Wrap around.
if (cur_index_ == items_->size()) {
cur_index_ = 0;
}
Item* item = (*items_)[cur_index_++];
if (item->TryMarkingAsProcessing()) {
return static_cast<ItemType*>(item);
}
}
return nullptr;
}
private:
void SetupInternal(base::Semaphore* on_finish, std::vector<Item*>* items,
size_t start_index) {
on_finish_ = on_finish;
items_ = items;
cur_index_ = start_index;
}
// We don't allow overriding this method any further.
void RunInternal() final {
RunInParallel();
on_finish_->Signal();
}
std::vector<Item*>* items_;
size_t cur_index_;
size_t items_considered_;
base::Semaphore* on_finish_;
friend class ItemParallelJob;
friend class Item;
DISALLOW_COPY_AND_ASSIGN(Task);
};
ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
base::Semaphore* pending_tasks)
: cancelable_task_manager_(cancelable_task_manager),
pending_tasks_(pending_tasks) {}
~ItemParallelJob() {
for (size_t i = 0; i < items_.size(); i++) {
Item* item = items_[i];
CHECK(item->IsFinished());
delete item;
}
}
// Adds a task to the job. Transfers ownership to the job.
void AddTask(Task* task) { tasks_.push_back(task); }
// Adds an item to the job. Transfers ownership to the job.
void AddItem(Item* item) { items_.push_back(item); }
int NumberOfItems() const { return static_cast<int>(items_.size()); }
int NumberOfTasks() const { return static_cast<int>(tasks_.size()); }
void Run() {
DCHECK_GE(tasks_.size(), 0);
const size_t num_tasks = tasks_.size();
const size_t num_items = items_.size();
const size_t items_per_task = (num_items + num_tasks - 1) / num_tasks;
CancelableTaskManager::Id* task_ids =
new CancelableTaskManager::Id[num_tasks];
size_t start_index = 0;
Task* main_task = nullptr;
Task* task = nullptr;
for (size_t i = 0; i < num_tasks; i++, start_index += items_per_task) {
task = tasks_[i];
if (start_index >= num_items) {
start_index -= num_items;
}
task->SetupInternal(pending_tasks_, &items_, start_index);
task_ids[i] = task->id();
if (i > 0) {
V8::GetCurrentPlatform()->CallOnBackgroundThread(
task, v8::Platform::kShortRunningTask);
} else {
main_task = task;
}
}
// Contribute on main thread.
main_task->Run();
delete main_task;
// Wait for background tasks.
for (size_t i = 0; i < num_tasks; i++) {
if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
CancelableTaskManager::kTaskAborted) {
pending_tasks_->Wait();
}
}
delete[] task_ids;
}
private:
std::vector<Item*> items_;
std::vector<Task*> tasks_;
CancelableTaskManager* cancelable_task_manager_;
base::Semaphore* pending_tasks_;
DISALLOW_COPY_AND_ASSIGN(ItemParallelJob);
};
} // namespace internal
} // namespace v8
#endif // V8_HEAP_ITEM_PARALLEL_JOB_