Coverage Report

Created: 2025-04-11 06:56

/src/unrar/threadpool.cpp
Line
Count
Source (jump to first uncovered line)
1
#include "rar.hpp"
2
3
4
#ifdef RAR_SMP
5
#include "threadmisc.cpp"
6
7
#ifdef _WIN_ALL
8
int ThreadPool::ThreadPriority=THREAD_PRIORITY_NORMAL;
9
#endif
10
11
ThreadPool::ThreadPool(uint MaxThreads)
12
13.0k
{
13
13.0k
  MaxAllowedThreads = MaxThreads;
14
13.0k
  if (MaxAllowedThreads>MaxPoolThreads)
15
0
    MaxAllowedThreads=MaxPoolThreads;
16
13.0k
  if (MaxAllowedThreads==0)
17
0
    MaxAllowedThreads=1;
18
19
13.0k
  ThreadsCreatedCount=0;
20
21
  // If we have more threads than queue size, we'll hang when destroying the pool,
22
  // not releasing all waiting threads.
23
13.0k
  if (MaxAllowedThreads>ASIZE(TaskQueue))
24
0
    MaxAllowedThreads=ASIZE(TaskQueue);
25
26
13.0k
  Closing=false;
27
28
13.0k
  bool Success = CriticalSectionCreate(&CritSection);
29
#ifdef _WIN_ALL
30
  QueuedTasksCnt=CreateSemaphore(NULL,0,ASIZE(TaskQueue),NULL);
31
  NoneActive=CreateEvent(NULL,TRUE,TRUE,NULL);
32
  Success=Success && QueuedTasksCnt!=NULL && NoneActive!=NULL;
33
#elif defined(_UNIX)
34
  AnyActive = false;
35
13.0k
  QueuedTasksCnt = 0;
36
13.0k
  Success=Success && pthread_cond_init(&AnyActiveCond,NULL)==0 &&
37
13.0k
          pthread_mutex_init(&AnyActiveMutex,NULL)==0 &&
38
13.0k
          pthread_cond_init(&QueuedTasksCntCond,NULL)==0 &&
39
13.0k
          pthread_mutex_init(&QueuedTasksCntMutex,NULL)==0;
40
13.0k
#endif
41
13.0k
  if (!Success)
42
0
  {
43
0
    ErrHandler.GeneralErrMsg(L"\nThread pool initialization failed.");
44
0
    ErrHandler.Exit(RARX_FATAL);
45
0
  }
46
47
13.0k
  QueueTop = 0;
48
13.0k
  QueueBottom = 0;
49
13.0k
  ActiveThreads = 0;
50
13.0k
}
51
52
53
ThreadPool::~ThreadPool()
54
13.0k
{
55
13.0k
  WaitDone();
56
13.0k
  Closing=true;
57
58
#ifdef _WIN_ALL
59
  ReleaseSemaphore(QueuedTasksCnt,ASIZE(TaskQueue),NULL);
60
#elif defined(_UNIX)
61
  // Threads still can access QueuedTasksCnt for a short time after WaitDone(),
62
  // so a lock is required. We would occasionally hang without it.
63
13.0k
  pthread_mutex_lock(&QueuedTasksCntMutex);
64
13.0k
  QueuedTasksCnt+=ASIZE(TaskQueue);
65
13.0k
  pthread_mutex_unlock(&QueuedTasksCntMutex);
66
67
13.0k
  pthread_cond_broadcast(&QueuedTasksCntCond);
68
13.0k
#endif
69
70
20.7k
  for(uint I=0;I<ThreadsCreatedCount;I++)
71
7.74k
  {
72
#ifdef _WIN_ALL
73
    // Waiting until the thread terminates.
74
    CWaitForSingleObject(ThreadHandles[I]);
75
#endif
76
    // Close the thread handle. In Unix it results in pthread_join call,
77
    // which also waits for thread termination.
78
7.74k
    ThreadClose(ThreadHandles[I]);
79
7.74k
  }
80
81
13.0k
  CriticalSectionDelete(&CritSection);
82
#ifdef _WIN_ALL
83
  CloseHandle(QueuedTasksCnt);
84
  CloseHandle(NoneActive);
85
#elif defined(_UNIX)
86
  pthread_cond_destroy(&AnyActiveCond);
87
13.0k
  pthread_mutex_destroy(&AnyActiveMutex);
88
13.0k
  pthread_cond_destroy(&QueuedTasksCntCond);
89
13.0k
  pthread_mutex_destroy(&QueuedTasksCntMutex);
90
13.0k
#endif
91
13.0k
}
92
93
94
void ThreadPool::CreateThreads()
95
968
{
96
#ifdef WIN32_CPU_GROUPS
97
  // 2024.12.28: Implement processor group support for pre-Windows 11 systems
98
  // with number of CPUs exceeding 64. For example, for 72 CPU the single
99
  // processor group size would be 36 and this is what RAR would use without
100
  // processor group support.
101
102
  uint GroupCount=0;
103
  uint CurGroupNumber=(uint)-1; // We'll increment it to 0 later.
104
  uint CurGroupSize=0,CumulativeGroupSize=0;
105
106
  typedef DWORD (WINAPI *GETACTIVEPROCESSORCOUNT)(WORD GroupNumber);
107
  GETACTIVEPROCESSORCOUNT pGetActiveProcessorCount=nullptr;
108
  typedef BOOL (WINAPI *GETTHREADGROUPAFFINITY)(HANDLE hThread,PGROUP_AFFINITY GroupAffinity);
109
  GETTHREADGROUPAFFINITY pGetThreadGroupAffinity=nullptr;
110
  typedef BOOL (WINAPI *SETTHREADGROUPAFFINITY)(HANDLE hThread,const GROUP_AFFINITY *GroupAffinity,PGROUP_AFFINITY PreviousGroupAffinity);
111
  SETTHREADGROUPAFFINITY pSetThreadGroupAffinity=nullptr;
112
113
  // Doesn't seem to be needed in Windows 11 and newer.
114
  // https://learn.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-getprocessaffinitymask
115
  // "Starting with Windows 11 and Windows Server 2022, on a system with
116
  //  more than 64 processors, process and thread affinities span all
117
  //  processors in the system, across all processor groups, by default."
118
  if (!IsWindows11OrGreater())
119
  {
120
    HMODULE hKernel=GetModuleHandle(L"kernel32.dll");
121
    if (hKernel!=nullptr)
122
    {
123
      typedef WORD (WINAPI *GETACTIVEPROCESSORGROUPCOUNT)();
124
      GETACTIVEPROCESSORGROUPCOUNT pGetActiveProcessorGroupCount=(GETACTIVEPROCESSORGROUPCOUNT)GetProcAddress(hKernel,"GetActiveProcessorGroupCount");
125
126
      pGetActiveProcessorCount=(GETACTIVEPROCESSORCOUNT)GetProcAddress(hKernel,"GetActiveProcessorCount");
127
      pGetThreadGroupAffinity=(GETTHREADGROUPAFFINITY)GetProcAddress(hKernel,"GetThreadGroupAffinity");
128
      pSetThreadGroupAffinity=(SETTHREADGROUPAFFINITY)GetProcAddress(hKernel,"SetThreadGroupAffinity");
129
130
      if (pGetActiveProcessorCount!=nullptr && pGetActiveProcessorGroupCount!=nullptr &&
131
          pGetThreadGroupAffinity!=nullptr && pSetThreadGroupAffinity!=nullptr)
132
        GroupCount=pGetActiveProcessorGroupCount();
133
    }
134
  }
135
#endif
136
137
8.71k
  for (uint I=0;I<MaxAllowedThreads;I++)
138
7.74k
  {
139
7.74k
    THREAD_HANDLE hThread=ThreadCreate(PoolThread, this);
140
7.74k
    ThreadHandles[I] = hThread;
141
7.74k
    ThreadsCreatedCount++;
142
#ifdef _WIN_ALL
143
#ifdef WIN32_CPU_GROUPS
144
    if (GroupCount>1) // If we have multiple processor groups.
145
    {
146
      if (I>=CumulativeGroupSize) // Filled the processor group, go to next.
147
      {
148
        if (++CurGroupNumber>=GroupCount)
149
        {
150
          // If we exceeded the group number, such as when user specified
151
          // -mt64 for lower core count, start assigning from beginning.
152
          CurGroupNumber=0;
153
          CumulativeGroupSize=0;
154
        }
155
        // Current group size.
156
        CurGroupSize=pGetActiveProcessorCount(CurGroupNumber);
157
        // Size of all preceding groups including the current.
158
        CumulativeGroupSize+=CurGroupSize;
159
      }
160
      GROUP_AFFINITY GroupAffinity;
161
      pGetThreadGroupAffinity(hThread,&GroupAffinity);
162
163
      // Since normally before Windows 11 all threads belong to the same source
164
      // group, we could set this value only once. But we set it every time
165
      // in case we'll decide for some reason to use it to rearrange threads
166
      // from different source groups in Windows 11+.
167
      uint SrcGroupSize=pGetActiveProcessorCount(GroupAffinity.Group);
168
169
      // Shifting by 64 would be undefined behavior, so we treat 64 separately.
170
      KAFFINITY SrcGroupMask=(KAFFINITY)(SrcGroupSize==64 ? (uint64)0xffffffffffffffff:(uint64(1)<<SrcGroupSize)-1);
171
172
      // Here we check that process affinity for existing thread group
173
      // matches the entire group size. If user limited the process
174
      // affinity, we prefer to not extend the process to other groups,
175
      // because user might want to restrict the resource usage.
176
      // Also if source processor group is larger than required number
177
      // of threads, we do not need to move threads between groups.
178
      if (SrcGroupSize!=0 && GroupAffinity.Mask==SrcGroupMask &&
179
          GroupAffinity.Group!=CurGroupNumber && SrcGroupSize<MaxAllowedThreads)
180
      {
181
        // Shifting by 64 would be undefined behavior, so we treat 64 separately.
182
        KAFFINITY CurGroupMask=(KAFFINITY)(CurGroupSize==64 ? (uint64)0xffffffffffffffff:(uint64(1)<<CurGroupSize)-1);
183
        GroupAffinity.Mask=CurGroupMask; // Use the entire group.
184
        GroupAffinity.Group=CurGroupNumber;
185
186
        // Assign the thread to a new group.
187
        pSetThreadGroupAffinity(hThread,&GroupAffinity,NULL);
188
      }
189
    }
190
#endif
191
    
192
    // Set the thread priority if needed.
193
    if (ThreadPool::ThreadPriority!=THREAD_PRIORITY_NORMAL)
194
      SetThreadPriority(ThreadHandles[I],ThreadPool::ThreadPriority);
195
#endif
196
7.74k
  }
197
968
}
198
199
200
NATIVE_THREAD_TYPE ThreadPool::PoolThread(void *Param)
201
7.74k
{
202
7.74k
  ((ThreadPool*)Param)->PoolThreadLoop();
203
7.74k
  return 0;
204
7.74k
}
205
206
207
void ThreadPool::PoolThreadLoop()
208
7.74k
{
209
7.74k
  QueueEntry Task;
210
431k
  while (GetQueuedTask(&Task))
211
423k
  {
212
423k
    Task.Proc(Task.Param);
213
    
214
423k
    CriticalSectionStart(&CritSection); 
215
423k
    if (--ActiveThreads == 0)
216
62.4k
    {
217
#ifdef _WIN_ALL
218
      SetEvent(NoneActive);
219
#elif defined(_UNIX)
220
      pthread_mutex_lock(&AnyActiveMutex);
221
62.4k
      AnyActive=false;
222
62.4k
      pthread_cond_signal(&AnyActiveCond);
223
62.4k
      pthread_mutex_unlock(&AnyActiveMutex);
224
62.4k
#endif
225
62.4k
    }
226
423k
    CriticalSectionEnd(&CritSection); 
227
423k
  }
228
7.74k
}
229
230
231
bool ThreadPool::GetQueuedTask(QueueEntry *Task)
232
431k
{
233
#ifdef _WIN_ALL
234
  CWaitForSingleObject(QueuedTasksCnt);
235
#elif defined(_UNIX)
236
  pthread_mutex_lock(&QueuedTasksCntMutex);
237
936k
  while (QueuedTasksCnt==0)
238
505k
    cpthread_cond_wait(&QueuedTasksCntCond,&QueuedTasksCntMutex);
239
431k
  QueuedTasksCnt--;
240
431k
  pthread_mutex_unlock(&QueuedTasksCntMutex);
241
431k
#endif
242
243
431k
  if (Closing)
244
7.74k
    return false;
245
246
423k
  CriticalSectionStart(&CritSection); 
247
248
423k
  *Task = TaskQueue[QueueBottom];
249
423k
  QueueBottom = (QueueBottom + 1) % ASIZE(TaskQueue);
250
251
423k
  CriticalSectionEnd(&CritSection); 
252
253
423k
  return true;
254
431k
}
255
256
257
// Add task to queue. We assume that it is always called from main thread,
258
// which lets us avoid any locks here. We process collected tasks only
259
// when WaitDone is called.
260
void ThreadPool::AddTask(PTHREAD_PROC Proc,void *Data)
261
423k
{
262
423k
  if (ThreadsCreatedCount == 0)
263
968
    CreateThreads();
264
  
265
  // If queue is full, wait until it is empty.
266
423k
  if (ActiveThreads>=ASIZE(TaskQueue))
267
0
    WaitDone();
268
269
423k
  TaskQueue[QueueTop].Proc = Proc;
270
423k
  TaskQueue[QueueTop].Param = Data;
271
423k
  QueueTop = (QueueTop + 1) % ASIZE(TaskQueue);
272
423k
  ActiveThreads++;
273
423k
}
274
275
276
// Start queued tasks and wait until all threads are inactive.
277
// We assume that it is always called from main thread, when pool threads
278
// are still sleeping.
279
void ThreadPool::WaitDone()
280
114k
{
281
114k
  if (ActiveThreads==0)
282
52.1k
    return;
283
#ifdef _WIN_ALL
284
  ResetEvent(NoneActive);
285
  ReleaseSemaphore(QueuedTasksCnt,ActiveThreads,NULL);
286
  CWaitForSingleObject(NoneActive);
287
#elif defined(_UNIX)
288
62.4k
  AnyActive=true;
289
290
  // Threads reset AnyActive before accessing QueuedTasksCnt and even
291
  // preceding WaitDone() call does not guarantee that some slow thread
292
  // is not accessing QueuedTasksCnt now. So a lock is necessary.
293
62.4k
  pthread_mutex_lock(&QueuedTasksCntMutex);
294
62.4k
  QueuedTasksCnt+=ActiveThreads;
295
62.4k
  pthread_mutex_unlock(&QueuedTasksCntMutex);
296
297
62.4k
  pthread_cond_broadcast(&QueuedTasksCntCond);
298
299
62.4k
  pthread_mutex_lock(&AnyActiveMutex);
300
124k
  while (AnyActive)
301
61.8k
    cpthread_cond_wait(&AnyActiveCond,&AnyActiveMutex);
302
62.4k
  pthread_mutex_unlock(&AnyActiveMutex);
303
62.4k
#endif
304
62.4k
}
305
#endif // RAR_SMP