Coverage Report

Created: 2026-04-01 07:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/x265/source/common/threadpool.cpp
Line
Count
Source
1
/*****************************************************************************
2
 * Copyright (C) 2013-2020 MulticoreWare, Inc
3
 *
4
 * Authors: Steve Borho <steve@borho.org>
5
 *          Min Chen <chenm003@163.com>
6
 *
7
 * This program is free software; you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 2 of the License, or
10
 * (at your option) any later version.
11
 *
12
 * This program is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU General Public License
18
 * along with this program; if not, write to the Free Software
19
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02111, USA.
20
 *
21
 * This program is also available under a commercial proprietary license.
22
 * For more information, contact us at license @ x265.com
23
 *****************************************************************************/
24
25
#include "common.h"
26
#include "threadpool.h"
27
#include "threading.h"
28
29
#include <new>
30
#include <vector>
31
32
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
33
#include <winnt.h>
34
#endif
35
36
#if X86_64

#ifdef __GNUC__

#define SLEEPBITMAP_BSF(id, x)     (id) = ((unsigned long)__builtin_ctzll(x))
#define SLEEPBITMAP_OR(ptr, mask)  __sync_fetch_and_or(ptr, mask)
#define SLEEPBITMAP_AND(ptr, mask) __sync_fetch_and_and(ptr, mask)

#elif defined(_MSC_VER)

#define SLEEPBITMAP_BSF(id, x)     _BitScanForward64(&id, x)
/* The mask must be cast to LONG64, not LONG: the sleep bitmap is 64 bits wide
 * on X86_64 and a 32-bit cast silently truncates the mask, so OR/AND on bits
 * for workers 32-63 would become no-ops on MSVC builds */
#define SLEEPBITMAP_OR(ptr, mask)  InterlockedOr64((volatile LONG64*)ptr, (LONG64)mask)
#define SLEEPBITMAP_AND(ptr, mask) InterlockedAnd64((volatile LONG64*)ptr, (LONG64)mask)

#endif // ifdef __GNUC__

#else

/* use 32-bit primitives defined in threading.h */
#define SLEEPBITMAP_BSF BSF
#define SLEEPBITMAP_OR  ATOMIC_OR
#define SLEEPBITMAP_AND ATOMIC_AND

#endif
60
61
/* TODO FIX: Macro __MACH__ ideally should be part of the MacOS definition, but adding it to
62
   CMake does not behave as expected; this needs to be fixed. */
63
64
#if MACOS && __MACH__
65
#include <sys/param.h>
66
#include <sys/sysctl.h>
67
#endif
68
#if HAVE_LIBNUMA
69
#include <numa.h>
70
#endif
71
#if defined(_MSC_VER)
72
# define strcasecmp _stricmp
73
#endif
74
75
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
76
const uint64_t m1 = 0x5555555555555555; //binary: 0101...
77
const uint64_t m2 = 0x3333333333333333; //binary: 00110011..
78
const uint64_t m3 = 0x0f0f0f0f0f0f0f0f; //binary:  4 zeros,  4 ones ...
79
const uint64_t h01 = 0x0101010101010101; //the sum of 256 to the power of 0,1,2,3...
80
81
static int popCount(uint64_t x)
82
{
83
    x -= (x >> 1) & m1;
84
    x = (x & m2) + ((x >> 2) & m2);
85
    x = (x + (x >> 4)) & m3;
86
    return (x * h01) >> 56;
87
}
88
#endif
89
90
namespace X265_NS {
91
// x265 private namespace
92
93
/* A single pool worker: sleeps on its private event until a job provider or
 * bonded task group acquires it, then runs jobs in threadMain() */
class WorkerThread : public Thread
{
private:

    ThreadPool&  m_pool;      // pool that owns this worker
    int          m_id;        // this worker's index within the pool (one bit in the sleep bitmap)
    Event        m_wakeEvent; // triggered to wake the worker from idle sleep

    WorkerThread& operator =(const WorkerThread&); // non-assignable (reference member)

public:

    JobProvider*     m_curJobProvider; // provider this worker is currently registered with
    BondedTaskGroup* m_bondMaster;     // non-NULL while bonded to a task group; served before providers

    WorkerThread(ThreadPool& pool, int id) : m_pool(pool), m_id(id) {}
    virtual ~WorkerThread() {}

