/src/x265/source/common/threadpool.cpp
Line | Count | Source |
1 | | /***************************************************************************** |
2 | | * Copyright (C) 2013-2020 MulticoreWare, Inc |
3 | | * |
4 | | * Authors: Steve Borho <steve@borho.org> |
5 | | * Min Chen <chenm003@163.com> |
6 | | * |
7 | | * This program is free software; you can redistribute it and/or modify |
8 | | * it under the terms of the GNU General Public License as published by |
9 | | * the Free Software Foundation; either version 2 of the License, or |
10 | | * (at your option) any later version. |
11 | | * |
12 | | * This program is distributed in the hope that it will be useful, |
13 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
14 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
15 | | * GNU General Public License for more details. |
16 | | * |
17 | | * You should have received a copy of the GNU General Public License |
18 | | * along with this program; if not, write to the Free Software |
19 | | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111, USA. |
20 | | * |
21 | | * This program is also available under a commercial proprietary license. |
22 | | * For more information, contact us at license @ x265.com |
23 | | *****************************************************************************/ |
24 | | |
25 | | #include "common.h" |
26 | | #include "threadpool.h" |
27 | | #include "threading.h" |
28 | | |
29 | | #include <new> |
30 | | #include <vector> |
31 | | |
32 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 |
33 | | #include <winnt.h> |
34 | | #endif |
35 | | |
#if X86_64

#ifdef __GNUC__

#define SLEEPBITMAP_BSF(id, x) (id) = ((unsigned long)__builtin_ctzll(x))
#define SLEEPBITMAP_OR(ptr, mask) __sync_fetch_and_or(ptr, mask)
#define SLEEPBITMAP_AND(ptr, mask) __sync_fetch_and_and(ptr, mask)

#elif defined(_MSC_VER)

#define SLEEPBITMAP_BSF(id, x) _BitScanForward64(&id, x)
/* NOTE: the mask must be widened with the 64-bit LONG64 type.  Casting to the
 * 32-bit LONG (as a previous revision did) silently truncates the upper half
 * of the sleep bitmap, breaking acquire/wake for worker ids >= 32 */
#define SLEEPBITMAP_OR(ptr, mask) InterlockedOr64((volatile LONG64*)ptr, (LONG64)mask)
#define SLEEPBITMAP_AND(ptr, mask) InterlockedAnd64((volatile LONG64*)ptr, (LONG64)mask)

#endif // ifdef __GNUC__

#else

/* use 32-bit primitives defined in threading.h */
#define SLEEPBITMAP_BSF BSF
#define SLEEPBITMAP_OR ATOMIC_OR
#define SLEEPBITMAP_AND ATOMIC_AND

#endif
60 | | |
/* TODO FIX: The __MACH__ macro should ideally be part of the MacOS definition,
   but adding it via CMake does not behave as expected; this needs fixing. */
63 | | |
64 | | #if MACOS && __MACH__ |
65 | | #include <sys/param.h> |
66 | | #include <sys/sysctl.h> |
67 | | #endif |
68 | | #if HAVE_LIBNUMA |
69 | | #include <numa.h> |
70 | | #endif |
71 | | #if defined(_MSC_VER) |
72 | | # define strcasecmp _stricmp |
73 | | #endif |
74 | | |
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
/* bit-mask constants for the branch-free SWAR popcount below */
const uint64_t m1 = 0x5555555555555555; //binary: 0101...
const uint64_t m2 = 0x3333333333333333; //binary: 00110011..
const uint64_t m3 = 0x0f0f0f0f0f0f0f0f; //binary: 4 zeros, 4 ones ...
const uint64_t h01 = 0x0101010101010101; //the sum of 256 to the power of 0,1,2,3...

/* Count the set bits in a 64-bit word (used to count CPUs in a Windows NUMA
 * processor affinity mask).  Classic Hamming-weight folding: pairs, nibbles,
 * then bytes, with the h01 multiply summing all bytes into the top byte. */
