Coverage Report

Created: 2025-07-16 07:53

/src/openh264/codec/common/src/WelsThreadPool.cpp
Line
Count
Source (jump to first uncovered line)
1
/*!
2
 * \copy
3
 *     Copyright (c)  2009-2015, Cisco Systems
4
 *     All rights reserved.
5
 *
6
 *     Redistribution and use in source and binary forms, with or without
7
 *     modification, are permitted provided that the following conditions
8
 *     are met:
9
 *
10
 *        * Redistributions of source code must retain the above copyright
11
 *          notice, this list of conditions and the following disclaimer.
12
 *
13
 *        * Redistributions in binary form must reproduce the above copyright
14
 *          notice, this list of conditions and the following disclaimer in
15
 *          the documentation and/or other materials provided with the
16
 *          distribution.
17
 *
18
 *     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19
 *     "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20
 *     LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
21
 *     FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
22
 *     COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
23
 *     INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
24
 *     BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25
 *     LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26
 *     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
 *     LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
 *     ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
 *     POSSIBILITY OF SUCH DAMAGE.
30
 *
31
 *
32
 * \file    WelsThreadPool.cpp
33
 *
34
 * \brief   functions for Thread Pool
35
 *
36
 * \date    5/09/2012 Created
37
 *
38
 *************************************************************************************
39
 */
