Coverage Report

Created: 2025-12-31 07:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ffmpeg/libavutil/slicethread.c
Line
Count
Source
1
/*
2
 * This file is part of FFmpeg.
3
 *
4
 * FFmpeg is free software; you can redistribute it and/or
5
 * modify it under the terms of the GNU Lesser General Public
6
 * License as published by the Free Software Foundation; either
7
 * version 2.1 of the License, or (at your option) any later version.
8
 *
9
 * FFmpeg is distributed in the hope that it will be useful,
10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12
 * Lesser General Public License for more details.
13
 *
14
 * You should have received a copy of the GNU Lesser General Public
15
 * License along with FFmpeg; if not, write to the Free Software
16
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
 */
18
19
#include <stdatomic.h>
20
#include "cpu.h"
21
#include "internal.h"
22
#include "slicethread.h"
23
#include "mem.h"
24
#include "thread.h"
25
#include "avassert.h"
26
27
/* Upper bound on the thread count picked automatically when the caller of
 * avpriv_slicethread_create() passes nb_threads == 0. */
#define MAX_AUTO_THREADS 16
28
29
#if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
30
31
typedef struct WorkerContext {
32
    AVSliceThread   *ctx;
33
    pthread_mutex_t mutex;
34
    pthread_cond_t  cond;
35
    pthread_t       thread;
36
    int             done;
37
} WorkerContext;
38
39
struct AVSliceThread {
40
    WorkerContext   *workers;
41
    int             nb_threads;
42
    int             nb_active_threads;
43
    int             nb_jobs;
44
45
    atomic_uint     first_job;
46
    atomic_uint     current_job;
47
    pthread_mutex_t done_mutex;
48
    pthread_cond_t  done_cond;
49
    int             done;
50
    int             finished;
51
52
    void            *priv;
53
    void            (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
54
    void            (*main_func)(void *priv);
55
};
56
57
/**
 * Run jobs of the current batch on the calling thread until none are left.
 *
 * The first job index is taken from ctx->first_job and is also passed to
 * worker_func as the thread number; every further index comes from
 * ctx->current_job.
 *
 * @return nonzero if the last index fetched equals
 *         nb_jobs + nb_active_threads - 1. Since execute seeds current_job
 *         with nb_active_threads, that identifies the thread that ran out
 *         of jobs last.
 */
static int run_jobs(AVSliceThread *ctx)
{
    const unsigned total_jobs = ctx->nb_jobs;
    const unsigned active     = ctx->nb_active_threads;
    const unsigned slot       = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
    unsigned job              = slot;

    for (;;) {
        ctx->worker_func(ctx->priv, job, slot, total_jobs, active);

        job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel);
        if (job >= total_jobs)
            break;
    }

    return job == total_jobs + active - 1;
}
70
71
/**
 * Entry point of every worker thread.
 *
 * The worker keeps its own mutex locked except while blocked in
 * pthread_cond_wait(). Each iteration marks the worker idle, sleeps until
 * done is cleared, then either exits (ctx->finished set) or runs jobs. The
 * worker whose run_jobs() call returns nonzero raises ctx->done and signals
 * ctx->done_cond.
 *
 * @param v pointer to this worker's WorkerContext
 * @return always NULL
 */
static void *attribute_align_arg thread_worker(void *v)
{
    WorkerContext *const self  = v;
    AVSliceThread *const owner = self->ctx;

    pthread_mutex_lock(&self->mutex);
    /* Wake the creating thread, which is waiting for done to become 1. */
    pthread_cond_signal(&self->cond);

    for (;;) {
        self->done = 1;
        while (self->done)
            pthread_cond_wait(&self->cond, &self->mutex);

        if (owner->finished)
            break;

        if (!run_jobs(owner))
            continue;

        /* This thread finished the batch: report completion. */
        pthread_mutex_lock(&owner->done_mutex);
        owner->done = 1;
        pthread_cond_signal(&owner->done_cond);
        pthread_mutex_unlock(&owner->done_mutex);
    }

    pthread_mutex_unlock(&self->mutex);
    return NULL;
}
97
98
av_cold
99
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
100
                              void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
101
                              void (*main_func)(void *priv),
102
                              int nb_threads)
103
0
{
104
0
    AVSliceThread *ctx;
105
0
    int nb_workers, i;
106
0
    int ret;
107
108
0
    av_assert0(nb_threads >= 0);
109
0
    if (!nb_threads) {
110
0
        int nb_cpus = av_cpu_count();
111
0
        if (nb_cpus > 1)
112
0
            nb_threads = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS);
113
0
        else
114
0
            nb_threads = 1;
115
0
    }
116
117
0
    nb_workers = nb_threads;
118
0
    if (!main_func)
119
0
        nb_workers--;
120
121
0
    *pctx = ctx = av_mallocz(sizeof(*ctx));
122
0
    if (!ctx)
123
0
        return AVERROR(ENOMEM);
124
125
0
    if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
126
0
        av_freep(pctx);
127
0
        return AVERROR(ENOMEM);
128
0
    }
129
130
0
    ctx->priv        = priv;
131
0
    ctx->worker_func = worker_func;
132
0
    ctx->main_func   = main_func;
133
0
    ctx->nb_threads  = nb_threads;
134
0
    ctx->nb_active_threads = 0;
135
0
    ctx->nb_jobs     = 0;
136
0
    ctx->finished    = 0;
137
138
0
    atomic_init(&ctx->first_job, 0);
139
0
    atomic_init(&ctx->current_job, 0);
140
0
    ret = pthread_mutex_init(&ctx->done_mutex, NULL);
141
0
    if (ret) {
142
0
        av_freep(&ctx->workers);
143
0
        av_freep(pctx);
144
0
        return AVERROR(ret);
145
0
    }