static int popCount(uint64_t x)
{
    x -= (x >> 1) & m1;             // count of each 2 bits stored in those 2 bits
    x = (x & m2) + ((x >> 2) & m2); // count of each 4 bits stored in those 4 bits
    x = (x + (x >> 4)) & m3;        // count of each 8 bits stored in those 8 bits
    return (x * h01) >> 56;         // sum of all bytes lands in the high byte
}
#endif
89 | | |
90 | | namespace X265_NS { |
91 | | // x265 private namespace |
92 | | |
/* A pool worker thread.  Each worker sleeps on a private event until a
 * JobProvider or BondedTaskGroup atomically acquires its sleep-bitmap bit and
 * wakes it; the worker then runs jobs until no provider wants help and goes
 * back to sleep. */
class WorkerThread : public Thread
{
private:

    ThreadPool& m_pool;      /* owning pool */
    int         m_id;        /* worker index; selects bit ((sleepbitmap_t)1 << m_id) */
    Event       m_wakeEvent; /* triggered by awaken() to resume this thread */

    WorkerThread& operator =(const WorkerThread&); /* non-assignable */

public:

    JobProvider* m_curJobProvider;  /* provider this worker currently serves */
    BondedTaskGroup* m_bondMaster;  /* non-NULL while bonded to a task group */

    WorkerThread(ThreadPool& pool, int id) : m_pool(pool), m_id(id) {}
    virtual ~WorkerThread() {}

    /* worker main loop; defined below */
    void threadMain();

    /* wake the worker; the caller must already own this worker's sleep bit */
    void awaken() { m_wakeEvent.trigger(); }
};
114 | | |
/* Worker main loop: lower own priority, pin to the pool's NUMA mask, then
 * alternate between serving job providers (or a bonded task group) and
 * sleeping with our bit published in the pool's sleep bitmap. */
void WorkerThread::threadMain()
{
    THREAD_NAME("Worker", m_id);

#if _WIN32
    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
#else
    /* lower scheduling priority so workers yield to more critical threads */
    __attribute__((unused)) int val = nice(10);
#endif

    m_pool.setCurrentThreadAffinity();

    /* this worker's private bit in the pool/provider bitmaps */
    sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id;
    m_curJobProvider = m_pool.m_jpTable[0];
    m_bondMaster = NULL;

    /* register ownership with the first provider, publish our sleep bit, and
     * park until the first wake-up */
    SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
    m_wakeEvent.wait();

    while (m_pool.m_isActive)
    {
        /* if a bond group acquired this worker while it slept, serve the
         * bonded tasks first, then signal the peer's exit */
        if (m_bondMaster)
        {
            m_bondMaster->processTasks(m_id);
            m_bondMaster->m_exitedPeerCount.incr();
            m_bondMaster = NULL;
        }

        do
        {
            /* do pending work for current job provider */
            m_curJobProvider->findJob(m_id);

            /* if the current job provider still wants help, only switch to a
             * higher priority provider (lower slice type). Else take the first
             * available job provider with the highest priority */
            int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType :
                              INVALID_SLICE_PRIORITY + 1;
            int nextProvider = -1;
            for (int i = 0; i < m_pool.m_numProviders; i++)
            {
                if (m_pool.m_jpTable[i]->m_helpWanted &&
                    m_pool.m_jpTable[i]->m_sliceType < curPriority)
                {
                    nextProvider = i;
                    curPriority = m_pool.m_jpTable[i]->m_sliceType;
                }
            }
            /* migrate our ownership bit to the newly selected provider */
            if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider])
            {
                SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit);
                m_curJobProvider = m_pool.m_jpTable[nextProvider];
                SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
            }
        }
        while (m_curJobProvider->m_helpWanted);

        /* While the worker sleeps, a job-provider or bond-group may acquire this
         * worker's sleep bitmap bit. Once acquired, that thread may modify
         * m_bondMaster or m_curJobProvider, then waken the thread */
        SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
        m_wakeEvent.wait();
    }

    /* final publish of our sleep bit lets stopWorkers() observe that this
     * thread has left its loop before joining it */
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
}
182 | | |
/* Try to wake one sleeping worker to help this provider.  Workers already
 * owned by this provider are preferred; otherwise any sleeping pool thread is
 * poached.  If none are asleep, set m_helpWanted so busy workers migrate here
 * on their own when they next look for work. */
void JobProvider::tryWakeOne()
{
    int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS);
    if (id < 0)
    {
        m_helpWanted = true;
        return;
    }

    WorkerThread& worker = m_pool->m_workers[id];
    if (worker.m_curJobProvider != this) /* poaching */
    {
        /* move the worker's ownership bit from its previous provider to us;
         * safe to mutate m_curJobProvider because the worker is asleep and we
         * hold its sleep bit */
        sleepbitmap_t bit = (sleepbitmap_t)1 << id;
        SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit);
        worker.m_curJobProvider = this;
        SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit);
    }
    worker.awaken();
}
202 | | |
203 | | int ThreadPool::tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap) |
204 | 0 | { |
205 | 0 | unsigned long id; |
206 | |
|
207 | 0 | sleepbitmap_t masked = m_sleepBitmap & firstTryBitmap; |
208 | 0 | while (masked) |
209 | 0 | { |
210 | 0 | SLEEPBITMAP_BSF(id, masked); |
211 | |
|
212 | 0 | sleepbitmap_t bit = (sleepbitmap_t)1 << id; |
213 | 0 | if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit) |
214 | 0 | return (int)id; |
215 | | |
216 | 0 | masked = m_sleepBitmap & firstTryBitmap; |
217 | 0 | } |
218 | | |
219 | 0 | masked = m_sleepBitmap & secondTryBitmap; |
220 | 0 | while (masked) |
221 | 0 | { |
222 | 0 | SLEEPBITMAP_BSF(id, masked); |
223 | |
|
224 | 0 | sleepbitmap_t bit = (sleepbitmap_t)1 << id; |
225 | 0 | if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit) |
226 | 0 | return (int)id; |
227 | | |
228 | 0 | masked = m_sleepBitmap & secondTryBitmap; |
229 | 0 | } |
230 | | |
231 | 0 | return -1; |
232 | 0 | } |
233 | | |
234 | | int ThreadPool::tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master) |
235 | 0 | { |
236 | 0 | int bondCount = 0; |
237 | 0 | do |
238 | 0 | { |
239 | 0 | int id = tryAcquireSleepingThread(peerBitmap, 0); |
240 | 0 | if (id < 0) |
241 | 0 | return bondCount; |
242 | | |
243 | 0 | m_workers[id].m_bondMaster = &master; |
244 | 0 | m_workers[id].awaken(); |
245 | 0 | bondCount++; |
246 | 0 | } |
247 | 0 | while (bondCount < maxPeers); |
248 | | |
249 | 0 | return bondCount; |
250 | 0 | } |
251 | | |
/* Distributes totalNumThreads between ThreadedME and FrameEncoder pools.
 * Modifies threadsPerPool[], nodeMaskPerPool[], numNumaNodes, and numPools
 * in-place, and writes the thread count reserved for frame encoding into
 * threadsFrameEnc (the function itself returns void). */
