Coverage Report

Created: 2025-08-29 06:18

/src/tinysparql/subprojects/glib-2.80.3/glib/gthreadpool.c
Line
Count
Source (jump to first uncovered line)
1
/* GLIB - Library of useful routines for C programming
2
 * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3
 *
4
 * GThreadPool: thread pool implementation.
5
 * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6
 *
7
 * SPDX-License-Identifier: LGPL-2.1-or-later
8
 *
9
 * This library is free software; you can redistribute it and/or
10
 * modify it under the terms of the GNU Lesser General Public
11
 * License as published by the Free Software Foundation; either
12
 * version 2.1 of the License, or (at your option) any later version.
13
 *
14
 * This library is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
 * Lesser General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU Lesser General Public
20
 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
21
 */
22
23
/*
24
 * MT safe
25
 */
26
27
#include "config.h"
28
29
#include "gthreadpool.h"
30
31
#include "gasyncqueue.h"
32
#include "gasyncqueueprivate.h"
33
#include "glib-private.h"
34
#include "gmain.h"
35
#include "gtestutils.h"
36
#include "gthreadprivate.h"
37
#include "gtimer.h"
38
#include "gutils.h"
39
40
#define DEBUG_MSG(x)
41
/* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");    */
42
43
typedef struct _GRealThreadPool GRealThreadPool;
44
45
/**
46
 * GThreadPool:
47
 * @func: the function to execute in the threads of this pool
48
 * @user_data: the user data for the threads of this pool
49
 * @exclusive: are all threads exclusive to this pool
50
 *
51
 * The `GThreadPool` struct represents a thread pool.
52
 *
53
 * A thread pool is useful when you wish to asynchronously fork out the execution of work
54
 * and continue working in your own thread. If that will happen often, the overhead of starting
55
 * and destroying a thread each time might be too high. In such cases reusing already started
56
 * threads seems like a good idea. And it indeed is, but implementing this can be tedious
57
 * and error-prone.
58
 *
59
 * Therefore GLib provides thread pools for your convenience. An added advantage is, that the
60
 * threads can be shared between the different subsystems of your program, when they are using GLib.
61
 *
62
 * To create a new thread pool, you use [func@GLib.ThreadPool.new].
63
 * It is destroyed by [method@GLib.ThreadPool.free].
64
 *
65
 * If you want to execute a certain task within a thread pool, use [method@GLib.ThreadPool.push].
66
 *
67
 * To get the current number of running threads you call [method@GLib.ThreadPool.get_num_threads].
68
 * To get the number of still unprocessed tasks you call [method@GLib.ThreadPool.unprocessed].
69
 * To control the maximum number of threads for a thread pool, you use
70
 * [method@GLib.ThreadPool.get_max_threads] and [method@GLib.ThreadPool.set_max_threads].
71
 *
72
 * Finally you can control the number of unused threads, that are kept alive by GLib for future use.
73
 * The current number can be fetched with [func@GLib.ThreadPool.get_num_unused_threads].
74
 * The maximum number can be controlled by [func@GLib.ThreadPool.get_max_unused_threads] and
75
 * [func@GLib.ThreadPool.set_max_unused_threads]. All currently unused threads
76
 * can be stopped by calling [func@GLib.ThreadPool.stop_unused_threads].
77
 */
78
struct _GRealThreadPool
79
{
80
  GThreadPool pool;                /* public part (func, user_data, exclusive); a GThreadPool* handed to callers is cast back to this struct */
81
  GAsyncQueue *queue;              /* pending tasks; its lock is taken around accesses to the counters below */
82
  GCond cond;                      /* broadcast by a worker leaving a stopped pool while another thread is waiting on it */
83
  gint max_threads;                /* upper bound on attached threads, -1 means no limit */
84
  guint num_threads;               /* threads currently attached to this pool */
85
  gboolean running;                /* TRUE from creation; public entry points refuse a pool where this is FALSE */
86
  gboolean immediate;              /* FALSE at creation; when set on a stopped pool, received tasks are not executed */
87
  gboolean waiting;                /* FALSE at creation; when set, workers signal cond instead of freeing the pool */
88
  GCompareDataFunc sort_func;      /* optional ordering for queued tasks, NULL at creation */
89
  gpointer sort_user_data;         /* user data passed along with sort_func */
90
};
91
92
/* The following is just an address to mark the wakeup order for a
93
 * thread, it could be any address (as long as it isn't a valid
94
 * GThreadPool address)
95
 */
96
static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
97
static gint wakeup_thread_serial = 0; /* compared by idle threads against their last seen value to detect changed limits */
98
99
/* Here all unused threads are waiting  */
100
static GAsyncQueue *unused_thread_queue = NULL; /* created on first pool creation; pools (or the wakeup marker) are pushed here for idle threads */
101
static gint unused_threads = 0;       /* number of threads currently parked in the global pool (atomic) */
102
static gint max_unused_threads = 2;   /* idle threads at or above this count terminate instead of waiting (atomic) */
103
static gint kill_unused_threads = 0;  /* decremented by a woken idle thread; the thread exits if the value was still positive (atomic) */
104
static guint max_idle_time = 15 * 1000; /* milliseconds an idle thread waits for a new pool; 0 waits indefinitely (atomic) */
105
106
/* Request record exchanged with the helper thread that spawns workers for
 * non-exclusive pools. The requester pushes a pointer to it onto
 * spawn_thread_queue and waits on spawn_thread_cond until a result is set.
 */
typedef struct
107
{
108
  /* Either thread or error are set in the end. Both transfer-full. */
109
  GThreadPool *pool;   /* pool handed to the new worker as its thread data */
110
  GThread *thread;     /* result: the created thread, or NULL on failure */
111
  GError *error;       /* result: the failure reason, or NULL on success */
112
} SpawnThreadData;
113
114
static GCond spawn_thread_cond;          /* broadcast by the spawner after it fills in a request; waited on under the spawn_thread_queue mutex */
115
static GAsyncQueue *spawn_thread_queue;  /* SpawnThreadData requests; created together with the spawner thread */
116
117
/* Forward declarations of the file-local helpers. */
static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
118
                                                           gpointer          data);
119
static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);
120
static gpointer         g_thread_pool_thread_proxy        (gpointer          data);
121
static gboolean         g_thread_pool_start_thread        (GRealThreadPool  *pool,
122
                                                           GError          **error);
123
static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
124
static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);
125
static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);
126
127
static void
128
g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
129
                                   gpointer         data)
130
0
{
131
0
  if (pool->sort_func)
132
0
    g_async_queue_push_sorted_unlocked (pool->queue,
133
0
                                        data,
134
0
                                        pool->sort_func,
135
0
                                        pool->sort_user_data);
136
0
  else
137
0
    g_async_queue_push_unlocked (pool->queue, data);
138
0
}
139
140
static GRealThreadPool*
141
g_thread_pool_wait_for_new_pool (void)
142
7.87k
{
143
7.87k
  GRealThreadPool *pool;
144
7.87k
  gint local_wakeup_thread_serial;
145
7.87k
  guint local_max_unused_threads;
146
7.87k
  gint local_max_idle_time;
147
7.87k
  gint last_wakeup_thread_serial;
148
7.87k
  gboolean have_relayed_thread_marker = FALSE;
149
150
7.87k
  local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
151
7.87k
  local_max_idle_time = g_atomic_int_get (&max_idle_time);
152
7.87k
  last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
153
154
7.87k
  do
155
7.87k
    {
156
7.87k
      if ((guint) g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
157
0
        {
158
          /* If this is a superfluous thread, stop it. */
159
0
          pool = NULL;
160
0
        }
161
7.87k
      else if (local_max_idle_time > 0)
162
7.87k
        {
163
          /* If a maximal idle time is given, wait for the given time. */
164
7.87k
          DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
165
7.87k
                      g_thread_self (), local_max_idle_time / 1000.0));
166
167
7.87k
          pool = g_async_queue_timeout_pop (unused_thread_queue,
168
7.87k
              local_max_idle_time * 1000);
169
7.87k
        }
170
0
      else
171
0
        {
172
          /* If no maximal idle time is given, wait indefinitely. */
173
0
          DEBUG_MSG (("thread %p waiting in global pool.", g_thread_self ()));
174
0
          pool = g_async_queue_pop (unused_thread_queue);
175
0
        }
176
177
7.87k
      if (pool == wakeup_thread_marker)
178
0
        {
179
0
          local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
180
0
          if (last_wakeup_thread_serial == local_wakeup_thread_serial)
181
0
            {
182
0
              if (!have_relayed_thread_marker)
183
0
              {
184
                /* If this wakeup marker has been received for
185
                 * the second time, relay it.
186
                 */
187
0
                DEBUG_MSG (("thread %p relaying wakeup message to "
188
0
                            "waiting thread with lower serial.",
189
0
                            g_thread_self ()));
190
191
0
                g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
192
0
                have_relayed_thread_marker = TRUE;
193
194
                /* If a wakeup marker has been relayed, this thread
195
                 * will get out of the way for 100 microseconds to
196
                 * avoid receiving this marker again.
197
                 */
198
0
                g_usleep (100);
199
0
              }
200
0
            }
201
0
          else
202
0
            {
203
0
              if (g_atomic_int_add (&kill_unused_threads, -1) > 0)
204
0
                {
205
0
                  pool = NULL;
206
0
                  break;
207
0
                }
208
209
0
              DEBUG_MSG (("thread %p updating to new limits.",
210
0
                          g_thread_self ()));
211
212
0
              local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
213
0
              local_max_idle_time = g_atomic_int_get (&max_idle_time);
214
0
              last_wakeup_thread_serial = local_wakeup_thread_serial;
215
216
0
              have_relayed_thread_marker = FALSE;
217
0
            }
218
0
        }
219
7.87k
    }
220
7.87k
  while (pool == wakeup_thread_marker);
221
222
0
  return pool;
223
7.87k
}
224
225
static gpointer
226
g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
227
15.7k
{
228
15.7k
  gpointer task = NULL;
229
230
15.7k
  if (pool->running || (!pool->immediate &&
231
7.87k
                        g_async_queue_length_unlocked (pool->queue) > 0))
232
7.87k
    {
233
      /* This thread pool is still active. */
234
7.87k
      if (pool->max_threads != -1 && pool->num_threads > (guint) pool->max_threads)
235
0
        {
236
          /* This is a superfluous thread, so it goes to the global pool. */
237
0
          DEBUG_MSG (("superfluous thread %p in pool %p.",
238
0
                      g_thread_self (), pool));
239
0
        }
240
7.87k
      else if (pool->pool.exclusive)
241
7.87k
        {
242
          /* Exclusive threads stay attached to the pool. */
243
7.87k
          task = g_async_queue_pop_unlocked (pool->queue);
244
245
7.87k
          DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
246
7.87k
                      "(%d running, %d unprocessed).",
247
7.87k
                      g_thread_self (), pool, pool->num_threads,
248
7.87k
                      g_async_queue_length_unlocked (pool->queue)));
249
7.87k
        }
250
0
      else
251
0
        {
252
          /* A thread will wait for new tasks for at most 1/2
253
           * second before going to the global pool.
254
           */
255
0
          DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
256
0
                      "(%d running, %d unprocessed).",
257
0
                      g_thread_self (), pool, pool->num_threads,
258
0
                      g_async_queue_length_unlocked (pool->queue)));
259
260
0
          task = g_async_queue_timeout_pop_unlocked (pool->queue,
261
0
                 G_USEC_PER_SEC / 2);
262
0
        }
263
7.87k
    }
264
7.87k
  else
265
7.87k
    {
266
      /* This thread pool is inactive, it will no longer process tasks. */
267
7.87k
      DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
268
7.87k
                  "(running: %s, immediate: %s, len: %d).",
269
7.87k
                  pool, g_thread_self (),
270
7.87k
                  pool->running ? "true" : "false",
271
7.87k
                  pool->immediate ? "true" : "false",
272
7.87k
                  g_async_queue_length_unlocked (pool->queue)));
273
7.87k
    }
274
275
15.7k
  return task;
276
15.7k
}
277
278
static gpointer
279
g_thread_pool_spawn_thread (gpointer data)
280
4
{
281
8
  while (TRUE)
282
4
    {
283
4
      SpawnThreadData *spawn_thread_data;
284
4
      GThread *thread = NULL;
285
4
      GError *error = NULL;
286
4
      const gchar *prgname = g_get_prgname ();
287
4
      gchar name[16] = "pool";
288
289
4
      if (prgname)
290
0
        g_snprintf (name, sizeof (name), "pool-%s", prgname);
291
292
4
      g_async_queue_lock (spawn_thread_queue);
293
      /* Spawn a new thread for the given pool and wake the requesting thread
294
       * up again with the result. This new thread will have the scheduler
295
       * settings inherited from this thread and in extension of the thread
296
       * that created the first non-exclusive thread-pool. */
297
4
      spawn_thread_data = g_async_queue_pop_unlocked (spawn_thread_queue);
298
4
      thread = g_thread_try_new (name, g_thread_pool_thread_proxy, spawn_thread_data->pool, &error);
299
300
4
      spawn_thread_data->thread = g_steal_pointer (&thread);
301
4
      spawn_thread_data->error = g_steal_pointer (&error);
302
303
4
      g_cond_broadcast (&spawn_thread_cond);
304
4
      g_async_queue_unlock (spawn_thread_queue);
305
4
    }
306
307
4
  return NULL;
308
4
}
309
310
static gpointer
311
g_thread_pool_thread_proxy (gpointer data)
312
4
{
313
4
  GRealThreadPool *pool;
314
315
4
  pool = data;
316
317
4
  DEBUG_MSG (("thread %p started for pool %p.", g_thread_self (), pool));
318
319
4
  g_async_queue_lock (pool->queue);
320
321
15.7k
  while (TRUE)
322
15.7k
    {
323
15.7k
      gpointer task;
324
325
15.7k
      task = g_thread_pool_wait_for_new_task (pool);
326
15.7k
      if (task)
327
7.87k
        {
328
7.87k
          if (pool->running || !pool->immediate)
329
0
            {
330
              /* A task was received and the thread pool is active,
331
               * so execute the function.
332
               */
333
0
              g_async_queue_unlock (pool->queue);
334
0
              DEBUG_MSG (("thread %p in pool %p calling func.",
335
0
                          g_thread_self (), pool));
336
0
              pool->pool.func (task, pool->pool.user_data);
337
0
              g_async_queue_lock (pool->queue);
338
0
            }
339
7.87k
        }
340
7.87k
      else
341
7.87k
        {
342
          /* No task was received, so this thread goes to the global pool. */
343
7.87k
          gboolean free_pool = FALSE;
344
345
7.87k
          DEBUG_MSG (("thread %p leaving pool %p for global pool.",
346
7.87k
                      g_thread_self (), pool));
347
7.87k
          pool->num_threads--;
348
349
7.87k
          if (!pool->running)
350
7.87k
            {
351
7.87k
              if (!pool->waiting)
352
7.87k
                {
353
7.87k
                  if (pool->num_threads == 0)
354
7.87k
                    {
355
                      /* If the pool is not running and no other
356
                       * thread is waiting for this thread pool to
357
                       * finish and this is the last thread of this
358
                       * pool, free the pool.
359
                       */
360
7.87k
                      free_pool = TRUE;
361
7.87k
                    }
362
0
                  else
363
0
                    {
364
                      /* If the pool is not running and no other
365
                       * thread is waiting for this thread pool to
366
                       * finish and this is not the last thread of
367
                       * this pool and there are no tasks left in the
368
                       * queue, wakeup the remaining threads.
369
                       */
370
0
                      if (g_async_queue_length_unlocked (pool->queue) ==
371
0
                          (gint) -pool->num_threads)
372
0
                        g_thread_pool_wakeup_and_stop_all (pool);
373
0
                    }
374
7.87k
                }
375
0
              else if (pool->immediate ||
376
0
                       g_async_queue_length_unlocked (pool->queue) <= 0)
377
0
                {
378
                  /* If the pool is not running and another thread is
379
                   * waiting for this thread pool to finish and there
380
                   * are either no tasks left or the pool shall stop
381
                   * immediately, inform the waiting thread of a change
382
                   * of the thread pool state.
383
                   */
384
0
                  g_cond_broadcast (&pool->cond);
385
0
                }
386
7.87k
            }
387
388
7.87k
          g_atomic_int_inc (&unused_threads);
389
7.87k
          g_async_queue_unlock (pool->queue);
390
391
7.87k
          if (free_pool)
392
7.87k
            g_thread_pool_free_internal (pool);
393
394
7.87k
          pool = g_thread_pool_wait_for_new_pool ();
395
7.87k
          g_atomic_int_add (&unused_threads, -1);
396
397
7.87k
          if (pool == NULL)
398
0
            break;
399
400
7.87k
          g_async_queue_lock (pool->queue);
401
402
7.87k
          DEBUG_MSG (("thread %p entering pool %p from global pool.",
403
7.87k
                      g_thread_self (), pool));
404
405
          /* pool->num_threads++ is not done here, but in
406
           * g_thread_pool_start_thread to make the new started
407
           * thread known to the pool before itself can do it.
408
           */
409
7.87k
        }
410
15.7k
    }
411
412
4
  return NULL;
413
4
}
414
415
static gboolean
416
g_thread_pool_start_thread (GRealThreadPool  *pool,
417
                            GError          **error)
418
7.87k
{
419
7.87k
  gboolean success = FALSE;
420
421
7.87k
  if (pool->max_threads != -1 && pool->num_threads >= (guint) pool->max_threads)
422
    /* Enough threads are already running */
423
0
    return TRUE;
424
425
7.87k
  g_async_queue_lock (unused_thread_queue);
426
427
7.87k
  if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
428
7.87k
    {
429
7.87k
      g_async_queue_push_unlocked (unused_thread_queue, pool);
430
7.87k
      success = TRUE;
431
7.87k
    }
432
433
7.87k
  g_async_queue_unlock (unused_thread_queue);
434
435
7.87k
  if (!success)
436
4
    {
437
4
      const gchar *prgname = g_get_prgname ();
438
4
      gchar name[16] = "pool";
439
4
      GThread *thread;
440
441
4
      if (prgname)
442
0
        g_snprintf (name, sizeof (name), "pool-%s", prgname);
443
444
      /* No thread was found, we have to start a new one */
445
4
      if (pool->pool.exclusive)
446
4
        {
447
          /* For exclusive thread-pools this is directly called from new() and
448
           * we simply start new threads that inherit the scheduler settings
449
           * from the current thread.
450
           */
451
4
          thread = g_thread_try_new (name, g_thread_pool_thread_proxy, pool, error);
452
4
        }
453
0
      else
454
0
        {
455
          /* For non-exclusive thread-pools this can be called at any time
456
           * when a new thread is needed. We make sure to create a new thread
457
           * here with the correct scheduler settings by going via our helper
458
           * thread.
459
           */
460
0
          SpawnThreadData spawn_thread_data = { (GThreadPool *) pool, NULL, NULL };
461
462
0
          g_async_queue_lock (spawn_thread_queue);
463
464
0
          g_async_queue_push_unlocked (spawn_thread_queue, &spawn_thread_data);
465
466
0
          while (!spawn_thread_data.thread && !spawn_thread_data.error)
467
0
            g_cond_wait (&spawn_thread_cond, _g_async_queue_get_mutex (spawn_thread_queue));
468
469
0
          thread = spawn_thread_data.thread;
470
0
          if (!thread)
471
0
            g_propagate_error (error, g_steal_pointer (&spawn_thread_data.error));
472
0
          g_async_queue_unlock (spawn_thread_queue);
473
0
        }
474
475
4
      if (thread == NULL)
476
0
        return FALSE;
477
478
4
      g_thread_unref (thread);
479
4
    }
480
481
  /* See comment in g_thread_pool_thread_proxy as to why this is done
482
   * here and not there
483
   */
484
7.87k
  pool->num_threads++;
485
486
7.87k
  return TRUE;
487
7.87k
}
488
489
/**
490
 * g_thread_pool_new:
491
 * @func: a function to execute in the threads of the new thread pool
492
 * @user_data: user data that is handed over to @func every time it
493
 *     is called
494
 * @max_threads: the maximal number of threads to execute concurrently
495
 *     in  the new thread pool, -1 means no limit
496
 * @exclusive: should this thread pool be exclusive?
497
 * @error: return location for error, or %NULL
498
 *
499
 * This function creates a new thread pool.
500
 *
501
 * Whenever you call g_thread_pool_push(), either a new thread is
502
 * created or an unused one is reused. At most @max_threads threads
503
 * are running concurrently for this thread pool. @max_threads = -1
504
 * allows unlimited threads to be created for this thread pool. The
505
 * newly created or reused thread now executes the function @func
506
 * with the two arguments. The first one is the parameter to
507
 * g_thread_pool_push() and the second one is @user_data.
508
 *
509
 * Pass g_get_num_processors() to @max_threads to create as many threads as
510
 * there are logical processors on the system. This will not pin each thread to
511
 * a specific processor.
512
 *
513
 * The parameter @exclusive determines whether the thread pool owns
514
 * all threads exclusively or shares them with other thread pools.
515
 * If @exclusive is %TRUE, @max_threads threads are started
516
 * immediately and they will run exclusively for this thread pool
517
 * until it is destroyed by g_thread_pool_free(). If @exclusive is
518
 * %FALSE, threads are created when needed and shared between all
519
 * non-exclusive thread pools. This implies that @max_threads may
520
 * not be -1 for exclusive thread pools. Besides, exclusive thread
521
 * pools are not affected by g_thread_pool_set_max_idle_time()
522
 * since their threads are never considered idle and returned to the
523
 * global pool.
524
 *
525
 * Note that the threads used by exclusive thread pools will all inherit the
526
 * scheduler settings of the current thread while the threads used by
527
 * non-exclusive thread pools will inherit the scheduler settings from the
528
 * first thread that created such a thread pool.
529
 *
530
 * At least one thread will be spawned when this function is called, either to
531
 * create the @max_threads exclusive threads, or to preserve the scheduler
532
 * settings of the current thread for future spawns.
533
 *
534
 * @error can be %NULL to ignore errors, or non-%NULL to report
535
 * errors. An error can only occur when @exclusive is set to %TRUE
536
 * and not all @max_threads threads could be created.
537
 * See #GThreadError for possible errors that may occur.
538
 * Note, even in case of error a valid #GThreadPool is returned.
539
 *
540
 * Returns: the new #GThreadPool
541
 */
542
GThreadPool *
543
g_thread_pool_new (GFunc      func,
544
                   gpointer   user_data,
545
                   gint       max_threads,
546
                   gboolean   exclusive,
547
                   GError   **error)
548
15.7k
{
549
15.7k
  return g_thread_pool_new_full (func, user_data, NULL, max_threads, exclusive, error);
550
15.7k
}
551
552
/**
553
 * g_thread_pool_new_full:
554
 * @func: a function to execute in the threads of the new thread pool
555
 * @user_data: user data that is handed over to @func every time it
556
 *     is called
557
 * @item_free_func: (nullable): used to pass as a free function to
558
 *     g_async_queue_new_full()
559
 * @max_threads: the maximal number of threads to execute concurrently
560
 *     in the new thread pool, `-1` means no limit
561
 * @exclusive: should this thread pool be exclusive?
562
 * @error: return location for error, or %NULL
563
 *
564
 * This function creates a new thread pool similar to g_thread_pool_new()
565
 * but allowing @item_free_func to be specified to free the data passed
566
 * to g_thread_pool_push() in the case that the #GThreadPool is stopped
567
 * and freed before all tasks have been executed.
568
 *
569
 * @item_free_func will *not* be called on items successfully passed to @func.
570
 * @func is responsible for freeing the items passed to it.
571
 *
572
 * Returns: (transfer full): the new #GThreadPool
573
 *
574
 * Since: 2.70
575
 */
576
GThreadPool *
577
g_thread_pool_new_full (GFunc           func,
578
                        gpointer        user_data,
579
                        GDestroyNotify  item_free_func,
580
                        gint            max_threads,
581
                        gboolean        exclusive,
582
                        GError        **error)
583
15.7k
{
584
15.7k
  GRealThreadPool *retval;
585
15.7k
  G_LOCK_DEFINE_STATIC (init);
586
587
15.7k
  g_return_val_if_fail (func, NULL);
588
15.7k
  g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
589
15.7k
  g_return_val_if_fail (max_threads >= -1, NULL);
590
591
15.7k
  retval = g_new (GRealThreadPool, 1);
592
593
15.7k
  retval->pool.func = func;
594
15.7k
  retval->pool.user_data = user_data;
595
15.7k
  retval->pool.exclusive = exclusive;
596
15.7k
  retval->queue = g_async_queue_new_full (item_free_func);
597
15.7k
  g_cond_init (&retval->cond);
598
15.7k
  retval->max_threads = max_threads;
599
15.7k
  retval->num_threads = 0;
600
15.7k
  retval->running = TRUE;
601
15.7k
  retval->immediate = FALSE;
602
15.7k
  retval->waiting = FALSE;
603
15.7k
  retval->sort_func = NULL;
604
15.7k
  retval->sort_user_data = NULL;
605
606
15.7k
  G_LOCK (init);
607
15.7k
  if (!unused_thread_queue)
608
4
      unused_thread_queue = g_async_queue_new ();
609
610
  /*
611
   * Spawn a helper thread that is only responsible for spawning new threads
612
   * with the scheduler settings of the current thread.
613
   *
614
   * This is then used for making sure that all threads created on the
615
   * non-exclusive thread-pool have the same scheduler settings, and more
616
   * importantly don't just inherit them from the thread that just happened to
617
   * push a new task and caused a new thread to be created.
618
   *
619
   * Not doing so could cause real-time priority threads or otherwise
620
   * threads with problematic scheduler settings to be part of the
621
   * non-exclusive thread-pools.
622
   *
623
   * For exclusive thread-pools this is not required as all threads are
624
   * created immediately below and are running forever, so they will
625
   * automatically inherit the scheduler settings from this very thread.
626
   */
627
15.7k
  if (!exclusive && !spawn_thread_queue)
628
4
    {
629
4
      GThread *pool_spawner = NULL;
630
631
4
      spawn_thread_queue = g_async_queue_new ();
632
4
      g_cond_init (&spawn_thread_cond);
633
4
      pool_spawner = g_thread_new ("pool-spawner", g_thread_pool_spawn_thread, NULL);
634
4
      g_ignore_leak (pool_spawner);
635
4
    }
636
15.7k
  G_UNLOCK (init);
637
638
15.7k
  if (retval->pool.exclusive)
639
7.87k
    {
640
7.87k
      g_async_queue_lock (retval->queue);
641
642
15.7k
      while (retval->num_threads < (guint) retval->max_threads)
643
7.87k
        {
644
7.87k
          GError *local_error = NULL;
645
646
7.87k
          if (!g_thread_pool_start_thread (retval, &local_error))
647
0
            {
648
0
              g_propagate_error (error, local_error);
649
0
              break;
650
0
            }
651
7.87k
        }
652
653
7.87k
      g_async_queue_unlock (retval->queue);
654
7.87k
    }
655
656
15.7k
  return (GThreadPool*) retval;
657
15.7k
}
658
659
/**
660
 * g_thread_pool_push:
661
 * @pool: a #GThreadPool
662
 * @data: a new task for @pool
663
 * @error: return location for error, or %NULL
664
 *
665
 * Inserts @data into the list of tasks to be executed by @pool.
666
 *
667
 * When the number of currently running threads is lower than the
668
 * maximal allowed number of threads, a new thread is started (or
669
 * reused) with the properties given to g_thread_pool_new().
670
 * Otherwise, @data stays in the queue until a thread in this pool
671
 * finishes its previous task and processes @data.
672
 *
673
 * @error can be %NULL to ignore errors, or non-%NULL to report
674
 * errors. An error can only occur when a new thread couldn't be
675
 * created. In that case @data is simply appended to the queue of
676
 * work to do.
677
 *
678
 * Before version 2.32, this function did not return a success status.
679
 *
680
 * Returns: %TRUE on success, %FALSE if an error occurred
681
 */
682
gboolean
683
g_thread_pool_push (GThreadPool  *pool,
684
                    gpointer      data,
685
                    GError      **error)
686
0
{
687
0
  GRealThreadPool *real;
688
0
  gboolean result;
689
690
0
  real = (GRealThreadPool*) pool;
691
692
0
  g_return_val_if_fail (real, FALSE);
693
0
  g_return_val_if_fail (real->running, FALSE);
694
695
0
  result = TRUE;
696
697
0
  g_async_queue_lock (real->queue);
698
699
0
  if (g_async_queue_length_unlocked (real->queue) >= 0)
700
0
    {
701
      /* No thread is waiting in the queue */
702
0
      GError *local_error = NULL;
703
704
0
      if (!g_thread_pool_start_thread (real, &local_error))
705
0
        {
706
0
          g_propagate_error (error, local_error);
707
0
          result = FALSE;
708
0
        }
709
0
    }
710
711
0
  g_thread_pool_queue_push_unlocked (real, data);
712
0
  g_async_queue_unlock (real->queue);
713
714
0
  return result;
715
0
}
716
717
/**
718
 * g_thread_pool_set_max_threads:
719
 * @pool: a #GThreadPool
720
 * @max_threads: a new maximal number of threads for @pool,
721
 *     or -1 for unlimited
722
 * @error: return location for error, or %NULL
723
 *
724
 * Sets the maximal allowed number of threads for @pool.
725
 * A value of -1 means that the maximal number of threads
726
 * is unlimited. If @pool is an exclusive thread pool, setting
727
 * the maximal number of threads to -1 is not allowed.
728
 *
729
 * Setting @max_threads to 0 means stopping all work for @pool.
730
 * It is effectively frozen until @max_threads is set to a non-zero
731
 * value again.
732
 *
733
 * A thread is never terminated while calling @func, as supplied by
734
 * g_thread_pool_new(). Instead the maximal number of threads only
735
 * has effect for the allocation of new threads in g_thread_pool_push().
736
 * A new thread is allocated, whenever the number of currently
737
 * running threads in @pool is smaller than the maximal number.
738
 *
739
 * @error can be %NULL to ignore errors, or non-%NULL to report
740
 * errors. An error can only occur when a new thread couldn't be
741
 * created.
742
 *
743
 * Before version 2.32, this function did not return a success status.
744
 *
745
 * Returns: %TRUE on success, %FALSE if an error occurred
746
 */
747
gboolean
748
g_thread_pool_set_max_threads (GThreadPool  *pool,
749
                               gint          max_threads,
750
                               GError      **error)
751
0
{
752
0
  GRealThreadPool *real;
753
0
  gint to_start;
754
0
  gboolean result;
755
756
0
  real = (GRealThreadPool*) pool;
757
758
0
  g_return_val_if_fail (real, FALSE);
759
0
  g_return_val_if_fail (real->running, FALSE);
760
0
  g_return_val_if_fail (!real->pool.exclusive || max_threads != -1, FALSE);
761
0
  g_return_val_if_fail (max_threads >= -1, FALSE);
762
763
0
  result = TRUE;
764
765
0
  g_async_queue_lock (real->queue);
766
767
0
  real->max_threads = max_threads;
768
769
0
  if (pool->exclusive)
770
0
    to_start = real->max_threads - real->num_threads;
771
0
  else
772
0
    to_start = g_async_queue_length_unlocked (real->queue);
773
774
0
  for ( ; to_start > 0; to_start--)
775
0
    {
776
0
      GError *local_error = NULL;
777
778
0
      if (!g_thread_pool_start_thread (real, &local_error))
779
0
        {
780
0
          g_propagate_error (error, local_error);
781
0
          result = FALSE;
782
0
          break;
783
0
        }
784
0
    }
785
786
0
  g_async_queue_unlock (real->queue);
787
788
0
  return result;
789
0
}
790
791
/**
792
 * g_thread_pool_get_max_threads:
793
 * @pool: a #GThreadPool
794
 *
795
 * Returns the maximal number of threads for @pool.
796
 *
797
 * Returns: the maximal number of threads
798
 */
799
gint
800
g_thread_pool_get_max_threads (GThreadPool *pool)
801
0
{
802
0
  GRealThreadPool *real;
803
0
  gint retval;
804
805
0
  real = (GRealThreadPool*) pool;
806
807
0
  g_return_val_if_fail (real, 0);
808
0
  g_return_val_if_fail (real->running, 0);
809
810
0
  g_async_queue_lock (real->queue);
811
0
  retval = real->max_threads;
812
0
  g_async_queue_unlock (real->queue);
813
814
0
  return retval;
815
0
}
816
817
/**
818
 * g_thread_pool_get_num_threads:
819
 * @pool: a #GThreadPool
820
 *
821
 * Returns the number of threads currently running in @pool.
822
 *
823
 * Returns: the number of threads currently running
824
 */
825
guint
826
g_thread_pool_get_num_threads (GThreadPool *pool)
827
0
{
828
0
  GRealThreadPool *real;
829
0
  guint retval;
830
831
0
  real = (GRealThreadPool*) pool;
832
833
0
  g_return_val_if_fail (real, 0);
834
0
  g_return_val_if_fail (real->running, 0);
835
836
0
  g_async_queue_lock (real->queue);
837
0
  retval = real->num_threads;
838
0
  g_async_queue_unlock (real->queue);
839
840
0
  return retval;
841
0
}
842
843
/**
844
 * g_thread_pool_unprocessed:
845
 * @pool: a #GThreadPool
846
 *
847
 * Returns the number of tasks still unprocessed in @pool.
848
 *
849
 * Returns: the number of unprocessed tasks
850
 */
851
guint
852
g_thread_pool_unprocessed (GThreadPool *pool)
853
0
{
854
0
  GRealThreadPool *real;
855
0
  gint unprocessed;
856
857
0
  real = (GRealThreadPool*) pool;
858
859
0
  g_return_val_if_fail (real, 0);
860
0
  g_return_val_if_fail (real->running, 0);
861
862
0
  unprocessed = g_async_queue_length (real->queue);
863
864
0
  return MAX (unprocessed, 0);
865
0
}
866
867
/**
868
 * g_thread_pool_free:
869
 * @pool: a #GThreadPool
870
 * @immediate: should @pool shut down immediately?
871
 * @wait_: should the function wait for all tasks to be finished?
872
 *
873
 * Frees all resources allocated for @pool.
874
 *
875
 * If @immediate is %TRUE, no new task is processed for @pool.
876
 * Otherwise @pool is not freed before the last task is processed.
877
 * Note however, that no thread of this pool is interrupted while
878
 * processing a task. Instead at least all still running threads
879
 * can finish their tasks before the @pool is freed.
880
 *
881
 * If @wait_ is %TRUE, this function does not return before all
882
 * tasks to be processed (dependent on @immediate, whether all
883
 * or only the currently running) are ready.
884
 * Otherwise this function returns immediately.
885
 *
886
 * After calling this function @pool must not be used anymore.
887
 */
