Coverage Report

Created: 2026-03-08 06:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/work/x265/source/common/threadpool.cpp
Line
Count
Source
1
/*****************************************************************************
2
 * Copyright (C) 2013-2020 MulticoreWare, Inc
3
 *
4
 * Authors: Steve Borho <steve@borho.org>
5
 *          Min Chen <chenm003@163.com>
6
 *
7
 * This program is free software; you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 2 of the License, or
10
 * (at your option) any later version.
11
 *
12
 * This program is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU General Public License
18
 * along with this program; if not, write to the Free Software
19
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02111, USA.
20
 *
21
 * This program is also available under a commercial proprietary license.
22
 * For more information, contact us at license @ x265.com
23
 *****************************************************************************/
24
25
#include "common.h"
26
#include "threadpool.h"
27
#include "threading.h"
28
29
#include <new>
30
#include <vector>
31
32
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
33
#include <winnt.h>
34
#endif
35
36
#if X86_64
37
38
#ifdef __GNUC__
39
40
9.91k
#define SLEEPBITMAP_BSF(id, x)     (id) = ((unsigned long)__builtin_ctzll(x))
41
72.6k
#define SLEEPBITMAP_OR(ptr, mask)  __sync_fetch_and_or(ptr, mask)
42
9.91k
#define SLEEPBITMAP_AND(ptr, mask) __sync_fetch_and_and(ptr, mask)
43
44
#elif defined(_MSC_VER)
45
46
#define SLEEPBITMAP_BSF(id, x)     _BitScanForward64(&id, x)
47
#define SLEEPBITMAP_OR(ptr, mask)  InterlockedOr64((volatile LONG64*)ptr, (LONG)mask)
48
#define SLEEPBITMAP_AND(ptr, mask) InterlockedAnd64((volatile LONG64*)ptr, (LONG)mask)
49
50
#endif // ifdef __GNUC__
51
52
#else
53
54
/* use 32-bit primitives defined in threading.h */
55
#define SLEEPBITMAP_BSF BSF
56
#define SLEEPBITMAP_OR  ATOMIC_OR
57
#define SLEEPBITMAP_AND ATOMIC_AND
58
59
#endif
60
61
/* TODO FIX: Macro __MACH__ ideally should be part of MacOS definition, but adding to Cmake
62
   behaving is not as expected, need to fix this. */
63
64
#if MACOS && __MACH__
65
#include <sys/param.h>
66
#include <sys/sysctl.h>
67
#endif
68
#if HAVE_LIBNUMA
69
#include <numa.h>
70
#endif
71
#if defined(_MSC_VER)
72
# define strcasecmp _stricmp
73
#endif
74
75
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
76
const uint64_t m1 = 0x5555555555555555; //binary: 0101...
77
const uint64_t m2 = 0x3333333333333333; //binary: 00110011..
78
const uint64_t m3 = 0x0f0f0f0f0f0f0f0f; //binary:  4 zeros,  4 ones ...
79
const uint64_t h01 = 0x0101010101010101; //the sum of 256 to the power of 0,1,2,3...
80
81
static int popCount(uint64_t x)
82
{
83
    x -= (x >> 1) & m1;
84
    x = (x & m2) + ((x >> 2) & m2);
85
    x = (x + (x >> 4)) & m3;
86
    return (x * h01) >> 56;
87
}
88
#endif
89
90
namespace X265_NS {
91
// x265 private namespace
92
93
class WorkerThread : public Thread
94
{
95
private:
96
97
    ThreadPool&  m_pool;
98
    int          m_id;
99
    Event        m_wakeEvent;
100
101
    WorkerThread& operator =(const WorkerThread&);
102
103
public:
104
105
    JobProvider*     m_curJobProvider;
106
    BondedTaskGroup* m_bondMaster;
107
108
20.9k
    WorkerThread(ThreadPool& pool, int id) : m_pool(pool), m_id(id) {}
109
20.9k
    virtual ~WorkerThread() {}
110
111
    void threadMain();
112
30.8k
    void awaken()           { m_wakeEvent.trigger(); }
113
};
114
115
void WorkerThread::threadMain()
116
20.9k
{
117
20.9k
    THREAD_NAME("Worker", m_id);
118
119
#if _WIN32
120
    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
121
#else
122
20.9k
    __attribute__((unused)) int val = nice(10);
123
20.9k
#endif
124
125
20.9k
    m_pool.setCurrentThreadAffinity();
126
127
20.9k
    sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id;
128
20.9k
    m_curJobProvider = m_pool.m_jpTable[0];
129
20.9k
    m_bondMaster = NULL;
130
131
20.9k
    SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
132
20.9k
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
133
20.9k
    m_wakeEvent.wait();
134
135
30.8k
    while (m_pool.m_isActive)
136
9.89k
    {
137
9.89k
        if (m_bondMaster)
138
654
        {
139
654
            m_bondMaster->processTasks(m_id);
140
654
            m_bondMaster->m_exitedPeerCount.incr();
141
654
            m_bondMaster = NULL;
142
654
        }
143
144
9.89k
        do
145
16.3k
        {
146
            /* do pending work for current job provider */
147
16.3k
            m_curJobProvider->findJob(m_id);
148
149
            /* if the current job provider still wants help, only switch to a
150
             * higher priority provider (lower slice type). Else take the first
151
             * available job provider with the highest priority */
152
16.3k
            int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType :
153
16.3k
                                                                 INVALID_SLICE_PRIORITY + 1;
154
16.3k
            int nextProvider = -1;
155
113k
            for (int i = 0; i < m_pool.m_numProviders; i++)
156
97.2k
            {
157
97.2k
                if (m_pool.m_jpTable[i]->m_helpWanted &&
158
6.42k
                    m_pool.m_jpTable[i]->m_sliceType < curPriority)
159
0
                {
160
0
                    nextProvider = i;
161
0
                    curPriority = m_pool.m_jpTable[i]->m_sliceType;
162
0
                }
163
97.2k
            }
164
16.3k
            if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider])
165
0
            {
166
0
                SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit);
167
0
                m_curJobProvider = m_pool.m_jpTable[nextProvider];
168
0
                SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
169
0
            }
170
16.3k
        }
