Coverage Report

Created: 2025-06-22 06:29

/src/glib/glib/gthreadpool.c
Line
Count
Source (jump to first uncovered line)
1
/* GLIB - Library of useful routines for C programming
2
 * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3
 *
4
 * GThreadPool: thread pool implementation.
5
 * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6
 *
7
 * This library is free software; you can redistribute it and/or
8
 * modify it under the terms of the GNU Lesser General Public
9
 * License as published by the Free Software Foundation; either
10
 * version 2.1 of the License, or (at your option) any later version.
11
 *
12
 * This library is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
 * Lesser General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU Lesser General Public
18
 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
19
 */
20
21
/*
22
 * MT safe
23
 */
24
25
#include "config.h"
26
27
#include "gthreadpool.h"
28
29
#include "gasyncqueue.h"
30
#include "gasyncqueueprivate.h"
31
#include "gmain.h"
32
#include "gtestutils.h"
33
#include "gthreadprivate.h"
34
#include "gtimer.h"
35
#include "gutils.h"
36
37
/**
38
 * SECTION:thread_pools
39
 * @title: Thread Pools
40
 * @short_description: pools of threads to execute work concurrently
41
 * @see_also: #GThread
42
 *
43
 * Sometimes you wish to asynchronously fork out the execution of work
44
 * and continue working in your own thread. If that will happen often,
45
 * the overhead of starting and destroying a thread each time might be
46
 * too high. In such cases reusing already started threads seems like a
47
 * good idea. And it indeed is, but implementing this can be tedious
48
 * and error-prone.
49
 *
50
 * Therefore GLib provides thread pools for your convenience. An added
51
 * advantage is, that the threads can be shared between the different
52
 * subsystems of your program, when they are using GLib.
53
 *
54
 * To create a new thread pool, you use g_thread_pool_new().
55
 * It is destroyed by g_thread_pool_free().
56
 *
57
 * If you want to execute a certain task within a thread pool,
58
 * you call g_thread_pool_push().
59
 *
60
 * To get the current number of running threads you call
61
 * g_thread_pool_get_num_threads(). To get the number of still
62
 * unprocessed tasks you call g_thread_pool_unprocessed(). To control
63
 * the maximal number of threads for a thread pool, you use
64
 * g_thread_pool_get_max_threads() and g_thread_pool_set_max_threads().
65
 *
66
 * Finally you can control the number of unused threads, that are kept
67
 * alive by GLib for future use. The current number can be fetched with
68
 * g_thread_pool_get_num_unused_threads(). The maximal number can be
69
 * controlled by g_thread_pool_get_max_unused_threads() and
70
 * g_thread_pool_set_max_unused_threads(). All currently unused threads
71
 * can be stopped by calling g_thread_pool_stop_unused_threads().
72
 */
73
74
#define DEBUG_MSG(x)
75
/* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");    */
76
77
typedef struct _GRealThreadPool GRealThreadPool;
78
79
/**
80
 * GThreadPool:
81
 * @func: the function to execute in the threads of this pool
82
 * @user_data: the user data for the threads of this pool
83
 * @exclusive: are all threads exclusive to this pool
84
 *
85
 * The #GThreadPool struct represents a thread pool. It has three
86
 * public read-only members, but the underlying struct is bigger,
87
 * so you must not copy this struct.
88
 */
89
struct _GRealThreadPool
90
{
91
  GThreadPool pool;
92
  GAsyncQueue *queue;
93
  GCond cond;
94
  gint max_threads;
95
  guint num_threads;
96
  gboolean running;
97
  gboolean immediate;
98
  gboolean waiting;
99
  GCompareDataFunc sort_func;
100
  gpointer sort_user_data;
101
};
102
103
/* The following is just an address to mark the wakeup order for a
104
 * thread, it could be any address (as long, as it isn't a valid
105
 * GThreadPool address)
106
 */
107
static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
108
static gint wakeup_thread_serial = 0;
109
110
/* Here all unused threads are waiting  */
111
static GAsyncQueue *unused_thread_queue = NULL;
112
static gint unused_threads = 0;
113
static gint max_unused_threads = 2;
114
static gint kill_unused_threads = 0;
115
static guint max_idle_time = 15 * 1000;
116
117
static GThreadSchedulerSettings shared_thread_scheduler_settings;
118
static gboolean have_shared_thread_scheduler_settings = FALSE;
119
120
typedef struct
121
{
122
  /* Either thread or error are set in the end. Both transfer-full. */
123
  GThreadPool *pool;
124
  GThread *thread;
125
  GError *error;
126
} SpawnThreadData;
127
128
static GCond spawn_thread_cond;
129
static GAsyncQueue *spawn_thread_queue;
130
131
static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
132
                                                           gpointer          data);
133
static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);
134
static gpointer         g_thread_pool_thread_proxy        (gpointer          data);
135
static gboolean         g_thread_pool_start_thread        (GRealThreadPool  *pool,
136
                                                           GError          **error);
137
static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
138
static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);
139
static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);
140
141
static void
142
g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
143
                                   gpointer         data)