static void distributeThreadsForTme(
    x265_param* p,
    int totalNumThreads,
    int& numNumaNodes,
    bool bNumaSupport,
    int* threadsPerPool,
    uint64_t* nodeMaskPerPool,
    int& numPools,
    int& threadsFrameEnc)
{
    /* with too few threads, splitting resources would starve both sides */
    if (totalNumThreads < MIN_TME_THREADS)
    {
        x265_log(p, X265_LOG_WARNING, "Low thread count detected, disabling --threaded-me."
            " Minimum recommended is 32 cores / threads\n");
        p->bThreadedME = 0;
        return;
    }

    /* preset/param-driven heuristic for the TME thread budget */
    int targetTME = ThreadPool::configureTmeThreadCount(p, totalNumThreads);
    targetTME = (targetTME < 1) ? 1 : targetTME;

    /* frame encoders get the remainder, but never less than the default
     * frame-thread count; TME gives threads back if necessary */
    threadsFrameEnc = totalNumThreads - targetTME;
    int defaultNumFT = ThreadPool::getFrameThreadsCount(p, totalNumThreads);
    if (threadsFrameEnc < defaultNumFT)
    {
        threadsFrameEnc = defaultNumFT;
        targetTME = totalNumThreads - threadsFrameEnc;
    }

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 || HAVE_LIBNUMA
    if (bNumaSupport && numNumaNodes > 1)
    {
        int tmeNumaNodes = 0;
        int leftover = 0;

        // First thread pool belongs to ThreadedME
        std::vector<int> threads(1, 0);
        std::vector<uint64_t> nodeMasks(1, 0);
        int poolIndex = 0;

        /* Greedily assign whole NUMA nodes to TME until reaching or exceeding the target */
        for (int i = 0; i < numNumaNodes + 1; i++)
        {
            if (!threadsPerPool[i] && !nodeMaskPerPool[i])
                continue;

            int toTake = X265_MIN(threadsPerPool[i], targetTME - threads[0]);
            if (toTake > 0)
            {
                threads[poolIndex] += toTake;
                nodeMasks[poolIndex] |= nodeMaskPerPool[i];
                tmeNumaNodes++;

                /* TME target reached exactly: subsequent nodes start new pools */
                if (threads[0] == targetTME)
                    poolIndex++;

                /* node only partially consumed by TME; remember the remainder */
                if (toTake < threadsPerPool[i])
                    leftover = threadsPerPool[i] - toTake;
            }
            else
            {
                /* TME needs nothing from this node; it becomes a FrameEncoder pool */
                threads.push_back(threadsPerPool[i]);
                nodeMasks.push_back(nodeMaskPerPool[i]);
                poolIndex++;
            }
        }

        // Distribute leftover threads among FrameEncoders
        if (leftover)
        {
            // Case 1: There are 1 or more threadpools for FrameEncoder(s) by now
            if (threads.size() > 1)
            {
                int split = static_cast<int>(static_cast<double>(leftover) / (numNumaNodes - 1));
                for (int pool = 1; pool < numNumaNodes; pool++)
                {
                    int give = X265_MIN(split, leftover);
                    threads[pool] += give;
                    leftover -= give;
                }
            }

            // Case 2: FrameEncoder(s) haven't received threads yet
            if (threads.size() == 1)
            {
                threads.push_back(leftover);
                // Give the same node mask as the last node of ThreadedME
                uint64_t msb = 1;
                uint64_t tmeNodeMask = nodeMasks[0];
                while (tmeNodeMask > 1)
                {
                    tmeNodeMask >>= 1;
                    msb <<= 1;
                }
                nodeMasks.push_back(msb);
            }
        }

        // Apply calculated threadpool assignment
        // TODO: Make sure this doesn't cause a problem later on
        memset(threadsPerPool, 0, sizeof(int) * (numNumaNodes + 2));
        memset(nodeMaskPerPool, 0, sizeof(uint64_t) * (numNumaNodes + 2));

        /* NOTE(review): numNumaNodes is overwritten with the pool count here;
         * callers treat the two as equal from this point on */
        numPools = numNumaNodes = static_cast<int>(threads.size());
        for (int pool = 0; pool < numPools; pool++)
        {
            threadsPerPool[pool] = threads[pool];
            nodeMaskPerPool[pool] = nodeMasks[pool];
        }
    }
    else
#endif
    {
        /* no NUMA (or a single node): pool 0 is TME, pool 1 is FrameEncoders */
        memset(threadsPerPool, 0, sizeof(int) * (numNumaNodes + 2));
        memset(nodeMaskPerPool, 0, sizeof(uint64_t) * (numNumaNodes + 2));

        threadsPerPool[0] = targetTME;
        nodeMaskPerPool[0] = 1;

        threadsPerPool[1] = threadsFrameEnc;
        nodeMaskPerPool[1] = 1;

        numPools = 2;
    }
}
380 | | |
381 | | ThreadPool* ThreadPool::allocThreadPools(x265_param* p, int& numPools, bool isThreadsReserved) |
382 | 0 | { |
383 | 0 | enum { MAX_NODE_NUM = 127 }; |
384 | 0 | int cpusPerNode[MAX_NODE_NUM + 1]; |
385 | 0 | int threadsPerPool[MAX_NODE_NUM + 2]; |
386 | 0 | uint64_t nodeMaskPerPool[MAX_NODE_NUM + 2]; |
387 | 0 | int totalNumThreads = 0; |
388 | |
|
389 | 0 | memset(cpusPerNode, 0, sizeof(cpusPerNode)); |
390 | 0 | memset(threadsPerPool, 0, sizeof(threadsPerPool)); |
391 | 0 | memset(nodeMaskPerPool, 0, sizeof(nodeMaskPerPool)); |
392 | |
|
393 | 0 | int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM); |
394 | 0 | bool bNumaSupport = false; |
395 | |
|
396 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 |
397 | | bNumaSupport = true; |
398 | | #elif HAVE_LIBNUMA |
399 | | bNumaSupport = numa_available() >= 0; |
400 | | #endif |
401 | | |
402 | |
|
403 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 |
404 | | PGROUP_AFFINITY groupAffinityPointer = new GROUP_AFFINITY; |
405 | | for (int i = 0; i < numNumaNodes; i++) |
406 | | { |
407 | | GetNumaNodeProcessorMaskEx((UCHAR)i, groupAffinityPointer); |
408 | | cpusPerNode[i] = popCount(groupAffinityPointer->Mask); |
409 | | } |
410 | | delete groupAffinityPointer; |
411 | | #elif HAVE_LIBNUMA |
412 | | if (bNumaSupport) |
413 | | { |
414 | | struct bitmask* bitMask = numa_allocate_cpumask(); |
415 | | for (int i = 0; i < numNumaNodes; i++) |
416 | | { |
417 | | int ret = numa_node_to_cpus(i, bitMask); |
418 | | if (!ret) |
419 | | cpusPerNode[i] = numa_bitmask_weight(bitMask); |
420 | | else |
421 | | x265_log(p, X265_LOG_ERROR, "Failed to genrate CPU mask\n"); |
422 | | } |
423 | | numa_free_cpumask(bitMask); |
424 | | } |
425 | | #else // NUMA not supported |
426 | 0 | cpusPerNode[0] = getCpuCount(); |
427 | 0 | #endif |
428 | |
|
429 | 0 | if (bNumaSupport && p->logLevel >= X265_LOG_DEBUG) |
430 | 0 | for (int i = 0; i < numNumaNodes; i++) |
431 | 0 | x265_log(p, X265_LOG_DEBUG, "detected NUMA node %d with %d logical cores\n", i, cpusPerNode[i]); |
432 | | /* limit threads based on param->numaPools |
433 | | * For windows because threads can't be allocated to live across sockets |
434 | | * changing the default behavior to be per-socket pools -- FIXME */ |
435 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 || HAVE_LIBNUMA |
436 | | if (!strlen(p->numaPools) || (strcmp(p->numaPools, "NULL") == 0 || strcmp(p->numaPools, "*") == 0 || strcmp(p->numaPools, "") == 0)) |
437 | | { |
438 | | char poolString[50] = ""; |
439 | | for (int i = 0; i < numNumaNodes; i++) |
440 | | { |
441 | | char nextCount[10] = ""; |
442 | | if (i) |
443 | | snprintf(nextCount, sizeof(nextCount), ",%d", cpusPerNode[i]); |
444 | | else |
445 | | snprintf(nextCount, sizeof(nextCount), "%d", cpusPerNode[i]); |
446 | | strcat(poolString, nextCount); |
447 | | } |
448 | | x265_param_parse(p, "pools", poolString); |
449 | | } |
450 | | #endif |
451 | 0 | if (strlen(p->numaPools)) |
452 | 0 | { |
453 | 0 | const char *nodeStr = p->numaPools; |
454 | 0 | for (int i = 0; i < numNumaNodes; i++) |
455 | 0 | { |
456 | 0 | if (!*nodeStr) |
457 | 0 | { |
458 | 0 | threadsPerPool[i] = 0; |
459 | 0 | continue; |
460 | 0 | } |
461 | 0 | else if (*nodeStr == '-') |
462 | 0 | threadsPerPool[i] = 0; |
463 | 0 | else if (*nodeStr == '*' || !strcasecmp(nodeStr, "NULL")) |
464 | 0 | { |
465 | 0 | for (int j = i; j < numNumaNodes; j++) |
466 | 0 | { |
467 | 0 | threadsPerPool[numNumaNodes] += cpusPerNode[j]; |
468 | 0 | nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << j); |
469 | 0 | } |
470 | 0 | break; |
471 | 0 | } |
472 | 0 | else if (*nodeStr == '+') |
473 | 0 | { |
474 | 0 | threadsPerPool[numNumaNodes] += cpusPerNode[i]; |
475 | 0 | nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i); |
476 | 0 | } |
477 | 0 | else |
478 | 0 | { |
479 | 0 | int count = atoi(nodeStr); |
480 | 0 | if (i > 0 || strchr(nodeStr, ',')) // it is comma -> old logic |
481 | 0 | { |
482 | 0 | threadsPerPool[i] = X265_MIN(count, cpusPerNode[i]); |
483 | 0 | nodeMaskPerPool[i] = ((uint64_t)1 << i); |
484 | 0 | } |
485 | 0 | else // new logic: exactly 'count' threads on all NUMAs |
486 | 0 | { |
487 | 0 | threadsPerPool[numNumaNodes] = X265_MIN(count, numNumaNodes * MAX_POOL_THREADS); |
488 | 0 | nodeMaskPerPool[numNumaNodes] = ((uint64_t)-1 >> (64 - numNumaNodes)); |
489 | 0 | } |
490 | 0 | } |
491 | | |
492 | | /* consume current node string, comma, and white-space */ |
493 | 0 | while (*nodeStr && *nodeStr != ',') |
494 | 0 | ++nodeStr; |
495 | 0 | if (*nodeStr == ',' || *nodeStr == ' ') |
496 | 0 | ++nodeStr; |
497 | 0 | } |
498 | 0 | } |
499 | 0 | else |
500 | 0 | { |
501 | 0 | for (int i = 0; i < numNumaNodes; i++) |
502 | 0 | { |
503 | 0 | threadsPerPool[numNumaNodes] += cpusPerNode[i]; |
504 | 0 | nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i); |
505 | 0 | } |
506 | 0 | } |
507 | | |
508 | | /* If ThreadedME is enabled, split resources: give half NUMA nodes (or |
509 | | * half cores when NUMA not available) to threaded-me and the other half |
510 | | * to the remaining job providers. */ |
511 | | /* Compute total CPU count from detected per-node counts when available */ |
512 | 0 | for (int i = 0; i < numNumaNodes + 1; i++) |
513 | 0 | totalNumThreads += threadsPerPool[i]; |
514 | 0 | if (!totalNumThreads) |
515 | 0 | totalNumThreads = ThreadPool::getCpuCount(); |
516 | |
|
517 | 0 | int threadsFrameEnc = totalNumThreads; |
518 | 0 | if (p->bThreadedME) |
519 | 0 | { |
520 | 0 | distributeThreadsForTme(p, totalNumThreads, numNumaNodes, bNumaSupport, threadsPerPool, |
521 | 0 | nodeMaskPerPool, numPools, threadsFrameEnc); |
522 | 0 | } |
523 | | |
524 | | // If the last pool size is > MAX_POOL_THREADS, clip it to spawn thread pools only of size >= 1/2 max (heuristic) |
525 | 0 | if ((threadsPerPool[numNumaNodes] > MAX_POOL_THREADS) && |
526 | 0 | ((threadsPerPool[numNumaNodes] % MAX_POOL_THREADS) < (MAX_POOL_THREADS / 2))) |
527 | 0 | { |
528 | 0 | threadsPerPool[numNumaNodes] -= (threadsPerPool[numNumaNodes] % MAX_POOL_THREADS); |
529 | 0 | x265_log(p, X265_LOG_DEBUG, |
530 | 0 | "Creating only %d worker threads beyond specified numbers with --pools (if specified) to prevent asymmetry in pools; may not use all HW contexts\n", threadsPerPool[numNumaNodes]); |
531 | 0 | } |
532 | |
|
533 | 0 | if (!p->bThreadedME) |
534 | 0 | { |
535 | 0 | numPools = 0; |
536 | 0 | for (int i = 0; i < numNumaNodes + 1; i++) |
537 | 0 | { |
538 | 0 | if (bNumaSupport) |
539 | 0 | x265_log(p, X265_LOG_DEBUG, "NUMA node %d may use %d logical cores\n", i, cpusPerNode[i]); |
540 | | |
541 | 0 | if (threadsPerPool[i]) |
542 | 0 | { |
543 | 0 | numPools += (threadsPerPool[i] + MAX_POOL_THREADS - 1) / MAX_POOL_THREADS; |
544 | 0 | totalNumThreads += threadsPerPool[i]; |
545 | 0 | } |
546 | 0 | } |
547 | 0 | } |
548 | | |
549 | 0 | if (!isThreadsReserved) |
550 | 0 | { |
551 | 0 | if (!numPools) |
552 | 0 | { |
553 | 0 | x265_log(p, X265_LOG_DEBUG, "No pool thread available. Deciding frame-threads based on detected CPU threads\n"); |
554 | 0 | totalNumThreads = ThreadPool::getCpuCount(); // auto-detect frame threads |
555 | 0 | } |
556 | |
|
557 | 0 | if (!p->frameNumThreads) |
558 | 0 | p->frameNumThreads = ThreadPool::getFrameThreadsCount(p, threadsFrameEnc); |
559 | 0 | } |
560 | | |
561 | 0 | if (!numPools) |
562 | 0 | return NULL; |
563 | | |
564 | 0 | if (numPools > p->frameNumThreads && !p->bThreadedME) |
565 | 0 | { |
566 | 0 | x265_log(p, X265_LOG_DEBUG, "Reducing number of thread pools for frame thread count\n"); |
567 | 0 | numPools = X265_MAX(p->frameNumThreads / 2, 1); |
568 | 0 | } |
569 | 0 | if (isThreadsReserved) |
570 | 0 | numPools = 1; |
571 | 0 | ThreadPool *pools = new ThreadPool[numPools]; |
572 | 0 | if (pools) |
573 | 0 | { |
574 | 0 | int poolCount = (p->bThreadedME) ? numPools - 1 : numPools; |
575 | 0 | int node = 0; |
576 | 0 | for (int i = 0; i < numPools; i++) |
577 | 0 | { |
578 | 0 | int maxProviders = (p->bThreadedME && i == 0) // threadpool 0 is dedicated to ThreadedME |
579 | 0 | ? 1 |
580 | 0 | : (p->frameNumThreads + poolCount - 1) / poolCount + !isThreadsReserved; // +1 is Lookahead, always assigned to threadpool 0 |
581 | | |
582 | 0 | while (!threadsPerPool[node]) |
583 | 0 | node++; |
584 | 0 | int numThreads = threadsPerPool[node]; |
585 | 0 | int origNumThreads = numThreads; |
586 | |
|
587 | 0 | if (i == 0 && p->lookaheadThreads > numThreads / 2) |
588 | 0 | { |
589 | 0 | p->lookaheadThreads = numThreads / 2; |
590 | 0 | x265_log(p, X265_LOG_DEBUG, "Setting lookahead threads to a maximum of half the total number of threads\n"); |
591 | 0 | } |
592 | |
|
593 | 0 | if (isThreadsReserved) |
594 | 0 | { |
595 | 0 | numThreads = p->lookaheadThreads; |
596 | 0 | maxProviders = 1; |
597 | 0 | } |
598 | 0 | else if (i == 0) |
599 | 0 | numThreads -= p->lookaheadThreads; |
600 | | |
601 | 0 | if (!pools[i].create(numThreads, maxProviders, nodeMaskPerPool[node])) |
602 | 0 | { |
603 | 0 | delete[] pools; |
604 | 0 | numPools = 0; |
605 | 0 | return NULL; |
606 | 0 | } |
607 | 0 | if (numNumaNodes > 1) |
608 | 0 | { |
609 | 0 | char *nodesstr = new char[64 * strlen(",63") + 1]; |
610 | 0 | int len = 0; |
611 | 0 | for (int j = 0; j < 64; j++) |
612 | 0 | if ((nodeMaskPerPool[node] >> j) & 1) |
613 | 0 | len += snprintf(nodesstr + len, sizeof(nodesstr) - len, ",%d", j); |
614 | 0 | x265_log(p, X265_LOG_INFO, "Thread pool %d using %d threads on numa nodes %s\n", i, numThreads, nodesstr + 1); |
615 | 0 | delete[] nodesstr; |
616 | 0 | } |
617 | 0 | else |
618 | 0 | x265_log(p, X265_LOG_INFO, "Thread pool created using %d threads\n", numThreads); |
619 | 0 | threadsPerPool[node] -= origNumThreads; |
620 | 0 | } |
621 | 0 | } |
622 | 0 | else |
623 | 0 | numPools = 0; |
624 | 0 | return pools; |
625 | 0 | } |
626 | | |
/* Zero all members; real initialization happens in create().
 * NOTE(review): memset(this) is the file's established C-style init idiom —
 * it relies on every member being trivially zeroable. */
