Coverage Report

Created: 2025-10-13 06:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rauc/subprojects/glib-2.76.5/glib/gthreadpool.c
Line
Count
Source
1
/* GLIB - Library of useful routines for C programming
2
 * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3
 *
4
 * GThreadPool: thread pool implementation.
5
 * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6
 *
7
 * SPDX-License-Identifier: LGPL-2.1-or-later
8
 *
9
 * This library is free software; you can redistribute it and/or
10
 * modify it under the terms of the GNU Lesser General Public
11
 * License as published by the Free Software Foundation; either
12
 * version 2.1 of the License, or (at your option) any later version.
13
 *
14
 * This library is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
 * Lesser General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU Lesser General Public
20
 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
21
 */
22
23
/*
24
 * MT safe
25
 */
26
27
#include "config.h"
28
29
#include "gthreadpool.h"
30
31
#include "gasyncqueue.h"
32
#include "gasyncqueueprivate.h"
33
#include "glib-private.h"
34
#include "gmain.h"
35
#include "gtestutils.h"
36
#include "gthreadprivate.h"
37
#include "gtimer.h"
38
#include "gutils.h"
39
40
/**
41
 * SECTION:thread_pools
42
 * @title: Thread Pools
43
 * @short_description: pools of threads to execute work concurrently
44
 * @see_also: #GThread
45
 *
46
 * Sometimes you wish to asynchronously fork out the execution of work
47
 * and continue working in your own thread. If that will happen often,
48
 * the overhead of starting and destroying a thread each time might be
49
 * too high. In such cases reusing already started threads seems like a
50
 * good idea. And it indeed is, but implementing this can be tedious
51
 * and error-prone.
52
 *
53
 * Therefore GLib provides thread pools for your convenience. An added
54
 * advantage is, that the threads can be shared between the different
55
 * subsystems of your program, when they are using GLib.
56
 *
57
 * To create a new thread pool, you use g_thread_pool_new().
58
 * It is destroyed by g_thread_pool_free().
59
 *
60
 * If you want to execute a certain task within a thread pool,
61
 * you call g_thread_pool_push().
62
 *
63
 * To get the current number of running threads you call
64
 * g_thread_pool_get_num_threads(). To get the number of still
65
 * unprocessed tasks you call g_thread_pool_unprocessed(). To control
66
 * the maximal number of threads for a thread pool, you use
67
 * g_thread_pool_get_max_threads() and g_thread_pool_set_max_threads().
68
 *
69
 * Finally you can control the number of unused threads, that are kept
70
 * alive by GLib for future use. The current number can be fetched with
71
 * g_thread_pool_get_num_unused_threads(). The maximal number can be
72
 * controlled by g_thread_pool_get_max_unused_threads() and
73
 * g_thread_pool_set_max_unused_threads(). All currently unused threads
74
 * can be stopped by calling g_thread_pool_stop_unused_threads().
75
 */
76
77
#define DEBUG_MSG(x)
78
/* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");    */
79
80
typedef struct _GRealThreadPool GRealThreadPool;
81
82
/**
83
 * GThreadPool:
84
 * @func: the function to execute in the threads of this pool
85
 * @user_data: the user data for the threads of this pool
86
 * @exclusive: are all threads exclusive to this pool
87
 *
88
 * The #GThreadPool struct represents a thread pool. It has three
89
 * public read-only members, but the underlying struct is bigger,
90
 * so you must not copy this struct.
91
 */
92
struct _GRealThreadPool
93
{
94
  GThreadPool pool;            /* public part (func, user_data, exclusive); kept first because this file casts between GThreadPool* and GRealThreadPool* */
95
  GAsyncQueue *queue;          /* pending tasks; this file takes the queue's lock around accesses to the mutable fields below */
96
  GCond cond;                  /* broadcast by worker threads while the pool is stopping; g_thread_pool_free() waits on it when asked to wait */
97
  gint max_threads;            /* upper bound on threads for this pool; -1 means no limit */
98
  guint num_threads;           /* threads currently attached to this pool; incremented in g_thread_pool_start_thread(), decremented by the worker when it leaves */
99
  gboolean running;            /* TRUE from creation until g_thread_pool_free() clears it */
100
  gboolean immediate;          /* once not running: TRUE means workers stop executing the pool function for queued items */
101
  gboolean waiting;            /* TRUE while a g_thread_pool_free() caller that asked to wait is still blocked */
102
  GCompareDataFunc sort_func;  /* optional comparator for sorted pushes; initialised to NULL */
103
  gpointer sort_user_data;     /* user data handed to sort_func; initialised to NULL */
104
};
105
106
/* The following is just an address to mark the wakeup order for a
107
 * thread; it could be any address (as long as it isn't a valid
108
 * GThreadPool address)
109
 */
110
static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new; /* sentinel pushed onto unused_thread_queue in place of a pool pointer */
111
static gint wakeup_thread_serial = 0; /* idle threads compare this with the value they last saw; a difference makes them re-read the limits (writer not visible in this part of the file) */
112
113
/* Here all unused threads are waiting  */
114
static GAsyncQueue *unused_thread_queue = NULL; /* created lazily in g_thread_pool_new_full() */
115
static gint unused_threads = 0; /* number of threads currently parked outside any pool; accessed atomically */
116
static gint max_unused_threads = 2; /* limit on parked threads; -1 means no limit */
117
static gint kill_unused_threads = 0; /* count of parked threads that should exit; decremented atomically by idle threads */
118
static guint max_idle_time = 15 * 1000; /* milliseconds a parked thread waits for a new pool; a value of 0 makes it wait indefinitely */
119
120
typedef struct
121
{
122
  /* Request/response record exchanged with the "pool-spawner" helper thread.
   * Either thread or error are set in the end. Both transfer-full. */
123
  GThreadPool *pool;   /* in: pool the new worker thread is started for */
124
  GThread *thread;     /* out: the created thread, or NULL if creation failed */
125
  GError *error;       /* out: set when thread creation failed */
126
} SpawnThreadData;
127
128
static GCond spawn_thread_cond;          /* broadcast by the helper thread after it has filled in a request */
129
static GAsyncQueue *spawn_thread_queue;  /* queue of SpawnThreadData requests; its mutex is the one used with spawn_thread_cond */
130
131
static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
132
                                                           gpointer          data);
133
static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);
134
static gpointer         g_thread_pool_thread_proxy        (gpointer          data);
135
static gboolean         g_thread_pool_start_thread        (GRealThreadPool  *pool,
136
                                                           GError          **error);
137
static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
138
static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);
139
static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);
140
141
static void
142
g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
143
                                   gpointer         data)
144
0
{
145
0
  if (pool->sort_func)
146
0
    g_async_queue_push_sorted_unlocked (pool->queue,
147
0
                                        data,
148
0
                                        pool->sort_func,
149
0
                                        pool->sort_user_data);
150
0
  else
151
0
    g_async_queue_push_unlocked (pool->queue, data);
152
0
}
153
154
static GRealThreadPool*
155
g_thread_pool_wait_for_new_pool (void)
156
0
{
157
0
  GRealThreadPool *pool;
158
0
  gint local_wakeup_thread_serial;
159
0
  guint local_max_unused_threads;
160
0
  gint local_max_idle_time;
161
0
  gint last_wakeup_thread_serial;
162
0
  gboolean have_relayed_thread_marker = FALSE;
163
164
0
  local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
165
0
  local_max_idle_time = g_atomic_int_get (&max_idle_time);
166
0
  last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
167
168
0
  do
169
0
    {
170
0
      if ((guint) g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
171
0
        {
172
          /* If this is a superfluous thread, stop it. */
173
0
          pool = NULL;
174
0
        }
175
0
      else if (local_max_idle_time > 0)
176
0
        {
177
          /* If a maximal idle time is given, wait for the given time. */
178
0
          DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
179
0
                      g_thread_self (), local_max_idle_time / 1000.0));
180
181
0
          pool = g_async_queue_timeout_pop (unused_thread_queue,
182
0
              local_max_idle_time * 1000);
183
0
        }
184
0
      else
185
0
        {
186
          /* If no maximal idle time is given, wait indefinitely. */
187
0
          DEBUG_MSG (("thread %p waiting in global pool.", g_thread_self ()));
188
0
          pool = g_async_queue_pop (unused_thread_queue);
189
0
        }
190
191
0
      if (pool == wakeup_thread_marker)
192
0
        {
193
0
          local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
194
0
          if (last_wakeup_thread_serial == local_wakeup_thread_serial)
195
0
            {
196
0
              if (!have_relayed_thread_marker)
197
0
              {
198
                /* If this wakeup marker has been received for
199
                 * the second time, relay it.
200
                 */
201
0
                DEBUG_MSG (("thread %p relaying wakeup message to "
202
0
                            "waiting thread with lower serial.",
203
0
                            g_thread_self ()));
204
205
0
                g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
206
0
                have_relayed_thread_marker = TRUE;
207
208
                /* If a wakeup marker has been relayed, this thread
209
                 * will get out of the way for 100 microseconds to
210
                 * avoid receiving this marker again.
211
                 */
212
0
                g_usleep (100);
213
0
              }
214
0
            }
215
0
          else
216
0
            {
217
0
              if (g_atomic_int_add (&kill_unused_threads, -1) > 0)
218
0
                {
219
0
                  pool = NULL;
220
0
                  break;
221
0
                }
222
223
0
              DEBUG_MSG (("thread %p updating to new limits.",
224
0
                          g_thread_self ()));
225
226
0
              local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
227
0
              local_max_idle_time = g_atomic_int_get (&max_idle_time);
228
0
              last_wakeup_thread_serial = local_wakeup_thread_serial;
229
230
0
              have_relayed_thread_marker = FALSE;
231
0
            }
232
0
        }
233
0
    }
234
0
  while (pool == wakeup_thread_marker);
235
236
0
  return pool;
237
0
}
238
239
static gpointer
240
g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
241
0
{
242
0
  gpointer task = NULL;
243
244
0
  if (pool->running || (!pool->immediate &&
245
0
                        g_async_queue_length_unlocked (pool->queue) > 0))
246
0
    {
247
      /* This thread pool is still active. */
248
0
      if (pool->max_threads != -1 && pool->num_threads > (guint) pool->max_threads)
249
0
        {
250
          /* This is a superfluous thread, so it goes to the global pool. */
251
0
          DEBUG_MSG (("superfluous thread %p in pool %p.",
252
0
                      g_thread_self (), pool));
253
0
        }
254
0
      else if (pool->pool.exclusive)
255
0
        {
256
          /* Exclusive threads stay attached to the pool. */
257
0
          task = g_async_queue_pop_unlocked (pool->queue);
258
259
0
          DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
260
0
                      "(%d running, %d unprocessed).",
261
0
                      g_thread_self (), pool, pool->num_threads,
262
0
                      g_async_queue_length_unlocked (pool->queue)));
263
0
        }
264
0
      else
265
0
        {
266
          /* A thread will wait for new tasks for at most 1/2
267
           * second before going to the global pool.
268
           */
269
0
          DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
270
0
                      "(%d running, %d unprocessed).",
271
0
                      g_thread_self (), pool, pool->num_threads,
272
0
                      g_async_queue_length_unlocked (pool->queue)));
273
274
0
          task = g_async_queue_timeout_pop_unlocked (pool->queue,
275
0
                 G_USEC_PER_SEC / 2);
