Coverage Report

Created: 2026-05-16 06:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/work/x265/source/common/threadpool.cpp
Line
Count
Source
1
/*****************************************************************************
2
 * Copyright (C) 2013-2020 MulticoreWare, Inc
3
 *
4
 * Authors: Steve Borho <steve@borho.org>
5
 *          Min Chen <chenm003@163.com>
6
 *
7
 * This program is free software; you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 2 of the License, or
10
 * (at your option) any later version.
11
 *
12
 * This program is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU General Public License
18
 * along with this program; if not, write to the Free Software
19
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02111, USA.
20
 *
21
 * This program is also available under a commercial proprietary license.
22
 * For more information, contact us at license @ x265.com
23
 *****************************************************************************/
24
25
#include "common.h"
26
#include "threadpool.h"
27
#include "threading.h"
28
29
#include <new>
30
#include <vector>
31
32
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
33
#include <winnt.h>
34
#endif
35
36
#if defined(__APPLE__)
37
#include <sys/sysctl.h>
38
#elif !defined(_WIN32)
39
#include <fstream>
40
#include <string>
41
#include <cstdio>
42
#include <cstdlib>
43
#endif
44
45
#if X86_64

#ifdef __GNUC__

#define SLEEPBITMAP_BSF(id, x)     (id) = ((unsigned long)__builtin_ctzll(x))
#define SLEEPBITMAP_OR(ptr, mask)  __sync_fetch_and_or(ptr, mask)
#define SLEEPBITMAP_AND(ptr, mask) __sync_fetch_and_and(ptr, mask)

#elif defined(_MSC_VER)

/* InterlockedOr64 / InterlockedAnd64 take a 64-bit LONG64 operand. The mask
 * must be passed as LONG64: casting it to the 32-bit LONG type would drop
 * bits 32..63 (workers with id >= 32 could never set or clear their bit) and
 * sign-extend bit 31 (setting bit 31 would also set bits 32..63). */
#define SLEEPBITMAP_BSF(id, x)     _BitScanForward64(&id, x)
#define SLEEPBITMAP_OR(ptr, mask)  InterlockedOr64((volatile LONG64*)(ptr), (LONG64)(mask))
#define SLEEPBITMAP_AND(ptr, mask) InterlockedAnd64((volatile LONG64*)(ptr), (LONG64)(mask))

#endif // ifdef __GNUC__

#else

/* use 32-bit primitives defined in threading.h */
#define SLEEPBITMAP_BSF BSF
#define SLEEPBITMAP_OR  ATOMIC_OR
#define SLEEPBITMAP_AND ATOMIC_AND

#endif
69
70
/* TODO FIX: The __MACH__ macro should ideally be part of the MacOS definition, but adding it
71
   to CMake does not behave as expected; this still needs to be fixed. */
72
73
#if MACOS && __MACH__
74
#include <sys/param.h>
75
#include <sys/sysctl.h>
76
#endif
77
#if HAVE_LIBNUMA
78
#include <numa.h>
79
#endif
80
#if defined(_MSC_VER)
81
# define strcasecmp _stricmp
82
#endif
83
84
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
/* Masks for the SWAR (parallel bit-sum) population count below. */
const uint64_t m1  = 0x5555555555555555; // every other bit:    0101...
const uint64_t m2  = 0x3333333333333333; // every other pair:   00110011...
const uint64_t m3  = 0x0f0f0f0f0f0f0f0f; // every other nibble: 00001111...
const uint64_t h01 = 0x0101010101010101; // low bit of every byte set

/* Return the number of set bits in a 64-bit processor affinity mask. */
static int popCount(uint64_t x)
{
    const uint64_t pairSums   = x - ((x >> 1) & m1);                      // per-2-bit counts
    const uint64_t nibbleSums = (pairSums & m2) + ((pairSums >> 2) & m2); // per-4-bit counts
    const uint64_t byteSums   = (nibbleSums + (nibbleSums >> 4)) & m3;    // per-byte counts
    // multiplying by h01 accumulates every byte count into the top byte
    return (int)((byteSums * h01) >> 56);
}
#endif
98
99
namespace X265_NS {
100
// x265 private namespace
101
102
class WorkerThread : public Thread
103
{
104
private:
105
106
    ThreadPool&  m_pool;
107
    int          m_id;
108
    Event        m_wakeEvent;
109
110
    WorkerThread& operator =(const WorkerThread&);
111
112
public:
113
114
    JobProvider*     m_curJobProvider;
115
    BondedTaskGroup* m_bondMaster;
116
117
0
    WorkerThread(ThreadPool& pool, int id) : m_pool(pool), m_id(id) {}
118
0
    virtual ~WorkerThread() {}
119
120
    void threadMain();
121
0
    void awaken()           { m_wakeEvent.trigger(); }
122
};
123
124
void WorkerThread::threadMain()
125
0
{
126
0
    THREAD_NAME("Worker", m_id);
127
128
#if _WIN32
129
    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
130
#else
131
0
    __attribute__((unused)) int val = nice(10);
132
0
#endif
133
134
0
    m_pool.setCurrentThreadAffinity();
135
136
0
    sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id;
137
0
    m_curJobProvider = m_pool.m_jpTable[0];
138
0
    m_bondMaster = NULL;
139
140
0
    SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
141
0
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
142
0
    m_wakeEvent.wait();
143
144
0
    while (m_pool.m_isActive)
145
0
    {
146
0
        if (m_bondMaster)
147
0
        {
148
0
            m_bondMaster->processTasks(m_id);
149
0
            m_bondMaster->m_exitedPeerCount.incr();
150
0
            m_bondMaster = NULL;
151
0
        }
152
153
0
        do
154
0
        {
155
            /* do pending work for current job provider */
156
0
            m_curJobProvider->findJob(m_id);
157
158
            /* if the current job provider still wants help, only switch to a
159
             * higher priority provider (lower slice type). Else take the first
160
             * available job provider with the highest priority */
161
0
            int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType :
162
0
                                                                 INVALID_SLICE_PRIORITY + 1;
163
0
            int nextProvider = -1;
164
0
            for (int i = 0; i < m_pool.m_numProviders; i++)
165
0
            {
166
0
                if (m_pool.m_jpTable[i]->m_helpWanted &&
167
0
                    m_pool.m_jpTable[i]->m_sliceType < curPriority)
168
0
                {
169
0
                    nextProvider = i;
170
0
                    curPriority = m_pool.m_jpTable[i]->m_sliceType;
171
0
                }
172
0
            }
173
0
            if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider])
174
0
            {
175
0
                SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit);
176
0
                m_curJobProvider = m_pool.m_jpTable[nextProvider];
177
0
                SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
178
0
            }
179
0
        }
