Coverage Report

Created: 2025-11-16 06:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/glib/glib/gthreadpool.c
Line
Count
Source
1
/* GLIB - Library of useful routines for C programming
2
 * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3
 *
4
 * GThreadPool: thread pool implementation.
5
 * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6
 *
7
 * SPDX-License-Identifier: LGPL-2.1-or-later
8
 *
9
 * This library is free software; you can redistribute it and/or
10
 * modify it under the terms of the GNU Lesser General Public
11
 * License as published by the Free Software Foundation; either
12
 * version 2.1 of the License, or (at your option) any later version.
13
 *
14
 * This library is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
 * Lesser General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU Lesser General Public
20
 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
21
 */
22
23
/*
24
 * MT safe
25
 */
26
27
#include "config.h"
28
29
#include "gthreadpool.h"
30
31
#include "gasyncqueue.h"
32
#include "gasyncqueueprivate.h"
33
#include "glib-private.h"
34
#include "gmain.h"
35
#include "gtestutils.h"
36
#include "gthreadprivate.h"
37
#include "gtimer.h"
38
#include "gutils.h"
39
40
#define DEBUG_MSG(x)
41
/* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");    */
42
43
typedef struct _GRealThreadPool GRealThreadPool;
44
45
/**
46
 * GThreadPool:
47
 * @func: the function to execute in the threads of this pool
48
 * @user_data: the user data for the threads of this pool
49
 * @exclusive: are all threads exclusive to this pool
50
 *
51
 * The `GThreadPool` struct represents a thread pool.
52
 *
53
 * A thread pool is useful when you wish to asynchronously fork out the execution of work
54
 * and continue working in your own thread. If that will happen often, the overhead of starting
55
 * and destroying a thread each time might be too high. In such cases reusing already started
56
 * threads seems like a good idea. And it indeed is, but implementing this can be tedious
57
 * and error-prone.
58
 *
59
 * Therefore GLib provides thread pools for your convenience. An added advantage is, that the
60
 * threads can be shared between the different subsystems of your program, when they are using GLib.
61
 *
62
 * To create a new thread pool, you use [func@GLib.ThreadPool.new].
63
 * It is destroyed by [method@GLib.ThreadPool.free].
64
 *
65
 * If you want to execute a certain task within a thread pool, use [method@GLib.ThreadPool.push].
66
 *
67
 * To get the current number of running threads you call [method@GLib.ThreadPool.get_num_threads].
68
 * To get the number of still unprocessed tasks you call [method@GLib.ThreadPool.unprocessed].
69
 * To control the maximum number of threads for a thread pool, you use
70
 * [method@GLib.ThreadPool.get_max_threads] and [method@GLib.ThreadPool.set_max_threads].
71
 *
72
 * Finally you can control the number of unused threads, that are kept alive by GLib for future use.
73
 * The current number can be fetched with [func@GLib.ThreadPool.get_num_unused_threads].
74
 * The maximum number can be controlled by [func@GLib.ThreadPool.get_max_unused_threads] and
75
 * [func@GLib.ThreadPool.set_max_unused_threads]. All currently unused threads
76
 * can be stopped by calling [func@GLib.ThreadPool.stop_unused_threads].
77
 */
78
/* Private representation of a thread pool. The public GThreadPool is the
 * first member, and the code in this file casts between GThreadPool* and
 * GRealThreadPool*. The fields are read and written in this file while the
 * lock of @queue is held.
 */
struct _GRealThreadPool
{
  /* Public part: func, user_data and exclusive. */
  GThreadPool pool;
  /* Pending tasks; its lock also guards the fields below. */
  GAsyncQueue *queue;
  /* Broadcast by worker threads to wake a g_thread_pool_free() caller
   * that is waiting for the pool to finish. */
  GCond cond;
  /* Maximal number of threads for this pool, -1 means unlimited. */
  gint max_threads;
  /* Threads currently attached to this pool; incremented in
   * g_thread_pool_start_thread(), decremented when a thread leaves. */
  guint num_threads;
  /* TRUE from creation until g_thread_pool_free() is called. */
  gboolean running;
  /* Set when the pool shall stop without processing queued tasks. */
  gboolean immediate;
  /* TRUE while a g_thread_pool_free() caller waits for the pool. */
  gboolean waiting;
  /* Optional ordering applied when tasks are pushed onto @queue. */
  GCompareDataFunc sort_func;
  /* User data handed to @sort_func. */
  gpointer sort_user_data;
};
91
92
/* The following is just an address to mark the wakeup order for a
93
 * thread, it could be any address (as long as it isn't a valid
94
 * GThreadPool address)
95
 */
96
static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
97
static gint wakeup_thread_serial = 0;
98
99
/* Here all unused threads are waiting  */
100
static GAsyncQueue *unused_thread_queue = NULL;
101
static gint unused_threads = 0;
102
static gint max_unused_threads = 8;
103
static gint kill_unused_threads = 0;
104
static guint max_idle_time = 15 * 1000;
105
106
static int thread_counter = 0;
107
108
typedef struct
109
{
110
  /* Either thread or error are set in the end. Both transfer-full. */
111
  GThreadPool *pool;
112
  GThread *thread;
113
  GError *error;
114
} SpawnThreadData;
115
116
static GCond spawn_thread_cond;
117
static GAsyncQueue *spawn_thread_queue;
118
119
static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
120
                                                           gpointer          data);
121
static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);
122
static gpointer         g_thread_pool_thread_proxy        (gpointer          data);
123
static gboolean         g_thread_pool_start_thread        (GRealThreadPool  *pool,
124
                                                           GError          **error);
125
static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
126
static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);
127
static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);
128
129
static void
130
g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
131
                                   gpointer         data)
