Coverage Report

Created: 2026-05-30 06:08

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/work/x265/source/common/threadpool.cpp
Line
Count
Source
1
/*****************************************************************************
2
 * Copyright (C) 2013-2020 MulticoreWare, Inc
3
 *
4
 * Authors: Steve Borho <steve@borho.org>
5
 *          Min Chen <chenm003@163.com>
6
 *
7
 * This program is free software; you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 2 of the License, or
10
 * (at your option) any later version.
11
 *
12
 * This program is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU General Public License
18
 * along with this program; if not, write to the Free Software
19
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02111, USA.
20
 *
21
 * This program is also available under a commercial proprietary license.
22
 * For more information, contact us at license @ x265.com
23
 *****************************************************************************/
24
25
#include "common.h"
26
#include "threadpool.h"
27
#include "threading.h"
28
29
#include <new>
30
#include <vector>
31
32
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
33
#include <winnt.h>
34
#endif
35
36
#if defined(__APPLE__)
37
#include <sys/sysctl.h>
38
#elif !defined(_WIN32)
39
#include <fstream>
40
#include <string>
41
#include <cstdio>
42
#include <cstdlib>
43
#endif
44
45
/* Primitives used to scan and atomically update the worker sleep/owner
 * bitmaps. X86_64 and ARM64 builds use the 64-bit compiler intrinsics below;
 * every other target falls back to the 32-bit primitives from threading.h. */
#if X86_64 || X265_ARCH_ARM64

#ifdef __GNUC__

#define SLEEPBITMAP_BSF(id, x)     (id) = ((unsigned long)__builtin_ctzll(x))
#define SLEEPBITMAP_OR(ptr, mask)  __sync_fetch_and_or(ptr, mask)
#define SLEEPBITMAP_AND(ptr, mask) __sync_fetch_and_and(ptr, mask)

#elif defined(_MSC_VER)

/* The mask must reach the intrinsic as a full 64-bit value. LONG is only
 * 32 bits wide on Windows, so casting through it would drop the bits of
 * workers 32..63 on OR, turn their AND masks into all-ones (a no-op), and
 * sign-extend bit 31 over the whole upper half. */
#define SLEEPBITMAP_BSF(id, x)     _BitScanForward64(&id, x)
#define SLEEPBITMAP_OR(ptr, mask)  InterlockedOr64((volatile LONG64*)(ptr), (LONG64)(mask))
#define SLEEPBITMAP_AND(ptr, mask) InterlockedAnd64((volatile LONG64*)(ptr), (LONG64)(mask))

#endif // ifdef __GNUC__

#else

/* use 32-bit primitives defined in threading.h */
#define SLEEPBITMAP_BSF BSF
#define SLEEPBITMAP_OR  ATOMIC_OR
#define SLEEPBITMAP_AND ATOMIC_AND

#endif
69
70
/* TODO FIX: The __MACH__ check should ideally be folded into the MACOS definition, but adding
   it in CMake does not behave as expected; this still needs to be fixed. */
72
73
#if MACOS && __MACH__
74
#include <sys/param.h>
75
#include <sys/sysctl.h>
76
#endif
77
#if HAVE_LIBNUMA
78
#include <numa.h>
79
#endif
80
#if defined(_MSC_VER)
81
# define strcasecmp _stricmp
82
#endif
83
84
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
85
/* Masks for the parallel (bit-slice) population count implemented below */
const uint64_t m1 = 0x5555555555555555; //binary: 0101...
const uint64_t m2 = 0x3333333333333333; //binary: 00110011..
const uint64_t m3 = 0x0f0f0f0f0f0f0f0f; //binary:  4 zeros,  4 ones ...
const uint64_t h01 = 0x0101010101010101; //the sum of 256 to the power of 0,1,2,3...

/* Returns the number of bits set in x */
static int popCount(uint64_t x)
{
    /* per-pair counts, then per-nibble counts, then per-byte counts */
    const uint64_t pairSums   = x - ((x >> 1) & m1);
    const uint64_t nibbleSums = (pairSums & m2) + ((pairSums >> 2) & m2);
    const uint64_t byteSums   = (nibbleSums + (nibbleSums >> 4)) & m3;

    /* the multiply accumulates all byte counts into the top byte */
    return static_cast<int>((byteSums * h01) >> 56);
}
97
#endif
98
99
namespace X265_NS {
100
// x265 private namespace
101
102
class WorkerThread : public Thread
103
{
104
private:
105
106
    ThreadPool&  m_pool;
107
    int          m_id;
108
    Event        m_wakeEvent;
109
110
    WorkerThread& operator =(const WorkerThread&);
111
112
public:
113
114
    JobProvider*     m_curJobProvider;
115
    BondedTaskGroup* m_bondMaster;
116
117
0
    WorkerThread(ThreadPool& pool, int id) : m_pool(pool), m_id(id) {}
118
0
    virtual ~WorkerThread() {}
119
120
    void threadMain();
121
0
    void awaken()           { m_wakeEvent.trigger(); }
122
};
123
124
void WorkerThread::threadMain()
125
0
{
126
0
    THREAD_NAME("Worker", m_id);
127
128
#if _WIN32
129
    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
130
#else
131
0
    __attribute__((unused)) int val = nice(10);
132
0
#endif
133
134
0
    m_pool.setCurrentThreadAffinity();
135
136
0
    sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id;
137
0
    m_curJobProvider = m_pool.m_jpTable[0];
138
0
    m_bondMaster = NULL;
139
140
0
    SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
141
0
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
142
0
    m_wakeEvent.wait();
143
144
0
    while (m_pool.m_isActive)
145
0
    {
146
0
        if (m_bondMaster)
147
0
        {
148
0
            m_bondMaster->processTasks(m_id);
149
0
            m_bondMaster->m_exitedPeerCount.incr();
150
0
            m_bondMaster = NULL;
151
0
        }
152
153
0
        do
154
0
        {
155
            /* do pending work for current job provider */
156
0
            m_curJobProvider->findJob(m_id);
157
158
            /* if the current job provider still wants help, only switch to a
159
             * higher priority provider (lower slice type). Else take the first
160
             * available job provider with the highest priority */
161
0
            int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType :
162
0
                                                                 INVALID_SLICE_PRIORITY + 1;
163
0
            int nextProvider = -1;
164
0
            for (int i = 0; i < m_pool.m_numProviders; i++)
165
0
            {
166
0
                if (m_pool.m_jpTable[i]->m_helpWanted &&
167
0
                    m_pool.m_jpTable[i]->m_sliceType < curPriority)
168
0
                {
169
0
                    nextProvider = i;
170
0
                    curPriority = m_pool.m_jpTable[i]->m_sliceType;
171
0
                }
172
0
            }
173
0
            if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider])
174
0
            {
175
0
                SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit);
176
0
                m_curJobProvider = m_pool.m_jpTable[nextProvider];
177
0
                SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
178
0
            }
179
0
        }