180
0
        while (m_curJobProvider->m_helpWanted);
181
182
        /* While the worker sleeps, a job-provider or bond-group may acquire this
183
         * worker's sleep bitmap bit. Once acquired, that thread may modify 
184
         * m_bondMaster or m_curJobProvider, then waken the thread */
185
0
        SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
186
0
        m_wakeEvent.wait();
187
0
    }
188
189
0
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
190
0
}
191
192
void JobProvider::tryWakeOne()
193
0
{
194
0
    int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS);
195
0
    if (id < 0)
196
0
    {
197
0
        m_helpWanted = true;
198
0
        return;
199
0
    }
200
201
0
    WorkerThread& worker = m_pool->m_workers[id];
202
0
    if (worker.m_curJobProvider != this) /* poaching */
203
0
    {
204
0
        sleepbitmap_t bit = (sleepbitmap_t)1 << id;
205
0
        SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit);
206
0
        worker.m_curJobProvider = this;
207
0
        SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit);
208
0
    }
209
0
    worker.awaken();
210
0
}
211
212
int ThreadPool::tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap)
{
    /* Candidate sets are searched in order: firstTryBitmap, then secondTryBitmap. */
    const sleepbitmap_t candidateSets[2] = { firstTryBitmap, secondTryBitmap };

    for (int pass = 0; pass < 2; pass++)
    {
        const sleepbitmap_t wanted = candidateSets[pass];
        for (sleepbitmap_t avail = m_sleepBitmap & wanted; avail; avail = m_sleepBitmap & wanted)
        {
            unsigned long id;
            SLEEPBITMAP_BSF(id, avail);

            /* Atomically clear the lowest candidate's sleep bit. The caller owns
             * the worker only if that bit was still set in the previous value;
             * otherwise another thread won the race and the scan is retried. */
            const sleepbitmap_t bit = (sleepbitmap_t)1 << id;
            if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit)
                return (int)id;
        }
    }

    return -1;
}
242
243
int ThreadPool::tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master)
244
0
{
245
0
    int bondCount = 0;
246
0
    do
247
0
    {
248
0
        int id = tryAcquireSleepingThread(peerBitmap, 0);
249
0
        if (id < 0)
250
0
            return bondCount;
251
252
0
        m_workers[id].m_bondMaster = &master;
253
0
        m_workers[id].awaken();
254
0
        bondCount++;
255
0
    }
256
0
    while (bondCount < maxPeers);
257
258
0
    return bondCount;
259
0
}
260
261
/* Distributes totalNumThreads between ThreadedME and FrameEncoder pools.
262
 * Modifies threadsPerPool[], nodeMaskPerPool[], numNumaNodes, and numPools in-place.
263
 * Returns the number of threads reserved for frame encoding. */
264
static void distributeThreadsForTme(
265
    x265_param* p,
266
    int totalNumThreads,
267
    int& numNumaNodes,
268
    bool bNumaSupport,
269
    int* threadsPerPool,
270
    uint64_t* nodeMaskPerPool,
271
    int& numPools,
272
    int& threadsFrameEnc)
273
0
{
274
0
    if (totalNumThreads < MIN_TME_THREADS)
275
0
    {
276
0
        x265_log(p, X265_LOG_WARNING, "Low thread count detected, disabling --threaded-me."
277
0
            " Minimum recommended is 32 cores / threads\n");
278
0
        p->bThreadedME = 0;
279
0
        return;
280
0
    }
281
282
0
    int targetTME = ThreadPool::configureTmeThreadCount(p, totalNumThreads);
283
0
    targetTME = (targetTME < 1) ? 1 : targetTME;
284
285
0
    threadsFrameEnc = totalNumThreads - targetTME;
286
0
    int defaultNumFT = ThreadPool::getFrameThreadsCount(p, totalNumThreads);
287
0
    if (threadsFrameEnc < defaultNumFT)
288
0
    {
289
0
        threadsFrameEnc = defaultNumFT;
290
0
        targetTME = totalNumThreads - threadsFrameEnc;
291
0
    }
292
293
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 || HAVE_LIBNUMA
294
    if (bNumaSupport && numNumaNodes > 1)
295
    {
296
        int tmeNumaNodes = 0;
297
        int leftover = 0;
298
299
        // First thread pool belongs to ThreadedME
300
        std::vector<int> threads(1, 0);
301
        std::vector<uint64_t> nodeMasks(1, 0);
302
        int poolIndex = 0;
303
304
        /* Greedily assign whole NUMA nodes to TME until reaching or exceeding the target */
305
        for (int i = 0; i < numNumaNodes + 1; i++)
306
        {
307
            if (!threadsPerPool[i] && !nodeMaskPerPool[i])
308
                continue;
309
310
            int toTake = X265_MIN(threadsPerPool[i], targetTME - threads[0]);
311
            if (toTake > 0)
312
            {
313
                threads[poolIndex] += toTake;
314
                nodeMasks[poolIndex] |= nodeMaskPerPool[i];
315
                tmeNumaNodes++;
316
317
                if (threads[0] == targetTME)
318
                    poolIndex++;
319
320
                if (toTake < threadsPerPool[i])
321
                    leftover = threadsPerPool[i] - toTake;
322
            }
323
            else
324
            {
325
                threads.push_back(threadsPerPool[i]);
326
                nodeMasks.push_back(nodeMaskPerPool[i]);
327
                poolIndex++;
328
            }
329
        }
330
331
        // Distribute leftover threads among FrameEncoders
332
        if (leftover)
333
        {
334
            // Case 1: There are 1 or more threadpools for FrameEncoder(s) by now
335
            if (threads.size() > 1)
336
            {
337
                int split = static_cast<int>(static_cast<double>(leftover) / (numNumaNodes - 1));
338
                for (int pool = 1; pool < numNumaNodes; pool++)
339
                {
340
                    int give = X265_MIN(split, leftover);
341
                    threads[pool] += give;
342
                    leftover -= give;
343
                }
344
            }
345
346
            // Case 2: FrameEncoder(s) haven't received threads yet
347
            if (threads.size() == 1)
348
            {
349
                threads.push_back(leftover);
350
                // Give the same node mask as the last node of ThreadedME
351
                uint64_t msb = 1;
352
                uint64_t tmeNodeMask = nodeMasks[0];
353
                while (tmeNodeMask > 1)
354
                {
355
                    tmeNodeMask >>= 1;
356
                    msb <<= 1;
357
                }
358
                nodeMasks.push_back(msb);
359
            }
360
        }
361
362
        // Apply calculated threadpool assignment
363
        memset(threadsPerPool, 0, sizeof(int) * (numNumaNodes + 2));
364
        memset(nodeMaskPerPool, 0, sizeof(uint64_t) * (numNumaNodes + 2));
365
366
        numPools = numNumaNodes = static_cast<int>(threads.size());
367
        for (int pool = 0; pool < numPools; pool++)
368
        {
369
            threadsPerPool[pool] = threads[pool];
370
            nodeMaskPerPool[pool] = nodeMasks[pool];
371
        }
372
    }
373
    else
374
#endif
375
0
    {
376
0
        memset(threadsPerPool, 0, sizeof(int) * (numNumaNodes + 2));
377
0
        memset(nodeMaskPerPool, 0, sizeof(uint64_t) * (numNumaNodes + 2));
378
379
0
        threadsPerPool[0] = targetTME;
380
0
        nodeMaskPerPool[0] = 1;
381
382
0
        threadsPerPool[1] = threadsFrameEnc;
383
0
        nodeMaskPerPool[1] = 1;
384
385
0
        numPools = 2;
386
0
    }
387
0
}
388
389
ThreadPool* ThreadPool::allocThreadPools(x265_param* p, int& numPools, bool isThreadsReserved)
{
    enum { MAX_NODE_NUM = 127 };
    int cpusPerNode[MAX_NODE_NUM + 1];
    int threadsPerPool[MAX_NODE_NUM + 2];
    uint64_t nodeMaskPerPool[MAX_NODE_NUM + 2];
    int totalNumThreads = 0;

    memset(cpusPerNode, 0, sizeof(cpusPerNode));
    memset(threadsPerPool, 0, sizeof(threadsPerPool));
    memset(nodeMaskPerPool, 0, sizeof(nodeMaskPerPool));

    int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
    bool bNumaSupport = false;

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    bNumaSupport = true;
#elif HAVE_LIBNUMA
    bNumaSupport = numa_available() >= 0;
#endif

    /* Count the logical CPUs on each NUMA node */
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    for (int i = 0; i < numNumaNodes; i++)
    {
        /* Zero the affinity and count it only when the query succeeds, so a
         * failing node contributes 0 CPUs rather than uninitialized mask bits. */
        GROUP_AFFINITY groupAffinity;
        memset(&groupAffinity, 0, sizeof(groupAffinity));
        if (GetNumaNodeProcessorMaskEx((USHORT)i, &groupAffinity))
            cpusPerNode[i] = popCount(groupAffinity.Mask);
    }
#elif HAVE_LIBNUMA
    if (bNumaSupport)
    {
        struct bitmask* bitMask = numa_allocate_cpumask();
        for (int i = 0; i < numNumaNodes; i++)
        {
            int ret = numa_node_to_cpus(i, bitMask);
            if (!ret)
                cpusPerNode[i] = numa_bitmask_weight(bitMask);
            else
                x265_log(p, X265_LOG_ERROR, "Failed to genrate CPU mask\n");
        }
        numa_free_cpumask(bitMask);
    }
#else // NUMA not supported
    cpusPerNode[0] = getCpuCount();
#endif

    if (bNumaSupport && p->logLevel >= X265_LOG_DEBUG)
    {
        for (int i = 0; i < numNumaNodes; i++)
            x265_log(p, X265_LOG_DEBUG, "detected NUMA node %d with %d logical cores\n", i, cpusPerNode[i]);
    }

    /* limit threads based on param->numaPools
     * For windows because threads can't be allocated to live across sockets
     * changing the default behavior to be per-socket pools -- FIXME */
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 || HAVE_LIBNUMA
    if (!strlen(p->numaPools) || (strcmp(p->numaPools, "NULL") == 0 || strcmp(p->numaPools, "*") == 0 || strcmp(p->numaPools, "") == 0))
    {
        /* Build a default "--pools" string listing every node's CPU count.
         * The buffer is sized for MAX_NODE_NUM + 1 entries and every append is
         * bounded, so many-node systems cannot overflow it. */
        char poolString[(MAX_NODE_NUM + 1) * 12] = "";
        size_t poolLen = 0;
        for (int i = 0; i < numNumaNodes && poolLen < sizeof(poolString); i++)
        {
            int written = snprintf(poolString + poolLen, sizeof(poolString) - poolLen,
                                   i ? ",%d" : "%d", cpusPerNode[i]);
            if (written < 0)
                break;
            poolLen += (size_t)written;
        }
        x265_param_parse(p, "pools", poolString);
    }
#endif
    if (strlen(p->numaPools))
    {
        const char *nodeStr = p->numaPools;
        for (int i = 0; i < numNumaNodes; i++)
        {
            if (!*nodeStr)
            {
                threadsPerPool[i] = 0;
                continue;
            }
            else if (*nodeStr == '-')
                threadsPerPool[i] = 0;
            else if (*nodeStr == '*' || !strcasecmp(nodeStr, "NULL"))
            {
                for (int j = i; j < numNumaNodes; j++)
                {
                    threadsPerPool[numNumaNodes] += cpusPerNode[j];
                    nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << j);
                }
                break;
            }
            else if (*nodeStr == '+')
            {
                threadsPerPool[numNumaNodes] += cpusPerNode[i];
                nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i);
            }
            else
            {
                int count = atoi(nodeStr);
                if (i > 0 || strchr(nodeStr, ','))   // it is comma -> old logic
                {
                    threadsPerPool[i] = X265_MIN(count, cpusPerNode[i]);
                    nodeMaskPerPool[i] = ((uint64_t)1 << i);
                }
                else                                 // new logic: exactly 'count' threads on all NUMAs
                {
                    threadsPerPool[numNumaNodes] = X265_MIN(count, numNumaNodes * MAX_POOL_THREADS);
                    /* Mask with the low numNumaNodes bits set; shifting a 64-bit
                     * value by 64 or more is undefined, so saturate instead. */
                    nodeMaskPerPool[numNumaNodes] = (numNumaNodes >= 64)
                        ? ~(uint64_t)0
                        : ((uint64_t)-1 >> (64 - numNumaNodes));
                }
            }

            /* consume current node string, comma, and white-space */
            while (*nodeStr && *nodeStr != ',')
                ++nodeStr;
            if (*nodeStr == ',' || *nodeStr == ' ')
                ++nodeStr;
        }
    }
    else
    {
        for (int i = 0; i < numNumaNodes; i++)
        {
            threadsPerPool[numNumaNodes]  += cpusPerNode[i];
            nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i);
        }
    }

    /* If ThreadedME is enabled, split resources: give half NUMA nodes (or
     * half cores when NUMA not available) to threaded-me and the other half
     * to the remaining job providers. */
    /* Compute total CPU count from detected per-node counts when available */
    for (int i = 0; i < numNumaNodes + 1; i++)
        totalNumThreads += threadsPerPool[i];
    if (!totalNumThreads)
        totalNumThreads = ThreadPool::getCpuCount();

    int threadsFrameEnc = totalNumThreads;
    if (p->bThreadedME)
    {
        distributeThreadsForTme(p, totalNumThreads, numNumaNodes, bNumaSupport, threadsPerPool,
                                nodeMaskPerPool, numPools, threadsFrameEnc);
    }

    // If the last pool size is > MAX_POOL_THREADS, clip it to spawn thread pools only of size >= 1/2 max (heuristic)
    if ((threadsPerPool[numNumaNodes] > MAX_POOL_THREADS) &&
        ((threadsPerPool[numNumaNodes] % MAX_POOL_THREADS) < (MAX_POOL_THREADS / 2)))
    {
        threadsPerPool[numNumaNodes] -= (threadsPerPool[numNumaNodes] % MAX_POOL_THREADS);
        x265_log(p, X265_LOG_DEBUG,
                 "Creating only %d worker threads beyond specified numbers with --pools (if specified) to prevent asymmetry in pools; may not use all HW contexts\n", threadsPerPool[numNumaNodes]);
    }

    if (!p->bThreadedME)
    {
        numPools = 0;
        for (int i = 0; i < numNumaNodes + 1; i++)
        {
            if (bNumaSupport)
                x265_log(p, X265_LOG_DEBUG, "NUMA node %d may use %d logical cores\n", i, cpusPerNode[i]);

            if (threadsPerPool[i])
            {
                // a node with more than MAX_POOL_THREADS threads is split over several pools
                numPools += (threadsPerPool[i] + MAX_POOL_THREADS - 1) / MAX_POOL_THREADS;
                totalNumThreads += threadsPerPool[i];
            }
        }
    }

    if (!isThreadsReserved)
    {
        if (!numPools)
        {
            x265_log(p, X265_LOG_DEBUG, "No pool thread available. Deciding frame-threads based on detected CPU threads\n");
            totalNumThreads = ThreadPool::getCpuCount(); // auto-detect frame threads
        }

        if (!p->frameNumThreads)
            p->frameNumThreads = ThreadPool::getFrameThreadsCount(p, threadsFrameEnc);
    }

    if (!numPools)
        return NULL;

    if (numPools > p->frameNumThreads && !p->bThreadedME)
    {
        x265_log(p, X265_LOG_DEBUG, "Reducing number of thread pools for frame thread count\n");
        numPools = X265_MAX(p->frameNumThreads / 2, 1);
    }
    if (isThreadsReserved)
        numPools = 1;
    ThreadPool *pools = new ThreadPool[numPools];
    if (pools)
    {
        int poolCount = (p->bThreadedME) ? numPools - 1 : numPools;
        int node = 0;
        for (int i = 0; i < numPools; i++)
        {
            int maxProviders = (p->bThreadedME && i == 0) // threadpool 0 is dedicated to ThreadedME
                ? 1
                : (p->frameNumThreads + poolCount - 1) / poolCount + !isThreadsReserved; // +1 is Lookahead, always assigned to threadpool 0

            while (!threadsPerPool[node])
                node++;
            /* Each worker owns one bit of the pool's sleep bitmap, so a pool holds
             * at most MAX_POOL_THREADS workers (as ThreadPool::create() checks).
             * Larger node budgets are consumed by consecutive pools; without this
             * clamp the first pool takes the whole node and this loop can then
             * walk past the end of threadsPerPool[] looking for more threads. */
            int numThreads = X265_MIN(MAX_POOL_THREADS, threadsPerPool[node]);
            int origNumThreads = numThreads;

            if (i == 0 && p->lookaheadThreads > numThreads / 2)
            {
                p->lookaheadThreads = numThreads / 2;
                x265_log(p, X265_LOG_DEBUG, "Setting lookahead threads to a maximum of half the total number of threads\n");
            }

            if (isThreadsReserved)
            {
                numThreads = p->lookaheadThreads;
                maxProviders = 1;
            }
            else if (i == 0)
                numThreads -= p->lookaheadThreads;

            if (!pools[i].create(numThreads, maxProviders, nodeMaskPerPool[node]))
            {
                delete[] pools;
                numPools = 0;
                return NULL;
            }
            if (numNumaNodes > 1)
            {
                /* Room for ",NN" for each of the 64 mask bits plus the terminator.
                 * Zero-filled so "nodesstr + 1" is a valid empty string even when
                 * the mask has no bits set. sizeof() is taken on the array itself,
                 * not on a pointer, so the snprintf bound is the real capacity. */
                char nodesstr[64 * (sizeof(",63") - 1) + 1] = "";
                int len = 0;
                for (int j = 0; j < 64; j++)
                    if ((nodeMaskPerPool[node] >> j) & 1)
                        len += snprintf(nodesstr + len, sizeof(nodesstr) - len, ",%d", j);
                x265_log(p, X265_LOG_INFO, "Thread pool %d using %d threads on numa nodes %s\n", i, numThreads, nodesstr + 1);
            }
            else
                x265_log(p, X265_LOG_INFO, "Thread pool created using %d threads\n", numThreads);
            threadsPerPool[node] -= origNumThreads;
        }
    }
    else
        numPools = 0;
    return pools;
}
634
635
ThreadPool::ThreadPool()
{
    /* Zero-fill every member (worker array, provider table, bitmaps, NUMA
     * mask) so the destructor's NULL checks see NULL if create() never ran. */
    memset(static_cast<void*>(this), 0, sizeof(ThreadPool));
}
639
640
bool ThreadPool::create(int numThreads, int maxProviders, uint64_t nodeMask)
641
0
{
642
0
    X265_CHECK(numThreads <= MAX_POOL_THREADS, "a single thread pool cannot have more than MAX_POOL_THREADS threads\n");
643
644
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 
645
    memset(&m_groupAffinity, 0, sizeof(GROUP_AFFINITY));
646
    for (int i = 0; i < getNumaNodeCount(); i++)
647
    {
648
        int numaNode = ((nodeMask >> i) & 0x1U) ? i : -1;
649
        if (numaNode != -1)
650
        if (GetNumaNodeProcessorMaskEx((USHORT)numaNode, &m_groupAffinity))
651
            break;
652
    }
653
    m_numaMask = &m_groupAffinity.Mask;
654
#elif HAVE_LIBNUMA
655
    if (numa_available() >= 0)
656
    {
657
        struct bitmask* nodemask = numa_allocate_nodemask();
658
        if (nodemask)
659
        {
660
            *(nodemask->maskp) = nodeMask;
661
            m_numaMask = nodemask;
662
        }
663
        else
664
            x265_log(NULL, X265_LOG_ERROR, "unable to get NUMA node mask for %lx\n", nodeMask);
665
    }
666
#else
667
0
    (void)nodeMask;
668
0
#endif
669
670
0
    m_numWorkers = numThreads;
671
672
0
    m_workers = X265_MALLOC(WorkerThread, numThreads);
673
    /* placement new initialization */
674
0
    if (m_workers)
675
0
        for (int i = 0; i < numThreads; i++)
676
0
            new (m_workers + i)WorkerThread(*this, i);
677
678
0
    m_jpTable = X265_MALLOC(JobProvider*, maxProviders);
679
0
    if (m_jpTable)
680
0
        memset(m_jpTable, 0, sizeof(JobProvider*) * maxProviders);
681
0
    m_numProviders = 0;
682
683
0
    return m_workers && m_jpTable;
684
0
}
685
686
bool ThreadPool::start()
687
0
{
688
0
    m_isActive = true;
689
0
    for (int i = 0; i < m_numWorkers; i++)
690
0
    {
691
0
        if (!m_workers[i].start())
692
0
        {
693
0
            m_isActive = false;
694
0
            return false;
695
0
        }
696
0
    }
697
0
    return true;
698
0
}
699
700
void ThreadPool::stopWorkers()
701
0
{
702
0
    if (m_workers)
703
0
    {
704
0
        m_isActive = false;
705
0
        for (int i = 0; i < m_numWorkers; i++)
706
0
        {
707
0
            while (!(m_sleepBitmap & ((sleepbitmap_t)1 << i)))
708
0
                GIVE_UP_TIME();
709
0
            m_workers[i].awaken();
710
0
            m_workers[i].stop();
711
0
        }
712
0
    }
713
0
}
714
715
ThreadPool::~ThreadPool()
{
    /* create() allocates m_workers with X265_MALLOC and constructs each worker
     * with placement new, so each one is destroyed explicitly before the raw
     * memory is released. */
    if (m_workers)
    {
        for (int w = 0; w < m_numWorkers; ++w)
            m_workers[w].~WorkerThread();
    }

    X265_FREE(m_workers);
    X265_FREE(m_jpTable);

#if HAVE_LIBNUMA
    /* with libnuma, m_numaMask was obtained from numa_allocate_nodemask() in create() */
    if (m_numaMask)
        numa_free_nodemask((struct bitmask*)m_numaMask);
#endif
}
731
732
void ThreadPool::setCurrentThreadAffinity()
733
0
{
734
0
    setThreadNodeAffinity(m_numaMask);
735
0
}
736
737
void ThreadPool::setThreadNodeAffinity(void *numaMask)
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    /* Windows: the affinity comes from this pool's m_groupAffinity; the
     * numaMask argument is ignored. */
    UNREFERENCED_PARAMETER(numaMask);
    GROUP_AFFINITY groupAffinity;
    memset(&groupAffinity, 0, sizeof(GROUP_AFFINITY));
    groupAffinity.Group = m_groupAffinity.Group;
    groupAffinity.Mask = m_groupAffinity.Mask;
    if (!SetThreadGroupAffinity(GetCurrentThread(), &groupAffinity, NULL))
        x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