132
0
{
133
0
  if (pool->sort_func)
134
0
    g_async_queue_push_sorted_unlocked (pool->queue,
135
0
                                        data,
136
0
                                        pool->sort_func,
137
0
                                        pool->sort_user_data);
138
0
  else
139
0
    g_async_queue_push_unlocked (pool->queue, data);
140
0
}
141
142
static GRealThreadPool*
143
g_thread_pool_wait_for_new_pool (void)
144
0
{
145
0
  GRealThreadPool *pool;
146
0
  gint local_wakeup_thread_serial;
147
0
  guint local_max_unused_threads;
148
0
  gint local_max_idle_time;
149
0
  gint last_wakeup_thread_serial;
150
0
  gboolean have_relayed_thread_marker = FALSE;
151
152
0
  local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
153
0
  local_max_idle_time = g_atomic_int_get (&max_idle_time);
154
0
  last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
155
156
0
  do
157
0
    {
158
0
      if ((guint) g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
159
0
        {
160
          /* If this is a superfluous thread, stop it. */
161
0
          pool = NULL;
162
0
        }
163
0
      else if (local_max_idle_time > 0)
164
0
        {
165
          /* If a maximal idle time is given, wait for the given time. */
166
0
          DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
167
0
                      g_thread_self (), local_max_idle_time / 1000.0));
168
169
0
          pool = g_async_queue_timeout_pop (unused_thread_queue,
170
0
              local_max_idle_time * 1000);
171
0
        }
172
0
      else
173
0
        {
174
          /* If no maximal idle time is given, wait indefinitely. */
175
0
          DEBUG_MSG (("thread %p waiting in global pool.", g_thread_self ()));
176
0
          pool = g_async_queue_pop (unused_thread_queue);
177
0
        }
178
179
0
      if (pool == wakeup_thread_marker)
180
0
        {
181
0
          local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
182
0
          if (last_wakeup_thread_serial == local_wakeup_thread_serial)
183
0
            {
184
0
              if (!have_relayed_thread_marker)
185
0
              {
186
                /* If this wakeup marker has been received for
187
                 * the second time, relay it.
188
                 */
189
0
                DEBUG_MSG (("thread %p relaying wakeup message to "
190
0
                            "waiting thread with lower serial.",
191
0
                            g_thread_self ()));
192
193
0
                g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
194
0
                have_relayed_thread_marker = TRUE;
195
196
                /* If a wakeup marker has been relayed, this thread
197
                 * will get out of the way for 100 microseconds to
198
                 * avoid receiving this marker again.
199
                 */
200
0
                g_usleep (100);
201
0
              }
202
0
            }
203
0
          else
204
0
            {
205
0
              if (g_atomic_int_add (&kill_unused_threads, -1) > 0)
206
0
                {
207
0
                  pool = NULL;
208
0
                  break;
209
0
                }
210
211
0
              DEBUG_MSG (("thread %p updating to new limits.",
212
0
                          g_thread_self ()));
213
214
0
              local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
215
0
              local_max_idle_time = g_atomic_int_get (&max_idle_time);
216
0
              last_wakeup_thread_serial = local_wakeup_thread_serial;
217
218
0
              have_relayed_thread_marker = FALSE;
219
0
            }
220
0
        }
221
0
    }
222
0
  while (pool == wakeup_thread_marker);
223
224
0
  return pool;
225
0
}
226
227
static gpointer
228
g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
229
0
{
230
0
  gpointer task = NULL;
231
232
0
  if (pool->running || (!pool->immediate &&
233
0
                        g_async_queue_length_unlocked (pool->queue) > 0))
234
0
    {
235
      /* This thread pool is still active. */
236
0
      if (pool->max_threads != -1 && pool->num_threads > (guint) pool->max_threads)
237
0
        {
238
          /* This is a superfluous thread, so it goes to the global pool. */
239
0
          DEBUG_MSG (("superfluous thread %p in pool %p.",
240
0
                      g_thread_self (), pool));
241
0
        }
242
0
      else if (pool->pool.exclusive)
243
0
        {
244
          /* Exclusive threads stay attached to the pool. */
245
0
          task = g_async_queue_pop_unlocked (pool->queue);
246
247
0
          DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
248
0
                      "(%d running, %d unprocessed).",
249
0
                      g_thread_self (), pool, pool->num_threads,
250
0
                      g_async_queue_length_unlocked (pool->queue)));
251
0
        }
252
0
      else
253
0
        {
254
          /* A thread will wait for new tasks for at most 1/2
255
           * second before going to the global pool.
256
           */
257
0
          DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
258
0
                      "(%d running, %d unprocessed).",
259
0
                      g_thread_self (), pool, pool->num_threads,
260
0
                      g_async_queue_length_unlocked (pool->queue)));
261
262
0
          task = g_async_queue_timeout_pop_unlocked (pool->queue,
263
0
                 G_USEC_PER_SEC / 2);
264
0
        }
265
0
    }
266
0
  else
267
0
    {
268
      /* This thread pool is inactive, it will no longer process tasks. */
269
0
      DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
270
0
                  "(running: %s, immediate: %s, len: %d).",
271
0
                  pool, g_thread_self (),
272
0
                  pool->running ? "true" : "false",
273
0
                  pool->immediate ? "true" : "false",
274
0
                  g_async_queue_length_unlocked (pool->queue)));
275
0
    }
276
277
0
  return task;