180
0
        while (m_curJobProvider->m_helpWanted);
181
182
        /* While the worker sleeps, a job-provider or bond-group may acquire this
183
         * worker's sleep bitmap bit. Once acquired, that thread may modify 
184
         * m_bondMaster or m_curJobProvider, then waken the thread */
185
0
        SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
186
0
        m_wakeEvent.wait();
187
0
    }
188
189
0
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
190
0
}
191
192
void JobProvider::tryWakeOne()
193
0
{
194
0
    int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS);
195
0
    if (id < 0)
196
0
    {
197
0
        m_helpWanted = true;
198
0
        return;
199
0
    }
200
201
0
    WorkerThread& worker = m_pool->m_workers[id];
202
0
    if (worker.m_curJobProvider != this) /* poaching */
203
0
    {
204
0
        sleepbitmap_t bit = (sleepbitmap_t)1 << id;
205
0
        SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit);
206
0
        worker.m_curJobProvider = this;
207
0
        SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit);
208
0
    }
209
0
    worker.awaken();
210
0
}
211
212
/* Claim one sleeping worker by atomically clearing its bit in m_sleepBitmap.
 * Workers selected by firstTryBitmap are attempted first, and only when none
 * of those can be claimed are the workers in secondTryBitmap attempted.
 * Returns the claimed worker id, or -1 if no matching worker is asleep. */
int ThreadPool::tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap)
{
    const sleepbitmap_t preference[2] = { firstTryBitmap, secondTryBitmap };

    for (int pass = 0; pass < 2; pass++)
    {
        /* re-read the sleep bitmap after every failed claim attempt */
        for (sleepbitmap_t candidates = m_sleepBitmap & preference[pass];
             candidates;
             candidates = m_sleepBitmap & preference[pass])
        {
            unsigned long index;
            SLEEPBITMAP_BSF(index, candidates);

            /* the claim only succeeds if the bit was still set when cleared */
            const sleepbitmap_t claim = (sleepbitmap_t)1 << index;
            if (SLEEPBITMAP_AND(&m_sleepBitmap, ~claim) & claim)
                return (int)index;
        }
    }

    return -1;
}
242
243
int ThreadPool::tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master)
244
0
{
245
0
    int bondCount = 0;
246
0
    do
247
0
    {
248
0
        int id = tryAcquireSleepingThread(peerBitmap, 0);
249
0
        if (id < 0)
250
0
            return bondCount;
251
252
0
        m_workers[id].m_bondMaster = &master;
253
0
        m_workers[id].awaken();
254
0
        bondCount++;
255
0
    }
256
0
    while (bondCount < maxPeers);
257
258
0
    return bondCount;
259
0
}
260
261
/* Distributes totalNumThreads between ThreadedME and FrameEncoder pools.
262
 * Modifies threadsPerPool[], nodeMaskPerPool[], numNumaNodes, and numPools in-place.
263
 * Returns the number of threads reserved for frame encoding. */
264
static void distributeThreadsForTme(
265
    x265_param* p,
266
    int totalNumThreads,
267
    int& numNumaNodes,
268
    bool bNumaSupport,
269
    int* threadsPerPool,
270
    uint64_t* nodeMaskPerPool,
271
    int& numPools,
272
    int& threadsFrameEnc)
