Coverage Report

Created: 2026-01-26 07:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/mpv/misc/thread_pool.c
Line
Count
Source
1
/* Copyright (C) 2018 the mpv developers
2
 *
3
 * Permission to use, copy, modify, and/or distribute this software for any
4
 * purpose with or without fee is hereby granted, provided that the above
5
 * copyright notice and this permission notice appear in all copies.
6
 *
7
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
8
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
9
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
10
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
11
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
12
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
13
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
14
 */
15
16
#include "common/common.h"
17
#include "osdep/threads.h"
18
#include "osdep/timer.h"
19
20
#include "thread_pool.h"
21
22
// Idle timeout in seconds. A worker that finds no work for this long exits on
// its own, but only while the pool has more threads than its configured
// minimum.
#define DESTROY_TIMEOUT 10
25
26
// A single queued job: a callback together with the opaque argument that is
// handed to it when a worker runs it.
struct work {
    // Function to invoke on a worker thread. A zero-initialized item (fn ==
    // NULL) is what a worker uses to mean "no work was taken".
    void (*fn)(void *ctx);
    // Passed unchanged as the sole argument to fn.
    void *fn_ctx;
};
30
31
struct mp_thread_pool {
32
    int min_threads, max_threads;
33
34
    mp_mutex lock;
35
    mp_cond wakeup;
36
37
    // --- the following fields are protected by lock
38
39
    mp_thread *threads;
40
    int num_threads;
41
42
    // Number of threads which have taken up work and are still processing it.
43
    int busy_threads;
44
45
    bool terminate;
46
47
    struct work *work;
48
    int num_work;
49
};
50
51
static MP_THREAD_VOID worker_thread(void *arg)
52
71.5k
{
53
71.5k
    struct mp_thread_pool *pool = arg;
54
55
71.5k
    mp_thread_set_name("worker");
56
57
71.5k
    mp_mutex_lock(&pool->lock);
58
59
71.5k
    int64_t destroy_deadline = 0;
60
71.5k
    bool got_timeout = false;
61
215k
    while (1) {
62
215k
        struct work work = {0};
63
215k
        if (pool->num_work > 0) {
64
71.7k
            work = pool->work[pool->num_work - 1];
65
71.7k
            pool->num_work -= 1;
66
71.7k
        }
67
68
215k
        if (!work.fn) {
69
143k
            if (got_timeout || pool->terminate)
70
71.5k
                break;
71
72
71.7k
            if (pool->num_threads > pool->min_threads) {
73
0
                if (!destroy_deadline)
74
0
                    destroy_deadline = mp_time_ns() + MP_TIME_S_TO_NS(DESTROY_TIMEOUT);
75
0
                if (mp_cond_timedwait_until(&pool->wakeup, &pool->lock, destroy_deadline))
76
0
                    got_timeout = pool->num_threads > pool->min_threads;
77
71.7k
            } else {
78
71.7k
                mp_cond_wait(&pool->wakeup, &pool->lock);
79
71.7k
            }
80
71.7k
            continue;
81
143k
        }
82
83
71.7k
        pool->busy_threads += 1;
84
71.7k
        mp_mutex_unlock(&pool->lock);
85
86
71.7k
        work.fn(work.fn_ctx);
87
88
71.7k
        mp_mutex_lock(&pool->lock);
89
71.7k
        pool->busy_threads -= 1;
90
91
71.7k
        destroy_deadline = 0;
92
71.7k
        got_timeout = false;
93
71.7k
    }
94
95
    // If no termination signal was given, it must mean we died because of a
96
    // timeout, and nobody is waiting for us. We have to remove ourselves.
97
71.5k
    if (!pool->terminate) {
98
0
        for (int n = 0; n < pool->num_threads; n++) {
99
0
            if (mp_thread_id_equal(mp_thread_get_id(pool->threads[n]),
100
0
                                   mp_thread_current_id()))
101
0
            {
102
0
                mp_thread_detach(pool->threads[n]);
103
0
                MP_TARRAY_REMOVE_AT(pool->threads, pool->num_threads, n);
104
0
                mp_mutex_unlock(&pool->lock);
105
0
                MP_THREAD_RETURN();
106
0
            }
107
0
        }
108
0
        MP_ASSERT_UNREACHABLE();
109
0
    }
110
111
71.5k
    mp_mutex_unlock(&pool->lock);
112
71.5k
    MP_THREAD_RETURN();
113
71.5k
}
114
115
static void thread_pool_dtor(void *ctx)
116
139k
{
117
139k
    struct mp_thread_pool *pool = ctx;
118
119
120
139k
    mp_mutex_lock(&pool->lock);
121
122
139k
    pool->terminate = true;
123
139k
    mp_cond_broadcast(&pool->wakeup);
124
125
139k
    mp_thread *threads = pool->threads;
126
139k
    int num_threads = pool->num_threads;
127
128
139k
    pool->threads = NULL;
129
139k
    pool->num_threads = 0;
130
131
139k
    mp_mutex_unlock(&pool->lock);
132
133
211k
    for (int n = 0; n < num_threads; n++)
134
71.5k
        mp_thread_join(threads[n]);
135
136
139k
    mp_assert(pool->num_work == 0);
137
139k
    mp_assert(pool->num_threads == 0);
138
139k
    mp_cond_destroy(&pool->wakeup);
139
139k
    mp_mutex_destroy(&pool->lock);
140
139k
}
141
142
static bool add_thread(struct mp_thread_pool *pool)
143
71.5k
{
144
71.5k
    mp_thread thread;
145
146
71.5k
    if (mp_thread_create(&thread, worker_thread, pool) != 0)
147
0
        return false;
148
149
71.5k
    MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread);
