/src/freeimage-svn/FreeImage/trunk/Source/OpenEXR/IlmThread/IlmThreadPool.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /////////////////////////////////////////////////////////////////////////// |
2 | | // |
3 | | // Copyright (c) 2005-2012, Industrial Light & Magic, a division of Lucas |
4 | | // Digital Ltd. LLC |
5 | | // |
6 | | // All rights reserved. |
7 | | // |
8 | | // Redistribution and use in source and binary forms, with or without |
9 | | // modification, are permitted provided that the following conditions are |
10 | | // met: |
11 | | // * Redistributions of source code must retain the above copyright |
12 | | // notice, this list of conditions and the following disclaimer. |
13 | | // * Redistributions in binary form must reproduce the above |
14 | | // copyright notice, this list of conditions and the following disclaimer |
15 | | // in the documentation and/or other materials provided with the |
16 | | // distribution. |
17 | | // * Neither the name of Industrial Light & Magic nor the names of |
18 | | // its contributors may be used to endorse or promote products derived |
19 | | // from this software without specific prior written permission. |
20 | | // |
21 | | // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
22 | | // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
23 | | // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
24 | | // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
25 | | // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
26 | | // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
27 | | // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
28 | | // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
29 | | // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
30 | | // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
31 | | // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
32 | | // |
33 | | /////////////////////////////////////////////////////////////////////////// |
34 | | |
35 | | //----------------------------------------------------------------------------- |
36 | | // |
37 | | // class Task, class ThreadPool, class TaskGroup |
38 | | // |
39 | | //----------------------------------------------------------------------------- |
40 | | |
41 | | #include "IlmThread.h" |
42 | | #include "IlmThreadMutex.h" |
43 | | #include "IlmThreadSemaphore.h" |
44 | | #include "IlmThreadPool.h" |
45 | | #include "Iex.h" |
46 | | #include <list> |
47 | | |
48 | | using namespace std; |
49 | | |
50 | | ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER |
51 | | namespace { |
52 | | |
53 | | class WorkerThread: public Thread |
54 | | { |
55 | | public: |
56 | | |
57 | | WorkerThread (ThreadPool::Data* data); |
58 | | |
59 | | virtual void run (); |
60 | | |
61 | | private: |
62 | | |
63 | | ThreadPool::Data * _data; |
64 | | }; |
65 | | |
66 | | } //namespace |
67 | | |
68 | | |
69 | | struct TaskGroup::Data |
70 | | { |
71 | | Data (); |
72 | | ~Data (); |
73 | | |
74 | | void addTask () ; |
75 | | void removeTask (); |
76 | | |
77 | | Semaphore isEmpty; // used to signal that the taskgroup is empty |
78 | | int numPending; // number of pending tasks to still execute |
79 | | Mutex dtorMutex; // used to work around the glibc bug: |
80 | | // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674 |
81 | | }; |
82 | | |
83 | | |
84 | | struct ThreadPool::Data |
85 | | { |
86 | | Data (); |
87 | | ~Data(); |
88 | | |
89 | | void finish (); |
90 | | bool stopped () const; |
91 | | void stop (); |
92 | | |
93 | | Semaphore taskSemaphore; // threads wait on this for ready tasks |
94 | | Mutex taskMutex; // mutual exclusion for the tasks list |
95 | | list<Task*> tasks; // the list of tasks to execute |
96 | | size_t numTasks; // fast access to list size |
97 | | // (list::size() can be O(n)) |
98 | | |
99 | | Semaphore threadSemaphore; // signaled when a thread starts executing |
100 | | Mutex threadMutex; // mutual exclusion for threads list |
101 | | list<WorkerThread*> threads; // the list of all threads |
102 | | size_t numThreads; // fast access to list size |
103 | | |
104 | | bool stopping; // flag indicating whether to stop threads |
105 | | Mutex stopMutex; // mutual exclusion for stopping flag |
106 | | }; |
107 | | |
108 | | |
109 | | |
110 | | // |
111 | | // class WorkerThread |
112 | | // |
113 | | |
114 | | WorkerThread::WorkerThread (ThreadPool::Data* data): |
115 | | _data (data) |
116 | 0 | { |
117 | 0 | start(); |
118 | 0 | } |
119 | | |
120 | | |
121 | | void |
122 | | WorkerThread::run () |
123 | 0 | { |
124 | | // |
125 | | // Signal that the thread has started executing |
126 | | // |
127 | |
|
128 | 0 | _data->threadSemaphore.post(); |
129 | |
|
130 | 0 | while (true) |
131 | 0 | { |
132 | | // |
133 | | // Wait for a task to become available |
134 | | // |
135 | |
|
136 | 0 | _data->taskSemaphore.wait(); |
137 | |
|
138 | 0 | { |
139 | 0 | Lock taskLock (_data->taskMutex); |
140 | | |
141 | | // |
142 | | // If there is a task pending, pop off the next task in the FIFO |
143 | | // |
144 | |
|
145 | 0 | if (_data->numTasks > 0) |
146 | 0 | { |
147 | 0 | Task* task = _data->tasks.front(); |
148 | 0 | TaskGroup* taskGroup = task->group(); |
149 | 0 | _data->tasks.pop_front(); |
150 | 0 | _data->numTasks--; |
151 | |
|
152 | 0 | taskLock.release(); |
153 | 0 | task->execute(); |
154 | 0 | taskLock.acquire(); |
155 | |
|
156 | 0 | delete task; |
157 | 0 | taskGroup->_data->removeTask(); |
158 | 0 | } |
159 | 0 | else if (_data->stopped()) |
160 | 0 | { |
161 | 0 | break; |
162 | 0 | } |
163 | 0 | } |
164 | 0 | } |
165 | 0 | } |
166 | | |
167 | | |
168 | | // |
169 | | // struct TaskGroup::Data |
170 | | // |
171 | | |
172 | | TaskGroup::Data::Data (): isEmpty (1), numPending (0) |
173 | 0 | { |
174 | | // empty |
175 | 0 | } |
176 | | |
177 | | |
178 | | TaskGroup::Data::~Data () |
179 | 0 | { |
180 | | // |
181 | | // A TaskGroup acts like an "inverted" semaphore: if the count |
182 | | // is above 0 then waiting on the taskgroup will block. This |
183 | | // destructor waits until the taskgroup is empty before returning. |
184 | | // |
185 | |
|
186 | 0 | isEmpty.wait (); |
187 | | |
188 | | // Alas, given the current bug in glibc we need a secondary |
189 | | // syncronisation primitive here to account for the fact that |
190 | | // destructing the isEmpty Semaphore in this thread can cause |
191 | | // an error for a separate thread that is issuing the post() call. |
192 | | // We are entitled to destruct the semaphore at this point, however, |
193 | | // that post() call attempts to access data out of the associated |
194 | | // memory *after* it has woken the waiting threads, including this one, |
195 | | // potentially leading to invalid memory reads. |
196 | | // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674 |
197 | |
|
198 | 0 | Lock lock (dtorMutex); |
199 | 0 | } |
200 | | |
201 | | |
202 | | void |
203 | | TaskGroup::Data::addTask () |
204 | 0 | { |
205 | | // |
206 | | // Any access to the taskgroup is protected by a mutex that is |
207 | | // held by the threadpool. Therefore it is safe to access |
208 | | // numPending before we wait on the semaphore. |
209 | | // |
210 | |
|
211 | 0 | if (numPending++ == 0) |
212 | 0 | isEmpty.wait (); |
213 | 0 | } |
214 | | |
215 | | |
216 | | void |
217 | | TaskGroup::Data::removeTask () |
218 | 0 | { |
219 | | // Alas, given the current bug in glibc we need a secondary |
220 | | // syncronisation primitive here to account for the fact that |
221 | | // destructing the isEmpty Semaphore in a separate thread can |
222 | | // cause an error. Issuing the post call here the current libc |
223 | | // implementation attempts to access memory *after* it has woken |
224 | | // waiting threads. |
225 | | // Since other threads are entitled to delete the semaphore the |
226 | | // access to the memory location can be invalid. |
227 | | // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674 |
228 | |
|
229 | 0 | if (--numPending == 0) |
230 | 0 | { |
231 | 0 | Lock lock (dtorMutex); |
232 | 0 | isEmpty.post (); |
233 | 0 | } |
234 | 0 | } |
235 | | |
236 | | |
237 | | // |
238 | | // struct ThreadPool::Data |
239 | | // |
240 | | |
241 | | ThreadPool::Data::Data (): numTasks (0), numThreads (0), stopping (false) |
242 | 0 | { |
243 | | // empty |
244 | 0 | } |
245 | | |
246 | | |
247 | | ThreadPool::Data::~Data() |
248 | 0 | { |
249 | 0 | Lock lock (threadMutex); |
250 | 0 | finish (); |
251 | 0 | } |
252 | | |
253 | | |
254 | | void |
255 | | ThreadPool::Data::finish () |
256 | 0 | { |
257 | 0 | stop(); |
258 | | |
259 | | // |
260 | | // Signal enough times to allow all threads to stop. |
261 | | // |
262 | | // Wait until all threads have started their run functions. |
263 | | // If we do not wait before we destroy the threads then it's |
264 | | // possible that the threads have not yet called their run |
265 | | // functions. |
266 | | // If this happens then the run function will be called off |
267 | | // of an invalid object and we will crash, most likely with |
268 | | // an error like: "pure virtual method called" |
269 | | // |
270 | |
|
271 | 0 | for (size_t i = 0; i < numThreads; i++) |
272 | 0 | { |
273 | 0 | taskSemaphore.post(); |
274 | 0 | threadSemaphore.wait(); |
275 | 0 | } |
276 | | |
277 | | // |
278 | | // Join all the threads |
279 | | // |
280 | |
|
281 | 0 | for (list<WorkerThread*>::iterator i = threads.begin(); |
282 | 0 | i != threads.end(); |
283 | 0 | ++i) |
284 | 0 | { |
285 | 0 | delete (*i); |
286 | 0 | } |
287 | |
|
288 | 0 | Lock lock1 (taskMutex); |
289 | 0 | Lock lock2 (stopMutex); |
290 | 0 | threads.clear(); |
291 | 0 | tasks.clear(); |
292 | 0 | numThreads = 0; |
293 | 0 | numTasks = 0; |
294 | 0 | stopping = false; |
295 | 0 | } |
296 | | |
297 | | |
298 | | bool |
299 | | ThreadPool::Data::stopped () const |
300 | 0 | { |
301 | 0 | Lock lock (stopMutex); |
302 | 0 | return stopping; |
303 | 0 | } |
304 | | |
305 | | |
306 | | void |
307 | | ThreadPool::Data::stop () |
308 | 0 | { |
309 | 0 | Lock lock (stopMutex); |
310 | 0 | stopping = true; |
311 | 0 | } |
312 | | |
313 | | |
314 | | // |
315 | | // class Task |
316 | | // |
317 | | |
318 | | Task::Task (TaskGroup* g): _group(g) |
319 | 0 | { |
320 | | // empty |
321 | 0 | } |
322 | | |
323 | | |
324 | | Task::~Task() |
325 | 0 | { |
326 | | // empty |
327 | 0 | } |
328 | | |
329 | | |
330 | | TaskGroup* |
331 | | Task::group () |
332 | 0 | { |
333 | 0 | return _group; |
334 | 0 | } |
335 | | |
336 | | |
337 | | TaskGroup::TaskGroup (): |
338 | | _data (new Data()) |
339 | 0 | { |
340 | | // empty |
341 | 0 | } |
342 | | |
343 | | |
344 | | TaskGroup::~TaskGroup () |
345 | 0 | { |
346 | 0 | delete _data; |
347 | 0 | } |
348 | | |
349 | | |
350 | | // |
351 | | // class ThreadPool |
352 | | // |
353 | | |
354 | | ThreadPool::ThreadPool (unsigned nthreads): |
355 | | _data (new Data()) |
356 | 0 | { |
357 | 0 | setNumThreads (nthreads); |
358 | 0 | } |
359 | | |
360 | | |
361 | | ThreadPool::~ThreadPool () |
362 | 0 | { |
363 | 0 | delete _data; |
364 | 0 | } |
365 | | |
366 | | |
367 | | int |
368 | | ThreadPool::numThreads () const |
369 | 0 | { |
370 | 0 | Lock lock (_data->threadMutex); |
371 | 0 | return _data->numThreads; |
372 | 0 | } |
373 | | |
374 | | |
375 | | void |
376 | | ThreadPool::setNumThreads (int count) |
377 | 0 | { |
378 | 0 | if (count < 0) |
379 | 0 | throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the number of threads " |
380 | 0 | "in a thread pool to a negative value."); |
381 | | |
382 | | // |
383 | | // Lock access to thread list and size |
384 | | // |
385 | | |
386 | 0 | Lock lock (_data->threadMutex); |
387 | |
|
388 | 0 | if ((size_t)count > _data->numThreads) |
389 | 0 | { |
390 | | // |
391 | | // Add more threads |
392 | | // |
393 | |
|
394 | 0 | while (_data->numThreads < (size_t)count) |
395 | 0 | { |
396 | 0 | _data->threads.push_back (new WorkerThread (_data)); |
397 | 0 | _data->numThreads++; |
398 | 0 | } |
399 | 0 | } |
400 | 0 | else if ((size_t)count < _data->numThreads) |
401 | 0 | { |
402 | | // |
403 | | // Wait until all existing threads are finished processing, |
404 | | // then delete all threads. |
405 | | // |
406 | |
|
407 | 0 | _data->finish (); |
408 | | |
409 | | // |
410 | | // Add in new threads |
411 | | // |
412 | |
|
413 | 0 | while (_data->numThreads < (size_t)count) |
414 | 0 | { |
415 | 0 | _data->threads.push_back (new WorkerThread (_data)); |
416 | 0 | _data->numThreads++; |
417 | 0 | } |
418 | 0 | } |
419 | 0 | } |
420 | | |
421 | | |
422 | | void |
423 | | ThreadPool::addTask (Task* task) |
424 | 0 | { |
425 | | // |
426 | | // Lock the threads, needed to access numThreads |
427 | | // |
428 | |
|
429 | 0 | Lock lock (_data->threadMutex); |
430 | |
|
431 | 0 | if (_data->numThreads == 0) |
432 | 0 | { |
433 | 0 | task->execute (); |
434 | 0 | delete task; |
435 | 0 | } |
436 | 0 | else |
437 | 0 | { |
438 | | // |
439 | | // Get exclusive access to the tasks queue |
440 | | // |
441 | |
|
442 | 0 | { |
443 | 0 | Lock taskLock (_data->taskMutex); |
444 | | |
445 | | // |
446 | | // Push the new task into the FIFO |
447 | | // |
448 | |
|
449 | 0 | _data->tasks.push_back (task); |
450 | 0 | _data->numTasks++; |
451 | 0 | task->group()->_data->addTask(); |
452 | 0 | } |
453 | | |
454 | | // |
455 | | // Signal that we have a new task to process |
456 | | // |
457 | |
|
458 | 0 | _data->taskSemaphore.post (); |
459 | 0 | } |
460 | 0 | } |
461 | | |
462 | | |
463 | | ThreadPool& |
464 | | ThreadPool::globalThreadPool () |
465 | 0 | { |
466 | | // |
467 | | // The global thread pool |
468 | | // |
469 | | |
470 | 0 | static ThreadPool gThreadPool (0); |
471 | |
|
472 | 0 | return gThreadPool; |
473 | 0 | } |
474 | | |
475 | | |
476 | | void |
477 | | ThreadPool::addGlobalTask (Task* task) |
478 | 0 | { |
479 | 0 | globalThreadPool().addTask (task); |
480 | 0 | } |
481 | | |
482 | | |
483 | | ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_EXIT |