ThreadPool::ThreadPool()
{
    memset(this, 0, sizeof(*this));
}
631 | | |
/* Allocate (but do not start) numThreads workers and a job-provider table
 * with maxProviders slots, and record the NUMA node mask used later for
 * thread affinity.  Returns false on allocation failure. */
bool ThreadPool::create(int numThreads, int maxProviders, uint64_t nodeMask)
{
    X265_CHECK(numThreads <= MAX_POOL_THREADS, "a single thread pool cannot have more than MAX_POOL_THREADS threads\n");

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    /* capture the processor group/affinity of the first node set in nodeMask */
    memset(&m_groupAffinity, 0, sizeof(GROUP_AFFINITY));
    for (int i = 0; i < getNumaNodeCount(); i++)
    {
        int numaNode = ((nodeMask >> i) & 0x1U) ? i : -1;
        if (numaNode != -1)
            if (GetNumaNodeProcessorMaskEx((USHORT)numaNode, &m_groupAffinity))
                break;
    }
    m_numaMask = &m_groupAffinity.Mask;
#elif HAVE_LIBNUMA
    if (numa_available() >= 0)
    {
        struct bitmask* nodemask = numa_allocate_nodemask();
        if (nodemask)
        {
            *(nodemask->maskp) = nodeMask;
            m_numaMask = nodemask; /* released in ~ThreadPool() */
        }
        else
            x265_log(NULL, X265_LOG_ERROR, "unable to get NUMA node mask for %lx\n", nodeMask);
    }
#else
    (void)nodeMask;
#endif

    m_numWorkers = numThreads;

    m_workers = X265_MALLOC(WorkerThread, numThreads);
    /* placement new initialization */
    if (m_workers)
        for (int i = 0; i < numThreads; i++)
            new (m_workers + i)WorkerThread(*this, i);

    m_jpTable = X265_MALLOC(JobProvider*, maxProviders);
    if (m_jpTable)
        memset(m_jpTable, 0, sizeof(JobProvider*) * maxProviders);
    m_numProviders = 0;

    return m_workers && m_jpTable;
}
677 | | |
678 | | bool ThreadPool::start() |
679 | 0 | { |
680 | 0 | m_isActive = true; |
681 | 0 | for (int i = 0; i < m_numWorkers; i++) |
682 | 0 | { |
683 | 0 | if (!m_workers[i].start()) |
684 | 0 | { |
685 | 0 | m_isActive = false; |
686 | 0 | return false; |
687 | 0 | } |
688 | 0 | } |
689 | 0 | return true; |
690 | 0 | } |
691 | | |
/* Deactivate the pool and stop all workers.  For each worker we spin (with
 * GIVE_UP_TIME) until its sleep bit appears in m_sleepBitmap — guaranteeing
 * the worker is parked in m_wakeEvent.wait() — then wake it so it observes
 * m_isActive == false and exits, and join it via stop(). */