273
0
{
274
0
    if (totalNumThreads < MIN_TME_THREADS)
275
0
    {
276
0
        x265_log(p, X265_LOG_WARNING, "Low thread count detected, disabling --threaded-me."
277
0
            " Minimum recommended is 32 cores / threads\n");
278
0
        p->bThreadedME = 0;
279
0
        return;
280
0
    }
281
282
0
    int targetTME = ThreadPool::configureTmeThreadCount(p, totalNumThreads);
283
0
    targetTME = (targetTME < 1) ? 1 : targetTME;
284
285
0
    threadsFrameEnc = totalNumThreads - targetTME;
286
0
    int defaultNumFT = ThreadPool::getFrameThreadsCount(p, totalNumThreads);
287
0
    if (threadsFrameEnc < defaultNumFT)
288
0
    {
289
0
        threadsFrameEnc = defaultNumFT;
290
0
        targetTME = totalNumThreads - threadsFrameEnc;
291
0
    }
292
293
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 || HAVE_LIBNUMA
294
    if (bNumaSupport && numNumaNodes > 1)
295
    {
296
        int tmeNumaNodes = 0;
297
        int leftover = 0;
298
299
        // First thread pool belongs to ThreadedME
300
        std::vector<int> threads(1, 0);
301
        std::vector<uint64_t> nodeMasks(1, 0);
302
        int poolIndex = 0;
303
304
        /* Greedily assign whole NUMA nodes to TME until reaching or exceeding the target */
305
        for (int i = 0; i < numNumaNodes + 1; i++)
306
        {
307
            if (!threadsPerPool[i] && !nodeMaskPerPool[i])
308
                continue;
309
310
            int toTake = X265_MIN(threadsPerPool[i], targetTME - threads[0]);
311
            if (toTake > 0)
312
            {
313
                threads[poolIndex] += toTake;
314
                nodeMasks[poolIndex] |= nodeMaskPerPool[i];
315
                tmeNumaNodes++;
316
317
                if (threads[0] == targetTME)
318
                    poolIndex++;
319
320
                if (toTake < threadsPerPool[i])
321
                    leftover = threadsPerPool[i] - toTake;
322
            }
323
            else
324
            {
325
                threads.push_back(threadsPerPool[i]);
326
                nodeMasks.push_back(nodeMaskPerPool[i]);
327
                poolIndex++;
328
            }
329
        }
330
331
        // Distribute leftover threads among FrameEncoders
332
        if (leftover)
333
        {
334
            // Case 1: There are 1 or more threadpools for FrameEncoder(s) by now
335
            if (threads.size() > 1)
336
            {
337
                int split = static_cast<int>(static_cast<double>(leftover) / (numNumaNodes - 1));
338
                for (int pool = 1; pool < numNumaNodes; pool++)
339
                {
340
                    int give = X265_MIN(split, leftover);
341
                    threads[pool] += give;
342
                    leftover -= give;
343
                }
344
            }
345
346
            // Case 2: FrameEncoder(s) haven't received threads yet
347
            if (threads.size() == 1)
348
            {
349
                threads.push_back(leftover);
350
                // Give the same node mask as the last node of ThreadedME
351
                uint64_t msb = 1;
352
                uint64_t tmeNodeMask = nodeMasks[0];
353
                while (tmeNodeMask > 1)
354
                {
355
                    tmeNodeMask >>= 1;
356
                    msb <<= 1;
357
                }
358
                nodeMasks.push_back(msb);
359
            }
360
        }
361
362
        // Apply calculated threadpool assignment
363
        memset(threadsPerPool, 0, sizeof(int) * (numNumaNodes + 2));
364
        memset(nodeMaskPerPool, 0, sizeof(uint64_t) * (numNumaNodes + 2));
365
366
        numPools = numNumaNodes = static_cast<int>(threads.size());
367
        for (int pool = 0; pool < numPools; pool++)
368
        {
369
            threadsPerPool[pool] = threads[pool];
370
            nodeMaskPerPool[pool] = nodeMasks[pool];
371
        }
372
    }
373
    else
374
#else
375
0
    (void) bNumaSupport;
376
0
#endif
377
0
    {
378
0
        memset(threadsPerPool, 0, sizeof(int) * (numNumaNodes + 2));
379
0
        memset(nodeMaskPerPool, 0, sizeof(uint64_t) * (numNumaNodes + 2));
380
381
0
        threadsPerPool[0] = targetTME;
382
0
        nodeMaskPerPool[0] = 1;
383
384
0
        threadsPerPool[1] = threadsFrameEnc;
385
0
        nodeMaskPerPool[1] = 1;
386
387
0
        numPools = 2;
388
0
    }
389
0
}
390
391
/* Decide how many thread pools to create and how many worker threads each one
 * gets, then allocate and create() them.
 *
 * p                  encoder parameters; numaPools, bThreadedME, frameNumThreads
 *                    and lookaheadThreads are read and may be updated
 * numPools           out: number of pools in the returned array (0 on failure
 *                    or when no pool is created)
 * isThreadsReserved  when true a single pool with p->lookaheadThreads workers
 *                    and one provider slot is created, and frameNumThreads is
 *                    left alone
 *
 * Returns an array of numPools pools allocated with new[], or NULL. */
