Line data Source code
1 : // Copyright 2017 the V8 project authors. All rights reserved.
2 : // Use of this source code is governed by a BSD-style license that can be
3 : // found in the LICENSE file.
4 :
5 : #ifndef V8_HEAP_ITEM_PARALLEL_JOB_H_
6 : #define V8_HEAP_ITEM_PARALLEL_JOB_H_
7 :
8 : #include <memory>
9 : #include <vector>
10 :
11 : #include "src/base/atomic-utils.h"
12 : #include "src/base/logging.h"
13 : #include "src/base/macros.h"
14 : #include "src/base/optional.h"
15 : #include "src/cancelable-task.h"
16 : #include "src/counters.h"
17 : #include "src/globals.h"
18 :
19 : namespace v8 {
20 :
21 : namespace base {
22 : class Semaphore;
23 : }
24 :
25 : namespace internal {
26 :
27 : class Counters;
28 : class Isolate;
29 :
30 : // This class manages background tasks that process a set of items in parallel.
31 : // The first task added is executed on the same thread as |job.Run()| is called.
32 : // All other tasks are scheduled in the background.
33 : //
34 : // - Items need to inherit from ItemParallelJob::Item.
35 : // - Tasks need to inherit from ItemParallelJob::Task.
36 : //
37 : // Items need to be marked as finished after processing them. Task and Item
38 : // ownership is transferred to the job.
39 : //
40 : // Each parallel (non-main thread) task will report the time between the job
41 : // being created and it being scheduled to |gc_parallel_task_latency_histogram|.
42 : class V8_EXPORT_PRIVATE ItemParallelJob {
43 : public:
44 : class Task;
45 :
46 : class V8_EXPORT_PRIVATE Item {
47 : public:
48 725090 : Item() = default;
49 725090 : virtual ~Item() = default;
50 :
51 : // Marks an item as being finished.
52 1449600 : void MarkFinished() { CHECK_EQ(kProcessing, state_.exchange(kFinished)); }
53 :
54 : private:
55 : enum ProcessingState : uintptr_t { kAvailable, kProcessing, kFinished };
56 :
57 1595224 : bool TryMarkingAsProcessing() {
58 1595224 : ProcessingState available = kAvailable;
59 1595224 : return state_.compare_exchange_strong(available, kProcessing);
60 : }
61 : bool IsFinished() { return state_ == kFinished; }
62 :
63 : std::atomic<ProcessingState> state_{kAvailable};
64 :
65 : friend class ItemParallelJob;
66 : friend class ItemParallelJob::Task;
67 :
68 : DISALLOW_COPY_AND_ASSIGN(Item);
69 : };
70 :
71 : class V8_EXPORT_PRIVATE Task : public CancelableTask {
72 : public:
73 : explicit Task(Isolate* isolate);
74 : ~Task() override;
75 :
76 : virtual void RunInParallel() = 0;
77 :
78 : protected:
79 : // Retrieves a new item that needs to be processed. Returns |nullptr| if
80 : // all items are processed. Upon returning an item, the task is required
81 : // to process the item and mark the item as finished after doing so.
82 : template <class ItemType>
83 1157626 : ItemType* GetItem() {
84 5219918 : while (items_considered_++ != items_->size()) {
85 : // Wrap around.
86 5223202 : if (cur_index_ == items_->size()) {
87 170376 : cur_index_ = 0;
88 : }
89 3192056 : Item* item = (*items_)[cur_index_++];
90 1596028 : if (item->TryMarkingAsProcessing()) {
91 : return static_cast<ItemType*>(item);
92 : }
93 : }
94 : return nullptr;
95 : }
96 :
97 : private:
98 : friend class ItemParallelJob;
99 : friend class Item;
100 :
101 : // Sets up state required before invoking Run(). If
102 : // |start_index is >= items_.size()|, this task will not process work items
103 : // (some jobs have more tasks than work items in order to parallelize post-
104 : // processing, e.g. scavenging). If |gc_parallel_task_latency_histogram| is
105 : // provided, it will be used to report histograms on the latency between
106 : // posting the task and it being scheduled.
107 : void SetupInternal(
108 : base::Semaphore* on_finish, std::vector<Item*>* items,
109 : size_t start_index,
110 : base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram);
111 :
112 : // We don't allow overriding this method any further.
113 : void RunInternal() final;
114 :
115 : std::vector<Item*>* items_ = nullptr;
116 : size_t cur_index_ = 0;
117 : size_t items_considered_ = 0;
118 : base::Semaphore* on_finish_ = nullptr;
119 : base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram_;
120 :
121 : DISALLOW_COPY_AND_ASSIGN(Task);
122 : };
123 :
124 : ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
125 : base::Semaphore* pending_tasks);
126 :
127 : ~ItemParallelJob();
128 :
129 : // Adds a task to the job. Transfers ownership to the job.
130 1423266 : void AddTask(Task* task) { tasks_.push_back(std::unique_ptr<Task>(task)); }
131 :
132 : // Adds an item to the job. Transfers ownership to the job.
133 725090 : void AddItem(Item* item) { items_.push_back(item); }
134 :
135 308922 : int NumberOfItems() const { return static_cast<int>(items_.size()); }
136 0 : int NumberOfTasks() const { return static_cast<int>(tasks_.size()); }
137 :
138 : // Runs this job. Reporting metrics in a thread-safe manner to
139 : // |async_counters|.
140 : void Run(const std::shared_ptr<Counters>& async_counters);
141 :
142 : private:
143 : std::vector<Item*> items_;
144 : std::vector<std::unique_ptr<Task>> tasks_;
145 : CancelableTaskManager* cancelable_task_manager_;
146 : base::Semaphore* pending_tasks_;
147 : DISALLOW_COPY_AND_ASSIGN(ItemParallelJob);
148 : };
149 :
150 : } // namespace internal
151 : } // namespace v8
152 :
153 : #endif // V8_HEAP_ITEM_PARALLEL_JOB_H_
|