276
0
        }
277
0
    }
278
0
  else
279
0
    {
280
      /* This thread pool is inactive, it will no longer process tasks. */
281
0
      DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
282
0
                  "(running: %s, immediate: %s, len: %d).",
283
0
                  pool, g_thread_self (),
284
0
                  pool->running ? "true" : "false",
285
0
                  pool->immediate ? "true" : "false",
286
0
                  g_async_queue_length_unlocked (pool->queue)));
287
0
    }
288
289
0
  return task;
290
0
}
291
292
static gpointer
293
g_thread_pool_spawn_thread (gpointer data)
294
0
{
295
0
  while (TRUE)
296
0
    {
297
0
      SpawnThreadData *spawn_thread_data;
298
0
      GThread *thread = NULL;
299
0
      GError *error = NULL;
300
0
      const gchar *prgname = g_get_prgname ();
301
0
      gchar name[16] = "pool";
302
303
0
      if (prgname)
304
0
        g_snprintf (name, sizeof (name), "pool-%s", prgname);
305
306
0
      g_async_queue_lock (spawn_thread_queue);
307
      /* Spawn a new thread for the given pool and wake the requesting thread
308
       * up again with the result. This new thread will have the scheduler
309
       * settings inherited from this thread and in extension of the thread
310
       * that created the first non-exclusive thread-pool. */
311
0
      spawn_thread_data = g_async_queue_pop_unlocked (spawn_thread_queue);
312
0
      thread = g_thread_try_new (name, g_thread_pool_thread_proxy, spawn_thread_data->pool, &error);
313
314
0
      spawn_thread_data->thread = g_steal_pointer (&thread);
315
0
      spawn_thread_data->error = g_steal_pointer (&error);
316
317
0
      g_cond_broadcast (&spawn_thread_cond);
318
0
      g_async_queue_unlock (spawn_thread_queue);
319
0
    }
320
321
0
  return NULL;
322
0
}
323
324
static gpointer
325
g_thread_pool_thread_proxy (gpointer data)
326
0
{
327
0
  GRealThreadPool *pool;
328
329
0
  pool = data;
330
331
0
  DEBUG_MSG (("thread %p started for pool %p.", g_thread_self (), pool));
332
333
0
  g_async_queue_lock (pool->queue);
334
335
0
  while (TRUE)
336
0
    {
337
0
      gpointer task;
338
339
0
      task = g_thread_pool_wait_for_new_task (pool);
340
0
      if (task)
341
0
        {
342
0
          if (pool->running || !pool->immediate)
343
0
            {
344
              /* A task was received and the thread pool is active,
345
               * so execute the function.
346
               */
347
0
              g_async_queue_unlock (pool->queue);
348
0
              DEBUG_MSG (("thread %p in pool %p calling func.",
349
0
                          g_thread_self (), pool));
350
0
              pool->pool.func (task, pool->pool.user_data);
351
0
              g_async_queue_lock (pool->queue);
352
0
            }
353
0
        }
354
0
      else
355
0
        {
356
          /* No task was received, so this thread goes to the global pool. */
357
0
          gboolean free_pool = FALSE;
358
359
0
          DEBUG_MSG (("thread %p leaving pool %p for global pool.",
360
0
                      g_thread_self (), pool));
361
0
          pool->num_threads--;
362
363
0
          if (!pool->running)
364
0
            {
365
0
              if (!pool->waiting)
366
0
                {
367
0
                  if (pool->num_threads == 0)
368
0
                    {
369
                      /* If the pool is not running and no other
370
                       * thread is waiting for this thread pool to
371
                       * finish and this is the last thread of this
372
                       * pool, free the pool.
373
                       */
374
0
                      free_pool = TRUE;
375
0
                    }
376
0
                  else
377
0
                    {
378
                      /* If the pool is not running and no other
379
                       * thread is waiting for this thread pool to
380
                       * finish and this is not the last thread of
381
                       * this pool and there are no tasks left in the
382
                       * queue, wakeup the remaining threads.
383
                       */
384
0
                      if (g_async_queue_length_unlocked (pool->queue) ==
385
0
                          (gint) -pool->num_threads)
386
0
                        g_thread_pool_wakeup_and_stop_all (pool);
387
0
                    }
388
0
                }
389
0
              else if (pool->immediate ||
390
0
                       g_async_queue_length_unlocked (pool->queue) <= 0)
391
0
                {
392
                  /* If the pool is not running and another thread is
393
                   * waiting for this thread pool to finish and there
394
                   * are either no tasks left or the pool shall stop
395
                   * immediately, inform the waiting thread of a change
396
                   * of the thread pool state.
397
                   */
398
0
                  g_cond_broadcast (&pool->cond);
399
0
                }
400
0
            }
401
402
0
          g_atomic_int_inc (&unused_threads);
403
0
          g_async_queue_unlock (pool->queue);
404
405
0
          if (free_pool)
406
0
            g_thread_pool_free_internal (pool);
407
408
0
          pool = g_thread_pool_wait_for_new_pool ();
409
0
          g_atomic_int_add (&unused_threads, -1);
410
411
0
          if (pool == NULL)
412
0
            break;
413
414
0
          g_async_queue_lock (pool->queue);
415
416
0
          DEBUG_MSG (("thread %p entering pool %p from global pool.",
417
0
                      g_thread_self (), pool));
418
419
          /* pool->num_threads++ is not done here, but in
420
           * g_thread_pool_start_thread to make the new started
421
           * thread known to the pool before itself can do it.
422
           */
423
0
        }
424
0
    }
425
426
0
  return NULL;
427
0
}
428
429
static gboolean
430
g_thread_pool_start_thread (GRealThreadPool  *pool,
431
                            GError          **error)