40
#include "typedefs.h"
41
#include "memory_align.h"
42
#include "WelsThreadPool.h"
43
44
namespace WelsCommon {
45
46
namespace {
47
48
0
// Returns the lock that the static pool entry points (SetThreadNum,
// AddReference, RemoveInstance, IsReferenced) take before touching the
// shared reference count / singleton pointer.
// The lock object is allocated on first call and is never deleted.
CWelsLock& GetInitLock() {
  static CWelsLock* const s_pInitLock = new CWelsLock;
  return *s_pInitLock;
}
52
53
}
54
55
int32_t CWelsThreadPool::m_iRefCount = 0;
56
int32_t CWelsThreadPool::m_iMaxThreadNum = DEFAULT_THREAD_NUM;
57
CWelsThreadPool* CWelsThreadPool::m_pThreadPoolSelf = NULL;
58
59
// Constructs a pool with no task/thread lists; Init() allocates them.
CWelsThreadPool::CWelsThreadPool()
  : m_cWaitedTasks (NULL),
    m_cIdleThreads (NULL),
    m_cBusyThreads (NULL) {
}
62
63
64
0
// Destructor. If references are still outstanding, the count is forced back
// to zero and the pool is torn down via Uninit(); otherwise nothing is done
// here (RemoveInstance()/AddReference() already called Uninit()).
CWelsThreadPool::~CWelsThreadPool() {
  if (0 == m_iRefCount) {
    return;
  }

  m_iRefCount = 0;
  Uninit();
}
71
72
0
// Sets the number of worker threads the pool will create on its next Init().
//
// iMaxThreadNum: requested thread count; values <= 0 are clamped to 1.
// Returns WELS_THREAD_ERROR_GENERAL (and changes nothing) while the pool is
// referenced, WELS_THREAD_ERROR_OK otherwise.
WELS_THREAD_ERROR_CODE CWelsThreadPool::SetThreadNum (int32_t iMaxThreadNum) {
  CWelsAutoLock  cInitGuard (GetInitLock());

  // The size may only change while nobody holds a reference.
  if (0 != m_iRefCount) {
    return WELS_THREAD_ERROR_GENERAL;
  }

  m_iMaxThreadNum = (iMaxThreadNum > 0) ? iMaxThreadNum : 1;
  return WELS_THREAD_ERROR_OK;
}
85
86
87
0
// Acquires a reference to the pool singleton, creating and initializing it
// when needed.
//
// Returns the pool on success. Returns NULL when the pool object could not be
// allocated or when Init() failed; in the latter case the half-built pool is
// uninitialized, deleted, and the singleton pointer reset.
CWelsThreadPool* CWelsThreadPool::AddReference() {
  CWelsAutoLock  cInitGuard (GetInitLock());

  if (NULL == m_pThreadPoolSelf) {
    m_pThreadPoolSelf = new CWelsThreadPool();
    if (NULL == m_pThreadPoolSelf) {
      return NULL;
    }
  }

  // Only the first reference has to bring the pool up.
  const bool bFirstReference = (0 == m_iRefCount);
  if (bFirstReference && WELS_THREAD_ERROR_OK != m_pThreadPoolSelf->Init()) {
    m_pThreadPoolSelf->Uninit();
    delete m_pThreadPoolSelf;
    m_pThreadPoolSelf = NULL;
    return NULL;
  }

  ++ m_iRefCount;
  return m_pThreadPoolSelf;
}
111
112
0
void CWelsThreadPool::RemoveInstance() {
113
0
  CWelsAutoLock  cLock (GetInitLock());
114
  //fprintf(stdout, "m_iRefCount=%d\n", m_iRefCount);
115
0
  -- m_iRefCount;
116
0
  if (0 == m_iRefCount) {
117
0
    StopAllRunning();
118
0
    Uninit();
119
0
    if (m_pThreadPoolSelf) {
120
0
      delete m_pThreadPoolSelf;
121
0
      m_pThreadPoolSelf = NULL;
122
0
    }
123
    //fprintf(stdout, "m_iRefCount=%d, IdleThreadNum=%d, BusyThreadNum=%d, WaitedTask=%d\n", m_iRefCount, GetIdleThreadNum(), GetBusyThreadNum(), GetWaitedTaskNum());
124
0
  }
125
0
}
126
127
128
0
bool CWelsThreadPool::IsReferenced() {
129
0
  CWelsAutoLock  cLock (GetInitLock());
130
0
  return (m_iRefCount > 0);
131
0
}
132
133
134
0
// Callback invoked when pThread begins working on a task: records the thread
// in the busy list. pTask is not used by this implementation.
// Always returns WELS_THREAD_ERROR_OK.
WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStart (CWelsTaskThread* pThread, IWelsTask* pTask) {
  AddThreadToBusyList (pThread);

  return WELS_THREAD_ERROR_OK;
}
139
140
0
// Callback invoked when pThread has finished pTask.
//
// Moves the thread from the busy list back to the idle queue, notifies the
// task's sink (if the task and its sink are non-NULL) through
// OnTaskExecuted(), and finally calls SignalThread().
// Always returns WELS_THREAD_ERROR_OK.
WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStop (CWelsTaskThread* pThread, IWelsTask* pTask) {
  // Thread bookkeeping first: busy -> idle.
  RemoveThreadFromBusyList (pThread);
  AddThreadToIdleQueue (pThread);

  // Then tell whoever is waiting on this task that it has run.
  if (NULL != pTask) {
    if (pTask->GetSink()) {
      pTask->GetSink()->OnTaskExecuted();
    }
  }

  SignalThread();

  return WELS_THREAD_ERROR_OK;
}
161
162
0
// Brings the pool up while holding m_cLockPool: allocates the waited-task,
// idle-thread and busy-thread lists, creates m_iMaxThreadNum idle threads,
// then calls Start().
//
// Returns WELS_THREAD_ERROR_OK on success and WELS_THREAD_ERROR_GENERAL as
// soon as any step fails. Nothing is rolled back here on failure.
WELS_THREAD_ERROR_CODE CWelsThreadPool::Init() {
  CWelsAutoLock  cPoolGuard (m_cLockPool);

  m_cWaitedTasks = new CWelsNonDuplicatedList<IWelsTask>();
  m_cIdleThreads = new CWelsNonDuplicatedList<CWelsTaskThread>();
  m_cBusyThreads = new CWelsList<CWelsTaskThread>();
  if (!m_cWaitedTasks || !m_cIdleThreads || !m_cBusyThreads) {
    return WELS_THREAD_ERROR_GENERAL;
  }

  int32_t iCreated = 0;
  while (iCreated < m_iMaxThreadNum) {
    if (WELS_THREAD_ERROR_OK != CreateIdleThread()) {
      return WELS_THREAD_ERROR_GENERAL;
    }
    ++iCreated;
  }

  if (WELS_THREAD_ERROR_OK == Start()) {
    return WELS_THREAD_ERROR_OK;
  }
  return WELS_THREAD_ERROR_GENERAL;
}
186
187
0
// Cancels all queued tasks and waits for running ones to finish.
//
// Waiting is done by polling the busy-thread count, sleeping via
// WelsSleep (10) between checks.
// Returns WELS_THREAD_ERROR_OK when afterwards the idle queue holds exactly
// m_iMaxThreadNum threads, WELS_THREAD_ERROR_GENERAL otherwise.
WELS_THREAD_ERROR_CODE CWelsThreadPool::StopAllRunning() {
  ClearWaitedTasks();

  // Busy threads leave the busy list in OnTaskStop().
  while (0 < GetBusyThreadNum()) {
    WelsSleep (10);
  }

  if (GetIdleThreadNum() == m_iMaxThreadNum) {
    return WELS_THREAD_ERROR_OK;
  }
  return WELS_THREAD_ERROR_GENERAL;
}
203
204
0
// Tears the pool down while holding m_cLockPool.
//
// Steps: StopAllRunning(); destroy every thread in the idle queue (under
// m_cLockIdleTasks); Kill(); delete the three lists.
// If StopAllRunning() reports an error, that error is returned immediately
// and none of the later steps run. Otherwise returns WELS_THREAD_ERROR_OK.
WELS_THREAD_ERROR_CODE CWelsThreadPool::Uninit() {
  CWelsAutoLock  cPoolGuard (m_cLockPool);

  const WELS_THREAD_ERROR_CODE iStopResult = StopAllRunning();
  if (WELS_THREAD_ERROR_OK != iStopResult) {
    return iStopResult;
  }

  // Drain the idle queue, destroying each thread before unlinking it.
  m_cLockIdleTasks.Lock();
  for (; 0 < m_cIdleThreads->size(); m_cIdleThreads->pop_front()) {
    DestroyThread (m_cIdleThreads->begin());
  }
  m_cLockIdleTasks.Unlock();

  Kill();

  WELS_DELETE_OP (m_cWaitedTasks);
  WELS_DELETE_OP (m_cIdleThreads);
  WELS_DELETE_OP (m_cBusyThreads);

  return iStopResult;
}
228
229
0
void CWelsThreadPool::ExecuteTask() {
230
  //fprintf(stdout, "ThreadPool: scheduled tasks: ExecuteTask\n");
231
0
  CWelsTaskThread* pThread = NULL;
232
0
  IWelsTask*    pTask = NULL;
233
0
  while (GetWaitedTaskNum() > 0) {
234
    //fprintf(stdout, "ThreadPool:  ExecuteTask: waiting task %d\n", GetWaitedTaskNum());
235
0
    pThread = GetIdleThread();
236
0
    if (pThread == NULL) {
237
      //fprintf(stdout, "ThreadPool:  ExecuteTask: no IdleThread\n");
238
239
0
      break;
240
0
    }
241
0
    pTask = GetWaitedTask();
242
    //fprintf(stdout, "ThreadPool:  ExecuteTask = %x at thread %x\n", pTask, pThread);
243
0
    if (pTask) {
244
0
      pThread->SetTask (pTask);
245
0
    } else {
246
0
      AddThreadToIdleQueue (pThread);
247
0
    }
248
0
  }
249
0
}
250
251
0
// Submits pTask to the pool while holding m_cLockPool.
//
// If no task is currently waiting and an idle thread is available, the task
// is given to that thread directly. Otherwise it is appended to the
// waited-task list and SignalThread() is called.
// Returns WELS_THREAD_ERROR_GENERAL when the task could not be appended to
// the waited-task list, WELS_THREAD_ERROR_OK otherwise.
WELS_THREAD_ERROR_CODE CWelsThreadPool::QueueTask (IWelsTask* pTask) {
  CWelsAutoLock  cPoolGuard (m_cLockPool);

  // Fast path: only taken when nothing is queued ahead of this task.
  CWelsTaskThread* pWorker = (0 == GetWaitedTaskNum()) ? GetIdleThread() : NULL;
  if (NULL != pWorker) {
    pWorker->SetTask (pTask);
    return WELS_THREAD_ERROR_OK;
  }

  if (!AddTaskToWaitedList (pTask)) {
    return WELS_THREAD_ERROR_GENERAL;
  }

  SignalThread();
  return WELS_THREAD_ERROR_OK;
}
274
275
0
// Creates one worker thread, starts it, and appends it to the idle queue.
//
// Returns WELS_THREAD_ERROR_OK on success. Returns WELS_THREAD_ERROR_GENERAL
// when the thread object could not be allocated or failed to start; in the
// latter case the thread object is released here, because it has not been
// added to any list yet and nobody else could free it.
WELS_THREAD_ERROR_CODE CWelsThreadPool::CreateIdleThread() {
  CWelsTaskThread* pThread = new CWelsTaskThread (this);

  if (NULL == pThread) {
    return WELS_THREAD_ERROR_GENERAL;
  }

  if (WELS_THREAD_ERROR_OK != pThread->Start()) {
    // Avoid leaking the thread object on a failed start.
    WELS_DELETE_OP (pThread);
    return WELS_THREAD_ERROR_GENERAL;
  }
  //fprintf(stdout, "ThreadPool:  AddThreadToIdleQueue: %x\n", pThread);
  AddThreadToIdleQueue (pThread);

  return WELS_THREAD_ERROR_OK;
}
290
291
0
// Stops the given worker via Kill() and then deletes the thread object.
// pThread must not be NULL (it is dereferenced unconditionally).
void  CWelsThreadPool::DestroyThread (CWelsTaskThread* pThread) {
  pThread->Kill();

  WELS_DELETE_OP (pThread);
}
297
298
0
// Appends pThread to the idle queue under m_cLockIdleTasks.
// The result of push_back is ignored; always returns WELS_THREAD_ERROR_OK.
WELS_THREAD_ERROR_CODE CWelsThreadPool::AddThreadToIdleQueue (CWelsTaskThread* pThread) {
  CWelsAutoLock cIdleGuard (m_cLockIdleTasks);

  m_cIdleThreads->push_back (pThread);

  return WELS_THREAD_ERROR_OK;
}
303
304
0
// Appends pThread to the busy list under m_cLockBusyTasks.
// The result of push_back is ignored; always returns WELS_THREAD_ERROR_OK.
WELS_THREAD_ERROR_CODE CWelsThreadPool::AddThreadToBusyList (CWelsTaskThread* pThread) {
  CWelsAutoLock cBusyGuard (m_cLockBusyTasks);

  m_cBusyThreads->push_back (pThread);

  return WELS_THREAD_ERROR_OK;
}
309
310
0
// Removes pThread from the busy list under m_cLockBusyTasks.
// Returns WELS_THREAD_ERROR_OK when erase() reports success and
// WELS_THREAD_ERROR_GENERAL otherwise.
WELS_THREAD_ERROR_CODE CWelsThreadPool::RemoveThreadFromBusyList (CWelsTaskThread* pThread) {
  CWelsAutoLock cBusyGuard (m_cLockBusyTasks);

  if (!m_cBusyThreads->erase (pThread)) {
    return WELS_THREAD_ERROR_GENERAL;
  }
  return WELS_THREAD_ERROR_OK;
}
318
319
0
// Appends pTask to the waited-task list under m_cLockWaitedTasks and returns
// whatever push_back reports.
bool  CWelsThreadPool::AddTaskToWaitedList (IWelsTask* pTask) {
  CWelsAutoLock  cWaitedGuard (m_cLockWaitedTasks);

  const bool bQueued = m_cWaitedTasks->push_back (pTask);
  return bQueued;
}
324
325
0
// Takes the first thread off the idle queue under m_cLockIdleTasks.
// Returns NULL when the queue has not been allocated or is empty.
CWelsTaskThread*   CWelsThreadPool::GetIdleThread() {
  CWelsAutoLock cIdleGuard (m_cLockIdleTasks);

  const bool bHasIdleThread = (NULL != m_cIdleThreads) && (m_cIdleThreads->size() != 0);
  if (!bHasIdleThread) {
    return NULL;
  }

  CWelsTaskThread* pFront = m_cIdleThreads->begin();
  m_cIdleThreads->pop_front();
  return pFront;
}
338
339
0
// Number of threads in the busy list; 0 when the list is not allocated.
// No lock is taken for this read.
int32_t  CWelsThreadPool::GetBusyThreadNum() {
  if (NULL == m_cBusyThreads) {
    return 0;
  }
  return m_cBusyThreads->size();
}
342
343
0
// Number of threads in the idle queue; 0 when the queue is not allocated.
// No lock is taken for this read.
int32_t  CWelsThreadPool::GetIdleThreadNum() {
  if (NULL == m_cIdleThreads) {
    return 0;
  }
  return m_cIdleThreads->size();
}
346
347
0
// Number of tasks in the waited-task list; 0 when the list is not allocated.
// No lock is taken for this read.
int32_t  CWelsThreadPool::GetWaitedTaskNum() {
  if (NULL == m_cWaitedTasks) {
    return 0;
  }
  return m_cWaitedTasks->size();
}
350
351
0
// Takes the first task off the waited-task list under m_cLockWaitedTasks.
// Returns NULL when the list has not been allocated or is empty.
IWelsTask* CWelsThreadPool::GetWaitedTask() {
  CWelsAutoLock cWaitedGuard (m_cLockWaitedTasks);

  const bool bHasTask = (NULL != m_cWaitedTasks) && (m_cWaitedTasks->size() != 0);
  if (!bHasTask) {
    return NULL;
  }

  IWelsTask* pFront = m_cWaitedTasks->begin();
  m_cWaitedTasks->pop_front();
  return pFront;
}
364
365
0
void  CWelsThreadPool::ClearWaitedTasks() {
366
0
  CWelsAutoLock cLock (m_cLockWaitedTasks);
367
0
  if (NULL == m_cWaitedTasks) {
368
0
    return;
369
0
  }
370
0
  IWelsTask* pTask = NULL;
371
0
  while (0 != m_cWaitedTasks->size()) {
372
0
    pTask = m_cWaitedTasks->begin();
373
0
    if (pTask->GetSink()) {
374
0
      pTask->GetSink()->OnTaskCancelled();
375
0
    }
376
0
    m_cWaitedTasks->pop_front();
377
0
  }
378
0
}
379
380
}