Line data Source code
1 : // Copyright 2017 the V8 project authors. All rights reserved.
2 : // Use of this source code is governed by a BSD-style license that can be
3 : // found in the LICENSE file.
4 :
5 : #ifndef V8_HEAP_ITEM_PARALLEL_JOB_H_
6 : #define V8_HEAP_ITEM_PARALLEL_JOB_H_
7 :
8 : #include <memory>
9 : #include <vector>
10 :
11 : #include "src/base/atomic-utils.h"
12 : #include "src/base/logging.h"
13 : #include "src/base/macros.h"
14 : #include "src/cancelable-task.h"
15 : #include "src/globals.h"
16 :
17 : namespace v8 {
18 :
19 : namespace base {
20 : class Semaphore;
21 : }
22 :
23 : namespace internal {
24 :
25 : class Counters;
26 : class Isolate;
27 :
28 : // This class manages background tasks that process a set of items in parallel.
29 : // The first task added is executed on the same thread as |job.Run()| is called.
30 : // All other tasks are scheduled in the background.
31 : //
32 : // - Items need to inherit from ItemParallelJob::Item.
33 : // - Tasks need to inherit from ItemParallelJob::Task.
34 : //
35 : // Items need to be marked as finished after processing them. Task and Item
36 : // ownership is transferred to the job.
37 : class V8_EXPORT_PRIVATE ItemParallelJob {
38 : public:
39 : class Task;
40 :
41 : class V8_EXPORT_PRIVATE Item {
42 : public:
43 665495 : Item() = default;
44 665495 : virtual ~Item() = default;
45 :
46 : // Marks an item as being finished.
47 1330526 : void MarkFinished() { CHECK_EQ(kProcessing, state_.exchange(kFinished)); }
48 :
49 : private:
50 : enum ProcessingState : uintptr_t { kAvailable, kProcessing, kFinished };
51 :
52 : bool TryMarkingAsProcessing() {
53 1416567 : ProcessingState available = kAvailable;
54 : return state_.compare_exchange_strong(available, kProcessing);
55 : }
56 : bool IsFinished() { return state_ == kFinished; }
57 :
58 : std::atomic<ProcessingState> state_{kAvailable};
59 :
60 : friend class ItemParallelJob;
61 : friend class ItemParallelJob::Task;
62 :
63 : DISALLOW_COPY_AND_ASSIGN(Item);
64 : };
65 :
66 : class V8_EXPORT_PRIVATE Task : public CancelableTask {
67 : public:
68 : explicit Task(Isolate* isolate);
69 862564 : ~Task() override = default;
70 :
71 : virtual void RunInParallel() = 0;
72 :
73 : protected:
74 : // Retrieves a new item that needs to be processed. Returns |nullptr| if
75 : // all items are processed. Upon returning an item, the task is required
76 : // to process the item and mark the item as finished after doing so.
77 : template <class ItemType>
78 1037281 : ItemType* GetItem() {
79 3580620 : while (items_considered_++ != items_->size()) {
80 : // Wrap around.
81 1416567 : if (cur_index_ == items_->size()) {
82 139950 : cur_index_ = 0;
83 : }
84 2833134 : Item* item = (*items_)[cur_index_++];
85 1416567 : if (item->TryMarkingAsProcessing()) {
86 : return static_cast<ItemType*>(item);
87 : }
88 : }
89 : return nullptr;
90 : }
91 :
92 : private:
93 : friend class ItemParallelJob;
94 : friend class Item;
95 :
96 : // Sets up state required before invoking Run(). If
97 : // |start_index is >= items_.size()|, this task will not process work items
98 : // (some jobs have more tasks than work items in order to parallelize post-
99 : // processing, e.g. scavenging).
100 : void SetupInternal(base::Semaphore* on_finish, std::vector<Item*>* items,
101 : size_t start_index);
102 :
103 : // We don't allow overriding this method any further.
104 : void RunInternal() final;
105 :
106 : std::vector<Item*>* items_ = nullptr;
107 : size_t cur_index_ = 0;
108 : size_t items_considered_ = 0;
109 : base::Semaphore* on_finish_ = nullptr;
110 :
111 : DISALLOW_COPY_AND_ASSIGN(Task);
112 : };
113 :
114 : ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
115 : base::Semaphore* pending_tasks);
116 :
117 : ~ItemParallelJob();
118 :
119 : // Adds a task to the job. Transfers ownership to the job.
120 1295616 : void AddTask(Task* task) { tasks_.push_back(std::unique_ptr<Task>(task)); }
121 :
122 : // Adds an item to the job. Transfers ownership to the job.
123 665495 : void AddItem(Item* item) { items_.push_back(item); }
124 :
125 135209 : int NumberOfItems() const { return static_cast<int>(items_.size()); }
126 0 : int NumberOfTasks() const { return static_cast<int>(tasks_.size()); }
127 :
128 : // Runs this job.
129 : void Run();
130 :
131 : private:
132 : std::vector<Item*> items_;
133 : std::vector<std::unique_ptr<Task>> tasks_;
134 : CancelableTaskManager* cancelable_task_manager_;
135 : base::Semaphore* pending_tasks_;
136 :
137 : DISALLOW_COPY_AND_ASSIGN(ItemParallelJob);
138 : };
139 :
140 : } // namespace internal
141 : } // namespace v8
142 :
143 : #endif // V8_HEAP_ITEM_PARALLEL_JOB_H_
|