171
16.3k
        while (m_curJobProvider->m_helpWanted);
172
173
        /* While the worker sleeps, a job-provider or bond-group may acquire this
174
         * worker's sleep bitmap bit. Once acquired, that thread may modify 
175
         * m_bondMaster or m_curJobProvider, then waken the thread */
176
9.89k
        SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
177
9.89k
        m_wakeEvent.wait();
178
9.89k
    }
179
180
20.9k
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
181
20.9k
}
182
183
void JobProvider::tryWakeOne()
184
9.26k
{
185
9.26k
    int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS);
186
9.26k
    if (id < 0)
187
0
    {
188
0
        m_helpWanted = true;
189
0
        return;
190
0
    }
191
192
9.26k
    WorkerThread& worker = m_pool->m_workers[id];
193
9.26k
    if (worker.m_curJobProvider != this) /* poaching */
194
0
    {
195
0
        sleepbitmap_t bit = (sleepbitmap_t)1 << id;
196
0
        SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit);
197
0
        worker.m_curJobProvider = this;
198
0
        SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit);
199
0
    }
200
9.26k
    worker.awaken();
201
9.26k
}
202
203
int ThreadPool::tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap)
204
9.91k
{
205
9.91k
    unsigned long id;
206
207
9.91k
    sleepbitmap_t masked = m_sleepBitmap & firstTryBitmap;
208
9.91k
    while (masked)
209
9.91k
    {
210
9.91k
        SLEEPBITMAP_BSF(id, masked);
211
212
9.91k
        sleepbitmap_t bit = (sleepbitmap_t)1 << id;
213
9.91k
        if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit)
214
9.91k
            return (int)id;
215
216
0
        masked = m_sleepBitmap & firstTryBitmap;
217
0
    }
218
219
0
    masked = m_sleepBitmap & secondTryBitmap;
220
0
    while (masked)
221
0
    {
222
0
        SLEEPBITMAP_BSF(id, masked);
223
224
0
        sleepbitmap_t bit = (sleepbitmap_t)1 << id;
225
0
        if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit)
226
0
            return (int)id;
227
228
0
        masked = m_sleepBitmap & secondTryBitmap;
229
0
    }
230
231
0
    return -1;
