Coverage Report

Created: 2023-12-08 06:53

/src/freeimage-svn/FreeImage/trunk/Source/OpenEXR/IlmThread/IlmThreadPool.cpp
Line
Count
Source (jump to first uncovered line)
1
///////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright (c) 2005-2012, Industrial Light & Magic, a division of Lucas
4
// Digital Ltd. LLC
5
// 
6
// All rights reserved.
7
// 
8
// Redistribution and use in source and binary forms, with or without
9
// modification, are permitted provided that the following conditions are
10
// met:
11
// *       Redistributions of source code must retain the above copyright
12
// notice, this list of conditions and the following disclaimer.
13
// *       Redistributions in binary form must reproduce the above
14
// copyright notice, this list of conditions and the following disclaimer
15
// in the documentation and/or other materials provided with the
16
// distribution.
17
// *       Neither the name of Industrial Light & Magic nor the names of
18
// its contributors may be used to endorse or promote products derived
19
// from this software without specific prior written permission. 
20
// 
21
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32
//
33
///////////////////////////////////////////////////////////////////////////
34
35
//-----------------------------------------------------------------------------
36
//
37
//  class Task, class ThreadPool, class TaskGroup
38
//
39
//-----------------------------------------------------------------------------
40
41
#include "IlmThread.h"
42
#include "IlmThreadMutex.h"
43
#include "IlmThreadSemaphore.h"
44
#include "IlmThreadPool.h"
45
#include "Iex.h"
46
#include <list>
47
48
using namespace std;
49
50
ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER
51
namespace {
52
53
class WorkerThread: public Thread
54
{
55
  public:
56
57
    WorkerThread (ThreadPool::Data* data);
58
59
    virtual void  run ();
60
    
61
  private:
62
63
    ThreadPool::Data *  _data;
64
};
65
66
} //namespace
67
68
69
struct TaskGroup::Data
70
{
71
     Data ();
72
    ~Data ();
73
    
74
    void  addTask () ;
75
    void  removeTask ();
76
    
77
    Semaphore isEmpty;        // used to signal that the taskgroup is empty
78
    int         numPending;     // number of pending tasks to still execute
79
    Mutex       dtorMutex;      // used to work around the glibc bug:
80
                                // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
81
};
82
83
84
struct ThreadPool::Data
85
{
86
     Data ();
87
    ~Data();
88
    
89
    void  finish ();
90
    bool  stopped () const;
91
    void  stop ();
92
93
    Semaphore taskSemaphore;        // threads wait on this for ready tasks
94
    Mutex taskMutex;                // mutual exclusion for the tasks list
95
    list<Task*> tasks;              // the list of tasks to execute
96
    size_t numTasks;                // fast access to list size
97
                                    //   (list::size() can be O(n))
98
99
    Semaphore threadSemaphore;      // signaled when a thread starts executing
100
    Mutex threadMutex;              // mutual exclusion for threads list
101
    list<WorkerThread*> threads;    // the list of all threads
102
    size_t numThreads;              // fast access to list size
103
    
104
    bool stopping;                  // flag indicating whether to stop threads
105
    Mutex stopMutex;                // mutual exclusion for stopping flag
106
};
107
108
109
110
//
111
// class WorkerThread
112
//
113
114
//
// Stores the pool data pointer and then calls start() (not declared
// in WorkerThread, so inherited from Thread).  _data is initialised
// before start() is called, and run() only uses _data.
//

WorkerThread::WorkerThread (ThreadPool::Data* data)
    : _data (data)
{
    start ();
}
119
120
121
void
122
WorkerThread::run ()
123
0
{
124
    //
125
    // Signal that the thread has started executing
126
    //
127
128
0
    _data->threadSemaphore.post();
129
130
0
    while (true)
131
0
    {
132
  //
133
        // Wait for a task to become available
134
  //
135
136
0
        _data->taskSemaphore.wait();
137
138
0
        {
139
0
            Lock taskLock (_data->taskMutex);
140
    
141
      //
142
            // If there is a task pending, pop off the next task in the FIFO
143
      //
144
145
0
            if (_data->numTasks > 0)
146
0
            {
147
0
                Task* task = _data->tasks.front();
148
0
    TaskGroup* taskGroup = task->group();
149
0
                _data->tasks.pop_front();
150
0
                _data->numTasks--;
151
152
0
                taskLock.release();
153
0
                task->execute();
154
0
                taskLock.acquire();
155
156
0
                delete task;
157
0
                taskGroup->_data->removeTask();
158
0
            }
159
0
            else if (_data->stopped())
160
0
      {
161
0
                break;
162
0
      }
163
0
        }
164
0
    }
165
0
}
166
167
168
//
169
// struct TaskGroup::Data
170
//
171
172
//
// A new group has no pending tasks.  isEmpty is constructed with the
// value 1 (presumably the semaphore's initial count), so the first
// wait() on it -- from addTask() or from the destructor -- does not
// need a preceding post().
//