432
0
{
433
0
  gboolean success = FALSE;
434
435
0
  if (pool->max_threads != -1 && pool->num_threads >= (guint) pool->max_threads)
436
    /* Enough threads are already running */
437
0
    return TRUE;
438
439
0
  g_async_queue_lock (unused_thread_queue);
440
441
0
  if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
442
0
    {
443
0
      g_async_queue_push_unlocked (unused_thread_queue, pool);
444
0
      success = TRUE;
445
0
    }
446
447
0
  g_async_queue_unlock (unused_thread_queue);
448
449
0
  if (!success)
450
0
    {
451
0
      const gchar *prgname = g_get_prgname ();
452
0
      gchar name[16] = "pool";
453
0
      GThread *thread;
454
455
0
      if (prgname)
456
0
        g_snprintf (name, sizeof (name), "pool-%s", prgname);
457
458
      /* No thread was found, we have to start a new one */
459
0
      if (pool->pool.exclusive)
460
0
        {
461
          /* For exclusive thread-pools this is directly called from new() and
462
           * we simply start new threads that inherit the scheduler settings
463
           * from the current thread.
464
           */
465
0
          thread = g_thread_try_new (name, g_thread_pool_thread_proxy, pool, error);
466
0
        }
467
0
      else
468
0
        {
469
          /* For non-exclusive thread-pools this can be called at any time
470
           * when a new thread is needed. We make sure to create a new thread
471
           * here with the correct scheduler settings by going via our helper
472
           * thread.
473
           */
474
0
          SpawnThreadData spawn_thread_data = { (GThreadPool *) pool, NULL, NULL };
475
476
0
          g_async_queue_lock (spawn_thread_queue);
477
478
0
          g_async_queue_push_unlocked (spawn_thread_queue, &spawn_thread_data);
479
480
0
          while (!spawn_thread_data.thread && !spawn_thread_data.error)
481
0
            g_cond_wait (&spawn_thread_cond, _g_async_queue_get_mutex (spawn_thread_queue));
482
483
0
          thread = spawn_thread_data.thread;
484
0
          if (!thread)
485
0
            g_propagate_error (error, g_steal_pointer (&spawn_thread_data.error));
486
0
          g_async_queue_unlock (spawn_thread_queue);
487
0
        }
488
489
0
      if (thread == NULL)
490
0
        return FALSE;
491
492
0
      g_thread_unref (thread);
493
0
    }
494
495
  /* See comment in g_thread_pool_thread_proxy as to why this is done
496
   * here and not there
497
   */
498
0
  pool->num_threads++;
499
500
0
  return TRUE;
501
0
}
502
503
/**
504
 * g_thread_pool_new:
505
 * @func: a function to execute in the threads of the new thread pool
506
 * @user_data: user data that is handed over to @func every time it
507
 *     is called
508
 * @max_threads: the maximal number of threads to execute concurrently
509
 *     in  the new thread pool, -1 means no limit
510
 * @exclusive: should this thread pool be exclusive?
511
 * @error: return location for error, or %NULL
512
 *
513
 * This function creates a new thread pool.
514
 *
515
 * Whenever you call g_thread_pool_push(), either a new thread is
516
 * created or an unused one is reused. At most @max_threads threads
517
 * are running concurrently for this thread pool. @max_threads = -1
518
 * allows unlimited threads to be created for this thread pool. The
519
 * newly created or reused thread now executes the function @func
520
 * with the two arguments. The first one is the parameter to
521
 * g_thread_pool_push() and the second one is @user_data.
522
 *
523
 * Pass g_get_num_processors() to @max_threads to create as many threads as
524
 * there are logical processors on the system. This will not pin each thread to
525
 * a specific processor.
526
 *
527
 * The parameter @exclusive determines whether the thread pool owns
528
 * all threads exclusive or shares them with other thread pools.
529
 * If @exclusive is %TRUE, @max_threads threads are started
530
 * immediately and they will run exclusively for this thread pool
531
 * until it is destroyed by g_thread_pool_free(). If @exclusive is
532
 * %FALSE, threads are created when needed and shared between all
533
 * non-exclusive thread pools. This implies that @max_threads may
534
 * not be -1 for exclusive thread pools. Besides, exclusive thread
535
 * pools are not affected by g_thread_pool_set_max_idle_time()
536
 * since their threads are never considered idle and returned to the
537
 * global pool.
538
 *
539
 * Note that the threads used by exclusive threadpools will all inherit the
540
 * scheduler settings of the current thread while the threads used by
541
 * non-exclusive threadpools will inherit the scheduler settings from the
542
 * first thread that created such a threadpool.
543
 *
544
 * @error can be %NULL to ignore errors, or non-%NULL to report
545
 * errors. An error can only occur when @exclusive is set to %TRUE
546
 * and not all @max_threads threads could be created.
547
 * See #GThreadError for possible errors that may occur.
548
 * Note, even in case of error a valid #GThreadPool is returned.
549
 *
550
 * Returns: the new #GThreadPool
551
 */
552
GThreadPool *
553
g_thread_pool_new (GFunc      func,
554
                   gpointer   user_data,
555
                   gint       max_threads,
556
                   gboolean   exclusive,
557
                   GError   **error)
558
0
{
559
0
  return g_thread_pool_new_full (func, user_data, NULL, max_threads, exclusive, error);
560
0
}
561
562
/**
563
 * g_thread_pool_new_full:
564
 * @func: a function to execute in the threads of the new thread pool
565
 * @user_data: user data that is handed over to @func every time it
566
 *     is called
567
 * @item_free_func: (nullable): used to pass as a free function to
568
 *     g_async_queue_new_full()
569
 * @max_threads: the maximal number of threads to execute concurrently
570
 *     in the new thread pool, `-1` means no limit
571
 * @exclusive: should this thread pool be exclusive?
572
 * @error: return location for error, or %NULL
573
 *
574
 * This function creates a new thread pool similar to g_thread_pool_new()
575
 * but allowing @item_free_func to be specified to free the data passed
576
 * to g_thread_pool_push() in the case that the #GThreadPool is stopped
577
 * and freed before all tasks have been executed.
578
 *
579
 * Returns: (transfer full): the new #GThreadPool
580
 *
581
 * Since: 2.70
582
 */