232
0
}
233
234
int ThreadPool::tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master)
235
654
{
236
654
    int bondCount = 0;
237
654
    do
238
654
    {
239
654
        int id = tryAcquireSleepingThread(peerBitmap, 0);
240
654
        if (id < 0)
241
0
            return bondCount;
242
243
654
        m_workers[id].m_bondMaster = &master;
244
654
        m_workers[id].awaken();
245
654
        bondCount++;
246
654
    }
247
654
    while (bondCount < maxPeers);
248
249
654
    return bondCount;
250
654
}
251
ThreadPool* ThreadPool::allocThreadPools(x265_param* p, int& numPools, bool isThreadsReserved)
252
654
{
253
654
    enum { MAX_NODE_NUM = 127 };
254
654
    int cpusPerNode[MAX_NODE_NUM + 1];
255
654
    int threadsPerPool[MAX_NODE_NUM + 2];
256
654
    uint64_t nodeMaskPerPool[MAX_NODE_NUM + 2];
257
654
    int totalNumThreads = 0;
258
259
654
    memset(cpusPerNode, 0, sizeof(cpusPerNode));
260
654
    memset(threadsPerPool, 0, sizeof(threadsPerPool));
261
654
    memset(nodeMaskPerPool, 0, sizeof(nodeMaskPerPool));
262
263
654
    int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
264
654
    bool bNumaSupport = false;
265
266
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 
267
    bNumaSupport = true;
268
#elif HAVE_LIBNUMA
269
    bNumaSupport = numa_available() >= 0;
270
#endif
271
272
273
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
274
    PGROUP_AFFINITY groupAffinityPointer = new GROUP_AFFINITY;
275
    for (int i = 0; i < numNumaNodes; i++)
276
    {
277
        GetNumaNodeProcessorMaskEx((UCHAR)i, groupAffinityPointer);
278
        cpusPerNode[i] = popCount(groupAffinityPointer->Mask);
279
    }
280
    delete groupAffinityPointer;
281
#elif HAVE_LIBNUMA
282
    if (bNumaSupport)
283
    {
284
        struct bitmask* bitMask = numa_allocate_cpumask();
285
        for (int i = 0; i < numNumaNodes; i++)
286
        {
287
            int ret = numa_node_to_cpus(i, bitMask);
288
            if (!ret)
289
                cpusPerNode[i] = numa_bitmask_weight(bitMask);
290
            else
291
                x265_log(p, X265_LOG_ERROR, "Failed to genrate CPU mask\n");
292
        }
293
        numa_free_cpumask(bitMask);
294
    }
295
#else // NUMA not supported
296
654
    cpusPerNode[0] = getCpuCount();
297
654
#endif
298
299
654
    if (bNumaSupport && p->logLevel >= X265_LOG_DEBUG)
300
0
    for (int i = 0; i < numNumaNodes; i++)
301
0
        x265_log(p, X265_LOG_DEBUG, "detected NUMA node %d with %d logical cores\n", i, cpusPerNode[i]);
302
    /* limit threads based on param->numaPools
303
     * For windows because threads can't be allocated to live across sockets
304
     * changing the default behavior to be per-socket pools -- FIXME */
305
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 || HAVE_LIBNUMA
306
    if (!strlen(p->numaPools) || (strcmp(p->numaPools, "NULL") == 0 || strcmp(p->numaPools, "*") == 0 || strcmp(p->numaPools, "") == 0))
307
    {
308
         char poolString[50] = "";
309
         for (int i = 0; i < numNumaNodes; i++)
310
         {
311
             char nextCount[10] = "";
312
             if (i)
313
                 snprintf(nextCount, sizeof(nextCount), ",%d", cpusPerNode[i]);
314
             else
315
                   snprintf(nextCount, sizeof(nextCount), "%d", cpusPerNode[i]);
316
             strcat(poolString, nextCount);
317
         }
318
         x265_param_parse(p, "pools", poolString);
319
     }
320
#endif
321
654
    if (strlen(p->numaPools))
322
0
    {
323
0
        const char *nodeStr = p->numaPools;
324
0
        for (int i = 0; i < numNumaNodes; i++)
325
0
        {
326
0
            if (!*nodeStr)
327
0
            {
328
0
                threadsPerPool[i] = 0;
329
0
                continue;
330
0
            }
331
0
            else if (*nodeStr == '-')
332
0
                threadsPerPool[i] = 0;
333
0
            else if (*nodeStr == '*' || !strcasecmp(nodeStr, "NULL"))
334
0
            {
335
0
                for (int j = i; j < numNumaNodes; j++)
336
0
                {
337
0
                    threadsPerPool[numNumaNodes] += cpusPerNode[j];
338
0
                    nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << j);
339
0
                }
340
0
                break;
341
0
            }
342
0
            else if (*nodeStr == '+')
343
0
            {
344
0
                threadsPerPool[numNumaNodes] += cpusPerNode[i];
345
0
                nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i);
346
0
            }
347
0
            else
348
0
            {
349
0
                int count = atoi(nodeStr);
350
0
                if (i > 0 || strchr(nodeStr, ','))   // it is comma -> old logic
351
0
                {
352
0
                    threadsPerPool[i] = X265_MIN(count, cpusPerNode[i]);
353
0
                    nodeMaskPerPool[i] = ((uint64_t)1 << i);
354
0
                }
355
0
                else                                 // new logic: exactly 'count' threads on all NUMAs
356
0
                {
357
0
                    threadsPerPool[numNumaNodes] = X265_MIN(count, numNumaNodes * MAX_POOL_THREADS);
358
0
                    nodeMaskPerPool[numNumaNodes] = ((uint64_t)-1 >> (64 - numNumaNodes));
359
0
                }
360
0
            }
361
362
            /* consume current node string, comma, and white-space */
363
0
            while (*nodeStr && *nodeStr != ',')
364
0
               ++nodeStr;
365
0
            if (*nodeStr == ',' || *nodeStr == ' ')
366
0
               ++nodeStr;
367
0
        }