void ThreadPool::stopWorkers()
{
    if (m_workers)
    {
        m_isActive = false;
        for (int i = 0; i < m_numWorkers; i++)
        {
            /* wait for worker i to publish its sleep bit */
            while (!(m_sleepBitmap & ((sleepbitmap_t)1 << i)))
                GIVE_UP_TIME();
            m_workers[i].awaken();
            m_workers[i].stop();
        }
    }
}
706 | | |
/* Destroy workers and release pool storage.  Workers were constructed with
 * placement new in create(), so their destructors must be invoked explicitly
 * before freeing the raw buffer.  Also frees the libnuma node mask if one was
 * allocated in create(). */
ThreadPool::~ThreadPool()
{
    if (m_workers)
    {
        /* explicit destructor calls mirror the placement-new in create() */
        for (int i = 0; i < m_numWorkers; i++)
            m_workers[i].~WorkerThread();
    }

    X265_FREE(m_workers);
    X265_FREE(m_jpTable);

#if HAVE_LIBNUMA
    if(m_numaMask)
        numa_free_nodemask((struct bitmask*)m_numaMask);
#endif
}
723 | | |
/* Bind the calling thread to this pool's NUMA node mask (captured in
 * create()); a no-op on builds without NUMA support */
void ThreadPool::setCurrentThreadAffinity()
{
    setThreadNodeAffinity(m_numaMask);
}
728 | | |
/* Bind the calling thread to a NUMA node mask.
 * Windows: applies the GROUP_AFFINITY captured in create(); the numaMask
 * argument is unused.  libnuma: restricts execution to the mask, interleaves
 * allocations across it, then prefers node-local allocation.  Elsewhere this
 * is a no-op. */
