/work/x265/source/common/threadpool.cpp
Line | Count | Source |
1 | | /***************************************************************************** |
2 | | * Copyright (C) 2013-2020 MulticoreWare, Inc |
3 | | * |
4 | | * Authors: Steve Borho <steve@borho.org> |
5 | | * Min Chen <chenm003@163.com> |
6 | | * |
7 | | * This program is free software; you can redistribute it and/or modify |
8 | | * it under the terms of the GNU General Public License as published by |
9 | | * the Free Software Foundation; either version 2 of the License, or |
10 | | * (at your option) any later version. |
11 | | * |
12 | | * This program is distributed in the hope that it will be useful, |
13 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
14 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
15 | | * GNU General Public License for more details. |
16 | | * |
17 | | * You should have received a copy of the GNU General Public License |
18 | | * along with this program; if not, write to the Free Software |
19 | | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111, USA. |
20 | | * |
21 | | * This program is also available under a commercial proprietary license. |
22 | | * For more information, contact us at license @ x265.com |
23 | | *****************************************************************************/ |
24 | | |
25 | | #include "common.h" |
26 | | #include "threadpool.h" |
27 | | #include "threading.h" |
28 | | |
29 | | #include <new> |
30 | | #include <vector> |
31 | | |
32 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 |
33 | | #include <winnt.h> |
34 | | #endif |
35 | | |
#if X86_64

#ifdef __GNUC__

/* 64-bit sleep-bitmap primitives: find-first-set plus atomic fetch-OR/AND */
#define SLEEPBITMAP_BSF(id, x) (id) = ((unsigned long)__builtin_ctzll(x))
#define SLEEPBITMAP_OR(ptr, mask) __sync_fetch_and_or(ptr, mask)
#define SLEEPBITMAP_AND(ptr, mask) __sync_fetch_and_and(ptr, mask)

#elif defined(_MSC_VER)

#define SLEEPBITMAP_BSF(id, x) _BitScanForward64(&id, x)
/* The mask must be cast to LONG64, not LONG: a 32-bit truncation silently
 * drops bits for worker ids >= 32 in a 64-entry sleep bitmap, so workers in
 * the upper half of a large pool could never be marked asleep/awake */
#define SLEEPBITMAP_OR(ptr, mask) InterlockedOr64((volatile LONG64*)ptr, (LONG64)mask)
#define SLEEPBITMAP_AND(ptr, mask) InterlockedAnd64((volatile LONG64*)ptr, (LONG64)mask)

#endif // ifdef __GNUC__

#else

/* use 32-bit primitives defined in threading.h */
#define SLEEPBITMAP_BSF BSF
#define SLEEPBITMAP_OR ATOMIC_OR
#define SLEEPBITMAP_AND ATOMIC_AND

