Line data Source code
1 : // Copyright 2018 the V8 project authors. All rights reserved.
2 : // Use of this source code is governed by a BSD-style license that can be
3 : // found in the LICENSE file.
4 :
5 : #include "src/heap/item-parallel-job.h"
6 :
7 : #include "src/base/platform/semaphore.h"
8 : #include "src/v8.h"
9 :
10 : namespace v8 {
11 : namespace internal {
12 :
13 474422 : ItemParallelJob::Task::Task(Isolate* isolate) : CancelableTask(isolate) {}
14 :
15 473711 : ItemParallelJob::Task::~Task() {
16 : // The histogram is reset in RunInternal(). If it's still around it means
17 : // this task was cancelled before being scheduled.
18 473711 : if (gc_parallel_task_latency_histogram_)
19 : gc_parallel_task_latency_histogram_->RecordAbandon();
20 474284 : }
21 :
22 0 : void ItemParallelJob::Task::SetupInternal(
23 474422 : base::Semaphore* on_finish, std::vector<Item*>* items, size_t start_index,
24 : base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram) {
25 474422 : on_finish_ = on_finish;
26 474422 : items_ = items;
27 :
28 474422 : if (start_index < items->size()) {
29 471142 : cur_index_ = start_index;
30 : } else {
31 3280 : items_considered_ = items_->size();
32 : }
33 :
34 : gc_parallel_task_latency_histogram_ =
35 : std::move(gc_parallel_task_latency_histogram);
36 0 : }
37 :
38 435398 : void ItemParallelJob::Task::RunInternal() {
39 435398 : if (gc_parallel_task_latency_histogram_) {
40 : gc_parallel_task_latency_histogram_->RecordDone();
41 : gc_parallel_task_latency_histogram_.reset();
42 : }
43 :
44 435407 : RunInParallel();
45 435529 : on_finish_->Signal();
46 435481 : }
47 :
48 274076 : ItemParallelJob::ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
49 : base::Semaphore* pending_tasks)
50 : : cancelable_task_manager_(cancelable_task_manager),
51 274076 : pending_tasks_(pending_tasks) {}
52 :
53 548152 : ItemParallelJob::~ItemParallelJob() {
54 1998332 : for (size_t i = 0; i < items_.size(); i++) {
55 1724256 : Item* item = items_[i];
56 725090 : CHECK(item->IsFinished());
57 725090 : delete item;
58 : }
59 274076 : }
60 :
61 261553 : void ItemParallelJob::Run(const std::shared_ptr<Counters>& async_counters) {
62 : DCHECK_GT(tasks_.size(), 0);
63 261553 : const size_t num_items = items_.size();
64 997528 : const size_t num_tasks = tasks_.size();
65 :
66 523106 : TRACE_EVENT_INSTANT2(TRACE_DISABLED_BY_DEFAULT("v8.gc"),
67 : "ItemParallelJob::Run", TRACE_EVENT_SCOPE_THREAD,
68 : "num_tasks", static_cast<int>(num_tasks), "num_items",
69 : static_cast<int>(num_items));
70 :
71 : AsyncTimedHistogram gc_parallel_task_latency_histogram(
72 261553 : async_counters->gc_parallel_task_latency(), async_counters);
73 :
74 : // Some jobs have more tasks than items (when the items are mere coarse
75 : // grain tasks that generate work dynamically for a second phase which all
76 : // tasks participate in). Some jobs even have 0 items to preprocess but
77 : // still have multiple tasks.
78 : // TODO(gab): Figure out a cleaner scheme for this.
79 : const size_t num_tasks_processing_items = Min(num_items, tasks_.size());
80 :
81 : // In the event of an uneven workload, distribute an extra item to the first
82 : // |items_remainder| tasks.
83 : const size_t items_remainder = num_tasks_processing_items > 0
84 : ? num_items % num_tasks_processing_items
85 261553 : : 0;
86 : // Base |items_per_task|, will be bumped by 1 for the first
87 : // |items_remainder| tasks.
88 : const size_t items_per_task = num_tasks_processing_items > 0
89 : ? num_items / num_tasks_processing_items
90 261553 : : 0;
91 : CancelableTaskManager::Id* task_ids =
92 261553 : new CancelableTaskManager::Id[num_tasks];
93 : std::unique_ptr<Task> main_task;
94 1210397 : for (size_t i = 0, start_index = 0; i < num_tasks;
95 474422 : i++, start_index += items_per_task + (i < items_remainder ? 1 : 0)) {
96 : auto task = std::move(tasks_[i]);
97 : DCHECK(task);
98 :
99 : // By definition there are less |items_remainder| to distribute then
100 : // there are tasks processing items so this cannot overflow while we are
101 : // assigning work items.
102 : DCHECK_IMPLIES(start_index >= num_items, i >= num_tasks_processing_items);
103 :
104 : task->SetupInternal(pending_tasks_, &items_, start_index,
105 : i > 0 ? gc_parallel_task_latency_histogram
106 1423266 : : base::Optional<AsyncTimedHistogram>());
107 474422 : task_ids[i] = task->id();
108 474422 : if (i > 0) {
109 638607 : V8::GetCurrentPlatform()->CallBlockingTaskOnWorkerThread(std::move(task));
110 : } else {
111 : main_task = std::move(task);
112 : }
113 : }
114 :
115 : // Contribute on main thread.
116 : DCHECK(main_task);
117 261553 : main_task->Run();
118 :
119 : // Wait for background tasks.
120 735975 : for (size_t i = 0; i < num_tasks; i++) {
121 474422 : if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
122 : TryAbortResult::kTaskAborted) {
123 435626 : pending_tasks_->Wait();
124 : }
125 : }
126 261553 : delete[] task_ids;
127 261553 : }
128 :
129 : } // namespace internal
130 183867 : } // namespace v8
|