144
0
{
145
0
  if (pool->sort_func)
146
0
    g_async_queue_push_sorted_unlocked (pool->queue,
147
0
                                        data,
148
0
                                        pool->sort_func,
149
0
                                        pool->sort_user_data);
150
0
  else
151
0
    g_async_queue_push_unlocked (pool->queue, data);
152
0
}
153
154
static GRealThreadPool*
155
g_thread_pool_wait_for_new_pool (void)
156
0
{
157
0
  GRealThreadPool *pool;
158
0
  gint local_wakeup_thread_serial;
159
0
  guint local_max_unused_threads;
160
0
  gint local_max_idle_time;
161
0
  gint last_wakeup_thread_serial;
162
0
  gboolean have_relayed_thread_marker = FALSE;
163
164
0
  local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
165
0
  local_max_idle_time = g_atomic_int_get (&max_idle_time);
166
0
  last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
167
168
0
  g_atomic_int_inc (&unused_threads);
169
170
0
  do
171
0
    {
172
0
      if ((guint) g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
173
0
        {
174
          /* If this is a superfluous thread, stop it. */
175
0
          pool = NULL;
176
0
        }
177
0
      else if (local_max_idle_time > 0)
178
0
        {
179
          /* If a maximal idle time is given, wait for the given time. */
180
0
          DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
181
0
                      g_thread_self (), local_max_idle_time / 1000.0));
182
183
0
          pool = g_async_queue_timeout_pop (unused_thread_queue,
184
0
              local_max_idle_time * 1000);
185
0
        }
186
0
      else
187
0
        {
188
          /* If no maximal idle time is given, wait indefinitely. */
189
0
          DEBUG_MSG (("thread %p waiting in global pool.", g_thread_self ()));
190
0
          pool = g_async_queue_pop (unused_thread_queue);
191
0
        }
192
193
0
      if (pool == wakeup_thread_marker)
194
0
        {
195
0
          local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
196
0
          if (last_wakeup_thread_serial == local_wakeup_thread_serial)
197
0
            {
198
0
              if (!have_relayed_thread_marker)
199
0
              {
200
                /* If this wakeup marker has been received for
201
                 * the second time, relay it.
202
                 */
203
0
                DEBUG_MSG (("thread %p relaying wakeup message to "
204
0
                            "waiting thread with lower serial.",
205
0
                            g_thread_self ()));
206
207
0
                g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
208
0
                have_relayed_thread_marker = TRUE;
209
210
                /* If a wakeup marker has been relayed, this thread
211
                 * will get out of the way for 100 microseconds to
212
                 * avoid receiving this marker again.
213
                 */
214
0
                g_usleep (100);
215
0
              }
216
0
            }
217
0
          else
218
0
            {
219
0
              if (g_atomic_int_add (&kill_unused_threads, -1) > 0)
220
0
                {
221
0
                  pool = NULL;
222
0
                  break;
223
0
                }
224
225
0
              DEBUG_MSG (("thread %p updating to new limits.",
226
0
                          g_thread_self ()));
227
228
0
              local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
229
0
              local_max_idle_time = g_atomic_int_get (&max_idle_time);
230
0
              last_wakeup_thread_serial = local_wakeup_thread_serial;
231
232
0
              have_relayed_thread_marker = FALSE;
233
0
            }
234
0
        }
235
0
    }
236
0
  while (pool == wakeup_thread_marker);
237
238
0
  g_atomic_int_add (&unused_threads, -1);
239
240
0
  return pool;
241
0
}
242
243
static gpointer
244
g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
245
0
{
246
0
  gpointer task = NULL;
247
248
0
  if (pool->running || (!pool->immediate &&
249
0
                        g_async_queue_length_unlocked (pool->queue) > 0))
250
0
    {
251
      /* This thread pool is still active. */
252
0
      if (pool->max_threads != -1 && pool->num_threads > (guint) pool->max_threads)
253
0
        {
254
          /* This is a superfluous thread, so it goes to the global pool. */
255
0
          DEBUG_MSG (("superfluous thread %p in pool %p.",
256
0
                      g_thread_self (), pool));
257
0
        }
258
0
      else if (pool->pool.exclusive)
259
0
        {
260
          /* Exclusive threads stay attached to the pool. */
261
0
          task = g_async_queue_pop_unlocked (pool->queue);
262
263
0
          DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
264
0
                      "(%d running, %d unprocessed).",
265
0
                      g_thread_self (), pool, pool->num_threads,
266
0
                      g_async_queue_length_unlocked (pool->queue)));
267
0
        }
268
0
      else
269
0
        {
270
          /* A thread will wait for new tasks for at most 1/2
271
           * second before going to the global pool.
272
           */
273
0
          DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
274
0
                      "(%d running, %d unprocessed).",
275
0
                      g_thread_self (), pool, pool->num_threads,
276
0
                      g_async_queue_length_unlocked (pool->queue)));
277
278
0
          task = g_async_queue_timeout_pop_unlocked (pool->queue,
279
0
                 G_USEC_PER_SEC / 2);
280
0
        }
281
0
    }
282
0
  else
283
0
    {
284
      /* This thread pool is inactive, it will no longer process tasks. */
285
0
      DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
286
0
                  "(running: %s, immediate: %s, len: %d).",
287
0
                  pool, g_thread_self (),
288
0
                  pool->running ? "true" : "false",
289
0
                  pool->immediate ? "true" : "false",
290
0
                  g_async_queue_length_unlocked (pool->queue)));
291
0
    }
292
293
0
  return task;