#endif
60 | | |
61 | | /* TODO FIX: Macro __MACH__ ideally should be part of MacOS definition, but adding to Cmake |
62 | | behaving is not as expected, need to fix this. */ |
63 | | |
64 | | #if MACOS && __MACH__ |
65 | | #include <sys/param.h> |
66 | | #include <sys/sysctl.h> |
67 | | #endif |
68 | | #if HAVE_LIBNUMA |
69 | | #include <numa.h> |
70 | | #endif |
71 | | #if defined(_MSC_VER) |
72 | | # define strcasecmp _stricmp |
73 | | #endif |
74 | | |
75 | | #if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 |
/* count the number of set bits in a 64-bit processor affinity mask */
static int popCount(uint64_t x)
{
    int bits = 0;
    while (x)
    {
        x &= x - 1; /* Kernighan's trick: clear the lowest set bit each pass */
        bits++;
    }
    return bits;
}
88 | | #endif |
89 | | |
90 | | namespace X265_NS { |
91 | | // x265 private namespace |
92 | | |
/* A pool worker thread. Parks on its wake event until awoken, then drains
 * jobs from its current JobProvider (or runs bonded-group tasks) and goes
 * back to sleep. Its id doubles as its bit position in the pool's sleep
 * bitmap and each provider's owner bitmap. */
class WorkerThread : public Thread
{
private:

    ThreadPool& m_pool;    // owning pool; outlives this worker
    int m_id;              // worker index == bit position in the bitmaps
    Event m_wakeEvent;     // worker parks here between bursts of work

    WorkerThread& operator =(const WorkerThread&); // non-assignable (reference member)

public:

    JobProvider* m_curJobProvider;   // provider this worker is currently attached to
    BondedTaskGroup* m_bondMaster;   // non-NULL when bonded to a task group

    WorkerThread(ThreadPool& pool, int id) : m_pool(pool), m_id(id) {}
    virtual ~WorkerThread() {}

    void threadMain();
    /* signal the wake event; caller must already have claimed this worker's
     * sleep-bitmap bit before calling (see threadMain's sleep protocol) */
    void awaken() { m_wakeEvent.trigger(); }
};
114 | | |
/* Worker main loop: lower own scheduling priority, pin to the pool's NUMA
 * node, then repeatedly: wait to be awoken, run bonded-group tasks if one was
 * assigned, and drain jobs from providers until none wants help, then mark
 * self asleep again. Exits when the pool is deactivated. */
void WorkerThread::threadMain()
{
    THREAD_NAME("Worker", m_id);

#if _WIN32
    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
#else
    /* deprioritize worker threads so the OS favors the main/API threads */
    __attribute__((unused)) int val = nice(10);
#endif

    m_pool.setCurrentThreadAffinity();

    /* this worker's bit within the pool sleep bitmap and provider owner bitmaps */
    sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id;
    m_curJobProvider = m_pool.m_jpTable[0];
    m_bondMaster = NULL;

    /* advertise as owned by provider 0 and asleep, then park until first wake */
    SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
    m_wakeEvent.wait();

    while (m_pool.m_isActive)
    {
        /* a bond group woke us: run its tasks, then count ourselves out */
        if (m_bondMaster)
        {
            m_bondMaster->processTasks(m_id);
            m_bondMaster->m_exitedPeerCount.incr();
            m_bondMaster = NULL;
        }

        do
        {
            /* do pending work for current job provider */
            m_curJobProvider->findJob(m_id);

            /* if the current job provider still wants help, only switch to a
             * higher priority provider (lower slice type). Else take the first
             * available job provider with the highest priority */
            int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType :
                              INVALID_SLICE_PRIORITY + 1;
            int nextProvider = -1;
            for (int i = 0; i < m_pool.m_numProviders; i++)
            {
                if (m_pool.m_jpTable[i]->m_helpWanted &&
                    m_pool.m_jpTable[i]->m_sliceType < curPriority)
                {
                    nextProvider = i;
                    curPriority = m_pool.m_jpTable[i]->m_sliceType;
                }
            }
            if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider])
            {
                /* migrate our ownership bit from the old provider to the new one */
                SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit);
                m_curJobProvider = m_pool.m_jpTable[nextProvider];
                SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
            }
        }
        while (m_curJobProvider->m_helpWanted);

        /* While the worker sleeps, a job-provider or bond-group may acquire this
         * worker's sleep bitmap bit. Once acquired, that thread may modify
         * m_bondMaster or m_curJobProvider, then waken the thread */
        SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
        m_wakeEvent.wait();
    }

    /* mark asleep one final time so stopWorkers() can observe us parked */
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
}
182 | | |
/* Wake one sleeping worker to service this provider's pending jobs. Prefers
 * workers already owned by this provider, falling back to any sleeping pool
 * thread. If no worker is asleep, sets m_helpWanted so an active worker will
 * switch to this provider when it next scans the provider table. */
void JobProvider::tryWakeOne()
{
    int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS);
    if (id < 0)
    {
        m_helpWanted = true;
        return;
    }

    WorkerThread& worker = m_pool->m_workers[id];
    if (worker.m_curJobProvider != this) /* poaching */
    {
        /* move the worker's ownership bit from its old provider to us; safe
         * to mutate m_curJobProvider because the worker stays parked until
         * awaken() below (its sleep bit was atomically claimed above) */
        sleepbitmap_t bit = (sleepbitmap_t)1 << id;
        SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit);
        worker.m_curJobProvider = this;
        SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit);
    }
    worker.awaken();
}
202 | | |
203 | | int ThreadPool::tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap) |
204 | 9.91k | { |
205 | 9.91k | unsigned long id; |
206 | | |
207 | 9.91k | sleepbitmap_t masked = m_sleepBitmap & firstTryBitmap; |
208 | 9.91k | while (masked) |
209 | 9.91k | { |
210 | 9.91k | SLEEPBITMAP_BSF(id, masked); |
211 | | |
212 | 9.91k | sleepbitmap_t bit = (sleepbitmap_t)1 << id; |
213 | 9.91k | if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit) |
214 | 9.91k | return (int)id; |
215 | | |
216 | 0 | masked = m_sleepBitmap & firstTryBitmap; |
217 | 0 | } |
218 | | |
219 | 0 | masked = m_sleepBitmap & secondTryBitmap; |
220 | 0 | while (masked) |
221 | 0 | { |
222 | 0 | SLEEPBITMAP_BSF(id, masked); |
223 | |
|
224 | 0 | sleepbitmap_t bit = (sleepbitmap_t)1 << id; |
225 | 0 | if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit) |
226 | 0 | return (int)id; |
227 | | |
228 | 0 | masked = m_sleepBitmap & secondTryBitmap; |
229 | 0 | } |
230 | | |
231 | 0 | return -1; |
232 | 0 | } |
233 | | |
234 | | int ThreadPool::tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master) |
235 | 654 | { |
236 | 654 | int bondCount = 0; |
237 | 654 | do |
238 | 654 | { |
239 | 654 | int id = tryAcquireSleepingThread(peerBitmap, 0); |
240 | 654 | if (id < 0) |
241 | 0 | return bondCount; |
242 | | |
243 | 654 | m_workers[id].m_bondMaster = &master; |
244 | 654 | m_workers[id].awaken(); |
245 | 654 | bondCount++; |
246 | 654 | } |
247 | 654 | while (bondCount < maxPeers); |
248 | | |
249 | 654 | return bondCount; |
250 | 654 | } |
/* Allocate one or more thread pools sized from the detected machine topology
 * (NUMA-aware where supported) and/or the user's --pools string.
 *
 * p                 encoder params; may be updated (pools string, frame and
 *                   lookahead thread counts, TME tuning fields)
 * numPools          out: number of pools in the returned array
 * isThreadsReserved when true, creates a single pool holding only the
 *                   reserved lookahead threads
 * Returns the pool array, or NULL (numPools untrustworthy only on failure
 * paths that also return NULL) when no pool threads are available. */
