/src/unit/src/nxt_thread_pool.c
Line | Count | Source |
1 | | |
2 | | /* |
3 | | * Copyright (C) Igor Sysoev |
4 | | * Copyright (C) NGINX, Inc. |
5 | | */ |
6 | | |
7 | | #include <nxt_main.h> |
8 | | |
9 | | |
10 | | static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp); |
11 | | static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data); |
12 | | static void nxt_thread_pool_start(void *ctx); |
13 | | static void nxt_thread_pool_loop(void *ctx); |
14 | | static void nxt_thread_pool_wait(nxt_thread_pool_t *tp); |
15 | | |
16 | | |
17 | | nxt_thread_pool_t * |
18 | | nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout, |
19 | | nxt_thread_pool_init_t init, nxt_event_engine_t *engine, |
20 | | nxt_work_handler_t exit) |
21 | 0 | { |
22 | 0 | nxt_thread_pool_t *tp; |
23 | |
|
24 | 0 | tp = nxt_zalloc(sizeof(nxt_thread_pool_t)); |
25 | 0 | if (tp == NULL) { |
26 | 0 | return NULL; |
27 | 0 | } |
28 | | |
29 | 0 | tp->max_threads = max_threads; |
30 | 0 | tp->timeout = timeout; |
31 | 0 | tp->engine = engine; |
32 | 0 | tp->task.thread = engine->task.thread; |
33 | 0 | tp->task.log = engine->task.log; |
34 | 0 | tp->init = init; |
35 | 0 | tp->exit = exit; |
36 | |
|
37 | 0 | return tp; |
38 | 0 | } |
39 | | |
40 | | |
41 | | nxt_int_t |
42 | | nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_t *work) |
43 | 0 | { |
44 | 0 | nxt_thread_log_debug("thread pool post"); |
45 | |
|
46 | 0 | if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) { |
47 | 0 | return NXT_ERROR; |
48 | 0 | } |
49 | | |
50 | 0 | nxt_locked_work_queue_add(&tp->work_queue, work); |
51 | |
|
52 | 0 | (void) nxt_sem_post(&tp->sem); |
53 | |
|
54 | 0 | return NXT_OK; |
55 | 0 | } |
56 | | |
57 | | |
/*
 * Lazily starts the pool's first (main) thread.  Called on every post;
 * the unlocked tp->ready check keeps the common already-initialized
 * case cheap (double-checked under the work queue spin lock below).
 * Returns NXT_OK on success, NXT_ERROR if the pool is being destroyed
 * or semaphore/thread creation failed.
 */
static nxt_int_t
nxt_thread_pool_init(nxt_thread_pool_t *tp)
{
    nxt_int_t            ret;
    nxt_thread_link_t    *link;
    nxt_thread_handle_t  handle;

    /* Fast path: the pool's main thread is already running. */
    if (nxt_fast_path(tp->ready)) {
        return NXT_OK;
    }

    if (tp->max_threads == 0) {
        /* The pool is being destroyed. */
        return NXT_ERROR;
    }

    /* Serialize initialization against concurrent posters. */
    nxt_thread_spin_lock(&tp->work_queue.lock);

    ret = NXT_OK;

    /* Re-check under the lock: another poster may have won the race. */
    if (!tp->ready) {

        nxt_thread_log_debug("thread pool init");

        /* Optimistically count the thread; rolled back on any failure. */
        (void) nxt_atomic_fetch_add(&tp->threads, 1);

        if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {

            link = nxt_zalloc(sizeof(nxt_thread_link_t));

            if (nxt_fast_path(link != NULL)) {
                link->start = nxt_thread_pool_start;
                link->work.data = tp;

                if (nxt_thread_create(&handle, link) == NXT_OK) {
                    tp->ready = 1;
                    goto done;
                }
            }

            /* Thread (or link) creation failed: undo the semaphore. */
            nxt_sem_destroy(&tp->sem);
        }

        /* Roll back the optimistic thread count increment. */
        (void) nxt_atomic_fetch_add(&tp->threads, -1);

        ret = NXT_ERROR;
    }

done:

    nxt_thread_spin_unlock(&tp->work_queue.lock);

    return ret;
}
112 | | |
113 | | |
114 | | static void |
115 | | nxt_thread_pool_start(void *ctx) |
116 | 0 | { |
117 | 0 | nxt_thread_t *thr; |
118 | 0 | nxt_thread_pool_t *tp; |
119 | |
|
120 | 0 | tp = ctx; |
121 | 0 | thr = nxt_thread(); |
122 | |
|
123 | 0 | tp->main = thr->handle; |
124 | 0 | tp->task.thread = thr; |
125 | |
|
126 | 0 | nxt_thread_pool_loop(ctx); |
127 | 0 | } |
128 | | |
129 | | |
/*
 * Worker thread body: runs the optional per-thread init callback, then
 * loops forever popping work items off the locked queue and executing
 * them.  The loop only terminates indirectly — nxt_thread_pool_wait()
 * calls nxt_thread_exit() when a surplus thread times out.
 */
static void
nxt_thread_pool_loop(void *ctx)
{
    void                *obj, *data;
    nxt_task_t          *task;
    nxt_thread_t        *thr;
    nxt_thread_pool_t   *tp;
    nxt_work_handler_t  handler;

    tp = ctx;
    thr = nxt_thread();

    /* Per-thread initialization hook supplied at pool creation. */
    if (tp->init != NULL) {
        tp->init();
    }

    for ( ;; ) {
        /* Blocks until work is available; may not return (thread exit). */
        nxt_thread_pool_wait(tp);

        handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj,
                                            &data);

        /* A wakeup can race with another worker taking the item. */
        if (nxt_fast_path(handler != NULL)) {
            task->thread = thr;

            nxt_log_debug(thr->log, "locked work queue");

            handler(task, obj, data);
        }

        /* The handler may have switched the log; restore the default. */
        thr->log = &nxt_main_log;
    }
}
163 | | |
164 | | |
/*
 * Waits for work to be posted to the pool.  On a timed-out wait the
 * pool's main thread keeps waiting, while a surplus worker thread
 * exits.  After a successful wakeup, if this was the last waiting
 * thread and the pool is below max_threads, an additional worker is
 * spawned so subsequent posts do not stall behind the current item.
 */