294
0
}
295
296
static gpointer
297
g_thread_pool_spawn_thread (gpointer data)
298
0
{
299
0
  while (TRUE)
300
0
    {
301
0
      SpawnThreadData *spawn_thread_data;
302
0
      GThread *thread = NULL;
303
0
      GError *error = NULL;
304
0
      const gchar *prgname = g_get_prgname ();
305
0
      gchar name[16] = "pool";
306
307
0
      if (prgname)
308
0
        g_snprintf (name, sizeof (name), "pool-%s", prgname);
309
310
0
      g_async_queue_lock (spawn_thread_queue);
311
      /* Spawn a new thread for the given pool and wake the requesting thread
312
       * up again with the result. This new thread will have the scheduler
313
       * settings inherited from this thread and in extension of the thread
314
       * that created the first non-exclusive thread-pool. */
315
0
      spawn_thread_data = g_async_queue_pop_unlocked (spawn_thread_queue);
316
0
      thread = g_thread_try_new (name, g_thread_pool_thread_proxy, spawn_thread_data->pool, &error);
317
318
0
      spawn_thread_data->thread = g_steal_pointer (&thread);
319
0
      spawn_thread_data->error = g_steal_pointer (&error);
320
321
0
      g_cond_broadcast (&spawn_thread_cond);
322
0
      g_async_queue_unlock (spawn_thread_queue);
323
0
    }
324
325
0
  return NULL;
326
0
}
327
328
static gpointer
329
g_thread_pool_thread_proxy (gpointer data)
330
0
{
331
0
  GRealThreadPool *pool;
332
333
0
  pool = data;
334
335
0
  DEBUG_MSG (("thread %p started for pool %p.", g_thread_self (), pool));
336
337
0
  g_async_queue_lock (pool->queue);
338
339
0
  while (TRUE)
340
0
    {
341
0
      gpointer task;
342
343
0
      task = g_thread_pool_wait_for_new_task (pool);
344
0
      if (task)
345
0
        {
346
0
          if (pool->running || !pool->immediate)
347
0
            {
348
              /* A task was received and the thread pool is active,
349
               * so execute the function.
350
               */
351
0
              g_async_queue_unlock (pool->queue);
352
0
              DEBUG_MSG (("thread %p in pool %p calling func.",
353
0
                          g_thread_self (), pool));
354
0
              pool->pool.func (task, pool->pool.user_data);
355
0
              g_async_queue_lock (pool->queue);
356
0
            }
357
0
        }
358
0
      else
359
0
        {
360
          /* No task was received, so this thread goes to the global pool. */
361
0
          gboolean free_pool = FALSE;
362
363
0
          DEBUG_MSG (("thread %p leaving pool %p for global pool.",
364
0
                      g_thread_self (), pool));
365
0
          pool->num_threads--;
366
367
0
          if (!pool->running)
368
0
            {
369
0
              if (!pool->waiting)
370
0
                {
371
0
                  if (pool->num_threads == 0)
372
0
                    {
373
                      /* If the pool is not running and no other
374
                       * thread is waiting for this thread pool to
375
                       * finish and this is the last thread of this
376
                       * pool, free the pool.
377
                       */
378
0
                      free_pool = TRUE;
379
0
                    }
380
0
                  else
381
0
                    {
382
                      /* If the pool is not running and no other
383
                       * thread is waiting for this thread pool to
384
                       * finish and this is not the last thread of
385
                       * this pool and there are no tasks left in the
386
                       * queue, wakeup the remaining threads.
387
                       */
388
0
                      if (g_async_queue_length_unlocked (pool->queue) ==
389
0
                          (gint) -pool->num_threads)
390
0
                        g_thread_pool_wakeup_and_stop_all (pool);
391
0
                    }
392
0
                }
393
0
              else if (pool->immediate ||
394
0
                       g_async_queue_length_unlocked (pool->queue) <= 0)
395
0
                {
396
                  /* If the pool is not running and another thread is
397
                   * waiting for this thread pool to finish and there
398
                   * are either no tasks left or the pool shall stop
399
                   * immediately, inform the waiting thread of a change
400
                   * of the thread pool state.
401
                   */
402
0
                  g_cond_broadcast (&pool->cond);
403
0
                }
404
0
            }
405
406
0
          g_async_queue_unlock (pool->queue);
407
408
0
          if (free_pool)
409
0
            g_thread_pool_free_internal (pool);
410
411
0
          if ((pool = g_thread_pool_wait_for_new_pool ()) == NULL)
412
0
            break;
413
414
0
          g_async_queue_lock (pool->queue);
415
416
0
          DEBUG_MSG (("thread %p entering pool %p from global pool.",
417
0
                      g_thread_self (), pool));
418
419
          /* pool->num_threads++ is not done here, but in
420
           * g_thread_pool_start_thread to make the new started
421
           * thread known to the pool before itself can do it.
422
           */
423
0
        }
424
0
    }
425
426
0
  return NULL;
427
0
}
428
429
static gboolean
430
g_thread_pool_start_thread (GRealThreadPool  *pool,
431
                            GError          **error)
432
0
{
433
0
  gboolean success = FALSE;
434
435
0
  if (pool->max_threads != -1 && pool->num_threads >= (guint) pool->max_threads)
436
    /* Enough threads are already running */
437
0
    return TRUE;
438
439
0
  g_async_queue_lock (unused_thread_queue);
440
441
0
  if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
442
0
    {
443
0
      g_async_queue_push_unlocked (unused_thread_queue, pool);
444
0
      success = TRUE;
445
0
    }
446
447
0
  g_async_queue_unlock (unused_thread_queue);
448
449
0
  if (!success)
450
0
    {
451
0
      const gchar *prgname = g_get_prgname ();
452
0
      gchar name[16] = "pool";
453
0
      GThread *thread;
454
455
0
      if (prgname)
456
0
        g_snprintf (name, sizeof (name), "pool-%s", prgname);
457
458
      /* No thread was found, we have to start a new one */
459
0
      if (pool->pool.exclusive)
460
0
        {
461
          /* For exclusive thread-pools this is directly called from new() and
462
           * we simply start new threads that inherit the scheduler settings
463
           * from the current thread.
464
           */
465
0
          thread = g_thread_try_new (name, g_thread_pool_thread_proxy, pool, error);
466
0
        }
467
0
      else
468
0
        {
469
          /* For non-exclusive thread-pools this can be called at any time
470
           * when a new thread is needed. We make sure to create a new thread
471
           * here with the correct scheduler settings: either by directly
472
           * providing them if supported by the GThread implementation or by
473
           * going via our helper thread.
474
           */
475
0
          if (have_shared_thread_scheduler_settings)
476
0
            {
477
0
              thread = g_thread_new_internal (name, g_thread_proxy, g_thread_pool_thread_proxy, pool, 0, &shared_thread_scheduler_settings, error);
478
0
            }
479
0
          else
480
0
            {
481
0
              SpawnThreadData spawn_thread_data = { (GThreadPool *) pool, NULL, NULL };
482
483
0
              g_async_queue_lock (spawn_thread_queue);
484
485
0
              g_async_queue_push_unlocked (spawn_thread_queue, &spawn_thread_data);
486
487
0
              while (!spawn_thread_data.thread && !spawn_thread_data.error)
488
0
                g_cond_wait (&spawn_thread_cond, _g_async_queue_get_mutex (spawn_thread_queue));
489
490
0
              thread = spawn_thread_data.thread;
491
0
              if (!thread)
492
0
                g_propagate_error (error, g_steal_pointer (&spawn_thread_data.error));
493
0
              g_async_queue_unlock (spawn_thread_queue);
494
0
            }
495
0
        }
496
497
0
      if (thread == NULL)
498
0
        return FALSE;
499
500
0
      g_thread_unref (thread);
501
0
    }
502
503
  /* See comment in g_thread_pool_thread_proxy as to why this is done
504
   * here and not there
505
   */
506
0
  pool->num_threads++;
507
508
0
  return TRUE;
509
0
}
510
511
/**
512
 * g_thread_pool_new:
513
 * @func: a function to execute in the threads of the new thread pool
514
 * @user_data: user data that is handed over to @func every time it
515
 *     is called
516
 * @max_threads: the maximal number of threads to execute concurrently
517
 *     in  the new thread pool, -1 means no limit
518
 * @exclusive: should this thread pool be exclusive?
519
 * @error: return location for error, or %NULL
520
 *
521
 * This function creates a new thread pool.
522
 *
523
 * Whenever you call g_thread_pool_push(), either a new thread is
524
 * created or an unused one is reused. At most @max_threads threads
525
 * are running concurrently for this thread pool. @max_threads = -1
526
 * allows unlimited threads to be created for this thread pool. The
527
 * newly created or reused thread now executes the function @func
528
 * with the two arguments. The first one is the parameter to
529
 * g_thread_pool_push() and the second one is @user_data.
530
 *
531
 * Pass g_get_num_processors() to @max_threads to create as many threads as
532
 * there are logical processors on the system. This will not pin each thread to
533
 * a specific processor.
534
 *
535
 * The parameter @exclusive determines whether the thread pool owns
536
 * all threads exclusive or shares them with other thread pools.
537
 * If @exclusive is %TRUE, @max_threads threads are started
538
 * immediately and they will run exclusively for this thread pool
539
 * until it is destroyed by g_thread_pool_free(). If @exclusive is
540
 * %FALSE, threads are created when needed and shared between all
541
 * non-exclusive thread pools. This implies that @max_threads may
542
 * not be -1 for exclusive thread pools. Besides, exclusive thread
543
 * pools are not affected by g_thread_pool_set_max_idle_time()
544
 * since their threads are never considered idle and returned to the
545
 * global pool.
546
 *
547
 * @error can be %NULL to ignore errors, or non-%NULL to report
548
 * errors. An error can only occur when @exclusive is set to %TRUE
549
 * and not all @max_threads threads could be created.
550
 * See #GThreadError for possible errors that may occur.
551
 * Note, even in case of error a valid #GThreadPool is returned.
552
 *
553
 * Returns: the new #GThreadPool
554
 */
555
GThreadPool *
556
g_thread_pool_new (GFunc      func,
557
                   gpointer   user_data,
558
                   gint       max_threads,
559
                   gboolean   exclusive,
560
                   GError   **error)
