Coverage Report

Created: 2024-05-04 12:45

/proc/self/cwd/external/pthreadpool/src/pthreads.c
Line
Count
Source (jump to first uncovered line)
1
/* Standard C headers */
2
#include <assert.h>
3
#include <limits.h>
4
#include <stdbool.h>
5
#include <stdint.h>
6
#include <stdlib.h>
7
#include <string.h>
8
9
/* Configuration header */
10
#include "threadpool-common.h"
11
12
/* POSIX headers */
13
#include <pthread.h>
14
#include <unistd.h>
15
16
/* Futex-specific headers */
17
#if PTHREADPOOL_USE_FUTEX
18
  #if defined(__linux__)
19
    #include <sys/syscall.h>
20
    #include <linux/futex.h>
21
22
    /* Old Android NDKs do not define SYS_futex and FUTEX_PRIVATE_FLAG */
23
    #ifndef SYS_futex
24
      #define SYS_futex __NR_futex
25
    #endif
26
    #ifndef FUTEX_PRIVATE_FLAG
27
      #define FUTEX_PRIVATE_FLAG 128
28
    #endif
29
  #elif defined(__EMSCRIPTEN__)
30
    /* math.h for INFINITY constant */
31
    #include <math.h>
32
33
    #include <emscripten/threading.h>
34
  #else
35
    #error "Platform-specific implementation of futex_wait and futex_wake_all required"
36
  #endif
37
#endif
38
39
/* Windows-specific headers */
40
#ifdef _WIN32
41
  #include <sysinfoapi.h>
42
#endif
43
44
/* Dependencies */
45
#if PTHREADPOOL_USE_CPUINFO
46
  #include <cpuinfo.h>
47
#endif
48
49
/* Public library header */
50
#include <pthreadpool.h>
51
52
/* Internal library headers */
53
#include "threadpool-atomics.h"
54
#include "threadpool-object.h"
55
#include "threadpool-utils.h"
56
57
58
#if PTHREADPOOL_USE_FUTEX
59
  #if defined(__linux__)
60
0
    static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
61
0
      return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, NULL);
62
0
    }
63
64
0
    static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
65
0
      return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX);
66
0
    }
67
  #elif defined(__EMSCRIPTEN__)
68
    static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
69
      return emscripten_futex_wait((volatile void*) address, value, INFINITY);
70
    }
71
72
    static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
73
      return emscripten_futex_wake((volatile void*) address, INT_MAX);
74
    }
75
  #else
76
    #error "Platform-specific implementation of futex_wait and futex_wake_all required"
77
  #endif
78
#endif
79
80
0
static void checkin_worker_thread(struct pthreadpool* threadpool) {
81
0
  #if PTHREADPOOL_USE_FUTEX
82
0
    if (pthreadpool_decrement_fetch_acquire_release_size_t(&threadpool->active_threads) == 0) {
83
0
      pthreadpool_store_release_uint32_t(&threadpool->has_active_threads, 0);
84
0
      futex_wake_all(&threadpool->has_active_threads);
85
0
    }
86
  #else
87
    pthread_mutex_lock(&threadpool->completion_mutex);
88
    if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) {
89
      pthread_cond_signal(&threadpool->completion_condvar);
90
    }
91
    pthread_mutex_unlock(&threadpool->completion_mutex);
92
  #endif
93
0
}
94
95
0
static void wait_worker_threads(struct pthreadpool* threadpool) {
96
  /* Initial check */
97
0
  #if PTHREADPOOL_USE_FUTEX
98
0
    uint32_t has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads);
99
0
    if (has_active_threads == 0) {
100
0
      return;
101
0
    }
102
  #else
103
    size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
104
    if (active_threads == 0) {
105
      return;
106
    }
107
  #endif
108
109
  /* Spin-wait */
110
0
  for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
111
0
    pthreadpool_yield();
112
113
0
    #if PTHREADPOOL_USE_FUTEX
114
0
      has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads);
115
0
      if (has_active_threads == 0) {
116
0
        return;
117
0
      }
118
    #else
119
      active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
120
      if (active_threads == 0) {
121
        return;
122
      }
123
    #endif
124
0
  }