ThreadPool* ThreadPool::allocThreadPools(x265_param* p, int& numPools, bool isThreadsReserved)
{
    enum { MAX_NODE_NUM = 127 };
    int cpusPerNode[MAX_NODE_NUM + 1];
    /* the extra [numNumaNodes] slot aggregates threads spanning "all nodes" */
    int threadsPerPool[MAX_NODE_NUM + 2];
    uint64_t nodeMaskPerPool[MAX_NODE_NUM + 2];
    int totalNumThreads = 0;

    memset(cpusPerNode, 0, sizeof(cpusPerNode));
    memset(threadsPerPool, 0, sizeof(threadsPerPool));
    memset(nodeMaskPerPool, 0, sizeof(nodeMaskPerPool));

    int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
    bool bNumaSupport = false;

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    bNumaSupport = true;
#elif HAVE_LIBNUMA
    bNumaSupport = numa_available() >= 0;
#endif

    /* count logical cores per NUMA node (or total when NUMA is unsupported) */
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    PGROUP_AFFINITY groupAffinityPointer = new GROUP_AFFINITY;
    for (int i = 0; i < numNumaNodes; i++)
    {
        GetNumaNodeProcessorMaskEx((UCHAR)i, groupAffinityPointer);
        cpusPerNode[i] = popCount(groupAffinityPointer->Mask);
    }
    delete groupAffinityPointer;
#elif HAVE_LIBNUMA
    if (bNumaSupport)
    {
        struct bitmask* bitMask = numa_allocate_cpumask();
        for (int i = 0; i < numNumaNodes; i++)
        {
            int ret = numa_node_to_cpus(i, bitMask);
            if (!ret)
                cpusPerNode[i] = numa_bitmask_weight(bitMask);
            else
                x265_log(p, X265_LOG_ERROR, "Failed to genrate CPU mask\n");
        }
        numa_free_cpumask(bitMask);
    }
#else // NUMA not supported
    cpusPerNode[0] = getCpuCount();
#endif

    if (bNumaSupport && p->logLevel >= X265_LOG_DEBUG)
    for (int i = 0; i < numNumaNodes; i++)
        x265_log(p, X265_LOG_DEBUG, "detected NUMA node %d with %d logical cores\n", i, cpusPerNode[i]);
    /* limit threads based on param->numaPools
     * For windows because threads can't be allocated to live across sockets
     * changing the default behavior to be per-socket pools -- FIXME */
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 || HAVE_LIBNUMA
    if (!strlen(p->numaPools) || (strcmp(p->numaPools, "NULL") == 0 || strcmp(p->numaPools, "*") == 0 || strcmp(p->numaPools, "") == 0))
    {
        /* synthesize a default per-node pools string, e.g. "16,16" */
        /* NOTE(review): poolString is 50 bytes but up to MAX_NODE_NUM nodes
         * may be appended via strcat -- could overflow with many nodes;
         * consider snprintf into the remaining space instead */
        char poolString[50] = "";
        for (int i = 0; i < numNumaNodes; i++)
        {
            char nextCount[10] = "";
            if (i)
                snprintf(nextCount, sizeof(nextCount), ",%d", cpusPerNode[i]);
            else
                snprintf(nextCount, sizeof(nextCount), "%d", cpusPerNode[i]);
            strcat(poolString, nextCount);
        }
        x265_param_parse(p, "pools", poolString);
    }
#endif
    /* parse the (possibly just synthesized) pools string: one element per
     * node; "-" = skip node, "*"/"NULL" = rest of nodes into one pool,
     * "+" = add node to the shared pool, number = cap threads on that node */
    if (strlen(p->numaPools))
    {
        const char *nodeStr = p->numaPools;
        for (int i = 0; i < numNumaNodes; i++)
        {
            if (!*nodeStr)
            {
                threadsPerPool[i] = 0;
                continue;
            }
            else if (*nodeStr == '-')
                threadsPerPool[i] = 0;
            else if (*nodeStr == '*' || !strcasecmp(nodeStr, "NULL"))
            {
                for (int j = i; j < numNumaNodes; j++)
                {
                    threadsPerPool[numNumaNodes] += cpusPerNode[j];
                    nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << j);
                }
                break;
            }
            else if (*nodeStr == '+')
            {
                threadsPerPool[numNumaNodes] += cpusPerNode[i];
                nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i);
            }
            else
            {
                int count = atoi(nodeStr);
                if (i > 0 || strchr(nodeStr, ',')) // it is comma -> old logic
                {
                    threadsPerPool[i] = X265_MIN(count, cpusPerNode[i]);
                    nodeMaskPerPool[i] = ((uint64_t)1 << i);
                }
                else // new logic: exactly 'count' threads on all NUMAs
                {
                    threadsPerPool[numNumaNodes] = X265_MIN(count, numNumaNodes * MAX_POOL_THREADS);
                    nodeMaskPerPool[numNumaNodes] = ((uint64_t)-1 >> (64 - numNumaNodes));
                }
            }

            /* consume current node string, comma, and white-space */
            while (*nodeStr && *nodeStr != ',')
                ++nodeStr;
            if (*nodeStr == ',' || *nodeStr == ' ')
                ++nodeStr;
        }
    }
    else
    {
        /* no pools string: one shared pool spanning every node */
        for (int i = 0; i < numNumaNodes; i++)
        {
            threadsPerPool[numNumaNodes] += cpusPerNode[i];
            nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i);
        }
    }

    /* If ThreadedME is enabled, split resources: give half NUMA nodes (or
     * half cores when NUMA not available) to threaded-me and the other half
     * to the remaining job providers. */
    /* Compute total CPU count from detected per-node counts when available */
    for (int i = 0; i < numNumaNodes + 1; i++)
        totalNumThreads += threadsPerPool[i];
    if (!totalNumThreads)
        totalNumThreads = ThreadPool::getCpuCount();

    int threadsFrameEnc = 0;

    if (p->bThreadedME)
    {
        /* decide the TME thread budget, keeping at least the default number
         * of frame-encoder threads available */
        int targetTME = configureTmeThreadCount(p, totalNumThreads);
        targetTME = (targetTME < 1) ? 1 : targetTME;

        threadsFrameEnc = totalNumThreads - targetTME;
        int defaultNumFT = getFrameThreadsCount(p, totalNumThreads);
        if (threadsFrameEnc < defaultNumFT)
        {
            threadsFrameEnc = defaultNumFT;
            targetTME = totalNumThreads - threadsFrameEnc;
        }

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 || HAVE_LIBNUMA
        if (bNumaSupport && numNumaNodes > 1)
        {
            int tmeNumaNodes = 0;
            int leftover = 0;

            // First thread pool belongs to ThreadedME
            std::vector<int> threads(1, 0);
            std::vector<uint64_t> nodeMasks(1, 0);
            int poolIndex = 0;

            /* Greedily assign whole NUMA nodes to TME until reaching or exceeding the target */
            for (int i = 0; i < numNumaNodes + 1; i++)
            {
                if (!threadsPerPool[i] && !nodeMaskPerPool[i])
                    continue;

                int toTake = X265_MIN(threadsPerPool[i], targetTME - threads[0]);
                if (toTake > 0)
                {
                    threads[poolIndex] += toTake;
                    nodeMasks[poolIndex] |= nodeMaskPerPool[i];
                    tmeNumaNodes++;

                    if (threads[0] == targetTME)
                        poolIndex++;

                    if (toTake < threadsPerPool[i])
                        leftover = threadsPerPool[i] - toTake;
                }
                else
                {
                    /* node not needed by TME: becomes a frame-encoder pool */
                    threads.push_back(threadsPerPool[i]);
                    nodeMasks.push_back(nodeMaskPerPool[i]);
                    poolIndex++;
                }
            }

            // Distribute leftover threads among FrameEncoders
            if (leftover)
            {
                // Case 1: There are 1 or more threadpools for FrameEncoder(s) by now
                if (threads.size() > 1)
                {
                    int split = static_cast<int>(static_cast<double>(leftover) / (numNumaNodes - 1));
                    for (int pool = 1; pool < numNumaNodes; pool++)
                    {
                        int give = X265_MIN(split, leftover);
                        threads[pool] += give;
                        leftover -= give;
                    }
                }

                // Case 2: FrameEncoder(s) haven't received threads yet
                if (threads.size() == 1)
                {
                    threads.push_back(leftover);
                    // Give the same node mask as the last node of ThreadedME
                    uint64_t msb = 1;
                    uint64_t tmeNodeMask = nodeMasks[0];
                    while (tmeNodeMask > 1)
                    {
                        tmeNodeMask >>= 1;
                        msb <<= 1;
                    }
                    nodeMasks.push_back(msb);
                }
            }

            // Apply calculated threadpool assignment
            // TODO: Make sure this doesn't cause a problem later on
            memset(threadsPerPool, 0, sizeof(threadsPerPool));
            memset(nodeMaskPerPool, 0, sizeof(nodeMaskPerPool));

            numPools = numNumaNodes = static_cast<int>(threads.size());
            for (int pool = 0; pool < numPools; pool++)
            {
                threadsPerPool[pool] = threads[pool];
                nodeMaskPerPool[pool] = nodeMasks[pool];
            }
        }
        else
#endif
        {
            /* no NUMA (or single node): pool 0 for TME, pool 1 for encoders */
            memset(threadsPerPool, 0, sizeof(threadsPerPool));
            memset(nodeMaskPerPool, 0, sizeof(nodeMaskPerPool));

            threadsPerPool[0] = targetTME;
            nodeMaskPerPool[0] = 1;

            threadsPerPool[1] = threadsFrameEnc;
            nodeMaskPerPool[1] = 1;

            numPools = 2;
        }
    }
    else
    {
        threadsFrameEnc = totalNumThreads;
    }

    // If the last pool size is > MAX_POOL_THREADS, clip it to spawn thread pools only of size >= 1/2 max (heuristic)
    if ((threadsPerPool[numNumaNodes] > MAX_POOL_THREADS) &&
        ((threadsPerPool[numNumaNodes] % MAX_POOL_THREADS) < (MAX_POOL_THREADS / 2)))
    {
        threadsPerPool[numNumaNodes] -= (threadsPerPool[numNumaNodes] % MAX_POOL_THREADS);
        x265_log(p, X265_LOG_DEBUG,
                 "Creating only %d worker threads beyond specified numbers with --pools (if specified) to prevent asymmetry in pools; may not use all HW contexts\n", threadsPerPool[numNumaNodes]);
    }

    if (!p->bThreadedME)
    {
        numPools = 0;
        for (int i = 0; i < numNumaNodes + 1; i++)
        {
            if (bNumaSupport)
                x265_log(p, X265_LOG_DEBUG, "NUMA node %d may use %d logical cores\n", i, cpusPerNode[i]);

            if (threadsPerPool[i])
            {
                numPools += (threadsPerPool[i] + MAX_POOL_THREADS - 1) / MAX_POOL_THREADS;
                /* NOTE(review): threadsPerPool[] was already summed into
                 * totalNumThreads above, so this double-counts; verify
                 * downstream consumers expect that */
                totalNumThreads += threadsPerPool[i];
            }
        }
    }

    if (!isThreadsReserved)
    {
        if (!numPools)
        {
            x265_log(p, X265_LOG_DEBUG, "No pool thread available. Deciding frame-threads based on detected CPU threads\n");
            totalNumThreads = ThreadPool::getCpuCount(); // auto-detect frame threads
        }

        if (!p->frameNumThreads)
            p->frameNumThreads = ThreadPool::getFrameThreadsCount(p, threadsFrameEnc);
    }

    if (!numPools)
        return NULL;

    if (numPools > p->frameNumThreads && !p->bThreadedME)
    {
        x265_log(p, X265_LOG_DEBUG, "Reducing number of thread pools for frame thread count\n");
        numPools = X265_MAX(p->frameNumThreads / 2, 1);
    }
    if (isThreadsReserved)
        numPools = 1;
    ThreadPool *pools = new ThreadPool[numPools];
    if (pools)
    {
        int poolCount = (p->bThreadedME) ? numPools - 1 : numPools;
        int node = 0;
        for (int i = 0; i < numPools; i++)
        {
            int maxProviders = (p->bThreadedME && i == 0) // threadpool 0 is dedicated to ThreadedME
                ? 1
                : (p->frameNumThreads + poolCount - 1) / poolCount + !isThreadsReserved; // +1 is Lookahead, always assigned to threadpool 0

            /* advance to the next node that still has unassigned threads */
            while (!threadsPerPool[node])
                node++;
            int numThreads = threadsPerPool[node];
            int origNumThreads = numThreads;

            if (i == 0 && p->lookaheadThreads > numThreads / 2)
            {
                p->lookaheadThreads = numThreads / 2;
                x265_log(p, X265_LOG_DEBUG, "Setting lookahead threads to a maximum of half the total number of threads\n");
            }

            if (isThreadsReserved)
            {
                numThreads = p->lookaheadThreads;
                maxProviders = 1;
            }
            else if (i == 0)
                numThreads -= p->lookaheadThreads;

            if (!pools[i].create(numThreads, maxProviders, nodeMaskPerPool[node]))
            {
                delete[] pools;
                numPools = 0;
                return NULL;
            }
            if (numNumaNodes > 1)
            {
                char *nodesstr = new char[64 * strlen(",63") + 1];
                int len = 0;
                for (int j = 0; j < 64; j++)
                    if ((nodeMaskPerPool[node] >> j) & 1)
                        /* NOTE(review): sizeof(nodesstr) is sizeof(char*),
                         * not the allocated size -- snprintf is bounded to
                         * ~8 bytes and will truncate the node list; bound
                         * should be (64 * strlen(",63") + 1 - len) */
                        len += snprintf(nodesstr + len, sizeof(nodesstr) - len, ",%d", j);
                x265_log(p, X265_LOG_INFO, "Thread pool %d using %d threads on numa nodes %s\n", i, numThreads, nodesstr + 1);
                delete[] nodesstr;
            }
            else
                x265_log(p, X265_LOG_INFO, "Thread pool created using %d threads\n", numThreads);
            threadsPerPool[node] -= origNumThreads;
        }
    }
    else
        numPools = 0;
    return pools;
}
605 | | |
/* Zero-initialize all pool state; workers and the provider table are
 * allocated later in create(). */