150
71.5k
    return true;
151
71.5k
}
152
153
struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int init_threads,
154
                                             int min_threads, int max_threads)
155
139k
{
156
139k
    mp_assert(min_threads >= 0);
157
139k
    mp_assert(init_threads <= min_threads);
158
139k
    mp_assert(max_threads > 0 && max_threads >= min_threads);
159
160
139k
    struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool);
161
139k
    talloc_set_destructor(pool, thread_pool_dtor);
162
163
139k
    mp_mutex_init(&pool->lock);
164
139k
    mp_cond_init(&pool->wakeup);
165
166
139k
    pool->min_threads = min_threads;
167
139k
    pool->max_threads = max_threads;
168
169
139k
    mp_mutex_lock(&pool->lock);
170
139k
    for (int n = 0; n < init_threads; n++)
171
0
        add_thread(pool);
172
139k
    bool ok = pool->num_threads >= init_threads;
173
139k
    mp_mutex_unlock(&pool->lock);
174
175
139k
    if (!ok)
176
0
        TA_FREEP(&pool);
177
178
139k
    return pool;
179
139k
}
180
181
static bool thread_pool_add(struct mp_thread_pool *pool, void (*fn)(void *ctx),
182
                            void *fn_ctx, bool allow_queue)
183
71.7k
{
184
71.7k
    bool ok = true;
185
186
71.7k
    mp_assert(fn);
187
188
71.7k
    mp_mutex_lock(&pool->lock);
189
71.7k
    struct work work = {fn, fn_ctx};
190
191
    // If there are not enough threads to process all at once, but we can
192
    // create a new thread, then do so. If work is queued quickly, it can
193
    // happen that not all available threads have picked up work yet (up to
194
    // num_threads - busy_threads threads), which has to be accounted for.
195
71.7k
    if (pool->busy_threads + pool->num_work + 1 > pool->num_threads &&
196
71.5k
        pool->num_threads < pool->max_threads)
197
71.5k
    {
198
71.5k
        if (!add_thread(pool)) {
199
            // If we can queue it, it'll get done as long as there is 1 thread.
200
0
            ok = allow_queue && pool->num_threads > 0;
201
0
        }
202
71.5k
    }
203
204
71.7k
    if (ok) {
205
71.7k
        MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work);
206
71.7k
        mp_cond_signal(&pool->wakeup);
207
71.7k
    }
208
209
71.7k
    mp_mutex_unlock(&pool->lock);
210
71.7k
    return ok;
211
71.7k
}
212
213
// Submit fn(fn_ctx) to the pool, allowing the item to wait in the queue if a
// new worker thread cannot be started for it. Returns false only if the item
// was not accepted, which requires thread creation to fail while the pool has
// no threads at all.
bool mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
                          void *fn_ctx)
{
    const bool allow_queue = true;
    return thread_pool_add(pool, fn, fn_ctx, allow_queue);
}
218
219
// Submit fn(fn_ctx) to the pool without the queueing fallback: if a new worker
// thread is needed for the item and starting it fails, the item is rejected
// and false is returned. Returns true if the item was accepted.
bool mp_thread_pool_run(struct mp_thread_pool *pool, void (*fn)(void *ctx),
                        void *fn_ctx)
{
    const bool allow_queue = false;
    return thread_pool_add(pool, fn, fn_ctx, allow_queue);
}