    /* worker loop; runs until m_pool.m_isActive goes false */
    void threadMain();
    /* signal the worker's wake event (called by the thread that acquired its sleep bit) */
    void awaken()           { m_wakeEvent.trigger(); }
};
114
115
/* Worker main loop. The protocol is: set our bit in the pool sleep bitmap,
 * block on m_wakeEvent, and on wake serve either a bonded task group or job
 * providers until no provider wants help, then go back to sleep. */
void WorkerThread::threadMain()
{
    THREAD_NAME("Worker", m_id);

#if _WIN32
    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
#else
    /* lower this thread's scheduling priority; return value deliberately ignored */
    __attribute__((unused)) int val = nice(10);
#endif

    /* bind this thread to the pool's NUMA node mask */
    m_pool.setCurrentThreadAffinity();

    /* this worker's bit in the pool-wide and per-provider bitmaps */
    sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id;
    m_curJobProvider = m_pool.m_jpTable[0];
    m_bondMaster = NULL;

    /* advertise availability (register with provider 0, mark asleep) and wait
     * for the first wake-up */
    SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
    m_wakeEvent.wait();

    while (m_pool.m_isActive)
    {
        /* a bonded task group that acquired us takes precedence over providers */
        if (m_bondMaster)
        {
            m_bondMaster->processTasks(m_id);
            m_bondMaster->m_exitedPeerCount.incr();
            m_bondMaster = NULL;
        }

        do
        {
            /* do pending work for current job provider */
            m_curJobProvider->findJob(m_id);

            /* if the current job provider still wants help, only switch to a
             * higher priority provider (lower slice type). Else take the first
             * available job provider with the highest priority */
            int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType :
                                                                 INVALID_SLICE_PRIORITY + 1;
            int nextProvider = -1;
            for (int i = 0; i < m_pool.m_numProviders; i++)
            {
                if (m_pool.m_jpTable[i]->m_helpWanted &&
                    m_pool.m_jpTable[i]->m_sliceType < curPriority)
                {
                    nextProvider = i;
                    curPriority = m_pool.m_jpTable[i]->m_sliceType;
                }
            }
            /* migrate our ownership bit from the old provider to the new one */
            if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider])
            {
                SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit);
                m_curJobProvider = m_pool.m_jpTable[nextProvider];
                SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
            }
        }
        while (m_curJobProvider->m_helpWanted);

        /* While the worker sleeps, a job-provider or bond-group may acquire this
         * worker's sleep bitmap bit. Once acquired, that thread may modify 
         * m_bondMaster or m_curJobProvider, then waken the thread */
        SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
        m_wakeEvent.wait();
    }

    /* final sleep-bit set so stopWorkers() can observe that we have exited */
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
}
182
183
void JobProvider::tryWakeOne()
184
0
{
185
0
    int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS);
186
0
    if (id < 0)
187
0
    {
188
0
        m_helpWanted = true;
189
0
        return;
190
0
    }
191
192
0
    WorkerThread& worker = m_pool->m_workers[id];
193
0
    if (worker.m_curJobProvider != this) /* poaching */
194
0
    {
195
0
        sleepbitmap_t bit = (sleepbitmap_t)1 << id;
196
0
        SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit);
197
0
        worker.m_curJobProvider = this;
198
0
        SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit);
199
0
    }
200
0
    worker.awaken();
201
0
}
202
203
/* Atomically claim one sleeping worker, preferring those in firstTryBitmap
 * and falling back to secondTryBitmap. Returns the worker id on success, or
 * -1 when no candidate's sleep bit could be cleared. The atomic AND-and-test
 * guarantees only one caller wins each worker. */
int ThreadPool::tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap)
{
    unsigned long id;

    const sleepbitmap_t tryMaps[2] = { firstTryBitmap, secondTryBitmap };
    for (int attempt = 0; attempt < 2; attempt++)
    {
        sleepbitmap_t candidates = m_sleepBitmap & tryMaps[attempt];
        while (candidates)
        {
            /* lowest-numbered sleeping candidate */
            SLEEPBITMAP_BSF(id, candidates);

            sleepbitmap_t bit = (sleepbitmap_t)1 << id;
            /* clear the bit atomically; only the caller that observed it set wins */
            if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit)
                return (int)id;

            /* lost the race; re-sample and retry */
            candidates = m_sleepBitmap & tryMaps[attempt];
        }
    }

    return -1;
}
233
234
int ThreadPool::tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master)
235
0
{
236
0
    int bondCount = 0;
237
0
    do
238
0
    {
239
0
        int id = tryAcquireSleepingThread(peerBitmap, 0);
240
0
        if (id < 0)
241
0
            return bondCount;
242
243
0
        m_workers[id].m_bondMaster = &master;
244
0
        m_workers[id].awaken();
245
0
        bondCount++;
246
0
    }
247
0
    while (bondCount < maxPeers);
248
249
0
    return bondCount;
250
0
}
251
252
/* Distributes totalNumThreads between ThreadedME and FrameEncoder pools.
 * Modifies threadsPerPool[], nodeMaskPerPool[], numNumaNodes, and numPools in-place;
 * the thread count reserved for frame encoding is written to threadsFrameEnc
 * (the function itself returns void). Disables --threaded-me entirely when the
 * machine has fewer than MIN_TME_THREADS threads. */
