/src/mozilla-central/gfx/2d/JobScheduler.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ |
2 | | /* vim: set ts=8 sts=2 et sw=2 tw=80: */ |
3 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
4 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
5 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
6 | | |
7 | | #include "JobScheduler.h" |
8 | | #include "Logging.h" |
9 | | |
10 | | namespace mozilla { |
11 | | namespace gfx { |
12 | | |
13 | | JobScheduler* JobScheduler::sSingleton = nullptr; |
14 | | |
15 | | bool JobScheduler::Init(uint32_t aNumThreads, uint32_t aNumQueues) |
16 | 0 | { |
17 | 0 | MOZ_ASSERT(!sSingleton); |
18 | 0 | MOZ_ASSERT(aNumThreads >= aNumQueues); |
19 | 0 |
|
20 | 0 | sSingleton = new JobScheduler(); |
21 | 0 | sSingleton->mNextQueue = 0; |
22 | 0 |
|
23 | 0 | for (uint32_t i = 0; i < aNumQueues; ++i) { |
24 | 0 | sSingleton->mDrawingQueues.push_back(new MultiThreadedJobQueue()); |
25 | 0 | } |
26 | 0 |
|
27 | 0 | for (uint32_t i = 0; i < aNumThreads; ++i) { |
28 | 0 | sSingleton->mWorkerThreads.push_back(WorkerThread::Create(sSingleton->mDrawingQueues[i%aNumQueues])); |
29 | 0 | } |
30 | 0 | return true; |
31 | 0 | } |
32 | | |
33 | | void JobScheduler::ShutDown() |
34 | 0 | { |
35 | 0 | MOZ_ASSERT(IsEnabled()); |
36 | 0 | if (!IsEnabled()) { |
37 | 0 | return; |
38 | 0 | } |
39 | 0 | |
40 | 0 | for (auto queue : sSingleton->mDrawingQueues) { |
41 | 0 | queue->ShutDown(); |
42 | 0 | delete queue; |
43 | 0 | } |
44 | 0 |
|
45 | 0 | for (WorkerThread* thread : sSingleton->mWorkerThreads) { |
46 | 0 | // this will block until the thread is joined. |
47 | 0 | delete thread; |
48 | 0 | } |
49 | 0 |
|
50 | 0 | sSingleton->mWorkerThreads.clear(); |
51 | 0 | delete sSingleton; |
52 | 0 | sSingleton = nullptr; |
53 | 0 | } |
54 | | |
55 | | JobStatus |
56 | | JobScheduler::ProcessJob(Job* aJob) |
57 | 0 | { |
58 | 0 | MOZ_ASSERT(aJob); |
59 | 0 | auto status = aJob->Run(); |
60 | 0 | if (status == JobStatus::Error || status == JobStatus::Complete) { |
61 | 0 | delete aJob; |
62 | 0 | } |
63 | 0 | return status; |
64 | 0 | } |
65 | | |
66 | | void |
67 | | JobScheduler::SubmitJob(Job* aJob) |
68 | 0 | { |
69 | 0 | MOZ_ASSERT(aJob); |
70 | 0 | RefPtr<SyncObject> start = aJob->GetStartSync(); |
71 | 0 | if (start && start->Register(aJob)) { |
72 | 0 | // The Job buffer starts with a non-signaled sync object, it |
73 | 0 | // is now registered in the list of task buffers waiting on the |
74 | 0 | // sync object, so we should not place it in the queue. |
75 | 0 | return; |
76 | 0 | } |
77 | 0 | |
78 | 0 | GetQueueForJob(aJob)->SubmitJob(aJob); |
79 | 0 | } |
80 | | |
81 | | void |
82 | | JobScheduler::Join(SyncObject* aCompletion) |
83 | 0 | { |
84 | 0 | RefPtr<EventObject> waitForCompletion = new EventObject(); |
85 | 0 | JobScheduler::SubmitJob(new SetEventJob(waitForCompletion, aCompletion)); |
86 | 0 | waitForCompletion->Wait(); |
87 | 0 | } |
88 | | |
89 | | MultiThreadedJobQueue* |
90 | | JobScheduler::GetQueueForJob(Job* aJob) |
91 | 0 | { |
92 | 0 | return aJob->IsPinnedToAThread() ? aJob->GetWorkerThread()->GetJobQueue() |
93 | 0 | : GetDrawingQueue(); |
94 | 0 | } |
95 | | |
96 | | Job::Job(SyncObject* aStart, SyncObject* aCompletion, WorkerThread* aThread) |
97 | | : mNextWaitingJob(nullptr) |
98 | | , mStartSync(aStart) |
99 | | , mCompletionSync(aCompletion) |
100 | | , mPinToThread(aThread) |
101 | 0 | { |
102 | 0 | if (mStartSync) { |
103 | 0 | mStartSync->AddSubsequent(this); |
104 | 0 | } |
105 | 0 | if (mCompletionSync) { |
106 | 0 | mCompletionSync->AddPrerequisite(this); |
107 | 0 | } |
108 | 0 | } |
109 | | |
110 | | Job::~Job() |
111 | 0 | { |
112 | 0 | if (mCompletionSync) { |
113 | 0 | //printf(" -- Job %p dtor completion %p\n", this, mCompletionSync); |
114 | 0 | mCompletionSync->Signal(); |
115 | 0 | mCompletionSync = nullptr; |
116 | 0 | } |
117 | 0 | } |
118 | | |
119 | | JobStatus |
120 | | SetEventJob::Run() |
121 | 0 | { |
122 | 0 | mEvent->Set(); |
123 | 0 | return JobStatus::Complete; |
124 | 0 | } |
125 | | |
126 | | SetEventJob::SetEventJob(EventObject* aEvent, |
127 | | SyncObject* aStart, SyncObject* aCompletion, |
128 | | WorkerThread* aWorker) |
129 | | : Job(aStart, aCompletion, aWorker) |
130 | | , mEvent(aEvent) |
131 | 0 | {} |
132 | | |
133 | | SetEventJob::~SetEventJob() |
134 | 0 | {} |
135 | | |
136 | | SyncObject::SyncObject(uint32_t aNumPrerequisites) |
137 | | : mSignals(aNumPrerequisites) |
138 | | , mFirstWaitingJob(nullptr) |
139 | | #ifdef DEBUG |
140 | | , mNumPrerequisites(aNumPrerequisites) |
141 | | , mAddedPrerequisites(0) |
142 | | #endif |
143 | 0 | {} |
144 | | |
145 | | SyncObject::~SyncObject() |
146 | 0 | { |
147 | 0 | MOZ_ASSERT(mFirstWaitingJob == nullptr); |
148 | 0 | } |
149 | | |
150 | | bool |
151 | | SyncObject::Register(Job* aJob) |
152 | 0 | { |
153 | 0 | MOZ_ASSERT(aJob); |
154 | 0 |
|
155 | 0 | // For now, ensure that when we schedule the first subsequent, we have already |
156 | 0 | // created all of the prerequisites. This is an arbitrary restriction because |
157 | 0 | // we specify the number of prerequisites in the constructor, but in the typical |
158 | 0 | // scenario, if the assertion FreezePrerequisite blows up here it probably means |
159 | 0 | // we got the initial nmber of prerequisites wrong. We can decide to remove |
160 | 0 | // this restriction if needed. |
161 | 0 | FreezePrerequisites(); |
162 | 0 |
|
163 | 0 | int32_t signals = mSignals; |
164 | 0 |
|
165 | 0 | if (signals > 0) { |
166 | 0 | AddWaitingJob(aJob); |
167 | 0 | // Since Register and Signal can be called concurrently, it can happen that |
168 | 0 | // reading mSignals in Register happens before decrementing mSignals in Signal, |
169 | 0 | // but SubmitWaitingJobs happens before AddWaitingJob. This ordering means |
170 | 0 | // the SyncObject ends up in the signaled state with a task sitting in the |
171 | 0 | // waiting list. To prevent that we check mSignals a second time and submit |
172 | 0 | // again if signals reached zero in the mean time. |
173 | 0 | // We do this instead of holding a mutex around mSignals+mJobs to reduce |
174 | 0 | // lock contention. |
175 | 0 | int32_t signals2 = mSignals; |
176 | 0 | if (signals2 == 0) { |
177 | 0 | SubmitWaitingJobs(); |
178 | 0 | } |
179 | 0 | return true; |
180 | 0 | } |
181 | 0 |
|
182 | 0 | return false; |
183 | 0 | } |
184 | | |
185 | | void |
186 | | SyncObject::Signal() |
187 | 0 | { |
188 | 0 | int32_t signals = --mSignals; |
189 | 0 | MOZ_ASSERT(signals >= 0); |
190 | 0 |
|
191 | 0 | if (signals == 0) { |
192 | 0 | SubmitWaitingJobs(); |
193 | 0 | } |
194 | 0 | } |
195 | | |
196 | | void |
197 | | SyncObject::AddWaitingJob(Job* aJob) |
198 | 0 | { |
199 | 0 | // Push (using atomics) the task into the list of waiting tasks. |
200 | 0 | for (;;) { |
201 | 0 | Job* first = mFirstWaitingJob; |
202 | 0 | aJob->mNextWaitingJob = first; |
203 | 0 | if (mFirstWaitingJob.compareExchange(first, aJob)) { |
204 | 0 | break; |
205 | 0 | } |
206 | 0 | } |
207 | 0 | } |
208 | | |
209 | | void SyncObject::SubmitWaitingJobs() |
210 | 0 | { |
211 | 0 | // Scheduling the tasks can cause code that modifies <this>'s reference |
212 | 0 | // count to run concurrently, and cause the caller of this function to |
213 | 0 | // be owned by another thread. We need to make sure the reference count |
214 | 0 | // does not reach 0 on another thread before the end of this method, so |
215 | 0 | // hold a strong ref to prevent that! |
216 | 0 | RefPtr<SyncObject> kungFuDeathGrip(this); |
217 | 0 |
|
218 | 0 | // First atomically swap mFirstWaitingJob and waitingJobs... |
219 | 0 | Job* waitingJobs = nullptr; |
220 | 0 | for (;;) { |
221 | 0 | waitingJobs = mFirstWaitingJob; |
222 | 0 | if (mFirstWaitingJob.compareExchange(waitingJobs, nullptr)) { |
223 | 0 | break; |
224 | 0 | } |
225 | 0 | } |
226 | 0 |
|
227 | 0 | // ... and submit all of the waiting tasks in waitingJob now that they belong |
228 | 0 | // to this thread. |
229 | 0 | while (waitingJobs) { |
230 | 0 | Job* next = waitingJobs->mNextWaitingJob; |
231 | 0 | waitingJobs->mNextWaitingJob = nullptr; |
232 | 0 | JobScheduler::GetQueueForJob(waitingJobs)->SubmitJob(waitingJobs); |
233 | 0 | waitingJobs = next; |
234 | 0 | } |
235 | 0 | } |
236 | | |
237 | | bool |
238 | | SyncObject::IsSignaled() |
239 | 0 | { |
240 | 0 | return mSignals == 0; |
241 | 0 | } |
242 | | |
243 | | void |
244 | | SyncObject::FreezePrerequisites() |
245 | 0 | { |
246 | 0 | MOZ_ASSERT(mAddedPrerequisites == mNumPrerequisites); |
247 | 0 | } |
248 | | |
249 | | void |
250 | | SyncObject::AddPrerequisite(Job* aJob) |
251 | 0 | { |
252 | 0 | MOZ_ASSERT(++mAddedPrerequisites <= mNumPrerequisites); |
253 | 0 | } |
254 | | |
255 | | void |
256 | | SyncObject::AddSubsequent(Job* aJob) |
257 | 0 | { |
258 | 0 | } |
259 | | |
260 | | WorkerThread::WorkerThread(MultiThreadedJobQueue* aJobQueue) |
261 | | : mQueue(aJobQueue) |
262 | 0 | { |
263 | 0 | aJobQueue->RegisterThread(); |
264 | 0 | } |
265 | | |
266 | | void |
267 | | WorkerThread::Run() |
268 | 0 | { |
269 | 0 | SetName("gfx worker"); |
270 | 0 |
|
271 | 0 | for (;;) { |
272 | 0 | Job* commands = nullptr; |
273 | 0 | if (!mQueue->WaitForJob(commands)) { |
274 | 0 | mQueue->UnregisterThread(); |
275 | 0 | return; |
276 | 0 | } |
277 | 0 | |
278 | 0 | JobStatus status = JobScheduler::ProcessJob(commands); |
279 | 0 |
|
280 | 0 | if (status == JobStatus::Error) { |
281 | 0 | // Don't try to handle errors for now, but that's open to discussions. |
282 | 0 | // I expect errors to be mostly OOM issues. |
283 | 0 | gfxDevCrash(LogReason::JobStatusError) << "Invalid job status " << (int)status; |
284 | 0 | } |
285 | 0 | } |
286 | 0 | } |
287 | | |
288 | | } //namespace |
289 | | } //namespace |