Coverage Report

Created: 2026-03-12 07:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ffmpeg/libavutil/slicethread.c
Line
Count
Source
1
/*
2
 * This file is part of FFmpeg.
3
 *
4
 * FFmpeg is free software; you can redistribute it and/or
5
 * modify it under the terms of the GNU Lesser General Public
6
 * License as published by the Free Software Foundation; either
7
 * version 2.1 of the License, or (at your option) any later version.
8
 *
9
 * FFmpeg is distributed in the hope that it will be useful,
10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12
 * Lesser General Public License for more details.
13
 *
14
 * You should have received a copy of the GNU Lesser General Public
15
 * License along with FFmpeg; if not, write to the Free Software
16
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
 */
18
19
#include <stdatomic.h>
20
#include "cpu.h"
21
#include "internal.h"
22
#include "slicethread.h"
23
#include "mem.h"
24
#include "thread.h"
25
#include "avassert.h"
26
27
#define MAX_AUTO_THREADS 16
28
29
#if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
30
31
typedef struct WorkerContext {
32
    AVSliceThread   *ctx;
33
    pthread_mutex_t mutex;
34
    pthread_cond_t  cond;
35
    pthread_t       thread;
36
    int             done;
37
} WorkerContext;
38
39
struct AVSliceThread {
40
    WorkerContext   *workers;
41
    int             nb_threads;
42
    int             nb_active_threads;
43
    int             nb_jobs;
44
45
    atomic_uint     first_job;
46
    atomic_uint     current_job;
47
    pthread_mutex_t done_mutex;
48
    pthread_cond_t  done_cond;
49
    int             done;
50
    int             finished;
51
52
    void            *priv;
53
    void            (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
54
    void            (*main_func)(void *priv);
55
};
56
57
static int run_jobs(AVSliceThread *ctx)
58
0
{
59
0
    unsigned nb_jobs    = ctx->nb_jobs;
60
0
    unsigned nb_active_threads = ctx->nb_active_threads;
61
0
    unsigned first_job    = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
62
0
    unsigned current_job  = first_job;
63
64
0
    do {
65
0
        ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
66
0
    } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
67
68
0
    return current_job == nb_jobs + nb_active_threads - 1;
69
0
}
70
71
static void *attribute_align_arg thread_worker(void *v)
72
0
{
73
0
    WorkerContext *w = v;
74
0
    AVSliceThread *ctx = w->ctx;
75
76
0
    pthread_mutex_lock(&w->mutex);
77
0
    pthread_cond_signal(&w->cond);
78
79
0
    while (1) {
80
0
        w->done = 1;
81
0
        while (w->done)
82
0
            pthread_cond_wait(&w->cond, &w->mutex);
83
84
0
        if (ctx->finished) {
85
0
            pthread_mutex_unlock(&w->mutex);
86
0
            return NULL;
87
0
        }
88
89
0
        if (run_jobs(ctx)) {
90
0
            pthread_mutex_lock(&ctx->done_mutex);
91
0
            ctx->done = 1;
92
0
            pthread_cond_signal(&ctx->done_cond);
93
0
            pthread_mutex_unlock(&ctx->done_mutex);
94
0
        }
95
0
    }
96
0
}
97
98
av_cold
99
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
100
                              void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
101
                              void (*main_func)(void *priv),
102
                              int nb_threads)
103
0
{
104
0
    AVSliceThread *ctx;
105
0
    int nb_workers, i;
106
0
    int ret;
107
108
0
    av_assert0(nb_threads >= 0);
109
0
    if (!nb_threads) {
110
0
        int nb_cpus = av_cpu_count();
111
0
        if (nb_cpus > 1)
112
0
            nb_threads = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS);
113
0
        else
114
0
            nb_threads = 1;
115
0
    }
116
117
0
    nb_workers = nb_threads;
118
0
    if (!main_func)
119
0
        nb_workers--;
120
121
0
    *pctx = ctx = av_mallocz(sizeof(*ctx));
122
0
    if (!ctx)
123
0
        return AVERROR(ENOMEM);
124
125
0
    if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
126
0
        av_freep(pctx);
127
0
        return AVERROR(ENOMEM);
128
0
    }
129
130
0
    ctx->priv        = priv;
131
0
    ctx->worker_func = worker_func;
132
0
    ctx->main_func   = main_func;
133
0
    ctx->nb_threads  = nb_threads;
134
0
    ctx->nb_active_threads = 0;
135
0
    ctx->nb_jobs     = 0;
136
0
    ctx->finished    = 0;
137
138
0
    atomic_init(&ctx->first_job, 0);
139
0
    atomic_init(&ctx->current_job, 0);
140
0
    ret = pthread_mutex_init(&ctx->done_mutex, NULL);
141
0
    if (ret) {
142
0
        av_freep(&ctx->workers);
143
0
        av_freep(pctx);
144
0
        return AVERROR(ret);
145
0
    }