ThreadPool::ThreadPool()
{
    /* NOTE(review): memset(this, ...) assumes ThreadPool has no virtuals or
     * non-trivial members -- verify against the class definition in the header */
    memset(this, 0, sizeof(*this));
}
610 | | |
/* Allocate the pool's worker array and job-provider table and record the
 * NUMA node mask used for thread affinity. Does not start any threads
 * (see start()). Returns false when either allocation fails. */
bool ThreadPool::create(int numThreads, int maxProviders, uint64_t nodeMask)
{
    X265_CHECK(numThreads <= MAX_POOL_THREADS, "a single thread pool cannot have more than MAX_POOL_THREADS threads\n");

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    /* capture the group affinity of the first node present in nodeMask */
    memset(&m_groupAffinity, 0, sizeof(GROUP_AFFINITY));
    for (int i = 0; i < getNumaNodeCount(); i++)
    {
        int numaNode = ((nodeMask >> i) & 0x1U) ? i : -1;
        if (numaNode != -1)
            if (GetNumaNodeProcessorMaskEx((USHORT)numaNode, &m_groupAffinity))
                break;
    }
    m_numaMask = &m_groupAffinity.Mask;
#elif HAVE_LIBNUMA
    /* translate nodeMask into a libnuma nodemask; freed in the destructor */
    if (numa_available() >= 0)
    {
        struct bitmask* nodemask = numa_allocate_nodemask();
        if (nodemask)
        {
            *(nodemask->maskp) = nodeMask;
            m_numaMask = nodemask;
        }
        else
            x265_log(NULL, X265_LOG_ERROR, "unable to get NUMA node mask for %lx\n", nodeMask);
    }
#else
    (void)nodeMask;
#endif

    m_numWorkers = numThreads;

    m_workers = X265_MALLOC(WorkerThread, numThreads);
    /* placement new initialization */
    if (m_workers)
        for (int i = 0; i < numThreads; i++)
            new (m_workers + i)WorkerThread(*this, i);

    m_jpTable = X265_MALLOC(JobProvider*, maxProviders);
    if (m_jpTable)
        memset(m_jpTable, 0, sizeof(JobProvider*) * maxProviders);
    m_numProviders = 0;

    return m_workers && m_jpTable;
}
656 | | |
657 | | bool ThreadPool::start() |
658 | 654 | { |
659 | 654 | m_isActive = true; |
660 | 21.5k | for (int i = 0; i < m_numWorkers; i++) |
661 | 20.9k | { |
662 | 20.9k | if (!m_workers[i].start()) |
663 | 0 | { |
664 | 0 | m_isActive = false; |
665 | 0 | return false; |
666 | 0 | } |
667 | 20.9k | } |
668 | 654 | return true; |
669 | 654 | } |
670 | | |
671 | | void ThreadPool::stopWorkers() |
672 | 654 | { |
673 | 654 | if (m_workers) |
674 | 654 | { |
675 | 654 | m_isActive = false; |
676 | 21.5k | for (int i = 0; i < m_numWorkers; i++) |
677 | 20.9k | { |
678 | 20.9k | while (!(m_sleepBitmap & ((sleepbitmap_t)1 << i))) |
679 | 0 | GIVE_UP_TIME(); |
680 | 20.9k | m_workers[i].awaken(); |
681 | 20.9k | m_workers[i].stop(); |
682 | 20.9k | } |
683 | 654 | } |
684 | 654 | } |
685 | | |
/* Destroy workers (placement-new'd in create(), so destructors are invoked
 * explicitly), release the raw allocations, and free the libnuma mask when
 * one was allocated. */
ThreadPool::~ThreadPool()
{
    if (m_workers)
    {
        for (int i = 0; i < m_numWorkers; i++)
            m_workers[i].~WorkerThread();
    }

    X265_FREE(m_workers);
    X265_FREE(m_jpTable);

#if HAVE_LIBNUMA
    if(m_numaMask)
        numa_free_nodemask((struct bitmask*)m_numaMask);
#endif
}
702 | | |
/* Pin the calling thread to this pool's NUMA node mask (no-op when NUMA is
 * unavailable -- see setThreadNodeAffinity). */
void ThreadPool::setCurrentThreadAffinity()
{
    setThreadNodeAffinity(m_numaMask);
}
707 | | |
/* Bind the calling thread to the given NUMA node mask. On Win7+ the mask
 * comes from m_groupAffinity (numaMask is unused); with libnuma the thread
 * is also set to interleave and prefer local allocation. On other platforms
 * this is a no-op. */
void ThreadPool::setThreadNodeAffinity(void *numaMask)
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    UNREFERENCED_PARAMETER(numaMask);
    GROUP_AFFINITY groupAffinity;
    memset(&groupAffinity, 0, sizeof(GROUP_AFFINITY));
    groupAffinity.Group = m_groupAffinity.Group;
    groupAffinity.Mask = m_groupAffinity.Mask;
    const PGROUP_AFFINITY affinityPointer = &groupAffinity;
    if (SetThreadGroupAffinity(GetCurrentThread(), affinityPointer, NULL))
        return;
    else
        x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
#elif HAVE_LIBNUMA
    if (numa_available() >= 0)
    {
        numa_run_on_node_mask((struct bitmask*)numaMask);
        numa_set_interleave_mask((struct bitmask*)numaMask);
        numa_set_localalloc();
        return;
    }
    x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
#else
    (void)numaMask;
#endif
    return;
}
735 | | |
/* static */
/* Number of NUMA nodes on this machine; 1 when NUMA is unsupported or
 * unavailable. */
int ThreadPool::getNumaNodeCount()
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    ULONG num = 1;
    if (GetNumaHighestNodeNumber(&num))
        num++; /* highest node number is zero-based */
    return (int)num;
#elif HAVE_LIBNUMA
    if (numa_available() >= 0)
        return numa_max_node() + 1;
    else
        return 1;
#else
    return 1;
#endif
}
753 | | |
/* static */
/* Number of logical processors available, using the best platform-specific
 * query; falls back to 2 on unknown platforms. */
int ThreadPool::getCpuCount()
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    /* sum the processor-mask population of every NUMA node */
    enum { MAX_NODE_NUM = 127 };
    int cpus = 0;
    int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
    GROUP_AFFINITY groupAffinity;
    for (int i = 0; i < numNumaNodes; i++)
    {
        GetNumaNodeProcessorMaskEx((UCHAR)i, &groupAffinity);
        cpus += popCount(groupAffinity.Mask);
    }
    return cpus;
#elif _WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return sysinfo.dwNumberOfProcessors;
#elif __unix__ && X265_ARCH_ARM
    /* Return the number of processors configured by OS. Because, most embedded linux distributions
     * uses only one processor as the scheduler doesn't have enough work to utilize all processors */
    return sysconf(_SC_NPROCESSORS_CONF);
#elif __unix__
    return sysconf(_SC_NPROCESSORS_ONLN);
#elif MACOS && __MACH__
    int nm[2];
    size_t len = 4;
    uint32_t count;

    nm[0] = CTL_HW;
    nm[1] = HW_AVAILCPU;
    sysctl(nm, 2, &count, &len, NULL, 0);

    if (count < 1)
    {
        /* HW_AVAILCPU failed; retry with the total CPU count */
        nm[1] = HW_NCPU;
        sysctl(nm, 2, &count, &len, NULL, 0);
        if (count < 1)
            count = 1;
    }

    return count;
#else
    return 2; // default to 2 threads, everywhere else
#endif
}
800 | | |
801 | | int ThreadPool::getFrameThreadsCount(x265_param* p, int cpuCount) |
802 | 654 | { |
803 | 654 | int rows = (p->sourceHeight + p->maxCUSize - 1) >> g_log2Size[p->maxCUSize]; |
804 | 654 | if (!p->bEnableWavefront) |
805 | 123 | return X265_MIN3(cpuCount, (rows + 1) / 2, X265_MAX_FRAME_THREADS); |
806 | 531 | else if (cpuCount >= 32) |
807 | 531 | return (p->sourceHeight > 2000) ? 6 : 5; |
808 | 0 | else if (cpuCount >= 16) |
809 | 0 | return 4; |
810 | 0 | else if (cpuCount >= 8) |
811 | | #if _WIN32 && X265_ARCH_ARM64 |
812 | | return cpuCount; |
813 | | #else |
814 | 0 | return 3; |
815 | 0 | #endif |
816 | 0 | else if (cpuCount >= 4) |
817 | 0 | return 2; |
818 | 0 | else |
819 | 0 | return 1; |
820 | 654 | } |
821 | | |
/* Decide how many pool threads to dedicate to threaded motion estimation
 * (TME), as a percentage of cpuCount keyed on a resolution class and a set
 * of preset-like rules derived from the search settings in param. When a
 * rule matches, param->tmeTaskBlockSize and param->tmeNumBufferRows are
 * also configured. Returns the TME thread count (before the caller's
 * minimum-of-1 clamp). */