278
0
}
279
280
static gpointer
281
g_thread_pool_spawn_thread (gpointer data)
282
0
{
283
0
  while (TRUE)
284
0
    {
285
0
      SpawnThreadData *spawn_thread_data;
286
0
      GThread *thread = NULL;
287
0
      GError *error = NULL;
288
0
      gchar name[16];
289
290
0
      g_snprintf (name, sizeof (name), "pool-%d", g_atomic_int_add (&thread_counter, 1));
291
292
0
      g_async_queue_lock (spawn_thread_queue);
293
      /* Spawn a new thread for the given pool and wake the requesting thread
294
       * up again with the result. This new thread will have the scheduler
295
       * settings inherited from this thread and in extension of the thread
296
       * that created the first non-exclusive thread-pool. */
297
0
      spawn_thread_data = g_async_queue_pop_unlocked (spawn_thread_queue);
298
0
      thread = g_thread_try_new (name, g_thread_pool_thread_proxy, spawn_thread_data->pool, &error);
299
300
0
      spawn_thread_data->thread = g_steal_pointer (&thread);
301
0
      spawn_thread_data->error = g_steal_pointer (&error);
302
303
0
      g_cond_broadcast (&spawn_thread_cond);
304
0
      g_async_queue_unlock (spawn_thread_queue);
305
0
    }
306
307
0
  return NULL;
308
0
}
309
310
static gpointer
311
g_thread_pool_thread_proxy (gpointer data)
312
0
{
313
0
  GRealThreadPool *pool;
314
315
0
  pool = data;
316
317
0
  DEBUG_MSG (("thread %p started for pool %p.", g_thread_self (), pool));
318
319
0
  g_async_queue_lock (pool->queue);
320
321
0
  while (TRUE)
322
0
    {
323
0
      gpointer task;
324
325
0
      task = g_thread_pool_wait_for_new_task (pool);
326
0
      if (task)
327
0
        {
328
0
          if (pool->running || !pool->immediate)
329
0
            {
330
              /* A task was received and the thread pool is active,
331
               * so execute the function.
332
               */
333
0
              g_async_queue_unlock (pool->queue);
334
0
              DEBUG_MSG (("thread %p in pool %p calling func.",
335
0
                          g_thread_self (), pool));
336
0
              pool->pool.func (task, pool->pool.user_data);
337
0
              g_async_queue_lock (pool->queue);
338
0
            }
339
0
        }
340
0
      else
341
0
        {
342
          /* No task was received, so this thread goes to the global pool. */
343
0
          gboolean free_pool = FALSE;
344
345
0
          DEBUG_MSG (("thread %p leaving pool %p for global pool.",
346
0
                      g_thread_self (), pool));
347
0
          pool->num_threads--;
348
349
0
          if (!pool->running)
350
0
            {
351
0
              if (!pool->waiting)
352
0
                {
353
0
                  if (pool->num_threads == 0)
354
0
                    {
355
                      /* If the pool is not running and no other
356
                       * thread is waiting for this thread pool to
357
                       * finish and this is the last thread of this
358
                       * pool, free the pool.
359
                       */
360
0
                      free_pool = TRUE;
361
0
                    }
362
0
                  else
363
0
                    {
364
                      /* If the pool is not running and no other
365
                       * thread is waiting for this thread pool to
366
                       * finish and this is not the last thread of
367
                       * this pool and there are no tasks left in the
368
                       * queue, wakeup the remaining threads.
369
                       */
370
0
                      if (g_async_queue_length_unlocked (pool->queue) ==
371
0
                          (gint) -pool->num_threads)
372
0
                        g_thread_pool_wakeup_and_stop_all (pool);
373
0
                    }
374
0
                }
375
0
              else if (pool->immediate ||
376
0
                       g_async_queue_length_unlocked (pool->queue) <= 0)
377
0
                {
378
                  /* If the pool is not running and another thread is
379
                   * waiting for this thread pool to finish and there
380
                   * are either no tasks left or the pool shall stop
381
                   * immediately, inform the waiting thread of a change
382
                   * of the thread pool state.
383
                   */
384
0
                  g_cond_broadcast (&pool->cond);
385
0
                }
386
0
            }
387
388
0
          g_atomic_int_inc (&unused_threads);
389
0
          g_async_queue_unlock (pool->queue);
390
391
0
          if (free_pool)
392
0
            g_thread_pool_free_internal (pool);
393
394
0
          pool = g_thread_pool_wait_for_new_pool ();
395
0
          g_atomic_int_add (&unused_threads, -1);
396
397
0
          if (pool == NULL)
398
0
            break;
399
400
0
          g_async_queue_lock (pool->queue);
401
402
0
          DEBUG_MSG (("thread %p entering pool %p from global pool.",
403
0
                      g_thread_self (), pool));
404
405
          /* pool->num_threads++ is not done here, but in
406
           * g_thread_pool_start_thread to make the new started
407
           * thread known to the pool before itself can do it.
408
           */
409
0
        }
410
0
    }
411
412
0
  return NULL;
413
0
}
414
415
static gboolean
416
g_thread_pool_start_thread (GRealThreadPool  *pool,
417
                            GError          **error)