ThreadPool* ThreadPool::allocThreadPools(x265_param* p, int& numPools, bool isThreadsReserved)
{
    enum { MAX_NODE_NUM = 127 };
    int cpusPerNode[MAX_NODE_NUM + 1];
    int threadsPerPool[MAX_NODE_NUM + 2];
    uint64_t nodeMaskPerPool[MAX_NODE_NUM + 2];
    int totalNumThreads = 0;

    memset(cpusPerNode, 0, sizeof(cpusPerNode));
    memset(threadsPerPool, 0, sizeof(threadsPerPool));
    memset(nodeMaskPerPool, 0, sizeof(nodeMaskPerPool));

    int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
    bool bNumaSupport = false;

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 
    bNumaSupport = true;
#elif HAVE_LIBNUMA
    bNumaSupport = numa_available() >= 0;
#endif


    /* detect the number of logical cores on every NUMA node */
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    PGROUP_AFFINITY groupAffinityPointer = new GROUP_AFFINITY;
    for (int i = 0; i < numNumaNodes; i++)
    {
        GetNumaNodeProcessorMaskEx((UCHAR)i, groupAffinityPointer);
        cpusPerNode[i] = popCount(groupAffinityPointer->Mask);
    }
    delete groupAffinityPointer;
#elif HAVE_LIBNUMA
    if (bNumaSupport)
    {
        struct bitmask* bitMask = numa_allocate_cpumask();
        for (int i = 0; i < numNumaNodes; i++)
        {
            int ret = numa_node_to_cpus(i, bitMask);
            if (!ret)
                cpusPerNode[i] = numa_bitmask_weight(bitMask);
            else
                x265_log(p, X265_LOG_ERROR, "Failed to genrate CPU mask\n");
        }
        numa_free_cpumask(bitMask);
    }
#else // NUMA not supported
    cpusPerNode[0] = getCpuCount();
#endif

    if (bNumaSupport && p->logLevel >= X265_LOG_DEBUG)
    {
        for (int i = 0; i < numNumaNodes; i++)
            x265_log(p, X265_LOG_DEBUG, "detected NUMA node %d with %d logical cores\n", i, cpusPerNode[i]);
    }
    /* limit threads based on param->numaPools
     * For windows because threads can't be allocated to live across sockets
     * changing the default behavior to be per-socket pools -- FIXME */
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 || HAVE_LIBNUMA
    if (!strlen(p->numaPools) || (strcmp(p->numaPools, "NULL") == 0 || strcmp(p->numaPools, "*") == 0 || strcmp(p->numaPools, "") == 0))
    {
        /* Build "<n0>,<n1>,..." from the detected per-node core counts. One
         * entry (comma, sign and up to ten digits) needs at most 12
         * characters, so the buffer is sized for MAX_NODE_NUM entries and
         * every write is bounded. */
        char poolString[MAX_NODE_NUM * 12 + 1] = "";
        size_t used = 0;
        for (int i = 0; i < numNumaNodes; i++)
        {
            int written = snprintf(poolString + used, sizeof(poolString) - used,
                                   i ? ",%d" : "%d", cpusPerNode[i]);
            if (written < 0 || (size_t)written >= sizeof(poolString) - used)
            {
                /* drop a partially written entry instead of passing it on */
                poolString[used] = '\0';
                break;
            }
            used += (size_t)written;
        }
        x265_param_parse(p, "pools", poolString);
    }
#endif
    if (strlen(p->numaPools))
    {
        /* Parse one comma separated entry per NUMA node. Slot [numNumaNodes]
         * collects the threads of the pool that spans several nodes. */
        const char *nodeStr = p->numaPools;
        for (int i = 0; i < numNumaNodes; i++)
        {
            if (!*nodeStr)
            {
                threadsPerPool[i] = 0;
                continue;
            }
            else if (*nodeStr == '-')
                threadsPerPool[i] = 0;
            else if (*nodeStr == '*' || !strcasecmp(nodeStr, "NULL"))
            {
                for (int j = i; j < numNumaNodes; j++)
                {
                    threadsPerPool[numNumaNodes] += cpusPerNode[j];
                    nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << j);
                }
                break;
            }
            else if (*nodeStr == '+')
            {
                threadsPerPool[numNumaNodes] += cpusPerNode[i];
                nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i);
            }
            else
            {
                int count = atoi(nodeStr);
                if (i > 0 || strchr(nodeStr, ','))   // it is comma -> old logic
                {
                    threadsPerPool[i] = X265_MIN(count, cpusPerNode[i]);
                    nodeMaskPerPool[i] = ((uint64_t)1 << i);
                }
                else                                 // new logic: exactly 'count' threads on all NUMAs
                {
                    threadsPerPool[numNumaNodes] = X265_MIN(count, numNumaNodes * MAX_POOL_THREADS);
                    nodeMaskPerPool[numNumaNodes] = ((uint64_t)-1 >> (64 - numNumaNodes));
                }
            }

            /* consume current node string, comma, and white-space */
            while (*nodeStr && *nodeStr != ',')
               ++nodeStr;
            if (*nodeStr == ',' || *nodeStr == ' ')
               ++nodeStr;
        }
    }
    else
    {
        for (int i = 0; i < numNumaNodes; i++)
        {
            threadsPerPool[numNumaNodes]  += cpusPerNode[i];
            nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i);
        }
    }

    /* If ThreadedME is enabled, split resources: give half NUMA nodes (or
     * half cores when NUMA not available) to threaded-me and the other half
     * to the remaining job providers. */
    /* Compute total CPU count from detected per-node counts when available */
    for (int i = 0; i < numNumaNodes + 1; i++)
        totalNumThreads += threadsPerPool[i];
    if (!totalNumThreads)
        totalNumThreads = ThreadPool::getCpuCount();

    int threadsFrameEnc = totalNumThreads;
    if (p->bThreadedME)
    {
        /* may clear p->bThreadedME again when too few threads are available */
        distributeThreadsForTme(p, totalNumThreads, numNumaNodes, bNumaSupport, threadsPerPool,
                                nodeMaskPerPool, numPools, threadsFrameEnc);
    }
 
    // If the last pool size is > MAX_POOL_THREADS, clip it to spawn thread pools only of size >= 1/2 max (heuristic)
    if ((threadsPerPool[numNumaNodes] > MAX_POOL_THREADS) &&
        ((threadsPerPool[numNumaNodes] % MAX_POOL_THREADS) < (MAX_POOL_THREADS / 2)))
    {
        threadsPerPool[numNumaNodes] -= (threadsPerPool[numNumaNodes] % MAX_POOL_THREADS);
        x265_log(p, X265_LOG_DEBUG,
                 "Creating only %d worker threads beyond specified numbers with --pools (if specified) to prevent asymmetry in pools; may not use all HW contexts\n", threadsPerPool[numNumaNodes]);
    }

    if (!p->bThreadedME)
    {
        numPools = 0;
        for (int i = 0; i < numNumaNodes + 1; i++)
        {
            if (bNumaSupport)
                x265_log(p, X265_LOG_DEBUG, "NUMA node %d may use %d logical cores\n", i, cpusPerNode[i]);
            
            if (threadsPerPool[i])
            {
                numPools += (threadsPerPool[i] + MAX_POOL_THREADS - 1) / MAX_POOL_THREADS;
                totalNumThreads += threadsPerPool[i];
            }
        }
    }
        
    if (!isThreadsReserved)
    {
        if (!numPools)
        {
            x265_log(p, X265_LOG_DEBUG, "No pool thread available. Deciding frame-threads based on detected CPU threads\n");
            totalNumThreads = ThreadPool::getCpuCount(); // auto-detect frame threads
        }

        if (!p->frameNumThreads)
            p->frameNumThreads = ThreadPool::getFrameThreadsCount(p, threadsFrameEnc);
    }
    
    if (!numPools)
        return NULL;

    if (numPools > p->frameNumThreads && !p->bThreadedME)
    {
        x265_log(p, X265_LOG_DEBUG, "Reducing number of thread pools for frame thread count\n");
        numPools = X265_MAX(p->frameNumThreads / 2, 1);
    }
    if (isThreadsReserved)
        numPools = 1;
    ThreadPool *pools = new ThreadPool[numPools];
    if (pools)
    {
        int poolCount = (p->bThreadedME) ? numPools - 1 : numPools;
        int node = 0;
        for (int i = 0; i < numPools; i++)
        {
            int maxProviders = (p->bThreadedME && i == 0) // threadpool 0 is dedicated to ThreadedME
                ? 1
                : (p->frameNumThreads + poolCount - 1) / poolCount + !isThreadsReserved; // +1 is Lookahead, always assigned to threadpool 0
            
            /* advance to the next slot that still has threads to hand out */
            while (!threadsPerPool[node])
                node++;
            int numThreads = threadsPerPool[node];
            int origNumThreads = numThreads;

            if (i == 0 && p->lookaheadThreads > numThreads / 2)
            {
                p->lookaheadThreads = numThreads / 2;
                x265_log(p, X265_LOG_DEBUG, "Setting lookahead threads to a maximum of half the total number of threads\n");
            }

            if (isThreadsReserved)
            {
                numThreads = p->lookaheadThreads;
                maxProviders = 1;
            }
            else if (i == 0)
                numThreads -= p->lookaheadThreads;

            if (!p->bThreadedME)
                X265_CHECK(numThreads <= MAX_POOL_THREADS, "a single thread pool cannot have more than MAX_POOL_THREADS threads\n");

            if (!pools[i].create(numThreads, maxProviders, nodeMaskPerPool[node]))
            {
                delete[] pools;
                numPools = 0;
                return NULL;
            }
            if (numNumaNodes > 1)
            {
                /* Room for ",0" .. ",63" plus the terminator. This is a real
                 * array so that sizeof() yields the buffer size (taking
                 * sizeof of a pointer truncated the list), and it is
                 * zero-filled so that nodesstr + 1 is a valid empty string
                 * when the mask has no bit set. */
                char nodesstr[64 * 3 + 1] = "";
                int len = 0;
                for (int j = 0; j < 64; j++)
                    if ((nodeMaskPerPool[node] >> j) & 1)
                        len += snprintf(nodesstr + len, sizeof(nodesstr) - len, ",%d", j);
                /* + 1 skips the leading comma */
                x265_log(p, X265_LOG_INFO, "Thread pool %d using %d threads on numa nodes %s\n", i, numThreads, nodesstr + 1);
            }
            else
                x265_log(p, X265_LOG_INFO, "Thread pool created using %d threads\n", numThreads);
            threadsPerPool[node] -= origNumThreads;
        }
    }
    else
        numPools = 0;
    return pools;
}
639
640
/* Start from an all-zero object: every member byte is cleared here and the
 * real setup is done later by create() */
