/src/mpv/misc/thread_pool.c
/* Copyright (C) 2018 the mpv developers
 *
 * Permission to use, copy, modify, and/or distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include "common/common.h"
#include "osdep/threads.h"
#include "osdep/timer.h"

#include "thread_pool.h"

// Threads destroy themselves after this many seconds, if there's no new work
// and the thread count is above the configured minimum.
#define DESTROY_TIMEOUT 10

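// A single queued work item: the callback to run, plus the opaque context
// pointer passed to it.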
struct work {
    void (*fn)(void *ctx);
    void *fn_ctx;
};

struct mp_thread_pool {
    int min_threads, max_threads;

    mp_mutex lock;
    mp_cond wakeup;

    // --- the following fields are protected by lock

    mp_thread *threads;
    int num_threads;

    // Number of threads which have taken up work and are still processing it.
    int busy_threads;

    bool terminate;

    struct work *work;
    int num_work;
};

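// Worker loop: pop the oldest queued work item, run it with the lock
// released, then wait for more. Threads above min_threads exit on their own
// after DESTROY_TIMEOUT seconds without work.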
static MP_THREAD_VOID worker_thread(void *arg)
{
    struct mp_thread_pool *pool = arg;

    mp_thread_set_name("worker");

    mp_mutex_lock(&pool->lock);

    int64_t destroy_deadline = 0;
    bool got_timeout = false;
    while (1) {
        struct work work = {0};
        if (pool->num_work > 0) {
            work = pool->work[pool->num_work - 1];
            pool->num_work -= 1;
        }

        if (!work.fn) {
            if (got_timeout || pool->terminate)
                break;

            if (pool->num_threads > pool->min_threads) {
                if (!destroy_deadline)
                    destroy_deadline = mp_time_ns() + MP_TIME_S_TO_NS(DESTROY_TIMEOUT);
                if (mp_cond_timedwait_until(&pool->wakeup, &pool->lock, destroy_deadline))
                    got_timeout = pool->num_threads > pool->min_threads;
            } else {
                mp_cond_wait(&pool->wakeup, &pool->lock);
            }
            continue;
        }

        pool->busy_threads += 1;
        mp_mutex_unlock(&pool->lock);

        work.fn(work.fn_ctx);

        mp_mutex_lock(&pool->lock);
        pool->busy_threads -= 1;

        destroy_deadline = 0;
        got_timeout = false;
    }

    // If no termination signal was given, it must mean we died because of a
    // timeout, and nobody is waiting for us. We have to remove ourselves.
    if (!pool->terminate) {
        for (int n = 0; n < pool->num_threads; n++) {
            if (mp_thread_id_equal(mp_thread_get_id(pool->threads[n]),
                                   mp_thread_current_id()))
            {
                mp_thread_detach(pool->threads[n]);
                MP_TARRAY_REMOVE_AT(pool->threads, pool->num_threads, n);
                mp_mutex_unlock(&pool->lock);
                MP_THREAD_RETURN();
            }
        }
        MP_ASSERT_UNREACHABLE();
    }

    mp_mutex_unlock(&pool->lock);
    MP_THREAD_RETURN();
}

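// talloc destructor: signal all workers to terminate and join them. Workers
// drain any remaining queued work before exiting, hence the asserts below.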
static void thread_pool_dtor(void *ctx)
{
    struct mp_thread_pool *pool = ctx;

    mp_mutex_lock(&pool->lock);

    pool->terminate = true;
    mp_cond_broadcast(&pool->wakeup);

    mp_thread *threads = pool->threads;
    int num_threads = pool->num_threads;

    pool->threads = NULL;
    pool->num_threads = 0;

    mp_mutex_unlock(&pool->lock);

    for (int n = 0; n < num_threads; n++)
        mp_thread_join(threads[n]);

    mp_assert(pool->num_work == 0);
    mp_assert(pool->num_threads == 0);
    mp_cond_destroy(&pool->wakeup);
    mp_mutex_destroy(&pool->lock);
}

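// Start one additional worker thread. Must be called with pool->lock held.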
static bool add_thread(struct mp_thread_pool *pool)
{
    mp_thread thread;

    if (mp_thread_create(&thread, worker_thread, pool) != 0)
        return false;

    MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread);
    return true;
}

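// Create a thread pool as a talloc child of ta_parent. init_threads workers
// are started immediately; afterwards the pool grows on demand up to
// max_threads and shrinks back to min_threads when idle. Returns NULL if the
// initial threads could not be created.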
struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int init_threads,
                                             int min_threads, int max_threads)
{
    mp_assert(min_threads >= 0);
    mp_assert(init_threads <= min_threads);
    mp_assert(max_threads > 0 && max_threads >= min_threads);

    struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool);
    talloc_set_destructor(pool, thread_pool_dtor);

    mp_mutex_init(&pool->lock);
    mp_cond_init(&pool->wakeup);

    pool->min_threads = min_threads;
    pool->max_threads = max_threads;

    mp_mutex_lock(&pool->lock);
    for (int n = 0; n < init_threads; n++)
        add_thread(pool);
    bool ok = pool->num_threads >= init_threads;
    mp_mutex_unlock(&pool->lock);

    if (!ok)
        TA_FREEP(&pool);

    return pool;
}

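// Common implementation of queue/run: spawn a new thread if all existing ones
// may already be busy and the limit allows it, then enqueue the work and wake
// up a worker.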
static bool thread_pool_add(struct mp_thread_pool *pool, void (*fn)(void *ctx),
                            void *fn_ctx, bool allow_queue)
{
    bool ok = true;

    mp_assert(fn);

    mp_mutex_lock(&pool->lock);
    struct work work = {fn, fn_ctx};

    // If there are not enough threads to process all at once, but we can
    // create a new thread, then do so. If work is queued quickly, it can
    // happen that not all available threads have picked up work yet (up to
    // num_threads - busy_threads threads), which has to be accounted for.
    if (pool->busy_threads + pool->num_work + 1 > pool->num_threads &&
        pool->num_threads < pool->max_threads)
    {
        if (!add_thread(pool)) {
            // If we can queue it, it'll get done as long as there is 1 thread.
            ok = allow_queue && pool->num_threads > 0;
        }
    }

    if (ok) {
        MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work);
        mp_cond_signal(&pool->wakeup);
    }

    mp_mutex_unlock(&pool->lock);
    return ok;
}

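// Queue the work and return immediately. The work is guaranteed to run as
// long as at least one thread exists, even if no new thread could be created.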
bool mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
                          void *fn_ctx)
{
    return thread_pool_add(pool, fn, fn_ctx, true);
}

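// Like mp_thread_pool_queue(), but return false instead of queuing if a new
// thread was needed and could not be created.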
bool mp_thread_pool_run(struct mp_thread_pool *pool, void (*fn)(void *ctx),
                        void *fn_ctx)
{
    return thread_pool_add(pool, fn, fn_ctx, false);
}
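
// A minimal usage sketch (illustrative only, excluded from the build): queue
// one asynchronous job on a pool that grows on demand up to 4 threads.
// print_ctx, print_task(), and example() are hypothetical names, not part of
// this file's API.
#if 0
#include <stdio.h>

struct print_ctx {
    const char *msg;
};

static void print_task(void *ctx)
{
    struct print_ctx *p = ctx;
    printf("%s\n", p->msg);
}

static void example(void)
{
    // 0 threads at creation, none kept alive when idle, at most 4 in total.
    struct mp_thread_pool *pool = mp_thread_pool_create(NULL, 0, 0, 4);
    if (!pool)
        return;

    // The context must stay valid until the work has run; the workers are
    // joined (and the queue drained) before talloc_free() returns.
    static struct print_ctx ctx = { "hello from the pool" };

    if (!mp_thread_pool_queue(pool, print_task, &ctx))
        printf("could not queue work\n");

    talloc_free(pool);
}
#endif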