#elif HAVE_LIBNUMA
    /* libnuma: restrict execution and interleave memory over the given nodes,
     * then switch the allocation policy to local allocation. */
    if (numa_available() < 0)
    {
        x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
        return;
    }
    struct bitmask* const nodes = (struct bitmask*)numaMask;
    numa_run_on_node_mask(nodes);
    numa_set_interleave_mask(nodes);
    numa_set_localalloc();
#else
    (void)numaMask;
#endif
}
764
765
/* static */
766
int ThreadPool::getNumaNodeCount()
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    /* The API reports the highest node index, so a successful query yields
     * index + 1; on failure the value in highestNode (initially 1) is returned. */
    ULONG highestNode = 1;
    return GetNumaHighestNodeNumber(&highestNode) ? (int)(highestNode + 1) : (int)highestNode;
#elif HAVE_LIBNUMA
    return (numa_available() >= 0) ? numa_max_node() + 1 : 1;
#else
    /* no NUMA support compiled in: treat the system as a single node */
    return 1;
#endif
}
782
783
/* static */
784
int ThreadPool::getCpuCount()
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    enum { MAX_NODE_NUM = 127 };
    int cpus = 0;
    int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
    for (int i = 0; i < numNumaNodes; i++)
    {
        /* Count a node only when its processor mask could be queried; the
         * affinity is zeroed so a failed query never contributes garbage bits. */
        GROUP_AFFINITY groupAffinity;
        memset(&groupAffinity, 0, sizeof(groupAffinity));
        if (GetNumaNodeProcessorMaskEx((USHORT)i, &groupAffinity))
            cpus += popCount(groupAffinity.Mask);
    }
    return cpus;
#elif _WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return sysinfo.dwNumberOfProcessors;
#elif __unix__ && X265_ARCH_ARM
    /* Return the number of processors configured by OS. Because, most embedded linux distributions
     * uses only one processor as the scheduler doesn't have enough work to utilize all processors */
    /* sysconf() returns -1 on error; report a single CPU in that case */
    long configured = sysconf(_SC_NPROCESSORS_CONF);
    return (configured > 0) ? (int)configured : 1;
#elif __unix__
    /* sysconf() returns -1 on error; report a single CPU in that case */
    long online = sysconf(_SC_NPROCESSORS_ONLN);
    return (online > 0) ? (int)online : 1;
#elif MACOS && __MACH__
    int nm[2];
    uint32_t count = 0;               // defined value even if sysctl() fails
    size_t len = sizeof(count);

    nm[0] = CTL_HW;
    nm[1] = HW_AVAILCPU;
    if (sysctl(nm, 2, &count, &len, NULL, 0) != 0)
        count = 0;

    if (count < 1)
    {
        nm[1] = HW_NCPU;
        len = sizeof(count);
        if (sysctl(nm, 2, &count, &len, NULL, 0) != 0 || count < 1)
            count = 1;
    }

    return count;
#else
    return 2; // default to 2 threads, everywhere else