146
0
    ret = pthread_cond_init(&ctx->done_cond, NULL);
147
0
    if (ret) {
148
0
        ctx->nb_threads = main_func ? 0 : 1;
149
0
        avpriv_slicethread_free(pctx);
150
0
        return AVERROR(ret);
151
0
    }
152
0
    ctx->done        = 0;
153
154
0
    for (i = 0; i < nb_workers; i++) {
155
0
        WorkerContext *w = &ctx->workers[i];
156
0
        int ret;
157
0
        w->ctx = ctx;
158
0
        ret = pthread_mutex_init(&w->mutex, NULL);
159
0
        if (ret) {
160
0
            ctx->nb_threads = main_func ? i : i + 1;
161
0
            avpriv_slicethread_free(pctx);
162
0
            return AVERROR(ret);
163
0
        }
164
0
        ret = pthread_cond_init(&w->cond, NULL);
165
0
        if (ret) {
166
0
            pthread_mutex_destroy(&w->mutex);
167
0
            ctx->nb_threads = main_func ? i : i + 1;
168
0
            avpriv_slicethread_free(pctx);
169
0
            return AVERROR(ret);
170
0
        }
171
0
        pthread_mutex_lock(&w->mutex);
172
0
        w->done = 0;
173
174
0
        if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
175
0
            ctx->nb_threads = main_func ? i : i + 1;
176
0
            pthread_mutex_unlock(&w->mutex);
177
0
            pthread_cond_destroy(&w->cond);
178
0
            pthread_mutex_destroy(&w->mutex);
179
0
            avpriv_slicethread_free(pctx);
180
0
            return AVERROR(ret);
181
0
        }
182
183
0
        while (!w->done)
184
0
            pthread_cond_wait(&w->cond, &w->mutex);
185
0
        pthread_mutex_unlock(&w->mutex);
186
0
    }
187
188
0
    return nb_threads;
189
0
}
190
191
/**
 * Run a batch of nb_jobs jobs and return once the batch has completed.
 *
 * @param ctx          slice-threading context
 * @param nb_jobs      number of jobs to run (must be > 0)
 * @param execute_main nonzero to call main_func, if one was registered, on
 *                     the calling thread; otherwise the calling thread runs
 *                     jobs itself
 */
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
{
    int use_main, wake_count, idx;

    av_assert0(nb_jobs > 0);

    ctx->nb_jobs           = nb_jobs;
    ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
    atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
    /* Indices below nb_active_threads are handed out through first_job. */
    atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);

    /* When the caller runs jobs itself it takes one of the active slots. */
    use_main   = ctx->main_func && execute_main;
    wake_count = use_main ? ctx->nb_active_threads : ctx->nb_active_threads - 1;

    for (idx = 0; idx < wake_count; idx++) {
        WorkerContext *worker = ctx->workers + idx;

        pthread_mutex_lock(&worker->mutex);
        worker->done = 0;
        pthread_cond_signal(&worker->cond);
        pthread_mutex_unlock(&worker->mutex);
    }

    if (use_main) {
        ctx->main_func(ctx->priv);
    } else if (run_jobs(ctx)) {
        /* The caller finished the batch itself; nothing to wait for. */
        return;
    }

    pthread_mutex_lock(&ctx->done_mutex);
    while (!ctx->done)
        pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
    ctx->done = 0;
    pthread_mutex_unlock(&ctx->done_mutex);
}
225
226
/**
 * Stop and join all worker threads, destroy the synchronization objects and
 * release the context.
 *
 * @param pctx pointer to the context pointer; nothing happens when *pctx is
 *             NULL
 */
av_cold void avpriv_slicethread_free(AVSliceThread **pctx)
{
    AVSliceThread *ctx = *pctx;
    int worker_count, k;

    if (!ctx)
        return;

    /* Without a main_func the caller counts as one of the nb_threads. */
    worker_count = ctx->main_func ? ctx->nb_threads : ctx->nb_threads - 1;

    /* Wake every worker with the finished flag raised so that it exits. */
    ctx->finished = 1;
    for (k = 0; k < worker_count; k++) {
        WorkerContext *worker = ctx->workers + k;

        pthread_mutex_lock(&worker->mutex);
        worker->done = 0;
        pthread_cond_signal(&worker->cond);
        pthread_mutex_unlock(&worker->mutex);
    }

    for (k = 0; k < worker_count; k++) {
        WorkerContext *worker = ctx->workers + k;

        pthread_join(worker->thread, NULL);
        pthread_cond_destroy(&worker->cond);
        pthread_mutex_destroy(&worker->mutex);
    }

    pthread_cond_destroy(&ctx->done_cond);
    pthread_mutex_destroy(&ctx->done_mutex);
    av_freep(&ctx->workers);
    av_freep(pctx);
}
259
260
#else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */
261
262
/**
 * Fallback used when no threading backend is available: creates nothing.
 *
 * @param pctx set to NULL
 * @return always AVERROR(ENOSYS)
 */
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
                              void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
                              void (*main_func)(void *priv),
                              int nb_threads)
{
    /* The remaining parameters are deliberately unused in this build. */
    *pctx = NULL;

    return AVERROR(ENOSYS);
}
270
271
/**
 * Fallback used when no threading backend is available. The matching create
 * function never yields a context, so any call trips the assertion.
 */
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
{
    av_assert0(0);
}
275
276
/**
 * Fallback used when no threading backend is available. There is never
 * anything to release, so this only asserts that no context was passed in.
 *
 * @param pctx NULL, or a pointer to a NULL context pointer
 */
void avpriv_slicethread_free(AVSliceThread **pctx)
{
    av_assert0(!pctx || !*pctx);
}
280
281
#endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */