Coverage Report

Created: 2025-07-11 06:53

/src/htslib/thread_pool.c
Line    Count    Source
1
/*  thread_pool.c -- A pool of generic worker threads
2
3
    Copyright (c) 2013-2020, 2025 Genome Research Ltd.
4
5
    Author: James Bonfield <jkb@sanger.ac.uk>
6
7
Permission is hereby granted, free of charge, to any person obtaining a copy
8
of this software and associated documentation files (the "Software"), to deal
9
in the Software without restriction, including without limitation the rights
10
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11
copies of the Software, and to permit persons to whom the Software is
12
furnished to do so, subject to the following conditions:
13
14
The above copyright notice and this permission notice shall be included in
15
all copies or substantial portions of the Software.
16
17
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23
DEALINGS IN THE SOFTWARE.  */
24
25
#ifndef TEST_MAIN
26
#define HTS_BUILDING_LIBRARY // Enables HTSLIB_EXPORT, see htslib/hts_defs.h
27
#include <config.h>
28
#endif
29
30
#include <stdlib.h>
31
#include <inttypes.h>
32
#include <signal.h>
33
#include <errno.h>
34
#include <stdio.h>
35
#include <string.h>
36
#include <sys/time.h>
37
#include <assert.h>
38
#include <stdarg.h>
39
#include <unistd.h>
40
#include <limits.h>
41
42
#include "thread_pool_internal.h"
43
#include "htslib/hts_log.h"
44
45
// Minimum stack size for threads.  Required for some rANS codecs
46
// that use over 2Mbytes of stack for encoder / decoder state
47
0
#define HTS_MIN_THREAD_STACK (3 * 1024 * 1024)
48
49
static void hts_tpool_process_detach_locked(hts_tpool *p,
50
                                            hts_tpool_process *q);
51
52
//#define DEBUG
53
54
// Return the worker ID index, from 0 to nthreads-1.
55
// Return <0 on error, but this shouldn't be possible
56
0
int hts_tpool_worker_id(hts_tpool *p) {
57
0
    if (!p)
58
0
        return -1;
59
0
    pthread_t s = pthread_self();
60
0
    int i;
61
0
    for (i = 0; i < p->tsize; i++) {
62
0
        if (pthread_equal(s, p->t[i].tid))
63
0
            return i;
64
0
    }
65
0
    return -1;
66
0
}
67
68
#ifdef DEBUG
69
void DBG_OUT(FILE *fp, char *fmt, ...) {
70
    va_list args;
71
    va_start(args, fmt);
72
    vfprintf(fp, fmt, args);
73
    va_end(args);
74
}
75
#else
76
0
#define DBG_OUT(...) do{}while(0)
77
#endif
78
79
/* ----------------------------------------------------------------------------
80
 * A process-queue to hold results from the thread pool.
81
 *
82
 * Each thread pool may have jobs of multiple types being queued up and
83
 * interleaved, so we attach several job process-queues to a single pool.
84
 *
85
 * The jobs themselves are expected to push their results onto their
86
 * appropriate results queue.
87
 */
88
89
/*
90
 * Adds a result to the end of the process result queue.
91
 *
92
 * Returns 0 on success;
93
 *        -1 on failure
94
 */
95
0
static int hts_tpool_add_result(hts_tpool_job *j, void *data) {
96
0
    hts_tpool_process *q = j->q;
97
0
    hts_tpool_result *r;
98
99
0
    pthread_mutex_lock(&q->p->pool_m);
100
101
0
    DBG_OUT(stderr, "%d: Adding result to queue %p, serial %"PRId64", %d of %d\n",
102
0
            hts_tpool_worker_id(j->p), q, j->serial, q->n_output+1, q->qsize);
103
104
0
    if (--q->n_processing == 0)
105
0
        pthread_cond_signal(&q->none_processing_c);
106
107
    /* No results queue is fine if we don't want any results back */
108
0
    if (q->in_only) {
109
0
        pthread_mutex_unlock(&q->p->pool_m);
110
0
        return 0;
111
0
    }
112
113
0
    if (!(r = malloc(sizeof(*r)))) {
114
0
        pthread_mutex_unlock(&q->p->pool_m);
115
0
        hts_tpool_process_shutdown(q);
116
0
        return -1;
117
0
    }
118
119
0
    r->next = NULL;
120
0
    r->data = data;
121
0
    r->result_cleanup = j->result_cleanup;
122
0
    r->serial = j->serial;
123
124
0
    q->n_output++;
125
0
    if (q->output_tail) {
126
0
        q->output_tail->next = r;
127
0
        q->output_tail = r;
128
0
    } else {
129
0
        q->output_head = q->output_tail = r;
130
0
    }
131
132
0
    assert(r->serial >= q->next_serial    // Or it will never be dequeued ...
133
0
           || q->next_serial == INT_MAX); // ... unless flush in progress.
134
0
    if (r->serial == q->next_serial) {
135
0
        DBG_OUT(stderr, "%d: Broadcasting result_avail (id %"PRId64")\n",
136
0
                hts_tpool_worker_id(j->p), r->serial);
137
0
        pthread_cond_broadcast(&q->output_avail_c);
138
0
        DBG_OUT(stderr, "%d: Broadcast complete\n", hts_tpool_worker_id(j->p));
139
0
    }
140
141
0
    pthread_mutex_unlock(&q->p->pool_m);
142
143
0
    return 0;
144
0
}
145
146
static void wake_next_worker(hts_tpool_process *q, int locked);
147
148
/* Core of hts_tpool_next_result() */
149
0
static hts_tpool_result *hts_tpool_next_result_locked(hts_tpool_process *q) {
150
0
    hts_tpool_result *r, *last;
151
152
0
    if (q->shutdown)
153
0
        return NULL;
154
155
0
    for (last = NULL, r = q->output_head; r; last = r, r = r->next) {
156
0
        if (r->serial == q->next_serial)
157
0
            break;
158
0
    }
159
160
0
    if (r) {
161
        // Remove r from our linked list
162
0
        if (q->output_head == r)
163
0
            q->output_head = r->next;
164
0
        else
165
0
            last->next = r->next;
166
167
0
        if (q->output_tail == r)
168
0
            q->output_tail = last;
169
170
0
        if (!q->output_head)
171
0
            q->output_tail = NULL;
172
173
0
        q->next_serial++;
174
0
        q->n_output--;
175
176
0
        if (q->qsize && q->n_output < q->qsize) {
177
            // Not technically input full, but we can guarantee there is
178
            // room for the input to go somewhere so we still signal.
179
            // The waiting code will then check the condition again.
180
0
            if (q->n_input < q->qsize)
181
0
                pthread_cond_signal(&q->input_not_full_c);
182
0
            if (!q->shutdown)
183
0
                wake_next_worker(q, 1);
184
0
        }
185
0
    }
186
187
0
    return r;
188
0
}
189
190
/*
191
 * Pulls the next item off the process result queue.  The caller should free
192
 * it (and any internals as appropriate) after use.  This doesn't wait for a
193
 * result to be present.
194
 *
195
 * Results will be returned in strict order.
196
 *
197
 * Returns hts_tpool_result pointer if a result is ready.
198
 *         NULL if not.
199
 */
200
0
hts_tpool_result *hts_tpool_next_result(hts_tpool_process *q) {
201
0
    hts_tpool_result *r;
202
203
0
    DBG_OUT(stderr, "Requesting next result on queue %p\n", q);
204
205
0
    pthread_mutex_lock(&q->p->pool_m);
206
0
    r = hts_tpool_next_result_locked(q);
207
0
    pthread_mutex_unlock(&q->p->pool_m);
208
209
0
    DBG_OUT(stderr, "(q=%p) Found %p\n", q, r);
210
211
0
    return r;
212
0
}
213
214
/*
215
 * Pulls the next item off the process result queue.  The caller should free
216
 * it (and any internals as appropriate) after use.  This will wait for
217
 * a result to be present if none are currently available.
218
 *
219
 * Results will be returned in strict order.
220
 *
221
 * Returns hts_tpool_result pointer if a result is ready.
222
 *         NULL on error or during shutdown.
223
 */
224
0
hts_tpool_result *hts_tpool_next_result_wait(hts_tpool_process *q) {
225
0
    hts_tpool_result *r;
226
227
0
    pthread_mutex_lock(&q->p->pool_m);
228
0
    while (!(r = hts_tpool_next_result_locked(q))) {
229
        /* Possible race here now avoided via _locked() call, but kept just in case... */
230
0
        struct timeval now;
231
0
        struct timespec timeout;
232
233
0
        gettimeofday(&now, NULL);
234
0
        timeout.tv_sec = now.tv_sec + 10;
235
0
        timeout.tv_nsec = now.tv_usec * 1000;
236
237
0
        q->ref_count++;
238
0
        if (q->shutdown) {
239
0
            int rc = --q->ref_count;
240
0
            pthread_mutex_unlock(&q->p->pool_m);
241
0
            if (rc == 0)
242
0
                hts_tpool_process_destroy(q);
243
0
            return NULL;
244
0
        }
245
0
        pthread_cond_timedwait(&q->output_avail_c, &q->p->pool_m, &timeout);
246
247
0
        q->ref_count--;
248
0
    }
249
0
    pthread_mutex_unlock(&q->p->pool_m);
250
251
0
    return r;
252
0
}
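
/*
 * Illustrative sketch (not part of the library): a typical consumer loop
 * built on hts_tpool_next_result_wait().  "consume" stands for whatever the
 * caller does with each result and is assumed here.
 *
 *     hts_tpool_result *r;
 *     while ((r = hts_tpool_next_result_wait(q)) != NULL) {
 *         consume(hts_tpool_result_data(r));
 *         hts_tpool_delete_result(r, 1);   // 1 => also free the data pointer
 *     }
 *     // NULL indicates an error or that the process-queue was shut down.
 */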
253
254
/*
255
 * Returns true if there are no items in the process results queue and
256
 * also none still pending.
257
 */
258
0
int hts_tpool_process_empty(hts_tpool_process *q) {
259
0
    int empty;
260
261
0
    pthread_mutex_lock(&q->p->pool_m);
262
0
    empty = q->n_input == 0 && q->n_processing == 0 && q->n_output == 0;
263
0
    pthread_mutex_unlock(&q->p->pool_m);
264
265
0
    return empty;
266
0
}
267
268
0
void hts_tpool_process_ref_incr(hts_tpool_process *q) {
269
0
    pthread_mutex_lock(&q->p->pool_m);
270
0
    q->ref_count++;
271
0
    pthread_mutex_unlock(&q->p->pool_m);
272
0
}
273
274
0
void hts_tpool_process_ref_decr(hts_tpool_process *q) {
275
0
    pthread_mutex_lock(&q->p->pool_m);
276
0
    if (--q->ref_count <= 0) {
277
0
        pthread_mutex_unlock(&q->p->pool_m);
278
0
        hts_tpool_process_destroy(q);
279
0
        return;
280
0
    }
281
282
    // ref_count is still positive here, so the queue stays live; the branch
    // above handles destruction when the last reference is dropped.
283
0
    pthread_mutex_unlock(&q->p->pool_m);
284
0
}
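
/*
 * Illustrative sketch (not part of the library): sharing a process-queue
 * with a separate consumer thread.  The extra reference keeps the queue
 * alive until both sides have finished with it; "consumer_main" is a
 * hypothetical thread function that calls hts_tpool_process_ref_decr()
 * before exiting.
 *
 *     hts_tpool_process_ref_incr(q);                  // ref for the consumer
 *     pthread_create(&tid, NULL, consumer_main, q);
 *     // ... dispatch jobs from this thread ...
 *     hts_tpool_process_destroy(q);                   // drops the creator's ref
 *     // Whichever reference is released last frees the queue.
 */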
285
286
/*
287
 * Returns the number of completed jobs in the process results queue.
288
 */
289
0
int hts_tpool_process_len(hts_tpool_process *q) {
290
0
    int len;
291
292
0
    pthread_mutex_lock(&q->p->pool_m);
293
0
    len = q->n_output;
294
0
    pthread_mutex_unlock(&q->p->pool_m);
295
296
0
    return len;
297
0
}
298
299
/*
300
 * Returns the number of completed jobs in the process results queue plus the
301
 * number running and queued up to run.
302
 */
303
0
int hts_tpool_process_sz(hts_tpool_process *q) {
304
0
    int len;
305
306
0
    pthread_mutex_lock(&q->p->pool_m);
307
0
    len = q->n_output + q->n_input + q->n_processing;
308
0
    pthread_mutex_unlock(&q->p->pool_m);
309
310
0
    return len;
311
0
}
312
313
/*
314
 * Shutdown a process.
315
 *
316
 * This sets the shutdown flag and wakes any threads waiting on process
317
 * condition variables.
318
 */
319
0
static void hts_tpool_process_shutdown_locked(hts_tpool_process *q) {
320
0
    q->shutdown = 1;
321
0
    pthread_cond_broadcast(&q->output_avail_c);
322
0
    pthread_cond_broadcast(&q->input_not_full_c);
323
0
    pthread_cond_broadcast(&q->input_empty_c);
324
0
    pthread_cond_broadcast(&q->none_processing_c);
325
0
}
326
327
0
void hts_tpool_process_shutdown(hts_tpool_process *q) {
328
0
    pthread_mutex_lock(&q->p->pool_m);
329
0
    hts_tpool_process_shutdown_locked(q);
330
0
    pthread_mutex_unlock(&q->p->pool_m);
331
0
}
332
333
0
int hts_tpool_process_is_shutdown(hts_tpool_process *q) {
334
0
    pthread_mutex_lock(&q->p->pool_m);
335
0
    int r = q->shutdown;
336
0
    pthread_mutex_unlock(&q->p->pool_m);
337
0
    return r;
338
0
}
339
340
/*
341
 * Frees a result 'r' and, if free_data is true, also frees
342
 * the internal r->data result too.
343
 */
344
0
void hts_tpool_delete_result(hts_tpool_result *r, int free_data) {
345
0
    if (!r)
346
0
        return;
347
348
0
    if (free_data && r->data)
349
0
        free(r->data);
350
351
0
    free(r);
352
0
}
353
354
/*
355
 * Returns the data portion of a hts_tpool_result, corresponding
356
 * to the actual "result" itself.
357
 */
358
0
void *hts_tpool_result_data(hts_tpool_result *r) {
359
0
    return r->data;
360
0
}
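
/*
 * Illustrative sketch (not part of the library): retrieving and releasing a
 * single result.  "my_output_t" is a hypothetical type returned by the job
 * function; pass free_data=1 only if that pointer was obtained from malloc().
 *
 *     hts_tpool_result *r = hts_tpool_next_result(q);
 *     if (r) {
 *         my_output_t *out = (my_output_t *)hts_tpool_result_data(r);
 *         // ... use out ...
 *         hts_tpool_delete_result(r, 1);   // frees both r and out
 *     }
 */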
361
362
/*
363
 * Initialises a thread process-queue.
364
 *
365
 * In_only, if true, indicates that the process generated does not need to
366
 * hold any output.  Otherwise an output queue is used to store the results
367
 * of processing each input job.
368
 *
369
 * Returns hts_tpool_process pointer on success;
370
 *         NULL on failure
371
 */
372
0
hts_tpool_process *hts_tpool_process_init(hts_tpool *p, int qsize, int in_only) {
373
0
    hts_tpool_process *q = malloc(sizeof(*q));
374
0
    if (!q)
375
0
        return NULL;
376
377
0
    pthread_cond_init(&q->output_avail_c,   NULL);
378
0
    pthread_cond_init(&q->input_not_full_c, NULL);
379
0
    pthread_cond_init(&q->input_empty_c,    NULL);
380
0
    pthread_cond_init(&q->none_processing_c,NULL);
381
382
0
    q->p           = p;
383
0
    q->input_head  = NULL;
384
0
    q->input_tail  = NULL;
385
0
    q->output_head = NULL;
386
0
    q->output_tail = NULL;
387
0
    q->next_serial = 0;
388
0
    q->curr_serial = 0;
389
0
    q->no_more_input = 0;
390
0
    q->n_input     = 0;
391
0
    q->n_output    = 0;
392
0
    q->n_processing= 0;
393
0
    q->qsize       = qsize;
394
0
    q->in_only     = in_only;
395
0
    q->shutdown    = 0;
396
0
    q->wake_dispatch = 0;
397
0
    q->ref_count   = 1;
398
399
0
    q->next        = NULL;
400
0
    q->prev        = NULL;
401
402
0
    hts_tpool_process_attach(p, q);
403
404
0
    return q;
405
0
}
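
/*
 * Illustrative sketch (not part of the library): creating the two kinds of
 * process-queue.  A queue size of roughly twice the thread count is the
 * convention used by the tests at the end of this file.
 *
 *     hts_tpool *pool = hts_tpool_init(4);
 *     // Normal queue: results are stored and read back with
 *     // hts_tpool_next_result() / hts_tpool_next_result_wait().
 *     hts_tpool_process *q_out = hts_tpool_process_init(pool, 8, 0);
 *     // "in_only" queue: jobs run but no output is kept, e.g. when the
 *     // job function reports its own results.
 *     hts_tpool_process *q_in  = hts_tpool_process_init(pool, 8, 1);
 */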
406
407
/* Deallocates memory for a thread process-queue.
408
 * Must be called before the thread pool is destroyed.
409
 */
410
0
void hts_tpool_process_destroy(hts_tpool_process *q) {
411
0
    DBG_OUT(stderr, "Destroying results queue %p\n", q);
412
413
0
    if (!q)
414
0
        return;
415
416
    // Prevent dispatch from queuing up any more jobs.
417
    // We want to reset (and flush) the queue here, before
418
    // we set the shutdown flag, but we need to avoid races
419
    // with attempts to queue more input during the reset.
420
0
    pthread_mutex_lock(&q->p->pool_m);
421
0
    q->no_more_input = 1;
422
0
    pthread_mutex_unlock(&q->p->pool_m);
423
424
    // Ensure it's fully drained before destroying the queue
425
0
    hts_tpool_process_reset(q, 0);
426
0
    pthread_mutex_lock(&q->p->pool_m);
427
0
    hts_tpool_process_detach_locked(q->p, q);
428
0
    hts_tpool_process_shutdown_locked(q);
429
430
    // Maybe a worker is scanning this queue, so delay destruction
431
0
    if (--q->ref_count > 0) {
432
0
        pthread_mutex_unlock(&q->p->pool_m);
433
0
        return;
434
0
    }
435
436
0
    pthread_cond_destroy(&q->output_avail_c);
437
0
    pthread_cond_destroy(&q->input_not_full_c);
438
0
    pthread_cond_destroy(&q->input_empty_c);
439
0
    pthread_cond_destroy(&q->none_processing_c);
440
0
    pthread_mutex_unlock(&q->p->pool_m);
441
442
0
    free(q);
443
444
0
    DBG_OUT(stderr, "Destroyed results queue %p\n", q);
445
0
}
446
447
448
/*
449
 * Attach and detach a thread process-queue with / from the thread pool
450
 * scheduler.
451
 *
452
 * We need to attach after making a thread process, but may also wish
453
 * to temporarily detach if we wish to stop running jobs on a specific
454
 * process while permitting other processes to continue.
455
 */
456
0
void hts_tpool_process_attach(hts_tpool *p, hts_tpool_process *q) {
457
0
    pthread_mutex_lock(&p->pool_m);
458
0
    if (p->q_head) {
459
0
        q->next = p->q_head;
460
0
        q->prev = p->q_head->prev;
461
0
        p->q_head->prev->next = q;
462
0
        p->q_head->prev = q;
463
0
    } else {
464
0
        q->next = q;
465
0
        q->prev = q;
466
0
    }
467
0
    p->q_head = q;
468
0
    assert(p->q_head && p->q_head->prev && p->q_head->next);
469
0
    pthread_mutex_unlock(&p->pool_m);
470
0
}
471
472
static void hts_tpool_process_detach_locked(hts_tpool *p,
473
0
                                            hts_tpool_process *q) {
474
0
    if (!p->q_head || !q->prev || !q->next)
475
0
        return;
476
477
0
    hts_tpool_process *curr = p->q_head, *first = curr;
478
0
    do {
479
0
        if (curr == q) {
480
0
            q->next->prev = q->prev;
481
0
            q->prev->next = q->next;
482
0
            p->q_head = q->next;
483
0
            q->next = q->prev = NULL;
484
485
            // Last one
486
0
            if (p->q_head == q)
487
0
                p->q_head = NULL;
488
0
            break;
489
0
        }
490
491
0
        curr = curr->next;
492
0
    } while (curr != first);
493
0
}
494
495
0
void hts_tpool_process_detach(hts_tpool *p, hts_tpool_process *q) {
496
0
    pthread_mutex_lock(&p->pool_m);
497
0
    hts_tpool_process_detach_locked(p, q);
498
0
    pthread_mutex_unlock(&p->pool_m);
499
0
}
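
/*
 * Illustrative sketch (not part of the library): pausing one queue while
 * other queues attached to the same pool keep running.
 *
 *     hts_tpool_process_detach(pool, q);   // q's queued jobs stop being scheduled
 *     // ... workers continue servicing the other attached queues ...
 *     hts_tpool_process_attach(pool, q);   // q becomes schedulable again
 */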
500
501
502
/* ----------------------------------------------------------------------------
503
 * The thread pool.
504
 */
505
506
#define TDIFF(t2,t1) ((t2.tv_sec-t1.tv_sec)*1000000 + t2.tv_usec-t1.tv_usec)
507
508
/*
509
 * A worker thread.
510
 *
511
 * Once woken, each thread checks each process-queue in the pool in turn,
512
 * looking for input jobs that also have room for the output (if it requires
513
 * storing).  If found, we execute it and repeat.
514
 *
515
 * If we checked all input queues and find no such job, then we wait until we
516
 * are signalled to check again.
517
 */
518
0
static void *tpool_worker(void *arg) {
519
0
    hts_tpool_worker *w = (hts_tpool_worker *)arg;
520
0
    hts_tpool *p = w->p;
521
0
    hts_tpool_job *j;
522
523
0
    pthread_mutex_lock(&p->pool_m);
524
0
    while (!p->shutdown) {
525
        // Pop an item off the pool queue
526
527
0
        assert(p->q_head == 0 || (p->q_head->prev && p->q_head->next));
528
529
0
        int work_to_do = 0;
530
0
        hts_tpool_process *first = p->q_head, *q = first;
531
0
        do {
532
            // Iterate over queues, finding one with jobs and also
533
            // room to put the result.
534
            //if (q && q->input_head && !hts_tpool_process_output_full(q)) {
535
0
            if (q && q->input_head
536
0
                && q->qsize - q->n_output > q->n_processing
537
0
                && !q->shutdown) {
538
0
                work_to_do = 1;
539
0
                break;
540
0
            }
541
542
0
            if (q) q = q->next;
543
0
        } while (q && q != first);
544
545
0
        if (!work_to_do) {
546
            // We scanned all queues and cannot process any, so we wait.
547
0
            p->nwaiting++;
548
549
            // Push this thread to the top of the waiting stack
550
0
            if (p->t_stack_top == -1 || p->t_stack_top > w->idx)
551
0
                p->t_stack_top = w->idx;
552
553
0
            p->t_stack[w->idx] = 1;
554
//            printf("%2d: no work.  In=%d Proc=%d Out=%d  full=%d\n",
555
//                   w->idx, p->q_head->n_input, p->q_head->n_processing, p->q_head->n_output,
556
//                   hts_tpool_process_output_full(p->q_head));
557
0
            pthread_cond_wait(&w->pending_c, &p->pool_m);
558
0
            p->t_stack[w->idx] = 0;
559
560
            /* Find new t_stack_top */
561
0
            int i;
562
0
            p->t_stack_top = -1;
563
0
            for (i = 0; i < p->tsize; i++) {
564
0
                if (p->t_stack[i]) {
565
0
                    p->t_stack_top = i;
566
0
                    break;
567
0
                }
568
0
            }
569
570
0
            p->nwaiting--;
571
0
            continue; // To outer loop.
572
0
        }
573
574
        // Otherwise work_to_do, so process as many items in this queue as
575
        // possible before switching to another queue.  This means threads
576
        // often end up being dedicated to one type of work.
577
0
        q->ref_count++;
578
0
        while (q->input_head && q->qsize - q->n_output > q->n_processing) {
579
0
            if (p->shutdown)
580
0
                goto shutdown;
581
582
0
            if (q->shutdown)
583
                // Queue shutdown, but there may be other queues
584
0
                break;
585
586
0
            j = q->input_head;
587
0
            assert(j->p == p);
588
589
0
            if (!(q->input_head = j->next))
590
0
                q->input_tail = NULL;
591
592
            // Transitioning from full queue to not-full means we can wake up
593
            // any blocked dispatch threads.  We broadcast this as it's only
594
            // happening once (on the transition) rather than every time we
595
            // are below qsize.
596
            // (I wish I could remember why io_lib rev 3660 changed this from
597
            //  == to >=, but keeping it just in case!)
598
0
            q->n_processing++;
599
0
            if (q->n_input-- >= q->qsize)
600
0
                pthread_cond_broadcast(&q->input_not_full_c);
601
602
0
            if (q->n_input == 0)
603
0
                pthread_cond_signal(&q->input_empty_c);
604
605
0
            p->njobs--; // Total number of jobs; used to adjust to CPU scaling
606
607
0
            pthread_mutex_unlock(&p->pool_m);
608
609
0
            DBG_OUT(stderr, "%d: Processing queue %p, serial %"PRId64"\n",
610
0
                    hts_tpool_worker_id(j->p), q, j->serial);
611
612
0
            if (hts_tpool_add_result(j, j->func(j->arg)) < 0)
613
0
                goto err;
614
            //memset(j, 0xbb, sizeof(*j));
615
0
            free(j);
616
617
0
            pthread_mutex_lock(&p->pool_m);
618
0
        }
619
0
        if (--q->ref_count == 0) { // we were the last user
620
0
            hts_tpool_process_destroy(q);
621
0
        } else {
622
            // Out of jobs on this queue, so restart search from next one.
623
            // This is equivalent to "work-stealing".
624
0
            if (p->q_head)
625
0
                p->q_head = p->q_head->next;
626
0
        }
627
0
    }
628
629
0
 shutdown:
630
0
    pthread_mutex_unlock(&p->pool_m);
631
#ifdef DEBUG
632
    fprintf(stderr, "%d: Shutting down\n", hts_tpool_worker_id(p));
633
#endif
634
0
    return NULL;
635
636
0
 err:
637
#ifdef DEBUG
638
    fprintf(stderr, "%d: Failed to add result\n", hts_tpool_worker_id(p));
639
#endif
640
    // Hard failure, so shutdown all queues
641
0
    pthread_mutex_lock(&p->pool_m);
642
0
    hts_tpool_process *first = p->q_head, *q = first;
643
0
    if (q) {
644
0
        do {
645
0
            hts_tpool_process_shutdown_locked(q);
646
0
            q->shutdown = 2; // signify error.
647
0
            q = q->next;
648
0
        } while (q != first);
649
0
    }
650
0
    pthread_mutex_unlock(&p->pool_m);
651
0
    return NULL;
652
0
}
653
654
0
static void wake_next_worker(hts_tpool_process *q, int locked) {
655
0
    if (!q) return;
656
0
    hts_tpool *p = q->p;
657
0
    if (!locked)
658
0
        pthread_mutex_lock(&p->pool_m);
659
660
    // Update the q_head to be this queue so we'll start processing
661
    // the queue we know to have results.
662
0
    assert(q->prev && q->next); // attached
663
0
    p->q_head = q;
664
665
    // Wake up if we have more jobs waiting than CPUs. This partially combats
666
    // CPU frequency scaling effects.  Starting too many threads and then
667
    // running out of jobs can cause each thread to have lots of start/stop
668
    // cycles, which often translates to CPU frequency scaling
669
    // adjustments.  Instead it is better to only start as many threads as we
670
    // need to keep the throughput up, meaning some threads run flat out and
671
    // others are idle.
672
    //
673
    // This isn't perfect as we need to know how many can actually start,
674
    // rather than how many are waiting.  A limit on output queue size makes
675
    // these two figures different.
676
0
    assert(p->njobs >= q->n_input);
677
678
0
    int running = p->tsize - p->nwaiting;
679
0
    int sig = p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting
680
0
        && (q->n_processing < q->qsize - q->n_output);
681
682
//#define AVG_USAGE
683
#ifdef AVG_USAGE
684
    // Track average number of running threads and try to keep close.
685
    // We permit this to change, but slowly.  This avoids "boom and bust" cycles
686
    // where we read a lot of data, start a lot of jobs, then become idle again.
687
    // This way some threads run steadily and others dormant, which is better
688
    // for throughput.
689
    //
690
    // It's 50:50 if this is a good thing.  It helps some tasks quite significantly
691
    // while slightly hindering other (perhaps more usual) jobs.
692
693
    if (++p->n_count == 256) {
694
        p->n_count >>= 1;
695
        p->n_running >>= 1;
696
    }
697
    p->n_running += running;
698
    // Built in lag to avoid see-sawing.  Is this safe in all cases?
699
    if (sig && p->n_count >= 128 && running*p->n_count > p->n_running+1) sig=0;
700
#endif
701
702
0
    if (0) {
703
0
        printf("%d waiting, %d running, %d output, %d, arun %d => %d\t", p->njobs,
704
0
               running, q->n_output, q->qsize - q->n_output,
705
0
               p->n_running/p->n_count, sig);
706
0
        int i;
707
0
        for (i = 0; i < p->tsize; i++)
708
0
            putchar("x "[p->t_stack[i]]);
709
0
        putchar('\n');
710
0
    }
711
712
0
    if (sig)
713
0
        pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
714
715
0
    if (!locked)
716
0
        pthread_mutex_unlock(&p->pool_m);
717
0
}
718
719
/*
720
 * Creates a worker pool with n worker threads.
721
 *
722
 * Returns pool pointer on success;
723
 *         NULL on failure
724
 */
725
0
hts_tpool *hts_tpool_init(int n) {
726
0
    int t_idx = 0;
727
0
    size_t stack_size = 0;
728
0
    pthread_attr_t pattr;
729
0
    int pattr_init_done = 0;
730
0
    hts_tpool *p = malloc(sizeof(*p));
731
0
    if (!p)
732
0
        return NULL;
733
0
    p->tsize = n;
734
0
    p->njobs = 0;
735
0
    p->nwaiting = 0;
736
0
    p->shutdown = 0;
737
0
    p->q_head = NULL;
738
0
    p->t_stack = NULL;
739
0
    p->n_count = 0;
740
0
    p->n_running = 0;
741
0
    p->t = malloc(n * sizeof(p->t[0]));
742
0
    if (!p->t) {
743
0
        free(p);
744
0
        return NULL;
745
0
    }
746
0
    p->t_stack = malloc(n * sizeof(*p->t_stack));
747
0
    if (!p->t_stack) {
748
0
        free(p->t);
749
0
        free(p);
750
0
        return NULL;
751
0
    }
752
0
    p->t_stack_top = -1;
753
754
0
    pthread_mutexattr_t attr;
755
0
    pthread_mutexattr_init(&attr);
756
0
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
757
0
    pthread_mutex_init(&p->pool_m, &attr);
758
0
    pthread_mutexattr_destroy(&attr);
759
760
0
    pthread_mutex_lock(&p->pool_m);
761
762
    // Ensure new threads have a reasonably large stack.  On some platforms,
763
    // for example MacOS which defaults to 512Kb, this is not big enough
764
    // for some of the rANS codecs.
765
766
0
    if (pthread_attr_init(&pattr) < 0)
767
0
        goto cleanup;
768
0
    pattr_init_done = 1;
769
0
    if (pthread_attr_getstacksize(&pattr, &stack_size) < 0)
770
0
        goto cleanup;
771
0
    if (stack_size < HTS_MIN_THREAD_STACK) {
772
0
        if (pthread_attr_setstacksize(&pattr, HTS_MIN_THREAD_STACK) < 0)
773
0
            goto cleanup;
774
0
    }
775
776
0
    for (t_idx = 0; t_idx < n; t_idx++) {
777
0
        hts_tpool_worker *w = &p->t[t_idx];
778
0
        p->t_stack[t_idx] = 0;
779
0
        w->p = p;
780
0
        w->idx = t_idx;
781
0
        pthread_cond_init(&w->pending_c, NULL);
782
0
        if (0 != pthread_create(&w->tid, &pattr, tpool_worker, w))
783
0
            goto cleanup;
784
0
    }
785
786
0
    pthread_mutex_unlock(&p->pool_m);
787
0
    pthread_attr_destroy(&pattr);
788
789
0
    return p;
790
791
0
 cleanup: {
792
        // Any threads started will be waiting for p->pool_m, so we can
793
        // stop them cleanly by setting p->shutdown, releasing the mutex and
794
        // waiting for them to finish.
795
0
        int j;
796
0
        int save_errno = errno;
797
0
        hts_log_error("Couldn't start thread pool worker : %s",
798
0
                      strerror(errno));
799
0
        p->shutdown = 1;
800
0
        pthread_mutex_unlock(&p->pool_m);
801
0
        for (j = 0; j < t_idx; j++) {
802
0
            pthread_join(p->t[j].tid, NULL);
803
0
            pthread_cond_destroy(&p->t[j].pending_c);
804
0
        }
805
0
        pthread_mutex_destroy(&p->pool_m);
806
0
        if (pattr_init_done)
807
0
            pthread_attr_destroy(&pattr);
808
0
        free(p->t_stack);
809
0
        free(p->t);
810
0
        free(p);
811
0
        errno = save_errno;
812
0
        return NULL;
813
0
    }
814
0
}
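
/*
 * Illustrative sketch (not part of the library): pool creation with error
 * checking.  hts_tpool_init() returns NULL on failure (errno is preserved
 * from the failing call when the worker threads cannot be started).
 *
 *     hts_tpool *pool = hts_tpool_init(nthreads);
 *     if (!pool) {
 *         perror("hts_tpool_init");
 *         return -1;
 *     }
 *     // ... create process-queues and dispatch jobs ...
 *     hts_tpool_destroy(pool);
 */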
815
816
/*
817
 * Returns the number of requested threads for a pool.
818
 */
819
0
int hts_tpool_size(hts_tpool *p) {
820
0
    return p->tsize;
821
0
}
822
823
/*
824
 * Adds an item to the work pool.
825
 *
826
 * Returns 0 on success
827
 *        -1 on failure
828
 */
829
int hts_tpool_dispatch(hts_tpool *p, hts_tpool_process *q,
830
0
                    void *(*func)(void *arg), void *arg) {
831
0
    return hts_tpool_dispatch3(p, q, func, arg, NULL, NULL, 0);
832
0
}
833
834
/*
835
 * As above but with an optional non-block flag.
836
 *
837
 * nonblock  0 => block if input queue is full
838
 * nonblock +1 => don't block if input queue is full, but do not add task
839
 * nonblock -1 => add task regardless of whether queue is full (over-size)
840
 */
841
int hts_tpool_dispatch2(hts_tpool *p, hts_tpool_process *q,
842
0
                        void *(*func)(void *arg), void *arg, int nonblock) {
843
0
    return hts_tpool_dispatch3(p, q, func, arg, NULL, NULL, nonblock);
844
0
}
845
846
int hts_tpool_dispatch3(hts_tpool *p, hts_tpool_process *q,
847
                        void *(*exec_func)(void *arg), void *arg,
848
                        void (*job_cleanup)(void *arg),
849
                        void (*result_cleanup)(void *data),
850
0
                        int nonblock) {
851
0
    hts_tpool_job *j;
852
853
0
    pthread_mutex_lock(&p->pool_m);
854
855
0
    DBG_OUT(stderr, "Dispatching job for queue %p, serial %"PRId64"\n",
856
0
            q, q->curr_serial);
857
858
0
    if ((q->no_more_input || q->n_input >= q->qsize) && nonblock == 1) {
859
0
        pthread_mutex_unlock(&p->pool_m);
860
0
        errno = EAGAIN;
861
0
        return -1;
862
0
    }
863
864
0
    if (!(j = malloc(sizeof(*j)))) {
865
0
        pthread_mutex_unlock(&p->pool_m);
866
0
        return -1;
867
0
    }
868
0
    j->func = exec_func;
869
0
    j->arg = arg;
870
0
    j->job_cleanup = job_cleanup;
871
0
    j->result_cleanup = result_cleanup;
872
0
    j->next = NULL;
873
0
    j->p = p;
874
0
    j->q = q;
875
0
    j->serial = q->curr_serial++;
876
877
0
    if (nonblock == 0) {
878
0
        while ((q->no_more_input || q->n_input >= q->qsize) &&
879
0
               !q->shutdown && !q->wake_dispatch) {
880
0
            pthread_cond_wait(&q->input_not_full_c, &q->p->pool_m);
881
0
        }
882
0
        if (q->no_more_input || q->shutdown) {
883
0
            free(j);
884
0
            pthread_mutex_unlock(&p->pool_m);
885
0
            return -1;
886
0
        }
887
0
        if (q->wake_dispatch) {
888
            //fprintf(stderr, "Wake => non-block for this operation\n");
889
0
            q->wake_dispatch = 0;
890
0
        }
891
0
    }
892
893
0
    p->njobs++;    // total across all queues
894
0
    q->n_input++;  // queue specific
895
896
0
    if (q->input_tail) {
897
0
        q->input_tail->next = j;
898
0
        q->input_tail = j;
899
0
    } else {
900
0
        q->input_head = q->input_tail = j;
901
0
    }
902
903
0
    DBG_OUT(stderr, "Dispatched (serial %"PRId64")\n", j->serial);
904
905
    // Let a worker know we have data.
906
    // Keep incoming queue at 1 per running thread, so there is always
907
    // something waiting when they end their current task.  If we go above
908
    // this, signal to start more threads (if available). This has the effect
909
    // of concentrating jobs to fewer cores when we are I/O bound, which in
910
    // turn benefits systems with auto CPU frequency scaling.
911
0
    if (!q->shutdown)
912
0
        wake_next_worker(q, 1);
913
914
0
    pthread_mutex_unlock(&p->pool_m);
915
916
0
    return 0;
917
0
}
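
/*
 * Illustrative sketch (not part of the library): dispatching with cleanup
 * callbacks so that jobs and results still queued at reset or shutdown time
 * are released automatically.  "my_func", "job_free" and "result_free" are
 * hypothetical caller-owned functions.
 *
 *     if (hts_tpool_dispatch3(pool, q, my_func, arg,
 *                             job_free,      // frees 'arg' if the job never runs
 *                             result_free,   // frees a discarded result's data
 *                             0) < 0) {      // nonblock==0: block while input is full
 *         // handle failure; with nonblock == +1, errno would be EAGAIN here
 *     }
 */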
918
919
/*
920
 * Wakes up a single thread stuck in dispatch and makes it return with
921
 * errno EAGAIN.
922
 */
923
0
void hts_tpool_wake_dispatch(hts_tpool_process *q) {
924
0
    pthread_mutex_lock(&q->p->pool_m);
925
0
    q->wake_dispatch = 1;
926
0
    pthread_cond_signal(&q->input_not_full_c);
927
0
    pthread_mutex_unlock(&q->p->pool_m);
928
0
}
929
930
/*
931
 * Flushes the process-queue, but doesn't exit. This simply drains the queue
932
 * and ensures all worker threads have finished their current tasks
933
 * associated with this process.
934
 *
935
 * NOTE: This does not mean the worker threads are not executing jobs in
936
 * another process-queue.
937
 *
938
 * Returns 0 on success;
939
 *        -1 on failure
940
 */
941
0
int hts_tpool_process_flush(hts_tpool_process *q) {
942
0
    int i;
943
0
    hts_tpool *p = q->p;
944
945
0
    DBG_OUT(stderr, "Flushing pool %p\n", p);
946
947
    // Drains the queue
948
0
    pthread_mutex_lock(&p->pool_m);
949
950
    // Wake up everything for the final sprint!
951
0
    for (i = 0; i < p->tsize; i++)
952
0
        if (p->t_stack[i])
953
0
            pthread_cond_signal(&p->t[i].pending_c);
954
955
    // Ensure there is room for the final sprint.
956
    // Ideally we shouldn't get here, but the "q->qsize - q->n_output >
957
    // n_processing" check in tpool_worker means we can trigger a
958
    // deadlock there.  This negates that possibility.
959
0
    if (q->qsize < q->n_output + q->n_input + q->n_processing)
960
0
        q->qsize = q->n_output + q->n_input + q->n_processing;
961
962
    // When shutdown, we won't be launching more, but we can still
963
    // wait for any processing jobs to complete.
964
0
    if (q->shutdown) {
965
0
        while (q->n_processing)
966
0
            pthread_cond_wait(&q->none_processing_c, &p->pool_m);
967
0
    }
968
969
    // Wait for n_input and n_processing to hit zero.
970
0
    while (!q->shutdown && (q->n_input || q->n_processing)) {
971
0
        struct timeval now;
972
0
        struct timespec timeout;
973
974
0
        while (q->n_input && !q->shutdown) {
975
0
            gettimeofday(&now, NULL);
976
0
            timeout.tv_sec = now.tv_sec + 1;
977
0
            timeout.tv_nsec = now.tv_usec * 1000;
978
0
            pthread_cond_timedwait(&q->input_empty_c, &p->pool_m, &timeout);
979
0
        }
980
981
        // Note: even if q->shutdown is set, we still have to wait until
982
        // q->n_processing is zero as we cannot terminate while things are
983
        // running, otherwise we would free the data still being worked on.
984
0
        while (q->n_processing) {
985
0
            gettimeofday(&now, NULL);
986
0
            timeout.tv_sec = now.tv_sec + 1;
987
0
            timeout.tv_nsec = now.tv_usec * 1000;
988
0
            pthread_cond_timedwait(&q->none_processing_c, &p->pool_m,
989
0
                                   &timeout);
990
0
        }
991
0
        if (q->shutdown) break;
992
0
    }
993
994
0
    pthread_mutex_unlock(&p->pool_m);
995
996
0
    DBG_OUT(stderr, "Flushed complete for pool %p, queue %p\n", p, q);
997
998
0
    return 0;
999
0
}
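
/*
 * Illustrative sketch (not part of the library): the usual end-of-stream
 * sequence, as also used by the tests below.  Flush to let queued and
 * in-flight jobs finish, drain any remaining results, then tear down.
 *
 *     hts_tpool_process_flush(q);
 *     hts_tpool_result *r;
 *     while ((r = hts_tpool_next_result(q)) != NULL) {
 *         // ... consume hts_tpool_result_data(r) ...
 *         hts_tpool_delete_result(r, 1);
 *     }
 *     hts_tpool_process_destroy(q);
 */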
1000
1001
/*
1002
 * Resets a process to the initial state.
1003
 *
1004
 * This removes any queued up input jobs, disables any notification of
1005
 * new results/output, flushes what is left and then discards any
1006
 * queued output.  Any consumer stuck in a wait for results to
1007
 * appear should stay stuck and will only wake up when new data is
1008
 * pushed through the queue.
1009
 *
1010
 * Returns 0 on success;
1011
 *        -1 on failure
1012
 */
1013
0
int hts_tpool_process_reset(hts_tpool_process *q, int free_results) {
1014
0
    hts_tpool_job *j, *jn, *j_head;
1015
0
    hts_tpool_result *r, *rn, *r_head;
1016
1017
0
    pthread_mutex_lock(&q->p->pool_m);
1018
    // prevent next_result from returning data during our flush
1019
0
    q->next_serial = INT_MAX;
1020
1021
    // Remove any queued input not yet being acted upon
1022
0
    j_head = q->input_head;
1023
0
    q->input_head = q->input_tail = NULL;
1024
0
    q->n_input = 0;
1025
1026
    // Remove any queued output, thus ensuring we have room to flush.
1027
0
    r_head = q->output_head;
1028
0
    q->output_head = q->output_tail = NULL;
1029
0
    q->n_output = 0;
1030
0
    pthread_mutex_unlock(&q->p->pool_m);
1031
1032
    // Release memory.  This can be done unlocked now that the lists have been
1033
    // removed from the queue
1034
0
    for (j = j_head; j; j = jn) {
1035
0
        jn = j->next;
1036
0
        if (j->job_cleanup) j->job_cleanup(j->arg);
1037
0
        free(j);
1038
0
    }
1039
1040
0
    for (r = r_head; r; r = rn) {
1041
0
        rn = r->next;
1042
0
        if (r->result_cleanup) {
1043
0
            r->result_cleanup(r->data);
1044
0
            r->data = NULL;
1045
0
        }
1046
0
        hts_tpool_delete_result(r, free_results);
1047
0
    }
1048
1049
    // Wait for any jobs being processed to complete.
1050
    // (TODO: consider how to cancel any currently processing jobs.
1051
    // Probably this is too hard.)
1052
0
    if (hts_tpool_process_flush(q) != 0)
1053
0
        return -1;
1054
1055
    // Remove any new output.
1056
0
    pthread_mutex_lock(&q->p->pool_m);
1057
0
    r_head = q->output_head;
1058
0
    q->output_head = q->output_tail = NULL;
1059
0
    q->n_output = 0;
1060
1061
    // Finally reset the serial back to the starting point.
1062
0
    q->next_serial = q->curr_serial = 0;
1063
0
    pthread_cond_signal(&q->input_not_full_c);
1064
0
    pthread_mutex_unlock(&q->p->pool_m);
1065
1066
    // Discard unwanted output
1067
0
    for (r = r_head; r; r = rn) {
1068
        //fprintf(stderr, "Discard output %d\n", r->serial);
1069
0
        rn = r->next;
1070
0
        if (r->result_cleanup) {
1071
0
            r->result_cleanup(r->data);
1072
0
            r->data = NULL;
1073
0
        }
1074
0
        hts_tpool_delete_result(r, free_results);
1075
0
    }
1076
1077
0
    return 0;
1078
0
}
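
/*
 * Illustrative sketch (not part of the library): discarding all pending work
 * after an error or a seek, then reusing the same queue.  Passing 1 for
 * free_results frees the data of any results that had to be thrown away.
 *
 *     if (hts_tpool_process_reset(q, 1) != 0)
 *         return -1;
 *     // The queue is now empty and its serial numbers are back at zero, so
 *     // new jobs can be dispatched as if it had just been created.
 */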
1079
1080
/* Returns the process queue size */
1081
0
int hts_tpool_process_qsize(hts_tpool_process *q) {
1082
0
    return q->qsize;
1083
0
}
1084
1085
/*
1086
 * Destroys a thread pool.  The threads are joined into the main
1087
 * thread so they will finish their current work load.
1088
 */
1089
0
void hts_tpool_destroy(hts_tpool *p) {
1090
0
    int i;
1091
1092
0
    DBG_OUT(stderr, "Destroying pool %p\n", p);
1093
1094
    /* Send shutdown message to worker threads */
1095
0
    pthread_mutex_lock(&p->pool_m);
1096
0
    p->shutdown = 1;
1097
1098
0
    DBG_OUT(stderr, "Sending shutdown request\n");
1099
1100
0
    for (i = 0; i < p->tsize; i++)
1101
0
        pthread_cond_signal(&p->t[i].pending_c);
1102
1103
0
    pthread_mutex_unlock(&p->pool_m);
1104
1105
0
    DBG_OUT(stderr, "Shutdown complete\n");
1106
1107
0
    for (i = 0; i < p->tsize; i++)
1108
0
        pthread_join(p->t[i].tid, NULL);
1109
1110
0
    pthread_mutex_destroy(&p->pool_m);
1111
0
    for (i = 0; i < p->tsize; i++)
1112
0
        pthread_cond_destroy(&p->t[i].pending_c);
1113
1114
0
    if (p->t_stack)
1115
0
        free(p->t_stack);
1116
1117
0
    free(p->t);
1118
0
    free(p);
1119
1120
0
    DBG_OUT(stderr, "Destroyed pool %p\n", p);
1121
0
}
1122
1123
1124
/*
1125
 * Destroys a thread pool without waiting on jobs to complete.
1126
 * Use hts_tpool_kill(p) to quickly exit after a fatal error.
1127
 */
1128
0
void hts_tpool_kill(hts_tpool *p) {
1129
0
    int i;
1130
1131
0
    DBG_OUT(stderr, "Destroying pool %p, kill=%d\n", p, kill);
1132
1133
0
    for (i = 0; i < p->tsize; i++)
1134
0
        pthread_kill(p->t[i].tid, SIGINT);
1135
1136
0
    pthread_mutex_destroy(&p->pool_m);
1137
0
    for (i = 0; i < p->tsize; i++)
1138
0
        pthread_cond_destroy(&p->t[i].pending_c);
1139
1140
0
    if (p->t_stack)
1141
0
        free(p->t_stack);
1142
1143
0
    free(p->t);
1144
0
    free(p);
1145
1146
0
    DBG_OUT(stderr, "Destroyed pool %p\n", p);
1147
0
}
1148
1149
1150
/*=============================================================================
1151
 * Test app.
1152
 *
1153
 * This can be considered both as a basic test and as a worked example for
1154
 * various usage patterns.
1155
 *=============================================================================
1156
 */
1157
1158
#ifdef TEST_MAIN
1159
1160
#include <stdio.h>
1161
#include "hts_internal.h"
1162
1163
#ifndef TASK_SIZE
1164
#define TASK_SIZE 1000
1165
#endif
1166
1167
/*-----------------------------------------------------------------------------
1168
 * Unordered x -> x*x test.
1169
 * Results arrive in order of completion.
1170
 */
1171
void *doit_square_u(void *arg) {
1172
    int job = *(int *)arg;
1173
1174
    hts_usleep(random() % 100000); // to coerce job completion out of order
1175
1176
    printf("RESULT: %d\n", job*job);
1177
1178
    free(arg);
1179
    return NULL;
1180
}
1181
1182
int test_square_u(int n) {
1183
    hts_tpool *p = hts_tpool_init(n);
1184
    hts_tpool_process *q = hts_tpool_process_init(p, n*2, 1);
1185
    int i;
1186
1187
    // Dispatch jobs
1188
    for (i = 0; i < TASK_SIZE; i++) {
1189
        int *ip = malloc(sizeof(*ip));
1190
        *ip = i;
1191
        hts_tpool_dispatch(p, q, doit_square_u, ip);
1192
    }
1193
1194
    hts_tpool_process_flush(q);
1195
    hts_tpool_process_destroy(q);
1196
    hts_tpool_destroy(p);
1197
1198
    return 0;
1199
}
1200
1201
1202
/*-----------------------------------------------------------------------------
1203
 * Ordered x -> x*x test.
1204
 * Results arrive in numerical order.
1205
 *
1206
 * This implementation uses a non-blocking dispatch to avoid dead-locks
1207
 * where one job takes too long to complete.
1208
 */
1209
void *doit_square(void *arg) {
1210
    int job = *(int *)arg;
1211
    int *res;
1212
1213
    // One excessively slow, to stress test output queue filling and
1214
    // excessive out of order scenarios.
1215
    hts_usleep(500000 * ((job&31)==31) + random() % 10000);
1216
1217
    res = malloc(sizeof(*res));
1218
    *res = (job<0) ? -job*job : job*job;
1219
1220
    free(arg);
1221
    return res;
1222
}
1223
1224
int test_square(int n) {
1225
    hts_tpool *p = hts_tpool_init(n);
1226
    hts_tpool_process *q = hts_tpool_process_init(p, n*2, 0);
1227
    int i;
1228
    hts_tpool_result *r;
1229
1230
    // Dispatch jobs
1231
    for (i = 0; i < TASK_SIZE; i++) {
1232
        int *ip = malloc(sizeof(*ip));
1233
        *ip = i;
1234
        int blk;
1235
1236
        do {
1237
            // In the situation where some jobs take much longer than
1238
            // others, we could end up blocking here as we haven't got
1239
            // any room in the output queue to place it. (We don't launch a
1240
            // job if the output queue is full.)
1241
1242
            // This happens when the next serial number to fetch is, eg, 50
1243
            // but jobs 51-100 have all executed really fast and appeared in
1244
            // the output queue before 50.  A dispatch & check-results
1245
            // alternating loop can fail to find job 50 many times over until
1246
            // eventually the dispatch blocks before it arrives.
1247
1248
            // Our solution is to dispatch in non-blocking mode so we are
1249
            // always able to either dispatch or consume a result.
1250
            blk = hts_tpool_dispatch2(p, q, doit_square, ip, 1);
1251
1252
            // Check for results.
1253
            if ((r = hts_tpool_next_result(q))) {
1254
                printf("RESULT: %d\n", *(int *)hts_tpool_result_data(r));
1255
                hts_tpool_delete_result(r, 1);
1256
            }
1257
            if (blk == -1) {
1258
                // The alternative is a separate thread for dispatching and/or
1259
                // consumption of results. See test_squareB.
1260
                putchar('.'); fflush(stdout);
1261
                hts_usleep(10000);
1262
            }
1263
        } while (blk == -1);
1264
    }
1265
1266
    // Wait for any queued-up input jobs or in-progress jobs to complete.
1267
    hts_tpool_process_flush(q);
1268
1269
    while ((r = hts_tpool_next_result(q))) {
1270
        printf("RESULT: %d\n", *(int *)hts_tpool_result_data(r));
1271
        hts_tpool_delete_result(r, 1);
1272
    }
1273
1274
    hts_tpool_process_destroy(q);
1275
    hts_tpool_destroy(p);
1276
1277
    return 0;
1278
}
1279
1280
/*-----------------------------------------------------------------------------
1281
 * Ordered x -> x*x test.
1282
 * Results arrive in numerical order.
1283
 *
1284
 * This implementation uses separate dispatching threads and job consumption
1285
 * threads (main thread).  This means it can use a blocking calls for
1286
 * simplicity elsewhere.
1287
 */
1288
struct squareB_opt {
1289
    hts_tpool *p;
1290
    hts_tpool_process *q;
1291
    int n;
1292
};
1293
static void *test_squareB_dispatcher(void *arg) {
1294
    struct squareB_opt *o = (struct squareB_opt *)arg;
1295
    int i, *ip;
1296
1297
    for (i = 0; i < o->n; i++) {
1298
        ip = malloc(sizeof(*ip));
1299
        *ip = i;
1300
1301
        hts_tpool_dispatch(o->p, o->q, doit_square, ip);
1302
    }
1303
1304
    // Dispatch a sentinel job to mark the end
1305
    *(ip = malloc(sizeof(*ip))) = -1;
1306
    hts_tpool_dispatch(o->p, o->q, doit_square, ip);
1307
    pthread_exit(NULL);
1308
}
1309
1310
int test_squareB(int n) {
1311
    hts_tpool *p = hts_tpool_init(n);
1312
    hts_tpool_process *q = hts_tpool_process_init(p, n*2, 0);
1313
    struct squareB_opt o = {p, q, TASK_SIZE};
1314
    pthread_t tid;
1315
1316
    // Launch our job creation thread.
1317
    pthread_create(&tid, NULL, test_squareB_dispatcher, &o);
1318
1319
    // Consume all results until we find the end-of-job marker.
1320
    for(;;) {
1321
        hts_tpool_result *r = hts_tpool_next_result_wait(q);
1322
        int x = *(int *)hts_tpool_result_data(r);
1323
        hts_tpool_delete_result(r, 1);
1324
        if (x == -1)
1325
            break;
1326
        printf("RESULT: %d\n", x);
1327
    }
1328
1329
    // Wait for any queued-up input jobs or in-progress jobs to complete.
1330
    // This should do nothing as we've been executing until the termination
1331
    // marker of -1.
1332
    hts_tpool_process_flush(q);
1333
    assert(hts_tpool_next_result(q) == NULL);
1334
1335
    hts_tpool_process_destroy(q);
1336
    hts_tpool_destroy(p);
1337
    pthread_join(tid, NULL);
1338
1339
    return 0;
1340
}
1341
1342
1343
/*-----------------------------------------------------------------------------
1344
 * A simple pipeline test.
1345
 * We use a dedicated input thread that does the initial job generation
1346
 * and dispatch, several execution steps running in a shared pool, and a
1347
 * dedicated output thread that prints out the final result.  It's key that our
1348
 * pipeline execution stages can run independently and don't themselves have
1349
 * any waits.  To achieve this we therefore also use some dedicated threads
1350
 * that take the output from one queue and resubmit the job as the input to
1351
 * the next queue.
1352
 *
1353
 * More generally this could perhaps be a single pipeline thread that
1354
 * marshalls multiple queues and their interactions, but this is simply a
1355
 * demonstration of a single pipeline.
1356
 *
1357
 * Our process fills out the bottom byte of a 32-bit int and then shifts it
1358
 * left one byte at a time.  Only the final stage needs to be ordered.  Each
1359
 * stage uses its own queue.
1360
 *
1361
 * Possible improvement: we only need the last stage to be ordered.  By
1362
 * allocating our own serial numbers for the first job and manually setting
1363
 * these serials in the last job, perhaps we can permit out of order execution
1364
 * of all the in-between stages.  (I doubt it'll affect speed much though.)
1365
 */
1366
1367
static void *pipe_input_thread(void *arg);
1368
static void *pipe_stage1(void *arg);
1369
static void *pipe_stage2(void *arg);
1370
static void *pipe_stage3(void *arg);
1371
static void *pipe_output_thread(void *arg);
1372
1373
typedef struct {
1374
    hts_tpool *p;
1375
    hts_tpool_process *q1;
1376
    hts_tpool_process *q2;
1377
    hts_tpool_process *q3;
1378
    int n;
1379
} pipe_opt;
1380
1381
typedef struct {
1382
    pipe_opt *o;
1383
    unsigned int x;
1384
    int eof; // set with last job.
1385
} pipe_job;
1386
1387
static void *pipe_input_thread(void *arg) {
1388
    pipe_opt *o = (pipe_opt *)arg;
1389
1390
    int i;
1391
    for (i = 1; i <= o->n; i++) {
1392
        pipe_job *j = malloc(sizeof(*j));
1393
        j->o = o;
1394
        j->x = i;
1395
        j->eof = (i == o->n);
1396
1397
        printf("I  %08x\n", j->x);
1398
1399
        if (hts_tpool_dispatch(o->p, o->q1, pipe_stage1, j) != 0) {
1400
            free(j);
1401
            pthread_exit((void *)1);
1402
        }
1403
    }
1404
1405
    pthread_exit(NULL);
1406
}
1407
1408
static void *pipe_stage1(void *arg) {
1409
    pipe_job *j = (pipe_job *)arg;
1410
1411
    j->x <<= 8;
1412
    hts_usleep(random() % 10000); // fast job
1413
    printf("1  %08x\n", j->x);
1414
1415
    return j;
1416
}
1417
1418
static void *pipe_stage1to2(void *arg) {
1419
    pipe_opt *o = (pipe_opt *)arg;
1420
    hts_tpool_result *r;
1421
1422
    while ((r = hts_tpool_next_result_wait(o->q1))) {
1423
        pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
1424
        hts_tpool_delete_result(r, 0);
1425
        if (hts_tpool_dispatch(j->o->p, j->o->q2, pipe_stage2, j) != 0)
1426
            pthread_exit((void *)1);
1427
        if (j->eof)
1428
            break;
1429
    }
1430
1431
    pthread_exit(NULL);
1432
}
1433
1434
static void *pipe_stage2(void *arg) {
1435
    pipe_job *j = (pipe_job *)arg;
1436
1437
    j->x <<= 8;
1438
    hts_usleep(random() % 100000); // slow job
1439
    printf("2  %08x\n", j->x);
1440
1441
    return j;
1442
}
1443
1444
static void *pipe_stage2to3(void *arg) {
1445
    pipe_opt *o = (pipe_opt *)arg;
1446
    hts_tpool_result *r;
1447
1448
    while ((r = hts_tpool_next_result_wait(o->q2))) {
1449
        pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
1450
        hts_tpool_delete_result(r, 0);
1451
        if (hts_tpool_dispatch(j->o->p, j->o->q3, pipe_stage3, j) != 0)
1452
            pthread_exit((void *)1);
1453
        if (j->eof)
1454
            break;
1455
    }
1456
1457
    pthread_exit(NULL);
1458
}
1459
1460
static void *pipe_stage3(void *arg) {
1461
    pipe_job *j = (pipe_job *)arg;
1462
1463
    hts_usleep(random() % 10000); // fast job
1464
    j->x <<= 8;
1465
    return j;
1466
}
1467
1468
static void *pipe_output_thread(void *arg) {
1469
    pipe_opt *o = (pipe_opt *)arg;
1470
    hts_tpool_result *r;
1471
1472
    while ((r = hts_tpool_next_result_wait(o->q3))) {
1473
        pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
1474
        int eof = j->eof;
1475
        printf("O  %08x\n", j->x);
1476
        hts_tpool_delete_result(r, 1);
1477
        if (eof)
1478
            break;
1479
    }
1480
1481
    pthread_exit(NULL);
1482
}
1483
1484
int test_pipe(int n) {
1485
    hts_tpool *p = hts_tpool_init(n);
1486
    hts_tpool_process *q1 = hts_tpool_process_init(p, n*2, 0);
1487
    hts_tpool_process *q2 = hts_tpool_process_init(p, n*2, 0);
1488
    hts_tpool_process *q3 = hts_tpool_process_init(p, n*2, 0);
1489
    pipe_opt o = {p, q1, q2, q3, TASK_SIZE};
1490
    pthread_t tidIto1, tid1to2, tid2to3, tid3toO;
1491
    void *retv;
1492
    int ret;
1493
1494
    // Launch our data source and sink threads.
1495
    pthread_create(&tidIto1, NULL, pipe_input_thread,  &o);
1496
    pthread_create(&tid1to2, NULL, pipe_stage1to2,     &o);
1497
    pthread_create(&tid2to3, NULL, pipe_stage2to3,     &o);
1498
    pthread_create(&tid3toO, NULL, pipe_output_thread, &o);
1499
1500
    // Wait for tasks to finish.
1501
    ret = 0;
1502
    pthread_join(tidIto1, &retv); ret |= (retv != NULL);
1503
    pthread_join(tid1to2, &retv); ret |= (retv != NULL);
1504
    pthread_join(tid2to3, &retv); ret |= (retv != NULL);
1505
    pthread_join(tid3toO, &retv); ret |= (retv != NULL);
1506
    printf("Return value %d\n", ret);
1507
1508
    hts_tpool_process_destroy(q1);
1509
    hts_tpool_process_destroy(q2);
1510
    hts_tpool_process_destroy(q3);
1511
    hts_tpool_destroy(p);
1512
1513
    return 0;
1514
}
1515
1516
/*-----------------------------------------------------------------------------*/
1517
int main(int argc, char **argv) {
1518
    int n;
1519
    srandom(0);
1520
1521
    if (argc < 3) {
1522
        fprintf(stderr, "Usage: %s command n_threads\n", argv[0]);
1523
        fprintf(stderr, "Where commands are:\n\n");
1524
        fprintf(stderr, "unordered       # Unordered output\n");
1525
        fprintf(stderr, "ordered1        # Main thread with non-block API\n");
1526
        fprintf(stderr, "ordered2        # Dispatch thread, blocking API\n");
1527
        fprintf(stderr, "pipe            # Multi-stage pipeline, several queues\n");
1528
        exit(1);
1529
    }
1530
1531
    n = atoi(argv[2]);
1532
    if (strcmp(argv[1], "unordered") == 0) return test_square_u(n);
1533
    if (strcmp(argv[1], "ordered1") == 0)  return test_square(n);
1534
    if (strcmp(argv[1], "ordered2") == 0)  return test_squareB(n);
1535
    if (strcmp(argv[1], "pipe") == 0)      return test_pipe(n);
1536
1537
    fprintf(stderr, "Unknown sub-command\n");
1538
    exit(1);
1539
}
1540
#endif