418
0
{
419
0
  gboolean success = FALSE;
420
421
0
  if (pool->max_threads != -1 && pool->num_threads >= (guint) pool->max_threads)
422
    /* Enough threads are already running */
423
0
    return TRUE;
424
425
0
  g_async_queue_lock (unused_thread_queue);
426
427
0
  if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
428
0
    {
429
0
      g_async_queue_push_unlocked (unused_thread_queue, pool);
430
0
      success = TRUE;
431
0
    }
432
433
0
  g_async_queue_unlock (unused_thread_queue);
434
435
0
  if (!success)
436
0
    {
437
0
      GThread *thread;
438
439
      /* No thread was found, we have to start a new one */
440
0
      if (pool->pool.exclusive)
441
0
        {
442
          /* For exclusive thread-pools this is directly called from new() and
443
           * we simply start new threads that inherit the scheduler settings
444
           * from the current thread.
445
           */
446
0
          char name[16];
447
448
0
          g_snprintf (name, sizeof (name), "pool-%d", g_atomic_int_add (&thread_counter, 1));
449
450
0
          thread = g_thread_try_new (name, g_thread_pool_thread_proxy, pool, error);
451
0
        }
452
0
      else
453
0
        {
454
          /* For non-exclusive thread-pools this can be called at any time
455
           * when a new thread is needed. We make sure to create a new thread
456
           * here with the correct scheduler settings by going via our helper
457
           * thread.
458
           */
459
0
          SpawnThreadData spawn_thread_data = { (GThreadPool *) pool, NULL, NULL };
460
461
0
          g_async_queue_lock (spawn_thread_queue);
462
463
0
          g_async_queue_push_unlocked (spawn_thread_queue, &spawn_thread_data);
464
465
0
          while (!spawn_thread_data.thread && !spawn_thread_data.error)
466
0
            g_cond_wait (&spawn_thread_cond, _g_async_queue_get_mutex (spawn_thread_queue));
467
468
0
          thread = spawn_thread_data.thread;
469
0
          if (!thread)
470
0
            g_propagate_error (error, g_steal_pointer (&spawn_thread_data.error));
471
0
          g_async_queue_unlock (spawn_thread_queue);
472
0
        }
473
474
0
      if (thread == NULL)
475
0
        return FALSE;
476
477
0
      g_thread_unref (thread);
478
0
    }
479
480
  /* See comment in g_thread_pool_thread_proxy as to why this is done
481
   * here and not there
482
   */
483
0
  pool->num_threads++;
484
485
0
  return TRUE;
486
0
}
487
488
/**
489
 * g_thread_pool_new:
490
 * @func: a function to execute in the threads of the new thread pool
491
 * @user_data: user data that is handed over to @func every time it
492
 *     is called
493
 * @max_threads: the maximal number of threads to execute concurrently
494
 *     in the new thread pool, -1 means no limit
495
 * @exclusive: should this thread pool be exclusive?
496
 * @error: return location for error, or %NULL
497
 *
498
 * This function creates a new thread pool.
499
 *
500
 * Whenever you call g_thread_pool_push(), either a new thread is
501
 * created or an unused one is reused. At most @max_threads threads
502
 * are running concurrently for this thread pool. @max_threads = -1
503
 * allows unlimited threads to be created for this thread pool. The
504
 * newly created or reused thread now executes the function @func
505
 * with the two arguments. The first one is the parameter to
506
 * g_thread_pool_push() and the second one is @user_data.
507
 *
508
 * Pass g_get_num_processors() to @max_threads to create as many threads as
509
 * there are logical processors on the system. This will not pin each thread to
510
 * a specific processor.
511
 *
512
 * The parameter @exclusive determines whether the thread pool owns
513
 * all threads exclusively or shares them with other thread pools.
514
 * If @exclusive is %TRUE, @max_threads threads are started
515
 * immediately and they will run exclusively for this thread pool
516
 * until it is destroyed by g_thread_pool_free(). If @exclusive is
517
 * %FALSE, threads are created when needed and shared between all
518
 * non-exclusive thread pools. This implies that @max_threads may
519
 * not be -1 for exclusive thread pools. Besides, exclusive thread
520
 * pools are not affected by g_thread_pool_set_max_idle_time()
521
 * since their threads are never considered idle and returned to the
522
 * global pool.
523
 *
524
 * Note that the threads used by exclusive thread pools will all inherit the
525
 * scheduler settings of the current thread while the threads used by
526
 * non-exclusive thread pools will inherit the scheduler settings from the
527
 * first thread that created such a thread pool.
528
 *
529
 * At least one thread will be spawned when this function is called, either to
530
 * create the @max_threads exclusive threads, or to preserve the scheduler
531
 * settings of the current thread for future spawns.
532
 *
533
 * @error can be %NULL to ignore errors, or non-%NULL to report
534
 * errors. An error can only occur when @exclusive is set to %TRUE
535
 * and not all @max_threads threads could be created.
536
 * See #GThreadError for possible errors that may occur.
537
 * Note, even in case of error a valid #GThreadPool is returned.
538
 *
539
 * Returns: the new #GThreadPool
540
 */
541
GThreadPool *
542
g_thread_pool_new (GFunc      func,
543
                   gpointer   user_data,
544
                   gint       max_threads,
545
                   gboolean   exclusive,
546
                   GError   **error)
547
0
{
548
0
  return g_thread_pool_new_full (func, user_data, NULL, max_threads, exclusive, error);
549
0
}
550
551
/**
552
 * g_thread_pool_new_full:
553
 * @func: a function to execute in the threads of the new thread pool
554
 * @user_data: user data that is handed over to @func every time it
555
 *     is called
556
 * @item_free_func: (nullable): used to pass as a free function to
557
 *     g_async_queue_new_full()
558
 * @max_threads: the maximal number of threads to execute concurrently
559
 *     in the new thread pool, `-1` means no limit
560
 * @exclusive: should this thread pool be exclusive?
561
 * @error: return location for error, or %NULL
562
 *
563
 * This function creates a new thread pool similar to g_thread_pool_new()
564
 * but allowing @item_free_func to be specified to free the data passed
565
 * to g_thread_pool_push() in the case that the #GThreadPool is stopped
566
 * and freed before all tasks have been executed.
567
 *
568
 * @item_free_func will *not* be called on items successfully passed to @func.
569
 * @func is responsible for freeing the items passed to it.
570
 *
571
 * Returns: (transfer full): the new #GThreadPool
572
 *
573
 * Since: 2.70
574
 */
