/src/unrar/threadpool.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | #include "rar.hpp" |
2 | | |
3 | | |
4 | | #ifdef RAR_SMP |
5 | | #include "threadmisc.cpp" |
6 | | |
// Priority applied to pool threads on Windows builds. CreateThreads() calls
// SetThreadPriority() for every new thread only when this value differs from
// THREAD_PRIORITY_NORMAL.
7 | | #ifdef _WIN_ALL |
8 | | int ThreadPool::ThreadPriority=THREAD_PRIORITY_NORMAL; |
9 | | #endif |
10 | | |
// Prepares the pool state without starting any threads: ThreadsCreatedCount
// is set to 0 here and AddTask() calls CreateThreads() on first use.
//
// MaxThreads is limited to MaxPoolThreads, raised to at least 1 and then
// limited to the TaskQueue array size. The constructor creates CritSection
// plus the platform primitives (a semaphore and an event under _WIN_ALL,
// two condition variable / mutex pairs under _UNIX). If any of these fails,
// it prints an error and calls ErrHandler.Exit(RARX_FATAL).
11 | | ThreadPool::ThreadPool(uint MaxThreads) |
12 | 13.0k | { |
13 | 13.0k | MaxAllowedThreads = MaxThreads; |
14 | 13.0k | if (MaxAllowedThreads>MaxPoolThreads) |
15 | 0 | MaxAllowedThreads=MaxPoolThreads; |
16 | 13.0k | if (MaxAllowedThreads==0) |
17 | 0 | MaxAllowedThreads=1; |
18 | | |
19 | 13.0k | ThreadsCreatedCount=0; |
20 | | |
21 | | // If we have more threads than the queue size, we'll hang when destroying |
22 | | // the pool, because not all waiting threads would be released. |
23 | 13.0k | if (MaxAllowedThreads>ASIZE(TaskQueue)) |
24 | 0 | MaxAllowedThreads=ASIZE(TaskQueue); |
25 | | |
26 | 13.0k | Closing=false; |
27 | | |
28 | 13.0k | bool Success = CriticalSectionCreate(&CritSection); |
29 | | #ifdef _WIN_ALL |
30 | | QueuedTasksCnt=CreateSemaphore(NULL,0,ASIZE(TaskQueue),NULL); |
31 | | NoneActive=CreateEvent(NULL,TRUE,TRUE,NULL); |
32 | | Success=Success && QueuedTasksCnt!=NULL && NoneActive!=NULL; |
33 | | #elif defined(_UNIX) |
34 | | AnyActive = false; |
35 | 13.0k | QueuedTasksCnt = 0; |
36 | 13.0k | Success=Success && pthread_cond_init(&AnyActiveCond,NULL)==0 && |
37 | 13.0k | pthread_mutex_init(&AnyActiveMutex,NULL)==0 && |
38 | 13.0k | pthread_cond_init(&QueuedTasksCntCond,NULL)==0 && |
39 | 13.0k | pthread_mutex_init(&QueuedTasksCntMutex,NULL)==0; |
40 | 13.0k | #endif |
41 | 13.0k | if (!Success) |
42 | 0 | { |
43 | 0 | ErrHandler.GeneralErrMsg(L"\nThread pool initialization failed."); |
44 | 0 | ErrHandler.Exit(RARX_FATAL); |
45 | 0 | } |
46 | | |
47 | 13.0k | QueueTop = 0; |
48 | 13.0k | QueueBottom = 0; |
49 | 13.0k | ActiveThreads = 0; |
50 | 13.0k | } |
51 | | |
52 | | |
// Shuts the pool down. WaitDone() first runs any tasks that are still
// queued. Then Closing is set and QueuedTasksCnt is raised by the TaskQueue
// size, so every waiting pool thread wakes up, sees Closing in
// GetQueuedTask() and leaves PoolThreadLoop(). After all created threads
// have terminated, the critical section and the platform synchronization
// objects are destroyed.
53 | | ThreadPool::~ThreadPool() |
54 | 13.0k | { |
55 | 13.0k | WaitDone(); |
56 | 13.0k | Closing=true; |
57 | | |
58 | | #ifdef _WIN_ALL |
59 | | ReleaseSemaphore(QueuedTasksCnt,ASIZE(TaskQueue),NULL); |
60 | | #elif defined(_UNIX) |
61 | | // Threads can still access QueuedTasksCnt for a short time after WaitDone(), |
62 | | // so lock is required. We would occasionally hang without it. |
63 | 13.0k | pthread_mutex_lock(&QueuedTasksCntMutex); |
64 | 13.0k | QueuedTasksCnt+=ASIZE(TaskQueue); |
65 | 13.0k | pthread_mutex_unlock(&QueuedTasksCntMutex); |
66 | | |
67 | 13.0k | pthread_cond_broadcast(&QueuedTasksCntCond); |
68 | 13.0k | #endif |
69 | | |
70 | 20.7k | for(uint I=0;I<ThreadsCreatedCount;I++) |
71 | 7.74k | { |
72 | | #ifdef _WIN_ALL |
73 | | // Waiting until the thread terminates. |
74 | | CWaitForSingleObject(ThreadHandles[I]); |
75 | | #endif |
76 | | // Close the thread handle. In Unix it results in pthread_join call, |
77 | | // which also waits for thread termination. |
78 | 7.74k | ThreadClose(ThreadHandles[I]); |
79 | 7.74k | } |
80 | | |
81 | 13.0k | CriticalSectionDelete(&CritSection); |
82 | | #ifdef _WIN_ALL |
83 | | CloseHandle(QueuedTasksCnt); |
84 | | CloseHandle(NoneActive); |
85 | | #elif defined(_UNIX) |
86 | | pthread_cond_destroy(&AnyActiveCond); |
87 | 13.0k | pthread_mutex_destroy(&AnyActiveMutex); |
88 | 13.0k | pthread_cond_destroy(&QueuedTasksCntCond); |
89 | 13.0k | pthread_mutex_destroy(&QueuedTasksCntMutex); |
90 | 13.0k | #endif |
91 | 13.0k | } |
92 | | |
93 | | |
// Starts MaxAllowedThreads threads running PoolThread(this), stores each
// handle in ThreadHandles[] and increments ThreadsCreatedCount per thread.
//
// Under _WIN_ALL the thread priority is changed when ThreadPriority is not
// THREAD_PRIORITY_NORMAL. Under WIN32_CPU_GROUPS, on systems older than
// Windows 11 that report more than one processor group, threads may also be
// moved to other processor groups. The group functions are resolved from
// kernel32.dll at run time, and GroupCount stays 0 (feature disabled) if any
// of them is missing.
//
// NOTE(review): the value returned by ThreadCreate() is stored and counted
// without a check in this function; how ThreadCreate() reports failure is
// not visible here - confirm.
// NOTE(review): when CurGroupNumber wraps to 0, CumulativeGroupSize is reset
// but I is not, so I>=CumulativeGroupSize appears to stay true afterwards
// and the group would advance on every following thread - confirm that this
// is intended.
94 | | void ThreadPool::CreateThreads() |
95 | 968 | { |
96 | | #ifdef WIN32_CPU_GROUPS |
97 | | // 2024.12.28: Implement processor group support for pre-Windows 11 systems |
98 | | // with the number of CPUs exceeding 64. For example, for 72 CPUs a single |
99 | | // processor group size would be 36 and this is what RAR would use without |
100 | | // processor group support. |
101 | | |
102 | | uint GroupCount=0; |
103 | | uint CurGroupNumber=(uint)-1; // We'll increment it to 0 later. |
104 | | uint CurGroupSize=0,CumulativeGroupSize=0; |
105 | | |
106 | | typedef DWORD (WINAPI *GETACTIVEPROCESSORCOUNT)(WORD GroupNumber); |
107 | | GETACTIVEPROCESSORCOUNT pGetActiveProcessorCount=nullptr; |
108 | | typedef BOOL (WINAPI *GETTHREADGROUPAFFINITY)(HANDLE hThread,PGROUP_AFFINITY GroupAffinity); |
109 | | GETTHREADGROUPAFFINITY pGetThreadGroupAffinity=nullptr; |
110 | | typedef BOOL (WINAPI *SETTHREADGROUPAFFINITY)(HANDLE hThread,const GROUP_AFFINITY *GroupAffinity,PGROUP_AFFINITY PreviousGroupAffinity); |
111 | | SETTHREADGROUPAFFINITY pSetThreadGroupAffinity=nullptr; |
112 | | |
113 | | // Doesn't seem to be needed in Windows 11 and newer. |
114 | | // https://learn.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-getprocessaffinitymask |
115 | | // "Starting with Windows 11 and Windows Server 2022, on a system with |
116 | | // more than 64 processors, process and thread affinities span all |
117 | | // processors in the system, across all processor groups, by default." |
118 | | if (!IsWindows11OrGreater()) |
119 | | { |
120 | | HMODULE hKernel=GetModuleHandle(L"kernel32.dll"); |
121 | | if (hKernel!=nullptr) |
122 | | { |
123 | | typedef WORD (WINAPI *GETACTIVEPROCESSORGROUPCOUNT)(); |
124 | | GETACTIVEPROCESSORGROUPCOUNT pGetActiveProcessorGroupCount=(GETACTIVEPROCESSORGROUPCOUNT)GetProcAddress(hKernel,"GetActiveProcessorGroupCount"); |
125 | | |
126 | | pGetActiveProcessorCount=(GETACTIVEPROCESSORCOUNT)GetProcAddress(hKernel,"GetActiveProcessorCount"); |
127 | | pGetThreadGroupAffinity=(GETTHREADGROUPAFFINITY)GetProcAddress(hKernel,"GetThreadGroupAffinity"); |
128 | | pSetThreadGroupAffinity=(SETTHREADGROUPAFFINITY)GetProcAddress(hKernel,"SetThreadGroupAffinity"); |
129 | | |
130 | | if (pGetActiveProcessorCount!=nullptr && pGetActiveProcessorGroupCount!=nullptr && |
131 | | pGetThreadGroupAffinity!=nullptr && pSetThreadGroupAffinity!=nullptr) |
132 | | GroupCount=pGetActiveProcessorGroupCount(); |
133 | | } |
134 | | } |
135 | | #endif |
136 | | |
137 | 8.71k | for (uint I=0;I<MaxAllowedThreads;I++) |
138 | 7.74k | { |
139 | 7.74k | THREAD_HANDLE hThread=ThreadCreate(PoolThread, this); |
140 | 7.74k | ThreadHandles[I] = hThread; |
141 | 7.74k | ThreadsCreatedCount++; |
142 | | #ifdef _WIN_ALL |
143 | | #ifdef WIN32_CPU_GROUPS |
144 | | if (GroupCount>1) // If we have multiple processor groups. |
145 | | { |
146 | | if (I>=CumulativeGroupSize) // Filled the processor group, go to next. |
147 | | { |
148 | | if (++CurGroupNumber>=GroupCount) |
149 | | { |
150 | | // If we exceeded the group number, such as when user specified |
151 | | // -mt64 for lower core count, start assigning from beginning. |
152 | | CurGroupNumber=0; |
153 | | CumulativeGroupSize=0; |
154 | | } |
155 | | // Current group size. |
156 | | CurGroupSize=pGetActiveProcessorCount(CurGroupNumber); |
157 | | // Size of all preceding groups including the current. |
158 | | CumulativeGroupSize+=CurGroupSize; |
159 | | } |
160 | | GROUP_AFFINITY GroupAffinity; |
161 | | pGetThreadGroupAffinity(hThread,&GroupAffinity); |
162 | | |
163 | | // Since normally before Windows 11 all threads belong to the same source |
164 | | // group, we could set this value only once. But we set it every time |
165 | | // in case we'll decide for some reason to use it to rearrange threads |
166 | | // from different source groups in Windows 11+. |
167 | | uint SrcGroupSize=pGetActiveProcessorCount(GroupAffinity.Group); |
168 | | |
169 | | // Shifting by 64 would be undefined behavior, so we treat 64 separately. |
170 | | KAFFINITY SrcGroupMask=(KAFFINITY)(SrcGroupSize==64 ? (uint64)0xffffffffffffffff:(uint64(1)<<SrcGroupSize)-1); |
171 | | |
172 | | // Here we check that process affinity for existing thread group |
173 | | // matches the entire group size. If user limited the process |
174 | | // affinity, we prefer to not extend the process to other groups, |
175 | | // because user might want to restrict the resource usage. |
176 | | // Also if source processor group is larger than required number |
177 | | // of threads, we do not need to move threads between groups. |
178 | | if (SrcGroupSize!=0 && GroupAffinity.Mask==SrcGroupMask && |
179 | | GroupAffinity.Group!=CurGroupNumber && SrcGroupSize<MaxAllowedThreads) |
180 | | { |
181 | | // Shifting by 64 would be undefined behavior, so we treat 64 separately. |
182 | | KAFFINITY CurGroupMask=(KAFFINITY)(CurGroupSize==64 ? (uint64)0xffffffffffffffff:(uint64(1)<<CurGroupSize)-1); |
183 | | GroupAffinity.Mask=CurGroupMask; // Use the entire group. |
184 | | GroupAffinity.Group=CurGroupNumber; |
185 | | |
186 | | // Assign the thread to a new group. |
187 | | pSetThreadGroupAffinity(hThread,&GroupAffinity,NULL); |
188 | | } |
189 | | } |
190 | | #endif |
191 | | |
192 | | // Set the thread priority if needed. |
193 | | if (ThreadPool::ThreadPriority!=THREAD_PRIORITY_NORMAL) |
194 | | SetThreadPriority(ThreadHandles[I],ThreadPool::ThreadPriority); |
195 | | #endif |
196 | 7.74k | } |
197 | 968 | } |
198 | | |
199 | | |
// Thread entry point that CreateThreads() passes to ThreadCreate().
// Param is the owning ThreadPool object. The function runs that pool's
// PoolThreadLoop() and returns 0 once the loop ends.
200 | | NATIVE_THREAD_TYPE ThreadPool::PoolThread(void *Param) |
201 | 7.74k | { |
202 | 7.74k | ((ThreadPool*)Param)->PoolThreadLoop(); |
203 | 7.74k | return 0; |
204 | 7.74k | } |
205 | | |
206 | | |
// Worker loop executed by every pool thread. It repeatedly takes a task
// with GetQueuedTask() and calls Task.Proc(Task.Param) until GetQueuedTask()
// returns false. After each task ActiveThreads is decremented while holding
// CritSection. The thread that brings it to zero wakes the waiter in
// WaitDone(): it sets the NoneActive event under _WIN_ALL, or clears
// AnyActive and signals AnyActiveCond under _UNIX.
207 | | void ThreadPool::PoolThreadLoop() |
208 | 7.74k | { |
209 | 7.74k | QueueEntry Task; |
210 | 431k | while (GetQueuedTask(&Task)) |
211 | 423k | { |
212 | 423k | Task.Proc(Task.Param); |
213 | | |
214 | 423k | CriticalSectionStart(&CritSection); |
215 | 423k | if (--ActiveThreads == 0) |
216 | 62.4k | { |
217 | | #ifdef _WIN_ALL |
218 | | SetEvent(NoneActive); |
219 | | #elif defined(_UNIX) |
220 | | pthread_mutex_lock(&AnyActiveMutex); |
221 | 62.4k | AnyActive=false; |
222 | 62.4k | pthread_cond_signal(&AnyActiveCond); |
223 | 62.4k | pthread_mutex_unlock(&AnyActiveMutex); |
224 | 62.4k | #endif |
225 | 62.4k | } |
226 | 423k | CriticalSectionEnd(&CritSection); |
227 | 423k | } |
228 | 7.74k | } |
229 | | |
230 | | |
// Blocks until one unit of QueuedTasksCnt is available and consumes it:
// a wait on the semaphore under _WIN_ALL, or a wait on QueuedTasksCntCond
// until the counter is non-zero followed by a decrement under _UNIX.
//
// Returns false without touching *Task if Closing is set after the wait;
// the destructor uses this to stop the pool threads. Otherwise copies
// TaskQueue[QueueBottom] to *Task and advances QueueBottom modulo the
// queue size, both under CritSection, and returns true.
231 | | bool ThreadPool::GetQueuedTask(QueueEntry *Task) |
232 | 431k | { |
233 | | #ifdef _WIN_ALL |
234 | | CWaitForSingleObject(QueuedTasksCnt); |
235 | | #elif defined(_UNIX) |
236 | | pthread_mutex_lock(&QueuedTasksCntMutex); |
237 | 936k | while (QueuedTasksCnt==0) |
238 | 505k | cpthread_cond_wait(&QueuedTasksCntCond,&QueuedTasksCntMutex); |
239 | 431k | QueuedTasksCnt--; |
240 | 431k | pthread_mutex_unlock(&QueuedTasksCntMutex); |
241 | 431k | #endif |
242 | | |
243 | 431k | if (Closing) |
244 | 7.74k | return false; |
245 | | |
246 | 423k | CriticalSectionStart(&CritSection); |
247 | | |
248 | 423k | *Task = TaskQueue[QueueBottom]; |
249 | 423k | QueueBottom = (QueueBottom + 1) % ASIZE(TaskQueue); |
250 | | |
251 | 423k | CriticalSectionEnd(&CritSection); |
252 | | |
253 | 423k | return true; |
254 | 431k | } |
255 | | |
256 | | |
// Proc is the function to run and Data is the pointer passed to it as
// its parameter. The pool threads are created on the first call, when
// ThreadsCreatedCount is still 0. The entry is stored at QueueTop, which
// advances modulo the queue size, and ActiveThreads is incremented, so
// before WaitDone() it holds the number of queued tasks.
257 | | // Add task to queue. We assume that it is always called from the main thread, |
258 | | // which allows us to avoid any locks here. We process collected tasks only |
259 | | // when WaitDone is called. |
260 | | void ThreadPool::AddTask(PTHREAD_PROC Proc,void *Data) |
261 | 423k | { |
262 | 423k | if (ThreadsCreatedCount == 0) |
263 | 968 | CreateThreads(); |
264 | | |
265 | | // If queue is full, wait until it is empty. |
266 | 423k | if (ActiveThreads>=ASIZE(TaskQueue)) |
267 | 0 | WaitDone(); |
268 | | |
269 | 423k | TaskQueue[QueueTop].Proc = Proc; |
270 | 423k | TaskQueue[QueueTop].Param = Data; |
271 | 423k | QueueTop = (QueueTop + 1) % ASIZE(TaskQueue); |
272 | 423k | ActiveThreads++; |
273 | 423k | } |
274 | | |
275 | | |
// Returns immediately when ActiveThreads is 0. Otherwise QueuedTasksCnt is
// raised by ActiveThreads so that the pool threads pick up the queued
// entries, and the caller blocks until PoolThreadLoop() reports that
// ActiveThreads has dropped back to zero (NoneActive event under _WIN_ALL,
// AnyActive flag with AnyActiveCond under _UNIX).
276 | | // Start queued tasks and wait until all threads are inactive. |
277 | | // We assume that it is always called from the main thread, when pool threads |
278 | | // are still sleeping. |
279 | | void ThreadPool::WaitDone() |
280 | 114k | { |
281 | 114k | if (ActiveThreads==0) |
282 | 52.1k | return; |
283 | | #ifdef _WIN_ALL |
284 | | ResetEvent(NoneActive); |
285 | | ReleaseSemaphore(QueuedTasksCnt,ActiveThreads,NULL); |
286 | | CWaitForSingleObject(NoneActive); |
287 | | #elif defined(_UNIX) |
288 | 62.4k | AnyActive=true; |
289 | | |
290 | | // Threads reset AnyActive before accessing QueuedTasksCnt and even |
291 | | // a preceding WaitDone() call does not guarantee that some slow thread |
292 | | // is not accessing QueuedTasksCnt now. So lock is necessary. |
293 | 62.4k | pthread_mutex_lock(&QueuedTasksCntMutex); |
294 | 62.4k | QueuedTasksCnt+=ActiveThreads; |
295 | 62.4k | pthread_mutex_unlock(&QueuedTasksCntMutex); |
296 | | |
297 | 62.4k | pthread_cond_broadcast(&QueuedTasksCntCond); |
298 | | |
299 | 62.4k | pthread_mutex_lock(&AnyActiveMutex); |
300 | 124k | while (AnyActive) |
301 | 61.8k | cpthread_cond_wait(&AnyActiveCond,&AnyActiveMutex); |
302 | 62.4k | pthread_mutex_unlock(&AnyActiveMutex); |
303 | 62.4k | #endif |
304 | 62.4k | } |
305 | | #endif // RAR_SMP |