561
0
{
562
0
  GRealThreadPool *retval;
563
0
  G_LOCK_DEFINE_STATIC (init);
564
565
0
  g_return_val_if_fail (func, NULL);
566
0
  g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
567
0
  g_return_val_if_fail (max_threads >= -1, NULL);
568
569
0
  retval = g_new (GRealThreadPool, 1);
570
571
0
  retval->pool.func = func;
572
0
  retval->pool.user_data = user_data;
573
0
  retval->pool.exclusive = exclusive;
574
0
  retval->queue = g_async_queue_new ();
575
0
  g_cond_init (&retval->cond);
576
0
  retval->max_threads = max_threads;
577
0
  retval->num_threads = 0;
578
0
  retval->running = TRUE;
579
0
  retval->immediate = FALSE;
580
0
  retval->waiting = FALSE;
581
0
  retval->sort_func = NULL;
582
0
  retval->sort_user_data = NULL;
583
584
0
  G_LOCK (init);
585
0
  if (!unused_thread_queue)
586
0
      unused_thread_queue = g_async_queue_new ();
587
588
  /* For the very first non-exclusive thread-pool we remember the thread
589
   * scheduler settings of the thread creating the pool, if supported by
590
   * the GThread implementation. This is then used for making sure that
591
   * all threads created on the non-exclusive thread-pool have the same
592
   * scheduler settings, and more importantly don't just inherit them
593
   * from the thread that just happened to push a new task and caused
594
   * a new thread to be created.
595
   *
596
   * Not doing so could cause real-time priority threads or otherwise
597
   * threads with problematic scheduler settings to be part of the
598
   * non-exclusive thread-pools.
599
   *
600
   * If this is not supported by the GThread implementation then we here
601
   * start a thread that will inherit the scheduler settings from this
602
   * very thread and whose only purpose is to spawn new threads with the
603
   * same settings for use by the non-exclusive thread-pools.
604
   *
605
   *
606
   * For non-exclusive thread-pools this is not required as all threads
607
   * are created immediately below and are running forever, so they will
608
   * automatically inherit the scheduler settings from this very thread.
609
   */
610
0
  if (!exclusive && !have_shared_thread_scheduler_settings && !spawn_thread_queue)
611
0
    {
612
0
      if (g_thread_get_scheduler_settings (&shared_thread_scheduler_settings))
613
0
        {
614
0
          have_shared_thread_scheduler_settings = TRUE;
615
0
        }
616
0
      else
617
0
        {
618
0
          spawn_thread_queue = g_async_queue_new ();
619
0
          g_cond_init (&spawn_thread_cond);
620
0
          g_thread_new ("pool-spawner", g_thread_pool_spawn_thread, NULL);
621
0
        }
622
0
    }
623
0
  G_UNLOCK (init);
624
625
0
  if (retval->pool.exclusive)
626
0
    {
627
0
      g_async_queue_lock (retval->queue);
628
629
0
      while (retval->num_threads < (guint) retval->max_threads)
630
0
        {
631
0
          GError *local_error = NULL;
632
633
0
          if (!g_thread_pool_start_thread (retval, &local_error))
634
0
            {
635
0
              g_propagate_error (error, local_error);
636
0
              break;
637
0
            }
638
0
        }
639
640
0
      g_async_queue_unlock (retval->queue);
641
0
    }
642
643
0
  return (GThreadPool*) retval;
644
0
}
645
646
/**
647
 * g_thread_pool_push:
648
 * @pool: a #GThreadPool
649
 * @data: a new task for @pool
650
 * @error: return location for error, or %NULL
651
 *
652
 * Inserts @data into the list of tasks to be executed by @pool.
653
 *
654
 * When the number of currently running threads is lower than the
655
 * maximal allowed number of threads, a new thread is started (or
656
 * reused) with the properties given to g_thread_pool_new().
657
 * Otherwise, @data stays in the queue until a thread in this pool
658
 * finishes its previous task and processes @data.
659
 *
660
 * @error can be %NULL to ignore errors, or non-%NULL to report
661
 * errors. An error can only occur when a new thread couldn't be
662
 * created. In that case @data is simply appended to the queue of
663
 * work to do.
664
 *
665
 * Before version 2.32, this function did not return a success status.
666
 *
667
 * Returns: %TRUE on success, %FALSE if an error occurred
668
 */
669
gboolean
670
g_thread_pool_push (GThreadPool  *pool,
671
                    gpointer      data,
672
                    GError      **error)
673
0
{
674
0
  GRealThreadPool *real;
675
0
  gboolean result;
676
677
0
  real = (GRealThreadPool*) pool;
678
679
0
  g_return_val_if_fail (real, FALSE);
680
0
  g_return_val_if_fail (real->running, FALSE);
681
682
0
  result = TRUE;
683
684
0
  g_async_queue_lock (real->queue);
685
686
0
  if (g_async_queue_length_unlocked (real->queue) >= 0)
687
0
    {
688
      /* No thread is waiting in the queue */
689
0
      GError *local_error = NULL;
690
691
0
      if (!g_thread_pool_start_thread (real, &local_error))
692
0
        {
693
0
          g_propagate_error (error, local_error);
694
0
          result = FALSE;
695
0
        }
696
0
    }
697
698
0
  g_thread_pool_queue_push_unlocked (real, data);
699
0
  g_async_queue_unlock (real->queue);
700
701
0
  return result;
702
0
}
703
704
/**
705
 * g_thread_pool_set_max_threads:
706
 * @pool: a #GThreadPool
707
 * @max_threads: a new maximal number of threads for @pool,
708
 *     or -1 for unlimited
709
 * @error: return location for error, or %NULL
710
 *
711
 * Sets the maximal allowed number of threads for @pool.
712
 * A value of -1 means that the maximal number of threads
713
 * is unlimited. If @pool is an exclusive thread pool, setting
714
 * the maximal number of threads to -1 is not allowed.
715
 *
716
 * Setting @max_threads to 0 means stopping all work for @pool.
717
 * It is effectively frozen until @max_threads is set to a non-zero
718
 * value again.
719
 *
720
 * A thread is never terminated while calling @func, as supplied by
721
 * g_thread_pool_new(). Instead the maximal number of threads only
722
 * has effect for the allocation of new threads in g_thread_pool_push().
723
 * A new thread is allocated, whenever the number of currently
724
 * running threads in @pool is smaller than the maximal number.
725
 *
726
 * @error can be %NULL to ignore errors, or non-%NULL to report
727
 * errors. An error can only occur when a new thread couldn't be
728
 * created.
729
 *
730
 * Before version 2.32, this function did not return a success status.
731
 *
732
 * Returns: %TRUE on success, %FALSE if an error occurred
733
 */