TaskGroup::Data::Data ()
    : isEmpty (1),
      numPending (0)
{
    // nothing else to do
}
176
177
178
//
// Blocks until the group is empty, then synchronises with the thread
// that posted isEmpty before the members are destroyed.
//

TaskGroup::Data::~Data ()
{
    //
    // The taskgroup behaves like an "inverted" semaphore: while tasks
    // are pending, isEmpty has been waited on and this wait() blocks.
    // It returns once removeTask() has posted for the last task.
    //

    isEmpty.wait ();

    //
    // Workaround for a glibc bug: the post() call that woke us may
    // still read the semaphore's memory *after* waking the waiting
    // threads, so destroying isEmpty immediately could lead to invalid
    // memory reads in the posting thread.  removeTask() holds
    // dtorMutex around its post(); acquiring the same mutex here as a
    // secondary synchronisation primitive makes this destructor wait
    // until that post() has returned.
    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
    //

    Lock postFinished (dtorMutex);
}
200
201
202
void
203
TaskGroup::Data::addTask () 
204
0
{
205
    //
206
    // Any access to the taskgroup is protected by a mutex that is
207
    // held by the threadpool.  Therefore it is safe to access
208
    // numPending before we wait on the semaphore.
209
    //
210
211
0
    if (numPending++ == 0)
212
0
  isEmpty.wait ();
213
0
}
214
215
216
void
217
TaskGroup::Data::removeTask ()
218
0
{
219
    // Alas, given the current bug in glibc we need a secondary
220
    // syncronisation primitive here to account for the fact that
221
    // destructing the isEmpty Semaphore in a separate thread can
222
    // cause an error. Issuing the post call here the current libc
223
    // implementation attempts to access memory *after* it has woken
224
    // waiting threads.
225
    // Since other threads are entitled to delete the semaphore the
226
    // access to the memory location can be invalid.
227
    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
228
229
0
    if (--numPending == 0)
230
0
    {
231
0
        Lock lock (dtorMutex);
232
0
        isEmpty.post ();
233
0
    }
234
0
}
235
    
236
237
//
238
// struct ThreadPool::Data
239
//
240
241
//
// A new pool has no queued tasks, no worker threads, and is not
// stopping.  The remaining members are default-constructed.
//

ThreadPool::Data::Data ()
    : numTasks (0),
      numThreads (0),
      stopping (false)
{
    // nothing else to do
}
245
246
247
//
// Shuts the pool down: takes threadMutex, which guards the thread
// list, and lets finish() stop and delete every worker.
//

ThreadPool::Data::~Data ()
{
    Lock threadsLock (threadMutex);

    finish ();
}
252
253
254
void
255
ThreadPool::Data::finish ()
256
0
{
257
0
    stop();
258
259
    //
260
    // Signal enough times to allow all threads to stop.
261
    //
262
    // Wait until all threads have started their run functions.
263
    // If we do not wait before we destroy the threads then it's
264
    // possible that the threads have not yet called their run
265
    // functions.
266
    // If this happens then the run function will be called off
267
    // of an invalid object and we will crash, most likely with
268
    // an error like: "pure virtual method called"
269
    //
270
271
0
    for (size_t i = 0; i < numThreads; i++)
272
0
    {
273
0
  taskSemaphore.post();
274
0
  threadSemaphore.wait();
275
0
    }
276
277
    //
278
    // Join all the threads
279
    //
280
281
0
    for (list<WorkerThread*>::iterator i = threads.begin();
282
0
   i != threads.end();
283
0
   ++i)
284
0
    {
285
0
  delete (*i);
286
0
    }
287
288
0
    Lock lock1 (taskMutex);
289
0
    Lock lock2 (stopMutex);
290
0
    threads.clear();
291
0
    tasks.clear();
292
0
    numThreads = 0;
293
0
    numTasks = 0;
294
0
    stopping = false;
295
0
}
296
297
298
bool
299
ThreadPool::Data::stopped () const
300
0
{
301
0
    Lock lock (stopMutex);
302
0
    return stopping;
303
0
}
304
305
306
void
307
ThreadPool::Data::stop ()
308
0
{
309
0
    Lock lock (stopMutex);
310
0
    stopping = true;
311
0
}
312
313
314
//
315
// class Task
316
//
317
318
//
// Records the group this task belongs to; the pointer is handed back
// by group().
//