368
0
    }
369
654
    else
370
654
    {
371
1.30k
        for (int i = 0; i < numNumaNodes; i++)
372
654
        {
373
654
            threadsPerPool[numNumaNodes]  += cpusPerNode[i];
374
654
            nodeMaskPerPool[numNumaNodes] |= ((uint64_t)1 << i);
375
654
        }
376
654
    }
377
378
    /* If ThreadedME is enabled, split resources: give half NUMA nodes (or
379
     * half cores when NUMA not available) to threaded-me and the other half
380
     * to the remaining job providers. */
381
    /* Compute total CPU count from detected per-node counts when available */
382
1.96k
    for (int i = 0; i < numNumaNodes + 1; i++)
383
1.30k
        totalNumThreads += threadsPerPool[i];
384
654
    if (!totalNumThreads)
385
0
        totalNumThreads = ThreadPool::getCpuCount();
386
387
654
    int threadsFrameEnc = 0;
388
    
389
654
    if (p->bThreadedME)
390
0
    {
391
0
        int targetTME = configureTmeThreadCount(p, totalNumThreads);
392
0
        targetTME = (targetTME < 1) ? 1 : targetTME;
393
394
0
        threadsFrameEnc = totalNumThreads - targetTME;
395
0
        int defaultNumFT = getFrameThreadsCount(p, totalNumThreads);
396
0
        if (threadsFrameEnc < defaultNumFT)
397
0
        {
398
0
            threadsFrameEnc = defaultNumFT;
399
0
            targetTME = totalNumThreads - threadsFrameEnc;
400
0
        }
401
402
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 || HAVE_LIBNUMA
403
        if (bNumaSupport && numNumaNodes > 1)
404
        {
405
            int tmeNumaNodes = 0;
406
            int leftover = 0;
407
            
408
            // First thread pool belongs to ThreadedME
409
            std::vector<int> threads(1, 0);
410
            std::vector<uint64_t> nodeMasks(1, 0);
411
            int poolIndex = 0;
412
            
413
            /* Greedily assign whole NUMA nodes to TME until reaching or exceeding the target */
414
            for (int i = 0; i < numNumaNodes + 1; i++)
415
            {
416
                if (!threadsPerPool[i] && !nodeMaskPerPool[i])
417
                    continue;
418
419
                int toTake = X265_MIN(threadsPerPool[i], targetTME - threads[0]);
420
                if (toTake > 0)
421
                {
422
                    threads[poolIndex] += toTake;
423
                    nodeMasks[poolIndex] |= nodeMaskPerPool[i];
424
                    tmeNumaNodes++;
425
426
                    if (threads[0] == targetTME)
427
                        poolIndex++;
428
429
                    if (toTake < threadsPerPool[i])
430
                        leftover = threadsPerPool[i] - toTake;
431
                }
432
                else
433
                { 
434
                    threads.push_back(threadsPerPool[i]);
435
                    nodeMasks.push_back(nodeMaskPerPool[i]);
436
                    poolIndex++;
437
                }
438
            }
439
440
            // Distribute leftover threads among FrameEncoders
441
            if (leftover)
442
            {
443
                // Case 1: There are 1 or more threadpools for FrameEncoder(s) by now
444
                if (threads.size() > 1)
445
                {
446
                    int split = static_cast<int>(static_cast<double>(leftover) / (numNumaNodes - 1));
447
                    for (int pool = 1; pool < numNumaNodes; pool++)
448
                    {
449
                        int give = X265_MIN(split, leftover);
450
                        threads[pool] += give;
451
                        leftover -= give;
452
                    }
453
                }
454
455
                // Case 2: FrameEncoder(s) haven't received threads yet
456
                if (threads.size() == 1)
457
                {
458
                    threads.push_back(leftover);
459
                    // Give the same node mask as the last node of ThreadedME
460
                    uint64_t msb = 1;
461
                    uint64_t tmeNodeMask = nodeMasks[0];
462
                    while (tmeNodeMask > 1)
463
                    {
464
                        tmeNodeMask >>= 1;
465
                        msb <<= 1;
466
                    }
467
                    nodeMasks.push_back(msb);
468
                }
469
            }
470
471
            // Apply calculated threadpool assignment
472
            // TODO: Make sure this doesn't cause a problem later on
473
            memset(threadsPerPool, 0, sizeof(threadsPerPool));
474
            memset(nodeMaskPerPool, 0, sizeof(nodeMaskPerPool));
475
476
            numPools = numNumaNodes = static_cast<int>(threads.size());
477
            for (int pool = 0; pool < numPools; pool++)
478
            {
479
                threadsPerPool[pool] = threads[pool];
480
                nodeMaskPerPool[pool] = nodeMasks[pool];
481
            }
482
        }
483
        else
484
#endif
485
0
        {
486
0
            memset(threadsPerPool, 0, sizeof(threadsPerPool));
487
0
            memset(nodeMaskPerPool, 0, sizeof(nodeMaskPerPool));
488
            
489
0
            threadsPerPool[0] = targetTME;
490
0
            nodeMaskPerPool[0] = 1;
491
492
0
            threadsPerPool[1] = threadsFrameEnc;
493
0
            nodeMaskPerPool[1] = 1;
494
495
0
            numPools = 2;
496
0
        }
497
0
    }