734
gboolean
735
g_thread_pool_set_max_threads (GThreadPool  *pool,
736
                               gint          max_threads,
737
                               GError      **error)
738
0
{
739
0
  GRealThreadPool *real;
740
0
  gint to_start;
741
0
  gboolean result;
742
743
0
  real = (GRealThreadPool*) pool;
744
745
0
  g_return_val_if_fail (real, FALSE);
746
0
  g_return_val_if_fail (real->running, FALSE);
747
0
  g_return_val_if_fail (!real->pool.exclusive || max_threads != -1, FALSE);
748
0
  g_return_val_if_fail (max_threads >= -1, FALSE);
749
750
0
  result = TRUE;
751
752
0
  g_async_queue_lock (real->queue);
753
754
0
  real->max_threads = max_threads;
755
756
0
  if (pool->exclusive)
757
0
    to_start = real->max_threads - real->num_threads;
758
0
  else
759
0
    to_start = g_async_queue_length_unlocked (real->queue);
760
761
0
  for ( ; to_start > 0; to_start--)
762
0
    {
763
0
      GError *local_error = NULL;
764
765
0
      if (!g_thread_pool_start_thread (real, &local_error))
766
0
        {
767
0
          g_propagate_error (error, local_error);
768
0
          result = FALSE;
769
0
          break;
770
0
        }
771
0
    }
772
773
0
  g_async_queue_unlock (real->queue);
774
775
0
  return result;
776
0
}
777
778
/**
779
 * g_thread_pool_get_max_threads:
780
 * @pool: a #GThreadPool
781
 *
782
 * Returns the maximal number of threads for @pool.
783
 *
784
 * Returns: the maximal number of threads
785
 */
786
gint
787
g_thread_pool_get_max_threads (GThreadPool *pool)
788
0
{
789
0
  GRealThreadPool *real;
790
0
  gint retval;
791
792
0
  real = (GRealThreadPool*) pool;
793
794
0
  g_return_val_if_fail (real, 0);
795
0
  g_return_val_if_fail (real->running, 0);
796
797
0
  g_async_queue_lock (real->queue);
798
0
  retval = real->max_threads;
799
0
  g_async_queue_unlock (real->queue);
800
801
0
  return retval;
802
0
}
803
804
/**
805
 * g_thread_pool_get_num_threads:
806
 * @pool: a #GThreadPool
807
 *
808
 * Returns the number of threads currently running in @pool.
809
 *
810
 * Returns: the number of threads currently running
811
 */
812
guint
813
g_thread_pool_get_num_threads (GThreadPool *pool)
814
0
{
815
0
  GRealThreadPool *real;
816
0
  guint retval;
817
818
0
  real = (GRealThreadPool*) pool;
819
820
0
  g_return_val_if_fail (real, 0);
821
0
  g_return_val_if_fail (real->running, 0);
822
823
0
  g_async_queue_lock (real->queue);
824
0
  retval = real->num_threads;
825
0
  g_async_queue_unlock (real->queue);
826
827
0
  return retval;
828
0
}
829
830
/**
831
 * g_thread_pool_unprocessed:
832
 * @pool: a #GThreadPool
833
 *
834
 * Returns the number of tasks still unprocessed in @pool.
835
 *
836
 * Returns: the number of unprocessed tasks
837
 */
838
guint
839
g_thread_pool_unprocessed (GThreadPool *pool)
840
0
{
841
0
  GRealThreadPool *real;
842
0
  gint unprocessed;
843
844
0
  real = (GRealThreadPool*) pool;
845
846
0
  g_return_val_if_fail (real, 0);
847
0
  g_return_val_if_fail (real->running, 0);
848
849
0
  unprocessed = g_async_queue_length (real->queue);
850
851
0
  return MAX (unprocessed, 0);
852
0
}
853
854
/**
855
 * g_thread_pool_free:
856
 * @pool: a #GThreadPool
857
 * @immediate: should @pool shut down immediately?
858
 * @wait_: should the function wait for all tasks to be finished?
859
 *
860
 * Frees all resources allocated for @pool.
861
 *
862
 * If @immediate is %TRUE, no new task is processed for @pool.
863
 * Otherwise @pool is not freed before the last task is processed.
864
 * Note however, that no thread of this pool is interrupted while
865
 * processing a task. Instead at least all still running threads
866
 * can finish their tasks before the @pool is freed.
867
 *
868
 * If @wait_ is %TRUE, this function does not return before all
869
 * tasks to be processed (dependent on @immediate, whether all
870
 * or only the currently running) are ready.
871
 * Otherwise this function returns immediately.
872
 *
873
 * After calling this function @pool must not be used anymore.
874
 */
875
void
876
g_thread_pool_free (GThreadPool *pool,
877
                    gboolean     immediate,
878
                    gboolean     wait_)