583
GThreadPool *
584
g_thread_pool_new_full (GFunc           func,
585
                        gpointer        user_data,
586
                        GDestroyNotify  item_free_func,
587
                        gint            max_threads,
588
                        gboolean        exclusive,
589
                        GError        **error)
590
0
{
591
0
  GRealThreadPool *retval;
592
0
  G_LOCK_DEFINE_STATIC (init);
593
594
0
  g_return_val_if_fail (func, NULL);
595
0
  g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
596
0
  g_return_val_if_fail (max_threads >= -1, NULL);
597
598
0
  retval = g_new (GRealThreadPool, 1);
599
600
0
  retval->pool.func = func;
601
0
  retval->pool.user_data = user_data;
602
0
  retval->pool.exclusive = exclusive;
603
0
  retval->queue = g_async_queue_new_full (item_free_func);
604
0
  g_cond_init (&retval->cond);
605
0
  retval->max_threads = max_threads;
606
0
  retval->num_threads = 0;
607
0
  retval->running = TRUE;
608
0
  retval->immediate = FALSE;
609
0
  retval->waiting = FALSE;
610
0
  retval->sort_func = NULL;
611
0
  retval->sort_user_data = NULL;
612
613
0
  G_LOCK (init);
614
0
  if (!unused_thread_queue)
615
0
      unused_thread_queue = g_async_queue_new ();
616
617
  /*
618
   * Spawn a helper thread that is only responsible for spawning new threads
619
   * with the scheduler settings of the current thread.
620
   *
621
   * This is then used for making sure that all threads created on the
622
   * non-exclusive thread-pool have the same scheduler settings, and more
623
   * importantly don't just inherit them from the thread that just happened to
624
   * push a new task and caused a new thread to be created.
625
   *
626
   * Not doing so could cause real-time priority threads or otherwise
627
   * threads with problematic scheduler settings to be part of the
628
   * non-exclusive thread-pools.
629
   *
630
   * For exclusive thread-pools this is not required as all threads are
631
   * created immediately below and are running forever, so they will
632
   * automatically inherit the scheduler settings from this very thread.
633
   */
634
0
  if (!exclusive && !spawn_thread_queue)
635
0
    {
636
0
      GThread *pool_spawner = NULL;
637
638
0
      spawn_thread_queue = g_async_queue_new ();
639
0
      g_cond_init (&spawn_thread_cond);
640
0
      pool_spawner = g_thread_new ("pool-spawner", g_thread_pool_spawn_thread, NULL);
641
0
      g_ignore_leak (pool_spawner);
642
0
    }
643
0
  G_UNLOCK (init);
644
645
0
  if (retval->pool.exclusive)
646
0
    {
647
0
      g_async_queue_lock (retval->queue);
648
649
0
      while (retval->num_threads < (guint) retval->max_threads)
650
0
        {
651
0
          GError *local_error = NULL;
652
653
0
          if (!g_thread_pool_start_thread (retval, &local_error))
654
0
            {
655
0
              g_propagate_error (error, local_error);
656
0
              break;
657
0
            }
658
0
        }
659
660
0
      g_async_queue_unlock (retval->queue);
661
0
    }
662
663
0
  return (GThreadPool*) retval;
664
0
}
665
666
/**
667
 * g_thread_pool_push:
668
 * @pool: a #GThreadPool
669
 * @data: a new task for @pool
670
 * @error: return location for error, or %NULL
671
 *
672
 * Inserts @data into the list of tasks to be executed by @pool.
673
 *
674
 * When the number of currently running threads is lower than the
675
 * maximal allowed number of threads, a new thread is started (or
676
 * reused) with the properties given to g_thread_pool_new().
677
 * Otherwise, @data stays in the queue until a thread in this pool
678
 * finishes its previous task and processes @data.
679
 *
680
 * @error can be %NULL to ignore errors, or non-%NULL to report
681
 * errors. An error can only occur when a new thread couldn't be
682
 * created. In that case @data is simply appended to the queue of
683
 * work to do.
684
 *
685
 * Before version 2.32, this function did not return a success status.
686
 *
687
 * Returns: %TRUE on success, %FALSE if an error occurred
688
 */
689
gboolean
690
g_thread_pool_push (GThreadPool  *pool,
691
                    gpointer      data,
692
                    GError      **error)
693
0
{
694
0
  GRealThreadPool *real;
695
0
  gboolean result;
696
697
0
  real = (GRealThreadPool*) pool;
698
699
0
  g_return_val_if_fail (real, FALSE);
700
0
  g_return_val_if_fail (real->running, FALSE);
701
702
0
  result = TRUE;
703
704
0
  g_async_queue_lock (real->queue);
705
706
0
  if (g_async_queue_length_unlocked (real->queue) >= 0)
707
0
    {
708
      /* No thread is waiting in the queue */
709
0
      GError *local_error = NULL;
710
711
0
      if (!g_thread_pool_start_thread (real, &local_error))
712
0
        {
713
0
          g_propagate_error (error, local_error);
714
0
          result = FALSE;
715
0
        }
716
0
    }
717
718
0
  g_thread_pool_queue_push_unlocked (real, data);
719
0
  g_async_queue_unlock (real->queue);
720
721
0
  return result;
722
0
}
723
724
/**
725
 * g_thread_pool_set_max_threads:
726
 * @pool: a #GThreadPool
727
 * @max_threads: a new maximal number of threads for @pool,
728
 *     or -1 for unlimited
729
 * @error: return location for error, or %NULL
730
 *
731
 * Sets the maximal allowed number of threads for @pool.
732
 * A value of -1 means that the maximal number of threads
733
 * is unlimited. If @pool is an exclusive thread pool, setting
734
 * the maximal number of threads to -1 is not allowed.
735
 *
736
 * Setting @max_threads to 0 means stopping all work for @pool.
737
 * It is effectively frozen until @max_threads is set to a non-zero
738
 * value again.
739
 *
740
 * A thread is never terminated while calling @func, as supplied by
741
 * g_thread_pool_new(). Instead the maximal number of threads only
742
 * has effect for the allocation of new threads in g_thread_pool_push().
743
 * A new thread is allocated, whenever the number of currently
744
 * running threads in @pool is smaller than the maximal number.
745
 *
746
 * @error can be %NULL to ignore errors, or non-%NULL to report
747
 * errors. An error can only occur when a new thread couldn't be
748
 * created.
749
 *
750
 * Before version 2.32, this function did not return a success status.
751
 *
752
 * Returns: %TRUE on success, %FALSE if an error occurred
753
 */
754
gboolean
755
g_thread_pool_set_max_threads (GThreadPool  *pool,
756
                               gint          max_threads,
757
                               GError      **error)
758
0
{
759
0
  GRealThreadPool *real;
760
0
  gint to_start;
761
0
  gboolean result;
762
763
0
  real = (GRealThreadPool*) pool;
764
765
0
  g_return_val_if_fail (real, FALSE);
766
0
  g_return_val_if_fail (real->running, FALSE);
767
0
  g_return_val_if_fail (!real->pool.exclusive || max_threads != -1, FALSE);
768
0
  g_return_val_if_fail (max_threads >= -1, FALSE);
769
770
0
  result = TRUE;
771
772
0
  g_async_queue_lock (real->queue);
773
774
0
  real->max_threads = max_threads;
775
776
0
  if (pool->exclusive)
777
0
    to_start = real->max_threads - real->num_threads;
778
0
  else
779
0
    to_start = g_async_queue_length_unlocked (real->queue);
780
781
0
  for ( ; to_start > 0; to_start--)
782
0
    {
783
0
      GError *local_error = NULL;
784
785
0
      if (!g_thread_pool_start_thread (real, &local_error))
786
0
        {
787
0
          g_propagate_error (error, local_error);
788
0
          result = FALSE;
789
0
          break;
790
0
        }
791
0
    }
792
793
0
  g_async_queue_unlock (real->queue);
794
795
0
  return result;
796
0
}
797
798
/**
799
 * g_thread_pool_get_max_threads:
800
 * @pool: a #GThreadPool
801
 *
802
 * Returns the maximal number of threads for @pool.
803
 *
804
 * Returns: the maximal number of threads
805
 */
806
gint
807
g_thread_pool_get_max_threads (GThreadPool *pool)
808
0
{
809
0
  GRealThreadPool *real;
810
0
  gint retval;
811
812
0
  real = (GRealThreadPool*) pool;
813
814
0
  g_return_val_if_fail (real, 0);
815
0
  g_return_val_if_fail (real->running, 0);
816
817
0
  g_async_queue_lock (real->queue);
818
0
  retval = real->max_threads;
819
0
  g_async_queue_unlock (real->queue);
820
821
0
  return retval;
822
0
}
823
824
/**
825
 * g_thread_pool_get_num_threads:
826
 * @pool: a #GThreadPool
827
 *
828
 * Returns the number of threads currently running in @pool.
829
 *
830
 * Returns: the number of threads currently running
831
 */
832
guint
833
g_thread_pool_get_num_threads (GThreadPool *pool)
834
0
{
835
0
  GRealThreadPool *real;
836
0
  guint retval;
837
838
0
  real = (GRealThreadPool*) pool;
839
840
0
  g_return_val_if_fail (real, 0);
841
0
  g_return_val_if_fail (real->running, 0);
842
843
0
  g_async_queue_lock (real->queue);
844
0
  retval = real->num_threads;
845
0
  g_async_queue_unlock (real->queue);
846
847
0
  return retval;
848
0
}
849
850
/**
851
 * g_thread_pool_unprocessed:
852
 * @pool: a #GThreadPool
853
 *
854
 * Returns the number of tasks still unprocessed in @pool.
855
 *
856
 * Returns: the number of unprocessed tasks
857
 */
858
guint
859
g_thread_pool_unprocessed (GThreadPool *pool)
860
0
{
861
0
  GRealThreadPool *real;
862
0
  gint unprocessed;
863
864
0
  real = (GRealThreadPool*) pool;
865
866
0
  g_return_val_if_fail (real, 0);
867
0
  g_return_val_if_fail (real->running, 0);
868
869
0
  unprocessed = g_async_queue_length (real->queue);
870
871
0
  return MAX (unprocessed, 0);
872
0
}
873
874
/**
875
 * g_thread_pool_free:
876
 * @pool: a #GThreadPool
877
 * @immediate: should @pool shut down immediately?
878
 * @wait_: should the function wait for all tasks to be finished?
879
 *
880
 * Frees all resources allocated for @pool.
881
 *
882
 * If @immediate is %TRUE, no new task is processed for @pool.
883
 * Otherwise @pool is not freed before the last task is processed.
884
 * Note however, that no thread of this pool is interrupted while
885
 * processing a task. Instead at least all still running threads
886
 * can finish their tasks before the @pool is freed.
887
 *
888
 * If @wait_ is %TRUE, this function does not return before all
889
 * tasks to be processed (dependent on @immediate, whether all
890
 * or only the currently running) are ready.
891
 * Otherwise this function returns immediately.
892
 *
893
 * After calling this function @pool must not be used anymore.
894
 */