ThreadPool::ThreadPool()
{
    memset(this, 0, sizeof(ThreadPool));
}
644
645
bool ThreadPool::create(int numThreads, int maxProviders, uint64_t nodeMask)
646
0
{
647
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 
648
    memset(&m_groupAffinity, 0, sizeof(GROUP_AFFINITY));
649
    for (int i = 0; i < getNumaNodeCount(); i++)
650
    {
651
        int numaNode = ((nodeMask >> i) & 0x1U) ? i : -1;
652
        if (numaNode != -1)
653
        if (GetNumaNodeProcessorMaskEx((USHORT)numaNode, &m_groupAffinity))
654
            break;
655
    }
656
    m_numaMask = &m_groupAffinity.Mask;
657
#elif HAVE_LIBNUMA
658
    if (numa_available() >= 0)
659
    {
660
        struct bitmask* nodemask = numa_allocate_nodemask();
661
        if (nodemask)
662
        {
663
            *(nodemask->maskp) = nodeMask;
664
            m_numaMask = nodemask;
665
        }
666
        else
667
            x265_log(NULL, X265_LOG_ERROR, "unable to get NUMA node mask for %lx\n", nodeMask);
668
    }
669
#else
670
0
    (void)nodeMask;
671
0
#endif
672
673
0
    m_numWorkers = numThreads;
674
675
0
    m_workers = X265_MALLOC(WorkerThread, numThreads);
676
    /* placement new initialization */
677
0
    if (m_workers)
678
0
        for (int i = 0; i < numThreads; i++)
679
0
            new (m_workers + i)WorkerThread(*this, i);
680
681
0
    m_jpTable = X265_MALLOC(JobProvider*, maxProviders);
682
0
    if (m_jpTable)
683
0
        memset(m_jpTable, 0, sizeof(JobProvider*) * maxProviders);
684
0
    m_numProviders = 0;
685
686
0
    return m_workers && m_jpTable;
687
0
}
688
689
bool ThreadPool::start()
690
0
{
691
0
    m_isActive = true;
692
0
    for (int i = 0; i < m_numWorkers; i++)
693
0
    {
694
0
        if (!m_workers[i].start())
695
0
        {
696
0
            m_isActive = false;
697
0
            return false;
698
0
        }
699
0
    }
700
0
    return true;
701
0
}
702
703
void ThreadPool::stopWorkers()
704
0
{
705
0
    if (m_workers)
706
0
    {
707
0
        m_isActive = false;
708
0
        for (int i = 0; i < m_numWorkers; i++)
709
0
        {
710
0
            while (!(m_sleepBitmap & ((sleepbitmap_t)1 << i)))
711
0
                GIVE_UP_TIME();
712
0
            m_workers[i].awaken();
713
0
            m_workers[i].stop();
714
0
        }
715
0
    }
716
0
}
717
718
/* Destroy the placement-constructed workers, then release the worker storage,
 * the provider table and (with libnuma) the node mask */