879
0
{
880
0
  GRealThreadPool *real;
881
882
0
  real = (GRealThreadPool*) pool;
883
884
0
  g_return_if_fail (real);
885
0
  g_return_if_fail (real->running);
886
887
  /* If there's no thread allowed here, there is not much sense in
888
   * not stopping this pool immediately, when it's not empty
889
   */
890
0
  g_return_if_fail (immediate ||
891
0
                    real->max_threads != 0 ||
892
0
                    g_async_queue_length (real->queue) == 0);
893
894
0
  g_async_queue_lock (real->queue);
895
896
0
  real->running = FALSE;
897
0
  real->immediate = immediate;
898
0
  real->waiting = wait_;
899
900
0
  if (wait_)
901
0
    {
902
0
      while (g_async_queue_length_unlocked (real->queue) != (gint) -real->num_threads &&
903
0
             !(immediate && real->num_threads == 0))
904
0
        g_cond_wait (&real->cond, _g_async_queue_get_mutex (real->queue));
905
0
    }
906
907
0
  if (immediate || g_async_queue_length_unlocked (real->queue) == (gint) -real->num_threads)
908
0
    {
909
      /* No thread is currently doing something (and nothing is left
910
       * to process in the queue)
911
       */
912
0
      if (real->num_threads == 0)
913
0
        {
914
          /* No threads left, we clean up */
915
0
          g_async_queue_unlock (real->queue);
916
0
          g_thread_pool_free_internal (real);
917
0
          return;
918
0
        }
919
920
0
      g_thread_pool_wakeup_and_stop_all (real);
921
0
    }
922
923
  /* The last thread should cleanup the pool */
924
0
  real->waiting = FALSE;
925
0
  g_async_queue_unlock (real->queue);
926
0
}
927
928
static void
929
g_thread_pool_free_internal (GRealThreadPool* pool)
930
0
{
931
0
  g_return_if_fail (pool);
932
0
  g_return_if_fail (pool->running == FALSE);
933
0
  g_return_if_fail (pool->num_threads == 0);
934
935
0
  g_async_queue_unref (pool->queue);
936
0
  g_cond_clear (&pool->cond);
937
938
0
  g_free (pool);
939
0
}
940
941
static void
942
g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool)
943
0
{
944
0
  guint i;
945
946
0
  g_return_if_fail (pool);
947
0
  g_return_if_fail (pool->running == FALSE);
948
0
  g_return_if_fail (pool->num_threads != 0);
949
950
0
  pool->immediate = TRUE;
951
952
  /*
953
   * So here we're sending bogus data to the pool threads, which
954
   * should cause them each to wake up, and check the above
955
   * pool->immediate condition. However we don't want that
956
   * data to be sorted (since it'll crash the sorter).
957
   */
958
0
  for (i = 0; i < pool->num_threads; i++)
959
0
    g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
960
0
}
961
962
/**
963
 * g_thread_pool_set_max_unused_threads:
964
 * @max_threads: maximal number of unused threads
965
 *
966
 * Sets the maximal number of unused threads to @max_threads.
967
 * If @max_threads is -1, no limit is imposed on the number
968
 * of unused threads.
969
 *
970
 * The default value is 2.
971
 */
972
void
973
g_thread_pool_set_max_unused_threads (gint max_threads)
974
0
{
975
0
  g_return_if_fail (max_threads >= -1);
976
977
0
  g_atomic_int_set (&max_unused_threads, max_threads);
978
979
0
  if (max_threads != -1)
980
0
    {
981
0
      max_threads -= g_atomic_int_get (&unused_threads);
982
0
      if (max_threads < 0)
983
0
        {
984
0
          g_atomic_int_set (&kill_unused_threads, -max_threads);
985
0
          g_atomic_int_inc (&wakeup_thread_serial);
986
987
0
          g_async_queue_lock (unused_thread_queue);
988
989
0
          do
990
0
            {
991
0
              g_async_queue_push_unlocked (unused_thread_queue,
992
0
                                           wakeup_thread_marker);
993
0
            }
994
0
          while (++max_threads);
995
996
0
          g_async_queue_unlock (unused_thread_queue);
997
0
        }
998
0
    }
999
0
}
1000
1001
/**
1002
 * g_thread_pool_get_max_unused_threads:
1003
 *
1004
 * Returns the maximal allowed number of unused threads.
1005
 *
1006
 * Returns: the maximal number of unused threads
1007
 */
1008
gint
1009
g_thread_pool_get_max_unused_threads (void)
1010
0
{
1011
0
  return g_atomic_int_get (&max_unused_threads);
1012
0
}
1013
1014
/**
1015
 * g_thread_pool_get_num_unused_threads:
1016
 *
1017
 * Returns the number of currently unused threads.
1018
 *
1019
 * Returns: the number of currently unused threads
1020
 */
1021
guint
1022
g_thread_pool_get_num_unused_threads (void)
1023
0
{
1024
0
  return (guint) g_atomic_int_get (&unused_threads);
1025
0
}
1026
1027
/**
1028
 * g_thread_pool_stop_unused_threads:
1029
 *
1030
 * Stops all currently unused threads. This does not change the
1031
 * maximal number of unused threads. This function can be used to
1032
 * regularly stop all unused threads e.g. from g_timeout_add().
1033
 */