895
void
896
g_thread_pool_free (GThreadPool *pool,
897
                    gboolean     immediate,
898
                    gboolean     wait_)
899
0
{
900
0
  GRealThreadPool *real;
901
902
0
  real = (GRealThreadPool*) pool;
903
904
0
  g_return_if_fail (real);
905
0
  g_return_if_fail (real->running);
906
907
  /* If there's no thread allowed here, there is not much sense in
908
   * not stopping this pool immediately, when it's not empty
909
   */
910
0
  g_return_if_fail (immediate ||
911
0
                    real->max_threads != 0 ||
912
0
                    g_async_queue_length (real->queue) == 0);
913
914
0
  g_async_queue_lock (real->queue);
915
916
0
  real->running = FALSE;
917
0
  real->immediate = immediate;
918
0
  real->waiting = wait_;
919
920
0
  if (wait_)
921
0
    {
922
0
      while (g_async_queue_length_unlocked (real->queue) != (gint) -real->num_threads &&
923
0
             !(immediate && real->num_threads == 0))
924
0
        g_cond_wait (&real->cond, _g_async_queue_get_mutex (real->queue));
925
0
    }
926
927
0
  if (immediate || g_async_queue_length_unlocked (real->queue) == (gint) -real->num_threads)
928
0
    {
929
      /* No thread is currently doing something (and nothing is left
930
       * to process in the queue)
931
       */
932
0
      if (real->num_threads == 0)
933
0
        {
934
          /* No threads left, we clean up */
935
0
          g_async_queue_unlock (real->queue);
936
0
          g_thread_pool_free_internal (real);
937
0
          return;
938
0
        }
939
940
0
      g_thread_pool_wakeup_and_stop_all (real);
941
0
    }
942
943
  /* The last thread should cleanup the pool */
944
0
  real->waiting = FALSE;
945
0
  g_async_queue_unlock (real->queue);
946
0
}
947
948
static void
949
g_thread_pool_free_internal (GRealThreadPool* pool)
950
0
{
951
0
  g_return_if_fail (pool);
952
0
  g_return_if_fail (pool->running == FALSE);
953
0
  g_return_if_fail (pool->num_threads == 0);
954
955
  /* Ensure the dummy item pushed on by g_thread_pool_wakeup_and_stop_all() is
956
   * removed, before it’s potentially passed to the user-provided
957
   * @item_free_func. */
958
0
  g_async_queue_remove (pool->queue, GUINT_TO_POINTER (1));
959
960
0
  g_async_queue_unref (pool->queue);
961
0
  g_cond_clear (&pool->cond);
962
963
0
  g_free (pool);
964
0
}
965
966
static void
967
g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool)
968
0
{
969
0
  guint i;
970
971
0
  g_return_if_fail (pool);
972
0
  g_return_if_fail (pool->running == FALSE);
973
0
  g_return_if_fail (pool->num_threads != 0);
974
975
0
  pool->immediate = TRUE;
976
977
  /*
978
   * So here we're sending bogus data to the pool threads, which
979
   * should cause them each to wake up, and check the above
980
   * pool->immediate condition. However we don't want that
981
   * data to be sorted (since it'll crash the sorter).
982
   */
983
0
  for (i = 0; i < pool->num_threads; i++)
984
0
    g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
985
0
}
986
987
/**
988
 * g_thread_pool_set_max_unused_threads:
989
 * @max_threads: maximal number of unused threads
990
 *
991
 * Sets the maximal number of unused threads to @max_threads.
992
 * If @max_threads is -1, no limit is imposed on the number
993
 * of unused threads.
994
 *
995
 * The default value is 2.
996
 */
997
void
998
g_thread_pool_set_max_unused_threads (gint max_threads)
999
0
{
1000
0
  g_return_if_fail (max_threads >= -1);
1001
1002
0
  g_atomic_int_set (&max_unused_threads, max_threads);
1003
1004
0
  if (max_threads != -1)
1005
0
    {
1006
0
      max_threads -= g_atomic_int_get (&unused_threads);
1007
0
      if (max_threads < 0)
1008
0
        {
1009
0
          g_atomic_int_set (&kill_unused_threads, -max_threads);
1010
0
          g_atomic_int_inc (&wakeup_thread_serial);
1011
1012
0
          g_async_queue_lock (unused_thread_queue);
1013
1014
0
          do
1015
0
            {
1016
0
              g_async_queue_push_unlocked (unused_thread_queue,
1017
0
                                           wakeup_thread_marker);
1018
0
            }
1019
0
          while (++max_threads);
1020
1021
0
          g_async_queue_unlock (unused_thread_queue);
1022
0
        }
1023
0
    }
1024
0
}
1025
1026
/**
1027
 * g_thread_pool_get_max_unused_threads:
1028
 *
1029
 * Returns the maximal allowed number of unused threads.
1030
 *
1031
 * Returns: the maximal number of unused threads
1032
 */
1033
gint
1034
g_thread_pool_get_max_unused_threads (void)
1035
0
{
1036
0
  return g_atomic_int_get (&max_unused_threads);
1037
0
}
1038
1039
/**
1040
 * g_thread_pool_get_num_unused_threads:
1041
 *
1042
 * Returns the number of currently unused threads.
1043
 *
1044
 * Returns: the number of currently unused threads
1045
 */
1046
guint
1047
g_thread_pool_get_num_unused_threads (void)
1048
0
{
1049
0
  return (guint) g_atomic_int_get (&unused_threads);
1050
0
}
1051
1052
/**
1053
 * g_thread_pool_stop_unused_threads:
1054
 *
1055
 * Stops all currently unused threads. This does not change the
1056
 * maximal number of unused threads. This function can be used to
1057
 * regularly stop all unused threads e.g. from g_timeout_add().
1058
 */