Task::Task (TaskGroup* g)
    : _group (g)
{
    // nothing else to do
}
322
323
324
//
// Nothing to release here: the destructor body is intentionally empty.
//

Task::~Task ()
{
    // intentionally empty
}
328
329
330
//
// Returns the group pointer that was passed to the constructor.
//

TaskGroup*
Task::group ()
{
    TaskGroup* const owner = _group;
    return owner;
}
335
336
337
//
// Allocates the group's private state; it is deleted again in the
// destructor.
//

TaskGroup::TaskGroup ()
    : _data (new Data ())
{
    // nothing else to do
}
342
343
344
//
// Deletes the group's private state.  Data's destructor waits on the
// isEmpty semaphore first, so this call blocks until every task that
// was added to the group has been removed.
//

TaskGroup::~TaskGroup ()
{
    delete _data;
}
348
349
350
//
351
// class ThreadPool
352
//
353
354
//
// Creates a pool with `nthreads` worker threads.
//
// The private state is allocated in the initializer list.  If
// setNumThreads() throws, the constructor does not complete and the
// destructor will never run, so the state is released here before the
// exception is passed on.  setNumThreads() throws ArgExc when the
// count it receives (nthreads converted to int) is negative; any
// exception from creating a worker thread is handled the same way.
// Deleting the state also stops and deletes any workers that had
// already been created.
//

ThreadPool::ThreadPool (unsigned nthreads):
    _data (new Data())
{
    try
    {
        setNumThreads (nthreads);
    }
    catch (...)
    {
        delete _data;
        throw;
    }
}
359
360
361
//
// Deletes the pool's private state.  Data's destructor calls finish(),
// which stops and deletes all worker threads.
//

ThreadPool::~ThreadPool ()
{
    delete _data;
}
365
366
367
int
368
ThreadPool::numThreads () const
369
0
{
370
0
    Lock lock (_data->threadMutex);
371
0
    return _data->numThreads;
372
0
}
373
374
375
void
376
ThreadPool::setNumThreads (int count)
377
0
{
378
0
    if (count < 0)
379
0
        throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the number of threads "
380
0
         "in a thread pool to a negative value.");
381
382
    //
383
    // Lock access to thread list and size
384
    //
385
386
0
    Lock lock (_data->threadMutex);
387
388
0
    if ((size_t)count > _data->numThreads)
389
0
    {
390
  //
391
        // Add more threads
392
  //
393
394
0
        while (_data->numThreads < (size_t)count)
395
0
        {
396
0
            _data->threads.push_back (new WorkerThread (_data));
397
0
            _data->numThreads++;
398
0
        }
399
0
    }
400
0
    else if ((size_t)count < _data->numThreads)
401
0
    {
402
  //
403
  // Wait until all existing threads are finished processing,
404
  // then delete all threads.
405
  //
406
407
0
        _data->finish ();
408
409
  //
410
        // Add in new threads
411
  //
412
413
0
        while (_data->numThreads < (size_t)count)
414
0
        {
415
0
            _data->threads.push_back (new WorkerThread (_data));
416
0
            _data->numThreads++;
417
0
        }
418
0
    }
419
0
}
420
421
422
void
423
ThreadPool::addTask (Task* task) 
424
0
{
425
    //
426
    // Lock the threads, needed to access numThreads
427
    //
428
429
0
    Lock lock (_data->threadMutex);
430
431
0
    if (_data->numThreads == 0)
432
0
    {
433
0
        task->execute ();
434
0
        delete task;
435
0
    }
436
0
    else
437
0
    {
438
  //
439
        // Get exclusive access to the tasks queue
440
  //
441
442
0
        {
443
0
            Lock taskLock (_data->taskMutex);
444
445
      //
446
            // Push the new task into the FIFO
447
      //
448
449
0
            _data->tasks.push_back (task);
450
0
            _data->numTasks++;
451
0
            task->group()->_data->addTask();
452
0
        }
453
        
454
  //
455
        // Signal that we have a new task to process
456
  //
457
458
0
        _data->taskSemaphore.post ();
459
0
    }
460
0
}
461
462
463
//
// Returns a reference to the global thread pool: a function-local
// static ThreadPool constructed with 0 threads the first time this
// function is called.
//

ThreadPool&
ThreadPool::globalThreadPool ()
{
    static ThreadPool sharedPool (0);

    return sharedPool;
}
474
475
476
void
477
ThreadPool::addGlobalTask (Task* task)
478
0
{
479
0
    globalThreadPool().addTask (task);
480
0
}
481
482
483
ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_EXIT