static void distributeThreadsForTme(
    x265_param* p,
    int totalNumThreads,
    int& numNumaNodes,
    bool bNumaSupport,
    int* threadsPerPool,
    uint64_t* nodeMaskPerPool,
    int& numPools,
    int& threadsFrameEnc)
{
    if (totalNumThreads < MIN_TME_THREADS)
    {
        x265_log(p, X265_LOG_WARNING, "Low thread count detected, disabling --threaded-me."
            " Minimum recommended is 32 cores / threads\n");
        p->bThreadedME = 0;
        return;
    }

    /* desired TME thread count, clamped to at least one thread */
    int targetTME = ThreadPool::configureTmeThreadCount(p, totalNumThreads);
    targetTME = (targetTME < 1) ? 1 : targetTME;

    /* remainder goes to frame encoders, but never below the default frame
     * thread count; TME gives threads back if necessary */
    threadsFrameEnc = totalNumThreads - targetTME;
    int defaultNumFT = ThreadPool::getFrameThreadsCount(p, totalNumThreads);
    if (threadsFrameEnc < defaultNumFT)
    {
        threadsFrameEnc = defaultNumFT;
        targetTME = totalNumThreads - threadsFrameEnc;
    }

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 || HAVE_LIBNUMA
    if (bNumaSupport && numNumaNodes > 1)
    {
        int tmeNumaNodes = 0;
        int leftover = 0;

        // First thread pool belongs to ThreadedME
        std::vector<int> threads(1, 0);
        std::vector<uint64_t> nodeMasks(1, 0);
        int poolIndex = 0;

        /* Greedily assign whole NUMA nodes to TME until reaching or exceeding the target */
        for (int i = 0; i < numNumaNodes + 1; i++)
        {
            if (!threadsPerPool[i] && !nodeMaskPerPool[i])
                continue;

            int toTake = X265_MIN(threadsPerPool[i], targetTME - threads[0]);
            if (toTake > 0)
            {
                threads[poolIndex] += toTake;
                nodeMasks[poolIndex] |= nodeMaskPerPool[i];
                tmeNumaNodes++;

                /* TME quota filled exactly; subsequent nodes start new pools */
                if (threads[0] == targetTME)
                    poolIndex++;

                /* node only partially consumed by TME; remainder redistributed below */
                if (toTake < threadsPerPool[i])
                    leftover = threadsPerPool[i] - toTake;
            }
            else
            {
                threads.push_back(threadsPerPool[i]);
                nodeMasks.push_back(nodeMaskPerPool[i]);
                poolIndex++;
            }
        }

        // Distribute leftover threads among FrameEncoders
        if (leftover)
        {
            // Case 1: There are 1 or more threadpools for FrameEncoder(s) by now
            if (threads.size() > 1)
            {
                /* NOTE(review): loop bound uses numNumaNodes while indexing the
                 * `threads` vector — confirm threads.size() >= numNumaNodes here,
                 * otherwise this could index past the vector's end */
                int split = static_cast<int>(static_cast<double>(leftover) / (numNumaNodes - 1));
                for (int pool = 1; pool < numNumaNodes; pool++)
                {
                    int give = X265_MIN(split, leftover);
                    threads[pool] += give;
                    leftover -= give;
                }
            }

            // Case 2: FrameEncoder(s) haven't received threads yet
            if (threads.size() == 1)
            {
                threads.push_back(leftover);
                // Give the same node mask as the last node of ThreadedME
                uint64_t msb = 1;
                uint64_t tmeNodeMask = nodeMasks[0];
                while (tmeNodeMask > 1)
                {
                    tmeNodeMask >>= 1;
                    msb <<= 1;
                }
                nodeMasks.push_back(msb);
            }
        }

        // Apply calculated threadpool assignment
        // TODO: Make sure this doesn't cause a problem later on
        memset(threadsPerPool, 0, sizeof(int) * (numNumaNodes + 2));
        memset(nodeMaskPerPool, 0, sizeof(uint64_t) * (numNumaNodes + 2));

        numPools = numNumaNodes = static_cast<int>(threads.size());
        for (int pool = 0; pool < numPools; pool++)
        {
            threadsPerPool[pool] = threads[pool];
            nodeMaskPerPool[pool] = nodeMasks[pool];
        }
    }
    else
#endif
    {
        /* no NUMA (or a single node): pool 0 is TME, pool 1 is frame encoding */
        memset(threadsPerPool, 0, sizeof(int) * (numNumaNodes + 2));
        memset(nodeMaskPerPool, 0, sizeof(uint64_t) * (numNumaNodes + 2));

        threadsPerPool[0] = targetTME;
        nodeMaskPerPool[0] = 1;

        threadsPerPool[1] = threadsFrameEnc;
        nodeMaskPerPool[1] = 1;

        numPools = 2;
    }
}
380
381
/* Detect the machine topology, parse/normalize param->numaPools, split threads
 * between ThreadedME and frame encoding when enabled, and allocate/create the
 * resulting array of thread pools. numPools is set to the number of pools
 * created (0 on failure or when no pool threads are wanted); returns the pool
 * array, or NULL when no pools are created. */
