Coverage Report

Created: 2022-08-24 06:15

/src/x265/source/common/threadpool.h
Line
Count
Source (jump to first uncovered line)
1
/*****************************************************************************
2
 * Copyright (C) 2013-2020 MulticoreWare, Inc
3
 *
4
 * Authors: Steve Borho <steve@borho.org>
5
 *
6
 * This program is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 2 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software
18
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02111, USA.
19
 *
20
 * This program is also available under a commercial proprietary license.
21
 * For more information, contact us at license @ x265.com
22
 *****************************************************************************/
23
24
#ifndef X265_THREADPOOL_H
25
#define X265_THREADPOOL_H
26
27
#include "common.h"
28
#include "threading.h"
29
30
namespace X265_NS {
31
// x265 private namespace
32
33
// Forward declarations for the thread pool types used throughout this header.
class BondedTaskGroup;
class WorkerThread;
class ThreadPool;

// Unsigned bitmap type used for the pool's thread masks (m_sleepBitmap,
// m_ownerBitmap). It is 64 bits wide on X86_64 builds and 32 bits otherwise.
#if !X86_64
typedef uint32_t sleepbitmap_t;
#else
typedef uint64_t sleepbitmap_t;
#endif

// A value larger than any X265_TYPE_* macro
enum { INVALID_SLICE_PRIORITY = 10 };

// Number of bits in sleepbitmap_t
enum { MAX_POOL_THREADS = 8 * sizeof(sleepbitmap_t) };

// Bitmap with every bit set
static const sleepbitmap_t ALL_POOL_THREADS = ~(sleepbitmap_t)0;
46
47
// Frame level job providers. FrameEncoder and Lookahead derive from
48
// this class and implement findJob()
49
class JobProvider
50
{
51
public:
52
53
    ThreadPool*   m_pool;
54
    sleepbitmap_t m_ownerBitmap;
55
    int           m_jpId;
56
    int           m_sliceType;
57
    bool          m_helpWanted;
58
    bool          m_isFrameEncoder; /* rather ugly hack, but nothing better presents itself */
59
60
    JobProvider()
61
        : m_pool(NULL)
62
        , m_ownerBitmap(0)
63
        , m_jpId(-1)
64
        , m_sliceType(INVALID_SLICE_PRIORITY)
65
        , m_helpWanted(false)
66
        , m_isFrameEncoder(false)
67
3.81k
    {}
68
69
3.81k
    virtual ~JobProvider() {}
70
71
    // Worker threads will call this method to perform work
72
    virtual void findJob(int workerThreadId) = 0;
73
74
    // Will awaken one idle thread, preferring a thread which most recently
75
    // performed work for this provider.
76
    void tryWakeOne();
77
};
78
79
class ThreadPool
80
{
81
public:
82
83
    sleepbitmap_t m_sleepBitmap;
84
    int           m_numProviders;
85
    int           m_numWorkers;
86
    void*         m_numaMask; // node mask in linux, cpu mask in windows
87
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7 
88
    GROUP_AFFINITY m_groupAffinity;
89
#endif
90
    bool          m_isActive;
91
92
    JobProvider** m_jpTable;
93
    WorkerThread* m_workers;
94
95
    ThreadPool();
96
    ~ThreadPool();
97
98
    bool create(int numThreads, int maxProviders, uint64_t nodeMask);
99
    bool start();
100
    void stopWorkers();
101
    void setCurrentThreadAffinity();
102
    void setThreadNodeAffinity(void *numaMask);
103
    int  tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap);
104
    int  tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master);
105
    static ThreadPool* allocThreadPools(x265_param* p, int& numPools, bool isThreadsReserved);
106
    static int  getCpuCount();
107
    static int  getNumaNodeCount();
108
    static void getFrameThreadsCount(x265_param* p,int cpuCount);
109
};
110
111
/* Any worker thread may enlist the help of idle worker threads from the same
112
 * job provider. Task classes must derive from this class and implement the
113
 * processTasks() method.  To use, an instance must be instantiated by a worker
114
 * thread (referred to as the master thread) and then tryBondPeers() must be
115
 * called. If it returns non-zero then some number of slave worker threads are
116
 * already in the process of calling your processTasks() function. The master
117
 * thread should participate and call processTasks() itself. When
118
 * waitForExit() returns, all bonded peer threads are guaranteed to have
119
 * exited processTasks(). Since the thread count is small, it uses explicit
120
 * locking instead of atomic counters and bitmasks */
121
class BondedTaskGroup
122
{
123
public:
124
125
    Lock              m_lock;
126
    ThreadSafeInteger m_exitedPeerCount;
127
    int               m_bondedPeerCount;
128
    int               m_jobTotal;
129
    int               m_jobAcquired;
130
131
1.20k
    BondedTaskGroup()  { m_bondedPeerCount = m_jobTotal = m_jobAcquired = 0; }
132
133
    /* Do not allow the instance to be destroyed before all bonded peers have
134
     * exited processTasks() */
135
1.20k
    ~BondedTaskGroup() { waitForExit(); }
136
137
    /* Try to enlist the help of idle worker threads on most recently associated
138
     * with the given job provider and "bond" them to work on your tasks. Up to
139
     * maxPeers worker threads will call your processTasks() method. */
140
    int tryBondPeers(JobProvider& jp, int maxPeers)
141
0
    {
142
0
        int count = jp.m_pool->tryBondPeers(maxPeers, jp.m_ownerBitmap, *this);
143
0
        m_bondedPeerCount += count;
144
0
        return count;
145
0
    }
146
147
    /* Try to enlist the help of any idle worker threads and "bond" them to work
148
     * on your tasks. Up to maxPeers worker threads will call your
149
     * processTasks() method. */
150
    int tryBondPeers(ThreadPool& pool, int maxPeers)
151
698
    {
152
698
        int count = pool.tryBondPeers(maxPeers, ALL_POOL_THREADS, *this);
153
698
        m_bondedPeerCount += count;
154
698
        return count;
155
698
    }
156
157
    /* Returns when all bonded peers have exited processTasks(). It does *NOT*
158
     * ensure all tasks are completed (but this is generally implied). */
159
    void waitForExit()
160
1.89k
    {
161
1.89k
        int exited = m_exitedPeerCount.get();
162
1.91k
        while (m_bondedPeerCount != exited)
163
12
            exited = m_exitedPeerCount.waitForChange(exited);
164
1.89k
    }
165
166
    /* Derived classes must define this method. The worker thread ID may be
167
     * used to index into thread local data, or ignored.  The ID will be between
168
     * 0 and jp.m_numWorkers - 1 */
169
    virtual void processTasks(int workerThreadId) = 0;
170
};
171
172
} // end namespace X265_NS
173
174
#endif // ifndef X265_THREADPOOL_H