125
126
  /* Fall-back to mutex/futex wait */
127
0
  #if PTHREADPOOL_USE_FUTEX
128
0
    while ((has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads)) != 0) {
129
0
      futex_wait(&threadpool->has_active_threads, 1);
130
0
    }
131
  #else
132
    pthread_mutex_lock(&threadpool->completion_mutex);
133
    while (pthreadpool_load_acquire_size_t(&threadpool->active_threads) != 0) {
134
      pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex);
135
    };
136
    pthread_mutex_unlock(&threadpool->completion_mutex);
137
  #endif
138
0
}
139
140
static uint32_t wait_for_new_command(
141
  struct pthreadpool* threadpool,
142
  uint32_t last_command,
143
  uint32_t last_flags)
144
0
{
145
0
  uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
146
0
  if (command != last_command) {
147
0
    return command;
148
0
  }
149
150
0
  if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
151
    /* Spin-wait loop */
152
0
    for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
153
0
      pthreadpool_yield();
154
155
0
      command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
156
0
      if (command != last_command) {
157
0
        return command;
158
0
      }
159
0
    }
160
0
  }
161
162
  /* Spin-wait disabled or timed out, fall back to mutex/futex wait */
163
0
  #if PTHREADPOOL_USE_FUTEX
164
0
    do {
165
0
      futex_wait(&threadpool->command, last_command);
166
0
      command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
167
0
    } while (command == last_command);
168
  #else
169
    /* Lock the command mutex */
170
    pthread_mutex_lock(&threadpool->command_mutex);
171
    /* Read the command */
172
    while ((command = pthreadpool_load_acquire_uint32_t(&threadpool->command)) == last_command) {
173
      /* Wait for new command */
174
      pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex);
175
    }
176
    /* Read a new command */
177
    pthread_mutex_unlock(&threadpool->command_mutex);
178
  #endif
179
0
  return command;
180
0
}
181
182
0
static void* thread_main(void* arg) {
183
0
  struct thread_info* thread = (struct thread_info*) arg;
184
0
  struct pthreadpool* threadpool = thread->threadpool;
185
0
  uint32_t last_command = threadpool_command_init;
186
0
  struct fpu_state saved_fpu_state = { 0 };
187
0
  uint32_t flags = 0;
188
189
  /* Check in */
190
0
  checkin_worker_thread(threadpool);
191
192
  /* Monitor new commands and act accordingly */
193
0
  for (;;) {
194
0
    uint32_t command = wait_for_new_command(threadpool, last_command, flags);
195
0
    pthreadpool_fence_acquire();
196
197
0
    flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);
198
199
    /* Process command */
200
0
    switch (command & THREADPOOL_COMMAND_MASK) {
201
0
      case threadpool_command_parallelize:
202
0
      {
203
0
        const thread_function_t thread_function =
204
0
          (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
205
0
        if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
206
0
          saved_fpu_state = get_fpu_state();
207
0
          disable_fpu_denormals();
208
0
        }
209
210
0
        thread_function(threadpool, thread);
211
0
        if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
212
0
          set_fpu_state(saved_fpu_state);
213
0
        }
214
0
        break;
215
0
      }
216
0
      case threadpool_command_shutdown:
217
        /* Exit immediately: the master thread is waiting on pthread_join */
218
0
        return NULL;
219
0
      case threadpool_command_init:
220
        /* To inhibit compiler warning */
221
0
        break;
222
0
    }
223
    /* Notify the master thread that we finished processing */
224
0
    checkin_worker_thread(threadpool);
225
    /* Update last command */
226
0
    last_command = command;
227
0
  };