498
654
    else
499
654
    {
500
654
        threadsFrameEnc = totalNumThreads;
501
654
    }
502
 
503
    // If the last pool size is > MAX_POOL_THREADS, clip it to spawn thread pools only of size >= 1/2 max (heuristic)
504
654
    if ((threadsPerPool[numNumaNodes] > MAX_POOL_THREADS) &&
505
0
        ((threadsPerPool[numNumaNodes] % MAX_POOL_THREADS) < (MAX_POOL_THREADS / 2)))
506
0
    {
507
0
        threadsPerPool[numNumaNodes] -= (threadsPerPool[numNumaNodes] % MAX_POOL_THREADS);
508
0
        x265_log(p, X265_LOG_DEBUG,
509
0
                 "Creating only %d worker threads beyond specified numbers with --pools (if specified) to prevent asymmetry in pools; may not use all HW contexts\n", threadsPerPool[numNumaNodes]);
510
0
    }
511
512
654
    if (!p->bThreadedME)
513
654
    {
514
654
        numPools = 0;
515
1.96k
        for (int i = 0; i < numNumaNodes + 1; i++)
516
1.30k
        {
517
1.30k
            if (bNumaSupport)
518
0
                x265_log(p, X265_LOG_DEBUG, "NUMA node %d may use %d logical cores\n", i, cpusPerNode[i]);
519
            
520
1.30k
            if (threadsPerPool[i])
521
654
            {
522
654
                numPools += (threadsPerPool[i] + MAX_POOL_THREADS - 1) / MAX_POOL_THREADS;
523
654
                totalNumThreads += threadsPerPool[i];
524
654
            }
525
1.30k
        }
526
654
    }
527
        
528
654
    if (!isThreadsReserved)
529
654
    {
530
654
        if (!numPools)
531
0
        {
532
0
            x265_log(p, X265_LOG_DEBUG, "No pool thread available. Deciding frame-threads based on detected CPU threads\n");
533
0
            totalNumThreads = ThreadPool::getCpuCount(); // auto-detect frame threads
534
0
        }
535
536
654
        if (!p->frameNumThreads)
537
654
            p->frameNumThreads = ThreadPool::getFrameThreadsCount(p, threadsFrameEnc);
538
654
    }
539
    
540
654
    if (!numPools)
541
0
        return NULL;
542
543
654
    if (numPools > p->frameNumThreads && !p->bThreadedME)
544
0
    {
545
0
        x265_log(p, X265_LOG_DEBUG, "Reducing number of thread pools for frame thread count\n");
546
0
        numPools = X265_MAX(p->frameNumThreads / 2, 1);
547
0
    }
548
654
    if (isThreadsReserved)
549
0
        numPools = 1;
550
654
    ThreadPool *pools = new ThreadPool[numPools];
551
654
    if (pools)