1059
void
1060
g_thread_pool_stop_unused_threads (void)
1061
0
{
1062
0
  guint oldval;
1063
1064
0
  oldval = g_thread_pool_get_max_unused_threads ();
1065
1066
0
  g_thread_pool_set_max_unused_threads (0);
1067
0
  g_thread_pool_set_max_unused_threads (oldval);
1068
0
}
1069
1070
/**
1071
 * g_thread_pool_set_sort_function:
1072
 * @pool: a #GThreadPool
1073
 * @func: the #GCompareDataFunc used to sort the list of tasks.
1074
 *     This function is passed two tasks. It should return
1075
 *     0 if the order in which they are handled does not matter,
1076
 *     a negative value if the first task should be processed before
1077
 *     the second or a positive value if the second task should be
1078
 *     processed first.
1079
 * @user_data: user data passed to @func
1080
 *
1081
 * Sets the function used to sort the list of tasks. This allows the
1082
 * tasks to be processed by a priority determined by @func, and not
1083
 * just in the order in which they were added to the pool.
1084
 *
1085
 * Note, if the maximum number of threads is more than 1, the order
1086
 * that threads are executed cannot be guaranteed 100%. Threads are
1087
 * scheduled by the operating system and are executed at random. It
1088
 * cannot be assumed that threads are executed in the order they are
1089
 * created.
1090
 *
1091
 * Since: 2.10
1092
 */
1093
void
1094
g_thread_pool_set_sort_function (GThreadPool      *pool,
1095
                                 GCompareDataFunc  func,
1096
                                 gpointer          user_data)
1097
0
{
1098
0
  GRealThreadPool *real;
1099
1100
0
  real = (GRealThreadPool*) pool;
1101
1102
0
  g_return_if_fail (real);
1103
0
  g_return_if_fail (real->running);
1104
1105
0
  g_async_queue_lock (real->queue);
1106
1107
0
  real->sort_func = func;
1108
0
  real->sort_user_data = user_data;
1109
1110
0
  if (func)
1111
0
    g_async_queue_sort_unlocked (real->queue,
1112
0
                                 real->sort_func,
1113
0
                                 real->sort_user_data);
1114
1115
0
  g_async_queue_unlock (real->queue);
1116
0
}
1117
1118
/**
1119
 * g_thread_pool_move_to_front:
1120
 * @pool: a #GThreadPool
1121
 * @data: an unprocessed item in the pool
1122
 *
1123
 * Moves the item to the front of the queue of unprocessed
1124
 * items, so that it will be processed next.
1125
 *
1126
 * Returns: %TRUE if the item was found and moved
1127
 *
1128
 * Since: 2.46
1129
 */
1130
gboolean
1131
g_thread_pool_move_to_front (GThreadPool *pool,
1132
                             gpointer     data)
1133
0
{
1134
0
  GRealThreadPool *real = (GRealThreadPool*) pool;
1135
0
  gboolean found;
1136
1137
0
  g_async_queue_lock (real->queue);
1138
1139
0
  found = g_async_queue_remove_unlocked (real->queue, data);
1140
0
  if (found)
1141
0
    g_async_queue_push_front_unlocked (real->queue, data);
1142
1143
0
  g_async_queue_unlock (real->queue);
1144
1145
0
  return found;
1146
0
}
1147
1148
/**
1149
 * g_thread_pool_set_max_idle_time:
1150
 * @interval: the maximum @interval (in milliseconds)
1151
 *     a thread can be idle
1152
 *
1153
 * This function will set the maximum @interval that a thread
1154
 * waiting in the pool for new tasks can be idle for before
1155
 * being stopped. This function is similar to calling
1156
 * g_thread_pool_stop_unused_threads() on a regular timeout,
1157
 * except this is done on a per thread basis.
1158
 *
1159
 * By setting @interval to 0, idle threads will not be stopped.
1160
 *
1161
 * The default value is 15000 (15 seconds).
1162
 *
1163
 * Since: 2.10
1164
 */
1165
void
1166
g_thread_pool_set_max_idle_time (guint interval)
1167
0
{
1168
0
  guint i;
1169
1170
0
  g_atomic_int_set (&max_idle_time, interval);
1171
1172
0
  i = (guint) g_atomic_int_get (&unused_threads);
1173
0
  if (i > 0)
1174
0
    {
1175
0
      g_atomic_int_inc (&wakeup_thread_serial);
1176
0
      g_async_queue_lock (unused_thread_queue);
1177
1178
0
      do
1179
0
        {
1180
0
          g_async_queue_push_unlocked (unused_thread_queue,
1181
0
                                       wakeup_thread_marker);
1182
0
        }
1183
0
      while (--i);
1184
1185
0
      g_async_queue_unlock (unused_thread_queue);
1186
0
    }
1187
0
}
1188
1189
/**
1190
 * g_thread_pool_get_max_idle_time:
1191
 *
1192
 * This function will return the maximum @interval that a
1193
 * thread will wait in the thread pool for new tasks before
1194
 * being stopped.
1195
 *
1196
 * If this function returns 0, threads waiting in the thread
1197
 * pool for new work are not stopped.
1198
 *
1199
 * Returns: the maximum @interval (milliseconds) to wait
1200
 *     for new tasks in the thread pool before stopping the
1201
 *     thread
1202
 *
1203
 * Since: 2.10
1204
 */
1205
guint
1206
g_thread_pool_get_max_idle_time (void)
1207
0
{
1208
  return (guint) g_atomic_int_get (&max_idle_time);
1209
0
}