ThreadPool::~ThreadPool()
{
    /* workers were built with placement new, so run their destructors by hand */
    for (int idx = 0; m_workers && idx < m_numWorkers; idx++)
        m_workers[idx].~WorkerThread();

    X265_FREE(m_workers);
    X265_FREE(m_jpTable);

#if HAVE_LIBNUMA
    if (m_numaMask)
    {
        numa_free_nodemask((struct bitmask*)m_numaMask);
    }
#endif
}
734
735
void ThreadPool::setCurrentThreadAffinity()
736
0
{
737
0
    setThreadNodeAffinity(m_numaMask);
738
0
}
739
740
/* Bind the calling thread to the NUMA node(s) of this pool.
 * Windows: numaMask is ignored and the pool's stored group affinity is used.
 * libnuma: numaMask is treated as a struct bitmask* node mask.
 * Other builds: no-op. Failures are logged and otherwise ignored. */
void ThreadPool::setThreadNodeAffinity(void *numaMask)
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 
    UNREFERENCED_PARAMETER(numaMask);

    /* work on a local copy of the pool's group and processor mask */
    GROUP_AFFINITY wanted;
    memset(&wanted, 0, sizeof(GROUP_AFFINITY));
    wanted.Group = m_groupAffinity.Group;
    wanted.Mask = m_groupAffinity.Mask;

    const PGROUP_AFFINITY wantedPtr = &wanted;
    if (!SetThreadGroupAffinity(GetCurrentThread(), wantedPtr, NULL))
        x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
#elif HAVE_LIBNUMA
    if (numa_available() < 0)
    {
        x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
        return;
    }

    struct bitmask* nodes = (struct bitmask*)numaMask;
    numa_run_on_node_mask(nodes);
    numa_set_interleave_mask(nodes);
    numa_set_localalloc();
#else
    (void)numaMask;
#endif
}
767
768
/* static */
769
/* Number of NUMA nodes: highest node number plus one where the platform can
 * report it, otherwise 1 */
int ThreadPool::getNumaNodeCount()
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 
    ULONG highestNode = 1;
    const bool queried = GetNumaHighestNodeNumber(&highestNode) != 0;
    /* on failure the value is returned without the +1 adjustment */
    return (int)(queried ? highestNode + 1 : highestNode);
#elif HAVE_LIBNUMA
    return (numa_available() >= 0) ? numa_max_node() + 1 : 1;
#else
    const int singleNode = 1;
    return singleNode;
#endif
}
785
786
/* static */
787
int ThreadPool::getCpuCount()
788
0
{
789
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
790
    enum { MAX_NODE_NUM = 127 };
791
    int cpus = 0;
792
    int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
793
    GROUP_AFFINITY groupAffinity;
794
    for (int i = 0; i < numNumaNodes; i++)
795
    {
796
        GetNumaNodeProcessorMaskEx((UCHAR)i, &groupAffinity);
797
        cpus += popCount(groupAffinity.Mask);
798
    }
799
    return cpus;
800
#elif _WIN32
801
    SYSTEM_INFO sysinfo;
802
    GetSystemInfo(&sysinfo);
803
    return sysinfo.dwNumberOfProcessors;
804
#elif __unix__ && X265_ARCH_ARM
805
    /* Return the number of processors configured by OS. Because, most embedded linux distributions
806
     * uses only one processor as the scheduler doesn't have enough work to utilize all processors */
807
    return sysconf(_SC_NPROCESSORS_CONF);
808
#elif __unix__
809
0
    return sysconf(_SC_NPROCESSORS_ONLN);
810
#elif MACOS && __MACH__
811
    int nm[2];
812
    size_t len = 4;
813
    uint32_t count;
814
815
    nm[0] = CTL_HW;
816
    nm[1] = HW_AVAILCPU;
817
    sysctl(nm, 2, &count, &len, NULL, 0);
818
819
    if (count < 1)
820
    {
821
        nm[1] = HW_NCPU;
822
        sysctl(nm, 2, &count, &len, NULL, 0);
823
        if (count < 1)
824
            count = 1;
825
    }
826
827
    return count;
828
#else
829
    return 2; // default to 2 threads, everywhere else
830
#endif
831
0
}
832
833
/* Pick the default number of frame threads for cpuCount available threads.
 * Without wavefront parallelism the count is limited by the thread count, by
 * half the number of CTU rows (rounded up) and by X265_MAX_FRAME_THREADS.
 * With wavefront it is a fixed step function of cpuCount. */
int ThreadPool::getFrameThreadsCount(x265_param* p, int cpuCount)
{
    /* number of CTU rows in the picture */
    const int ctuRows = (p->sourceHeight + p->maxCUSize - 1) >> g_log2Size[p->maxCUSize];

    if (!p->bEnableWavefront)
        return X265_MIN3(cpuCount, (ctuRows + 1) / 2, X265_MAX_FRAME_THREADS);

    if (cpuCount >= 32)
    {
        /* taller pictures get one extra frame thread */
        if (p->sourceHeight > 2000)
            return 6;
        return 5;
    }

    if (cpuCount >= 16)
        return 4;

    if (cpuCount >= 8)
    {
#if _WIN32 && X265_ARCH_ARM64
        return cpuCount;
#else
        return 3;
#endif
    }

    if (cpuCount >= 4)
        return 2;

    return 1;
}
853
854
/* Configure threaded motion estimation (TME) for the given parameters.
 *
 * param    - encoder parameters. Read to classify the encode; when one of the rules
 *            below matches, tmeTaskBlockSize and tmeNumBufferRows are overwritten.
 *            When no rule matches they are left untouched.
 * cpuCount - number of CPUs to budget from
 *
 * Returns cpuCount / 2 when getCPUFrequencyMHz() reports more than 2000 MHz,
 * otherwise a percentage of cpuCount taken from the matched rule (or from the
 * default table when no rule matches). Note the result can be 0 for small cpuCount. */
int ThreadPool::configureTmeThreadCount(x265_param* param, int cpuCount)
{
    /* Resolution buckets used to index the per-rule tables */
    enum TmeResClass
    {
        TME_RES_LOW = 0,
        TME_RES_MID,
        TME_RES_HIGH,
        TME_RES_COUNT
    };

    /* Rules, listed in the order in which they are tested */
    enum TmeRule
    {
        TME_RULE_SLOWER_VERYSLOW = 0,
        TME_RULE_SLOW,
        TME_RULE_FAST_MEDIUM,
        TME_RULE_FASTER,
        TME_RULE_VERYFAST,
        TME_RULE_SUPERFAST,
        TME_RULE_ULTRAFAST,
        TME_RULE_COUNT
    };

    struct TmeRuleConfig
    {
        int taskBlockSize[TME_RES_COUNT];
        int numBufferRows[TME_RES_COUNT];
        int threadPercent[TME_RES_COUNT];
        bool widthBasedTaskBlockSize; /* derive the task block size from sourceWidth instead of the table */
    };

    static const TmeRuleConfig s_tmeRuleConfig[TME_RULE_COUNT] =
    {
        { { 1, 1, 1 }, { 3, 3, 3 }, { 90, 85, 70 }, false }, // slower / veryslow preset and similar options
        { { 1, 1, 1 }, { 5, 5, 3 }, { 90, 85, 75 }, false }, // slow preset and similar options
        { { 1, 1, 1 }, { 10, 10, 10 }, { 90, 80, 70 }, false }, // fast / medium preset and similar options
        { { 1, 1, 1 }, { 10, 15, 10 }, { 90, 80, 70 }, false }, // faster preset and similar options
        { { 1, 1, 1 }, { 10, 15, 20 }, { 90, 80, 70 }, false }, // veryfast preset and similar options
        { { 2, 4, 4 }, { 10, 15, 20 }, { 90, 80, 60 }, false }, // superfast preset and similar options
        { { 0, 0, 0 }, { 15, 20, 20 }, { 90, 80, 50 }, true  }  // ultrafast preset and similar options
    };

    /* Thread percentages used when none of the rules match */
    static const int s_defaultThreadPercent[TME_RES_COUNT] = { 80, 80, 70 };

    /* >= 1440 rows is "high", <= 720 rows is "low", everything in between is "mid" */
    int resClass = TME_RES_MID;
    if (param->sourceHeight >= 1440)
        resClass = TME_RES_HIGH;
    else if (param->sourceHeight <= 720)
        resClass = TME_RES_LOW;

    /* First matching rule wins; TME_RULE_COUNT means "no rule matched" */
    int matchedRule = TME_RULE_COUNT;
    if (param->maxNumReferences >= 5 || param->subpelRefine >= 4 || (param->bEnableRectInter && param->bEnableAMP))
        matchedRule = TME_RULE_SLOWER_VERYSLOW;
    else if (param->maxNumReferences >= 4 || param->subpelRefine >= 3 || (param->bEnableRectInter ^ param->bEnableAMP))
        matchedRule = TME_RULE_SLOW;
    else if (param->maxNumReferences >= 3 && param->subpelRefine >= 2)
        matchedRule = TME_RULE_FAST_MEDIUM;
    else if (param->maxNumReferences >= 2 && param->subpelRefine >= 2)
        matchedRule = TME_RULE_FASTER;
    else if (param->subpelRefine >= 1 && param->bframes > 3)
        matchedRule = TME_RULE_VERYFAST;
    else if (param->subpelRefine && param->maxCUSize < 64)
        matchedRule = TME_RULE_SUPERFAST;
    else if (!param->subpelRefine || param->searchMethod == X265_DIA_SEARCH || param->minCUSize >= 16)
        matchedRule = TME_RULE_ULTRAFAST;

    bool isHighFreq = (getCPUFrequencyMHz() > 2000.0);

    int threadPercent = s_defaultThreadPercent[resClass];
    if (matchedRule != TME_RULE_COUNT)
    {
        const TmeRuleConfig& rule = s_tmeRuleConfig[matchedRule];

        if (rule.widthBasedTaskBlockSize)
            param->tmeTaskBlockSize = (param->sourceWidth + 480 - 1) / 480; /* one unit per started 480 pixels of width */
        else
            param->tmeTaskBlockSize = rule.taskBlockSize[resClass];

        param->tmeNumBufferRows = rule.numBufferRows[resClass];
        threadPercent = rule.threadPercent[resClass];
    }

    /* Above 2000 MHz the percentage tables are bypassed and half of the CPUs are used */
    if (isHighFreq)
        return cpuCount / 2;

    return (cpuCount * threadPercent) / 100;
}
932
933
/* Best-effort query of the CPU clock frequency.
 *
 * Returns the frequency in MHz, or 0.0 when it cannot be determined on this
 * platform. On the sysfs / procfs path the highest value seen across all CPU
 * entries is reported. */