575
GThreadPool *
576
g_thread_pool_new_full (GFunc           func,
577
                        gpointer        user_data,
578
                        GDestroyNotify  item_free_func,
579
                        gint            max_threads,
580
                        gboolean        exclusive,
581
                        GError        **error)
582
0
{
583
0
  GRealThreadPool *retval;
584
0
  G_LOCK_DEFINE_STATIC (init);
585
586
0
  g_return_val_if_fail (func, NULL);
587
0
  g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
588
0
  g_return_val_if_fail (max_threads >= -1, NULL);
589
590
0
  retval = g_new (GRealThreadPool, 1);
591
592
0
  retval->pool.func = func;
593
0
  retval->pool.user_data = user_data;
594
0
  retval->pool.exclusive = exclusive;
595
0
  retval->queue = g_async_queue_new_full (item_free_func);
596
0
  g_cond_init (&retval->cond);
597
0
  retval->max_threads = max_threads;
598
0
  retval->num_threads = 0;
599
0
  retval->running = TRUE;
600
0
  retval->immediate = FALSE;
601
0
  retval->waiting = FALSE;
602
0
  retval->sort_func = NULL;
603
0
  retval->sort_user_data = NULL;
604
605
0
  G_LOCK (init);
606
0
  if (!unused_thread_queue)
607
0
      unused_thread_queue = g_async_queue_new ();
608
609
  /*
610
   * Spawn a helper thread that is only responsible for spawning new threads
611
   * with the scheduler settings of the current thread.
612
   *
613
   * This is then used for making sure that all threads created on the
614
   * non-exclusive thread-pool have the same scheduler settings, and more
615
   * importantly don't just inherit them from the thread that just happened to
616
   * push a new task and caused a new thread to be created.
617
   *
618
   * Not doing so could cause real-time priority threads or otherwise
619
   * threads with problematic scheduler settings to be part of the
620
   * non-exclusive thread-pools.
621
   *
622
   * For exclusive thread-pools this is not required as all threads are
623
   * created immediately below and are running forever, so they will
624
   * automatically inherit the scheduler settings from this very thread.
625
   */
626
0
  if (!exclusive && !spawn_thread_queue)
627
0
    {
628
0
      GThread *pool_spawner = NULL;
629
630
0
      spawn_thread_queue = g_async_queue_new ();
631
0
      g_cond_init (&spawn_thread_cond);
632
0
      pool_spawner = g_thread_new ("pool-spawner", g_thread_pool_spawn_thread, NULL);
633
0
      g_ignore_leak (pool_spawner);
634
0
    }
635
0
  G_UNLOCK (init);
636
637
0
  if (retval->pool.exclusive)
638
0
    {
639
0
      g_async_queue_lock (retval->queue);
640
641
0
      while (retval->num_threads < (guint) retval->max_threads)
642
0
        {
643
0
          GError *local_error = NULL;
644
645
0
          if (!g_thread_pool_start_thread (retval, &local_error))
646
0
            {
647
0
              g_propagate_error (error, local_error);
648
0
              break;
649
0
            }
650
0
        }
651
652
0
      g_async_queue_unlock (retval->queue);
653
0
    }
654
655
0
  return (GThreadPool*) retval;
656
0
}
657
658
/**
659
 * g_thread_pool_push:
660
 * @pool: a #GThreadPool
661
 * @data: a new task for @pool
662
 * @error: return location for error, or %NULL
663
 *
664
 * Inserts @data into the list of tasks to be executed by @pool.
665
 *
666
 * When the number of currently running threads is lower than the
667
 * maximal allowed number of threads, a new thread is started (or
668
 * reused) with the properties given to g_thread_pool_new().
669
 * Otherwise, @data stays in the queue until a thread in this pool
670
 * finishes its previous task and processes @data.
671
 *
672
 * @error can be %NULL to ignore errors, or non-%NULL to report
673
 * errors. An error can only occur when a new thread couldn't be
674
 * created. In that case @data is simply appended to the queue of
675
 * work to do.
676
 *
677
 * Before version 2.32, this function did not return a success status.
678
 *
679
 * Returns: %TRUE on success, %FALSE if an error occurred
680
 */
681
gboolean
682
g_thread_pool_push (GThreadPool  *pool,
683
                    gpointer      data,
684
                    GError      **error)
685
0
{
686
0
  GRealThreadPool *real;
687
0
  gboolean result;
688
689
0
  real = (GRealThreadPool*) pool;
690
691
0
  g_return_val_if_fail (real, FALSE);
692
0
  g_return_val_if_fail (real->running, FALSE);
693
694
0
  result = TRUE;
695
696
0
  g_async_queue_lock (real->queue);
697
698
0
  if (g_async_queue_length_unlocked (real->queue) >= 0)
699
0
    {
700
      /* No thread is waiting in the queue */
701
0
      GError *local_error = NULL;
702
703
0
      if (!g_thread_pool_start_thread (real, &local_error))
704
0
        {
705
0
          g_propagate_error (error, local_error);
706
0
          result = FALSE;
707
0
        }
708
0
    }
709
710
0
  g_thread_pool_queue_push_unlocked (real, data);
711
0
  g_async_queue_unlock (real->queue);
712
713
0
  return result;
714
0
}
715
716
/**
717
 * g_thread_pool_set_max_threads:
718
 * @pool: a #GThreadPool
719
 * @max_threads: a new maximal number of threads for @pool,
720
 *     or -1 for unlimited
721
 * @error: return location for error, or %NULL
722
 *
723
 * Sets the maximal allowed number of threads for @pool.
724
 * A value of -1 means that the maximal number of threads
725
 * is unlimited. If @pool is an exclusive thread pool, setting
726
 * the maximal number of threads to -1 is not allowed.
727
 *
728
 * Setting @max_threads to 0 means stopping all work for @pool.
729
 * It is effectively frozen until @max_threads is set to a non-zero
730
 * value again.
731
 *
732
 * A thread is never terminated while calling @func, as supplied by
733
 * g_thread_pool_new(). Instead the maximal number of threads only
734
 * has effect for the allocation of new threads in g_thread_pool_push().
735
 * A new thread is allocated, whenever the number of currently
736
 * running threads in @pool is smaller than the maximal number.
737
 *
738
 * @error can be %NULL to ignore errors, or non-%NULL to report
739
 * errors. An error can only occur when a new thread couldn't be
740
 * created.
741
 *
742
 * Before version 2.32, this function did not return a success status.
743
 *
744
 * Returns: %TRUE on success, %FALSE if an error occurred
745
 */