#endif
}
829
830
int ThreadPool::getFrameThreadsCount(x265_param* p, int cpuCount)
{
    /* Without wavefront, frame threads are limited by the CPU count, by half
     * the number of block rows (sourceHeight / maxCUSize rounded up, assuming
     * g_log2Size maps a size to its log2), and by X265_MAX_FRAME_THREADS. */
    if (!p->bEnableWavefront)
    {
        const int blockRows = (p->sourceHeight + p->maxCUSize - 1) >> g_log2Size[p->maxCUSize];
        return X265_MIN3(cpuCount, (blockRows + 1) / 2, X265_MAX_FRAME_THREADS);
    }

    /* With wavefront enabled the count is a step function of the CPU count. */
    if (cpuCount >= 32)
        return (p->sourceHeight > 2000) ? 6 : 5;
    if (cpuCount >= 16)
        return 4;
    if (cpuCount >= 8)
    {
#if _WIN32 && X265_ARCH_ARM64
        return cpuCount;
#else
        return 3;
#endif
    }
    if (cpuCount >= 4)
        return 2;
    return 1;
}
850
851
/* Pick threaded-motion-estimation (TME) settings and a thread count.
 *
 * param    - encoder parameters. The analysis options (references, subpel
 *            refine, rect/AMP, bframes, CU sizes, search method) select a rule.
 *            When a rule matches, param->tmeTaskBlockSize and
 *            param->tmeNumBufferRows are overwritten. When no rule matches they
 *            are left untouched.
 * cpuCount - number of logical CPUs available.
 *
 * Returns cpuCount / 2 when getCPUFrequencyMHz() reports more than 2000 MHz.
 * Otherwise returns cpuCount scaled by the per-resolution percentage of the
 * matched rule, or by the default percentage when no rule matches. */
