Coverage Report

Created: 2026-04-01 06:40

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/unit/src/nxt_thread_pool.c
Line
Count
Source
1
2
/*
3
 * Copyright (C) Igor Sysoev
4
 * Copyright (C) NGINX, Inc.
5
 */
6
7
#include <nxt_main.h>
8
9
10
static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
11
static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data);
12
static void nxt_thread_pool_start(void *ctx);
13
static void nxt_thread_pool_loop(void *ctx);
14
static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);
15
16
17
nxt_thread_pool_t *
18
nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
19
    nxt_thread_pool_init_t init, nxt_event_engine_t *engine,
20
    nxt_work_handler_t exit)
21
0
{
22
0
    nxt_thread_pool_t  *tp;
23
24
0
    tp = nxt_zalloc(sizeof(nxt_thread_pool_t));
25
0
    if (tp == NULL) {
26
0
        return NULL;
27
0
    }
28
29
0
    tp->max_threads = max_threads;
30
0
    tp->timeout = timeout;
31
0
    tp->engine = engine;
32
0
    tp->task.thread = engine->task.thread;
33
0
    tp->task.log = engine->task.log;
34
0
    tp->init = init;
35
0
    tp->exit = exit;
36
37
0
    return tp;
38
0
}
39
40
41
nxt_int_t
42
nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_t *work)
43
0
{
44
0
    nxt_thread_log_debug("thread pool post");
45
46
0
    if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) {
47
0
        return NXT_ERROR;
48
0
    }
49
50
0
    nxt_locked_work_queue_add(&tp->work_queue, work);
51
52
0
    (void) nxt_sem_post(&tp->sem);
53
54
0
    return NXT_OK;
55
0
}
56
57
58
static nxt_int_t
59
nxt_thread_pool_init(nxt_thread_pool_t *tp)
60
0
{
61
0
    nxt_int_t            ret;
62
0
    nxt_thread_link_t    *link;
63
0
    nxt_thread_handle_t  handle;
64
65
0
    if (nxt_fast_path(tp->ready)) {
66
0
        return NXT_OK;
67
0
    }
68
69
0
    if (tp->max_threads == 0) {
70
        /* The pool is being destroyed. */
71
0
        return NXT_ERROR;
72
0
    }
73
74
0
    nxt_thread_spin_lock(&tp->work_queue.lock);
75
76
0
    ret = NXT_OK;
77
78
0
    if (!tp->ready) {
79
80
0
        nxt_thread_log_debug("thread pool init");
81
82
0
        (void) nxt_atomic_fetch_add(&tp->threads, 1);
83
84
0
        if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {
85
86
0
            link = nxt_zalloc(sizeof(nxt_thread_link_t));
87
88
0
            if (nxt_fast_path(link != NULL)) {
89
0
                link->start = nxt_thread_pool_start;
90
0
                link->work.data = tp;
91
92
0
                if (nxt_thread_create(&handle, link) == NXT_OK) {
93
0
                    tp->ready = 1;
94
0
                    goto done;
95
0
                }
96
0
            }
97
98
0
            nxt_sem_destroy(&tp->sem);
99
0
        }
100
101
0
        (void) nxt_atomic_fetch_add(&tp->threads, -1);
102
103
0
        ret = NXT_ERROR;
104
0
    }
105
106
0
done:
107
108
0
    nxt_thread_spin_unlock(&tp->work_queue.lock);
109
110
0
    return ret;
111
0
}
112
113
114
static void
115
nxt_thread_pool_start(void *ctx)
116
0
{
117
0
    nxt_thread_t       *thr;
118
0
    nxt_thread_pool_t  *tp;
119
120
0
    tp = ctx;
121
0
    thr = nxt_thread();
122
123
0
    tp->main = thr->handle;
124
0
    tp->task.thread = thr;
125
126
0
    nxt_thread_pool_loop(ctx);
127
0
}
128
129
130
static void
131
nxt_thread_pool_loop(void *ctx)
132
0
{
133
0
    void                *obj, *data;
134
0
    nxt_task_t          *task;
135
0
    nxt_thread_t        *thr;
136
0
    nxt_thread_pool_t   *tp;
137
0
    nxt_work_handler_t  handler;
138
139
0
    tp = ctx;
140
0
    thr = nxt_thread();
141
142
0
    if (tp->init != NULL) {
143
0
        tp->init();
144
0
    }
145
146
0
    for ( ;; ) {
147
0
        nxt_thread_pool_wait(tp);
148
149
0
        handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj,
150
0
                                            &data);
151
152
0
        if (nxt_fast_path(handler != NULL)) {
153
0
            task->thread = thr;
154
155
0
            nxt_log_debug(thr->log, "locked work queue");
156
157
0
            handler(task, obj, data);
158
0
        }
159
160
0
        thr->log = &nxt_main_log;
161
0
    }