746
gboolean
747
g_thread_pool_set_max_threads (GThreadPool  *pool,
748
                               gint          max_threads,
749
                               GError      **error)
750
0
{
751
0
  GRealThreadPool *real;
752
0
  gint to_start;
753
0
  gboolean result;
754
755
0
  real = (GRealThreadPool*) pool;
756
757
0
  g_return_val_if_fail (real, FALSE);
758
0
  g_return_val_if_fail (real->running, FALSE);
759
0
  g_return_val_if_fail (!real->pool.exclusive || max_threads != -1, FALSE);
760
0
  g_return_val_if_fail (max_threads >= -1, FALSE);
761
762
0
  result = TRUE;
763
764
0
  g_async_queue_lock (real->queue);
765
766
0
  real->max_threads = max_threads;
767
768
0
  if (pool->exclusive)
769
0
    to_start = real->max_threads - real->num_threads;
770
0
  else
771
0
    to_start = g_async_queue_length_unlocked (real->queue);
772
773
0
  for ( ; to_start > 0; to_start--)
774
0
    {
775
0
      GError *local_error = NULL;
776
777
0
      if (!g_thread_pool_start_thread (real, &local_error))
778
0
        {
779
0
          g_propagate_error (error, local_error);
780
0
          result = FALSE;
781
0
          break;
782
0
        }
783
0
    }
784
785
0
  g_async_queue_unlock (real->queue);
786
787
0
  return result;
788
0
}
789
790
/**
791
 * g_thread_pool_get_max_threads:
792
 * @pool: a #GThreadPool
793
 *
794
 * Returns the maximal number of threads for @pool.
795
 *
796
 * Returns: the maximal number of threads
797
 */
798
gint
799
g_thread_pool_get_max_threads (GThreadPool *pool)
800
0
{
801
0
  GRealThreadPool *real;
802
0
  gint retval;
803
804
0
  real = (GRealThreadPool*) pool;
805
806
0
  g_return_val_if_fail (real, 0);
807
0
  g_return_val_if_fail (real->running, 0);
808
809
0
  g_async_queue_lock (real->queue);
810
0
  retval = real->max_threads;
811
0
  g_async_queue_unlock (real->queue);
812
813
0
  return retval;
814
0
}
815
816
/**
817
 * g_thread_pool_get_num_threads:
818
 * @pool: a #GThreadPool
819
 *
820
 * Returns the number of threads currently running in @pool.
821
 *
822
 * Returns: the number of threads currently running
823
 */
824
guint
825
g_thread_pool_get_num_threads (GThreadPool *pool)
826
0
{
827
0
  GRealThreadPool *real;
828
0
  guint retval;
829
830
0
  real = (GRealThreadPool*) pool;
831
832
0
  g_return_val_if_fail (real, 0);
833
0
  g_return_val_if_fail (real->running, 0);
834
835
0
  g_async_queue_lock (real->queue);
836
0
  retval = real->num_threads;
837
0
  g_async_queue_unlock (real->queue);
838
839
0
  return retval;
840
0
}
841
842
/**
843
 * g_thread_pool_unprocessed:
844
 * @pool: a #GThreadPool
845
 *
846
 * Returns the number of tasks still unprocessed in @pool.
847
 *
848
 * Returns: the number of unprocessed tasks
849
 */
850
guint
851
g_thread_pool_unprocessed (GThreadPool *pool)
852
0
{
853
0
  GRealThreadPool *real;
854
0
  gint unprocessed;
855
856
0
  real = (GRealThreadPool*) pool;
857
858
0
  g_return_val_if_fail (real, 0);
859
0
  g_return_val_if_fail (real->running, 0);
860
861
0
  unprocessed = g_async_queue_length (real->queue);
862
863
0
  return MAX (unprocessed, 0);
864
0
}
865
866
/**
867
 * g_thread_pool_free:
868
 * @pool: a #GThreadPool
869
 * @immediate: should @pool shut down immediately?
870
 * @wait_: should the function wait for all tasks to be finished?
871
 *
872
 * Frees all resources allocated for @pool.
873
 *
874
 * If @immediate is %TRUE, no new task is processed for @pool.
875
 * Otherwise @pool is not freed before the last task is processed.
876
 * Note however, that no thread of this pool is interrupted while
877
 * processing a task. Instead at least all still running threads
878
 * can finish their tasks before the @pool is freed.
879
 *
880
 * If @wait_ is %TRUE, this function does not return before all
881
 * tasks to be processed (dependent on @immediate, whether all
882
 * or only the currently running) are ready.
883
 * Otherwise this function returns immediately.
884
 *
885
 * After calling this function @pool must not be used anymore.
886
 */
887
void
888
g_thread_pool_free (GThreadPool *pool,
889
                    gboolean     immediate,
890
                    gboolean     wait_)
891
0
{
892
0
  GRealThreadPool *real;
893
894
0
  real = (GRealThreadPool*) pool;
895
896
0
  g_return_if_fail (real);
897
0
  g_return_if_fail (real->running);
898
899
  /* If there's no thread allowed here, there is not much sense in
900
   * not stopping this pool immediately, when it's not empty
901
   */
902
0
  g_return_if_fail (immediate ||
903
0
                    real->max_threads != 0 ||
904
0
                    g_async_queue_length (real->queue) == 0);
905
906
0
  g_async_queue_lock (real->queue);
907
908
0
  real->running = FALSE;
909
0
  real->immediate = immediate;
910
0
  real->waiting = wait_;
911
912
0
  if (wait_)
913
0
    {
914
0
      while (g_async_queue_length_unlocked (real->queue) != (gint) -real->num_threads &&
915
0
             !(immediate && real->num_threads == 0))
916
0
        g_cond_wait (&real->cond, _g_async_queue_get_mutex (real->queue));
917
0
    }
918
919
0
  if (immediate || g_async_queue_length_unlocked (real->queue) == (gint) -real->num_threads)
920
0
    {
921
      /* No thread is currently doing something (and nothing is left
922
       * to process in the queue)
923
       */
924
0
      if (real->num_threads == 0)
925
0
        {
926
          /* No threads left, we clean up */
927
0
          g_async_queue_unlock (real->queue);
928
0
          g_thread_pool_free_internal (real);
929
0
          return;
930
0
        }
931
932
0
      g_thread_pool_wakeup_and_stop_all (real);
933
0
    }
934
935
  /* The last thread should cleanup the pool */
936
0
  real->waiting = FALSE;
937
0
  g_async_queue_unlock (real->queue);
938
0
}
939
940
static void
941
g_thread_pool_free_internal (GRealThreadPool* pool)
942
0
{
943
0
  g_return_if_fail (pool);
944
0
  g_return_if_fail (pool->running == FALSE);
945
0
  g_return_if_fail (pool->num_threads == 0);
946
947
  /* Ensure the dummy item pushed on by g_thread_pool_wakeup_and_stop_all() is
948
   * removed, before it’s potentially passed to the user-provided
949
   * @item_free_func. */
950
0
  g_async_queue_remove (pool->queue, GUINT_TO_POINTER (1));
951
952
0
  g_async_queue_unref (pool->queue);
953
0
  g_cond_clear (&pool->cond);
954
955
0
  g_free (pool);
956
0
}
957
958
static void
959
g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool)
960
0
{
961
0
  guint i;
962
963
0
  g_return_if_fail (pool);
964
0
  g_return_if_fail (pool->running == FALSE);
965
0
  g_return_if_fail (pool->num_threads != 0);
966
967
0
  pool->immediate = TRUE;
968
969
  /*
970
   * So here we're sending bogus data to the pool threads, which
971
   * should cause them each to wake up, and check the above
972
   * pool->immediate condition. However we don't want that
973
   * data to be sorted (since it'll crash the sorter).
974
   */
975
0
  for (i = 0; i < pool->num_threads; i++)
976
0
    g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
977
0
}
978
979
/**
980
 * g_thread_pool_set_max_unused_threads:
981
 * @max_threads: maximal number of unused threads
982
 *
983
 * Sets the maximal number of unused threads to @max_threads.
984
 * If @max_threads is -1, no limit is imposed on the number
985
 * of unused threads.
986
 *
987
 * The default value is 8 since GLib 2.84. Previously the default value was 2.
988
 */
989
void
990
g_thread_pool_set_max_unused_threads (gint max_threads)
991
0
{
992
0
  g_return_if_fail (max_threads >= -1);
993
994
0
  g_atomic_int_set (&max_unused_threads, max_threads);
995
996
0
  if (max_threads != -1)
997
0
    {
998
0
      max_threads -= g_atomic_int_get (&unused_threads);
999
0
      if (max_threads < 0)
1000
0
        {
1001
0
          g_atomic_int_set (&kill_unused_threads, -max_threads);
1002
0
          g_atomic_int_inc (&wakeup_thread_serial);
1003
1004
0
          g_async_queue_lock (unused_thread_queue);
1005
1006
0
          do
1007
0
            {
1008
0
              g_async_queue_push_unlocked (unused_thread_queue,
1009
0
                                           wakeup_thread_marker);
1010
0
            }
1011
0
          while (++max_threads);
1012
1013
0
          g_async_queue_unlock (unused_thread_queue);
1014
0
        }
1015
0
    }
1016
0
}
1017
1018
/**
1019
 * g_thread_pool_get_max_unused_threads:
1020
 *
1021
 * Returns the maximal allowed number of unused threads.
1022
 *
1023
 * Returns: the maximal number of unused threads
1024
 */
1025
gint
1026
g_thread_pool_get_max_unused_threads (void)
1027
0
{
1028
0
  return g_atomic_int_get (&max_unused_threads);
1029
0
}
1030
1031
/**
1032
 * g_thread_pool_get_num_unused_threads:
1033
 *
1034
 * Returns the number of currently unused threads.
1035
 *
1036
 * Returns: the number of currently unused threads
1037
 */
1038
guint
1039
g_thread_pool_get_num_unused_threads (void)
1040
0
{
1041
0
  return (guint) g_atomic_int_get (&unused_threads);
1042
0
}
1043
1044
/**
1045
 * g_thread_pool_stop_unused_threads:
1046
 *
1047
 * Stops all currently unused threads. This does not change the
1048
 * maximal number of unused threads. This function can be used to
1049
 * regularly stop all unused threads e.g. from g_timeout_add().
1050
 */