146
0
    ret = pthread_cond_init(&ctx->done_cond, NULL);
147
0
    if (ret) {
148
0
        ctx->nb_threads = main_func ? 0 : 1;
149
0
        avpriv_slicethread_free(pctx);
150
0
        return AVERROR(ret);
151
0
    }
152
0
    ctx->done        = 0;
153
154
0
    for (i = 0; i < nb_workers; i++) {
155
0
        WorkerContext *w = &ctx->workers[i];
156
0
        w->ctx = ctx;
157
0
        ret = pthread_mutex_init(&w->mutex, NULL);
158
0
        if (ret) {
159
0
            ctx->nb_threads = main_func ? i : i + 1;
160
0
            avpriv_slicethread_free(pctx);
161
0
            return AVERROR(ret);
162
0
        }
163
0
        ret = pthread_cond_init(&w->cond, NULL);
164
0
        if (ret) {
165
0
            pthread_mutex_destroy(&w->mutex);
166
0
            ctx->nb_threads = main_func ? i : i + 1;
167
0
            avpriv_slicethread_free(pctx);
168
0
            return AVERROR(ret);
169
0
        }
170
0
        pthread_mutex_lock(&w->mutex);
171
0
        w->done = 0;
172
173
0
        if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
174
0
            ctx->nb_threads = main_func ? i : i + 1;
175
0
            pthread_mutex_unlock(&w->mutex);
176
0
            pthread_cond_destroy(&w->cond);
177
0
            pthread_mutex_destroy(&w->mutex);
178
0
            avpriv_slicethread_free(pctx);
179
0
            return AVERROR(ret);
180
0
        }
181
182
0
        while (!w->done)
183
0
            pthread_cond_wait(&w->cond, &w->mutex);
184
0
        pthread_mutex_unlock(&w->mutex);
185
0
    }
186
187
0
    return nb_threads;
188
0
}
189
190
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
191
0
{
192
0
    int nb_workers, i, is_last = 0;
193
194
0
    av_assert0(nb_jobs > 0);
195
0
    ctx->nb_jobs           = nb_jobs;
196
0
    ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
197
0
    atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
198
0
    atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
199
0
    nb_workers             = ctx->nb_active_threads;
200
0
    if (!ctx->main_func || !execute_main)
201
0
        nb_workers--;
202
203
0
    for (i = 0; i < nb_workers; i++) {
204
0
        WorkerContext *w = &ctx->workers[i];
205
0
        pthread_mutex_lock(&w->mutex);
206
0
        w->done = 0;
207
0
        pthread_cond_signal(&w->cond);
208
0
        pthread_mutex_unlock(&w->mutex);
209
0
    }
210
211
0
    if (ctx->main_func && execute_main)
212
0
        ctx->main_func(ctx->priv);
213
0
    else
214
0
        is_last = run_jobs(ctx);
215
216
0
    if (!is_last) {
217
0
        pthread_mutex_lock(&ctx->done_mutex);
218
0
        while (!ctx->done)
219
0
            pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
220
0
        ctx->done = 0;
221
0
        pthread_mutex_unlock(&ctx->done_mutex);
222
0
    }
223
0
}
224
225
av_cold void avpriv_slicethread_free(AVSliceThread **pctx)
226
0
{
227
0
    AVSliceThread *ctx = *pctx;
228
0
    int nb_workers, i;
229
230
0
    if (!ctx)
231
0
        return;
232
233
0
    nb_workers = ctx->nb_threads;
234
0
    if (!ctx->main_func)
235
0
        nb_workers--;
236
237
0
    ctx->finished = 1;
238
0
    for (i = 0; i < nb_workers; i++) {
239
0
        WorkerContext *w = &ctx->workers[i];
240
0
        pthread_mutex_lock(&w->mutex);
241
0
        w->done = 0;
242
0
        pthread_cond_signal(&w->cond);
243
0
        pthread_mutex_unlock(&w->mutex);
244
0
    }
245
246
0
    for (i = 0; i < nb_workers; i++) {
247
0
        WorkerContext *w = &ctx->workers[i];
248
0
        pthread_join(w->thread, NULL);
249
0
        pthread_cond_destroy(&w->cond);
250
0
        pthread_mutex_destroy(&w->mutex);
251
0
    }
252
253
0
    pthread_cond_destroy(&ctx->done_cond);
254
0
    pthread_mutex_destroy(&ctx->done_mutex);
255
0
    av_freep(&ctx->workers);
256
0
    av_freep(pctx);
257
0
}
258
259
#else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
260
261
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
262
                              void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
263
                              void (*main_func)(void *priv),
264
                              int nb_threads)
265
{
266
    *pctx = NULL;
267
    return AVERROR(ENOSYS);
268
}
269
270
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
271
{
272
    av_assert0(0);
273
}
274
275
void avpriv_slicethread_free(AVSliceThread **pctx)
276
{
277
    av_assert0(!pctx || !*pctx);
278
}
279
280
#endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */