/src/tinysparql/subprojects/glib-2.80.3/glib/gthreadpool.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* GLIB - Library of useful routines for C programming |
2 | | * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald |
3 | | * |
4 | | * GThreadPool: thread pool implementation. |
5 | | * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe |
6 | | * |
7 | | * SPDX-License-Identifier: LGPL-2.1-or-later |
8 | | * |
9 | | * This library is free software; you can redistribute it and/or |
10 | | * modify it under the terms of the GNU Lesser General Public |
11 | | * License as published by the Free Software Foundation; either |
12 | | * version 2.1 of the License, or (at your option) any later version. |
13 | | * |
14 | | * This library is distributed in the hope that it will be useful, |
15 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
17 | | * Lesser General Public License for more details. |
18 | | * |
19 | | * You should have received a copy of the GNU Lesser General Public |
20 | | * License along with this library; if not, see <http://www.gnu.org/licenses/>. |
21 | | */ |
22 | | |
23 | | /* |
24 | | * MT safe |
25 | | */ |
26 | | |
27 | | #include "config.h" |
28 | | |
29 | | #include "gthreadpool.h" |
30 | | |
31 | | #include "gasyncqueue.h" |
32 | | #include "gasyncqueueprivate.h" |
33 | | #include "glib-private.h" |
34 | | #include "gmain.h" |
35 | | #include "gtestutils.h" |
36 | | #include "gthreadprivate.h" |
37 | | #include "gtimer.h" |
38 | | #include "gutils.h" |
39 | | |
40 | | #define DEBUG_MSG(x) |
41 | | /* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n"); */ |
42 | | |
43 | | typedef struct _GRealThreadPool GRealThreadPool; |
44 | | |
45 | | /** |
46 | | * GThreadPool: |
47 | | * @func: the function to execute in the threads of this pool |
48 | | * @user_data: the user data for the threads of this pool |
49 | | * @exclusive: are all threads exclusive to this pool |
50 | | * |
51 | | * The `GThreadPool` struct represents a thread pool. |
52 | | * |
53 | | * A thread pool is useful when you wish to asynchronously fork out the execution of work |
54 | | * and continue working in your own thread. If that will happen often, the overhead of starting |
55 | | * and destroying a thread each time might be too high. In such cases reusing already started |
56 | | * threads seems like a good idea. And it indeed is, but implementing this can be tedious |
57 | | * and error-prone. |
58 | | * |
59 | | * Therefore GLib provides thread pools for your convenience. An added advantage is, that the |
60 | | * threads can be shared between the different subsystems of your program, when they are using GLib. |
61 | | * |
62 | | * To create a new thread pool, you use [func@GLib.ThreadPool.new]. |
63 | | * It is destroyed by [method@GLib.ThreadPool.free]. |
64 | | * |
65 | | * If you want to execute a certain task within a thread pool, use [method@GLib.ThreadPool.push]. |
66 | | * |
67 | | * To get the current number of running threads you call [method@GLib.ThreadPool.get_num_threads]. |
68 | | * To get the number of still unprocessed tasks you call [method@GLib.ThreadPool.unprocessed]. |
69 | | * To control the maximum number of threads for a thread pool, you use |
70 | | * [method@GLib.ThreadPool.get_max_threads]. and [method@GLib.ThreadPool.set_max_threads]. |
71 | | * |
72 | | * Finally you can control the number of unused threads, that are kept alive by GLib for future use. |
73 | | * The current number can be fetched with [func@GLib.ThreadPool.get_num_unused_threads]. |
74 | | * The maximum number can be controlled by [func@GLib.ThreadPool.get_max_unused_threads] and |
75 | | * [func@GLib.ThreadPool.set_max_unused_threads]. All currently unused threads |
76 | | * can be stopped by calling [func@GLib.ThreadPool.stop_unused_threads]. |
77 | | */ |
78 | | struct _GRealThreadPool |
79 | | { |
80 | | GThreadPool pool; |
81 | | GAsyncQueue *queue; |
82 | | GCond cond; |
83 | | gint max_threads; |
84 | | guint num_threads; |
85 | | gboolean running; |
86 | | gboolean immediate; |
87 | | gboolean waiting; |
88 | | GCompareDataFunc sort_func; |
89 | | gpointer sort_user_data; |
90 | | }; |
91 | | |
92 | | /* The following is just an address to mark the wakeup order for a |
93 | | * thread, it could be any address (as long, as it isn't a valid |
94 | | * GThreadPool address) |
95 | | */ |
96 | | static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new; |
97 | | static gint wakeup_thread_serial = 0; |
98 | | |
99 | | /* Here all unused threads are waiting */ |
100 | | static GAsyncQueue *unused_thread_queue = NULL; |
101 | | static gint unused_threads = 0; |
102 | | static gint max_unused_threads = 2; |
103 | | static gint kill_unused_threads = 0; |
104 | | static guint max_idle_time = 15 * 1000; |
105 | | |
106 | | typedef struct |
107 | | { |
108 | | /* Either thread or error are set in the end. Both transfer-full. */ |
109 | | GThreadPool *pool; |
110 | | GThread *thread; |
111 | | GError *error; |
112 | | } SpawnThreadData; |
113 | | |
114 | | static GCond spawn_thread_cond; |
115 | | static GAsyncQueue *spawn_thread_queue; |
116 | | |
117 | | static void g_thread_pool_queue_push_unlocked (GRealThreadPool *pool, |
118 | | gpointer data); |
119 | | static void g_thread_pool_free_internal (GRealThreadPool *pool); |
120 | | static gpointer g_thread_pool_thread_proxy (gpointer data); |
121 | | static gboolean g_thread_pool_start_thread (GRealThreadPool *pool, |
122 | | GError **error); |
123 | | static void g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool); |
124 | | static GRealThreadPool* g_thread_pool_wait_for_new_pool (void); |
125 | | static gpointer g_thread_pool_wait_for_new_task (GRealThreadPool *pool); |
126 | | |
127 | | static void |
128 | | g_thread_pool_queue_push_unlocked (GRealThreadPool *pool, |
129 | | gpointer data) |
130 | 0 | { |
131 | 0 | if (pool->sort_func) |
132 | 0 | g_async_queue_push_sorted_unlocked (pool->queue, |
133 | 0 | data, |
134 | 0 | pool->sort_func, |
135 | 0 | pool->sort_user_data); |
136 | 0 | else |
137 | 0 | g_async_queue_push_unlocked (pool->queue, data); |
138 | 0 | } |
139 | | |
140 | | static GRealThreadPool* |
141 | | g_thread_pool_wait_for_new_pool (void) |
142 | 7.87k | { |
143 | 7.87k | GRealThreadPool *pool; |
144 | 7.87k | gint local_wakeup_thread_serial; |
145 | 7.87k | guint local_max_unused_threads; |
146 | 7.87k | gint local_max_idle_time; |
147 | 7.87k | gint last_wakeup_thread_serial; |
148 | 7.87k | gboolean have_relayed_thread_marker = FALSE; |
149 | | |
150 | 7.87k | local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads); |
151 | 7.87k | local_max_idle_time = g_atomic_int_get (&max_idle_time); |
152 | 7.87k | last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial); |
153 | | |
154 | 7.87k | do |
155 | 7.87k | { |
156 | 7.87k | if ((guint) g_atomic_int_get (&unused_threads) >= local_max_unused_threads) |
157 | 0 | { |
158 | | /* If this is a superfluous thread, stop it. */ |
159 | 0 | pool = NULL; |
160 | 0 | } |
161 | 7.87k | else if (local_max_idle_time > 0) |
162 | 7.87k | { |
163 | | /* If a maximal idle time is given, wait for the given time. */ |
164 | 7.87k | DEBUG_MSG (("thread %p waiting in global pool for %f seconds.", |
165 | 7.87k | g_thread_self (), local_max_idle_time / 1000.0)); |
166 | | |
167 | 7.87k | pool = g_async_queue_timeout_pop (unused_thread_queue, |
168 | 7.87k | local_max_idle_time * 1000); |
169 | 7.87k | } |
170 | 0 | else |
171 | 0 | { |
172 | | /* If no maximal idle time is given, wait indefinitely. */ |
173 | 0 | DEBUG_MSG (("thread %p waiting in global pool.", g_thread_self ())); |
174 | 0 | pool = g_async_queue_pop (unused_thread_queue); |
175 | 0 | } |
176 | | |
177 | 7.87k | if (pool == wakeup_thread_marker) |
178 | 0 | { |
179 | 0 | local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial); |
180 | 0 | if (last_wakeup_thread_serial == local_wakeup_thread_serial) |
181 | 0 | { |
182 | 0 | if (!have_relayed_thread_marker) |
183 | 0 | { |
184 | | /* If this wakeup marker has been received for |
185 | | * the second time, relay it. |
186 | | */ |
187 | 0 | DEBUG_MSG (("thread %p relaying wakeup message to " |
188 | 0 | "waiting thread with lower serial.", |
189 | 0 | g_thread_self ())); |
190 | |
|
191 | 0 | g_async_queue_push (unused_thread_queue, wakeup_thread_marker); |
192 | 0 | have_relayed_thread_marker = TRUE; |
193 | | |
194 | | /* If a wakeup marker has been relayed, this thread |
195 | | * will get out of the way for 100 microseconds to |
196 | | * avoid receiving this marker again. |
197 | | */ |
198 | 0 | g_usleep (100); |
199 | 0 | } |
200 | 0 | } |
201 | 0 | else |
202 | 0 | { |
203 | 0 | if (g_atomic_int_add (&kill_unused_threads, -1) > 0) |
204 | 0 | { |
205 | 0 | pool = NULL; |
206 | 0 | break; |
207 | 0 | } |
208 | | |
209 | 0 | DEBUG_MSG (("thread %p updating to new limits.", |
210 | 0 | g_thread_self ())); |
211 | |
|
212 | 0 | local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads); |
213 | 0 | local_max_idle_time = g_atomic_int_get (&max_idle_time); |
214 | 0 | last_wakeup_thread_serial = local_wakeup_thread_serial; |
215 | |
|
216 | 0 | have_relayed_thread_marker = FALSE; |
217 | 0 | } |
218 | 0 | } |
219 | 7.87k | } |
220 | 7.87k | while (pool == wakeup_thread_marker); |
221 | | |
222 | 0 | return pool; |
223 | 7.87k | } |
224 | | |
225 | | static gpointer |
226 | | g_thread_pool_wait_for_new_task (GRealThreadPool *pool) |
227 | 15.7k | { |
228 | 15.7k | gpointer task = NULL; |
229 | | |
230 | 15.7k | if (pool->running || (!pool->immediate && |
231 | 7.87k | g_async_queue_length_unlocked (pool->queue) > 0)) |
232 | 7.87k | { |
233 | | /* This thread pool is still active. */ |
234 | 7.87k | if (pool->max_threads != -1 && pool->num_threads > (guint) pool->max_threads) |
235 | 0 | { |
236 | | /* This is a superfluous thread, so it goes to the global pool. */ |
237 | 0 | DEBUG_MSG (("superfluous thread %p in pool %p.", |
238 | 0 | g_thread_self (), pool)); |
239 | 0 | } |
240 | 7.87k | else if (pool->pool.exclusive) |
241 | 7.87k | { |
242 | | /* Exclusive threads stay attached to the pool. */ |
243 | 7.87k | task = g_async_queue_pop_unlocked (pool->queue); |
244 | | |
245 | 7.87k | DEBUG_MSG (("thread %p in exclusive pool %p waits for task " |
246 | 7.87k | "(%d running, %d unprocessed).", |
247 | 7.87k | g_thread_self (), pool, pool->num_threads, |
248 | 7.87k | g_async_queue_length_unlocked (pool->queue))); |
249 | 7.87k | } |
250 | 0 | else |
251 | 0 | { |
252 | | /* A thread will wait for new tasks for at most 1/2 |
253 | | * second before going to the global pool. |
254 | | */ |
255 | 0 | DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task " |
256 | 0 | "(%d running, %d unprocessed).", |
257 | 0 | g_thread_self (), pool, pool->num_threads, |
258 | 0 | g_async_queue_length_unlocked (pool->queue))); |
259 | |
|
260 | 0 | task = g_async_queue_timeout_pop_unlocked (pool->queue, |
261 | 0 | G_USEC_PER_SEC / 2); |
262 | 0 | } |
263 | 7.87k | } |
264 | 7.87k | else |
265 | 7.87k | { |
266 | | /* This thread pool is inactive, it will no longer process tasks. */ |
267 | 7.87k | DEBUG_MSG (("pool %p not active, thread %p will go to global pool " |
268 | 7.87k | "(running: %s, immediate: %s, len: %d).", |
269 | 7.87k | pool, g_thread_self (), |
270 | 7.87k | pool->running ? "true" : "false", |
271 | 7.87k | pool->immediate ? "true" : "false", |
272 | 7.87k | g_async_queue_length_unlocked (pool->queue))); |
273 | 7.87k | } |
274 | | |
275 | 15.7k | return task; |
276 | 15.7k | } |
277 | | |
278 | | static gpointer |
279 | | g_thread_pool_spawn_thread (gpointer data) |
280 | 4 | { |
281 | 8 | while (TRUE) |
282 | 4 | { |
283 | 4 | SpawnThreadData *spawn_thread_data; |
284 | 4 | GThread *thread = NULL; |
285 | 4 | GError *error = NULL; |
286 | 4 | const gchar *prgname = g_get_prgname (); |
287 | 4 | gchar name[16] = "pool"; |
288 | | |
289 | 4 | if (prgname) |
290 | 0 | g_snprintf (name, sizeof (name), "pool-%s", prgname); |
291 | | |
292 | 4 | g_async_queue_lock (spawn_thread_queue); |
293 | | /* Spawn a new thread for the given pool and wake the requesting thread |
294 | | * up again with the result. This new thread will have the scheduler |
295 | | * settings inherited from this thread and in extension of the thread |
296 | | * that created the first non-exclusive thread-pool. */ |
297 | 4 | spawn_thread_data = g_async_queue_pop_unlocked (spawn_thread_queue); |
298 | 4 | thread = g_thread_try_new (name, g_thread_pool_thread_proxy, spawn_thread_data->pool, &error); |
299 | | |
300 | 4 | spawn_thread_data->thread = g_steal_pointer (&thread); |
301 | 4 | spawn_thread_data->error = g_steal_pointer (&error); |
302 | | |
303 | 4 | g_cond_broadcast (&spawn_thread_cond); |
304 | 4 | g_async_queue_unlock (spawn_thread_queue); |
305 | 4 | } |
306 | | |
307 | 4 | return NULL; |
308 | 4 | } |
309 | | |
310 | | static gpointer |
311 | | g_thread_pool_thread_proxy (gpointer data) |
312 | 4 | { |
313 | 4 | GRealThreadPool *pool; |
314 | | |
315 | 4 | pool = data; |
316 | | |
317 | 4 | DEBUG_MSG (("thread %p started for pool %p.", g_thread_self (), pool)); |
318 | | |
319 | 4 | g_async_queue_lock (pool->queue); |
320 | | |
321 | 15.7k | while (TRUE) |
322 | 15.7k | { |
323 | 15.7k | gpointer task; |
324 | | |
325 | 15.7k | task = g_thread_pool_wait_for_new_task (pool); |
326 | 15.7k | if (task) |
327 | 7.87k | { |
328 | 7.87k | if (pool->running || !pool->immediate) |
329 | 0 | { |
330 | | /* A task was received and the thread pool is active, |
331 | | * so execute the function. |
332 | | */ |
333 | 0 | g_async_queue_unlock (pool->queue); |
334 | 0 | DEBUG_MSG (("thread %p in pool %p calling func.", |
335 | 0 | g_thread_self (), pool)); |
336 | 0 | pool->pool.func (task, pool->pool.user_data); |
337 | 0 | g_async_queue_lock (pool->queue); |
338 | 0 | } |
339 | 7.87k | } |
340 | 7.87k | else |
341 | 7.87k | { |
342 | | /* No task was received, so this thread goes to the global pool. */ |
343 | 7.87k | gboolean free_pool = FALSE; |
344 | | |
345 | 7.87k | DEBUG_MSG (("thread %p leaving pool %p for global pool.", |
346 | 7.87k | g_thread_self (), pool)); |
347 | 7.87k | pool->num_threads--; |
348 | | |
349 | 7.87k | if (!pool->running) |
350 | 7.87k | { |
351 | 7.87k | if (!pool->waiting) |
352 | 7.87k | { |
353 | 7.87k | if (pool->num_threads == 0) |
354 | 7.87k | { |
355 | | /* If the pool is not running and no other |
356 | | * thread is waiting for this thread pool to |
357 | | * finish and this is the last thread of this |
358 | | * pool, free the pool. |
359 | | */ |
360 | 7.87k | free_pool = TRUE; |
361 | 7.87k | } |
362 | 0 | else |
363 | 0 | { |
364 | | /* If the pool is not running and no other |
365 | | * thread is waiting for this thread pool to |
366 | | * finish and this is not the last thread of |
367 | | * this pool and there are no tasks left in the |
368 | | * queue, wakeup the remaining threads. |
369 | | */ |
370 | 0 | if (g_async_queue_length_unlocked (pool->queue) == |
371 | 0 | (gint) -pool->num_threads) |
372 | 0 | g_thread_pool_wakeup_and_stop_all (pool); |
373 | 0 | } |
374 | 7.87k | } |
375 | 0 | else if (pool->immediate || |
376 | 0 | g_async_queue_length_unlocked (pool->queue) <= 0) |
377 | 0 | { |
378 | | /* If the pool is not running and another thread is |
379 | | * waiting for this thread pool to finish and there |
380 | | * are either no tasks left or the pool shall stop |
381 | | * immediately, inform the waiting thread of a change |
382 | | * of the thread pool state. |
383 | | */ |
384 | 0 | g_cond_broadcast (&pool->cond); |
385 | 0 | } |
386 | 7.87k | } |
387 | | |
388 | 7.87k | g_atomic_int_inc (&unused_threads); |
389 | 7.87k | g_async_queue_unlock (pool->queue); |
390 | | |
391 | 7.87k | if (free_pool) |
392 | 7.87k | g_thread_pool_free_internal (pool); |
393 | | |
394 | 7.87k | pool = g_thread_pool_wait_for_new_pool (); |
395 | 7.87k | g_atomic_int_add (&unused_threads, -1); |
396 | | |
397 | 7.87k | if (pool == NULL) |
398 | 0 | break; |
399 | | |
400 | 7.87k | g_async_queue_lock (pool->queue); |
401 | | |
402 | 7.87k | DEBUG_MSG (("thread %p entering pool %p from global pool.", |
403 | 7.87k | g_thread_self (), pool)); |
404 | | |
405 | | /* pool->num_threads++ is not done here, but in |
406 | | * g_thread_pool_start_thread to make the new started |
407 | | * thread known to the pool before itself can do it. |
408 | | */ |
409 | 7.87k | } |
410 | 15.7k | } |
411 | | |
412 | 4 | return NULL; |
413 | 4 | } |
414 | | |
415 | | static gboolean |
416 | | g_thread_pool_start_thread (GRealThreadPool *pool, |
417 | | GError **error) |
418 | 7.87k | { |
419 | 7.87k | gboolean success = FALSE; |
420 | | |
421 | 7.87k | if (pool->max_threads != -1 && pool->num_threads >= (guint) pool->max_threads) |
422 | | /* Enough threads are already running */ |
423 | 0 | return TRUE; |
424 | | |
425 | 7.87k | g_async_queue_lock (unused_thread_queue); |
426 | | |
427 | 7.87k | if (g_async_queue_length_unlocked (unused_thread_queue) < 0) |
428 | 7.87k | { |
429 | 7.87k | g_async_queue_push_unlocked (unused_thread_queue, pool); |
430 | 7.87k | success = TRUE; |
431 | 7.87k | } |
432 | | |
433 | 7.87k | g_async_queue_unlock (unused_thread_queue); |
434 | | |
435 | 7.87k | if (!success) |
436 | 4 | { |
437 | 4 | const gchar *prgname = g_get_prgname (); |
438 | 4 | gchar name[16] = "pool"; |
439 | 4 | GThread *thread; |
440 | | |
441 | 4 | if (prgname) |
442 | 0 | g_snprintf (name, sizeof (name), "pool-%s", prgname); |
443 | | |
444 | | /* No thread was found, we have to start a new one */ |
445 | 4 | if (pool->pool.exclusive) |
446 | 4 | { |
447 | | /* For exclusive thread-pools this is directly called from new() and |
448 | | * we simply start new threads that inherit the scheduler settings |
449 | | * from the current thread. |
450 | | */ |
451 | 4 | thread = g_thread_try_new (name, g_thread_pool_thread_proxy, pool, error); |
452 | 4 | } |
453 | 0 | else |
454 | 0 | { |
455 | | /* For non-exclusive thread-pools this can be called at any time |
456 | | * when a new thread is needed. We make sure to create a new thread |
457 | | * here with the correct scheduler settings by going via our helper |
458 | | * thread. |
459 | | */ |
460 | 0 | SpawnThreadData spawn_thread_data = { (GThreadPool *) pool, NULL, NULL }; |
461 | |
|
462 | 0 | g_async_queue_lock (spawn_thread_queue); |
463 | |
|
464 | 0 | g_async_queue_push_unlocked (spawn_thread_queue, &spawn_thread_data); |
465 | |
|
466 | 0 | while (!spawn_thread_data.thread && !spawn_thread_data.error) |
467 | 0 | g_cond_wait (&spawn_thread_cond, _g_async_queue_get_mutex (spawn_thread_queue)); |
468 | |
|
469 | 0 | thread = spawn_thread_data.thread; |
470 | 0 | if (!thread) |
471 | 0 | g_propagate_error (error, g_steal_pointer (&spawn_thread_data.error)); |
472 | 0 | g_async_queue_unlock (spawn_thread_queue); |
473 | 0 | } |
474 | | |
475 | 4 | if (thread == NULL) |
476 | 0 | return FALSE; |
477 | | |
478 | 4 | g_thread_unref (thread); |
479 | 4 | } |
480 | | |
481 | | /* See comment in g_thread_pool_thread_proxy as to why this is done |
482 | | * here and not there |
483 | | */ |
484 | 7.87k | pool->num_threads++; |
485 | | |
486 | 7.87k | return TRUE; |
487 | 7.87k | } |
488 | | |
489 | | /** |
490 | | * g_thread_pool_new: |
491 | | * @func: a function to execute in the threads of the new thread pool |
492 | | * @user_data: user data that is handed over to @func every time it |
493 | | * is called |
494 | | * @max_threads: the maximal number of threads to execute concurrently |
495 | | * in the new thread pool, -1 means no limit |
496 | | * @exclusive: should this thread pool be exclusive? |
497 | | * @error: return location for error, or %NULL |
498 | | * |
499 | | * This function creates a new thread pool. |
500 | | * |
501 | | * Whenever you call g_thread_pool_push(), either a new thread is |
502 | | * created or an unused one is reused. At most @max_threads threads |
503 | | * are running concurrently for this thread pool. @max_threads = -1 |
504 | | * allows unlimited threads to be created for this thread pool. The |
505 | | * newly created or reused thread now executes the function @func |
506 | | * with the two arguments. The first one is the parameter to |
507 | | * g_thread_pool_push() and the second one is @user_data. |
508 | | * |
509 | | * Pass g_get_num_processors() to @max_threads to create as many threads as |
510 | | * there are logical processors on the system. This will not pin each thread to |
511 | | * a specific processor. |
512 | | * |
513 | | * The parameter @exclusive determines whether the thread pool owns |
514 | | * all threads exclusive or shares them with other thread pools. |
515 | | * If @exclusive is %TRUE, @max_threads threads are started |
516 | | * immediately and they will run exclusively for this thread pool |
517 | | * until it is destroyed by g_thread_pool_free(). If @exclusive is |
518 | | * %FALSE, threads are created when needed and shared between all |
519 | | * non-exclusive thread pools. This implies that @max_threads may |
520 | | * not be -1 for exclusive thread pools. Besides, exclusive thread |
521 | | * pools are not affected by g_thread_pool_set_max_idle_time() |
522 | | * since their threads are never considered idle and returned to the |
523 | | * global pool. |
524 | | * |
525 | | * Note that the threads used by exclusive thread pools will all inherit the |
526 | | * scheduler settings of the current thread while the threads used by |
527 | | * non-exclusive thread pools will inherit the scheduler settings from the |
528 | | * first thread that created such a thread pool. |
529 | | * |
530 | | * At least one thread will be spawned when this function is called, either to |
531 | | * create the @max_threads exclusive threads, or to preserve the scheduler |
532 | | * settings of the current thread for future spawns. |
533 | | * |
534 | | * @error can be %NULL to ignore errors, or non-%NULL to report |
535 | | * errors. An error can only occur when @exclusive is set to %TRUE |
536 | | * and not all @max_threads threads could be created. |
537 | | * See #GThreadError for possible errors that may occur. |
538 | | * Note, even in case of error a valid #GThreadPool is returned. |
539 | | * |
540 | | * Returns: the new #GThreadPool |
541 | | */ |
542 | | GThreadPool * |
543 | | g_thread_pool_new (GFunc func, |
544 | | gpointer user_data, |
545 | | gint max_threads, |
546 | | gboolean exclusive, |
547 | | GError **error) |
548 | 15.7k | { |
549 | 15.7k | return g_thread_pool_new_full (func, user_data, NULL, max_threads, exclusive, error); |
550 | 15.7k | } |
551 | | |
552 | | /** |
553 | | * g_thread_pool_new_full: |
554 | | * @func: a function to execute in the threads of the new thread pool |
555 | | * @user_data: user data that is handed over to @func every time it |
556 | | * is called |
557 | | * @item_free_func: (nullable): used to pass as a free function to |
558 | | * g_async_queue_new_full() |
559 | | * @max_threads: the maximal number of threads to execute concurrently |
560 | | * in the new thread pool, `-1` means no limit |
561 | | * @exclusive: should this thread pool be exclusive? |
562 | | * @error: return location for error, or %NULL |
563 | | * |
564 | | * This function creates a new thread pool similar to g_thread_pool_new() |
565 | | * but allowing @item_free_func to be specified to free the data passed |
566 | | * to g_thread_pool_push() in the case that the #GThreadPool is stopped |
567 | | * and freed before all tasks have been executed. |
568 | | * |
569 | | * @item_free_func will *not* be called on items successfully passed to @func. |
570 | | * @func is responsible for freeing the items passed to it. |
571 | | * |
572 | | * Returns: (transfer full): the new #GThreadPool |
573 | | * |
574 | | * Since: 2.70 |
575 | | */ |
576 | | GThreadPool * |
577 | | g_thread_pool_new_full (GFunc func, |
578 | | gpointer user_data, |
579 | | GDestroyNotify item_free_func, |
580 | | gint max_threads, |
581 | | gboolean exclusive, |
582 | | GError **error) |
583 | 15.7k | { |
584 | 15.7k | GRealThreadPool *retval; |
585 | 15.7k | G_LOCK_DEFINE_STATIC (init); |
586 | | |
587 | 15.7k | g_return_val_if_fail (func, NULL); |
588 | 15.7k | g_return_val_if_fail (!exclusive || max_threads != -1, NULL); |
589 | 15.7k | g_return_val_if_fail (max_threads >= -1, NULL); |
590 | | |
591 | 15.7k | retval = g_new (GRealThreadPool, 1); |
592 | | |
593 | 15.7k | retval->pool.func = func; |
594 | 15.7k | retval->pool.user_data = user_data; |
595 | 15.7k | retval->pool.exclusive = exclusive; |
596 | 15.7k | retval->queue = g_async_queue_new_full (item_free_func); |
597 | 15.7k | g_cond_init (&retval->cond); |
598 | 15.7k | retval->max_threads = max_threads; |
599 | 15.7k | retval->num_threads = 0; |
600 | 15.7k | retval->running = TRUE; |
601 | 15.7k | retval->immediate = FALSE; |
602 | 15.7k | retval->waiting = FALSE; |
603 | 15.7k | retval->sort_func = NULL; |
604 | 15.7k | retval->sort_user_data = NULL; |
605 | | |
606 | 15.7k | G_LOCK (init); |
607 | 15.7k | if (!unused_thread_queue) |
608 | 4 | unused_thread_queue = g_async_queue_new (); |
609 | | |
610 | | /* |
611 | | * Spawn a helper thread that is only responsible for spawning new threads |
612 | | * with the scheduler settings of the current thread. |
613 | | * |
614 | | * This is then used for making sure that all threads created on the |
615 | | * non-exclusive thread-pool have the same scheduler settings, and more |
616 | | * importantly don't just inherit them from the thread that just happened to |
617 | | * push a new task and caused a new thread to be created. |
618 | | * |
619 | | * Not doing so could cause real-time priority threads or otherwise |
620 | | * threads with problematic scheduler settings to be part of the |
621 | | * non-exclusive thread-pools. |
622 | | * |
623 | | * For exclusive thread-pools this is not required as all threads are |
624 | | * created immediately below and are running forever, so they will |
625 | | * automatically inherit the scheduler settings from this very thread. |
626 | | */ |
627 | 15.7k | if (!exclusive && !spawn_thread_queue) |
628 | 4 | { |
629 | 4 | GThread *pool_spawner = NULL; |
630 | | |
631 | 4 | spawn_thread_queue = g_async_queue_new (); |
632 | 4 | g_cond_init (&spawn_thread_cond); |
633 | 4 | pool_spawner = g_thread_new ("pool-spawner", g_thread_pool_spawn_thread, NULL); |
634 | 4 | g_ignore_leak (pool_spawner); |
635 | 4 | } |
636 | 15.7k | G_UNLOCK (init); |
637 | | |
638 | 15.7k | if (retval->pool.exclusive) |
639 | 7.87k | { |
640 | 7.87k | g_async_queue_lock (retval->queue); |
641 | | |
642 | 15.7k | while (retval->num_threads < (guint) retval->max_threads) |
643 | 7.87k | { |
644 | 7.87k | GError *local_error = NULL; |
645 | | |
646 | 7.87k | if (!g_thread_pool_start_thread (retval, &local_error)) |
647 | 0 | { |
648 | 0 | g_propagate_error (error, local_error); |
649 | 0 | break; |
650 | 0 | } |
651 | 7.87k | } |
652 | | |
653 | 7.87k | g_async_queue_unlock (retval->queue); |
654 | 7.87k | } |
655 | | |
656 | 15.7k | return (GThreadPool*) retval; |
657 | 15.7k | } |
658 | | |
659 | | /** |
660 | | * g_thread_pool_push: |
661 | | * @pool: a #GThreadPool |
662 | | * @data: a new task for @pool |
663 | | * @error: return location for error, or %NULL |
664 | | * |
665 | | * Inserts @data into the list of tasks to be executed by @pool. |
666 | | * |
667 | | * When the number of currently running threads is lower than the |
668 | | * maximal allowed number of threads, a new thread is started (or |
669 | | * reused) with the properties given to g_thread_pool_new(). |
670 | | * Otherwise, @data stays in the queue until a thread in this pool |
671 | | * finishes its previous task and processes @data. |
672 | | * |
673 | | * @error can be %NULL to ignore errors, or non-%NULL to report |
674 | | * errors. An error can only occur when a new thread couldn't be |
675 | | * created. In that case @data is simply appended to the queue of |
676 | | * work to do. |
677 | | * |
678 | | * Before version 2.32, this function did not return a success status. |
679 | | * |
680 | | * Returns: %TRUE on success, %FALSE if an error occurred |
681 | | */ |
682 | | gboolean |
683 | | g_thread_pool_push (GThreadPool *pool, |
684 | | gpointer data, |
685 | | GError **error) |
686 | 0 | { |
687 | 0 | GRealThreadPool *real; |
688 | 0 | gboolean result; |
689 | |
|
690 | 0 | real = (GRealThreadPool*) pool; |
691 | |
|
692 | 0 | g_return_val_if_fail (real, FALSE); |
693 | 0 | g_return_val_if_fail (real->running, FALSE); |
694 | | |
695 | 0 | result = TRUE; |
696 | |
|
697 | 0 | g_async_queue_lock (real->queue); |
698 | |
|
699 | 0 | if (g_async_queue_length_unlocked (real->queue) >= 0) |
700 | 0 | { |
701 | | /* No thread is waiting in the queue */ |
702 | 0 | GError *local_error = NULL; |
703 | |
|
704 | 0 | if (!g_thread_pool_start_thread (real, &local_error)) |
705 | 0 | { |
706 | 0 | g_propagate_error (error, local_error); |
707 | 0 | result = FALSE; |
708 | 0 | } |
709 | 0 | } |
710 | |
|
711 | 0 | g_thread_pool_queue_push_unlocked (real, data); |
712 | 0 | g_async_queue_unlock (real->queue); |
713 | |
|
714 | 0 | return result; |
715 | 0 | } |
716 | | |
717 | | /** |
718 | | * g_thread_pool_set_max_threads: |
719 | | * @pool: a #GThreadPool |
720 | | * @max_threads: a new maximal number of threads for @pool, |
721 | | * or -1 for unlimited |
722 | | * @error: return location for error, or %NULL |
723 | | * |
724 | | * Sets the maximal allowed number of threads for @pool. |
725 | | * A value of -1 means that the maximal number of threads |
726 | | * is unlimited. If @pool is an exclusive thread pool, setting |
727 | | * the maximal number of threads to -1 is not allowed. |
728 | | * |
729 | | * Setting @max_threads to 0 means stopping all work for @pool. |
730 | | * It is effectively frozen until @max_threads is set to a non-zero |
731 | | * value again. |
732 | | * |
733 | | * A thread is never terminated while calling @func, as supplied by |
734 | | * g_thread_pool_new(). Instead the maximal number of threads only |
735 | | * has effect for the allocation of new threads in g_thread_pool_push(). |
736 | | * A new thread is allocated, whenever the number of currently |
737 | | * running threads in @pool is smaller than the maximal number. |
738 | | * |
739 | | * @error can be %NULL to ignore errors, or non-%NULL to report |
740 | | * errors. An error can only occur when a new thread couldn't be |
741 | | * created. |
742 | | * |
743 | | * Before version 2.32, this function did not return a success status. |
744 | | * |
745 | | * Returns: %TRUE on success, %FALSE if an error occurred |
746 | | */ |
747 | | gboolean |
748 | | g_thread_pool_set_max_threads (GThreadPool *pool, |
749 | | gint max_threads, |
750 | | GError **error) |
751 | 0 | { |
752 | 0 | GRealThreadPool *real; |
753 | 0 | gint to_start; |
754 | 0 | gboolean result; |
755 | |
|
756 | 0 | real = (GRealThreadPool*) pool; |
757 | |
|
758 | 0 | g_return_val_if_fail (real, FALSE); |
759 | 0 | g_return_val_if_fail (real->running, FALSE); |
760 | 0 | g_return_val_if_fail (!real->pool.exclusive || max_threads != -1, FALSE); |
761 | 0 | g_return_val_if_fail (max_threads >= -1, FALSE); |
762 | | |
763 | 0 | result = TRUE; |
764 | |
|
765 | 0 | g_async_queue_lock (real->queue); |
766 | |
|
767 | 0 | real->max_threads = max_threads; |
768 | |
|
769 | 0 | if (pool->exclusive) |
770 | 0 | to_start = real->max_threads - real->num_threads; |
771 | 0 | else |
772 | 0 | to_start = g_async_queue_length_unlocked (real->queue); |
773 | |
|
774 | 0 | for ( ; to_start > 0; to_start--) |
775 | 0 | { |
776 | 0 | GError *local_error = NULL; |
777 | |
|
778 | 0 | if (!g_thread_pool_start_thread (real, &local_error)) |
779 | 0 | { |
780 | 0 | g_propagate_error (error, local_error); |
781 | 0 | result = FALSE; |
782 | 0 | break; |
783 | 0 | } |
784 | 0 | } |
785 | |
|
786 | 0 | g_async_queue_unlock (real->queue); |
787 | |
|
788 | 0 | return result; |
789 | 0 | } |
790 | | |
791 | | /** |
792 | | * g_thread_pool_get_max_threads: |
793 | | * @pool: a #GThreadPool |
794 | | * |
795 | | * Returns the maximal number of threads for @pool. |
796 | | * |
797 | | * Returns: the maximal number of threads |
798 | | */ |
799 | | gint |
800 | | g_thread_pool_get_max_threads (GThreadPool *pool) |
801 | 0 | { |
802 | 0 | GRealThreadPool *real; |
803 | 0 | gint retval; |
804 | |
|
805 | 0 | real = (GRealThreadPool*) pool; |
806 | |
|
807 | 0 | g_return_val_if_fail (real, 0); |
808 | 0 | g_return_val_if_fail (real->running, 0); |
809 | | |
810 | 0 | g_async_queue_lock (real->queue); |
811 | 0 | retval = real->max_threads; |
812 | 0 | g_async_queue_unlock (real->queue); |
813 | |
|
814 | 0 | return retval; |
815 | 0 | } |
816 | | |
817 | | /** |
818 | | * g_thread_pool_get_num_threads: |
819 | | * @pool: a #GThreadPool |
820 | | * |
821 | | * Returns the number of threads currently running in @pool. |
822 | | * |
823 | | * Returns: the number of threads currently running |
824 | | */ |
825 | | guint |
826 | | g_thread_pool_get_num_threads (GThreadPool *pool) |
827 | 0 | { |
828 | 0 | GRealThreadPool *real; |
829 | 0 | guint retval; |
830 | |
|
831 | 0 | real = (GRealThreadPool*) pool; |
832 | |
|
833 | 0 | g_return_val_if_fail (real, 0); |
834 | 0 | g_return_val_if_fail (real->running, 0); |
835 | | |
836 | 0 | g_async_queue_lock (real->queue); |
837 | 0 | retval = real->num_threads; |
838 | 0 | g_async_queue_unlock (real->queue); |
839 | |
|
840 | 0 | return retval; |
841 | 0 | } |
842 | | |
843 | | /** |
844 | | * g_thread_pool_unprocessed: |
845 | | * @pool: a #GThreadPool |
846 | | * |
847 | | * Returns the number of tasks still unprocessed in @pool. |
848 | | * |
849 | | * Returns: the number of unprocessed tasks |
850 | | */ |
851 | | guint |
852 | | g_thread_pool_unprocessed (GThreadPool *pool) |
853 | 0 | { |
854 | 0 | GRealThreadPool *real; |
855 | 0 | gint unprocessed; |
856 | |
|
857 | 0 | real = (GRealThreadPool*) pool; |
858 | |
|
859 | 0 | g_return_val_if_fail (real, 0); |
860 | 0 | g_return_val_if_fail (real->running, 0); |
861 | | |
862 | 0 | unprocessed = g_async_queue_length (real->queue); |
863 | |
|
864 | 0 | return MAX (unprocessed, 0); |
865 | 0 | } |
866 | | |
867 | | /** |
868 | | * g_thread_pool_free: |
869 | | * @pool: a #GThreadPool |
870 | | * @immediate: should @pool shut down immediately? |
871 | | * @wait_: should the function wait for all tasks to be finished? |
872 | | * |
873 | | * Frees all resources allocated for @pool. |
874 | | * |
875 | | * If @immediate is %TRUE, no new task is processed for @pool. |
876 | | * Otherwise @pool is not freed before the last task is processed. |
877 | | * Note however, that no thread of this pool is interrupted while |
878 | | * processing a task. Instead at least all still running threads |
879 | | * can finish their tasks before the @pool is freed. |
880 | | * |
881 | | * If @wait_ is %TRUE, this function does not return before all |
882 | | * tasks to be processed (dependent on @immediate, whether all |
883 | | * or only the currently running) are ready. |
884 | | * Otherwise this function returns immediately. |
885 | | * |
886 | | * After calling this function @pool must not be used anymore. |
887 | | */ |
888 | | void |
889 | | g_thread_pool_free (GThreadPool *pool, |
890 | | gboolean immediate, |
891 | | gboolean wait_) |
892 | 15.7k | { |
893 | 15.7k | GRealThreadPool *real; |
894 | | |
895 | 15.7k | real = (GRealThreadPool*) pool; |
896 | | |
897 | 15.7k | g_return_if_fail (real); |
898 | 15.7k | g_return_if_fail (real->running); |
899 | | |
900 | | /* If there's no thread allowed here, there is not much sense in |
901 | | * not stopping this pool immediately, when it's not empty |
902 | | */ |
903 | 15.7k | g_return_if_fail (immediate || |
904 | 15.7k | real->max_threads != 0 || |
905 | 15.7k | g_async_queue_length (real->queue) == 0); |
906 | | |
907 | 15.7k | g_async_queue_lock (real->queue); |
908 | | |
909 | 15.7k | real->running = FALSE; |
910 | 15.7k | real->immediate = immediate; |
911 | 15.7k | real->waiting = wait_; |
912 | | |
913 | 15.7k | if (wait_) |
914 | 15.7k | { |
915 | 15.7k | while (g_async_queue_length_unlocked (real->queue) != (gint) -real->num_threads && |
916 | 15.7k | !(immediate && real->num_threads == 0)) |
917 | 0 | g_cond_wait (&real->cond, _g_async_queue_get_mutex (real->queue)); |
918 | 15.7k | } |
919 | | |
920 | 15.7k | if (immediate || g_async_queue_length_unlocked (real->queue) == (gint) -real->num_threads) |
921 | 15.7k | { |
922 | | /* No thread is currently doing something (and nothing is left |
923 | | * to process in the queue) |
924 | | */ |
925 | 15.7k | if (real->num_threads == 0) |
926 | 7.87k | { |
927 | | /* No threads left, we clean up */ |
928 | 7.87k | g_async_queue_unlock (real->queue); |
929 | 7.87k | g_thread_pool_free_internal (real); |
930 | 7.87k | return; |
931 | 7.87k | } |
932 | | |
933 | 7.87k | g_thread_pool_wakeup_and_stop_all (real); |
934 | 7.87k | } |
935 | | |
936 | | /* The last thread should cleanup the pool */ |
937 | 7.87k | real->waiting = FALSE; |
938 | 7.87k | g_async_queue_unlock (real->queue); |
939 | 7.87k | } |
940 | | |
941 | | static void |
942 | | g_thread_pool_free_internal (GRealThreadPool* pool) |
943 | 15.7k | { |
944 | 15.7k | g_return_if_fail (pool); |
945 | 15.7k | g_return_if_fail (pool->running == FALSE); |
946 | 15.7k | g_return_if_fail (pool->num_threads == 0); |
947 | | |
948 | | /* Ensure the dummy item pushed on by g_thread_pool_wakeup_and_stop_all() is |
949 | | * removed, before it’s potentially passed to the user-provided |
950 | | * @item_free_func. */ |
951 | 15.7k | g_async_queue_remove (pool->queue, GUINT_TO_POINTER (1)); |
952 | | |
953 | 15.7k | g_async_queue_unref (pool->queue); |
954 | 15.7k | g_cond_clear (&pool->cond); |
955 | | |
956 | 15.7k | g_free (pool); |
957 | 15.7k | } |
958 | | |
959 | | static void |
960 | | g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool) |
961 | 7.87k | { |
962 | 7.87k | guint i; |
963 | | |
964 | 7.87k | g_return_if_fail (pool); |
965 | 7.87k | g_return_if_fail (pool->running == FALSE); |
966 | 7.87k | g_return_if_fail (pool->num_threads != 0); |
967 | | |
968 | 7.87k | pool->immediate = TRUE; |
969 | | |
970 | | /* |
971 | | * So here we're sending bogus data to the pool threads, which |
972 | | * should cause them each to wake up, and check the above |
973 | | * pool->immediate condition. However we don't want that |
974 | | * data to be sorted (since it'll crash the sorter). |
975 | | */ |
976 | 15.7k | for (i = 0; i < pool->num_threads; i++) |
977 | 7.87k | g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1)); |
978 | 7.87k | } |
979 | | |
980 | | /** |
981 | | * g_thread_pool_set_max_unused_threads: |
982 | | * @max_threads: maximal number of unused threads |
983 | | * |
984 | | * Sets the maximal number of unused threads to @max_threads. |
985 | | * If @max_threads is -1, no limit is imposed on the number |
986 | | * of unused threads. |
987 | | * |
988 | | * The default value is 2. |
989 | | */ |
990 | | void |
991 | | g_thread_pool_set_max_unused_threads (gint max_threads) |
992 | 0 | { |
993 | 0 | g_return_if_fail (max_threads >= -1); |
994 | | |
995 | 0 | g_atomic_int_set (&max_unused_threads, max_threads); |
996 | |
|
997 | 0 | if (max_threads != -1) |
998 | 0 | { |
999 | 0 | max_threads -= g_atomic_int_get (&unused_threads); |
1000 | 0 | if (max_threads < 0) |
1001 | 0 | { |
1002 | 0 | g_atomic_int_set (&kill_unused_threads, -max_threads); |
1003 | 0 | g_atomic_int_inc (&wakeup_thread_serial); |
1004 | |
|
1005 | 0 | g_async_queue_lock (unused_thread_queue); |
1006 | |
|
1007 | 0 | do |
1008 | 0 | { |
1009 | 0 | g_async_queue_push_unlocked (unused_thread_queue, |
1010 | 0 | wakeup_thread_marker); |
1011 | 0 | } |
1012 | 0 | while (++max_threads); |
1013 | |
|
1014 | 0 | g_async_queue_unlock (unused_thread_queue); |
1015 | 0 | } |
1016 | 0 | } |
1017 | 0 | } |
1018 | | |
1019 | | /** |
1020 | | * g_thread_pool_get_max_unused_threads: |
1021 | | * |
1022 | | * Returns the maximal allowed number of unused threads. |
1023 | | * |
1024 | | * Returns: the maximal number of unused threads |
1025 | | */ |
1026 | | gint |
1027 | | g_thread_pool_get_max_unused_threads (void) |
1028 | 0 | { |
1029 | 0 | return g_atomic_int_get (&max_unused_threads); |
1030 | 0 | } |
1031 | | |
1032 | | /** |
1033 | | * g_thread_pool_get_num_unused_threads: |
1034 | | * |
1035 | | * Returns the number of currently unused threads. |
1036 | | * |
1037 | | * Returns: the number of currently unused threads |
1038 | | */ |
1039 | | guint |
1040 | | g_thread_pool_get_num_unused_threads (void) |
1041 | 0 | { |
1042 | 0 | return (guint) g_atomic_int_get (&unused_threads); |
1043 | 0 | } |
1044 | | |
1045 | | /** |
1046 | | * g_thread_pool_stop_unused_threads: |
1047 | | * |
1048 | | * Stops all currently unused threads. This does not change the |
1049 | | * maximal number of unused threads. This function can be used to |
1050 | | * regularly stop all unused threads e.g. from g_timeout_add(). |
1051 | | */ |
1052 | | void |
1053 | | g_thread_pool_stop_unused_threads (void) |
1054 | 0 | { |
1055 | 0 | guint oldval; |
1056 | |
|
1057 | 0 | oldval = g_thread_pool_get_max_unused_threads (); |
1058 | |
|
1059 | 0 | g_thread_pool_set_max_unused_threads (0); |
1060 | 0 | g_thread_pool_set_max_unused_threads (oldval); |
1061 | 0 | } |
1062 | | |
1063 | | /** |
1064 | | * g_thread_pool_set_sort_function: |
1065 | | * @pool: a #GThreadPool |
1066 | | * @func: the #GCompareDataFunc used to sort the list of tasks. |
1067 | | * This function is passed two tasks. It should return |
1068 | | * 0 if the order in which they are handled does not matter, |
1069 | | * a negative value if the first task should be processed before |
1070 | | * the second or a positive value if the second task should be |
1071 | | * processed first. |
1072 | | * @user_data: user data passed to @func |
1073 | | * |
1074 | | * Sets the function used to sort the list of tasks. This allows the |
1075 | | * tasks to be processed by a priority determined by @func, and not |
1076 | | * just in the order in which they were added to the pool. |
1077 | | * |
1078 | | * Note, if the maximum number of threads is more than 1, the order |
1079 | | * that threads are executed cannot be guaranteed 100%. Threads are |
1080 | | * scheduled by the operating system and are executed at random. It |
1081 | | * cannot be assumed that threads are executed in the order they are |
1082 | | * created. |
1083 | | * |
1084 | | * Since: 2.10 |
1085 | | */ |
1086 | | void |
1087 | | g_thread_pool_set_sort_function (GThreadPool *pool, |
1088 | | GCompareDataFunc func, |
1089 | | gpointer user_data) |
1090 | 0 | { |
1091 | 0 | GRealThreadPool *real; |
1092 | |
|
1093 | 0 | real = (GRealThreadPool*) pool; |
1094 | |
|
1095 | 0 | g_return_if_fail (real); |
1096 | 0 | g_return_if_fail (real->running); |
1097 | | |
1098 | 0 | g_async_queue_lock (real->queue); |
1099 | |
|
1100 | 0 | real->sort_func = func; |
1101 | 0 | real->sort_user_data = user_data; |
1102 | |
|
1103 | 0 | if (func) |
1104 | 0 | g_async_queue_sort_unlocked (real->queue, |
1105 | 0 | real->sort_func, |
1106 | 0 | real->sort_user_data); |
1107 | |
|
1108 | 0 | g_async_queue_unlock (real->queue); |
1109 | 0 | } |
1110 | | |
1111 | | /** |
1112 | | * g_thread_pool_move_to_front: |
1113 | | * @pool: a #GThreadPool |
1114 | | * @data: an unprocessed item in the pool |
1115 | | * |
1116 | | * Moves the item to the front of the queue of unprocessed |
1117 | | * items, so that it will be processed next. |
1118 | | * |
1119 | | * Returns: %TRUE if the item was found and moved |
1120 | | * |
1121 | | * Since: 2.46 |
1122 | | */ |
1123 | | gboolean |
1124 | | g_thread_pool_move_to_front (GThreadPool *pool, |
1125 | | gpointer data) |
1126 | 0 | { |
1127 | 0 | GRealThreadPool *real = (GRealThreadPool*) pool; |
1128 | 0 | gboolean found; |
1129 | |
|
1130 | 0 | g_async_queue_lock (real->queue); |
1131 | |
|
1132 | 0 | found = g_async_queue_remove_unlocked (real->queue, data); |
1133 | 0 | if (found) |
1134 | 0 | g_async_queue_push_front_unlocked (real->queue, data); |
1135 | |
|
1136 | 0 | g_async_queue_unlock (real->queue); |
1137 | |
|
1138 | 0 | return found; |
1139 | 0 | } |
1140 | | |
1141 | | /** |
1142 | | * g_thread_pool_set_max_idle_time: |
1143 | | * @interval: the maximum @interval (in milliseconds) |
1144 | | * a thread can be idle |
1145 | | * |
1146 | | * This function will set the maximum @interval that a thread |
1147 | | * waiting in the pool for new tasks can be idle for before |
1148 | | * being stopped. This function is similar to calling |
1149 | | * g_thread_pool_stop_unused_threads() on a regular timeout, |
1150 | | * except this is done on a per thread basis. |
1151 | | * |
1152 | | * By setting @interval to 0, idle threads will not be stopped. |
1153 | | * |
1154 | | * The default value is 15000 (15 seconds). |
1155 | | * |
1156 | | * Since: 2.10 |
1157 | | */ |
1158 | | void |
1159 | | g_thread_pool_set_max_idle_time (guint interval) |
1160 | 0 | { |
1161 | 0 | guint i; |
1162 | |
|
1163 | 0 | g_atomic_int_set (&max_idle_time, interval); |
1164 | |
|
1165 | 0 | i = (guint) g_atomic_int_get (&unused_threads); |
1166 | 0 | if (i > 0) |
1167 | 0 | { |
1168 | 0 | g_atomic_int_inc (&wakeup_thread_serial); |
1169 | 0 | g_async_queue_lock (unused_thread_queue); |
1170 | |
|
1171 | 0 | do |
1172 | 0 | { |
1173 | 0 | g_async_queue_push_unlocked (unused_thread_queue, |
1174 | 0 | wakeup_thread_marker); |
1175 | 0 | } |
1176 | 0 | while (--i); |
1177 | |
|
1178 | 0 | g_async_queue_unlock (unused_thread_queue); |
1179 | 0 | } |
1180 | 0 | } |
1181 | | |
1182 | | /** |
1183 | | * g_thread_pool_get_max_idle_time: |
1184 | | * |
1185 | | * This function will return the maximum @interval that a |
1186 | | * thread will wait in the thread pool for new tasks before |
1187 | | * being stopped. |
1188 | | * |
1189 | | * If this function returns 0, threads waiting in the thread |
1190 | | * pool for new work are not stopped. |
1191 | | * |
1192 | | * Returns: the maximum @interval (milliseconds) to wait |
1193 | | * for new tasks in the thread pool before stopping the |
1194 | | * thread |
1195 | | * |
1196 | | * Since: 2.10 |
1197 | | */ |
1198 | | guint |
1199 | | g_thread_pool_get_max_idle_time (void) |
1200 | 0 | { |
1201 | 0 | return (guint) g_atomic_int_get (&max_idle_time); |
1202 | 0 | } |