double getCPUFrequencyMHz()
{
#if defined(_WIN32)
    /* Read the "~MHz" value stored in the registry for processor 0 */
    DWORD regMhz = 0;
    DWORD regSize = sizeof(regMhz);
    HKEY cpuKey;
    LONG openStatus = RegOpenKeyExA(HKEY_LOCAL_MACHINE,
                                    "HARDWARE\\DESCRIPTION\\System\\CentralProcessor\\0",
                                    0, KEY_READ, &cpuKey);
    if (openStatus == ERROR_SUCCESS)
    {
        /* The query result is not checked; regMhz simply stays 0 if nothing is written */
        RegQueryValueExA(cpuKey, "~MHz", NULL, NULL, (LPBYTE)&regMhz, &regSize);
        RegCloseKey(cpuKey);
    }
    return (double)regMhz;

#elif defined(__APPLE__)
    /* The sysctl value is scaled by 1e-6 to obtain MHz */
    uint64_t hz = 0;
    size_t hzSize = sizeof(hz);
    if (sysctlbyname("hw.cpufrequency", &hz, &hzSize, NULL, 0) != 0)
        return 0.0;
    return (double)hz / 1.0e6;

#else  /* Linux */
    /* Pass 1: walk cpu0, cpu1, ... and keep the largest scaling_cur_freq (kHz).
     * scaling_cur_freq reflects the live frequency chosen by the governor and EPP
     * hint. The walk ends at the first CPU index whose file cannot be opened. */
    uint64_t peakKhz = 0;
    int cpuIndex = 0;
    while (true)
    {
        char sysfsPath[64];
        snprintf(sysfsPath, sizeof(sysfsPath),
                 "/sys/devices/system/cpu/cpu%d/cpufreq/scaling_cur_freq", cpuIndex);

        std::ifstream freqFile(sysfsPath);
        if (!freqFile.is_open())
            break;

        uint64_t curKhz = 0;
        freqFile >> curKhz;
        peakKhz = (curKhz > peakKhz) ? curKhz : peakKhz;
        ++cpuIndex;
    }
    if (peakKhz > 0)
        return (double)peakKhz / 1000.0;

    /* Pass 2: fall back to /proc/cpuinfo and keep the largest "cpu MHz" entry */
    std::ifstream cpuInfo("/proc/cpuinfo");
    std::string entry;
    double peakMhz = 0.0;
    while (std::getline(cpuInfo, entry))
    {
        if (entry.find("cpu MHz") == std::string::npos)
            continue;

        size_t sep = entry.find(':');
        if (sep == std::string::npos)
            continue;

        /* The number follows the colon; strtod skips the leading whitespace */
        double entryMhz = strtod(entry.c_str() + sep + 1, NULL);
        if (entryMhz > peakMhz)
            peakMhz = entryMhz;
    }

    /* peakMhz is still exactly 0.0 when no usable entry was found */
    return peakMhz;
#endif
}
1001
1002
} // end namespace X265_NS