228
0
}
229
230
0
struct pthreadpool* pthreadpool_create(size_t threads_count) {
231
  #if PTHREADPOOL_USE_CPUINFO
232
    if (!cpuinfo_initialize()) {
233
      return NULL;
234
    }
235
  #endif
236
237
0
  if (threads_count == 0) {
238
    #if PTHREADPOOL_USE_CPUINFO
239
      threads_count = cpuinfo_get_processors_count();
240
    #elif defined(_SC_NPROCESSORS_ONLN)
241
0
      threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN);
242
      #if defined(__EMSCRIPTEN_PTHREADS__)
243
        /* Limit the number of threads to 8 to match link-time PTHREAD_POOL_SIZE option */
244
        if (threads_count >= 8) {
245
          threads_count = 8;
246
        }
247
      #endif
248
    #elif defined(_WIN32)
249
      SYSTEM_INFO system_info;
250
      ZeroMemory(&system_info, sizeof(system_info));
251
      GetSystemInfo(&system_info);
252
      threads_count = (size_t) system_info.dwNumberOfProcessors;
253
    #else
254
      #error "Platform-specific implementation of sysconf(_SC_NPROCESSORS_ONLN) required"
255
    #endif
256
0
  }
257
258
0
  struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
259
0
  if (threadpool == NULL) {
260
0
    return NULL;
261
0
  }
262
0
  threadpool->threads_count = fxdiv_init_size_t(threads_count);
263
0
  for (size_t tid = 0; tid < threads_count; tid++) {
264
0
    threadpool->threads[tid].thread_number = tid;
265
0
    threadpool->threads[tid].threadpool = threadpool;
266
0
  }
267
268
  /* Thread pool with a single thread computes everything on the caller thread. */
269
0
  if (threads_count > 1) {
270
0
    pthread_mutex_init(&threadpool->execution_mutex, NULL);
271
    #if !PTHREADPOOL_USE_FUTEX
272
      pthread_mutex_init(&threadpool->completion_mutex, NULL);
273
      pthread_cond_init(&threadpool->completion_condvar, NULL);
274
      pthread_mutex_init(&threadpool->command_mutex, NULL);
275
      pthread_cond_init(&threadpool->command_condvar, NULL);
276
    #endif
277
278
0
    #if PTHREADPOOL_USE_FUTEX
279
0
      pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
280
0
    #endif
281
0
    pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
282
283
    /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
284
0
    for (size_t tid = 1; tid < threads_count; tid++) {
285
0
      pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]);
286
0
    }
287
288
    /* Wait until all threads initialize */
289
0
    wait_worker_threads(threadpool);
290
0
  }
291
0
  return threadpool;
292
0
}
293
294
PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
295
  struct pthreadpool* threadpool,
296
  thread_function_t thread_function,
297
  const void* params,
298
  size_t params_size,
299
  void* task,
300
  void* context,
301
  size_t linear_range,
302
  uint32_t flags)
303
0
{
304
0
  assert(threadpool != NULL);
305
0
  assert(thread_function != NULL);
306
0
  assert(task != NULL);
307
0
  assert(linear_range > 1);
308
309
  /* Protect the global threadpool structures */
310
0
  pthread_mutex_lock(&threadpool->execution_mutex);
311
312
  #if !PTHREADPOOL_USE_FUTEX
313
    /* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */
314
    pthread_mutex_lock(&threadpool->command_mutex);
315
  #endif
316
317
  /* Setup global arguments */
318
0
  pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
319
0
  pthreadpool_store_relaxed_void_p(&threadpool->task, task);
320
0
  pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
321
0
  pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
322
323
  /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
324
0
  const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
325
0
  pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */);
326
0
  #if PTHREADPOOL_USE_FUTEX
327
0
    pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
328
0
  #endif
329
330
0
  if (params_size != 0) {
331
0
    memcpy(&threadpool->params, params, params_size);
332
0
    pthreadpool_fence_release();
333
0
  }
334
335
  /* Spread the work between threads */
336
0
  const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count);
337
0
  size_t range_start = 0;
338
0
  for (size_t tid = 0; tid < threads_count.value; tid++) {
339
0
    struct thread_info* thread = &threadpool->threads[tid];
340
0
    const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder);
341
0
    const size_t range_end = range_start + range_length;
342
0
    pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
343
0
    pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
344
0
    pthreadpool_store_relaxed_size_t(&thread->range_length, range_length);
345
346
    /* The next subrange starts where the previous ended */
347
0
    range_start = range_end;
348
0
  }
349
350
  /*
351
   * Update the threadpool command.
352
 * Importantly, do it after initializing command parameters (range, task, argument, flags)
353
   * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
354
 * to ensure the unmasked command is different from the last command, because worker threads
355
   * monitor for change in the unmasked command.
356
   */
