Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * This file is part of mpv. |
3 | | * |
4 | | * mpv is free software; you can redistribute it and/or |
5 | | * modify it under the terms of the GNU Lesser General Public |
6 | | * License as published by the Free Software Foundation; either |
7 | | * version 2.1 of the License, or (at your option) any later version. |
8 | | * |
9 | | * mpv is distributed in the hope that it will be useful, |
10 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | | * GNU Lesser General Public License for more details. |
13 | | * |
14 | | * You should have received a copy of the GNU Lesser General Public |
15 | | * License along with mpv. If not, see <http://www.gnu.org/licenses/>. |
16 | | */ |
17 | | |
18 | | #include <stdbool.h> |
19 | | #include <assert.h> |
20 | | |
21 | | #include "common/common.h" |
22 | | #include "osdep/threads.h" |
23 | | #include "osdep/timer.h" |
24 | | |
25 | | #include "dispatch.h" |
26 | | |
27 | | struct mp_dispatch_queue { |
28 | | struct mp_dispatch_item *head, *tail; |
29 | | mp_mutex lock; |
30 | | mp_cond cond; |
31 | | void (*wakeup_fn)(void *wakeup_ctx); |
32 | | void *wakeup_ctx; |
33 | | void (*onlock_fn)(void *onlock_ctx); |
34 | | void *onlock_ctx; |
35 | | // Time at which mp_dispatch_queue_process() should return. |
36 | | int64_t wait; |
37 | | // Make mp_dispatch_queue_process() exit if it's idle. |
38 | | bool interrupted; |
39 | | // The target thread is in mp_dispatch_queue_process() (and either idling, |
40 | | // locked, or running a dispatch callback). |
41 | | bool in_process; |
42 | | mp_thread_id in_process_thread_id; |
43 | | // The target thread is in mp_dispatch_queue_process(), and currently |
44 | | // something has exclusive access to it (e.g. running a dispatch callback, |
45 | | // or a different thread got it with mp_dispatch_lock()). |
46 | | bool locked; |
47 | | // A mp_dispatch_lock() call is requesting an exclusive lock. |
48 | | size_t lock_requests; |
49 | | // locked==true is due to a mp_dispatch_lock() call (for debugging). |
50 | | bool locked_explicit; |
51 | | mp_thread_id locked_explicit_thread_id; |
52 | | }; |
53 | | |
54 | | struct mp_dispatch_item { |
55 | | mp_dispatch_fn fn; |
56 | | void *fn_data; |
57 | | bool asynchronous; |
58 | | bool mergeable; |
59 | | bool completed; |
60 | | struct mp_dispatch_item *next; |
61 | | }; |
62 | | |
63 | | static void queue_dtor(void *p) |
64 | 188k | { |
65 | 188k | struct mp_dispatch_queue *queue = p; |
66 | 188k | mp_assert(!queue->head); |
67 | 188k | mp_assert(!queue->in_process); |
68 | 188k | mp_assert(!queue->lock_requests); |
69 | 188k | mp_assert(!queue->locked); |
70 | 188k | mp_cond_destroy(&queue->cond); |
71 | 188k | mp_mutex_destroy(&queue->lock); |
72 | 188k | } |
73 | | |
74 | | // A dispatch queue lets other threads run callbacks in a target thread. |
75 | | // The target thread is the thread which calls mp_dispatch_queue_process(). |
76 | | // Free the dispatch queue with talloc_free(). At the time of destruction, |
77 | | // the queue must be empty. The easiest way to guarantee this is to |
78 | | // terminate all potential senders, then call mp_dispatch_run() with a |
79 | | // function that e.g. makes the target thread exit, then mp_thread_join() the |
80 | | // target thread, and finally destroy the queue. Another way is calling |
81 | | // mp_dispatch_queue_process() after terminating all potential senders, and |
82 | | // then destroying the queue. |
83 | | struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent) |
84 | 188k | { |
85 | 188k | struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue); |
86 | 188k | *queue = (struct mp_dispatch_queue){0}; |
87 | 188k | talloc_set_destructor(queue, queue_dtor); |
88 | 188k | mp_mutex_init(&queue->lock); |
89 | 188k | mp_cond_init(&queue->cond); |
90 | 188k | return queue; |
91 | 188k | } |
92 | | |
93 | | // Set a custom function that should be called to guarantee that the target |
94 | | // thread wakes up. This is intended for use with code that needs to block |
95 | | // on non-pthread primitives, such as e.g. poll(). In the case of poll(), |
96 | | // the wakeup_fn could for example write a byte into a "wakeup" pipe in order |
97 | | // to unblock the poll(). The wakeup_fn is called from the dispatch queue |
98 | | // when there are new dispatch items, and the target thread should then enter |
99 | | // mp_dispatch_queue_process() as soon as possible. |
100 | | // Note that this setter does not do internal synchronization, so you must set |
101 | | // it before other threads see it. |
102 | | void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue, |
103 | | void (*wakeup_fn)(void *wakeup_ctx), |
104 | | void *wakeup_ctx) |
105 | 47.8k | { |
106 | 47.8k | queue->wakeup_fn = wakeup_fn; |
107 | 47.8k | queue->wakeup_ctx = wakeup_ctx; |
108 | 47.8k | } |
109 | | |
110 | | // Set a function that will be called by mp_dispatch_lock() if the target thread |
111 | | // is not calling mp_dispatch_queue_process() right now. This is an obscure, |
112 | | // optional mechanism to make a worker thread react to external events more |
113 | | // quickly. The idea is that the callback will make the worker thread to stop |
114 | | // doing whatever (e.g. by setting a flag), and call mp_dispatch_queue_process() |
115 | | // in order to let mp_dispatch_lock() calls continue sooner. |
116 | | // Like wakeup_fn, this setter does no internal synchronization, and you must |
117 | | // not access the dispatch queue itself from the callback. |
118 | | void mp_dispatch_set_onlock_fn(struct mp_dispatch_queue *queue, |
119 | | void (*onlock_fn)(void *onlock_ctx), |
120 | | void *onlock_ctx) |
121 | 0 | { |
122 | 0 | queue->onlock_fn = onlock_fn; |
123 | 0 | queue->onlock_ctx = onlock_ctx; |
124 | 0 | } |
125 | | |
126 | | static void mp_dispatch_append(struct mp_dispatch_queue *queue, |
127 | | struct mp_dispatch_item *item) |
128 | 802k | { |
129 | 802k | mp_mutex_lock(&queue->lock); |
130 | 802k | if (item->mergeable) { |
131 | 177k | for (struct mp_dispatch_item *cur = queue->head; cur; cur = cur->next) { |
132 | 159k | if (cur->mergeable && cur->fn == item->fn && |
133 | 159k | cur->fn_data == item->fn_data) |
134 | 159k | { |
135 | 159k | talloc_free(item); |
136 | 159k | mp_mutex_unlock(&queue->lock); |
137 | 159k | return; |
138 | 159k | } |
139 | 159k | } |
140 | 177k | } |
141 | | |
142 | 643k | if (queue->tail) { |
143 | 33.0k | queue->tail->next = item; |
144 | 610k | } else { |
145 | 610k | queue->head = item; |
146 | 610k | } |
147 | 643k | queue->tail = item; |
148 | | |
149 | | // Wake up the main thread; note that other threads might wait on this |
150 | | // condition for reasons, so broadcast the condition. |
151 | 643k | mp_cond_broadcast(&queue->cond); |
152 | | // No wakeup callback -> assume mp_dispatch_queue_process() needs to be |
153 | | // interrupted instead. |
154 | 643k | if (!queue->wakeup_fn) |
155 | 17.6k | queue->interrupted = true; |
156 | 643k | mp_mutex_unlock(&queue->lock); |
157 | | |
158 | 643k | if (queue->wakeup_fn) |
159 | 625k | queue->wakeup_fn(queue->wakeup_ctx); |
160 | 643k | } |
161 | | |
162 | | // Enqueue a callback to run it on the target thread asynchronously. The target |
163 | | // thread will run fn(fn_data) as soon as it enter mp_dispatch_queue_process. |
164 | | // Note that mp_dispatch_enqueue() will usually return long before that happens. |
165 | | // It's up to the user to signal completion of the callback. It's also up to |
166 | | // the user to guarantee that the context fn_data has correct lifetime, i.e. |
167 | | // lives until the callback is run, and is freed after that. |
168 | | void mp_dispatch_enqueue(struct mp_dispatch_queue *queue, |
169 | | mp_dispatch_fn fn, void *fn_data) |
170 | 0 | { |
171 | 0 | struct mp_dispatch_item *item = talloc_ptrtype(NULL, item); |
172 | 0 | *item = (struct mp_dispatch_item){ |
173 | 0 | .fn = fn, |
174 | 0 | .fn_data = fn_data, |
175 | 0 | .asynchronous = true, |
176 | 0 | }; |
177 | 0 | mp_dispatch_append(queue, item); |
178 | 0 | } |
179 | | |
180 | | // Like mp_dispatch_enqueue(), but the queue code will call talloc_free(fn_data) |
181 | | // after the fn callback has been run. (The callback could trivially do that |
182 | | // itself, but it makes it easier to implement synchronous and asynchronous |
183 | | // requests with the same callback implementation.) |
184 | | void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue, |
185 | | mp_dispatch_fn fn, void *fn_data) |
186 | 339k | { |
187 | 339k | struct mp_dispatch_item *item = talloc_ptrtype(NULL, item); |
188 | 339k | *item = (struct mp_dispatch_item){ |
189 | 339k | .fn = fn, |
190 | 339k | .fn_data = talloc_steal(item, fn_data), |
191 | 339k | .asynchronous = true, |
192 | 339k | }; |
193 | 339k | mp_dispatch_append(queue, item); |
194 | 339k | } |
195 | | |
196 | | // Like mp_dispatch_enqueue(), but |
197 | | void mp_dispatch_enqueue_notify(struct mp_dispatch_queue *queue, |
198 | | mp_dispatch_fn fn, void *fn_data) |
199 | 177k | { |
200 | 177k | struct mp_dispatch_item *item = talloc_ptrtype(NULL, item); |
201 | 177k | *item = (struct mp_dispatch_item){ |
202 | 177k | .fn = fn, |
203 | 177k | .fn_data = fn_data, |
204 | 177k | .mergeable = true, |
205 | 177k | .asynchronous = true, |
206 | 177k | }; |
207 | 177k | mp_dispatch_append(queue, item); |
208 | 177k | } |
209 | | |
210 | | // Remove already queued item. Only items enqueued with the following functions |
211 | | // can be canceled: |
212 | | // - mp_dispatch_enqueue() |
213 | | // - mp_dispatch_enqueue_notify() |
214 | | // Items which were enqueued, and which are currently executing, can not be |
215 | | // canceled anymore. This function is mostly for being called from the same |
216 | | // context as mp_dispatch_queue_process(), where the "currently executing" case |
217 | | // can be excluded. |
218 | | void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue, |
219 | | mp_dispatch_fn fn, void *fn_data) |
220 | 172k | { |
221 | 172k | mp_mutex_lock(&queue->lock); |
222 | 172k | struct mp_dispatch_item **pcur = &queue->head; |
223 | 172k | queue->tail = NULL; |
224 | 172k | while (*pcur) { |
225 | 0 | struct mp_dispatch_item *cur = *pcur; |
226 | 0 | if (cur->fn == fn && cur->fn_data == fn_data) { |
227 | 0 | *pcur = cur->next; |
228 | 0 | talloc_free(cur); |
229 | 0 | } else { |
230 | 0 | queue->tail = cur; |
231 | 0 | pcur = &cur->next; |
232 | 0 | } |
233 | 0 | } |
234 | 172k | mp_mutex_unlock(&queue->lock); |
235 | 172k | } |
236 | | |
237 | | // Run fn(fn_data) on the target thread synchronously. This function enqueues |
238 | | // the callback and waits until the target thread is done doing this. |
239 | | // This is redundant to calling the function inside mp_dispatch_[un]lock(), |
240 | | // but can be helpful with code that relies on TLS (such as OpenGL). |
241 | | void mp_dispatch_run(struct mp_dispatch_queue *queue, |
242 | | mp_dispatch_fn fn, void *fn_data) |
243 | 285k | { |
244 | 285k | struct mp_dispatch_item item = { |
245 | 285k | .fn = fn, |
246 | 285k | .fn_data = fn_data, |
247 | 285k | }; |
248 | 285k | mp_dispatch_append(queue, &item); |
249 | | |
250 | 285k | mp_mutex_lock(&queue->lock); |
251 | 570k | while (!item.completed) |
252 | 285k | mp_cond_wait(&queue->cond, &queue->lock); |
253 | 285k | mp_mutex_unlock(&queue->lock); |
254 | 285k | } |
255 | | |
256 | | // Process any outstanding dispatch items in the queue. This also handles |
257 | | // suspending or locking the this thread from another thread via |
258 | | // mp_dispatch_lock(). |
259 | | // The timeout specifies the minimum wait time. The actual time spent in this |
260 | | // function can be much higher if the suspending/locking functions are used, or |
261 | | // if executing the dispatch items takes time. On the other hand, this function |
262 | | // can return much earlier than the timeout due to sporadic wakeups. |
263 | | // Note that this will strictly return only after: |
264 | | // - timeout has passed, |
265 | | // - all queue items were processed, |
266 | | // - the possibly acquired lock has been released |
267 | | // It's possible to cancel the timeout by calling mp_dispatch_interrupt(). |
268 | | // Reentrant calls are not allowed. There can be only 1 thread calling |
269 | | // mp_dispatch_queue_process() at a time. In addition, mp_dispatch_lock() can |
270 | | // not be called from a thread that is calling mp_dispatch_queue_process() (i.e. |
271 | | // no enqueued callback can call the lock/unlock functions). |
272 | | void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) |
273 | 3.02M | { |
274 | 3.02M | mp_mutex_lock(&queue->lock); |
275 | 3.02M | queue->wait = timeout > 0 ? mp_time_ns_add(mp_time_ns(), timeout) : 0; |
276 | 3.02M | mp_assert(!queue->in_process); // recursion not allowed |
277 | 3.02M | queue->in_process = true; |
278 | 3.02M | queue->in_process_thread_id = mp_thread_current_id(); |
279 | | // Wake up thread which called mp_dispatch_lock(). |
280 | 3.02M | if (queue->lock_requests) |
281 | 183k | mp_cond_broadcast(&queue->cond); |
282 | 5.34M | while (1) { |
283 | 5.34M | if (queue->lock_requests) { |
284 | | // Block due to something having called mp_dispatch_lock(). |
285 | 785k | mp_cond_wait(&queue->cond, &queue->lock); |
286 | 4.56M | } else if (queue->head) { |
287 | 643k | struct mp_dispatch_item *item = queue->head; |
288 | 643k | queue->head = item->next; |
289 | 643k | if (!queue->head) |
290 | 610k | queue->tail = NULL; |
291 | 643k | item->next = NULL; |
292 | | // Unlock, because we want to allow other threads to queue items |
293 | | // while the dispatch item is processed. |
294 | | // At the same time, we must prevent other threads from returning |
295 | | // from mp_dispatch_lock(), which is done by locked=true. |
296 | 643k | mp_assert(!queue->locked); |
297 | 643k | queue->locked = true; |
298 | 643k | mp_mutex_unlock(&queue->lock); |
299 | | |
300 | 643k | item->fn(item->fn_data); |
301 | | |
302 | 643k | mp_mutex_lock(&queue->lock); |
303 | 643k | mp_assert(queue->locked); |
304 | 643k | queue->locked = false; |
305 | | // Wakeup mp_dispatch_run(), also mp_dispatch_lock(). |
306 | 643k | mp_cond_broadcast(&queue->cond); |
307 | 643k | if (item->asynchronous) { |
308 | 357k | talloc_free(item); |
309 | 357k | } else { |
310 | 285k | item->completed = true; |
311 | 285k | } |
312 | 3.92M | } else if (queue->wait > 0 && !queue->interrupted) { |
313 | 900k | if (mp_cond_timedwait_until(&queue->cond, &queue->lock, queue->wait)) |
314 | 25.6k | queue->wait = 0; |
315 | 3.02M | } else { |
316 | 3.02M | break; |
317 | 3.02M | } |
318 | 5.34M | } |
319 | 3.02M | mp_assert(!queue->locked); |
320 | 3.02M | queue->in_process = false; |
321 | 3.02M | queue->interrupted = false; |
322 | 3.02M | mp_mutex_unlock(&queue->lock); |
323 | 3.02M | } |
324 | | |
325 | | // If the queue is inside of mp_dispatch_queue_process(), make it return as |
326 | | // soon as all work items have been run, without waiting for the timeout. This |
327 | | // does not make it return early if it's blocked by a mp_dispatch_lock(). |
328 | | // If the queue is _not_ inside of mp_dispatch_queue_process(), make the next |
329 | | // call of it use a timeout of 0 (this is useful behavior if you need to |
330 | | // wakeup the main thread from another thread in a race free way). |
331 | | void mp_dispatch_interrupt(struct mp_dispatch_queue *queue) |
332 | 4.85M | { |
333 | 4.85M | mp_mutex_lock(&queue->lock); |
334 | 4.85M | queue->interrupted = true; |
335 | 4.85M | mp_cond_broadcast(&queue->cond); |
336 | 4.85M | mp_mutex_unlock(&queue->lock); |
337 | 4.85M | } |
338 | | |
339 | | // If a mp_dispatch_queue_process() call is in progress, then adjust the maximum |
340 | | // time it blocks due to its timeout argument. Otherwise does nothing. (It |
341 | | // makes sense to call this in code that uses both mp_dispatch_[un]lock() and |
342 | | // a normal event loop.) |
343 | | // Does not work correctly with queues that have mp_dispatch_set_wakeup_fn() |
344 | | // called on them, because this implies you actually do waiting via |
345 | | // mp_dispatch_queue_process(), while wakeup callbacks are used when you need |
346 | | // to wait in external APIs. |
347 | | void mp_dispatch_adjust_timeout(struct mp_dispatch_queue *queue, int64_t until) |
348 | 3.04M | { |
349 | 3.04M | mp_mutex_lock(&queue->lock); |
350 | 3.04M | if (queue->in_process && queue->wait > until) { |
351 | 0 | queue->wait = until; |
352 | 0 | mp_cond_broadcast(&queue->cond); |
353 | 0 | } |
354 | 3.04M | mp_mutex_unlock(&queue->lock); |
355 | 3.04M | } |
356 | | |
357 | | // Grant exclusive access to the target thread's state. While this is active, |
358 | | // no other thread can return from mp_dispatch_lock() (i.e. it behaves like |
359 | | // a pthread mutex), and no other thread can get dispatch items completed. |
360 | | // Other threads can still queue asynchronous dispatch items without waiting, |
361 | | // and the mutex behavior applies to this function and dispatch callbacks only. |
362 | | // The lock is non-recursive, and dispatch callback functions can be thought of |
363 | | // already holding the dispatch lock. |
364 | | void mp_dispatch_lock(struct mp_dispatch_queue *queue) |
365 | 1.50M | { |
366 | 1.50M | mp_mutex_lock(&queue->lock); |
367 | | // Must not be called recursively from dispatched callbacks. |
368 | 1.50M | if (queue->in_process) |
369 | 1.50M | mp_assert(!mp_thread_id_equal(queue->in_process_thread_id, mp_thread_current_id())); |
370 | | // Must not be called recursively at all. |
371 | 1.50M | if (queue->locked_explicit) |
372 | 1.50M | mp_assert(!mp_thread_id_equal(queue->locked_explicit_thread_id, mp_thread_current_id())); |
373 | 1.50M | queue->lock_requests += 1; |
374 | | // And now wait until the target thread gets "trapped" within the |
375 | | // mp_dispatch_queue_process() call, which will mean we get exclusive |
376 | | // access to the target's thread state. |
377 | 1.50M | if (queue->onlock_fn) |
378 | 0 | queue->onlock_fn(queue->onlock_ctx); |
379 | 1.68M | while (!queue->in_process) { |
380 | 185k | mp_mutex_unlock(&queue->lock); |
381 | 185k | if (queue->wakeup_fn) |
382 | 0 | queue->wakeup_fn(queue->wakeup_ctx); |
383 | 185k | mp_mutex_lock(&queue->lock); |
384 | 185k | if (queue->in_process) |
385 | 69 | break; |
386 | 185k | mp_cond_wait(&queue->cond, &queue->lock); |
387 | 185k | } |
388 | | // Wait until we can get the lock. |
389 | 1.51M | while (!queue->in_process || queue->locked) |
390 | 7.27k | mp_cond_wait(&queue->cond, &queue->lock); |
391 | | // "Lock". |
392 | 1.50M | mp_assert(queue->lock_requests); |
393 | 1.50M | mp_assert(!queue->locked); |
394 | 1.50M | mp_assert(!queue->locked_explicit); |
395 | 1.50M | queue->locked = true; |
396 | 1.50M | queue->locked_explicit = true; |
397 | 1.50M | queue->locked_explicit_thread_id = mp_thread_current_id(); |
398 | 1.50M | mp_mutex_unlock(&queue->lock); |
399 | 1.50M | } |
400 | | |
401 | | // Undo mp_dispatch_lock(). |
402 | | void mp_dispatch_unlock(struct mp_dispatch_queue *queue) |
403 | 1.50M | { |
404 | 1.50M | mp_mutex_lock(&queue->lock); |
405 | 1.50M | mp_assert(queue->locked); |
406 | | // Must be called after a mp_dispatch_lock(), from the same thread. |
407 | 1.50M | mp_assert(queue->locked_explicit); |
408 | 1.50M | mp_assert(mp_thread_id_equal(queue->locked_explicit_thread_id, mp_thread_current_id())); |
409 | | // "Unlock". |
410 | 1.50M | queue->locked = false; |
411 | 1.50M | queue->locked_explicit = false; |
412 | 1.50M | queue->lock_requests -= 1; |
413 | | // Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s. |
414 | | // (Would be nice to wake up only 1 other locker if lock_requests>0.) |
415 | 1.50M | mp_cond_broadcast(&queue->cond); |
416 | 1.50M | mp_mutex_unlock(&queue->lock); |
417 | 1.50M | } |