ThreadPool* ThreadPool::allocThreadPools(x265_param* p, int& numPools, bool isThreadsReserved)
{
    enum { MAX_NODE_NUM = 127 };
    int cpusPerNode[MAX_NODE_NUM + 1];
    int threadsPerPool[MAX_NODE_NUM + 2];
    uint64_t nodeMaskPerPool[MAX_NODE_NUM + 2];
    int totalNumThreads = 0;

    memset(cpusPerNode, 0, sizeof(cpusPerNode));
    memset(threadsPerPool, 0, sizeof(threadsPerPool));
    memset(nodeMaskPerPool, 0, sizeof(nodeMaskPerPool));

    int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
    bool bNumaSupport = false;

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 
    bNumaSupport = true;
#elif HAVE_LIBNUMA
    bNumaSupport = numa_available() >= 0;
#endif

    /* count logical cores per NUMA node */
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    PGROUP_AFFINITY groupAffinityPointer = new GROUP_AFFINITY;
    for (int i = 0; i < numNumaNodes; i++)
    {
        GetNumaNodeProcessorMaskEx((UCHAR)i, groupAffinityPointer);
        cpusPerNode[i] = popCount(groupAffinityPointer->Mask);
    }
    delete groupAffinityPointer;
#elif HAVE_LIBNUMA
    if (bNumaSupport)
    {
        struct bitmask* bitMask = numa_allocate_cpumask();
        for (int i = 0; i < numNumaNodes; i++)
        {
            int ret = numa_node_to_cpus(i, bitMask);
            if (!ret)
                cpusPerNode[i] = numa_bitmask_weight(bitMask);
            else
                x265_log(p, X265_LOG_ERROR, "Failed to generate CPU mask\n");
        }
        numa_free_cpumask(bitMask);
    }
#else // NUMA not supported
    cpusPerNode[0] = getCpuCount();
#endif

    if (bNumaSupport && p->logLevel >= X265_LOG_DEBUG)
    for (int i = 0; i < numNumaNodes; i++)
        x265_log(p, X265_LOG_DEBUG, "detected NUMA node %d with %d logical cores\n", i, cpusPerNode[i]);
    /* limit threads based on param->numaPools
     * For windows because threads can't be allocated to live across sockets
     * changing the default behavior to be per-socket pools -- FIXME */
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 || HAVE_LIBNUMA
    /* no explicit pool string: synthesize "N0,N1,..." from the detected per-node
     * core counts and feed it back through the normal parser */
    if (!strlen(p->numaPools) || (strcmp(p->numaPools, "NULL") == 0 || strcmp(p->numaPools, "*") == 0 || strcmp(p->numaPools, "") == 0))
    {
         char poolString[50] = "";
         for (int i = 0; i < numNumaNodes; i++)
         {
             char nextCount[10] = "";
             if (i)
                 snprintf(nextCount, sizeof(nextCount), ",%d", cpusPerNode[i]);
             else
                   snprintf(nextCount, sizeof(nextCount), "%d", cpusPerNode[i]);
             strcat(poolString, nextCount);
         }
         x265_param_parse(p, "pools", poolString);
     }
#endif
    if (strlen(p->numaPools))
    {
        const char *nodeStr = p->numaPools;
        for (int i = 0; i < numNumaNodes; i++)
        {
            if (!*nodeStr)
            {
                threadsPerPool[i] = 0;
                continue;
            }
            else if (*nodeStr == '-')
                threadsPerPool[i] = 0;          // '-' disables this node
            else if (*nodeStr == '*' || !strcasecmp(nodeStr, "NULL"))
            {
                /* '*' pools this node and all remaining nodes together */
                for (int j = i; j < numNumaNodes; j++)
                {
                    threadsPerPool[numNumaNodes] += cpusPerNode[j];
                    nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << j);
                }
                break;
            }
            else if (*nodeStr == '+')
            {
                /* '+' adds this node's cores to the shared pool */
                threadsPerPool[numNumaNodes] += cpusPerNode[i];
                nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i);
            }
            else
            {
                int count = atoi(nodeStr);
                if (i > 0 || strchr(nodeStr, ','))   // it is comma -> old logic
                {
                    threadsPerPool[i] = X265_MIN(count, cpusPerNode[i]);
                    nodeMaskPerPool[i] = ((uint64_t)1 << i);
                }
                else                                 // new logic: exactly 'count' threads on all NUMAs
                {
                    threadsPerPool[numNumaNodes] = X265_MIN(count, numNumaNodes * MAX_POOL_THREADS);
                    nodeMaskPerPool[numNumaNodes] = ((uint64_t)-1 >> (64 - numNumaNodes));
                }
            }

            /* consume current node string, comma, and white-space */
            while (*nodeStr && *nodeStr != ',')
               ++nodeStr;
            if (*nodeStr == ',' || *nodeStr == ' ')
               ++nodeStr;
        }
    }
    else
    {
        /* no pool string at all: one shared pool spanning every node */
        for (int i = 0; i < numNumaNodes; i++)
        {
            threadsPerPool[numNumaNodes]  += cpusPerNode[i];
            nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i);
        }
    }

    /* If ThreadedME is enabled, split resources: give half NUMA nodes (or
     * half cores when NUMA not available) to threaded-me and the other half
     * to the remaining job providers. */
    /* Compute total CPU count from detected per-node counts when available */
    for (int i = 0; i < numNumaNodes + 1; i++)
        totalNumThreads += threadsPerPool[i];
    if (!totalNumThreads)
        totalNumThreads = ThreadPool::getCpuCount();

    int threadsFrameEnc = totalNumThreads;
    if (p->bThreadedME)
    {
        distributeThreadsForTme(p, totalNumThreads, numNumaNodes, bNumaSupport, threadsPerPool,
                                nodeMaskPerPool, numPools, threadsFrameEnc);
    }

    // If the last pool size is > MAX_POOL_THREADS, clip it to spawn thread pools only of size >= 1/2 max (heuristic)
    if ((threadsPerPool[numNumaNodes] > MAX_POOL_THREADS) &&
        ((threadsPerPool[numNumaNodes] % MAX_POOL_THREADS) < (MAX_POOL_THREADS / 2)))
    {
        threadsPerPool[numNumaNodes] -= (threadsPerPool[numNumaNodes] % MAX_POOL_THREADS);
        x265_log(p, X265_LOG_DEBUG,
                 "Creating only %d worker threads beyond specified numbers with --pools (if specified) to prevent asymmetry in pools; may not use all HW contexts\n", threadsPerPool[numNumaNodes]);
    }

    if (!p->bThreadedME)
    {
        numPools = 0;
        for (int i = 0; i < numNumaNodes + 1; i++)
        {
            if (bNumaSupport)
                x265_log(p, X265_LOG_DEBUG, "NUMA node %d may use %d logical cores\n", i, cpusPerNode[i]);

            if (threadsPerPool[i])
            {
                /* one pool per MAX_POOL_THREADS-sized chunk of this node's threads */
                numPools += (threadsPerPool[i] + MAX_POOL_THREADS - 1) / MAX_POOL_THREADS;
                totalNumThreads += threadsPerPool[i];
            }
        }
    }

    if (!isThreadsReserved)
    {
        if (!numPools)
        {
            x265_log(p, X265_LOG_DEBUG, "No pool thread available. Deciding frame-threads based on detected CPU threads\n");
            totalNumThreads = ThreadPool::getCpuCount(); // auto-detect frame threads
        }

        if (!p->frameNumThreads)
            p->frameNumThreads = ThreadPool::getFrameThreadsCount(p, threadsFrameEnc);
    }

    if (!numPools)
        return NULL;

    if (numPools > p->frameNumThreads && !p->bThreadedME)
    {
        x265_log(p, X265_LOG_DEBUG, "Reducing number of thread pools for frame thread count\n");
        numPools = X265_MAX(p->frameNumThreads / 2, 1);
    }
    if (isThreadsReserved)
        numPools = 1;
    ThreadPool *pools = new ThreadPool[numPools];
    if (pools)
    {
        int poolCount = (p->bThreadedME) ? numPools - 1 : numPools;
        int node = 0;
        for (int i = 0; i < numPools; i++)
        {
            int maxProviders = (p->bThreadedME && i == 0) // threadpool 0 is dedicated to ThreadedME
                ? 1
                : (p->frameNumThreads + poolCount - 1) / poolCount + !isThreadsReserved; // +1 is Lookahead, always assigned to threadpool 0

            /* advance to the next node that still has unassigned threads */
            while (!threadsPerPool[node])
                node++;
            int numThreads = threadsPerPool[node];
            int origNumThreads = numThreads;

            if (i == 0 && p->lookaheadThreads > numThreads / 2)
            {
                p->lookaheadThreads = numThreads / 2;
                x265_log(p, X265_LOG_DEBUG, "Setting lookahead threads to a maximum of half the total number of threads\n");
            }

            if (isThreadsReserved)
            {
                numThreads = p->lookaheadThreads;
                maxProviders = 1;
            }
            else if (i == 0)
                numThreads -= p->lookaheadThreads;

            if (!pools[i].create(numThreads, maxProviders, nodeMaskPerPool[node]))
            {
                delete[] pools;
                numPools = 0;
                return NULL;
            }
            if (numNumaNodes > 1)
            {
                /* Build a comma-separated NUMA node list for logging. Worst case
                 * is all 64 nodes appended as ",NN" plus a NUL terminator.
                 * (Fixed: the old code passed sizeof(nodesstr) -- the size of
                 * the pointer, not the buffer -- as the snprintf bound.) */
                const size_t nodesstrSize = 64 * strlen(",63") + 1;
                char *nodesstr = new char[nodesstrSize];
                int len = 0;
                for (int j = 0; j < 64; j++)
                    if ((nodeMaskPerPool[node] >> j) & 1)
                        len += snprintf(nodesstr + len, nodesstrSize - len, ",%d", j);
                x265_log(p, X265_LOG_INFO, "Thread pool %d using %d threads on numa nodes %s\n", i, numThreads, nodesstr + 1);
                delete[] nodesstr;
            }
            else
                x265_log(p, X265_LOG_INFO, "Thread pool created using %d threads\n", numThreads);
            threadsPerPool[node] -= origNumThreads;
        }
    }
    else
        numPools = 0;
    return pools;
}
626
627
ThreadPool::ThreadPool()
{
    /* zero-initialize every member in one shot; create() fills them in.
     * NOTE(review): this assumes the class remains trivially zero-fillable —
     * confirm before adding members with non-trivial constructors */
    memset(this, 0, sizeof(*this));
}
631
632
/* Allocate the worker array and job-provider table for this pool and record
 * the NUMA node mask the workers should bind to. Does not start any threads
 * (see start()). Returns false if either allocation failed. */
