/src/x265/source/common/threadpool.cpp
Line | Count | Source |
1 | | /***************************************************************************** |
2 | | * Copyright (C) 2013-2020 MulticoreWare, Inc |
3 | | * |
4 | | * Authors: Steve Borho <steve@borho.org> |
5 | | * Min Chen <chenm003@163.com> |
6 | | * |
7 | | * This program is free software; you can redistribute it and/or modify |
8 | | * it under the terms of the GNU General Public License as published by |
9 | | * the Free Software Foundation; either version 2 of the License, or |
10 | | * (at your option) any later version. |
11 | | * |
12 | | * This program is distributed in the hope that it will be useful, |
13 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
14 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
15 | | * GNU General Public License for more details. |
16 | | * |
17 | | * You should have received a copy of the GNU General Public License |
18 | | * along with this program; if not, write to the Free Software |
19 | | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111, USA. |
20 | | * |
21 | | * This program is also available under a commercial proprietary license. |
22 | | * For more information, contact us at license @ x265.com |
23 | | *****************************************************************************/ |
24 | | |
25 | | #include "common.h" |
26 | | #include "threadpool.h" |
27 | | #include "threading.h" |
28 | | |
29 | | #include <new> |
30 | | |
31 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 |
32 | | #include <winnt.h> |
33 | | #endif |
34 | | |
35 | | #if X86_64 |
36 | | |
37 | | #ifdef __GNUC__ |
38 | | |
39 | 10.0k | #define SLEEPBITMAP_CTZ(id, x) id = (unsigned long)__builtin_ctzll(x) |
40 | 77.0k | #define SLEEPBITMAP_OR(ptr, mask) __sync_fetch_and_or(ptr, mask) |
41 | 10.0k | #define SLEEPBITMAP_AND(ptr, mask) __sync_fetch_and_and(ptr, mask) |
42 | | |
43 | | #elif defined(_MSC_VER) |
44 | | |
45 | | #define SLEEPBITMAP_CTZ(id, x) _BitScanForward64(&id, x) |
46 | | #define SLEEPBITMAP_OR(ptr, mask) InterlockedOr64((volatile LONG64*)ptr, (LONG64)mask) |
47 | | #define SLEEPBITMAP_AND(ptr, mask) InterlockedAnd64((volatile LONG64*)ptr, (LONG64)mask) |
48 | | |
49 | | #endif // ifdef __GNUC__ |
50 | | |
51 | | #else |
52 | | |
53 | | /* use 32-bit primitives defined in threading.h */ |
54 | | #define SLEEPBITMAP_CTZ CTZ |
55 | | #define SLEEPBITMAP_OR ATOMIC_OR |
56 | | #define SLEEPBITMAP_AND ATOMIC_AND |
57 | | |
58 | | #endif |
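 | | // Editor's note: these SLEEPBITMAP_* primitives operate on sleepbitmap_t which, per the
 | | // typedef assumed to live in threadpool.h, is 64-bit on x86-64 and 32-bit elsewhere
 | | // (matching the two branches above); since each worker owns one bit, the bitmap width is
 | | // what limits how many workers a single pool can track.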
59 | | |
60 | | /* TODO FIX: the __MACH__ macro should ideally be part of the MacOS definition, but adding it
61 | | in CMake does not behave as expected; this needs to be fixed. */ |
62 | | |
63 | | #if MACOS && __MACH__ |
64 | | #include <sys/param.h> |
65 | | #include <sys/sysctl.h> |
66 | | #endif |
67 | | #if HAVE_LIBNUMA |
68 | | #include <numa.h> |
69 | | #endif |
70 | | #if defined(_MSC_VER) |
71 | | # define strcasecmp _stricmp |
72 | | #endif |
73 | | |
74 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 |
75 | | const uint64_t m1 = 0x5555555555555555; //binary: 0101... |
76 | | const uint64_t m2 = 0x3333333333333333; //binary: 00110011.. |
77 | | const uint64_t m3 = 0x0f0f0f0f0f0f0f0f; //binary: 4 zeros, 4 ones ... |
78 | | const uint64_t h01 = 0x0101010101010101; //the sum of 256 to the power of 0,1,2,3... |
79 | | |
80 | | static int popCount(uint64_t x) |
81 | | { |
82 | | x -= (x >> 1) & m1; |
83 | | x = (x & m2) + ((x >> 2) & m2); |
84 | | x = (x + (x >> 4)) & m3; |
85 | | return (x * h01) >> 56; |
86 | | } |
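 | | // Editor's note (illustrative walk-through, not part of the build): popCount() is the usual
 | | // SWAR population count. For x = 0xFF, the m1 step leaves per-2-bit counts (0xAA -> 0x55
 | | // subtracted gives 2 per field), the m2 step sums them into 4-bit fields (0x44), the m3
 | | // step folds them into bytes (0x08), and multiplying by h01 accumulates every byte count
 | | // into the top byte, so (x * h01) >> 56 == 8.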
87 | | #endif |
88 | | |
89 | | namespace X265_NS { |
90 | | // x265 private namespace |
91 | | |
92 | | class WorkerThread : public Thread |
93 | | { |
94 | | private: |
95 | | |
96 | | ThreadPool& m_pool; |
97 | | int m_id; |
98 | | Event m_wakeEvent; |
99 | | |
100 | | WorkerThread& operator =(const WorkerThread&); |
101 | | |
102 | | public: |
103 | | |
104 | | JobProvider* m_curJobProvider; |
105 | | BondedTaskGroup* m_bondMaster; |
106 | | |
107 | 22.3k | WorkerThread(ThreadPool& pool, int id) : m_pool(pool), m_id(id) {} |
108 | 22.3k | virtual ~WorkerThread() {} |
109 | | |
110 | | void threadMain(); |
111 | 32.4k | void awaken() { m_wakeEvent.trigger(); } |
112 | | }; |
113 | | |
114 | | void WorkerThread::threadMain() |
115 | 22.3k | { |
116 | 22.3k | THREAD_NAME("Worker", m_id); |
117 | | |
118 | | #if _WIN32 |
119 | | SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL); |
120 | | #else |
121 | 22.3k | __attribute__((unused)) int val = nice(10); |
122 | 22.3k | #endif |
123 | | |
124 | 22.3k | m_pool.setCurrentThreadAffinity(); |
125 | | |
126 | 22.3k | sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id; |
127 | 22.3k | m_curJobProvider = m_pool.m_jpTable[0]; |
128 | 22.3k | m_bondMaster = NULL; |
129 | | |
130 | 22.3k | SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit); |
131 | 22.3k | SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit); |
132 | 22.3k | m_wakeEvent.wait(); |
133 | | |
134 | 32.3k | while (m_pool.m_isActive) |
135 | 10.0k | { |
136 | 10.0k | if (m_bondMaster) |
137 | 698 | { |
138 | 698 | m_bondMaster->processTasks(m_id); |
139 | 698 | m_bondMaster->m_exitedPeerCount.incr(); |
140 | 698 | m_bondMaster = NULL; |
141 | 698 | } |
142 | | |
143 | 10.0k | do |
144 | 16.6k | { |
145 | | /* do pending work for current job provider */ |
146 | 16.6k | m_curJobProvider->findJob(m_id); |
147 | | |
148 | | /* if the current job provider still wants help, only switch to a |
149 | | * higher priority provider (lower slice type). Else take the first |
150 | | * available job provider with the highest priority */ |
151 | 16.6k | int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType : |
152 | 16.6k | INVALID_SLICE_PRIORITY + 1; |
153 | 16.6k | int nextProvider = -1; |
154 | 115k | for (int i = 0; i < m_pool.m_numProviders; i++) |
155 | 99.2k | { |
156 | 99.2k | if (m_pool.m_jpTable[i]->m_helpWanted && |
157 | 99.2k | m_pool.m_jpTable[i]->m_sliceType < curPriority) |
158 | 0 | { |
159 | 0 | nextProvider = i; |
160 | 0 | curPriority = m_pool.m_jpTable[i]->m_sliceType; |
161 | 0 | } |
162 | 99.2k | } |
163 | 16.6k | if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider]) |
164 | 0 | { |
165 | 0 | SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit); |
166 | 0 | m_curJobProvider = m_pool.m_jpTable[nextProvider]; |
167 | 0 | SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit); |
168 | 0 | } |
169 | 16.6k | } |
170 | 16.6k | while (m_curJobProvider->m_helpWanted); |
171 | | |
172 | | /* While the worker sleeps, a job-provider or bond-group may acquire this |
173 | | * worker's sleep bitmap bit. Once acquired, that thread may modify |
174 | | * m_bondMaster or m_curJobProvider, then waken the thread */ |
175 | 10.0k | SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit); |
176 | 10.0k | m_wakeEvent.wait(); |
177 | 10.0k | } |
178 | | |
179 | 22.3k | SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit); |
180 | 22.3k | } |
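 | | // Editor's note: the trailing SLEEPBITMAP_OR matters for shutdown -- a worker that leaves
 | | // the loop once m_pool.m_isActive goes false republishes its sleep bit, so stopWorkers(),
 | | // which spins until that bit is set before awakening and joining each thread, cannot hang.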
181 | | |
182 | | void JobProvider::tryWakeOne() |
183 | 9.37k | { |
184 | 9.37k | int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS); |
185 | 9.37k | if (id < 0) |
186 | 0 | { |
187 | 0 | m_helpWanted = true; |
188 | 0 | return; |
189 | 0 | } |
190 | | |
191 | 9.37k | WorkerThread& worker = m_pool->m_workers[id]; |
192 | 9.37k | if (worker.m_curJobProvider != this) /* poaching */ |
193 | 0 | { |
194 | 0 | sleepbitmap_t bit = (sleepbitmap_t)1 << id; |
195 | 0 | SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit); |
196 | 0 | worker.m_curJobProvider = this; |
197 | 0 | SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit); |
198 | 0 | } |
199 | 9.37k | worker.awaken(); |
200 | 9.37k | } |
201 | | |
202 | | int ThreadPool::tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap) |
203 | 10.0k | { |
204 | 10.0k | unsigned long id; |
205 | | |
206 | 10.0k | sleepbitmap_t masked = m_sleepBitmap & firstTryBitmap; |
207 | 10.0k | while (masked) |
208 | 10.0k | { |
209 | 10.0k | SLEEPBITMAP_CTZ(id, masked); |
210 | | |
211 | 10.0k | sleepbitmap_t bit = (sleepbitmap_t)1 << id; |
212 | 10.0k | if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit) |
213 | 10.0k | return (int)id; |
214 | | |
215 | 1 | masked = m_sleepBitmap & firstTryBitmap; |
216 | 1 | } |
217 | | |
218 | 18.4E | masked = m_sleepBitmap & secondTryBitmap; |
219 | 18.4E | while (masked) |
220 | 0 | { |
221 | 0 | SLEEPBITMAP_CTZ(id, masked); |
222 | |
223 | 0 | sleepbitmap_t bit = (sleepbitmap_t)1 << id; |
224 | 0 | if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit) |
225 | 0 | return (int)id; |
226 | | |
227 | 0 | masked = m_sleepBitmap & secondTryBitmap; |
228 | 0 | } |
229 | | |
230 | 18.4E | return -1; |
231 | 18.4E | } |
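 | | // Editor's note: SLEEPBITMAP_AND is a fetch-and-AND, so it returns the bitmap value as it
 | | // was *before* the bit was cleared; the `& bit` test therefore confirms that this caller,
 | | // and not a racing caller, actually claimed the sleeping worker.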
232 | | |
233 | | int ThreadPool::tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master) |
234 | 698 | { |
235 | 698 | int bondCount = 0; |
236 | 698 | do |
237 | 698 | { |
238 | 698 | int id = tryAcquireSleepingThread(peerBitmap, 0); |
239 | 698 | if (id < 0) |
240 | 0 | return bondCount; |
241 | | |
242 | 698 | m_workers[id].m_bondMaster = &master; |
243 | 698 | m_workers[id].awaken(); |
244 | 698 | bondCount++; |
245 | 698 | } |
246 | 698 | while (bondCount < maxPeers); |
247 | | |
248 | 698 | return bondCount; |
249 | 698 | } |
250 | | ThreadPool* ThreadPool::allocThreadPools(x265_param* p, int& numPools, bool isThreadsReserved) |
251 | 698 | { |
252 | 698 | enum { MAX_NODE_NUM = 127 }; |
253 | 698 | int cpusPerNode[MAX_NODE_NUM + 1]; |
254 | 698 | int threadsPerPool[MAX_NODE_NUM + 2]; |
255 | 698 | uint64_t nodeMaskPerPool[MAX_NODE_NUM + 2]; |
256 | 698 | int totalNumThreads = 0; |
257 | | |
258 | 698 | memset(cpusPerNode, 0, sizeof(cpusPerNode)); |
259 | 698 | memset(threadsPerPool, 0, sizeof(threadsPerPool)); |
260 | 698 | memset(nodeMaskPerPool, 0, sizeof(nodeMaskPerPool)); |
261 | | |
262 | 698 | int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM); |
263 | 698 | bool bNumaSupport = false; |
264 | | |
265 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 |
266 | | bNumaSupport = true; |
267 | | #elif HAVE_LIBNUMA |
268 | | bNumaSupport = numa_available() >= 0; |
269 | | #endif |
270 | | |
271 | | |
272 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 |
273 | | PGROUP_AFFINITY groupAffinityPointer = new GROUP_AFFINITY; |
274 | | for (int i = 0; i < numNumaNodes; i++) |
275 | | { |
276 | | GetNumaNodeProcessorMaskEx((UCHAR)i, groupAffinityPointer); |
277 | | cpusPerNode[i] = popCount(groupAffinityPointer->Mask); |
278 | | } |
279 | | delete groupAffinityPointer; |
280 | | #elif HAVE_LIBNUMA |
281 | | if (bNumaSupport) |
282 | | { |
283 | | struct bitmask* bitMask = numa_allocate_cpumask(); |
284 | | for (int i = 0; i < numNumaNodes; i++) |
285 | | { |
286 | | int ret = numa_node_to_cpus(i, bitMask); |
287 | | if (!ret) |
288 | | cpusPerNode[i] = numa_bitmask_weight(bitMask); |
289 | | else |
290 | 0 | x265_log(p, X265_LOG_ERROR, "Failed to generate CPU mask\n"); |
291 | | } |
292 | | numa_free_cpumask(bitMask); |
293 | | } |
294 | | #else // NUMA not supported |
295 | 698 | cpusPerNode[0] = getCpuCount(); |
296 | 698 | #endif |
297 | | |
298 | 698 | if (bNumaSupport && p->logLevel >= X265_LOG_DEBUG) |
299 | 0 | for (int i = 0; i < numNumaNodes; i++) |
300 | 0 | x265_log(p, X265_LOG_DEBUG, "detected NUMA node %d with %d logical cores\n", i, cpusPerNode[i]); |
301 | | /* limit threads based on param->numaPools |
302 | | * On Windows, because threads can't be allocated to span sockets, |
303 | | * the default behavior is changed to per-socket pools -- FIXME */ |
304 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 |
305 | | if (!p->numaPools || (strcmp(p->numaPools, "NULL") == 0 || strcmp(p->numaPools, "*") == 0 || strcmp(p->numaPools, "") == 0)) |
306 | | { |
307 | | char poolString[50] = ""; |
308 | | for (int i = 0; i < numNumaNodes; i++) |
309 | | { |
310 | | char nextCount[10] = ""; |
311 | | if (i) |
312 | | sprintf(nextCount, ",%d", cpusPerNode[i]); |
313 | | else |
314 | | sprintf(nextCount, "%d", cpusPerNode[i]); |
315 | | strcat(poolString, nextCount); |
316 | | } |
317 | | x265_param_parse(p, "pools", poolString); |
318 | | } |
319 | | #endif |
320 | 698 | if (p->numaPools && *p->numaPools) |
321 | 0 | { |
322 | 0 | const char *nodeStr = p->numaPools; |
323 | 0 | for (int i = 0; i < numNumaNodes; i++) |
324 | 0 | { |
325 | 0 | if (!*nodeStr) |
326 | 0 | { |
327 | 0 | threadsPerPool[i] = 0; |
328 | 0 | continue; |
329 | 0 | } |
330 | 0 | else if (*nodeStr == '-') |
331 | 0 | threadsPerPool[i] = 0; |
332 | 0 | else if (*nodeStr == '*' || !strcasecmp(nodeStr, "NULL")) |
333 | 0 | { |
334 | 0 | for (int j = i; j < numNumaNodes; j++) |
335 | 0 | { |
336 | 0 | threadsPerPool[numNumaNodes] += cpusPerNode[j]; |
337 | 0 | nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << j); |
338 | 0 | } |
339 | 0 | break; |
340 | 0 | } |
341 | 0 | else if (*nodeStr == '+') |
342 | 0 | { |
343 | 0 | threadsPerPool[numNumaNodes] += cpusPerNode[i]; |
344 | 0 | nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i); |
345 | 0 | } |
346 | 0 | else |
347 | 0 | { |
348 | 0 | int count = atoi(nodeStr); |
349 | 0 | if (i > 0 || strchr(nodeStr, ',')) // comma-separated form -> old per-node logic |
350 | 0 | { |
351 | 0 | threadsPerPool[i] = X265_MIN(count, cpusPerNode[i]); |
352 | 0 | nodeMaskPerPool[i] = ((uint64_t)1 << i); |
353 | 0 | } |
354 | 0 | else // new logic: exactly 'count' threads on all NUMAs |
355 | 0 | { |
356 | 0 | threadsPerPool[numNumaNodes] = X265_MIN(count, numNumaNodes * MAX_POOL_THREADS); |
357 | 0 | nodeMaskPerPool[numNumaNodes] = ((uint64_t)-1 >> (64 - numNumaNodes)); |
358 | 0 | } |
359 | 0 | } |
360 | | |
361 | | /* consume current node string, comma, and white-space */ |
362 | 0 | while (*nodeStr && *nodeStr != ',') |
363 | 0 | ++nodeStr; |
364 | 0 | if (*nodeStr == ',' || *nodeStr == ' ') |
365 | 0 | ++nodeStr; |
366 | 0 | } |
367 | 0 | } |
368 | 698 | else |
369 | 698 | { |
370 | 1.39k | for (int i = 0; i < numNumaNodes; i++) |
371 | 698 | { |
372 | 698 | threadsPerPool[numNumaNodes] += cpusPerNode[i]; |
373 | 698 | nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i); |
374 | 698 | } |
375 | 698 | } |
376 | | |
377 | | // If the last pool size is > MAX_POOL_THREADS, clip it to spawn thread pools only of size >= 1/2 max (heuristic) |
378 | 698 | if ((threadsPerPool[numNumaNodes] > MAX_POOL_THREADS) && |
379 | 698 | ((threadsPerPool[numNumaNodes] % MAX_POOL_THREADS) < (MAX_POOL_THREADS / 2))) |
380 | 0 | { |
381 | 0 | threadsPerPool[numNumaNodes] -= (threadsPerPool[numNumaNodes] % MAX_POOL_THREADS); |
382 | 0 | x265_log(p, X265_LOG_DEBUG, |
383 | 0 | "Creating only %d worker threads beyond specified numbers with --pools (if specified) to prevent asymmetry in pools; may not use all HW contexts\n", threadsPerPool[numNumaNodes]); |
384 | 0 | } |
385 | | |
386 | 698 | numPools = 0; |
387 | 2.09k | for (int i = 0; i < numNumaNodes + 1; i++) |
388 | 1.39k | { |
389 | 1.39k | if (bNumaSupport) |
390 | 0 | x265_log(p, X265_LOG_DEBUG, "NUMA node %d may use %d logical cores\n", i, cpusPerNode[i]); |
391 | 1.39k | if (threadsPerPool[i]) |
392 | 698 | { |
393 | 698 | numPools += (threadsPerPool[i] + MAX_POOL_THREADS - 1) / MAX_POOL_THREADS; |
394 | 698 | totalNumThreads += threadsPerPool[i]; |
395 | 698 | } |
396 | 1.39k | } |
397 | 698 | if (!isThreadsReserved) |
398 | 698 | { |
399 | 698 | if (!numPools) |
400 | 0 | { |
401 | 0 | x265_log(p, X265_LOG_DEBUG, "No pool thread available. Deciding frame-threads based on detected CPU threads\n"); |
402 | 0 | totalNumThreads = ThreadPool::getCpuCount(); // auto-detect frame threads |
403 | 0 | } |
404 | | |
405 | 698 | if (!p->frameNumThreads) |
406 | 698 | ThreadPool::getFrameThreadsCount(p, totalNumThreads); |
407 | 698 | } |
408 | | |
409 | 698 | if (!numPools) |
410 | 0 | return NULL; |
411 | | |
412 | 698 | if (numPools > p->frameNumThreads) |
413 | 0 | { |
414 | 0 | x265_log(p, X265_LOG_DEBUG, "Reducing number of thread pools for frame thread count\n"); |
415 | 0 | numPools = X265_MAX(p->frameNumThreads / 2, 1); |
416 | 0 | } |
417 | 698 | if (isThreadsReserved) |
418 | 0 | numPools = 1; |
419 | 698 | ThreadPool *pools = new ThreadPool[numPools]; |
420 | 698 | if (pools) |
421 | 698 | { |
422 | 698 | int maxProviders = (p->frameNumThreads + numPools - 1) / numPools + !isThreadsReserved; /* +1 is Lookahead, always assigned to threadpool 0 */ |
423 | 698 | int node = 0; |
424 | 1.39k | for (int i = 0; i < numPools; i++) |
425 | 698 | { |
426 | 1.39k | while (!threadsPerPool[node]) |
427 | 698 | node++; |
428 | 698 | int numThreads = X265_MIN(MAX_POOL_THREADS, threadsPerPool[node]); |
429 | 698 | int origNumThreads = numThreads; |
430 | 698 | if (i == 0 && p->lookaheadThreads > numThreads / 2) |
431 | 0 | { |
432 | 0 | p->lookaheadThreads = numThreads / 2; |
433 | 0 | x265_log(p, X265_LOG_DEBUG, "Setting lookahead threads to a maximum of half the total number of threads\n"); |
434 | 0 | } |
435 | 698 | if (isThreadsReserved) |
436 | 0 | { |
437 | 0 | numThreads = p->lookaheadThreads; |
438 | 0 | maxProviders = 1; |
439 | 0 | } |
440 | | |
441 | 698 | else if (i == 0) |
442 | 698 | numThreads -= p->lookaheadThreads; |
443 | 698 | if (!pools[i].create(numThreads, maxProviders, nodeMaskPerPool[node])) |
444 | 0 | { |
445 | 0 | delete [] pools; /* allocated with new[] above */ |
446 | 0 | numPools = 0; |
447 | 0 | return NULL; |
448 | 0 | } |
449 | 698 | if (numNumaNodes > 1) |
450 | 0 | { |
451 | 0 | char *nodesstr = new char[64 * strlen(",63") + 1]; |
452 | 0 | int len = 0; |
453 | 0 | for (int j = 0; j < 64; j++) |
454 | 0 | if ((nodeMaskPerPool[node] >> j) & 1) |
455 | 0 | len += sprintf(nodesstr + len, ",%d", j); |
456 | 0 | x265_log(p, X265_LOG_INFO, "Thread pool %d using %d threads on numa nodes %s\n", i, numThreads, nodesstr + 1); |
457 | 0 | delete[] nodesstr; |
458 | 0 | } |
459 | 698 | else |
460 | 698 | x265_log(p, X265_LOG_INFO, "Thread pool created using %d threads\n", numThreads); |
461 | 698 | threadsPerPool[node] -= origNumThreads; |
462 | 698 | } |
463 | 698 | } |
464 | 0 | else |
465 | 0 | numPools = 0; |
466 | 698 | return pools; |
467 | 698 | } |
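 | | // Editor's sketch (illustrative only; the real call sites are assumed to live in the
 | | // encoder, not in this file): a caller drives the pools roughly as follows:
 | | //
 | | //     int numPools = 0;
 | | //     ThreadPool* pools = ThreadPool::allocThreadPools(param, numPools, false);
 | | //     for (int i = 0; i < numPools; i++)
 | | //         pools[i].start();        // spawn workers; each blocks in threadMain() until woken
 | | //     /* ... submit work via JobProvider / BondedTaskGroup ... */
 | | //     for (int i = 0; i < numPools; i++)
 | | //         pools[i].stopWorkers();  // wait for each sleep bit, then wake and join the worker
 | | //     delete [] pools;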
468 | | |
469 | | ThreadPool::ThreadPool() |
470 | 698 | { |
471 | 698 | memset(this, 0, sizeof(*this)); |
472 | 698 | } |
473 | | |
474 | | bool ThreadPool::create(int numThreads, int maxProviders, uint64_t nodeMask) |
475 | 698 | { |
476 | 698 | X265_CHECK(numThreads <= MAX_POOL_THREADS, "a single thread pool cannot have more than MAX_POOL_THREADS threads\n"); |
477 | | |
478 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 |
479 | | memset(&m_groupAffinity, 0, sizeof(GROUP_AFFINITY)); |
480 | | for (int i = 0; i < getNumaNodeCount(); i++) |
481 | | { |
482 | | int numaNode = ((nodeMask >> i) & 0x1U) ? i : -1; |
483 | | if (numaNode != -1) |
484 | | if (GetNumaNodeProcessorMaskEx((USHORT)numaNode, &m_groupAffinity)) |
485 | | break; |
486 | | } |
487 | | m_numaMask = &m_groupAffinity.Mask; |
488 | | #elif HAVE_LIBNUMA |
489 | | if (numa_available() >= 0) |
490 | | { |
491 | | struct bitmask* nodemask = numa_allocate_nodemask(); |
492 | | if (nodemask) |
493 | | { |
494 | | *(nodemask->maskp) = nodeMask; |
495 | | m_numaMask = nodemask; |
496 | | } |
497 | | else |
498 | | x265_log(NULL, X265_LOG_ERROR, "unable to get NUMA node mask for %lx\n", nodeMask); |
499 | | } |
500 | | #else |
501 | 698 | (void)nodeMask; |
502 | 698 | #endif |
503 | | |
504 | 698 | m_numWorkers = numThreads; |
505 | | |
506 | 698 | m_workers = X265_MALLOC(WorkerThread, numThreads); |
507 | | /* placement new initialization */ |
508 | 698 | if (m_workers) |
509 | 23.0k | for (int i = 0; i < numThreads; i++) |
510 | 22.3k | new (m_workers + i)WorkerThread(*this, i); |
511 | | |
512 | 698 | m_jpTable = X265_MALLOC(JobProvider*, maxProviders); |
513 | 698 | m_numProviders = 0; |
514 | | |
515 | 698 | return m_workers && m_jpTable; |
516 | 698 | } |
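 | | // Editor's note: m_workers is raw X265_MALLOC storage constructed with placement new above,
 | | // which is why ~ThreadPool() invokes ~WorkerThread() explicitly for each worker before
 | | // handing the buffer back to X265_FREE().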
517 | | |
518 | | bool ThreadPool::start() |
519 | 698 | { |
520 | 698 | m_isActive = true; |
521 | 23.0k | for (int i = 0; i < m_numWorkers; i++) |
522 | 22.3k | { |
523 | 22.3k | if (!m_workers[i].start()) |
524 | 0 | { |
525 | 0 | m_isActive = false; |
526 | 0 | return false; |
527 | 0 | } |
528 | 22.3k | } |
529 | 698 | return true; |
530 | 698 | } |
531 | | |
532 | | void ThreadPool::stopWorkers() |
533 | 698 | { |
534 | 698 | if (m_workers) |
535 | 698 | { |
536 | 698 | m_isActive = false; |
537 | 23.0k | for (int i = 0; i < m_numWorkers; i++) |
538 | 22.3k | { |
539 | 22.3k | while (!(m_sleepBitmap & ((sleepbitmap_t)1 << i))) |
540 | 0 | GIVE_UP_TIME(); |
541 | 22.3k | m_workers[i].awaken(); |
542 | 22.3k | m_workers[i].stop(); |
543 | 22.3k | } |
544 | 698 | } |
545 | 698 | } |
546 | | |
547 | | ThreadPool::~ThreadPool() |
548 | 698 | { |
549 | 698 | if (m_workers) |
550 | 698 | { |
551 | 23.0k | for (int i = 0; i < m_numWorkers; i++) |
552 | 22.3k | m_workers[i].~WorkerThread(); |
553 | 698 | } |
554 | | |
555 | 698 | X265_FREE(m_workers); |
556 | 698 | X265_FREE(m_jpTable); |
557 | | |
558 | | #if HAVE_LIBNUMA |
559 | | if(m_numaMask) |
560 | | numa_free_nodemask((struct bitmask*)m_numaMask); |
561 | | #endif |
562 | 698 | } |
563 | | |
564 | | void ThreadPool::setCurrentThreadAffinity() |
565 | 25.4k | { |
566 | 25.4k | setThreadNodeAffinity(m_numaMask); |
567 | 25.4k | } |
568 | | |
569 | | void ThreadPool::setThreadNodeAffinity(void *numaMask) |
570 | 25.4k | { |
571 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 |
572 | | UNREFERENCED_PARAMETER(numaMask); |
573 | | GROUP_AFFINITY groupAffinity; |
574 | | memset(&groupAffinity, 0, sizeof(GROUP_AFFINITY)); |
575 | | groupAffinity.Group = m_groupAffinity.Group; |
576 | | groupAffinity.Mask = m_groupAffinity.Mask; |
577 | | const PGROUP_AFFINITY affinityPointer = &groupAffinity; |
578 | | if (SetThreadGroupAffinity(GetCurrentThread(), affinityPointer, NULL)) |
579 | | return; |
580 | | else |
581 | | x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n"); |
582 | | #elif HAVE_LIBNUMA |
583 | | if (numa_available() >= 0) |
584 | | { |
585 | | numa_run_on_node_mask((struct bitmask*)numaMask); |
586 | | numa_set_interleave_mask((struct bitmask*)numaMask); |
587 | | numa_set_localalloc(); |
588 | | return; |
589 | | } |
590 | | x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n"); |
591 | | #else |
592 | 25.4k | (void)numaMask; |
593 | 25.4k | #endif |
594 | 25.4k | return; |
595 | 25.4k | } |
596 | | |
597 | | /* static */ |
598 | | int ThreadPool::getNumaNodeCount() |
599 | 1.39k | { |
600 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 |
601 | | ULONG num = 1; |
602 | | if (GetNumaHighestNodeNumber(&num)) |
603 | | num++; |
604 | | return (int)num; |
605 | | #elif HAVE_LIBNUMA |
606 | | if (numa_available() >= 0) |
607 | | return numa_max_node() + 1; |
608 | | else |
609 | | return 1; |
610 | | #else |
611 | 1.39k | return 1; |
612 | 1.39k | #endif |
613 | 1.39k | } |
614 | | |
615 | | /* static */ |
616 | | int ThreadPool::getCpuCount() |
617 | 698 | { |
618 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 |
619 | | enum { MAX_NODE_NUM = 127 }; |
620 | | int cpus = 0; |
621 | | int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM); |
622 | | GROUP_AFFINITY groupAffinity; |
623 | | for (int i = 0; i < numNumaNodes; i++) |
624 | | { |
625 | | GetNumaNodeProcessorMaskEx((UCHAR)i, &groupAffinity); |
626 | | cpus += popCount(groupAffinity.Mask); |
627 | | } |
628 | | return cpus; |
629 | | #elif _WIN32 |
630 | | SYSTEM_INFO sysinfo; |
631 | | GetSystemInfo(&sysinfo); |
632 | | return sysinfo.dwNumberOfProcessors; |
633 | | #elif __unix__ && X265_ARCH_ARM |
634 | | /* Return the number of processors configured by the OS, because most embedded Linux distributions |
635 | | * use only one processor, as the scheduler doesn't have enough work to utilize all processors */ |
636 | | return sysconf(_SC_NPROCESSORS_CONF); |
637 | | #elif __unix__ |
638 | 698 | return sysconf(_SC_NPROCESSORS_ONLN); |
639 | | #elif MACOS && __MACH__ |
640 | | int nm[2]; |
641 | | size_t len = 4; |
642 | | uint32_t count; |
643 | | |
644 | | nm[0] = CTL_HW; |
645 | | nm[1] = HW_AVAILCPU; |
646 | | sysctl(nm, 2, &count, &len, NULL, 0); |
647 | | |
648 | | if (count < 1) |
649 | | { |
650 | | nm[1] = HW_NCPU; |
651 | | sysctl(nm, 2, &count, &len, NULL, 0); |
652 | | if (count < 1) |
653 | | count = 1; |
654 | | } |
655 | | |
656 | | return count; |
657 | | #else |
658 | | return 2; // default to 2 threads, everywhere else |
659 | | #endif |
660 | 698 | } |
661 | | |
662 | | void ThreadPool::getFrameThreadsCount(x265_param* p, int cpuCount) |
663 | 698 | { |
664 | 698 | int rows = (p->sourceHeight + p->maxCUSize - 1) >> g_log2Size[p->maxCUSize]; |
665 | 698 | if (!p->bEnableWavefront) |
666 | 114 | p->frameNumThreads = X265_MIN3(cpuCount, (rows + 1) / 2, X265_MAX_FRAME_THREADS); |
667 | 584 | else if (cpuCount >= 32) |
668 | 584 | p->frameNumThreads = (p->sourceHeight > 2000) ? 6 : 5; |
669 | 0 | else if (cpuCount >= 16) |
670 | 0 | p->frameNumThreads = 4; |
671 | 0 | else if (cpuCount >= 8) |
672 | 0 | p->frameNumThreads = 3; |
673 | 0 | else if (cpuCount >= 4) |
674 | 0 | p->frameNumThreads = 2; |
675 | 0 | else |
676 | 0 | p->frameNumThreads = 1; |
677 | 698 | } |
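 | | // Editor's worked example (assuming the default 64x64 CTU, i.e. g_log2Size[p->maxCUSize] == 6):
 | | // for a 1080p source, rows = (1080 + 63) >> 6 = 17, so with WPP disabled frameNumThreads is
 | | // capped at (17 + 1) / 2 = 9 before the cpuCount and X265_MAX_FRAME_THREADS limits apply.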
678 | | |
679 | | } // end namespace X265_NS |