1051
void
1052
g_thread_pool_stop_unused_threads (void)
1053
0
{
1054
0
  guint oldval;
1055
1056
0
  oldval = g_thread_pool_get_max_unused_threads ();
1057
1058
0
  g_thread_pool_set_max_unused_threads (0);
1059
0
  g_thread_pool_set_max_unused_threads (oldval);
1060
0
}
1061
1062
/**
1063
 * g_thread_pool_set_sort_function:
1064
 * @pool: a #GThreadPool
1065
 * @func: the #GCompareDataFunc used to sort the list of tasks.
1066
 *     This function is passed two tasks. It should return
1067
 *     0 if the order in which they are handled does not matter,
1068
 *     a negative value if the first task should be processed before
1069
 *     the second or a positive value if the second task should be
1070
 *     processed first.
1071
 * @user_data: user data passed to @func
1072
 *
1073
 * Sets the function used to sort the list of tasks. This allows the
1074
 * tasks to be processed by a priority determined by @func, and not
1075
 * just in the order in which they were added to the pool.
1076
 *
1077
 * Note, if the maximum number of threads is more than 1, the order
1078
 * that threads are executed cannot be guaranteed 100%. Threads are
1079
 * scheduled by the operating system and are executed at random. It
1080
 * cannot be assumed that threads are executed in the order they are
1081
 * created.
1082
 *
1083
 * Since: 2.10
1084
 */
1085
void
1086
g_thread_pool_set_sort_function (GThreadPool      *pool,
1087
                                 GCompareDataFunc  func,
1088
                                 gpointer          user_data)
1089
0
{
1090
0
  GRealThreadPool *real;
1091
1092
0
  real = (GRealThreadPool*) pool;
1093
1094
0
  g_return_if_fail (real);
1095
0
  g_return_if_fail (real->running);
1096
1097
0
  g_async_queue_lock (real->queue);
1098
1099
0
  real->sort_func = func;
1100
0
  real->sort_user_data = user_data;
1101
1102
0
  if (func)
1103
0
    g_async_queue_sort_unlocked (real->queue,
1104
0
                                 real->sort_func,
1105
0
                                 real->sort_user_data);
1106
1107
0
  g_async_queue_unlock (real->queue);
1108
0
}
1109
1110
/**
1111
 * g_thread_pool_move_to_front:
1112
 * @pool: a #GThreadPool
1113
 * @data: an unprocessed item in the pool
1114
 *
1115
 * Moves the item to the front of the queue of unprocessed
1116
 * items, so that it will be processed next.
1117
 *
1118
 * Returns: %TRUE if the item was found and moved
1119
 *
1120
 * Since: 2.46
1121
 */
1122
gboolean
1123
g_thread_pool_move_to_front (GThreadPool *pool,
1124
                             gpointer     data)
1125
0
{
1126
0
  GRealThreadPool *real = (GRealThreadPool*) pool;
1127
0
  gboolean found;
1128
1129
0
  g_async_queue_lock (real->queue);
1130
1131
0
  found = g_async_queue_remove_unlocked (real->queue, data);
1132
0
  if (found)
1133
0
    g_async_queue_push_front_unlocked (real->queue, data);
1134
1135
0
  g_async_queue_unlock (real->queue);
1136
1137
0
  return found;
1138
0
}
1139
1140
/**
1141
 * g_thread_pool_set_max_idle_time:
1142
 * @interval: the maximum @interval (in milliseconds)
1143
 *     a thread can be idle
1144
 *
1145
 * This function will set the maximum @interval that a thread
1146
 * waiting in the pool for new tasks can be idle for before
1147
 * being stopped. This function is similar to calling
1148
 * g_thread_pool_stop_unused_threads() on a regular timeout,
1149
 * except this is done on a per thread basis.
1150
 *
1151
 * By setting @interval to 0, idle threads will not be stopped.
1152
 *
1153
 * The default value is 15000 (15 seconds).
1154
 *
1155
 * Since: 2.10
1156
 */
1157
void
1158
g_thread_pool_set_max_idle_time (guint interval)
1159
0
{
1160
0
  guint i;
1161
1162
0
  g_atomic_int_set (&max_idle_time, interval);
1163
1164
0
  i = (guint) g_atomic_int_get (&unused_threads);
1165
0
  if (i > 0)
1166
0
    {
1167
0
      g_atomic_int_inc (&wakeup_thread_serial);
1168
0
      g_async_queue_lock (unused_thread_queue);
1169
1170
0
      do
1171
0
        {
1172
0
          g_async_queue_push_unlocked (unused_thread_queue,
1173
0
                                       wakeup_thread_marker);
1174
0
        }
1175
0
      while (--i);
1176
1177
0
      g_async_queue_unlock (unused_thread_queue);
1178
0
    }
1179
0
}
1180
1181
/**
1182
 * g_thread_pool_get_max_idle_time:
1183
 *
1184
 * This function will return the maximum @interval that a
1185
 * thread will wait in the thread pool for new tasks before
1186
 * being stopped.
1187
 *
1188
 * If this function returns 0, threads waiting in the thread
1189
 * pool for new work are not stopped.
1190
 *
1191
 * Returns: the maximum @interval (milliseconds) to wait
1192
 *     for new tasks in the thread pool before stopping the
1193
 *     thread
1194
 *
1195
 * Since: 2.10
1196
 */
1197
guint
1198
g_thread_pool_get_max_idle_time (void)
1199
0
{
1200
  return (guint) g_atomic_int_get (&max_idle_time);
1201
0
}