void ThreadPool::setThreadNodeAffinity(void *numaMask)
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    UNREFERENCED_PARAMETER(numaMask);
    GROUP_AFFINITY groupAffinity;
    memset(&groupAffinity, 0, sizeof(GROUP_AFFINITY));
    groupAffinity.Group = m_groupAffinity.Group;
    groupAffinity.Mask = m_groupAffinity.Mask;
    const PGROUP_AFFINITY affinityPointer = &groupAffinity;
    if (SetThreadGroupAffinity(GetCurrentThread(), affinityPointer, NULL))
        return;
    else
        x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
#elif HAVE_LIBNUMA
    if (numa_available() >= 0)
    {
        numa_run_on_node_mask((struct bitmask*)numaMask);
        numa_set_interleave_mask((struct bitmask*)numaMask);
        numa_set_localalloc();
        return;
    }
    x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
#else
    (void)numaMask;
#endif
    return;
}
756 | | |
/* static */
/* Number of NUMA nodes visible to the process; returns 1 when NUMA is not
 * supported or unavailable */
int ThreadPool::getNumaNodeCount()
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    ULONG num = 1;
    if (GetNumaHighestNodeNumber(&num))
        num++; /* the API reports the highest zero-based node number */
    return (int)num;
#elif HAVE_LIBNUMA
    if (numa_available() >= 0)
        return numa_max_node() + 1;
    else
        return 1;