1034
void
1035
g_thread_pool_stop_unused_threads (void)
1036
0
{
1037
0
  guint oldval;
1038
1039
0
  oldval = g_thread_pool_get_max_unused_threads ();
1040
1041
0
  g_thread_pool_set_max_unused_threads (0);
1042
0
  g_thread_pool_set_max_unused_threads (oldval);
1043
0
}
1044
1045
/**
1046
 * g_thread_pool_set_sort_function:
1047
 * @pool: a #GThreadPool
1048
 * @func: the #GCompareDataFunc used to sort the list of tasks.
1049
 *     This function is passed two tasks. It should return
1050
 *     0 if the order in which they are handled does not matter,
1051
 *     a negative value if the first task should be processed before
1052
 *     the second or a positive value if the second task should be
1053
 *     processed first.
1054
 * @user_data: user data passed to @func
1055
 *
1056
 * Sets the function used to sort the list of tasks. This allows the
1057
 * tasks to be processed by a priority determined by @func, and not
1058
 * just in the order in which they were added to the pool.
1059
 *
1060
 * Note, if the maximum number of threads is more than 1, the order
1061
 * that threads are executed cannot be guaranteed 100%. Threads are
1062
 * scheduled by the operating system and are executed at random. It
1063
 * cannot be assumed that threads are executed in the order they are
1064
 * created.
1065
 *
1066
 * Since: 2.10
1067
 */
1068
void
1069
g_thread_pool_set_sort_function (GThreadPool      *pool,
1070
                                 GCompareDataFunc  func,
1071
                                 gpointer          user_data)
1072
0
{
1073
0
  GRealThreadPool *real;
1074
1075
0
  real = (GRealThreadPool*) pool;
1076
1077
0
  g_return_if_fail (real);
1078
0
  g_return_if_fail (real->running);
1079
1080
0
  g_async_queue_lock (real->queue);
1081
1082
0
  real->sort_func = func;
1083
0
  real->sort_user_data = user_data;
1084
1085
0
  if (func)
1086
0
    g_async_queue_sort_unlocked (real->queue,
1087
0
                                 real->sort_func,
1088
0
                                 real->sort_user_data);
1089
1090
0
  g_async_queue_unlock (real->queue);
1091
0
}
1092
1093
/**
1094
 * g_thread_pool_move_to_front:
1095
 * @pool: a #GThreadPool
1096
 * @data: an unprocessed item in the pool
1097
 *
1098
 * Moves the item to the front of the queue of unprocessed
1099
 * items, so that it will be processed next.
1100
 *
1101
 * Returns: %TRUE if the item was found and moved
1102
 *
1103
 * Since: 2.46
1104
 */
1105
gboolean
1106
g_thread_pool_move_to_front (GThreadPool *pool,
1107
                             gpointer     data)
1108
0
{
1109
0
  GRealThreadPool *real = (GRealThreadPool*) pool;
1110
0
  gboolean found;
1111
1112
0
  g_async_queue_lock (real->queue);
1113
1114
0
  found = g_async_queue_remove_unlocked (real->queue, data);
1115
0
  if (found)
1116
0
    g_async_queue_push_front_unlocked (real->queue, data);
1117
1118
0
  g_async_queue_unlock (real->queue);
1119
1120
0
  return found;
1121
0
}
1122
1123
/**
1124
 * g_thread_pool_set_max_idle_time:
1125
 * @interval: the maximum @interval (in milliseconds)
1126
 *     a thread can be idle
1127
 *
1128
 * This function will set the maximum @interval that a thread
1129
 * waiting in the pool for new tasks can be idle for before
1130
 * being stopped. This function is similar to calling
1131
 * g_thread_pool_stop_unused_threads() on a regular timeout,
1132
 * except this is done on a per thread basis.
1133
 *
1134
 * By setting @interval to 0, idle threads will not be stopped.
1135
 *
1136
 * The default value is 15000 (15 seconds).
1137
 *
1138
 * Since: 2.10
1139
 */
1140
void
1141
g_thread_pool_set_max_idle_time (guint interval)
1142
0
{
1143
0
  guint i;
1144
1145
0
  g_atomic_int_set (&max_idle_time, interval);
1146
1147
0
  i = (guint) g_atomic_int_get (&unused_threads);
1148
0
  if (i > 0)
1149
0
    {
1150
0
      g_atomic_int_inc (&wakeup_thread_serial);
1151
0
      g_async_queue_lock (unused_thread_queue);
1152
1153
0
      do
1154
0
        {
1155
0
          g_async_queue_push_unlocked (unused_thread_queue,
1156
0
                                       wakeup_thread_marker);
1157
0
        }
1158
0
      while (--i);
1159
1160
0
      g_async_queue_unlock (unused_thread_queue);
1161
0
    }
1162
0
}
1163
1164
/**
1165
 * g_thread_pool_get_max_idle_time:
1166
 *
1167
 * This function will return the maximum @interval that a
1168
 * thread will wait in the thread pool for new tasks before
1169
 * being stopped.
1170
 *
1171
 * If this function returns 0, threads waiting in the thread
1172
 * pool for new work are not stopped.
1173
 *
1174
 * Returns: the maximum @interval (milliseconds) to wait
1175
 *     for new tasks in the thread pool before stopping the
1176
 *     thread
1177
 *
1178
 * Since: 2.10
1179
 */
1180
guint
1181
g_thread_pool_get_max_idle_time (void)
1182
0
{
1183
0
  return (guint) g_atomic_int_get (&max_idle_time);
1184
0
}