LCOV - code coverage report
Current view: top level - src/heap - item-parallel-job.h (source / functions) Hit Total Coverage
Test: app.info Lines: 52 53 98.1 %
Date: 2017-10-20 Functions: 11 16 68.8 %

          Line data    Source code
       1             : // Copyright 2017 the V8 project authors. All rights reserved.
       2             : // Use of this source code is governed by a BSD-style license that can be
       3             : // found in the LICENSE file.
       4             : 
       5             : #ifndef V8_HEAP_ITEM_PARALLEL_JOB_
       6             : #define V8_HEAP_ITEM_PARALLEL_JOB_
       7             : 
       8             : #include <vector>
       9             : 
      10             : #include "src/base/platform/semaphore.h"
      11             : #include "src/cancelable-task.h"
      12             : #include "src/v8.h"
      13             : 
      14             : namespace v8 {
      15             : namespace internal {
      16             : 
      17             : class Isolate;
      18             : 
      19             : // This class manages background tasks that process a set of items in parallel.
      20             : // The first task added is executed on the thread that calls |job.Run()|.
      21             : // All other tasks are scheduled in the background.
      22             : //
      23             : // - Items need to inherit from ItemParallelJob::Item.
      24             : // - Tasks need to inherit from ItemParallelJob::Task.
      25             : //
      26             : // Items need to be marked as finished after processing them. Task and Item
      27             : // ownership is transferred to the job.
      28             : class ItemParallelJob {
      29             :  public:
      30             :   class Task;
      31             : 
      32             :   class Item {
      33             :    public:
      34      556654 :     Item() : state_(kAvailable) {}
      35      556654 :     virtual ~Item() {}
      36             : 
      37             :     // Marks an item as being finished.
      38     1112991 :     void MarkFinished() { CHECK(state_.TrySetValue(kProcessing, kFinished)); }
      39             : 
      40             :    private:
      41             :     enum ProcessingState { kAvailable, kProcessing, kFinished };
      42             : 
      43     1229973 :     bool TryMarkingAsProcessing() {
      44     1231419 :       return state_.TrySetValue(kAvailable, kProcessing);
      45             :     }
      46     1113308 :     bool IsFinished() { return state_.Value() == kFinished; }
      47             : 
      48             :     base::AtomicValue<ProcessingState> state_;
      49             : 
      50             :     friend class ItemParallelJob;
      51             :     friend class ItemParallelJob::Task;
      52             : 
      53             :     DISALLOW_COPY_AND_ASSIGN(Item);
      54             :   };
      55             : 
      56             :   class Task : public CancelableTask {
      57             :    public:
      58             :     explicit Task(Isolate* isolate)
      59             :         : CancelableTask(isolate),
      60             :           items_(nullptr),
      61             :           cur_index_(0),
      62             :           items_considered_(0),
      63      331326 :           on_finish_(nullptr) {}
      64      330815 :     virtual ~Task() {}
      65             : 
      66             :     virtual void RunInParallel() = 0;
      67             : 
      68             :    protected:
      69             :     // Retrieves a new item that needs to be processed. Returns |nullptr| if
      70             :     // all items are processed. Upon returning an item, the task is required
      71             :     // to process the item and mark the item as finished after doing so.
      72             :     template <class ItemType>
      73      866225 :     ItemType* GetItem() {
      74     3948031 :       while (items_considered_++ != items_->size()) {
      75             :         // Wrap around.
      76     4002311 :         if (cur_index_ == items_->size()) {
      77      103233 :           cur_index_ = 0;
      78             :         }
      79     2461408 :         Item* item = (*items_)[cur_index_++];
      80     1230704 :         if (item->TryMarkingAsProcessing()) {
      81             :           return static_cast<ItemType*>(item);
      82             :         }
      83             :       }
      84             :       return nullptr;
      85             :     }
      86             : 
      87             :    private:
      88             :     void SetupInternal(base::Semaphore* on_finish, std::vector<Item*>* items,
      89             :                        size_t start_index) {
      90      331326 :       on_finish_ = on_finish;
      91      331326 :       items_ = items;
      92      331326 :       cur_index_ = start_index;
      93             :     }
      94             : 
      95             :     // We don't allow overriding this method any further.
      96      310130 :     void RunInternal() final {
      97      310130 :       RunInParallel();
      98      310050 :       on_finish_->Signal();
      99      310116 :     }
     100             : 
     101             :     std::vector<Item*>* items_;
     102             :     size_t cur_index_;
     103             :     size_t items_considered_;
     104             :     base::Semaphore* on_finish_;
     105             : 
     106             :     friend class ItemParallelJob;
     107             :     friend class Item;
     108             : 
     109             :     DISALLOW_COPY_AND_ASSIGN(Task);
     110             :   };
     111             : 
     112             :   ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
     113             :                   base::Semaphore* pending_tasks)
     114             :       : cancelable_task_manager_(cancelable_task_manager),
     115      200056 :         pending_tasks_(pending_tasks) {}
     116             : 
     117      200056 :   ~ItemParallelJob() {
     118     1513420 :     for (size_t i = 0; i < items_.size(); i++) {
     119     1313364 :       Item* item = items_[i];
     120      556654 :       CHECK(item->IsFinished());
     121      556654 :       delete item;
     122             :     }
     123      200056 :   }
     124             : 
     125             :   // Adds a task to the job. Transfers ownership to the job.
     126      331326 :   void AddTask(Task* task) { tasks_.push_back(task); }
     127             : 
     128             :   // Adds an item to the job. Transfers ownership to the job.
     129      556654 :   void AddItem(Item* item) { items_.push_back(item); }
     130             : 
     131      207606 :   int NumberOfItems() const { return static_cast<int>(items_.size()); }
     132           0 :   int NumberOfTasks() const { return static_cast<int>(tasks_.size()); }
     133             : 
     134      190259 :   void Run() {
     135             :     DCHECK_GT(tasks_.size(), 0);  // Zero tasks would divide by zero below.
     136      521585 :     const size_t num_tasks = tasks_.size();
     137      190259 :     const size_t num_items = items_.size();
     138      190259 :     const size_t items_per_task = (num_items + num_tasks - 1) / num_tasks;
     139             :     CancelableTaskManager::Id* task_ids =
     140      190259 :         new CancelableTaskManager::Id[num_tasks];
     141             :     size_t start_index = 0;
     142             :     Task* main_task = nullptr;
     143             :     Task* task = nullptr;
     144      521585 :     for (size_t i = 0; i < num_tasks; i++, start_index += items_per_task) {
     145      331326 :       task = tasks_[i];
     146      331326 :       if (start_index >= num_items) {
     147       28411 :         start_index -= num_items;
     148             :       }
     149      331326 :       task->SetupInternal(pending_tasks_, &items_, start_index);
     150      331326 :       task_ids[i] = task->id();
     151      331326 :       if (i > 0) {
     152      141067 :         V8::GetCurrentPlatform()->CallOnBackgroundThread(
     153      141067 :             task, v8::Platform::kShortRunningTask);
     154             :       } else {
     155             :         main_task = task;
     156             :       }
     157             :     }
     158             :     // Contribute on main thread.
     159      190259 :     main_task->Run();
     160      190259 :     delete main_task;
     161             :     // Wait for every task that could not be aborted.
     162      331326 :     for (size_t i = 0; i < num_tasks; i++) {
     163      331326 :       if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
     164             :           CancelableTaskManager::kTaskAborted) {
     165      310265 :         pending_tasks_->Wait();
     166             :       }
     167             :     }
     168      190259 :     delete[] task_ids;
     169      190259 :   }
     170             : 
     171             :  private:
     172             :   std::vector<Item*> items_;
     173             :   std::vector<Task*> tasks_;
     174             :   CancelableTaskManager* cancelable_task_manager_;
     175             :   base::Semaphore* pending_tasks_;
     176             :   DISALLOW_COPY_AND_ASSIGN(ItemParallelJob);
     177             : };
     178             : 
     179             : }  // namespace internal
     180             : }  // namespace v8
     181             : 
     182             : #endif  // V8_HEAP_ITEM_PARALLEL_JOB_

Generated by: LCOV version 1.10