Coverage Report

Created: 2025-09-04 07:15

/src/mpv/misc/dispatch.c
/*
 * This file is part of mpv.
 *
 * mpv is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * mpv is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with mpv.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdbool.h>
#include <assert.h>

#include "common/common.h"
#include "osdep/threads.h"
#include "osdep/timer.h"

#include "dispatch.h"

struct mp_dispatch_queue {
    struct mp_dispatch_item *head, *tail;
    mp_mutex lock;
    mp_cond cond;
    void (*wakeup_fn)(void *wakeup_ctx);
    void *wakeup_ctx;
    void (*onlock_fn)(void *onlock_ctx);
    void *onlock_ctx;
    // Time at which mp_dispatch_queue_process() should return.
    int64_t wait;
    // Make mp_dispatch_queue_process() exit if it's idle.
    bool interrupted;
    // The target thread is in mp_dispatch_queue_process() (and either idling,
    // locked, or running a dispatch callback).
    bool in_process;
    mp_thread_id in_process_thread_id;
    // The target thread is in mp_dispatch_queue_process(), and currently
    // something has exclusive access to it (e.g. running a dispatch callback,
    // or a different thread got it with mp_dispatch_lock()).
    bool locked;
    // A mp_dispatch_lock() call is requesting an exclusive lock.
    size_t lock_requests;
    // locked==true is due to a mp_dispatch_lock() call (for debugging).
    bool locked_explicit;
    mp_thread_id locked_explicit_thread_id;
};

struct mp_dispatch_item {
    mp_dispatch_fn fn;
    void *fn_data;
    bool asynchronous;
    bool mergeable;
    bool completed;
    struct mp_dispatch_item *next;
};

static void queue_dtor(void *p)
{
    struct mp_dispatch_queue *queue = p;
    mp_assert(!queue->head);
    mp_assert(!queue->in_process);
    mp_assert(!queue->lock_requests);
    mp_assert(!queue->locked);
    mp_cond_destroy(&queue->cond);
    mp_mutex_destroy(&queue->lock);
}

// A dispatch queue lets other threads run callbacks in a target thread.
// The target thread is the thread which calls mp_dispatch_queue_process().
// Free the dispatch queue with talloc_free(). At the time of destruction,
// the queue must be empty. The easiest way to guarantee this is to
// terminate all potential senders, then call mp_dispatch_run() with a
// function that e.g. makes the target thread exit, then mp_thread_join() the
// target thread, and finally destroy the queue. Another way is calling
// mp_dispatch_queue_process() after terminating all potential senders, and
// then destroying the queue.
struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent)
{
    struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue);
    *queue = (struct mp_dispatch_queue){0};
    talloc_set_destructor(queue, queue_dtor);
    mp_mutex_init(&queue->lock);
    mp_cond_init(&queue->cond);
    return queue;
}
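
// Illustrative sketch (not part of dispatch.c): the shutdown sequence the
// comment above describes. `worker` and `exit_flag` are hypothetical; the
// worker's main loop calls mp_dispatch_queue_process() until *exit_flag is
// set, and mp_thread_join() is the osdep/threads.h wrapper mentioned above.
static void example_request_exit(void *ctx)
{
    *(bool *)ctx = true; // runs on the target thread
}

// Shutdown order: stop all other senders first, then:
//     mp_dispatch_run(queue, example_request_exit, &exit_flag);
//     mp_thread_join(worker);   // target thread has left its loop
//     talloc_free(queue);       // queue is empty; queue_dtor asserts hold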

// Set a custom function that should be called to guarantee that the target
// thread wakes up. This is intended for use with code that needs to block
// on non-pthread primitives, such as poll(). In the case of poll(), the
// wakeup_fn could for example write a byte into a "wakeup" pipe in order
// to unblock the poll(). The wakeup_fn is called from the dispatch queue
// when there are new dispatch items, and the target thread should then enter
// mp_dispatch_queue_process() as soon as possible.
// Note that this setter does not do internal synchronization, so you must set
// it before other threads see it.
void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue,
                               void (*wakeup_fn)(void *wakeup_ctx),
                               void *wakeup_ctx)
{
    queue->wakeup_fn = wakeup_fn;
    queue->wakeup_ctx = wakeup_ctx;
}
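
// Illustrative sketch (not part of dispatch.c): the self-pipe technique the
// comment above suggests. `wakeup_pipe` is a hypothetical int[2] created with
// pipe(); the target thread poll()s wakeup_pipe[0], drains it, and then calls
// mp_dispatch_queue_process(queue, 0). Requires <unistd.h> for write().
static void example_pipe_wakeup(void *ctx)
{
    int *wakeup_pipe = ctx;
    (void)write(wakeup_pipe[1], &(char){0}, 1); // unblocks the poll()
}

// Setup (before other threads can see the queue):
//     mp_dispatch_set_wakeup_fn(queue, example_pipe_wakeup, wakeup_pipe);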

// Set a function that will be called by mp_dispatch_lock() if the target thread
// is not calling mp_dispatch_queue_process() right now. This is an obscure,
// optional mechanism to make a worker thread react to external events more
// quickly. The idea is that the callback makes the worker thread stop whatever
// it is doing (e.g. by setting a flag), and call mp_dispatch_queue_process()
// in order to let mp_dispatch_lock() calls continue sooner.
// Like wakeup_fn, this setter does no internal synchronization, and you must
// not access the dispatch queue itself from the callback.
void mp_dispatch_set_onlock_fn(struct mp_dispatch_queue *queue,
                               void (*onlock_fn)(void *onlock_ctx),
                               void *onlock_ctx)
{
    queue->onlock_fn = onlock_fn;
    queue->onlock_ctx = onlock_ctx;
}
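
// Illustrative sketch (not part of dispatch.c): an onlock_fn that asks a busy
// worker to yield. `please_yield` is a hypothetical atomic_bool the worker
// polls in its inner loop; when set, the worker calls
// mp_dispatch_queue_process(), letting pending mp_dispatch_lock() calls
// proceed sooner. Requires <stdatomic.h>.
static void example_onlock(void *ctx)
{
    atomic_store((atomic_bool *)ctx, true);
}

// Setup: mp_dispatch_set_onlock_fn(queue, example_onlock, &please_yield);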

static void mp_dispatch_append(struct mp_dispatch_queue *queue,
                               struct mp_dispatch_item *item)
{
    mp_mutex_lock(&queue->lock);
    if (item->mergeable) {
        for (struct mp_dispatch_item *cur = queue->head; cur; cur = cur->next) {
            if (cur->mergeable && cur->fn == item->fn &&
                cur->fn_data == item->fn_data)
            {
                talloc_free(item);
                mp_mutex_unlock(&queue->lock);
                return;
            }
        }
    }

    if (queue->tail) {
        queue->tail->next = item;
    } else {
        queue->head = item;
    }
    queue->tail = item;

    // Wake up the main thread; other threads may also be waiting on this
    // condition (e.g. mp_dispatch_run()), so broadcast it.
    mp_cond_broadcast(&queue->cond);
    // No wakeup callback -> assume mp_dispatch_queue_process() needs to be
    // interrupted instead.
    if (!queue->wakeup_fn)
        queue->interrupted = true;
    mp_mutex_unlock(&queue->lock);

    if (queue->wakeup_fn)
        queue->wakeup_fn(queue->wakeup_ctx);
}

// Enqueue a callback to run it on the target thread asynchronously. The target
// thread will run fn(fn_data) as soon as it enters mp_dispatch_queue_process().
// Note that mp_dispatch_enqueue() will usually return long before that happens.
// It's up to the user to signal completion of the callback. It's also up to
// the user to guarantee that the context fn_data has correct lifetime, i.e.
// lives until the callback is run, and is freed after that.
void mp_dispatch_enqueue(struct mp_dispatch_queue *queue,
                         mp_dispatch_fn fn, void *fn_data)
{
    struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
    *item = (struct mp_dispatch_item){
        .fn = fn,
        .fn_data = fn_data,
        .asynchronous = true,
    };
    mp_dispatch_append(queue, item);
}
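
// Illustrative sketch (not part of dispatch.c): asynchronous use with
// caller-managed lifetime. Since mp_dispatch_enqueue() does not free fn_data,
// the callback owns it and must free it itself. `example_req` is a
// hypothetical request type.
struct example_req {
    int value;
};

static void example_handle_req(void *p)
{
    struct example_req *req = p;
    // ... act on req->value on the target thread ...
    talloc_free(req); // caller-managed lifetime ends here
}

// Sender side:
//     struct example_req *req = talloc_ptrtype(NULL, req);
//     req->value = 42;
//     mp_dispatch_enqueue(queue, example_handle_req, req);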

// Like mp_dispatch_enqueue(), but the queue code will call talloc_free(fn_data)
// after the fn callback has been run. (The callback could trivially do that
// itself, but it makes it easier to implement synchronous and asynchronous
// requests with the same callback implementation.)
void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue,
                                  mp_dispatch_fn fn, void *fn_data)
{
    struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
    *item = (struct mp_dispatch_item){
        .fn = fn,
        .fn_data = talloc_steal(item, fn_data),
        .asynchronous = true,
    };
    mp_dispatch_append(queue, item);
}
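
// Illustrative sketch (not part of dispatch.c): with the autofree variant the
// callback must not free fn_data itself; the queue talloc_free()s it after
// the callback returns. Reuses the hypothetical example_req from above.
static void example_handle_req_autofree(void *p)
{
    struct example_req *req = p;
    // ... act on req->value; do NOT free req here ...
}

// Sender side:
//     struct example_req *req = talloc_ptrtype(NULL, req);
//     mp_dispatch_enqueue_autofree(queue, example_handle_req_autofree, req);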

// Like mp_dispatch_enqueue(), but the item is marked as mergeable: if an item
// with the same fn/fn_data pair is already queued, the new item is dropped, so
// the callback runs at most once per processing pass. This makes it suitable
// for idempotent "something changed" notifications.
void mp_dispatch_enqueue_notify(struct mp_dispatch_queue *queue,
                                mp_dispatch_fn fn, void *fn_data)
{
    struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
    *item = (struct mp_dispatch_item){
        .fn = fn,
        .fn_data = fn_data,
        .mergeable = true,
        .asynchronous = true,
    };
    mp_dispatch_append(queue, item);
}
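
// Illustrative sketch (not part of dispatch.c): because notify items are
// mergeable, enqueueing the same fn/fn_data pair repeatedly leaves at most
// one item in the queue. `state` is a hypothetical shared-state pointer.
static void example_on_change(void *ctx)
{
    // Re-read the shared state `ctx` points to and react; multiple
    // notifications may have been coalesced into this single call.
}

// Sender side: even if called in a tight loop, only one item stays queued:
//     mp_dispatch_enqueue_notify(queue, example_on_change, state);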

// Remove an already queued item. Only items enqueued with the following
// functions can be canceled:
//  - mp_dispatch_enqueue()
//  - mp_dispatch_enqueue_notify()
// Items which were enqueued and which are currently executing can no longer
// be canceled. This function is mostly for being called from the same context
// as mp_dispatch_queue_process(), where the "currently executing" case can be
// excluded.
void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue,
                           mp_dispatch_fn fn, void *fn_data)
{
    mp_mutex_lock(&queue->lock);
    struct mp_dispatch_item **pcur = &queue->head;
    queue->tail = NULL;
    while (*pcur) {
        struct mp_dispatch_item *cur = *pcur;
        if (cur->fn == fn && cur->fn_data == fn_data) {
            *pcur = cur->next;
            talloc_free(cur);
        } else {
            queue->tail = cur;
            pcur = &cur->next;
        }
    }
    mp_mutex_unlock(&queue->lock);
}
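
// Illustrative sketch (not part of dispatch.c): drop a pending notification
// before freeing the state it references, called from the target thread's
// own context so the "currently executing" case is excluded, as noted above:
//     mp_dispatch_cancel_fn(queue, example_on_change, state);
//     talloc_free(state); // safe: no queued item references it anymore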

// Run fn(fn_data) on the target thread synchronously. This function enqueues
// the callback and waits until the target thread has finished running it.
// This is redundant to calling the function inside mp_dispatch_[un]lock(),
// but can be helpful with code that relies on TLS (such as OpenGL).
void mp_dispatch_run(struct mp_dispatch_queue *queue,
                     mp_dispatch_fn fn, void *fn_data)
{
    struct mp_dispatch_item item = {
        .fn = fn,
        .fn_data = fn_data,
    };
    mp_dispatch_append(queue, &item);

    mp_mutex_lock(&queue->lock);
    while (!item.completed)
        mp_cond_wait(&queue->cond, &queue->lock);
    mp_mutex_unlock(&queue->lock);
}
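
// Illustrative sketch (not part of dispatch.c): synchronously read state
// owned by the target thread. Stack lifetime is safe because
// mp_dispatch_run() waits for completion. `example_query` is hypothetical.
struct example_query {
    int result;
};

static void example_do_query(void *p)
{
    struct example_query *q = p;
    q->result = 123; // e.g. read some value only the target thread may touch
}

// Caller side:
//     struct example_query q = {0};
//     mp_dispatch_run(queue, example_do_query, &q);
//     // q.result is valid here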

// Process any outstanding dispatch items in the queue. This also handles
// suspending or locking of this thread from another thread via
// mp_dispatch_lock().
// The timeout specifies the minimum wait time. The actual time spent in this
// function can be much higher if the suspending/locking functions are used, or
// if executing the dispatch items takes time. On the other hand, this function
// can return much earlier than the timeout due to sporadic wakeups.
// Note that this will strictly return only after:
//      - the timeout has passed,
//      - all queue items have been processed, and
//      - any acquired lock has been released.
// It's possible to cancel the timeout by calling mp_dispatch_interrupt().
// Reentrant calls are not allowed. There can be only 1 thread calling
// mp_dispatch_queue_process() at a time. In addition, mp_dispatch_lock() can
// not be called from a thread that is calling mp_dispatch_queue_process() (i.e.
// no enqueued callback can call the lock/unlock functions).
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
{
    mp_mutex_lock(&queue->lock);
    queue->wait = timeout > 0 ? mp_time_ns_add(mp_time_ns(), timeout) : 0;
    mp_assert(!queue->in_process); // recursion not allowed
    queue->in_process = true;
    queue->in_process_thread_id = mp_thread_current_id();
    // Wake up the thread which called mp_dispatch_lock().
    if (queue->lock_requests)
        mp_cond_broadcast(&queue->cond);
    while (1) {
        if (queue->lock_requests) {
            // Block due to something having called mp_dispatch_lock().
            mp_cond_wait(&queue->cond, &queue->lock);
        } else if (queue->head) {
            struct mp_dispatch_item *item = queue->head;
            queue->head = item->next;
            if (!queue->head)
                queue->tail = NULL;
            item->next = NULL;
            // Unlock, because we want to allow other threads to queue items
            // while the dispatch item is processed.
            // At the same time, we must prevent other threads from returning
            // from mp_dispatch_lock(), which is done by locked=true.
            mp_assert(!queue->locked);
            queue->locked = true;
            mp_mutex_unlock(&queue->lock);

            item->fn(item->fn_data);

            mp_mutex_lock(&queue->lock);
            mp_assert(queue->locked);
            queue->locked = false;
            // Wake up mp_dispatch_run(), and possibly mp_dispatch_lock().
            mp_cond_broadcast(&queue->cond);
            if (item->asynchronous) {
                talloc_free(item);
            } else {
                item->completed = true;
            }
        } else if (queue->wait > 0 && !queue->interrupted) {
            if (mp_cond_timedwait_until(&queue->cond, &queue->lock, queue->wait))
                queue->wait = 0;
        } else {
            break;
        }
    }
    mp_assert(!queue->locked);
    queue->in_process = false;
    queue->interrupted = false;
    mp_mutex_unlock(&queue->lock);
}
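
// Illustrative sketch (not part of dispatch.c): a typical target-thread loop.
// Dispatch items, lock requests, and the timeout are all serviced inside
// mp_dispatch_queue_process(). `example_worker` and its `stop` flag are
// hypothetical; `stop` would be set by a dispatched callback such as the
// exit request shown near mp_dispatch_create() above.
struct example_worker {
    struct mp_dispatch_queue *queue;
    bool stop;
};

static void example_worker_loop(struct example_worker *w)
{
    while (!w->stop)
        mp_dispatch_queue_process(w->queue, 100.0); // wait up to 100 seconds
}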

// If the queue is inside of mp_dispatch_queue_process(), make it return as
// soon as all work items have been run, without waiting for the timeout. This
// does not make it return early if it's blocked by a mp_dispatch_lock().
// If the queue is _not_ inside of mp_dispatch_queue_process(), make the next
// call of it use a timeout of 0 (this is useful behavior if you need to
// wake up the main thread from another thread in a race-free way).
void mp_dispatch_interrupt(struct mp_dispatch_queue *queue)
{
    mp_mutex_lock(&queue->lock);
    queue->interrupted = true;
    mp_cond_broadcast(&queue->cond);
    mp_mutex_unlock(&queue->lock);
}
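
// Illustrative sketch (not part of dispatch.c): waking the loop above from
// another thread in a race-free way. If the worker is idle in
// mp_dispatch_queue_process(), it returns promptly; if it has not entered it
// yet, the next call uses a timeout of 0:
//     mp_dispatch_interrupt(w->queue);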

// If a mp_dispatch_queue_process() call is in progress, then adjust the maximum
// time it blocks due to its timeout argument. Otherwise does nothing. (It
// makes sense to call this in code that uses both mp_dispatch_[un]lock() and
// a normal event loop.)
// Does not work correctly with queues that have mp_dispatch_set_wakeup_fn()
// called on them, because this implies you actually do waiting via
// mp_dispatch_queue_process(), while wakeup callbacks are used when you need
// to wait in external APIs.
void mp_dispatch_adjust_timeout(struct mp_dispatch_queue *queue, int64_t until)
{
    mp_mutex_lock(&queue->lock);
    if (queue->in_process && queue->wait > until) {
        queue->wait = until;
        mp_cond_broadcast(&queue->cond);
    }
    mp_mutex_unlock(&queue->lock);
}
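
// Illustrative sketch (not part of dispatch.c): shorten the worker's current
// sleep to an absolute deadline, here half a second from now, using the same
// mp_time_ns()/mp_time_ns_add() helpers mp_dispatch_queue_process() uses:
//     mp_dispatch_adjust_timeout(queue, mp_time_ns_add(mp_time_ns(), 0.5));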

// Grant exclusive access to the target thread's state. While this is active,
// no other thread can return from mp_dispatch_lock() (i.e. it behaves like
// a pthread mutex), and no other thread can get dispatch items completed.
// Other threads can still queue asynchronous dispatch items without waiting,
// and the mutex behavior applies to this function and dispatch callbacks only.
// The lock is non-recursive, and dispatch callback functions can be thought of
// as already holding the dispatch lock.
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
    mp_mutex_lock(&queue->lock);
    // Must not be called recursively from dispatched callbacks.
    if (queue->in_process)
        mp_assert(!mp_thread_id_equal(queue->in_process_thread_id, mp_thread_current_id()));
    // Must not be called recursively at all.
    if (queue->locked_explicit)
        mp_assert(!mp_thread_id_equal(queue->locked_explicit_thread_id, mp_thread_current_id()));
    queue->lock_requests += 1;
    // And now wait until the target thread gets "trapped" within the
    // mp_dispatch_queue_process() call, which will mean we get exclusive
    // access to the target's thread state.
    if (queue->onlock_fn)
        queue->onlock_fn(queue->onlock_ctx);
    while (!queue->in_process) {
        mp_mutex_unlock(&queue->lock);
        if (queue->wakeup_fn)
            queue->wakeup_fn(queue->wakeup_ctx);
        mp_mutex_lock(&queue->lock);
        if (queue->in_process)
            break;
        mp_cond_wait(&queue->cond, &queue->lock);
    }
    // Wait until we can get the lock.
    while (!queue->in_process || queue->locked)
        mp_cond_wait(&queue->cond, &queue->lock);
    // "Lock".
    mp_assert(queue->lock_requests);
    mp_assert(!queue->locked);
    mp_assert(!queue->locked_explicit);
    queue->locked = true;
    queue->locked_explicit = true;
    queue->locked_explicit_thread_id = mp_thread_current_id();
    mp_mutex_unlock(&queue->lock);
}

// Undo mp_dispatch_lock().
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
    mp_mutex_lock(&queue->lock);
    mp_assert(queue->locked);
    // Must be called after a mp_dispatch_lock(), from the same thread.
    mp_assert(queue->locked_explicit);
    mp_assert(mp_thread_id_equal(queue->locked_explicit_thread_id, mp_thread_current_id()));
    // "Unlock".
    queue->locked = false;
    queue->locked_explicit = false;
    queue->lock_requests -= 1;
    // Wake up mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s.
    // (Would be nice to wake up only 1 other locker if lock_requests > 0.)
    mp_cond_broadcast(&queue->cond);
    mp_mutex_unlock(&queue->lock);
}
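
// Illustrative sketch (not part of dispatch.c): mutate state owned by the
// target thread under the dispatch lock, which excludes both other lockers
// and dispatch callbacks. `target_state` and its field are hypothetical:
//     mp_dispatch_lock(queue);
//     target_state->mode = new_mode; // exclusive access to target's state
//     mp_dispatch_unlock(queue);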