bool ThreadPool::create(int numThreads, int maxProviders, uint64_t nodeMask)
{
    X265_CHECK(numThreads <= MAX_POOL_THREADS, "a single thread pool cannot have more than MAX_POOL_THREADS threads\n");

#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 
    /* find the first node selected by nodeMask and capture its group affinity */
    memset(&m_groupAffinity, 0, sizeof(GROUP_AFFINITY));
    for (int i = 0; i < getNumaNodeCount(); i++)
    {
        int numaNode = ((nodeMask >> i) & 0x1U) ? i : -1;
        if (numaNode != -1)
        if (GetNumaNodeProcessorMaskEx((USHORT)numaNode, &m_groupAffinity))
            break;
    }
    m_numaMask = &m_groupAffinity.Mask;
#elif HAVE_LIBNUMA
    if (numa_available() >= 0)
    {
        /* translate the raw bit mask into a libnuma nodemask; freed in the destructor */
        struct bitmask* nodemask = numa_allocate_nodemask();
        if (nodemask)
        {
            *(nodemask->maskp) = nodeMask;
            m_numaMask = nodemask;
        }
        else
            x265_log(NULL, X265_LOG_ERROR, "unable to get NUMA node mask for %lx\n", nodeMask);
    }
#else
    (void)nodeMask;
#endif

    m_numWorkers = numThreads;

    m_workers = X265_MALLOC(WorkerThread, numThreads);
    /* placement new initialization */
    if (m_workers)
        for (int i = 0; i < numThreads; i++)
            new (m_workers + i)WorkerThread(*this, i);

    m_jpTable = X265_MALLOC(JobProvider*, maxProviders);
    if (m_jpTable)
        memset(m_jpTable, 0, sizeof(JobProvider*) * maxProviders);
    m_numProviders = 0;

    return m_workers && m_jpTable;
}
677
678
bool ThreadPool::start()
679
0
{
680
0
    m_isActive = true;
681
0
    for (int i = 0; i < m_numWorkers; i++)
682
0
    {
683
0
        if (!m_workers[i].start())
684
0
        {
685
0
            m_isActive = false;
686
0
            return false;
687
0
        }
688
0
    }
689
0
    return true;
690
0
}
691
692
/* Shut down all workers: clear m_isActive, then for each worker wait until it
 * has parked (its sleep bit is set), wake it so it observes the inactive flag
 * and exits threadMain(), and join it. */
void ThreadPool::stopWorkers()
{
    if (m_workers)
    {
        m_isActive = false;
        for (int i = 0; i < m_numWorkers; i++)
        {
            /* spin until the worker has set its sleep bit; waking it before it
             * sleeps could lose the wake-up */
            while (!(m_sleepBitmap & ((sleepbitmap_t)1 << i)))
                GIVE_UP_TIME();
            m_workers[i].awaken();
            m_workers[i].stop();
        }
    }
}
706
707
/* Destroy the pool: explicitly run each worker's destructor (they were built
 * with placement new in create()), release the raw allocations, and free the
 * libnuma nodemask if one was allocated. */
ThreadPool::~ThreadPool()
{
    if (m_workers)
    {
        /* placement-new'd objects require explicit destructor calls */
        for (int i = 0; i < m_numWorkers; i++)
            m_workers[i].~WorkerThread();
    }

    X265_FREE(m_workers);
    X265_FREE(m_jpTable);

#if HAVE_LIBNUMA
    if(m_numaMask)
        numa_free_nodemask((struct bitmask*)m_numaMask);
#endif
}
723
724
/* Bind the calling thread to this pool's NUMA node mask (set up in create()) */
void ThreadPool::setCurrentThreadAffinity()
{
    setThreadNodeAffinity(m_numaMask);
}
728
729
/* Bind the calling thread to the given NUMA node mask. On Windows 7+ the mask
 * argument is ignored and the pool's stored group affinity is applied; with
 * libnuma the mask is a struct bitmask*. No-op (with a logged error where
 * applicable) when affinity cannot be applied. */
void ThreadPool::setThreadNodeAffinity(void *numaMask)
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 
    UNREFERENCED_PARAMETER(numaMask);
    GROUP_AFFINITY groupAffinity;
    memset(&groupAffinity, 0, sizeof(GROUP_AFFINITY));
    groupAffinity.Group = m_groupAffinity.Group;
    groupAffinity.Mask = m_groupAffinity.Mask;
    const PGROUP_AFFINITY affinityPointer = &groupAffinity;
    if (SetThreadGroupAffinity(GetCurrentThread(), affinityPointer, NULL))
        return;
    else
        x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
#elif HAVE_LIBNUMA
    if (numa_available() >= 0)
    {
        /* run on the mask's nodes and interleave allocations across them,
         * then prefer local allocation for subsequent memory requests */
        numa_run_on_node_mask((struct bitmask*)numaMask);
        numa_set_interleave_mask((struct bitmask*)numaMask);
        numa_set_localalloc();
        return;
    }
    x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
#else
    (void)numaMask;
#endif
    return;
}
756
757
/* static */
/* Number of NUMA nodes on this machine; 1 when NUMA is unsupported or
 * unavailable at runtime */
int ThreadPool::getNumaNodeCount()
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 
    ULONG num = 1;
    if (GetNumaHighestNodeNumber(&num))
        num++;      // API reports the highest index, so count = highest + 1
    return (int)num;
#elif HAVE_LIBNUMA
    if (numa_available() >= 0)
        return numa_max_node() + 1;
    else
        return 1;
#else
    return 1;
#endif
}
774
775
/* static */
/* Number of logical CPUs available, detected per-platform; falls back to 2
 * on platforms with no detection path */
int ThreadPool::getCpuCount()
{
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
    /* sum the CPUs of every NUMA node's affinity mask */
    enum { MAX_NODE_NUM = 127 };
    int cpus = 0;
    int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
    GROUP_AFFINITY groupAffinity;
    for (int i = 0; i < numNumaNodes; i++)
    {
        GetNumaNodeProcessorMaskEx((UCHAR)i, &groupAffinity);
        cpus += popCount(groupAffinity.Mask);
    }
    return cpus;
#elif _WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return sysinfo.dwNumberOfProcessors;
#elif __unix__ && X265_ARCH_ARM
    /* Return the number of processors configured by OS. Because, most embedded linux distributions
     * uses only one processor as the scheduler doesn't have enough work to utilize all processors */
    return sysconf(_SC_NPROCESSORS_CONF);
#elif __unix__
    return sysconf(_SC_NPROCESSORS_ONLN);
#elif MACOS && __MACH__
    int nm[2];
    size_t len = 4;
    uint32_t count;

    nm[0] = CTL_HW;
    nm[1] = HW_AVAILCPU;
    sysctl(nm, 2, &count, &len, NULL, 0);

    if (count < 1)
    {
        /* HW_AVAILCPU gave nothing usable; fall back to HW_NCPU, then to 1 */
        nm[1] = HW_NCPU;
        sysctl(nm, 2, &count, &len, NULL, 0);
        if (count < 1)
            count = 1;
    }

    return count;
#else
    return 2; // default to 2 threads, everywhere else
#endif
}
821
822
/* Choose a default frame-parallelism level from the available core count,
 * picture height, and whether WPP (wavefront) is enabled */
int ThreadPool::getFrameThreadsCount(x265_param* p, int cpuCount)
{
    /* number of CTU rows in the picture (height divided by CTU size, rounded up) */
    int rows = (p->sourceHeight + p->maxCUSize - 1) >> g_log2Size[p->maxCUSize];
    if (!p->bEnableWavefront)
        return X265_MIN3(cpuCount, (rows + 1) / 2, X265_MAX_FRAME_THREADS);
    else if (cpuCount >= 32)
        return (p->sourceHeight > 2000) ? 6 : 5; // taller-than-1080p content gets one extra frame thread
    else if (cpuCount >= 16)
        return 4; 
    else if (cpuCount >= 8)
#if _WIN32 && X265_ARCH_ARM64
        return cpuCount;
#else
        return 3;
#endif
    else if (cpuCount >= 4)
        return 2;
    else
        return 1;
}
842
843
/* Decide how many of cpuCount threads ThreadedME should use. A table of
 * preset-like rules is scanned in priority order; the first rule whose
 * predicate matches supplies per-resolution tuning (task block size, buffer
 * rows, thread percentage) which is written into param. When no rule matches,
 * a resolution-based default percentage is applied and param is untouched. */
int ThreadPool::configureTmeThreadCount(x265_param* param, int cpuCount)
{
    /* resolution classes used to index the per-rule tuning tables */
    enum TmeResClass
    {
        TME_RES_LOW = 0,
        TME_RES_MID,
        TME_RES_HIGH,
        TME_RES_COUNT
    };

    /* rules are ordered from heaviest (slower presets) to lightest (ultrafast) */
    enum TmeRule
    {
        TME_RULE_SLOWER_VERYSLOW = 0,
        TME_RULE_SLOW,
        TME_RULE_FAST_MEDIUM,
        TME_RULE_FASTER,
        TME_RULE_VERYFAST,
        TME_RULE_SUPERFAST,
        TME_RULE_ULTRAFAST,
        TME_RULE_COUNT
    };

    /* per-rule tuning, one entry per resolution class */
    struct TmeRuleConfig
    {
        int taskBlockSize[TME_RES_COUNT];
        int numBufferRows[TME_RES_COUNT];
        int threadPercent[TME_RES_COUNT];
        bool widthBasedTaskBlockSize;
    };

    static const TmeRuleConfig s_tmeRuleConfig[TME_RULE_COUNT] =
    {
        { { 1, 1, 1 }, { 3, 3, 3 }, { 90, 85, 70 }, false }, // slower / veryslow preset and similar options
        { { 1, 1, 1 }, { 5, 5, 3 }, { 90, 85, 75 }, false }, // slow preset and similar options
        { { 1, 1, 1 }, { 10, 10, 10 }, { 90, 80, 70 }, false }, // fast / medium preset and similar options
        { { 1, 1, 1 }, { 10, 15, 10 }, { 90, 80, 70 }, false }, // faster preset and similar options
        { { 1, 1, 1 }, { 10, 15, 20 }, { 90, 80, 70 }, false }, // veryfast preset and similar options
        { { 2, 4, 4 }, { 10, 15, 20 }, { 90, 80, 60 }, false }, // superfast preset and similar options
        { { 0, 0, 0 }, { 15, 20, 20 }, { 90, 80, 50 }, true  }  // ultrafast preset and similar options
    };

    /* classify the picture height: <=720 low, >=1440 high, otherwise mid */
    const int resClass = (param->sourceHeight >= 1440) ? TME_RES_HIGH :
                         (param->sourceHeight <= 720) ? TME_RES_LOW : TME_RES_MID;

    /* one predicate per rule, evaluated against encoder settings */
    const bool ruleMatches[TME_RULE_COUNT] =
    {
        param->maxNumReferences >= 5 || param->subpelRefine >= 4 || (param->bEnableRectInter && param->bEnableAMP),
        param->maxNumReferences >= 4 || param->subpelRefine >= 3 || (param->bEnableRectInter ^ param->bEnableAMP),
        param->maxNumReferences >= 3 && param->subpelRefine >= 2,
        param->maxNumReferences >= 2 && param->subpelRefine >= 2,
        param->subpelRefine >= 1 && param->bframes > 3,
        param->subpelRefine && param->maxCUSize < 64,
        !param->subpelRefine || param->searchMethod == X265_DIA_SEARCH || param->minCUSize >= 16
    };

    /* first matching rule wins */
    int matchedRule = -1;
    int rule = 0;
    while (rule < TME_RULE_COUNT && matchedRule < 0)
    {
        if (ruleMatches[rule])
            matchedRule = rule;
        rule++;
    }

    if (matchedRule < 0)
    {
        /* no rule applied: fall back to a resolution-based thread percentage */
        static const int s_defaultThreadPercent[TME_RES_COUNT] = { 80, 80, 70 };
        return (cpuCount * s_defaultThreadPercent[resClass]) / 100;
    }

    const TmeRuleConfig& cfg = s_tmeRuleConfig[matchedRule];
    /* ultrafast derives the task block size from the picture width instead of the table */
    param->tmeTaskBlockSize = cfg.widthBasedTaskBlockSize ? ((param->sourceWidth + 480 - 1) / 480) : cfg.taskBlockSize[resClass];
    param->tmeNumBufferRows = cfg.numBufferRows[resClass];
    return (cpuCount * cfg.threadPercent[resClass]) / 100;
}
919
920
} // end namespace X265_NS