Line data Source code
1 : // Copyright 2017 the V8 project authors. All rights reserved.
2 : // Use of this source code is governed by a BSD-style license that can be
3 : // found in the LICENSE file.
4 :
5 : #ifndef V8_HEAP_ITEM_PARALLEL_JOB_
6 : #define V8_HEAP_ITEM_PARALLEL_JOB_
7 :
8 : #include <vector>
9 :
10 : #include "src/base/platform/semaphore.h"
11 : #include "src/cancelable-task.h"
12 : #include "src/v8.h"
13 :
14 : namespace v8 {
15 : namespace internal {
16 :
17 : class Isolate;
18 :
19 : // This class manages background tasks that process a set of items in parallel.
20 : // The first task added is executed on the same thread as |job.Run()| is called.
21 : // All other tasks are scheduled in the background.
22 : //
23 : // - Items need to inherit from ItemParallelJob::Item.
24 : // - Tasks need to inherit from ItemParallelJob::Task.
25 : //
26 : // Items need to be marked as finished after processing them. Task and Item
27 : // ownership is transferred to the job.
28 : class ItemParallelJob {
29 : public:
30 : class Task;
31 :
32 : class Item {
33 : public:
34 556654 : Item() : state_(kAvailable) {}
35 556654 : virtual ~Item() {}
36 :
37 : // Marks an item as being finished.
38 1112991 : void MarkFinished() { CHECK(state_.TrySetValue(kProcessing, kFinished)); }
39 :
40 : private:
41 : enum ProcessingState { kAvailable, kProcessing, kFinished };
42 :
43 1229973 : bool TryMarkingAsProcessing() {
44 1231419 : return state_.TrySetValue(kAvailable, kProcessing);
45 : }
46 1113308 : bool IsFinished() { return state_.Value() == kFinished; }
47 :
48 : base::AtomicValue<ProcessingState> state_;
49 :
50 : friend class ItemParallelJob;
51 : friend class ItemParallelJob::Task;
52 :
53 : DISALLOW_COPY_AND_ASSIGN(Item);
54 : };
55 :
56 : class Task : public CancelableTask {
57 : public:
58 : explicit Task(Isolate* isolate)
59 : : CancelableTask(isolate),
60 : items_(nullptr),
61 : cur_index_(0),
62 : items_considered_(0),
63 331326 : on_finish_(nullptr) {}
64 330815 : virtual ~Task() {}
65 :
66 : virtual void RunInParallel() = 0;
67 :
68 : protected:
69 : // Retrieves a new item that needs to be processed. Returns |nullptr| if
70 : // all items are processed. Upon returning an item, the task is required
71 : // to process the item and mark the item as finished after doing so.
72 : template <class ItemType>
73 866225 : ItemType* GetItem() {
74 3948031 : while (items_considered_++ != items_->size()) {
75 : // Wrap around.
76 4002311 : if (cur_index_ == items_->size()) {
77 103233 : cur_index_ = 0;
78 : }
79 2461408 : Item* item = (*items_)[cur_index_++];
80 1230704 : if (item->TryMarkingAsProcessing()) {
81 : return static_cast<ItemType*>(item);
82 : }
83 : }
84 : return nullptr;
85 : }
86 :
87 : private:
88 : void SetupInternal(base::Semaphore* on_finish, std::vector<Item*>* items,
89 : size_t start_index) {
90 331326 : on_finish_ = on_finish;
91 331326 : items_ = items;
92 331326 : cur_index_ = start_index;
93 : }
94 :
95 : // We don't allow overriding this method any further.
96 310130 : void RunInternal() final {
97 310130 : RunInParallel();
98 310050 : on_finish_->Signal();
99 310116 : }
100 :
101 : std::vector<Item*>* items_;
102 : size_t cur_index_;
103 : size_t items_considered_;
104 : base::Semaphore* on_finish_;
105 :
106 : friend class ItemParallelJob;
107 : friend class Item;
108 :
109 : DISALLOW_COPY_AND_ASSIGN(Task);
110 : };
111 :
112 : ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
113 : base::Semaphore* pending_tasks)
114 : : cancelable_task_manager_(cancelable_task_manager),
115 200056 : pending_tasks_(pending_tasks) {}
116 :
117 200056 : ~ItemParallelJob() {
118 1513420 : for (size_t i = 0; i < items_.size(); i++) {
119 1313364 : Item* item = items_[i];
120 556654 : CHECK(item->IsFinished());
121 556654 : delete item;
122 : }
123 200056 : }
124 :
125 : // Adds a task to the job. Transfers ownership to the job.
126 331326 : void AddTask(Task* task) { tasks_.push_back(task); }
127 :
128 : // Adds an item to the job. Transfers ownership to the job.
129 556654 : void AddItem(Item* item) { items_.push_back(item); }
130 :
131 207606 : int NumberOfItems() const { return static_cast<int>(items_.size()); }
132 0 : int NumberOfTasks() const { return static_cast<int>(tasks_.size()); }
133 :
134 190259 : void Run() {
135 : DCHECK_GE(tasks_.size(), 0);
136 521585 : const size_t num_tasks = tasks_.size();
137 190259 : const size_t num_items = items_.size();
138 190259 : const size_t items_per_task = (num_items + num_tasks - 1) / num_tasks;
139 : CancelableTaskManager::Id* task_ids =
140 190259 : new CancelableTaskManager::Id[num_tasks];
141 : size_t start_index = 0;
142 : Task* main_task = nullptr;
143 : Task* task = nullptr;
144 521585 : for (size_t i = 0; i < num_tasks; i++, start_index += items_per_task) {
145 331326 : task = tasks_[i];
146 331326 : if (start_index >= num_items) {
147 28411 : start_index -= num_items;
148 : }
149 331326 : task->SetupInternal(pending_tasks_, &items_, start_index);
150 331326 : task_ids[i] = task->id();
151 331326 : if (i > 0) {
152 141067 : V8::GetCurrentPlatform()->CallOnBackgroundThread(
153 141067 : task, v8::Platform::kShortRunningTask);
154 : } else {
155 : main_task = task;
156 : }
157 : }
158 : // Contribute on main thread.
159 190259 : main_task->Run();
160 190259 : delete main_task;
161 : // Wait for background tasks.
162 331326 : for (size_t i = 0; i < num_tasks; i++) {
163 331326 : if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
164 : CancelableTaskManager::kTaskAborted) {
165 310265 : pending_tasks_->Wait();
166 : }
167 : }
168 190259 : delete[] task_ids;
169 190259 : }
170 :
171 : private:
172 : std::vector<Item*> items_;
173 : std::vector<Task*> tasks_;
174 : CancelableTaskManager* cancelable_task_manager_;
175 : base::Semaphore* pending_tasks_;
176 : DISALLOW_COPY_AND_ASSIGN(ItemParallelJob);
177 : };
178 :
179 : } // namespace internal
180 : } // namespace v8
181 :
182 : #endif // V8_HEAP_ITEM_PARALLEL_JOB_
|