Line data Source code
1 : // Copyright 2017 the V8 project authors. All rights reserved.
2 : // Use of this source code is governed by a BSD-style license that can be
3 : // found in the LICENSE file.
4 :
5 : #include "src/heap/item-parallel-job.h"
6 :
7 : #include "src/isolate.h"
8 : #include "test/unittests/test-utils.h"
9 :
10 : namespace v8 {
11 : namespace internal {
12 :
13 12 : class ItemParallelJobTest : public TestWithIsolate {
14 : public:
15 6 : ItemParallelJobTest() : parallel_job_semaphore_(0) {}
16 :
17 6 : base::Semaphore* parallel_job_semaphore() { return ¶llel_job_semaphore_; }
18 :
19 : private:
20 : base::Semaphore parallel_job_semaphore_;
21 : DISALLOW_COPY_AND_ASSIGN(ItemParallelJobTest);
22 : };
23 :
24 : namespace {
25 :
26 4 : class SimpleTask : public ItemParallelJob::Task {
27 : public:
28 : SimpleTask(Isolate* isolate, bool* did_run)
29 2 : : ItemParallelJob::Task(isolate), did_run_(did_run) {}
30 :
31 2 : void RunInParallel() override {
32 : ItemParallelJob::Item* item = nullptr;
33 4 : while ((item = GetItem<ItemParallelJob::Item>()) != nullptr) {
34 1 : item->MarkFinished();
35 : }
36 2 : *did_run_ = true;
37 2 : }
38 :
39 : private:
40 : bool* did_run_;
41 : };
42 :
43 : // A simple work item which sets |was_processed| to true, if non-null, when it
44 : // is processed.
45 982 : class SimpleItem : public ItemParallelJob::Item {
46 : public:
47 : explicit SimpleItem(bool* was_processed = nullptr)
48 982 : : ItemParallelJob::Item(), was_processed_(was_processed) {}
49 : void Process() {
50 491 : if (was_processed_) *was_processed_ = true;
51 : }
52 :
53 : private:
54 : bool* was_processed_;
55 : };
56 :
57 2 : class EagerTask : public ItemParallelJob::Task {
58 : public:
59 1 : explicit EagerTask(Isolate* isolate) : ItemParallelJob::Task(isolate) {}
60 :
61 1 : void RunInParallel() override {
62 : SimpleItem* item = nullptr;
63 223 : while ((item = GetItem<SimpleItem>()) != nullptr) {
64 : item->Process();
65 111 : item->MarkFinished();
66 : }
67 1 : }
68 : };
69 :
70 : // A OneShotBarrier is meant to be passed to |counter| users. Users should
71 : // either Signal() or Wait() when done (based on whether they want to return
72 : // immediately or wait until others are also done).
73 2 : class OneShotBarrier {
74 : public:
75 2 : explicit OneShotBarrier(size_t counter) : counter_(counter) {
76 : DCHECK_GE(counter_, 0);
77 : }
78 :
79 2 : void Wait() {
80 : DCHECK_NE(counter_, 0);
81 2 : mutex_.Lock();
82 2 : counter_--;
83 2 : if (counter_ == 0) {
84 0 : condition_.NotifyAll();
85 : } else {
86 2 : while (counter_ > 0) {
87 2 : condition_.Wait(&mutex_);
88 : }
89 : }
90 2 : mutex_.Unlock();
91 2 : }
92 :
93 382 : void Signal() {
94 382 : mutex_.Lock();
95 382 : counter_--;
96 382 : if (counter_ == 0) {
97 2 : condition_.NotifyAll();
98 : }
99 382 : mutex_.Unlock();
100 382 : }
101 :
102 : private:
103 : base::Mutex mutex_;
104 : base::ConditionVariable condition_;
105 : size_t counter_;
106 : };
107 :
108 : // A task that only processes a single item. Signals |barrier| when done; if
109 : // |wait_when_done|, will blocks until all other tasks have signaled |barrier|.
110 : // If |did_process_an_item| is non-null, will set it to true if it does process
111 : // an item. Otherwise, it will expect to get an item to process (and will report
112 : // a failure if it doesn't).
113 768 : class TaskProcessingOneItem : public ItemParallelJob::Task {
114 : public:
115 : TaskProcessingOneItem(Isolate* isolate, OneShotBarrier* barrier,
116 : bool wait_when_done,
117 : bool* did_process_an_item = nullptr)
118 : : ItemParallelJob::Task(isolate),
119 : barrier_(barrier),
120 : wait_when_done_(wait_when_done),
121 384 : did_process_an_item_(did_process_an_item) {}
122 :
123 384 : void RunInParallel() override {
124 384 : SimpleItem* item = GetItem<SimpleItem>();
125 :
126 384 : if (did_process_an_item_) {
127 128 : *did_process_an_item_ = item != nullptr;
128 : } else {
129 256 : EXPECT_NE(nullptr, item);
130 : }
131 :
132 384 : if (item) {
133 : item->Process();
134 380 : item->MarkFinished();
135 : }
136 :
137 384 : if (wait_when_done_) {
138 2 : barrier_->Wait();
139 : } else {
140 382 : barrier_->Signal();
141 : }
142 384 : }
143 :
144 : private:
145 : OneShotBarrier* barrier_;
146 : bool wait_when_done_;
147 : bool* did_process_an_item_;
148 : };
149 :
150 : class TaskForDifferentItems;
151 :
152 2 : class BaseItem : public ItemParallelJob::Item {
153 : public:
154 4 : ~BaseItem() override = default;
155 : virtual void ProcessItem(TaskForDifferentItems* task) = 0;
156 : };
157 :
158 : class TaskForDifferentItems : public ItemParallelJob::Task {
159 : public:
160 : explicit TaskForDifferentItems(Isolate* isolate, bool* processed_a,
161 : bool* processed_b)
162 : : ItemParallelJob::Task(isolate),
163 : processed_a_(processed_a),
164 1 : processed_b_(processed_b) {}
165 2 : ~TaskForDifferentItems() override = default;
166 :
167 1 : void RunInParallel() override {
168 : BaseItem* item = nullptr;
169 5 : while ((item = GetItem<BaseItem>()) != nullptr) {
170 2 : item->ProcessItem(this);
171 2 : item->MarkFinished();
172 : }
173 1 : }
174 :
175 1 : void ProcessA() { *processed_a_ = true; }
176 1 : void ProcessB() { *processed_b_ = true; }
177 :
178 : private:
179 : bool* processed_a_;
180 : bool* processed_b_;
181 : };
182 :
183 1 : class ItemA : public BaseItem {
184 : public:
185 2 : ~ItemA() override = default;
186 2 : void ProcessItem(TaskForDifferentItems* task) override { task->ProcessA(); }
187 : };
188 :
189 1 : class ItemB : public BaseItem {
190 : public:
191 2 : ~ItemB() override = default;
192 2 : void ProcessItem(TaskForDifferentItems* task) override { task->ProcessB(); }
193 : };
194 :
195 : } // namespace
196 :
197 : // ItemParallelJob runs tasks even without work items (as requested tasks may be
198 : // responsible for post-processing).
199 15375 : TEST_F(ItemParallelJobTest, SimpleTaskWithNoItemsRuns) {
200 1 : bool did_run = false;
201 : ItemParallelJob job(i_isolate()->cancelable_task_manager(),
202 2 : parallel_job_semaphore());
203 2 : job.AddTask(new SimpleTask(i_isolate(), &did_run));
204 :
205 1 : job.Run();
206 1 : EXPECT_TRUE(did_run);
207 1 : }
208 :
209 15375 : TEST_F(ItemParallelJobTest, SimpleTaskWithSimpleItemRuns) {
210 1 : bool did_run = false;
211 : ItemParallelJob job(i_isolate()->cancelable_task_manager(),
212 2 : parallel_job_semaphore());
213 2 : job.AddTask(new SimpleTask(i_isolate(), &did_run));
214 :
215 1 : job.AddItem(new ItemParallelJob::Item);
216 :
217 1 : job.Run();
218 1 : EXPECT_TRUE(did_run);
219 1 : }
220 :
221 15375 : TEST_F(ItemParallelJobTest, MoreTasksThanItems) {
222 : const int kNumTasks = 128;
223 : const int kNumItems = kNumTasks - 4;
224 :
225 : TaskProcessingOneItem* tasks[kNumTasks] = {};
226 1 : bool did_process_an_item[kNumTasks] = {};
227 :
228 : ItemParallelJob job(i_isolate()->cancelable_task_manager(),
229 2 : parallel_job_semaphore());
230 :
231 : // The barrier ensures that all tasks run. But only the first kNumItems tasks
232 : // should be assigned an item to execute.
233 : OneShotBarrier barrier(kNumTasks);
234 257 : for (int i = 0; i < kNumTasks; i++) {
235 : // Block the main thread when done to prevent it from returning control to
236 : // the job (which could cancel tasks that have yet to be scheduled).
237 128 : const bool wait_when_done = i == 0;
238 : tasks[i] = new TaskProcessingOneItem(i_isolate(), &barrier, wait_when_done,
239 128 : &did_process_an_item[i]);
240 128 : job.AddTask(tasks[i]);
241 : }
242 :
243 249 : for (int i = 0; i < kNumItems; i++) {
244 124 : job.AddItem(new SimpleItem);
245 : }
246 :
247 1 : job.Run();
248 :
249 257 : for (int i = 0; i < kNumTasks; i++) {
250 : // Only the first kNumItems tasks should have been assigned a work item.
251 256 : EXPECT_EQ(i < kNumItems, did_process_an_item[i]);
252 : }
253 1 : }
254 :
255 15375 : TEST_F(ItemParallelJobTest, SingleThreadProcessing) {
256 : const int kItems = 111;
257 1 : bool was_processed[kItems] = {};
258 : ItemParallelJob job(i_isolate()->cancelable_task_manager(),
259 2 : parallel_job_semaphore());
260 2 : job.AddTask(new EagerTask(i_isolate()));
261 223 : for (int i = 0; i < kItems; i++) {
262 111 : job.AddItem(new SimpleItem(&was_processed[i]));
263 : }
264 1 : job.Run();
265 223 : for (int i = 0; i < kItems; i++) {
266 111 : EXPECT_TRUE(was_processed[i]);
267 : }
268 1 : }
269 :
270 15375 : TEST_F(ItemParallelJobTest, DistributeItemsMultipleTasks) {
271 : const int kItemsAndTasks = 256;
272 1 : bool was_processed[kItemsAndTasks] = {};
273 : OneShotBarrier barrier(kItemsAndTasks);
274 : ItemParallelJob job(i_isolate()->cancelable_task_manager(),
275 2 : parallel_job_semaphore());
276 513 : for (int i = 0; i < kItemsAndTasks; i++) {
277 256 : job.AddItem(new SimpleItem(&was_processed[i]));
278 :
279 : // Block the main thread when done to prevent it from returning control to
280 : // the job (which could cancel tasks that have yet to be scheduled).
281 256 : const bool wait_when_done = i == 0;
282 256 : job.AddTask(
283 256 : new TaskProcessingOneItem(i_isolate(), &barrier, wait_when_done));
284 : }
285 1 : job.Run();
286 513 : for (int i = 0; i < kItemsAndTasks; i++) {
287 256 : EXPECT_TRUE(was_processed[i]);
288 : }
289 1 : }
290 :
291 15375 : TEST_F(ItemParallelJobTest, DifferentItems) {
292 1 : bool item_a = false;
293 1 : bool item_b = false;
294 : ItemParallelJob job(i_isolate()->cancelable_task_manager(),
295 2 : parallel_job_semaphore());
296 1 : job.AddItem(new ItemA());
297 1 : job.AddItem(new ItemB());
298 2 : job.AddTask(new TaskForDifferentItems(i_isolate(), &item_a, &item_b));
299 1 : job.Run();
300 1 : EXPECT_TRUE(item_a);
301 1 : EXPECT_TRUE(item_b);
302 1 : }
303 :
304 : } // namespace internal
305 9222 : } // namespace v8
|