#else
    return 1;
#endif
}
774 | | |
/* static */
/* Best-effort count of logical CPUs available to the process, selected per
 * platform at compile time */
int ThreadPool::getCpuCount()
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    /* sum the processors of each NUMA node's affinity mask */
    enum { MAX_NODE_NUM = 127 };
    int cpus = 0;
    int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
    GROUP_AFFINITY groupAffinity;
    for (int i = 0; i < numNumaNodes; i++)
    {
        GetNumaNodeProcessorMaskEx((UCHAR)i, &groupAffinity);
        cpus += popCount(groupAffinity.Mask);
    }
    return cpus;
#elif _WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return sysinfo.dwNumberOfProcessors;
#elif __unix__ && X265_ARCH_ARM
    /* Return the number of processors configured by OS. Because, most embedded linux distributions
     * uses only one processor as the scheduler doesn't have enough work to utilize all processors */
    return sysconf(_SC_NPROCESSORS_CONF);
#elif __unix__
    return sysconf(_SC_NPROCESSORS_ONLN);
#elif MACOS && __MACH__
    int nm[2];
    size_t len = 4;
    uint32_t count;

    nm[0] = CTL_HW;
    nm[1] = HW_AVAILCPU;
    sysctl(nm, 2, &count, &len, NULL, 0);

    /* HW_AVAILCPU may report nothing useful; fall back to HW_NCPU, then 1 */
    if (count < 1)
    {
        nm[1] = HW_NCPU;
        sysctl(nm, 2, &count, &len, NULL, 0);
        if (count < 1)
            count = 1;
    }

    return count;
#else
    return 2; // default to 2 threads, everywhere else