int ThreadPool::configureTmeThreadCount(x265_param* param, int cpuCount)
{
    /* Source-height buckets; every per-resolution array below is indexed by these. */
    enum TmeResClass
    {
        TME_RES_LOW = 0,   /* sourceHeight <= 720 */
        TME_RES_MID,       /* 720 < sourceHeight < 1440 */
        TME_RES_HIGH,      /* sourceHeight >= 1440 */
        TME_RES_COUNT
    };

    /* Rules in the order they are tested; the first match wins. */
    enum TmeRule
    {
        TME_RULE_SLOWER_VERYSLOW = 0,
        TME_RULE_SLOW,
        TME_RULE_FAST_MEDIUM,
        TME_RULE_FASTER,
        TME_RULE_VERYFAST,
        TME_RULE_SUPERFAST,
        TME_RULE_ULTRAFAST,
        TME_RULE_COUNT
    };

    struct TmeRuleConfig
    {
        int  taskBlockSize[TME_RES_COUNT];   /* written to param->tmeTaskBlockSize */
        int  numBufferRows[TME_RES_COUNT];   /* written to param->tmeNumBufferRows */
        int  threadPercent[TME_RES_COUNT];   /* percentage of cpuCount to return */
        bool widthBasedTaskBlockSize;        /* if set, use ceil(sourceWidth / 480) instead of taskBlockSize */
    };

    static const TmeRuleConfig s_tmeRuleConfig[TME_RULE_COUNT] =
    {
        { { 1, 1, 1 }, { 3, 3, 3 },    { 90, 85, 70 }, false }, // slower / veryslow preset and similar options
        { { 1, 1, 1 }, { 5, 5, 3 },    { 90, 85, 75 }, false }, // slow preset and similar options
        { { 1, 1, 1 }, { 10, 10, 10 }, { 90, 80, 70 }, false }, // fast / medium preset and similar options
        { { 1, 1, 1 }, { 10, 15, 10 }, { 90, 80, 70 }, false }, // faster preset and similar options
        { { 1, 1, 1 }, { 10, 15, 20 }, { 90, 80, 70 }, false }, // veryfast preset and similar options
        { { 2, 4, 4 }, { 10, 15, 20 }, { 90, 80, 60 }, false }, // superfast preset and similar options
        { { 0, 0, 0 }, { 15, 20, 20 }, { 90, 80, 50 }, true  }  // ultrafast preset and similar options
    };

    /* Thread percentage used when no rule matches. */
    static const int s_defaultThreadPercent[TME_RES_COUNT] = { 80, 80, 70 };

    int res = TME_RES_MID;
    if (param->sourceHeight >= 1440)
        res = TME_RES_HIGH;
    else if (param->sourceHeight <= 720)
        res = TME_RES_LOW;

    /* First matching rule; the predicates only read param, so testing them
     * lazily in order is the same as evaluating all and taking the first. */
    int rule;
    if (param->maxNumReferences >= 5 || param->subpelRefine >= 4 || (param->bEnableRectInter && param->bEnableAMP))
        rule = TME_RULE_SLOWER_VERYSLOW;
    else if (param->maxNumReferences >= 4 || param->subpelRefine >= 3 || (param->bEnableRectInter ^ param->bEnableAMP))
        rule = TME_RULE_SLOW;
    else if (param->maxNumReferences >= 3 && param->subpelRefine >= 2)
        rule = TME_RULE_FAST_MEDIUM;
    else if (param->maxNumReferences >= 2 && param->subpelRefine >= 2)
        rule = TME_RULE_FASTER;
    else if (param->subpelRefine >= 1 && param->bframes > 3)
        rule = TME_RULE_VERYFAST;
    else if (param->subpelRefine && param->maxCUSize < 64)
        rule = TME_RULE_SUPERFAST;
    else if (!param->subpelRefine || param->searchMethod == X265_DIA_SEARCH || param->minCUSize >= 16)
        rule = TME_RULE_ULTRAFAST;
    else
        rule = -1;   /* no rule: leave tme params alone, use the default percentage */

    int percent = s_defaultThreadPercent[res];
    if (rule >= 0)
    {
        const TmeRuleConfig& cfg = s_tmeRuleConfig[rule];
        param->tmeTaskBlockSize = cfg.widthBasedTaskBlockSize
                                ? (param->sourceWidth + 480 - 1) / 480
                                : cfg.taskBlockSize[res];
        param->tmeNumBufferRows = cfg.numBufferRows[res];
        percent = cfg.threadPercent[res];
    }

    /* Above 2000 MHz (as reported by getCPUFrequencyMHz) half the CPUs are
     * used regardless of the rule's percentage. */
    if (getCPUFrequencyMHz() > 2000.0)
        return cpuCount / 2;
    return (cpuCount * percent) / 100;
}
929
930
/* Best-effort estimate of the CPU clock in MHz; returns 0.0 when nothing
 * could be read.
 *  - Windows: the "~MHz" registry value of processor 0.
 *  - Apple:   sysctl "hw.cpufrequency" (Hz), converted to MHz.
 *  - Other:   the largest per-CPU cpufreq "scaling_cur_freq" (kHz), falling
 *             back to the largest "cpu MHz" entry in /proc/cpuinfo. */