int ThreadPool::configureTmeThreadCount(x265_param* param, int cpuCount)
{
    /* resolution buckets: <=720 rows low, >=1440 high, otherwise mid */
    enum TmeResClass
    {
        TME_RES_LOW = 0,
        TME_RES_MID,
        TME_RES_HIGH,
        TME_RES_COUNT
    };

    /* rule buckets approximating encoder presets, checked fastest-preset last */
    enum TmeRule
    {
        TME_RULE_FAST_MEDIUM_SLOW = 0,
        TME_RULE_FASTER,
        TME_RULE_VERYFAST,
        TME_RULE_SUPERFAST,
        TME_RULE_ULTRAFAST,
        TME_RULE_COUNT
    };

    /* per-rule tuning, one entry per resolution class */
    struct TmeRuleConfig
    {
        int taskBlockSize[TME_RES_COUNT];
        int numBufferRows[TME_RES_COUNT];
        int threadPercent[TME_RES_COUNT];  // percent of cpuCount handed to TME
        bool widthBasedTaskBlockSize;      // derive block size from frame width instead
    };

    static const TmeRuleConfig s_tmeRuleConfig[TME_RULE_COUNT] =
    {
        { { 1, 1, 1 }, { 10, 10, 10 }, { 90, 80, 70 }, false }, // fast / medium and slower presets
        { { 1, 1, 1 }, { 10, 15, 10 }, { 90, 80, 70 }, false }, // faster preset and similar options
        { { 1, 1, 1 }, { 10, 15, 20 }, { 90, 80, 70 }, false }, // veryfast preset and similar options
        { { 2, 4, 4 }, { 10, 15, 20 }, { 90, 80, 60 }, false }, // superfast preset and similar options
        { { 0, 0, 0 }, { 15, 20, 20 }, { 90, 80, 50 }, true }  // ultrafast preset and similar options
    };

    const int resClass = (param->sourceHeight >= 1440) ? TME_RES_HIGH :
                         (param->sourceHeight <= 720) ? TME_RES_LOW : TME_RES_MID;

    /* each condition approximates the search configuration of a preset tier;
     * the first match (slowest tier first) selects the rule */
    const bool ruleMatches[TME_RULE_COUNT] =
    {
        param->maxNumReferences >= 3 && param->subpelRefine >= 2,
        param->maxNumReferences >= 2 && param->subpelRefine >= 2,
        param->subpelRefine >= 1 && param->bframes > 3,
        param->subpelRefine && param->maxCUSize < 64,
        !param->subpelRefine || param->searchMethod == X265_DIA_SEARCH || param->minCUSize >= 16
    };

    int selectedRule = -1;
    for (int i = 0; i < TME_RULE_COUNT; i++)
    {
        if (ruleMatches[i])
        {
            selectedRule = i;
            break;
        }
    }

    if (selectedRule >= 0)
    {
        const TmeRuleConfig& cfg = s_tmeRuleConfig[selectedRule];
        /* width-based sizing: roughly one task block per 480 pixels of width */
        param->tmeTaskBlockSize = cfg.widthBasedTaskBlockSize ? ((param->sourceWidth + 480 - 1) / 480) : cfg.taskBlockSize[resClass];
        param->tmeNumBufferRows = cfg.numBufferRows[resClass];
        return (cpuCount * cfg.threadPercent[resClass]) / 100;
    }

    /* no rule matched: flat percentage per resolution class, no param tuning */
    static const int s_defaultThreadPercent[TME_RES_COUNT] = { 80, 80, 70 };
    return (cpuCount * s_defaultThreadPercent[resClass]) / 100;
}
892 | | |
893 | | } // end namespace X265_NS |