552
654
    {
553
654
        int poolCount = (p->bThreadedME) ? numPools - 1 : numPools;
554
654
        int node = 0;
555
1.30k
        for (int i = 0; i < numPools; i++)
556
654
        {
557
654
            int maxProviders = (p->bThreadedME && i == 0) // threadpool 0 is dedicated to ThreadedME
558
654
                ? 1
559
654
                : (p->frameNumThreads + poolCount - 1) / poolCount + !isThreadsReserved; // +1 is Lookahead, always assigned to threadpool 0
560
            
561
1.30k
            while (!threadsPerPool[node])
562
654
                node++;
563
654
            int numThreads = threadsPerPool[node];
564
654
            int origNumThreads = numThreads;
565
566
654
            if (i == 0 && p->lookaheadThreads > numThreads / 2)
567
0
            {
568
0
                p->lookaheadThreads = numThreads / 2;
569
0
                x265_log(p, X265_LOG_DEBUG, "Setting lookahead threads to a maximum of half the total number of threads\n");
570
0
            }
571
572
654
            if (isThreadsReserved)
573
0
            {
574
0
                numThreads = p->lookaheadThreads;
575
0
                maxProviders = 1;
576
0
            }
577
654
            else if (i == 0)
578
654
                numThreads -= p->lookaheadThreads;
579
            
580
654
            if (!pools[i].create(numThreads, maxProviders, nodeMaskPerPool[node]))
581
0
            {
582
0
                delete[] pools;
583
0
                numPools = 0;
584
0
                return NULL;
585
0
            }
586
654
            if (numNumaNodes > 1)
587
0
            {
588
0
                char *nodesstr = new char[64 * strlen(",63") + 1];
589
0
                int len = 0;
590
0
                for (int j = 0; j < 64; j++)
591
0
                    if ((nodeMaskPerPool[node] >> j) & 1)
592
0
                        len += snprintf(nodesstr + len, sizeof(nodesstr) - len, ",%d", j);
593
0
                x265_log(p, X265_LOG_INFO, "Thread pool %d using %d threads on numa nodes %s\n", i, numThreads, nodesstr + 1);
594
0
                delete[] nodesstr;
595
0
            }
596
654
            else
597
654
                x265_log(p, X265_LOG_INFO, "Thread pool created using %d threads\n", numThreads);
598
654
            threadsPerPool[node] -= origNumThreads;
599
654
        }
600
654
    }
601
0
    else
602
0
        numPools = 0;
603
654
    return pools;
