/src/httpd/server/mpm_fdqueue.c
Line | Count | Source |
1 | | /* Licensed to the Apache Software Foundation (ASF) under one or more |
2 | | * contributor license agreements. See the NOTICE file distributed with |
3 | | * this work for additional information regarding copyright ownership. |
4 | | * The ASF licenses this file to You under the Apache License, Version 2.0 |
5 | | * (the "License"); you may not use this file except in compliance with |
6 | | * the License. You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | |
17 | | #include "mpm_fdqueue.h" |
18 | | |
19 | | #if APR_HAS_THREADS |
20 | | |
21 | | #include <apr_atomic.h> |
22 | | |
23 | | static const apr_uint32_t zero_pt = APR_UINT32_MAX/2; |
24 | | |
25 | | struct recycled_pool |
26 | | { |
27 | | apr_pool_t *pool; |
28 | | struct recycled_pool *next; |
29 | | }; |
30 | | |
31 | | struct fd_queue_info_t |
32 | | { |
33 | | apr_uint32_t volatile idlers; /** |
34 | | * >= zero_pt: number of idle worker threads |
35 | | * < zero_pt: number of threads blocked, |
36 | | * waiting for an idle worker |
37 | | */ |
38 | | apr_thread_mutex_t *idlers_mutex; |
39 | | apr_thread_cond_t *wait_for_idler; |
40 | | int terminated; |
41 | | int max_idlers; |
42 | | int max_recycled_pools; |
43 | | apr_uint32_t recycled_pools_count; |
44 | | struct recycled_pool *volatile recycled_pools; |
45 | | }; |
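| | /* The idlers field keeps a signed quantity in an unsigned atomic by
| |  * biasing it with zero_pt: the logical value is (idlers - zero_pt).
| |  * For example, idlers == zero_pt + 3 means three worker threads are
| |  * idle, while idlers == zero_pt - 2 means two threads are blocked in
| |  * ap_queue_info_wait_for_idler() waiting for a worker to become idle.
| |  */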
46 | | |
47 | | struct fd_queue_elem_t |
48 | | { |
49 | | apr_socket_t *sd; |
50 | | void *sd_baton; |
51 | | apr_pool_t *p; |
52 | | }; |
53 | | |
54 | | static apr_status_t queue_info_cleanup(void *data_) |
55 | 0 | { |
56 | 0 | fd_queue_info_t *qi = data_; |
57 | 0 | apr_thread_cond_destroy(qi->wait_for_idler); |
58 | 0 | apr_thread_mutex_destroy(qi->idlers_mutex); |
59 | | |
60 | | /* Clean up any pools in the recycled list */ |
61 | 0 | for (;;) { |
62 | 0 | struct recycled_pool *first_pool = qi->recycled_pools; |
63 | 0 | if (first_pool == NULL) { |
64 | 0 | break; |
65 | 0 | } |
66 | 0 | if (apr_atomic_casptr((void *)&qi->recycled_pools, first_pool->next, |
67 | 0 | first_pool) == first_pool) { |
68 | 0 | apr_pool_destroy(first_pool->pool); |
69 | 0 | } |
70 | 0 | } |
71 | |
72 | 0 | return APR_SUCCESS; |
73 | 0 | } |
74 | | |
75 | | apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info, |
76 | | apr_pool_t *pool, int max_idlers, |
77 | | int max_recycled_pools) |
78 | 0 | { |
79 | 0 | apr_status_t rv; |
80 | 0 | fd_queue_info_t *qi; |
81 | |
82 | 0 | qi = apr_pcalloc(pool, sizeof(*qi)); |
83 | |
84 | 0 | rv = apr_thread_mutex_create(&qi->idlers_mutex, APR_THREAD_MUTEX_DEFAULT, |
85 | 0 | pool); |
86 | 0 | if (rv != APR_SUCCESS) { |
87 | 0 | return rv; |
88 | 0 | } |
89 | 0 | rv = apr_thread_cond_create(&qi->wait_for_idler, pool); |
90 | 0 | if (rv != APR_SUCCESS) { |
91 | 0 | return rv; |
92 | 0 | } |
93 | 0 | qi->recycled_pools = NULL; |
94 | 0 | qi->max_recycled_pools = max_recycled_pools; |
95 | 0 | qi->max_idlers = max_idlers; |
96 | 0 | qi->idlers = zero_pt; |
97 | 0 | apr_pool_cleanup_register(pool, qi, queue_info_cleanup, |
98 | 0 | apr_pool_cleanup_null); |
99 | |
100 | 0 | *queue_info = qi; |
101 | |
102 | 0 | return APR_SUCCESS; |
103 | 0 | } |
104 | | |
105 | | apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info, |
106 | | apr_pool_t *pool_to_recycle) |
107 | 0 | { |
108 | 0 | apr_status_t rv; |
109 | |
110 | 0 | ap_queue_info_push_pool(queue_info, pool_to_recycle); |
111 | | |
112 | | /* If other threads are waiting on a worker, wake one up */ |
113 | 0 | if (apr_atomic_inc32(&queue_info->idlers) < zero_pt) { |
114 | 0 | rv = apr_thread_mutex_lock(queue_info->idlers_mutex); |
115 | 0 | if (rv != APR_SUCCESS) { |
116 | 0 | AP_DEBUG_ASSERT(0); |
117 | 0 | return rv; |
118 | 0 | } |
119 | 0 | rv = apr_thread_cond_signal(queue_info->wait_for_idler); |
120 | 0 | if (rv != APR_SUCCESS) { |
121 | 0 | apr_thread_mutex_unlock(queue_info->idlers_mutex); |
122 | 0 | return rv; |
123 | 0 | } |
124 | 0 | rv = apr_thread_mutex_unlock(queue_info->idlers_mutex); |
125 | 0 | if (rv != APR_SUCCESS) { |
126 | 0 | return rv; |
127 | 0 | } |
128 | 0 | } |
129 | | |
130 | 0 | return APR_SUCCESS; |
131 | 0 | } |
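| | /* apr_atomic_inc32() returns the value as it was before the increment,
| |  * so the condition variable is signalled only when the old value was
| |  * below zero_pt, i.e. when at least one thread was blocked in
| |  * ap_queue_info_wait_for_idler().  For example, if idlers was
| |  * zero_pt - 2 (two waiters), the increment leaves zero_pt - 1 and one
| |  * waiter is woken; at zero_pt or above, the mutex is never touched.
| |  */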
132 | | |
133 | | apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t *queue_info) |
134 | 0 | { |
135 | | /* Don't block if there isn't any idle worker. */ |
136 | 0 | for (;;) { |
137 | 0 | apr_uint32_t idlers = queue_info->idlers; |
138 | 0 | if (idlers <= zero_pt) { |
139 | 0 | return APR_EAGAIN; |
140 | 0 | } |
141 | 0 | if (apr_atomic_cas32(&queue_info->idlers, idlers - 1, |
142 | 0 | idlers) == idlers) { |
143 | 0 | return APR_SUCCESS; |
144 | 0 | } |
145 | 0 | } |
146 | 0 | } |
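| | /* Unlike ap_queue_info_wait_for_idler(), this never drives idlers below
| |  * zero_pt: the CAS only decrements a value already observed to be above
| |  * zero_pt, so the call either claims an idle worker or fails with
| |  * APR_EAGAIN without blocking.
| |  */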
147 | | |
148 | | apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info, |
149 | | int *had_to_block) |
150 | 0 | { |
151 | 0 | apr_status_t rv; |
152 | | |
153 | | /* Block if there isn't any idle worker. |
154 | | * apr_atomic_add32(x, -1) does the same as dec32(x), except |
155 | | * that it returns the previous value (unlike dec32's bool). |
156 | | */ |
157 | 0 | if (apr_atomic_add32(&queue_info->idlers, -1) <= zero_pt) { |
158 | 0 | rv = apr_thread_mutex_lock(queue_info->idlers_mutex); |
159 | 0 | if (rv != APR_SUCCESS) { |
160 | 0 | AP_DEBUG_ASSERT(0); |
161 | 0 | apr_atomic_inc32(&(queue_info->idlers)); /* back out dec */ |
162 | 0 | return rv; |
163 | 0 | } |
164 | | /* Re-check the idle worker count to guard against a |
165 | | * race condition. Now that we're in the mutex-protected |
166 | | * region, one of two things may have happened: |
167 | | * - If the idle worker count is still negative, the |
168 | | * workers are all still busy, so it's safe to |
169 | | * block on a condition variable. |
170 | | * - If the idle worker count is non-negative, then a |
171 | | * worker has become idle since the first check |
172 | | * of queue_info->idlers above. It's possible |
173 | | * that the worker has also signaled the condition |
174 | | * variable--and if so, the listener missed it |
175 | | * because it wasn't yet blocked on the condition |
176 | | * variable. But if the idle worker count is |
177 | | * now non-negative, it's safe for this function to |
178 | | * return immediately. |
179 | | * |
180 | | * A "negative value" (relative to zero_pt) in |
181 | | * queue_info->idlers tells how many |
182 | | * threads are waiting on an idle worker. |
183 | | */ |
184 | 0 | if (queue_info->idlers < zero_pt) { |
185 | 0 | if (had_to_block) { |
186 | 0 | *had_to_block = 1; |
187 | 0 | } |
188 | 0 | rv = apr_thread_cond_wait(queue_info->wait_for_idler, |
189 | 0 | queue_info->idlers_mutex); |
190 | 0 | if (rv != APR_SUCCESS) { |
191 | 0 | AP_DEBUG_ASSERT(0); |
192 | 0 | apr_thread_mutex_unlock(queue_info->idlers_mutex); |
193 | 0 | return rv; |
194 | 0 | } |
195 | 0 | } |
196 | 0 | rv = apr_thread_mutex_unlock(queue_info->idlers_mutex); |
197 | 0 | if (rv != APR_SUCCESS) { |
198 | 0 | return rv; |
199 | 0 | } |
200 | 0 | } |
201 | | |
202 | 0 | if (queue_info->terminated) { |
203 | 0 | return APR_EOF; |
204 | 0 | } |
205 | 0 | else { |
206 | 0 | return APR_SUCCESS; |
207 | 0 | } |
208 | 0 | } |
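| | /* Rough sketch of how these primitives pair up (names such as qi, q, csd
| |  * and ptrans are placeholders; the real callers are the worker/event
| |  * MPMs, which also handle errors, timers and pool creation omitted here):
| |  *
| |  *   listener:  ap_queue_info_wait_for_idler(qi, NULL);   reserve a worker
| |  *              ap_queue_info_pop_pool(qi, &ptrans);      may yield NULL
| |  *              ap_queue_push_socket(q, csd, NULL, ptrans);
| |  *
| |  *   worker:    ap_queue_pop_something(q, &csd, &baton, &ptrans, &te);
| |  *              ... process the connection ...
| |  *              ap_queue_info_set_idle(qi, ptrans);       recycle + go idle
| |  *
| |  * Reserving the worker first is the precondition documented on
| |  * ap_queue_push_socket() below.
| |  */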
209 | | |
210 | | apr_uint32_t ap_queue_info_num_idlers(fd_queue_info_t *queue_info) |
211 | 0 | { |
212 | 0 | apr_uint32_t val; |
213 | 0 | val = apr_atomic_read32(&queue_info->idlers); |
214 | 0 | return (val > zero_pt) ? val - zero_pt : 0; |
215 | 0 | } |
216 | | |
217 | | void ap_queue_info_push_pool(fd_queue_info_t *queue_info, |
218 | | apr_pool_t *pool_to_recycle) |
219 | 0 | { |
220 | 0 | struct recycled_pool *new_recycle; |
221 | | /* If we have been given a pool to recycle, atomically link |
222 | | * it into the queue_info's list of recycled pools |
223 | | */ |
224 | 0 | if (!pool_to_recycle) |
225 | 0 | return; |
226 | | |
227 | 0 | if (queue_info->max_recycled_pools >= 0) { |
228 | 0 | apr_uint32_t n = apr_atomic_read32(&queue_info->recycled_pools_count); |
229 | 0 | if (n >= queue_info->max_recycled_pools) { |
230 | 0 | apr_pool_destroy(pool_to_recycle); |
231 | 0 | return; |
232 | 0 | } |
233 | 0 | apr_atomic_inc32(&queue_info->recycled_pools_count); |
234 | 0 | } |
235 | | |
236 | 0 | apr_pool_clear(pool_to_recycle); |
237 | 0 | new_recycle = apr_palloc(pool_to_recycle, sizeof *new_recycle); |
238 | 0 | new_recycle->pool = pool_to_recycle; |
239 | 0 | for (;;) { |
240 | | /* |
241 | | * Save queue_info->recycled_pools in the local variable next because
242 | | * new_recycle->next can be changed after the apr_atomic_casptr()
243 | | * call. For gory details see PR 44402.
244 | | */ |
245 | 0 | struct recycled_pool *next = queue_info->recycled_pools; |
246 | 0 | new_recycle->next = next; |
247 | 0 | if (apr_atomic_casptr((void *)&queue_info->recycled_pools, |
248 | 0 | new_recycle, next) == next) |
249 | 0 | break; |
250 | 0 | } |
251 | 0 | } |
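| | /* The recycled-pool list is a lock-free LIFO (a Treiber stack): any
| |  * number of threads may push concurrently through the CAS loop above.
| |  * The list node itself is allocated from the pool being recycled (after
| |  * the apr_pool_clear()), so destroying that pool later also releases the
| |  * node.  Popping, by contrast, is only safe from a single thread; see
| |  * ap_queue_info_pop_pool() below.
| |  */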
252 | | |
253 | | void ap_queue_info_pop_pool(fd_queue_info_t *queue_info, |
254 | | apr_pool_t **recycled_pool) |
255 | 0 | { |
256 | | /* Atomically pop a pool from the recycled list */ |
257 | | |
258 | | /* This function is safe only as long as it is single threaded because |
259 | | * it reaches into the queue and accesses "next" which can change. |
260 | | * We are OK today because it is only called from the listener thread. |
261 | | * cas-based pushes do not have the same limitation - any number can |
262 | | * happen concurrently with a single cas-based pop. |
263 | | */ |
264 | |
265 | 0 | *recycled_pool = NULL; |
266 | | |
267 | | |
268 | | /* Atomically pop a pool from the recycled list */ |
269 | 0 | for (;;) { |
270 | 0 | struct recycled_pool *first_pool = queue_info->recycled_pools; |
271 | 0 | if (first_pool == NULL) { |
272 | 0 | break; |
273 | 0 | } |
274 | 0 | if (apr_atomic_casptr((void *)&queue_info->recycled_pools, |
275 | 0 | first_pool->next, first_pool) == first_pool) { |
276 | 0 | *recycled_pool = first_pool->pool; |
277 | 0 | if (queue_info->max_recycled_pools >= 0) |
278 | 0 | apr_atomic_dec32(&queue_info->recycled_pools_count); |
279 | 0 | break; |
280 | 0 | } |
281 | 0 | } |
282 | 0 | } |
283 | | |
284 | | void ap_queue_info_free_idle_pools(fd_queue_info_t *queue_info) |
285 | 0 | { |
286 | 0 | apr_pool_t *p; |
287 | |
288 | 0 | queue_info->max_recycled_pools = 0; |
289 | 0 | for (;;) { |
290 | 0 | ap_queue_info_pop_pool(queue_info, &p); |
291 | 0 | if (p == NULL) |
292 | 0 | break; |
293 | 0 | apr_pool_destroy(p); |
294 | 0 | } |
295 | 0 | apr_atomic_set32(&queue_info->recycled_pools_count, 0); |
296 | 0 | } |
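| | /* Setting max_recycled_pools to 0 before draining means that any pool a
| |  * worker pushes from this point on is destroyed by
| |  * ap_queue_info_push_pool() instead of being re-queued, so the list
| |  * stays empty once drained.
| |  */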
297 | | |
298 | | |
299 | | apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info) |
300 | 0 | { |
301 | 0 | apr_status_t rv; |
302 | |
303 | 0 | rv = apr_thread_mutex_lock(queue_info->idlers_mutex); |
304 | 0 | if (rv != APR_SUCCESS) { |
305 | 0 | return rv; |
306 | 0 | } |
307 | | |
308 | 0 | queue_info->terminated = 1; |
309 | 0 | apr_thread_cond_broadcast(queue_info->wait_for_idler); |
310 | |
311 | 0 | return apr_thread_mutex_unlock(queue_info->idlers_mutex); |
312 | 0 | } |
313 | | |
314 | | /** |
315 | | * Detects when the fd_queue_t is full. This utility function is expected |
316 | | * to be called from within critical sections, and is not threadsafe. |
317 | | */ |
318 | | #define ap_queue_full(queue) ((queue)->nelts == (queue)->bounds) |
319 | | |
320 | | /** |
321 | | * Detects when the fd_queue_t is empty. This utility function is expected |
322 | | * to be called from within critical sections, and is not threadsafe. |
323 | | */ |
324 | 0 | #define ap_queue_empty(queue) ((queue)->nelts == 0 && \ |
325 | 0 | APR_RING_EMPTY(&queue->timers, \ |
326 | 0 | timer_event_t, link)) |
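| | /* A queue holding only timer events is therefore not "empty": nelts may
| |  * be 0 while the timers ring still has entries for
| |  * ap_queue_pop_something() to hand out.
| |  */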
327 | | |
328 | | /** |
329 | | * Callback routine that is called to destroy this |
330 | | * fd_queue_t when its pool is destroyed. |
331 | | */ |
332 | | static apr_status_t ap_queue_destroy(void *data) |
333 | 0 | { |
334 | 0 | fd_queue_t *queue = data; |
335 | | |
336 | | /* Ignore errors here, we can't do anything about them anyway. |
337 | | * XXX: We should at least try to signal an error here, it is |
338 | | * indicative of a programmer error. -aaron */ |
339 | 0 | apr_thread_cond_destroy(queue->not_empty); |
340 | 0 | apr_thread_mutex_destroy(queue->one_big_mutex); |
341 | |
342 | 0 | return APR_SUCCESS; |
343 | 0 | } |
344 | | |
345 | | /** |
346 | | * Initialize the fd_queue_t. |
347 | | */ |
348 | | apr_status_t ap_queue_create(fd_queue_t **pqueue, int capacity, apr_pool_t *p) |
349 | 0 | { |
350 | 0 | apr_status_t rv; |
351 | 0 | fd_queue_t *queue; |
352 | |
353 | 0 | queue = apr_pcalloc(p, sizeof *queue); |
354 | |
355 | 0 | if ((rv = apr_thread_mutex_create(&queue->one_big_mutex, |
356 | 0 | APR_THREAD_MUTEX_DEFAULT, |
357 | 0 | p)) != APR_SUCCESS) { |
358 | 0 | return rv; |
359 | 0 | } |
360 | 0 | if ((rv = apr_thread_cond_create(&queue->not_empty, p)) != APR_SUCCESS) { |
361 | 0 | return rv; |
362 | 0 | } |
363 | | |
364 | 0 | APR_RING_INIT(&queue->timers, timer_event_t, link); |
365 | |
366 | 0 | queue->data = apr_pcalloc(p, capacity * sizeof(fd_queue_elem_t)); |
367 | 0 | queue->bounds = capacity; |
368 | |
369 | 0 | apr_pool_cleanup_register(p, queue, ap_queue_destroy, |
370 | 0 | apr_pool_cleanup_null); |
371 | 0 | *pqueue = queue; |
372 | |
373 | 0 | return APR_SUCCESS; |
374 | 0 | } |
375 | | |
376 | | /** |
377 | | * Push a new socket onto the queue. |
378 | | * |
379 | | * precondition: ap_queue_info_wait_for_idler has already been called |
380 | | * to reserve an idle worker thread |
381 | | */ |
382 | | apr_status_t ap_queue_push_socket(fd_queue_t *queue, |
383 | | apr_socket_t *sd, void *sd_baton, |
384 | | apr_pool_t *p) |
385 | 0 | { |
386 | 0 | fd_queue_elem_t *elem; |
387 | 0 | apr_status_t rv; |
388 | |
389 | 0 | if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { |
390 | 0 | return rv; |
391 | 0 | } |
392 | | |
393 | 0 | AP_DEBUG_ASSERT(!queue->terminated); |
394 | 0 | AP_DEBUG_ASSERT(!ap_queue_full(queue)); |
395 | |
396 | 0 | elem = &queue->data[queue->in++]; |
397 | 0 | if (queue->in >= queue->bounds) |
398 | 0 | queue->in -= queue->bounds; |
399 | 0 | elem->sd = sd; |
400 | 0 | elem->sd_baton = sd_baton; |
401 | 0 | elem->p = p; |
402 | 0 | queue->nelts++; |
403 | |
404 | 0 | apr_thread_cond_signal(queue->not_empty); |
405 | |
406 | 0 | return apr_thread_mutex_unlock(queue->one_big_mutex); |
407 | 0 | } |
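| | /* queue->data is used as a ring buffer: the index is bumped and wrapped
| |  * by subtracting bounds rather than taking a modulo.  E.g. with
| |  * bounds == 4 and in == 3, the element lands in slot 3 and in wraps back
| |  * to 0.  Under the documented precondition (an idle worker was reserved
| |  * first) the queue should never actually be full, which is why fullness
| |  * is only a debug assertion.
| |  */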
408 | | |
409 | | apr_status_t ap_queue_push_timer(fd_queue_t *queue, timer_event_t *te) |
410 | 0 | { |
411 | 0 | apr_status_t rv; |
412 | |
413 | 0 | if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { |
414 | 0 | return rv; |
415 | 0 | } |
416 | | |
417 | 0 | AP_DEBUG_ASSERT(!queue->terminated); |
418 | |
419 | 0 | APR_RING_INSERT_TAIL(&queue->timers, te, timer_event_t, link); |
420 | |
421 | 0 | apr_thread_cond_signal(queue->not_empty); |
422 | |
423 | 0 | return apr_thread_mutex_unlock(queue->one_big_mutex); |
424 | 0 | } |
425 | | |
426 | | /** |
427 | | * Retrieves the next available socket or timer event from the queue,
428 | | * blocking if nothing is available. A retrieved socket is placed into
429 | | * the address specified by 'sd'; a retrieved timer event is returned
430 | | * through 'te_out'.
431 | | */ |
432 | | apr_status_t ap_queue_pop_something(fd_queue_t *queue, |
433 | | apr_socket_t **sd, void **sd_baton, |
434 | | apr_pool_t **p, timer_event_t **te_out) |
435 | 0 | { |
436 | 0 | fd_queue_elem_t *elem; |
437 | 0 | timer_event_t *te; |
438 | 0 | apr_status_t rv; |
439 | |
440 | 0 | if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { |
441 | 0 | return rv; |
442 | 0 | } |
443 | | |
444 | | /* Keep waiting until we wake up and find that the queue is not empty. */ |
445 | 0 | if (ap_queue_empty(queue)) { |
446 | 0 | if (!queue->terminated) { |
447 | 0 | apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex); |
448 | 0 | } |
449 | | /* If we wake up and it's still empty, then we were interrupted */ |
450 | 0 | if (ap_queue_empty(queue)) { |
451 | 0 | rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
452 | 0 | if (rv != APR_SUCCESS) { |
453 | 0 | return rv; |
454 | 0 | } |
455 | 0 | if (queue->terminated) { |
456 | 0 | return APR_EOF; /* no more elements ever again */ |
457 | 0 | } |
458 | 0 | else { |
459 | 0 | return APR_EINTR; |
460 | 0 | } |
461 | 0 | } |
462 | 0 | } |
463 | | |
464 | 0 | te = NULL; |
465 | 0 | if (te_out) { |
466 | 0 | if (!APR_RING_EMPTY(&queue->timers, timer_event_t, link)) { |
467 | 0 | te = APR_RING_FIRST(&queue->timers); |
468 | 0 | APR_RING_REMOVE(te, link); |
469 | 0 | } |
470 | 0 | *te_out = te; |
471 | 0 | } |
472 | 0 | if (!te) { |
473 | 0 | elem = &queue->data[queue->out++]; |
474 | 0 | if (queue->out >= queue->bounds) |
475 | 0 | queue->out -= queue->bounds; |
476 | 0 | queue->nelts--; |
477 | |
478 | 0 | *sd = elem->sd; |
479 | 0 | if (sd_baton) { |
480 | 0 | *sd_baton = elem->sd_baton; |
481 | 0 | } |
482 | 0 | *p = elem->p; |
483 | | #ifdef AP_DEBUG |
484 | | elem->sd = NULL; |
485 | | elem->p = NULL; |
486 | | #endif /* AP_DEBUG */ |
487 | 0 | } |
488 | |
489 | 0 | return apr_thread_mutex_unlock(queue->one_big_mutex); |
490 | 0 | } |
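| | /* Waking up to a still-empty queue is reported as APR_EINTR (or APR_EOF
| |  * once the queue is terminated).  That is how ap_queue_interrupt_one()
| |  * and ap_queue_interrupt_all() below nudge blocked workers without
| |  * handing them work; callers are expected to treat APR_EINTR as
| |  * "re-check state and wait again".
| |  */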
491 | | |
492 | | static apr_status_t queue_interrupt(fd_queue_t *queue, int all, int term) |
493 | 0 | { |
494 | 0 | apr_status_t rv; |
495 | |
|
496 | 0 | if (queue->terminated) { |
497 | 0 | return APR_EOF; |
498 | 0 | } |
499 | | |
500 | 0 | if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { |
501 | 0 | return rv; |
502 | 0 | } |
503 | | |
504 | | /* we must hold one_big_mutex when setting this... otherwise, |
505 | | * we could end up setting it and waking everybody up just after a |
506 | | * would-be popper checks it but right before they block |
507 | | */ |
508 | 0 | if (term) { |
509 | 0 | queue->terminated = 1; |
510 | 0 | } |
511 | 0 | if (all) |
512 | 0 | apr_thread_cond_broadcast(queue->not_empty); |
513 | 0 | else |
514 | 0 | apr_thread_cond_signal(queue->not_empty); |
515 | |
516 | 0 | return apr_thread_mutex_unlock(queue->one_big_mutex); |
517 | 0 | } |
518 | | |
519 | | apr_status_t ap_queue_interrupt_all(fd_queue_t *queue) |
520 | 0 | { |
521 | 0 | return queue_interrupt(queue, 1, 0); |
522 | 0 | } |
523 | | |
524 | | apr_status_t ap_queue_interrupt_one(fd_queue_t *queue) |
525 | 0 | { |
526 | 0 | return queue_interrupt(queue, 0, 0); |
527 | 0 | } |
528 | | |
529 | | apr_status_t ap_queue_term(fd_queue_t *queue) |
530 | 0 | { |
531 | 0 | return queue_interrupt(queue, 1, 1); |
532 | 0 | } |
533 | | |
534 | | #endif /* APR_HAS_THREADS */ |