162
0
}
163
164
165
static void
166
nxt_thread_pool_wait(nxt_thread_pool_t *tp)
167
0
{
168
0
    nxt_err_t            err;
169
0
    nxt_thread_t         *thr;
170
0
    nxt_atomic_uint_t    waiting, threads;
171
0
    nxt_thread_link_t    *link;
172
0
    nxt_thread_handle_t  handle;
173
174
0
    thr = nxt_thread();
175
176
0
    nxt_log_debug(thr->log, "thread pool wait");
177
178
0
    (void) nxt_atomic_fetch_add(&tp->waiting, 1);
179
180
0
    for ( ;; ) {
181
0
        err = nxt_sem_wait(&tp->sem, tp->timeout);
182
183
0
        if (err == 0) {
184
0
            waiting = nxt_atomic_fetch_add(&tp->waiting, -1);
185
0
            break;
186
0
        }
187
188
0
        if (err == NXT_ETIMEDOUT) {
189
0
            if (nxt_thread_handle_equal(thr->handle, tp->main)) {
190
0
                continue;
191
0
            }
192
0
        }
193
194
0
        (void) nxt_atomic_fetch_add(&tp->waiting, -1);
195
0
        (void) nxt_atomic_fetch_add(&tp->threads, -1);
196
197
0
        nxt_thread_exit(thr);
198
0
        nxt_unreachable();
199
0
    }
200
201
0
    nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting);
202
203
0
    if (waiting > 1) {
204
0
        return;
205
0
    }
206
207
0
    do {
208
0
        threads = tp->threads;
209
210
0
        if (threads >= tp->max_threads) {
211
0
            return;
212
0
        }
213
214
0
    } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1));
215
216
0
    link = nxt_zalloc(sizeof(nxt_thread_link_t));
217
218
0
    if (nxt_fast_path(link != NULL)) {
219
0
        link->start = nxt_thread_pool_loop;
220
0
        link->work.data = tp;
221
222
0
        if (nxt_thread_create(&handle, link) != NXT_OK) {
223
0
            (void) nxt_atomic_fetch_add(&tp->threads, -1);
224
0
        }
225
0
    }
226
0
}
227
228
229
void
230
nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
231
0
{
232
0
    nxt_thread_t  *thr;
233
234
0
    thr = nxt_thread();
235
236
0
    nxt_log_debug(thr->log, "thread pool destroy: %A", tp->ready);
237
238
0
    if (!tp->ready) {
239
0
        nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit,
240
0
                           &tp->engine->task, tp, NULL);
241
0
        return;
242
0
    }
243
244
0
    if (tp->max_threads != 0) {
245
        /* Disable new threads creation and mark a pool as being destroyed. */
246
0
        tp->max_threads = 0;
247
248
0
        nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, NULL);
249
250
0
        nxt_thread_pool_post(tp, &tp->work);
251
0
    }
252
0
}
253
254
255
/*
256
 * A thread handle (pthread_t) is either pointer or integer, so it can be
257
 * passed as work handler pointer "data" argument.  To convert void pointer
258
 * to pthread_t and vice versa the source argument should be cast first to
259
 * uintptr_t type and then to the destination type.
260
 *
261
 * If the handle would be a struct it should be stored in thread pool and
262
 * the thread pool must be freed in the thread pool exit procedure after
263
 * the last thread of pool will exit.
264
 */
265
266
static void
267
nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
268
0
{
269
0
    nxt_thread_t         *thread;
270
0
    nxt_thread_pool_t    *tp;
271
0
    nxt_atomic_uint_t    threads;
272
0
    nxt_thread_handle_t  handle;
273
274
0
    tp = obj;
275
0
    thread = task->thread;
276
277
0
    nxt_debug(task, "thread pool exit");
278
279
0
    if (data != NULL) {
280
0
        handle = (nxt_thread_handle_t) (uintptr_t) data;
281
0
        nxt_thread_wait(handle);
282
0
    }
283
284
0
    threads = nxt_atomic_fetch_add(&tp->threads, -1);
285
286
0
    nxt_debug(task, "thread pool threads: %A", threads);
287
288
0
    if (threads > 1) {
289
0
        nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp,
290
0
                     (void *) (uintptr_t) thread->handle);
291
292
0
        nxt_thread_pool_post(tp, &tp->work);
293
294
0
    } else {
295
0
        nxt_debug(task, "thread pool destroy");
296
297
0
        nxt_sem_destroy(&tp->sem);
298
299
0
        nxt_work_set(&tp->work, tp->exit, &tp->engine->task, tp,
300
0
                     (void *) (uintptr_t) thread->handle);
301
302
0
        nxt_event_engine_post(tp->engine, &tp->work);
303
304
        /* The "tp" memory should be freed by tp->exit handler. */
305
0
    }
306
307
0
    nxt_thread_exit(thread);
308
309
0
    nxt_unreachable();
310
0
}