double getCPUFrequencyMHz()
{
#if defined(_WIN32)
    /* If the registry key cannot be opened, the result stays 0. */
    DWORD mhzValue = 0;
    DWORD mhzBytes = sizeof(mhzValue);
    HKEY cpu0Key;
    if (RegOpenKeyExA(HKEY_LOCAL_MACHINE,
                      "HARDWARE\\DESCRIPTION\\System\\CentralProcessor\\0",
                      0, KEY_READ, &cpu0Key) == ERROR_SUCCESS)
    {
        RegQueryValueExA(cpu0Key, "~MHz", NULL, NULL, (LPBYTE)&mhzValue, &mhzBytes);
        RegCloseKey(cpu0Key);
    }
    return static_cast<double>(mhzValue);

#elif defined(__APPLE__)
    uint64_t hz = 0;
    size_t hzBytes = sizeof(hz);
    const bool queried = sysctlbyname("hw.cpufrequency", &hz, &hzBytes, NULL, 0) == 0;
    return queried ? static_cast<double>(hz) / 1.0e6 : 0.0;

#else  /* Linux */
    /* scaling_cur_freq is the frequency currently selected by the cpufreq
     * governor, in kHz. CPUs are probed as cpu0, cpu1, ... and probing stops
     * at the first CPU whose file cannot be opened; the largest reading wins. */
    uint64_t bestKhz = 0;
    for (int cpuIndex = 0; ; cpuIndex++)
    {
        char sysPath[64];
        snprintf(sysPath, sizeof(sysPath),
                 "/sys/devices/system/cpu/cpu%d/cpufreq/scaling_cur_freq", cpuIndex);
        std::ifstream sysFile(sysPath);
        if (!sysFile.is_open())
            break;

        uint64_t reading = 0;
        sysFile >> reading;
        if (reading > bestKhz)
            bestKhz = reading;
    }
    if (bestKhz > 0)
        return static_cast<double>(bestKhz) / 1000.0;

    /* Fallback: the largest "cpu MHz" value in /proc/cpuinfo. bestMhz only
     * grows from 0.0, so it is 0.0 exactly when no positive entry was found. */
    double bestMhz = 0.0;
    std::ifstream cpuinfo("/proc/cpuinfo");
    for (std::string text; std::getline(cpuinfo, text); )
    {
        if (text.find("cpu MHz") == std::string::npos)
            continue;
        const std::string::size_type sep = text.find(':');
        if (sep == std::string::npos)
            continue;
        const double value = strtod(text.c_str() + sep + 1, NULL);
        if (value > bestMhz)
            bestMhz = value;
    }
    return bestMhz;
#endif
}
998
999
} // end namespace X265_NS