Line data Source code
1 : // Copyright 2016 the V8 project authors. All rights reserved.
2 : // Use of this source code is governed by a BSD-style license that can be
3 : // found in the LICENSE file.
4 :
5 : #ifndef V8_HEAP_PAGE_PARALLEL_JOB_
6 : #define V8_HEAP_PAGE_PARALLEL_JOB_
7 :
8 : #include "src/allocation.h"
9 : #include "src/cancelable-task.h"
10 : #include "src/utils.h"
11 : #include "src/v8.h"
12 :
13 : namespace v8 {
14 : namespace internal {
15 :
16 : class Heap;
17 : class Isolate;
18 :
19 : // This class manages background tasks that process set of pages in parallel.
20 : // The JobTraits class needs to define:
21 : // - PerPageData type - state associated with each page.
22 : // - PerTaskData type - state associated with each task.
23 : // - static bool ProcessPageInParallel(Heap* heap,
24 : // PerTaskData task_data,
25 : // MemoryChunk* page,
26 : // PerPageData page_data)
27 : // The function should return true iff processing succeeded.
28 : // - static const bool NeedSequentialFinalization
29 : // - static void FinalizePageSequentially(Heap* heap,
30 : // bool processing_succeeded,
31 : // MemoryChunk* page,
32 : // PerPageData page_data)
33 : template <typename JobTraits>
34 : class PageParallelJob {
35 : public:
36 : // PageParallelJob cannot dynamically create a semaphore because of a bug in
37 : // glibc. See http://crbug.com/609249 and
38 : // https://sourceware.org/bugzilla/show_bug.cgi?id=12674.
39 : // The caller must provide a semaphore with value 0 and ensure that
40 : // the lifetime of the semaphore is the same as the lifetime of the Isolate.
41 : // It is guaranteed that the semaphore value will be 0 after Run() call.
42 : PageParallelJob(Heap* heap, CancelableTaskManager* cancelable_task_manager,
43 : base::Semaphore* semaphore)
44 : : heap_(heap),
45 : cancelable_task_manager_(cancelable_task_manager),
46 : items_(nullptr),
47 : num_items_(0),
48 : num_tasks_(0),
49 213384 : pending_tasks_(semaphore) {}
50 :
51 : ~PageParallelJob() {
52 213384 : Item* item = items_;
53 586184 : while (item != nullptr) {
54 372800 : Item* next = item->next;
55 : delete item;
56 : item = next;
57 : }
58 : }
59 :
60 : void AddPage(MemoryChunk* chunk, typename JobTraits::PerPageData data) {
61 372800 : Item* item = new Item(chunk, data, items_);
62 372800 : items_ = item;
63 372800 : ++num_items_;
64 : }
65 :
66 : int NumberOfPages() const { return num_items_; }
67 :
68 : // Returns the number of tasks that were spawned when running the job.
69 : int NumberOfTasks() const { return num_tasks_; }
70 :
71 : // Runs the given number of tasks in parallel and processes the previously
72 : // added pages. This function blocks until all tasks finish.
73 : // The callback takes the index of a task and returns data for that task.
74 : template <typename Callback>
75 213384 : void Run(int num_tasks, Callback per_task_data_callback) {
76 266701 : if (num_items_ == 0) return;
77 : DCHECK_GE(num_tasks, 1);
78 : uint32_t task_ids[kMaxNumberOfTasks];
79 : const int max_num_tasks = Min(
80 : kMaxNumberOfTasks,
81 : static_cast<int>(
82 160067 : V8::GetCurrentPlatform()->NumberOfAvailableBackgroundThreads()));
83 160067 : num_tasks_ = Max(1, Min(num_tasks, max_num_tasks));
84 160067 : int items_per_task = (num_items_ + num_tasks_ - 1) / num_tasks_;
85 : int start_index = 0;
86 : Task* main_task = nullptr;
87 349506 : for (int i = 0; i < num_tasks_; i++, start_index += items_per_task) {
88 189439 : if (start_index >= num_items_) {
89 298 : start_index -= num_items_;
90 : }
91 : Task* task = new Task(heap_, items_, num_items_, start_index,
92 189439 : pending_tasks_, per_task_data_callback(i));
93 189439 : task_ids[i] = task->id();
94 189439 : if (i > 0) {
95 29372 : V8::GetCurrentPlatform()->CallOnBackgroundThread(
96 : task, v8::Platform::kShortRunningTask);
97 : } else {
98 : main_task = task;
99 : }
100 : }
101 : // Contribute on main thread.
102 160067 : main_task->Run();
103 160067 : delete main_task;
104 : // Wait for background tasks.
105 189439 : for (int i = 0; i < num_tasks_; i++) {
106 189439 : if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
107 : CancelableTaskManager::kTaskAborted) {
108 188109 : pending_tasks_->Wait();
109 : }
110 : }
111 : if (JobTraits::NeedSequentialFinalization) {
112 53346 : Item* item = items_;
113 187524 : while (item != nullptr) {
114 80832 : bool success = (item->state.Value() == kFinished);
115 80832 : JobTraits::FinalizePageSequentially(heap_, item->chunk, success,
116 80832 : item->data);
117 80832 : item = item->next;
118 : }
119 : }
120 : }
121 :
122 : private:
123 : static const int kMaxNumberOfTasks = 10;
124 :
125 : enum ProcessingState { kAvailable, kProcessing, kFinished, kFailed };
126 :
127 : struct Item : public Malloced {
128 : Item(MemoryChunk* chunk, typename JobTraits::PerPageData data, Item* next)
129 745600 : : chunk(chunk), state(kAvailable), data(data), next(next) {}
130 : MemoryChunk* chunk;
131 : base::AtomicValue<ProcessingState> state;
132 : typename JobTraits::PerPageData data;
133 : Item* next;
134 : };
135 :
136 : class Task : public CancelableTask {
137 : public:
138 : Task(Heap* heap, Item* items, int num_items, int start_index,
139 : base::Semaphore* on_finish, typename JobTraits::PerTaskData data)
140 : : CancelableTask(heap->isolate()),
141 : heap_(heap),
142 : items_(items),
143 : num_items_(num_items),
144 : start_index_(start_index),
145 : on_finish_(on_finish),
146 189439 : data_(data) {}
147 :
148 378795 : virtual ~Task() {}
149 :
150 : private:
151 : // v8::internal::CancelableTask overrides.
152 188232 : void RunInternal() override {
153 : // Each task starts at a different index to improve parallelization.
154 188232 : Item* current = items_;
155 188232 : int skip = start_index_;
156 483295 : while (skip-- > 0) {
157 106831 : current = current->next;
158 : }
159 576317 : for (int i = 0; i < num_items_; i++) {
160 576556 : if (current->state.TrySetValue(kAvailable, kProcessing)) {
161 : bool success = JobTraits::ProcessPageInParallel(
162 372762 : heap_, data_, current->chunk, current->data);
163 372689 : current->state.SetValue(success ? kFinished : kFailed);
164 : }
165 576317 : current = current->next;
166 : // Wrap around if needed.
167 576317 : if (current == nullptr) {
168 188083 : current = items_;
169 : }
170 : }
171 188081 : on_finish_->Signal();
172 188088 : }
173 :
174 : Heap* heap_;
175 : Item* items_;
176 : int num_items_;
177 : int start_index_;
178 : base::Semaphore* on_finish_;
179 : typename JobTraits::PerTaskData data_;
180 : DISALLOW_COPY_AND_ASSIGN(Task);
181 : };
182 :
183 : Heap* heap_;
184 : CancelableTaskManager* cancelable_task_manager_;
185 : Item* items_;
186 : int num_items_;
187 : int num_tasks_;
188 : base::Semaphore* pending_tasks_;
189 : DISALLOW_COPY_AND_ASSIGN(PageParallelJob);
190 : };
191 :
192 : } // namespace internal
193 : } // namespace v8
194 :
195 : #endif // V8_HEAP_PAGE_PARALLEL_JOB_
|