Coverage Report

Created: 2018-09-25 14:53

/src/mozilla-central/gfx/2d/JobScheduler.cpp
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
 * License, v. 2.0. If a copy of the MPL was not distributed with this
5
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
#include "JobScheduler.h"
8
#include "Logging.h"
9
10
namespace mozilla {
11
namespace gfx {
12
13
// The scheduler instance shared by the static JobScheduler entry points.
// Null until Init() creates it; ShutDown() deletes it and resets it to null.
JobScheduler* JobScheduler::sSingleton = nullptr;
14
15
bool JobScheduler::Init(uint32_t aNumThreads, uint32_t aNumQueues)
16
0
{
17
0
  MOZ_ASSERT(!sSingleton);
18
0
  MOZ_ASSERT(aNumThreads >= aNumQueues);
19
0
20
0
  sSingleton = new JobScheduler();
21
0
  sSingleton->mNextQueue = 0;
22
0
23
0
  for (uint32_t i = 0; i < aNumQueues; ++i) {
24
0
    sSingleton->mDrawingQueues.push_back(new MultiThreadedJobQueue());
25
0
  }
26
0
27
0
  for (uint32_t i = 0; i < aNumThreads; ++i) {
28
0
    sSingleton->mWorkerThreads.push_back(WorkerThread::Create(sSingleton->mDrawingQueues[i%aNumQueues]));
29
0
  }
30
0
  return true;
31
0
}
32
33
void JobScheduler::ShutDown()
34
0
{
35
0
  MOZ_ASSERT(IsEnabled());
36
0
  if (!IsEnabled()) {
37
0
    return;
38
0
  }
39
0
40
0
  for (auto queue : sSingleton->mDrawingQueues) {
41
0
    queue->ShutDown();
42
0
    delete queue;
43
0
  }
44
0
45
0
  for (WorkerThread* thread : sSingleton->mWorkerThreads) {
46
0
    // this will block until the thread is joined.
47
0
    delete thread;
48
0
  }
49
0
50
0
  sSingleton->mWorkerThreads.clear();
51
0
  delete sSingleton;
52
0
  sSingleton = nullptr;
53
0
}
54
55
// Runs aJob once and returns the status reported by Job::Run().
//
// The job is deleted here when it finished, i.e. when the status is either
// Complete or Error. For any other status the job is left alive and this
// function does not touch it again.
JobStatus
JobScheduler::ProcessJob(Job* aJob)
{
  MOZ_ASSERT(aJob);

  const JobStatus result = aJob->Run();
  const bool finished =
    result == JobStatus::Error || result == JobStatus::Complete;
  if (finished) {
    delete aJob;
  }

  return result;
}
65
66
void
67
JobScheduler::SubmitJob(Job* aJob)
68
0
{
69
0
  MOZ_ASSERT(aJob);
70
0
  RefPtr<SyncObject> start = aJob->GetStartSync();
71
0
  if (start && start->Register(aJob)) {
72
0
    // The Job buffer starts with a non-signaled sync object, it
73
0
    // is now registered in the list of task buffers waiting on the
74
0
    // sync object, so we should not place it in the queue.
75
0
    return;
76
0
  }
77
0
78
0
  GetQueueForJob(aJob)->SubmitJob(aJob);
79
0
}
80
81
void
82
JobScheduler::Join(SyncObject* aCompletion)
83
0
{
84
0
  RefPtr<EventObject> waitForCompletion = new EventObject();
85
0
  JobScheduler::SubmitJob(new SetEventJob(waitForCompletion, aCompletion));
86
0
  waitForCompletion->Wait();
87
0
}
88
89
// Picks the queue aJob must be submitted to: the queue of the worker thread
// the job is pinned to, or, for unpinned jobs, whatever GetDrawingQueue()
// returns.
MultiThreadedJobQueue*
JobScheduler::GetQueueForJob(Job* aJob)
{
  if (aJob->IsPinnedToAThread()) {
    return aJob->GetWorkerThread()->GetJobQueue();
  }
  return GetDrawingQueue();
}
95
96
// Builds a job that optionally waits on aStart, optionally contributes to
// aCompletion, and is optionally pinned to aThread.
//
// When present, the start sync is told about this job via AddSubsequent()
// and the completion sync via AddPrerequisite().
Job::Job(SyncObject* aStart, SyncObject* aCompletion, WorkerThread* aThread)
  : mNextWaitingJob(nullptr)
  , mStartSync(aStart)
  , mCompletionSync(aCompletion)
  , mPinToThread(aThread)
{
  if (mStartSync) {
    mStartSync->AddSubsequent(this);
  }

  if (mCompletionSync) {
    mCompletionSync->AddPrerequisite(this);
  }
}
109
110
// Destroying a job is what signals its completion sync object (if it has
// one); the reference to that sync object is dropped right after.
Job::~Job()
{
  if (!mCompletionSync) {
    return;
  }

  mCompletionSync->Signal();
  mCompletionSync = nullptr;
}
118
119
// Sets the event this job was constructed with and reports completion.
JobStatus
SetEventJob::Run()
{
  mEvent->Set();

  return JobStatus::Complete;
}
125
126
// Creates a job whose only work is to set aEvent when it runs. The sync
// objects and the worker are forwarded unchanged to the Job base class.
SetEventJob::SetEventJob(EventObject* aEvent,
                         SyncObject* aStart, SyncObject* aCompletion,
                         WorkerThread* aWorker)
  : Job(aStart, aCompletion, aWorker)
  , mEvent(aEvent)
{
}
132
133
// Nothing to release here beyond what the members and the Job base class
// destructor already handle.
SetEventJob::~SetEventJob()
{
}
135
136
// Creates a sync object that starts with aNumPrerequisites pending signals
// and an empty list of waiting jobs.
//
// Debug builds additionally record the expected prerequisite count and start
// counting how many prerequisites have actually been added.
SyncObject::SyncObject(uint32_t aNumPrerequisites)
  : mSignals(aNumPrerequisites)
  , mFirstWaitingJob(nullptr)
#ifdef DEBUG
  , mNumPrerequisites(aNumPrerequisites)
  , mAddedPrerequisites(0)
#endif
{
}
144
145
// A sync object must not be destroyed while jobs are still parked on its
// waiting list; debug builds assert that the list is empty.
SyncObject::~SyncObject()
{
  MOZ_ASSERT(mFirstWaitingJob == nullptr);
}
149
150
bool
151
SyncObject::Register(Job* aJob)
152
0
{
153
0
  MOZ_ASSERT(aJob);
154
0
155
0
  // For now, ensure that when we schedule the first subsequent, we have already
156
0
  // created all of the prerequisites. This is an arbitrary restriction because
157
0
  // we specify the number of prerequisites in the constructor, but in the typical
158
0
  // scenario, if the assertion FreezePrerequisite blows up here it probably means
159
0
  // we got the initial nmber of prerequisites wrong. We can decide to remove
160
0
  // this restriction if needed.
161
0
  FreezePrerequisites();
162
0
163
0
  int32_t signals = mSignals;
164
0
165
0
  if (signals > 0) {
166
0
    AddWaitingJob(aJob);
167
0
    // Since Register and Signal can be called concurrently, it can happen that
168
0
    // reading mSignals in Register happens before decrementing mSignals in Signal,
169
0
    // but SubmitWaitingJobs happens before AddWaitingJob. This ordering means
170
0
    // the SyncObject ends up in the signaled state with a task sitting in the
171
0
    // waiting list. To prevent that we check mSignals a second time and submit
172
0
    // again if signals reached zero in the mean time.
173
0
    // We do this instead of holding a mutex around mSignals+mJobs to reduce
174
0
    // lock contention.
175
0
    int32_t signals2 = mSignals;
176
0
    if (signals2 == 0) {
177
0
      SubmitWaitingJobs();
178
0
    }
179
0
    return true;
180
0
  }
181
0
182
0
  return false;
183
0
}
184
185
void
186
SyncObject::Signal()
187
0
{
188
0
  int32_t signals = --mSignals;
189
0
  MOZ_ASSERT(signals >= 0);
190
0
191
0
  if (signals == 0) {
192
0
    SubmitWaitingJobs();
193
0
  }
194
0
}
195
196
void
197
SyncObject::AddWaitingJob(Job* aJob)
198
0
{
199
0
  // Push (using atomics) the task into the list of waiting tasks.
200
0
  for (;;) {
201
0
    Job* first = mFirstWaitingJob;
202
0
    aJob->mNextWaitingJob = first;
203
0
    if (mFirstWaitingJob.compareExchange(first, aJob)) {
204
0
      break;
205
0
    }
206
0
  }
207
0
}
208
209
void SyncObject::SubmitWaitingJobs()
210
0
{
211
0
  // Scheduling the tasks can cause code that modifies <this>'s reference
212
0
  // count to run concurrently, and cause the caller of this function to
213
0
  // be owned by another thread. We need to make sure the reference count
214
0
  // does not reach 0 on another thread before the end of this method, so
215
0
  // hold a strong ref to prevent that!
216
0
  RefPtr<SyncObject> kungFuDeathGrip(this);
217
0
218
0
  // First atomically swap mFirstWaitingJob and waitingJobs...
219
0
  Job* waitingJobs = nullptr;
220
0
  for (;;) {
221
0
    waitingJobs = mFirstWaitingJob;
222
0
    if (mFirstWaitingJob.compareExchange(waitingJobs, nullptr)) {
223
0
      break;
224
0
    }
225
0
  }
226
0
227
0
  // ... and submit all of the waiting tasks in waitingJob now that they belong
228
0
  // to this thread.
229
0
  while (waitingJobs) {
230
0
    Job* next = waitingJobs->mNextWaitingJob;
231
0
    waitingJobs->mNextWaitingJob = nullptr;
232
0
    JobScheduler::GetQueueForJob(waitingJobs)->SubmitJob(waitingJobs);
233
0
    waitingJobs = next;
234
0
  }
235
0
}
236
237
bool
238
SyncObject::IsSignaled()
239
0
{
240
0
  return mSignals == 0;
241
0
}
242
243
void
244
SyncObject::FreezePrerequisites()
245
0
{
246
0
  MOZ_ASSERT(mAddedPrerequisites == mNumPrerequisites);
247
0
}
248
249
void
250
SyncObject::AddPrerequisite(Job* aJob)
251
0
{
252
0
  MOZ_ASSERT(++mAddedPrerequisites <= mNumPrerequisites);
253
0
}
254
255
void
256
SyncObject::AddSubsequent(Job* aJob)
257
0
{
258
0
}
259
260
// Binds this worker to aJobQueue and registers it with that queue. The
// matching UnregisterThread() call happens in Run() when the worker stops.
WorkerThread::WorkerThread(MultiThreadedJobQueue* aJobQueue)
  : mQueue(aJobQueue)
{
  aJobQueue->RegisterThread();
}
265
266
void
267
WorkerThread::Run()
268
0
{
269
0
  SetName("gfx worker");
270
0
271
0
  for (;;) {
272
0
    Job* commands = nullptr;
273
0
    if (!mQueue->WaitForJob(commands)) {
274
0
      mQueue->UnregisterThread();
275
0
      return;
276
0
    }
277
0
278
0
    JobStatus status = JobScheduler::ProcessJob(commands);
279
0
280
0
    if (status == JobStatus::Error) {
281
0
      // Don't try to handle errors for now, but that's open to discussions.
282
0
      // I expect errors to be mostly OOM issues.
283
0
      gfxDevCrash(LogReason::JobStatusError) << "Invalid job status " << (int)status;
284
0
    }
285
0
  }
286
0
}
287
288
} //namespace
289
} //namespace