888
void
889
g_thread_pool_free (GThreadPool *pool,
890
                    gboolean     immediate,
891
                    gboolean     wait_)
892
15.7k
{
893
15.7k
  GRealThreadPool *real;
894
895
15.7k
  real = (GRealThreadPool*) pool;
896
897
15.7k
  g_return_if_fail (real);
898
15.7k
  g_return_if_fail (real->running);
899
900
  /* If there's no thread allowed here, there is not much sense in
901
   * not stopping this pool immediately, when it's not empty
902
   */
903
15.7k
  g_return_if_fail (immediate ||
904
15.7k
                    real->max_threads != 0 ||
905
15.7k
                    g_async_queue_length (real->queue) == 0);
906
907
15.7k
  g_async_queue_lock (real->queue);
908
909
15.7k
  real->running = FALSE;
910
15.7k
  real->immediate = immediate;
911
15.7k
  real->waiting = wait_;
912
913
15.7k
  if (wait_)
914
15.7k
    {
915
15.7k
      while (g_async_queue_length_unlocked (real->queue) != (gint) -real->num_threads &&
916
15.7k
             !(immediate && real->num_threads == 0))
917
0
        g_cond_wait (&real->cond, _g_async_queue_get_mutex (real->queue));
918
15.7k
    }
919
920
15.7k
  if (immediate || g_async_queue_length_unlocked (real->queue) == (gint) -real->num_threads)
921
15.7k
    {
922
      /* No thread is currently doing something (and nothing is left
923
       * to process in the queue)
924
       */
925
15.7k
      if (real->num_threads == 0)
926
7.87k
        {
927
          /* No threads left, we clean up */
928
7.87k
          g_async_queue_unlock (real->queue);
929
7.87k
          g_thread_pool_free_internal (real);
930
7.87k
          return;
931
7.87k
        }
932
933
7.87k
      g_thread_pool_wakeup_and_stop_all (real);
934
7.87k
    }
935
936
  /* The last thread should cleanup the pool */
937
7.87k
  real->waiting = FALSE;
938
7.87k
  g_async_queue_unlock (real->queue);
939
7.87k
}
940
941
static void
942
g_thread_pool_free_internal (GRealThreadPool* pool)
943
15.7k
{
944
15.7k
  g_return_if_fail (pool);
945
15.7k
  g_return_if_fail (pool->running == FALSE);
946
15.7k
  g_return_if_fail (pool->num_threads == 0);
947
948
  /* Ensure the dummy item pushed on by g_thread_pool_wakeup_and_stop_all() is
949
   * removed, before it’s potentially passed to the user-provided
950
   * @item_free_func. */
951
15.7k
  g_async_queue_remove (pool->queue, GUINT_TO_POINTER (1));
952
953
15.7k
  g_async_queue_unref (pool->queue);
954
15.7k
  g_cond_clear (&pool->cond);
955
956
15.7k
  g_free (pool);
957
15.7k
}
958
959
static void
960
g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool)
961
7.87k
{
962
7.87k
  guint i;
963
964
7.87k
  g_return_if_fail (pool);
965
7.87k
  g_return_if_fail (pool->running == FALSE);
966
7.87k
  g_return_if_fail (pool->num_threads != 0);
967
968
7.87k
  pool->immediate = TRUE;
969
970
  /*
971
   * So here we're sending bogus data to the pool threads, which
972
   * should cause them each to wake up, and check the above
973
   * pool->immediate condition. However we don't want that
974
   * data to be sorted (since it'll crash the sorter).
975
   */
976
15.7k
  for (i = 0; i < pool->num_threads; i++)
977
7.87k
    g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
978
7.87k
}
979
980
/**
981
 * g_thread_pool_set_max_unused_threads:
982
 * @max_threads: maximal number of unused threads
983
 *
984
 * Sets the maximal number of unused threads to @max_threads.
985
 * If @max_threads is -1, no limit is imposed on the number
986
 * of unused threads.
987
 *
988
 * The default value is 2.
989
 */
990
void
991
g_thread_pool_set_max_unused_threads (gint max_threads)
992
0
{
993
0
  g_return_if_fail (max_threads >= -1);
994
995
0
  g_atomic_int_set (&max_unused_threads, max_threads);
996
997
0
  if (max_threads != -1)
998
0
    {
999
0
      max_threads -= g_atomic_int_get (&unused_threads);
1000
0
      if (max_threads < 0)
1001
0
        {
1002
0
          g_atomic_int_set (&kill_unused_threads, -max_threads);
1003
0
          g_atomic_int_inc (&wakeup_thread_serial);
1004
1005
0
          g_async_queue_lock (unused_thread_queue);
1006
1007
0
          do
1008
0
            {
1009
0
              g_async_queue_push_unlocked (unused_thread_queue,
1010
0
                                           wakeup_thread_marker);
1011
0
            }
1012
0
          while (++max_threads);
1013
1014
0
          g_async_queue_unlock (unused_thread_queue);
1015
0
        }
1016
0
    }
1017
0
}
1018
1019
/**
1020
 * g_thread_pool_get_max_unused_threads:
1021
 *
1022
 * Returns the maximal allowed number of unused threads.
1023
 *
1024
 * Returns: the maximal number of unused threads
1025
 */
1026
gint
1027
g_thread_pool_get_max_unused_threads (void)
1028
0
{
1029
0
  return g_atomic_int_get (&max_unused_threads);
1030
0
}
1031
1032
/**
1033
 * g_thread_pool_get_num_unused_threads:
1034
 *
1035
 * Returns the number of currently unused threads.
1036
 *
1037
 * Returns: the number of currently unused threads
1038
 */
1039
guint
1040
g_thread_pool_get_num_unused_threads (void)
1041
0
{
1042
0
  return (guint) g_atomic_int_get (&unused_threads);
1043
0
}
1044
1045
/**
1046
 * g_thread_pool_stop_unused_threads:
1047
 *
1048
 * Stops all currently unused threads. This does not change the
1049
 * maximal number of unused threads. This function can be used to
1050
 * regularly stop all unused threads e.g. from g_timeout_add().
1051
 */
1052
void
1053
g_thread_pool_stop_unused_threads (void)
1054
0
{
1055
0
  guint oldval;
1056
1057
0
  oldval = g_thread_pool_get_max_unused_threads ();
1058
1059
0
  g_thread_pool_set_max_unused_threads (0);
1060
0
  g_thread_pool_set_max_unused_threads (oldval);
1061
0
}
1062
1063
/**
1064
 * g_thread_pool_set_sort_function:
1065
 * @pool: a #GThreadPool
1066
 * @func: the #GCompareDataFunc used to sort the list of tasks.
1067
 *     This function is passed two tasks. It should return
1068
 *     0 if the order in which they are handled does not matter,
1069
 *     a negative value if the first task should be processed before
1070
 *     the second or a positive value if the second task should be
1071
 *     processed first.
1072
 * @user_data: user data passed to @func
1073
 *
1074
 * Sets the function used to sort the list of tasks. This allows the
1075
 * tasks to be processed by a priority determined by @func, and not
1076
 * just in the order in which they were added to the pool.
1077
 *
1078
 * Note, if the maximum number of threads is more than 1, the order
1079
 * that threads are executed cannot be guaranteed 100%. Threads are
1080
 * scheduled by the operating system and are executed at random. It
1081
 * cannot be assumed that threads are executed in the order they are
1082
 * created.
1083
 *
1084
 * Since: 2.10
1085
 */
1086
void
1087
g_thread_pool_set_sort_function (GThreadPool      *pool,
1088
                                 GCompareDataFunc  func,
1089
                                 gpointer          user_data)
1090
0
{
1091
0
  GRealThreadPool *real;
1092
1093
0
  real = (GRealThreadPool*) pool;
1094
1095
0
  g_return_if_fail (real);
1096
0
  g_return_if_fail (real->running);
1097
1098
0
  g_async_queue_lock (real->queue);
1099
1100
0
  real->sort_func = func;
1101
0
  real->sort_user_data = user_data;
1102
1103
0
  if (func)
1104
0
    g_async_queue_sort_unlocked (real->queue,
1105
0
                                 real->sort_func,
1106
0
                                 real->sort_user_data);
1107
1108
0
  g_async_queue_unlock (real->queue);
1109
0
}
1110
1111
/**
1112
 * g_thread_pool_move_to_front:
1113
 * @pool: a #GThreadPool
1114
 * @data: an unprocessed item in the pool
1115
 *
1116
 * Moves the item to the front of the queue of unprocessed
1117
 * items, so that it will be processed next.
1118
 *
1119
 * Returns: %TRUE if the item was found and moved
1120
 *
1121
 * Since: 2.46
1122
 */
1123
gboolean
1124
g_thread_pool_move_to_front (GThreadPool *pool,
1125
                             gpointer     data)
1126
0
{
1127
0
  GRealThreadPool *real = (GRealThreadPool*) pool;
1128
0
  gboolean found;
1129
1130
0
  g_async_queue_lock (real->queue);
1131
1132
0
  found = g_async_queue_remove_unlocked (real->queue, data);
1133
0
  if (found)
1134
0
    g_async_queue_push_front_unlocked (real->queue, data);
1135
1136
0
  g_async_queue_unlock (real->queue);
1137
1138
0
  return found;
1139
0
}
1140
1141
/**
1142
 * g_thread_pool_set_max_idle_time:
1143
 * @interval: the maximum @interval (in milliseconds)
1144
 *     a thread can be idle
1145
 *
1146
 * This function will set the maximum @interval that a thread
1147
 * waiting in the pool for new tasks can be idle for before
1148
 * being stopped. This function is similar to calling
1149
 * g_thread_pool_stop_unused_threads() on a regular timeout,
1150
 * except this is done on a per thread basis.
1151
 *
1152
 * By setting @interval to 0, idle threads will not be stopped.
1153
 *
1154
 * The default value is 15000 (15 seconds).
1155
 *
1156
 * Since: 2.10
1157
 */
1158
void
1159
g_thread_pool_set_max_idle_time (guint interval)
1160
0
{
1161
0
  guint i;
1162
1163
0
  g_atomic_int_set (&max_idle_time, interval);
1164
1165
0
  i = (guint) g_atomic_int_get (&unused_threads);
1166
0
  if (i > 0)
1167
0
    {
1168
0
      g_atomic_int_inc (&wakeup_thread_serial);
1169
0
      g_async_queue_lock (unused_thread_queue);
1170
1171
0
      do
1172
0
        {
1173
0
          g_async_queue_push_unlocked (unused_thread_queue,
1174
0
                                       wakeup_thread_marker);
1175
0
        }
1176
0
      while (--i);
1177
1178
0
      g_async_queue_unlock (unused_thread_queue);
1179
0
    }
1180
0
}
1181
1182
/**
1183
 * g_thread_pool_get_max_idle_time:
1184
 *
1185
 * This function will return the maximum @interval that a
1186
 * thread will wait in the thread pool for new tasks before
1187
 * being stopped.
1188
 *
1189
 * If this function returns 0, threads waiting in the thread
1190
 * pool for new work are not stopped.
1191
 *
1192
 * Returns: the maximum @interval (milliseconds) to wait
1193
 *     for new tasks in the thread pool before stopping the
1194
 *     thread
1195
 *
1196
 * Since: 2.10
1197
 */
1198
guint
1199
g_thread_pool_get_max_idle_time (void)
1200
0
{
1201
0
  return (guint) g_atomic_int_get (&max_idle_time);
1202
0
}