static void
nxt_thread_pool_wait(nxt_thread_pool_t *tp)
{
    nxt_err_t            err;
    nxt_thread_t         *thr;
    nxt_atomic_uint_t    waiting, threads;
    nxt_thread_link_t    *link;
    nxt_thread_handle_t  handle;

    thr = nxt_thread();

    nxt_log_debug(thr->log, "thread pool wait");

    (void) nxt_atomic_fetch_add(&tp->waiting, 1);

    for ( ;; ) {
        err = nxt_sem_wait(&tp->sem, tp->timeout);

        if (err == 0) {
            /* Woken by a post; remember prior waiter count for below. */
            waiting = nxt_atomic_fetch_add(&tp->waiting, -1);
            break;
        }

        if (err == NXT_ETIMEDOUT) {
            /* The main thread never exits on timeout; keep waiting. */
            if (nxt_thread_handle_equal(thr->handle, tp->main)) {
                continue;
            }
        }

        /* Surplus thread timed out (or wait failed): leave the pool. */
        (void) nxt_atomic_fetch_add(&tp->waiting, -1);
        (void) nxt_atomic_fetch_add(&tp->threads, -1);

        nxt_thread_exit(thr);
        nxt_unreachable();
    }

    nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting);

    /* Other workers are still waiting; no need to grow the pool. */
    if (waiting > 1) {
        return;
    }

    /* CAS loop: reserve a slot for one more thread, up to max_threads. */
    do {
        threads = tp->threads;

        if (threads >= tp->max_threads) {
            return;
        }

    } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1));

    link = nxt_zalloc(sizeof(nxt_thread_link_t));

    if (nxt_fast_path(link != NULL)) {
        link->start = nxt_thread_pool_loop;
        link->work.data = tp;

        /* On failure, release the slot reserved above. */
        if (nxt_thread_create(&handle, link) != NXT_OK) {
            (void) nxt_atomic_fetch_add(&tp->threads, -1);
        }
    }
}
227 | | |
228 | | |
/*
 * Initiates asynchronous destruction of a thread pool.  If the pool
 * never started a thread, the exit handler is scheduled directly on
 * the caller's engine; otherwise destruction is delegated to the pool
 * itself by posting nxt_thread_pool_exit() as a work item.  The "tp"
 * memory is freed later by the tp->exit handler.
 */
void
nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
{
    nxt_thread_t  *thr;

    thr = nxt_thread();

    nxt_log_debug(thr->log, "thread pool destroy: %A", tp->ready);

    if (!tp->ready) {
        /* No pool threads exist; run the exit handler on this engine. */
        nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit,
                           &tp->engine->task, tp, NULL);
        return;
    }

    /* max_threads == 0 means destruction is already in progress. */
    if (tp->max_threads != 0) {
        /* Disable new threads creation and mark a pool as being destroyed. */
        tp->max_threads = 0;

        nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, NULL);

        nxt_thread_pool_post(tp, &tp->work);
    }
}
253 | | |
254 | | |
/*
 * A thread handle (pthread_t) is either a pointer or an integer, so it
 * can be passed as the work handler's void pointer "data" argument.  To
 * convert a void pointer to pthread_t and vice versa, the source value
 * should be cast first to uintptr_t and then to the destination type.
 *
 * If the handle were a struct, it would have to be stored in the thread
 * pool, and the thread pool would have to be freed in the thread pool
 * exit procedure after the last thread of the pool exits.
 */
265 | | |
/*
 * Work handler that shuts the pool down one thread at a time.  Each
 * invocation first joins the previously exited thread (its handle is
 * smuggled through "data" as a uintptr_t, see the comment above),
 * decrements the thread count, and either re-posts itself so the next
 * worker picks it up, or — on the last thread — destroys the semaphore
 * and posts tp->exit to the pool's engine.  It then exits the current
 * thread and never returns.
 */
static void
nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
{
    nxt_thread_t         *thread;
    nxt_thread_pool_t    *tp;
    nxt_atomic_uint_t    threads;
    nxt_thread_handle_t  handle;

    tp = obj;
    thread = task->thread;

    nxt_debug(task, "thread pool exit");

    /* Join the worker that exited on the previous round, if any. */
    if (data != NULL) {
        handle = (nxt_thread_handle_t) (uintptr_t) data;
        nxt_thread_wait(handle);
    }

    threads = nxt_atomic_fetch_add(&tp->threads, -1);

    nxt_debug(task, "thread pool threads: %A", threads);

    if (threads > 1) {
        /* More workers remain: hand the exit chain to the next one. */
        nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp,
                     (void *) (uintptr_t) thread->handle);

        nxt_thread_pool_post(tp, &tp->work);

    } else {
        nxt_debug(task, "thread pool destroy");

        nxt_sem_destroy(&tp->sem);

        /* Let the engine run tp->exit, which joins this last thread. */
        nxt_work_set(&tp->work, tp->exit, &tp->engine->task, tp,
                     (void *) (uintptr_t) thread->handle);

        nxt_event_engine_post(tp->engine, &tp->work);

        /* The "tp" memory should be freed by tp->exit handler. */
    }

    nxt_thread_exit(thread);

    nxt_unreachable();
}