604
654
}
605
606
ThreadPool::ThreadPool()
607
654
{
608
654
    memset(this, 0, sizeof(*this));
609
654
}
610
611
bool ThreadPool::create(int numThreads, int maxProviders, uint64_t nodeMask)
612
654
{
613
654
    X265_CHECK(numThreads <= MAX_POOL_THREADS, "a single thread pool cannot have more than MAX_POOL_THREADS threads\n");
614
615
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 
616
    memset(&m_groupAffinity, 0, sizeof(GROUP_AFFINITY));
617
    for (int i = 0; i < getNumaNodeCount(); i++)
618
    {
619
        int numaNode = ((nodeMask >> i) & 0x1U) ? i : -1;
620
        if (numaNode != -1)
621
        if (GetNumaNodeProcessorMaskEx((USHORT)numaNode, &m_groupAffinity))
622
            break;
623
    }
624
    m_numaMask = &m_groupAffinity.Mask;
625
#elif HAVE_LIBNUMA
626
    if (numa_available() >= 0)
627
    {
628
        struct bitmask* nodemask = numa_allocate_nodemask();
629
        if (nodemask)
630
        {
631
            *(nodemask->maskp) = nodeMask;
632
            m_numaMask = nodemask;
633
        }
634
        else
635
            x265_log(NULL, X265_LOG_ERROR, "unable to get NUMA node mask for %lx\n", nodeMask);
636
    }
637
#else
638
654
    (void)nodeMask;
639
654
#endif
640
641
654
    m_numWorkers = numThreads;
642
643
654
    m_workers = X265_MALLOC(WorkerThread, numThreads);
644
    /* placement new initialization */
645
654
    if (m_workers)
646
21.5k
        for (int i = 0; i < numThreads; i++)
647
20.9k
            new (m_workers + i)WorkerThread(*this, i);
648
649
654
    m_jpTable = X265_MALLOC(JobProvider*, maxProviders);
650
654
    if (m_jpTable)
651
654
        memset(m_jpTable, 0, sizeof(JobProvider*) * maxProviders);
652
654
    m_numProviders = 0;
653
654
654
    return m_workers && m_jpTable;
655
654
}
656
657
bool ThreadPool::start()
658
654
{
659
654
    m_isActive = true;
660
21.5k
    for (int i = 0; i < m_numWorkers; i++)
661
20.9k
    {
662
20.9k
        if (!m_workers[i].start())
663
0
        {
664
0
            m_isActive = false;
665
0
            return false;
666
0
        }
667
20.9k
    }
668
654
    return true;
669
654
}
670
671
void ThreadPool::stopWorkers()
672
654
{
673
654
    if (m_workers)
674
654
    {
675
654
        m_isActive = false;
676
21.5k
        for (int i = 0; i < m_numWorkers; i++)
677
20.9k
        {
678
20.9k
            while (!(m_sleepBitmap & ((sleepbitmap_t)1 << i)))
679
0
                GIVE_UP_TIME();
680
20.9k
            m_workers[i].awaken();
681
20.9k
            m_workers[i].stop();
682
20.9k
        }
683
654
    }
684
654
}
685
686
ThreadPool::~ThreadPool()
687
654
{
688
654
    if (m_workers)
689
654
    {
690
21.5k
        for (int i = 0; i < m_numWorkers; i++)
691
20.9k
            m_workers[i].~WorkerThread();
692
654
    }
693
694
654
    X265_FREE(m_workers);
695
654
    X265_FREE(m_jpTable);
696
697
#if HAVE_LIBNUMA
698
    if(m_numaMask)
699
        numa_free_nodemask((struct bitmask*)m_numaMask);
700
#endif
701
654
}
702
703
void ThreadPool::setCurrentThreadAffinity()
704
23.8k
{
705
23.8k
    setThreadNodeAffinity(m_numaMask);
706
23.8k
}
707
708
void ThreadPool::setThreadNodeAffinity(void *numaMask)
709
23.8k
{
710
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 
711
    UNREFERENCED_PARAMETER(numaMask);
712
    GROUP_AFFINITY groupAffinity;
713
    memset(&groupAffinity, 0, sizeof(GROUP_AFFINITY));
714
    groupAffinity.Group = m_groupAffinity.Group;
715
    groupAffinity.Mask = m_groupAffinity.Mask;
716
    const PGROUP_AFFINITY affinityPointer = &groupAffinity;
717
    if (SetThreadGroupAffinity(GetCurrentThread(), affinityPointer, NULL))
718
        return;
719
    else
720
        x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
721
#elif HAVE_LIBNUMA
722
    if (numa_available() >= 0)
723
    {
724
        numa_run_on_node_mask((struct bitmask*)numaMask);
725
        numa_set_interleave_mask((struct bitmask*)numaMask);
726
        numa_set_localalloc();
727
        return;
728
    }
729
    x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity for NUMA node mask\n");
730
#else
731
23.8k
    (void)numaMask;
732
23.8k
#endif
733
23.8k
    return;
734
23.8k
}
735
736
/* static */
737
int ThreadPool::getNumaNodeCount()
738
1.30k
{
739
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 
740
    ULONG num = 1;
741
    if (GetNumaHighestNodeNumber(&num))
742
        num++;
743
    return (int)num;
744
#elif HAVE_LIBNUMA
745
    if (numa_available() >= 0)
746
        return numa_max_node() + 1;
747
    else
748
        return 1;
749
#else
750
1.30k
    return 1;
751
1.30k
#endif
752
1.30k
}
753
754
/* static */
755
int ThreadPool::getCpuCount()
756
654
{
757
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
758
    enum { MAX_NODE_NUM = 127 };
759
    int cpus = 0;
760
    int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
761
    GROUP_AFFINITY groupAffinity;
762
    for (int i = 0; i < numNumaNodes; i++)
763
    {
764
        GetNumaNodeProcessorMaskEx((UCHAR)i, &groupAffinity);
765
        cpus += popCount(groupAffinity.Mask);
766
    }
767
    return cpus;
768
#elif _WIN32
769
    SYSTEM_INFO sysinfo;
770
    GetSystemInfo(&sysinfo);
771
    return sysinfo.dwNumberOfProcessors;
772
#elif __unix__ && X265_ARCH_ARM
773
    /* Return the number of processors configured by OS. Because, most embedded linux distributions
774
     * uses only one processor as the scheduler doesn't have enough work to utilize all processors */
775
    return sysconf(_SC_NPROCESSORS_CONF);
776
#elif __unix__
777
654
    return sysconf(_SC_NPROCESSORS_ONLN);
778
#elif MACOS && __MACH__
779
    int nm[2];
780
    size_t len = 4;
781
    uint32_t count;
782
783
    nm[0] = CTL_HW;
784
    nm[1] = HW_AVAILCPU;
785
    sysctl(nm, 2, &count, &len, NULL, 0);
786
787
    if (count < 1)
788
    {
789
        nm[1] = HW_NCPU;
790
        sysctl(nm, 2, &count, &len, NULL, 0);
791
        if (count < 1)
792
            count = 1;
793
    }
794
795
    return count;
796
#else
797
    return 2; // default to 2 threads, everywhere else
798
#endif
799
654
}
800
801
int ThreadPool::getFrameThreadsCount(x265_param* p, int cpuCount)
802
654
{
803
654
    int rows = (p->sourceHeight + p->maxCUSize - 1) >> g_log2Size[p->maxCUSize];
804
654
    if (!p->bEnableWavefront)
805
123
        return X265_MIN3(cpuCount, (rows + 1) / 2, X265_MAX_FRAME_THREADS);
806
531
    else if (cpuCount >= 32)
807
531
        return (p->sourceHeight > 2000) ? 6 : 5; 
808
0
    else if (cpuCount >= 16)
809
0
        return 4; 
810
0
    else if (cpuCount >= 8)
811
#if _WIN32 && X265_ARCH_ARM64
812
        return cpuCount;
813
#else
814
0
        return 3;
815
0
#endif
816
0
    else if (cpuCount >= 4)
817
0
        return 2;
818
0
    else
819
0
        return 1;
820
654
}
821
822
int ThreadPool::configureTmeThreadCount(x265_param* param, int cpuCount)
823
0
{
824
0
    enum TmeResClass
825
0
    {
826
0
        TME_RES_LOW = 0,
827
0
        TME_RES_MID,
828
0
        TME_RES_HIGH,
829
0
        TME_RES_COUNT
830
0
    };
831
832
0
    enum TmeRule
833
0
    {
834
0
        TME_RULE_FAST_MEDIUM_SLOW = 0,
835
0
        TME_RULE_FASTER,
836
0
        TME_RULE_VERYFAST,
837
0
        TME_RULE_SUPERFAST,
838
0
        TME_RULE_ULTRAFAST,
839
0
        TME_RULE_COUNT
840
0
    };
841
842
0
    struct TmeRuleConfig
843
0
    {
844
0
        int taskBlockSize[TME_RES_COUNT];
845
0
        int numBufferRows[TME_RES_COUNT];
846
0
        int threadPercent[TME_RES_COUNT];
847
0
        bool widthBasedTaskBlockSize;
848
0
    };
849
850
0
    static const TmeRuleConfig s_tmeRuleConfig[TME_RULE_COUNT] =
851
0
    {
852
0
        { { 1, 1, 1 }, { 10, 10, 10 }, { 90, 80, 70 }, false }, // fast / medium and slower presets
853
0
        { { 1, 1, 1 }, { 10, 15, 10 }, { 90, 80, 70 }, false }, // faster preset and similar options
854
0
        { { 1, 1, 1 }, { 10, 15, 20 }, { 90, 80, 70 }, false }, // veryfast preset and similar options
855
0
        { { 2, 4, 4 }, { 10, 15, 20 }, { 90, 80, 60 }, false }, // superfast preset and similar options
856
0
        { { 0, 0, 0 }, { 15, 20, 20 }, { 90, 80, 50 }, true  }  // ultrafast preset and similar options
857
0
    };
858
859
0
    const int resClass = (param->sourceHeight >= 1440) ? TME_RES_HIGH :
860
0
                         (param->sourceHeight <= 720) ? TME_RES_LOW : TME_RES_MID;
861
862
0
    const bool ruleMatches[TME_RULE_COUNT] =
863
0
    {
864
0
        param->maxNumReferences >= 3 && param->subpelRefine >= 2,
865
0
        param->maxNumReferences >= 2 && param->subpelRefine >= 2,
866
0
        param->subpelRefine >= 1 && param->bframes > 3,
867
0
        param->subpelRefine && param->maxCUSize < 64,
868
0
        !param->subpelRefine || param->searchMethod == X265_DIA_SEARCH || param->minCUSize >= 16
869
0
    };
870
871
0
    int selectedRule = -1;
872
0
    for (int i = 0; i < TME_RULE_COUNT; i++)
873
0
    {
874
0
        if (ruleMatches[i])
875
0
        {
876
0
            selectedRule = i;
877
0
            break;
878
0
        }
879
0
    }
880
881
0
    if (selectedRule >= 0)
882
0
    {
883
0
        const TmeRuleConfig& cfg = s_tmeRuleConfig[selectedRule];
884
0
        param->tmeTaskBlockSize = cfg.widthBasedTaskBlockSize ? ((param->sourceWidth + 480 - 1) / 480) : cfg.taskBlockSize[resClass];
885
0
        param->tmeNumBufferRows = cfg.numBufferRows[resClass];
886
0
        return (cpuCount * cfg.threadPercent[resClass]) / 100;
887
0
    }
888
889
0
    static const int s_defaultThreadPercent[TME_RES_COUNT] = { 80, 80, 70 };
890
0
    return (cpuCount * s_defaultThreadPercent[resClass]) / 100;
891
0
}
892
893
} // end namespace X265_NS