/proc/self/cwd/external/pthreadpool/src/pthreads.c
Line | Count | Source |
1 | | /* Standard C headers */ |
2 | | #include <assert.h> |
3 | | #include <limits.h> |
4 | | #include <stdbool.h> |
5 | | #include <stdint.h> |
6 | | #include <stdlib.h> |
7 | | #include <string.h> |
8 | | |
9 | | /* Configuration header */ |
10 | | #include "threadpool-common.h" |
11 | | |
12 | | /* POSIX headers */ |
13 | | #include <pthread.h> |
14 | | #include <unistd.h> |
15 | | |
16 | | /* Futex-specific headers */ |
17 | | #if PTHREADPOOL_USE_FUTEX |
18 | | #if defined(__linux__) |
19 | | #include <sys/syscall.h> |
20 | | #include <linux/futex.h> |
21 | | |
22 | | /* Old Android NDKs do not define SYS_futex and FUTEX_PRIVATE_FLAG */ |
23 | | #ifndef SYS_futex |
24 | | #define SYS_futex __NR_futex |
25 | | #endif |
26 | | #ifndef FUTEX_PRIVATE_FLAG |
27 | | #define FUTEX_PRIVATE_FLAG 128 |
28 | | #endif |
29 | | #elif defined(__EMSCRIPTEN__) |
30 | | /* math.h for INFINITY constant */ |
31 | | #include <math.h> |
32 | | |
33 | | #include <emscripten/threading.h> |
34 | | #else |
35 | | #error "Platform-specific implementation of futex_wait and futex_wake_all required" |
36 | | #endif |
37 | | #endif |
38 | | |
39 | | /* Windows-specific headers */ |
40 | | #ifdef _WIN32 |
41 | | #include <sysinfoapi.h> |
42 | | #endif |
43 | | |
44 | | /* Dependencies */ |
45 | | #if PTHREADPOOL_USE_CPUINFO |
46 | | #include <cpuinfo.h> |
47 | | #endif |
48 | | |
49 | | /* Public library header */ |
50 | | #include <pthreadpool.h> |
51 | | |
52 | | /* Internal library headers */ |
53 | | #include "threadpool-atomics.h" |
54 | | #include "threadpool-object.h" |
55 | | #include "threadpool-utils.h" |
56 | | |
57 | | |
58 | | #if PTHREADPOOL_USE_FUTEX |
59 | | #if defined(__linux__) |
60 | 0 | static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) { |
61 | 0 | return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, NULL); |
62 | 0 | } |
63 | | |
64 | 0 | static int futex_wake_all(pthreadpool_atomic_uint32_t* address) { |
65 | 0 | return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX); |
66 | 0 | } |
67 | | #elif defined(__EMSCRIPTEN__) |
68 | | static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) { |
69 | | return emscripten_futex_wait((volatile void*) address, value, INFINITY); |
70 | | } |
71 | | |
72 | | static int futex_wake_all(pthreadpool_atomic_uint32_t* address) { |
73 | | return emscripten_futex_wake((volatile void*) address, INT_MAX); |
74 | | } |
75 | | #else |
76 | | #error "Platform-specific implementation of futex_wait and futex_wake_all required" |
77 | | #endif |
78 | | #endif |
79 | | |
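The futex pair above is a compare-and-sleep primitive: futex_wait puts the caller to sleep only if the word at address still holds value, which closes the race where another thread updates the word and calls futex_wake_all between the caller's load and its sleep. A minimal sketch of the intended calling pattern, using the helpers defined above (the function name wait_until_clear is illustrative, not part of the library):

    /* Sketch: block until *flag becomes 0, tolerating spurious wakeups. */
    static void wait_until_clear(pthreadpool_atomic_uint32_t* flag) {
        uint32_t value;
        while ((value = pthreadpool_load_acquire_uint32_t(flag)) != 0) {
            /* Sleeps only if *flag still equals value; otherwise returns
             * immediately and the loop re-checks. */
            futex_wait(flag, value);
        }
    }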
80 | 0 | static void checkin_worker_thread(struct pthreadpool* threadpool) { |
81 | 0 | #if PTHREADPOOL_USE_FUTEX |
82 | 0 | if (pthreadpool_decrement_fetch_acquire_release_size_t(&threadpool->active_threads) == 0) { |
83 | 0 | pthreadpool_store_release_uint32_t(&threadpool->has_active_threads, 0); |
84 | 0 | futex_wake_all(&threadpool->has_active_threads); |
85 | 0 | } |
86 | | #else |
87 | | pthread_mutex_lock(&threadpool->completion_mutex); |
88 | | if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) { |
89 | | pthread_cond_signal(&threadpool->completion_condvar); |
90 | | } |
91 | | pthread_mutex_unlock(&threadpool->completion_mutex); |
92 | | #endif |
93 | 0 | } |
94 | | |
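checkin_worker_thread is one half of a countdown-latch protocol: every worker decrements active_threads, and the thread that brings the count to zero wakes the waiter. The same idea in self-contained form, using plain C11 atomics and pthreads instead of the library's internal wrappers (all names here are hypothetical):

    #include <pthread.h>
    #include <stdatomic.h>

    /* Hypothetical standalone latch mirroring the check-in protocol;
     * initialization of the fields is omitted for brevity. */
    struct latch {
        atomic_size_t count;     /* workers still running */
        pthread_mutex_t mutex;
        pthread_cond_t condvar;
    };

    static void latch_count_down(struct latch* l) {
        /* Taking the mutex around the decrement prevents a missed wakeup:
         * the waiter cannot check the count and go to sleep between our
         * decrement and our signal. */
        pthread_mutex_lock(&l->mutex);
        if (atomic_fetch_sub_explicit(&l->count, 1, memory_order_release) == 1) {
            pthread_cond_signal(&l->condvar);  /* last worker wakes the waiter */
        }
        pthread_mutex_unlock(&l->mutex);
    }

    static void latch_wait(struct latch* l) {
        pthread_mutex_lock(&l->mutex);
        while (atomic_load_explicit(&l->count, memory_order_acquire) != 0) {
            pthread_cond_wait(&l->condvar, &l->mutex);
        }
        pthread_mutex_unlock(&l->mutex);
    }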
95 | 0 | static void wait_worker_threads(struct pthreadpool* threadpool) { |
96 | | /* Initial check */ |
97 | 0 | #if PTHREADPOOL_USE_FUTEX |
98 | 0 | uint32_t has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads); |
99 | 0 | if (has_active_threads == 0) { |
100 | 0 | return; |
101 | 0 | } |
102 | | #else |
103 | | size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads); |
104 | | if (active_threads == 0) { |
105 | | return; |
106 | | } |
107 | | #endif |
108 | | |
109 | | /* Spin-wait */ |
110 | 0 | for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { |
111 | 0 | pthreadpool_yield(); |
112 | |
113 | 0 | #if PTHREADPOOL_USE_FUTEX |
114 | 0 | has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads); |
115 | 0 | if (has_active_threads == 0) { |
116 | 0 | return; |
117 | 0 | } |
118 | | #else |
119 | | active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads); |
120 | | if (active_threads == 0) { |
121 | | return; |
122 | | } |
123 | | #endif |
124 | 0 | } |
125 | | |
126 | | /* Fall back to mutex/futex wait */ |
127 | 0 | #if PTHREADPOOL_USE_FUTEX |
128 | 0 | while ((has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads)) != 0) { |
129 | 0 | futex_wait(&threadpool->has_active_threads, 1); |
130 | 0 | } |
131 | | #else |
132 | | pthread_mutex_lock(&threadpool->completion_mutex); |
133 | | while (pthreadpool_load_acquire_size_t(&threadpool->active_threads) != 0) { |
134 | | pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex); |
135 | | }
136 | | pthread_mutex_unlock(&threadpool->completion_mutex); |
137 | | #endif |
138 | 0 | } |
139 | | |
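Two details in wait_worker_threads are worth calling out. First, the futex build tracks completion with both a size_t counter (active_threads) and a separate uint32_t flag (has_active_threads) because a Linux futex word must be 32 bits wide, so the counter itself cannot be slept on. Second, the waiter spins for PTHREADPOOL_SPIN_WAIT_ITERATIONS before blocking, trading a little CPU for lower wakeup latency on short tasks. A hypothetical standalone demo of that two-phase wait, built on a condition variable rather than this file's internals:

    #include <pthread.h>
    #include <sched.h>
    #include <stdatomic.h>
    #include <stdint.h>

    /* Hypothetical two-phase wait: spin briefly, then sleep. The signaling
     * side must set *done and call pthread_cond_signal while holding *mutex,
     * exactly as the non-futex branch of checkin_worker_thread does. */
    static void wait_done(atomic_uint* done, pthread_mutex_t* mutex, pthread_cond_t* condvar) {
        /* Phase 1: spin, hoping the work finishes within microseconds. */
        for (uint32_t i = 100000; i != 0; i--) {
            if (atomic_load_explicit(done, memory_order_acquire) != 0) return;
            sched_yield();
        }
        /* Phase 2: give up the CPU and sleep until signaled. */
        pthread_mutex_lock(mutex);
        while (atomic_load_explicit(done, memory_order_acquire) == 0) {
            pthread_cond_wait(condvar, mutex);
        }
        pthread_mutex_unlock(mutex);
    }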
140 | | static uint32_t wait_for_new_command( |
141 | | struct pthreadpool* threadpool, |
142 | | uint32_t last_command, |
143 | | uint32_t last_flags) |
144 | 0 | { |
145 | 0 | uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command); |
146 | 0 | if (command != last_command) { |
147 | 0 | return command; |
148 | 0 | } |
149 | | |
150 | 0 | if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) { |
151 | | /* Spin-wait loop */ |
152 | 0 | for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { |
153 | 0 | pthreadpool_yield(); |
154 | |
155 | 0 | command = pthreadpool_load_acquire_uint32_t(&threadpool->command); |
156 | 0 | if (command != last_command) { |
157 | 0 | return command; |
158 | 0 | } |
159 | 0 | } |
160 | 0 | } |
161 | | |
162 | | /* Spin-wait disabled or timed out, fall back to mutex/futex wait */ |
163 | 0 | #if PTHREADPOOL_USE_FUTEX |
164 | 0 | do { |
165 | 0 | futex_wait(&threadpool->command, last_command); |
166 | 0 | command = pthreadpool_load_acquire_uint32_t(&threadpool->command); |
167 | 0 | } while (command == last_command); |
168 | | #else |
169 | | /* Lock the command mutex */ |
170 | | pthread_mutex_lock(&threadpool->command_mutex); |
171 | | /* Read the command */ |
172 | | while ((command = pthreadpool_load_acquire_uint32_t(&threadpool->command)) == last_command) { |
173 | | /* Wait for new command */ |
174 | | pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex); |
175 | | } |
176 | | /* A new command was read; release the mutex */ |
177 | | pthread_mutex_unlock(&threadpool->command_mutex); |
178 | | #endif |
179 | 0 | return command; |
180 | 0 | } |
181 | | |
182 | 0 | static void* thread_main(void* arg) { |
183 | 0 | struct thread_info* thread = (struct thread_info*) arg; |
184 | 0 | struct pthreadpool* threadpool = thread->threadpool; |
185 | 0 | uint32_t last_command = threadpool_command_init; |
186 | 0 | struct fpu_state saved_fpu_state = { 0 }; |
187 | 0 | uint32_t flags = 0; |
188 | | |
189 | | /* Check in */ |
190 | 0 | checkin_worker_thread(threadpool); |
191 | | |
192 | | /* Monitor new commands and act accordingly */ |
193 | 0 | for (;;) { |
194 | 0 | uint32_t command = wait_for_new_command(threadpool, last_command, flags); |
195 | 0 | pthreadpool_fence_acquire(); |
196 | |
197 | 0 | flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags); |
198 | | |
199 | | /* Process command */ |
200 | 0 | switch (command & THREADPOOL_COMMAND_MASK) { |
201 | 0 | case threadpool_command_parallelize: |
202 | 0 | { |
203 | 0 | const thread_function_t thread_function = |
204 | 0 | (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function); |
205 | 0 | if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { |
206 | 0 | saved_fpu_state = get_fpu_state(); |
207 | 0 | disable_fpu_denormals(); |
208 | 0 | } |
209 | |
210 | 0 | thread_function(threadpool, thread); |
211 | 0 | if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { |
212 | 0 | set_fpu_state(saved_fpu_state); |
213 | 0 | } |
214 | 0 | break; |
215 | 0 | } |
216 | 0 | case threadpool_command_shutdown: |
217 | | /* Exit immediately: the master thread is waiting on pthread_join */ |
218 | 0 | return NULL; |
219 | 0 | case threadpool_command_init: |
220 | | /* To inhibit compiler warning */ |
221 | 0 | break; |
222 | 0 | } |
223 | | /* Notify the master thread that we finished processing */ |
224 | 0 | checkin_worker_thread(threadpool); |
225 | | /* Update last command */ |
226 | 0 | last_command = command; |
227 | 0 | }
228 | 0 | } |
229 | | |
230 | 0 | struct pthreadpool* pthreadpool_create(size_t threads_count) { |
231 | | #if PTHREADPOOL_USE_CPUINFO |
232 | | if (!cpuinfo_initialize()) { |
233 | | return NULL; |
234 | | } |
235 | | #endif |
236 | |
237 | 0 | if (threads_count == 0) { |
238 | | #if PTHREADPOOL_USE_CPUINFO |
239 | | threads_count = cpuinfo_get_processors_count(); |
240 | | #elif defined(_SC_NPROCESSORS_ONLN) |
241 | 0 | threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN); |
242 | | #if defined(__EMSCRIPTEN_PTHREADS__) |
243 | | /* Limit the number of threads to 8 to match the link-time PTHREAD_POOL_SIZE option */ |
244 | | if (threads_count >= 8) { |
245 | | threads_count = 8; |
246 | | } |
247 | | #endif |
248 | | #elif defined(_WIN32) |
249 | | SYSTEM_INFO system_info; |
250 | | ZeroMemory(&system_info, sizeof(system_info)); |
251 | | GetSystemInfo(&system_info); |
252 | | threads_count = (size_t) system_info.dwNumberOfProcessors; |
253 | | #else |
254 | | #error "Platform-specific implementation of sysconf(_SC_NPROCESSORS_ONLN) required" |
255 | | #endif |
256 | 0 | } |
257 | |
258 | 0 | struct pthreadpool* threadpool = pthreadpool_allocate(threads_count); |
259 | 0 | if (threadpool == NULL) { |
260 | 0 | return NULL; |
261 | 0 | } |
262 | 0 | threadpool->threads_count = fxdiv_init_size_t(threads_count); |
263 | 0 | for (size_t tid = 0; tid < threads_count; tid++) { |
264 | 0 | threadpool->threads[tid].thread_number = tid; |
265 | 0 | threadpool->threads[tid].threadpool = threadpool; |
266 | 0 | } |
267 | | |
268 | | /* A thread pool with a single thread computes everything on the caller thread. */ |
269 | 0 | if (threads_count > 1) { |
270 | 0 | pthread_mutex_init(&threadpool->execution_mutex, NULL); |
271 | | #if !PTHREADPOOL_USE_FUTEX |
272 | | pthread_mutex_init(&threadpool->completion_mutex, NULL); |
273 | | pthread_cond_init(&threadpool->completion_condvar, NULL); |
274 | | pthread_mutex_init(&threadpool->command_mutex, NULL); |
275 | | pthread_cond_init(&threadpool->command_condvar, NULL); |
276 | | #endif |
277 | |
278 | 0 | #if PTHREADPOOL_USE_FUTEX |
279 | 0 | pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); |
280 | 0 | #endif |
281 | 0 | pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); |
282 | | |
283 | | /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */ |
284 | 0 | for (size_t tid = 1; tid < threads_count; tid++) { |
285 | 0 | pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]); |
286 | 0 | } |
287 | | |
288 | | /* Wait until all threads initialize */ |
289 | 0 | wait_worker_threads(threadpool); |
290 | 0 | } |
291 | 0 | return threadpool; |
292 | 0 | } |
293 | | |
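For context, the constructor above backs a small public API: a caller creates one pool (passing 0 to auto-detect the processor count, as handled above), reuses it across many parallelize calls, and destroys it at shutdown. A minimal usage sketch against the public pthreadpool.h interface:

    #include <stddef.h>
    #include <stdio.h>
    #include <pthreadpool.h>

    /* Task invoked once per index i in [0, range), on some worker thread. */
    static void square(void* context, size_t i) {
        double* data = (double*) context;
        data[i] *= data[i];
    }

    int main(void) {
        double data[1000];
        for (size_t i = 0; i < 1000; i++) data[i] = (double) i;

        pthreadpool_t pool = pthreadpool_create(0); /* 0 = one thread per processor */
        if (pool == NULL) return 1;

        pthreadpool_parallelize_1d(pool, square, data, 1000, /*flags=*/0);

        pthreadpool_destroy(pool);
        printf("%.1f\n", data[999]); /* 998001.0 */
        return 0;
    }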
294 | | PTHREADPOOL_INTERNAL void pthreadpool_parallelize( |
295 | | struct pthreadpool* threadpool, |
296 | | thread_function_t thread_function, |
297 | | const void* params, |
298 | | size_t params_size, |
299 | | void* task, |
300 | | void* context, |
301 | | size_t linear_range, |
302 | | uint32_t flags) |
303 | 0 | { |
304 | 0 | assert(threadpool != NULL); |
305 | 0 | assert(thread_function != NULL); |
306 | 0 | assert(task != NULL); |
307 | 0 | assert(linear_range > 1); |
308 | | |
309 | | /* Protect the global threadpool structures */ |
310 | 0 | pthread_mutex_lock(&threadpool->execution_mutex); |
311 | |
312 | | #if !PTHREADPOOL_USE_FUTEX |
313 | | /* Lock the command variables to ensure that threads don't start processing before they observe the complete command with all its arguments */ |
314 | | pthread_mutex_lock(&threadpool->command_mutex); |
315 | | #endif |
316 | | |
317 | | /* Setup global arguments */ |
318 | 0 | pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function); |
319 | 0 | pthreadpool_store_relaxed_void_p(&threadpool->task, task); |
320 | 0 | pthreadpool_store_relaxed_void_p(&threadpool->argument, context); |
321 | 0 | pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags); |
322 | | |
323 | | /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ |
324 | 0 | const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count; |
325 | 0 | pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */); |
326 | 0 | #if PTHREADPOOL_USE_FUTEX |
327 | 0 | pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); |
328 | 0 | #endif |
329 | |
330 | 0 | if (params_size != 0) { |
331 | 0 | memcpy(&threadpool->params, params, params_size); |
332 | 0 | pthreadpool_fence_release(); |
333 | 0 | } |
334 | | |
335 | | /* Spread the work between threads */ |
336 | 0 | const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count); |
337 | 0 | size_t range_start = 0; |
338 | 0 | for (size_t tid = 0; tid < threads_count.value; tid++) { |
339 | 0 | struct thread_info* thread = &threadpool->threads[tid]; |
340 | 0 | const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder); |
341 | 0 | const size_t range_end = range_start + range_length; |
342 | 0 | pthreadpool_store_relaxed_size_t(&thread->range_start, range_start); |
343 | 0 | pthreadpool_store_relaxed_size_t(&thread->range_end, range_end); |
344 | 0 | pthreadpool_store_relaxed_size_t(&thread->range_length, range_length); |
345 | | |
346 | | /* The next subrange starts where the previous ended */ |
347 | 0 | range_start = range_end; |
348 | 0 | } |
349 | | |
350 | | /* |
351 | | * Update the threadpool command. |
352 | | * Importantly, do it after initializing the command parameters (range, task, argument, flags). |
353 | | * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits outside the command mask |
354 | | * to ensure the unmasked command differs from the last command, because worker threads |
355 | | * monitor for a change in the unmasked command. |
356 | | */ |
357 | 0 | const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); |
358 | 0 | const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize; |
359 | | |
360 | | /* |
361 | | * Store the command with release semantics to guarantee that if a worker thread observes |
362 | | * the new command value, it also observes the updated command parameters. |
363 | | * |
364 | | * Note: release semantics are necessary even with a condition variable, because the workers might |
365 | | * be waiting in a spin-loop rather than on the condition variable. |
366 | | */ |
367 | 0 | pthreadpool_store_release_uint32_t(&threadpool->command, new_command); |
368 | 0 | #if PTHREADPOOL_USE_FUTEX |
369 | | /* Wake up the threads */ |
370 | 0 | futex_wake_all(&threadpool->command); |
371 | | #else |
372 | | /* Unlock the command variables before waking up the threads for better performance */ |
373 | | pthread_mutex_unlock(&threadpool->command_mutex); |
374 | | |
375 | | /* Wake up the threads */ |
376 | | pthread_cond_broadcast(&threadpool->command_condvar); |
377 | | #endif |
378 | | |
379 | | /* Save and modify FPU denormals control, if needed */ |
380 | 0 | struct fpu_state saved_fpu_state = { 0 }; |
381 | 0 | if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { |
382 | 0 | saved_fpu_state = get_fpu_state(); |
383 | 0 | disable_fpu_denormals(); |
384 | 0 | } |
385 | | |
386 | | /* Do computations as worker #0 */ |
387 | 0 | thread_function(threadpool, &threadpool->threads[0]); |
388 | | |
389 | | /* Restore FPU denormals control, if needed */ |
390 | 0 | if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { |
391 | 0 | set_fpu_state(saved_fpu_state); |
392 | 0 | } |
393 | | |
394 | | /* Wait until the threads finish computation */ |
395 | 0 | wait_worker_threads(threadpool); |
396 | | |
397 | | /* Make changes by other threads visible to this thread */ |
398 | 0 | pthreadpool_fence_acquire(); |
399 | | |
400 | | /* Unprotect the global threadpool structures */ |
401 | 0 | pthread_mutex_unlock(&threadpool->execution_mutex); |
402 | 0 | } |
403 | | |
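The bit trick in new_command is easiest to see with concrete numbers: flipping every bit outside THREADPOOL_COMMAND_MASK guarantees the stored word differs from the previous one even when the same command is issued twice in a row, while the masked low bits still decode to the opcode. A worked example with illustrative constants (the real values live in threadpool-object.h and may differ):

    #include <assert.h>
    #include <stdint.h>

    int main(void) {
        const uint32_t MASK = UINT32_C(0x3);        /* illustrative command mask */
        const uint32_t PARALLELIZE = UINT32_C(1);   /* illustrative opcode */

        uint32_t command = PARALLELIZE;             /* previous command word */
        /* Issue the same command again: bits outside the mask flip. */
        uint32_t next = ~(command | MASK) | PARALLELIZE;   /* 0xFFFFFFFD */
        assert(next != command);                    /* workers see a change... */
        assert((next & MASK) == PARALLELIZE);       /* ...decoding to the same opcode */

        /* A third issue flips the high bits back; still differs from `next`. */
        uint32_t after = ~(next | MASK) | PARALLELIZE;     /* 0x00000001 */
        assert(after != next && (after & MASK) == PARALLELIZE);
        return 0;
    }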
404 | 0 | void pthreadpool_destroy(struct pthreadpool* threadpool) { |
405 | 0 | if (threadpool != NULL) { |
406 | 0 | const size_t threads_count = threadpool->threads_count.value; |
407 | 0 | if (threads_count > 1) { |
408 | 0 | #if PTHREADPOOL_USE_FUTEX |
409 | 0 | pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); |
410 | 0 | pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); |
411 | | |
412 | | /* |
413 | | * Store the command with release semantics to guarantee that if a worker thread observes |
414 | | * the new command value, it also observes the updated active_threads/has_active_threads values. |
415 | | */ |
416 | 0 | pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown); |
417 | | |
418 | | /* Wake up worker threads */ |
419 | 0 | futex_wake_all(&threadpool->command); |
420 | | #else |
421 | | /* Lock the command variable to ensure that threads don't shut down until both command and active_threads are updated */ |
422 | | pthread_mutex_lock(&threadpool->command_mutex); |
423 | | |
424 | | pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); |
425 | | |
426 | | /* |
427 | | * Store the command with release semantics to guarantee that if a worker thread observes |
428 | | * the new command value, it also observes the updated active_threads value. |
429 | | * |
430 | | * Note: the release fence inside pthread_mutex_unlock is insufficient, |
431 | | * because the workers might be waiting in a spin-loop rather than on the condition variable. |
432 | | */ |
433 | | pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown); |
434 | | |
435 | | /* Wake up worker threads */ |
436 | | pthread_cond_broadcast(&threadpool->command_condvar); |
437 | | |
438 | | /* Commit the state changes and let workers start processing */ |
439 | | pthread_mutex_unlock(&threadpool->command_mutex); |
440 | | #endif |
441 | | |
442 | | /* Wait until all threads return */ |
443 | 0 | for (size_t thread = 1; thread < threads_count; thread++) { |
444 | 0 | pthread_join(threadpool->threads[thread].thread_object, NULL); |
445 | 0 | } |
446 | | |
447 | | /* Release resources */ |
448 | 0 | pthread_mutex_destroy(&threadpool->execution_mutex); |
449 | | #if !PTHREADPOOL_USE_FUTEX |
450 | | pthread_mutex_destroy(&threadpool->completion_mutex); |
451 | | pthread_cond_destroy(&threadpool->completion_condvar); |
452 | | pthread_mutex_destroy(&threadpool->command_mutex); |
453 | | pthread_cond_destroy(&threadpool->command_condvar); |
454 | | #endif |
455 | 0 | } |
456 | | #if PTHREADPOOL_USE_CPUINFO |
457 | | cpuinfo_deinitialize(); |
458 | | #endif |
459 | 0 | pthreadpool_deallocate(threadpool); |
460 | 0 | } |
461 | 0 | } |
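Finally, the range-splitting loop in pthreadpool_parallelize hands each of the first `remainder` threads one extra element, so subrange lengths differ by at most one; fxdiv is used only to avoid a hardware divide, not to change the arithmetic. The same partitioning with plain division, as a hypothetical standalone demo:

    #include <stddef.h>
    #include <stdio.h>

    int main(void) {
        const size_t linear_range = 10;  /* items to process */
        const size_t threads = 4;        /* worker count */
        const size_t quotient = linear_range / threads;
        const size_t remainder = linear_range % threads;

        size_t range_start = 0;
        for (size_t tid = 0; tid < threads; tid++) {
            /* First `remainder` threads get quotient + 1 items, the rest quotient. */
            const size_t range_length = quotient + (size_t) (tid < remainder);
            printf("thread %zu: [%zu, %zu)\n", tid, range_start, range_start + range_length);
            range_start = range_start + range_length;
        }
        /* Prints [0,3) [3,6) [6,8) [8,10): every index covered exactly once. */
        return 0;
    }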