357
0
  const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
358
0
  const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize;
359
360
  /*
361
   * Store the command with release semantics to guarantee that if a worker thread observes
362
   * the new command value, it also observes the updated command parameters.
363
   *
364
   * Note: release semantics is necessary even with a conditional variable, because the workers might
365
   * be waiting in a spin-loop rather than the conditional variable.
366
   */
367
0
  pthreadpool_store_release_uint32_t(&threadpool->command, new_command);
368
0
  #if PTHREADPOOL_USE_FUTEX
369
    /* Wake up the threads */
370
0
    futex_wake_all(&threadpool->command);
371
  #else
372
    /* Unlock the command variables before waking up the threads for better performance */
373
    pthread_mutex_unlock(&threadpool->command_mutex);
374
375
    /* Wake up the threads */
376
    pthread_cond_broadcast(&threadpool->command_condvar);
377
  #endif
378
379
  /* Save and modify FPU denormals control, if needed */
380
0
  struct fpu_state saved_fpu_state = { 0 };
381
0
  if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
382
0
    saved_fpu_state = get_fpu_state();
383
0
    disable_fpu_denormals();
384
0
  }
385
386
  /* Do computations as worker #0 */
387
0
  thread_function(threadpool, &threadpool->threads[0]);
388
389
  /* Restore FPU denormals control, if needed */
390
0
  if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
391
0
    set_fpu_state(saved_fpu_state);
392
0
  }
393
394
  /* Wait until the threads finish computation */
395
0
  wait_worker_threads(threadpool);
396
397
  /* Make changes by other threads visible to this thread */
398
0
  pthreadpool_fence_acquire();
399
400
  /* Unprotect the global threadpool structures */
401
0
  pthread_mutex_unlock(&threadpool->execution_mutex);
402
0
}
403
404
0
void pthreadpool_destroy(struct pthreadpool* threadpool) {
405
0
  if (threadpool != NULL) {
406
0
    const size_t threads_count = threadpool->threads_count.value;
407
0
    if (threads_count > 1) {
408
0
      #if PTHREADPOOL_USE_FUTEX
409
0
        pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
410
0
        pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
411
412
        /*
413
         * Store the command with release semantics to guarantee that if a worker thread observes
414
         * the new command value, it also observes the updated active_threads/has_active_threads values.
415
         */
416
0
        pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
417
418
        /* Wake up worker threads */
419
0
        futex_wake_all(&threadpool->command);
420
      #else
421
        /* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */
422
        pthread_mutex_lock(&threadpool->command_mutex);
423
424
        pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
425
426
        /*
427
         * Store the command with release semantics to guarantee that if a worker thread observes
428
         * the new command value, it also observes the updated active_threads value.
429
         *
430
         * Note: the release fence inside pthread_mutex_unlock is insufficient,
431
         * because the workers might be waiting in a spin-loop rather than the conditional variable.
432
         */
433
        pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
434
435
        /* Wake up worker threads */
436
        pthread_cond_broadcast(&threadpool->command_condvar);
437
438
        /* Commit the state changes and let workers start processing */
439
        pthread_mutex_unlock(&threadpool->command_mutex);
440
      #endif
441
442
      /* Wait until all threads return */
443
0
      for (size_t thread = 1; thread < threads_count; thread++) {
444
0
        pthread_join(threadpool->threads[thread].thread_object, NULL);
445
0
      }
446
447
      /* Release resources */
448
0
      pthread_mutex_destroy(&threadpool->execution_mutex);
449
      #if !PTHREADPOOL_USE_FUTEX
450
        pthread_mutex_destroy(&threadpool->completion_mutex);
451
        pthread_cond_destroy(&threadpool->completion_condvar);
452
        pthread_mutex_destroy(&threadpool->command_mutex);
453
        pthread_cond_destroy(&threadpool->command_condvar);
454
      #endif
455
0
    }
456
    #if PTHREADPOOL_USE_CPUINFO
457
      cpuinfo_deinitialize();
458
    #endif
459
0
    pthreadpool_deallocate(threadpool);
460
0
  }
461
0
}