#endif
}
821 | | |
822 | | int ThreadPool::getFrameThreadsCount(x265_param* p, int cpuCount) |
823 | 0 | { |
824 | 0 | int rows = (p->sourceHeight + p->maxCUSize - 1) >> g_log2Size[p->maxCUSize]; |
825 | 0 | if (!p->bEnableWavefront) |
826 | 0 | return X265_MIN3(cpuCount, (rows + 1) / 2, X265_MAX_FRAME_THREADS); |
827 | 0 | else if (cpuCount >= 32) |
828 | 0 | return (p->sourceHeight > 2000) ? 6 : 5; |
829 | 0 | else if (cpuCount >= 16) |
830 | 0 | return 4; |
831 | 0 | else if (cpuCount >= 8) |
832 | | #if _WIN32 && X265_ARCH_ARM64 |
833 | | return cpuCount; |
834 | | #else |
835 | 0 | return 3; |
836 | 0 | #endif |
837 | 0 | else if (cpuCount >= 4) |
838 | 0 | return 2; |
839 | 0 | else |
840 | 0 | return 1; |
841 | 0 | } |
842 | | |
/* Pick the share of cpuCount to dedicate to threaded motion estimation and,
 * as a side effect, configure param->tmeTaskBlockSize and
 * param->tmeNumBufferRows.  Encoder settings are matched against rules
 * ordered roughly from slowest to fastest preset; the FIRST matching rule
 * wins, so rule order is significant. */
int ThreadPool::configureTmeThreadCount(x265_param* param, int cpuCount)
{
    /* resolution classes used to index the per-rule tables below */
    enum TmeResClass
    {
        TME_RES_LOW = 0,
        TME_RES_MID,
        TME_RES_HIGH,
        TME_RES_COUNT
    };

    enum TmeRule
    {
        TME_RULE_SLOWER_VERYSLOW = 0,
        TME_RULE_SLOW,
        TME_RULE_FAST_MEDIUM,
        TME_RULE_FASTER,
        TME_RULE_VERYFAST,
        TME_RULE_SUPERFAST,
        TME_RULE_ULTRAFAST,
        TME_RULE_COUNT
    };

    /* per-rule tuning, each table indexed by resolution class */
    struct TmeRuleConfig
    {
        int taskBlockSize[TME_RES_COUNT];
        int numBufferRows[TME_RES_COUNT];
        int threadPercent[TME_RES_COUNT];  /* share of cpuCount given to TME */
        bool widthBasedTaskBlockSize;      /* derive block size from sourceWidth instead */
    };

    static const TmeRuleConfig s_tmeRuleConfig[TME_RULE_COUNT] =
    {
        { { 1, 1, 1 }, { 3, 3, 3 }, { 90, 85, 70 }, false }, // slower / veryslow preset and similar options
        { { 1, 1, 1 }, { 5, 5, 3 }, { 90, 85, 75 }, false }, // slow preset and similar options
        { { 1, 1, 1 }, { 10, 10, 10 }, { 90, 80, 70 }, false }, // fast / medium preset and similar options
        { { 1, 1, 1 }, { 10, 15, 10 }, { 90, 80, 70 }, false }, // faster preset and similar options
        { { 1, 1, 1 }, { 10, 15, 20 }, { 90, 80, 70 }, false }, // veryfast preset and similar options
        { { 2, 4, 4 }, { 10, 15, 20 }, { 90, 80, 60 }, false }, // superfast preset and similar options
        { { 0, 0, 0 }, { 15, 20, 20 }, { 90, 80, 50 }, true } // ultrafast preset and similar options
    };

    /* <= 720 rows is LOW, >= 1440 is HIGH, anything between is MID */
    const int resClass = (param->sourceHeight >= 1440) ? TME_RES_HIGH :
                         (param->sourceHeight <= 720) ? TME_RES_LOW : TME_RES_MID;

    /* one predicate per rule, in the same order as TmeRule */
    const bool ruleMatches[TME_RULE_COUNT] =
    {
        param->maxNumReferences >= 5 || param->subpelRefine >= 4 || (param->bEnableRectInter && param->bEnableAMP),
        param->maxNumReferences >= 4 || param->subpelRefine >= 3 || (param->bEnableRectInter ^ param->bEnableAMP),
        param->maxNumReferences >= 3 && param->subpelRefine >= 2,
        param->maxNumReferences >= 2 && param->subpelRefine >= 2,
        param->subpelRefine >= 1 && param->bframes > 3,
        param->subpelRefine && param->maxCUSize < 64,
        !param->subpelRefine || param->searchMethod == X265_DIA_SEARCH || param->minCUSize >= 16
    };

    /* first matching rule wins */
    int selectedRule = -1;
    for (int i = 0; i < TME_RULE_COUNT; i++)
    {
        if (ruleMatches[i])
        {
            selectedRule = i;
            break;
        }
    }

    if (selectedRule >= 0)
    {
        const TmeRuleConfig& cfg = s_tmeRuleConfig[selectedRule];
        /* width-based block size: one task block per 480 pixels of width */
        param->tmeTaskBlockSize = cfg.widthBasedTaskBlockSize ? ((param->sourceWidth + 480 - 1) / 480) : cfg.taskBlockSize[resClass];
        param->tmeNumBufferRows = cfg.numBufferRows[resClass];
        return (cpuCount * cfg.threadPercent[resClass]) / 100;
    }

    /* no rule matched: fall back to a per-resolution percentage
     * (note: tmeTaskBlockSize / tmeNumBufferRows are left untouched here) */
    static const int s_defaultThreadPercent[TME_RES_COUNT] = { 80, 80, 70 };
    return (cpuCount * s_defaultThreadPercent[resClass]) / 100;
}
919 | | |
920 | | } // end namespace X265_NS |