Coverage Report

Created: 2023-03-26 06:28

/src/httpd/server/mpm_fdqueue.c
Line
Count
Source (jump to first uncovered line)
1
/* Licensed to the Apache Software Foundation (ASF) under one or more
2
 * contributor license agreements.  See the NOTICE file distributed with
3
 * this work for additional information regarding copyright ownership.
4
 * The ASF licenses this file to You under the Apache License, Version 2.0
5
 * (the "License"); you may not use this file except in compliance with
6
 * the License.  You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
17
#include "mpm_fdqueue.h"
18
19
#if APR_HAS_THREADS
20
21
#include <apr_atomic.h>
22
23
/* Midpoint of the unsigned 32-bit range.  fd_queue_info_t.idlers is kept
 * as an offset from this value, so it can drop "below zero" (threads
 * waiting for a worker) without the unsigned counter wrapping around.
 */
static const apr_uint32_t zero_pt = (APR_UINT32_MAX / 2);
24
25
struct recycled_pool
26
{
27
    apr_pool_t *pool;
28
    struct recycled_pool *next;
29
};
30
31
struct fd_queue_info_t
32
{
33
    apr_uint32_t volatile idlers; /**
34
                                   * >= zero_pt: number of idle worker threads
35
                                   * <  zero_pt: number of threads blocked,
36
                                   *             waiting for an idle worker
37
                                   */
38
    apr_thread_mutex_t *idlers_mutex;
39
    apr_thread_cond_t *wait_for_idler;
40
    int terminated;
41
    int max_idlers;
42
    int max_recycled_pools;
43
    apr_uint32_t recycled_pools_count;
44
    struct recycled_pool *volatile recycled_pools;
45
};
46
47
struct fd_queue_elem_t
48
{
49
    apr_socket_t *sd;
50
    void *sd_baton;
51
    apr_pool_t *p;
52
};
53
54
static apr_status_t queue_info_cleanup(void *data_)
55
0
{
56
0
    fd_queue_info_t *qi = data_;
57
0
    apr_thread_cond_destroy(qi->wait_for_idler);
58
0
    apr_thread_mutex_destroy(qi->idlers_mutex);
59
60
    /* Clean up any pools in the recycled list */
61
0
    for (;;) {
62
0
        struct recycled_pool *first_pool = qi->recycled_pools;
63
0
        if (first_pool == NULL) {
64
0
            break;
65
0
        }
66
0
        if (apr_atomic_casptr((void *)&qi->recycled_pools, first_pool->next,
67
0
                              first_pool) == first_pool) {
68
0
            apr_pool_destroy(first_pool->pool);
69
0
        }
70
0
    }
71
72
0
    return APR_SUCCESS;
73
0
}
74
75
apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info,
76
                                  apr_pool_t *pool, int max_idlers,
77
                                  int max_recycled_pools)
78
0
{
79
0
    apr_status_t rv;
80
0
    fd_queue_info_t *qi;
81
82
0
    qi = apr_pcalloc(pool, sizeof(*qi));
83
84
0
    rv = apr_thread_mutex_create(&qi->idlers_mutex, APR_THREAD_MUTEX_DEFAULT,
85
0
                                 pool);
86
0
    if (rv != APR_SUCCESS) {
87
0
        return rv;
88
0
    }
89
0
    rv = apr_thread_cond_create(&qi->wait_for_idler, pool);
90
0
    if (rv != APR_SUCCESS) {
91
0
        return rv;
92
0
    }
93
0
    qi->recycled_pools = NULL;
94
0
    qi->max_recycled_pools = max_recycled_pools;
95
0
    qi->max_idlers = max_idlers;
96
0
    qi->idlers = zero_pt;
97
0
    apr_pool_cleanup_register(pool, qi, queue_info_cleanup,
98
0
                              apr_pool_cleanup_null);
99
100
0
    *queue_info = qi;
101
102
0
    return APR_SUCCESS;
103
0
}
104
105
apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info,
106
                                    apr_pool_t *pool_to_recycle)
107
0
{
108
0
    apr_status_t rv;
109
110
0
    ap_queue_info_push_pool(queue_info, pool_to_recycle);
111
112
    /* If other threads are waiting on a worker, wake one up */
113
0
    if (apr_atomic_inc32(&queue_info->idlers) < zero_pt) {
114
0
        rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
115
0
        if (rv != APR_SUCCESS) {
116
0
            AP_DEBUG_ASSERT(0);
117
0
            return rv;
118
0
        }
119
0
        rv = apr_thread_cond_signal(queue_info->wait_for_idler);
120
0
        if (rv != APR_SUCCESS) {
121
0
            apr_thread_mutex_unlock(queue_info->idlers_mutex);
122
0
            return rv;
123
0
        }
124
0
        rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
125
0
        if (rv != APR_SUCCESS) {
126
0
            return rv;
127
0
        }
128
0
    }
129
130
0
    return APR_SUCCESS;
131
0
}
132
133
/**
 * Reserve an idle worker without ever blocking.
 *
 * @param queue_info the shared idle/recycle bookkeeping
 * @return APR_SUCCESS when the idle counter was decremented,
 *         APR_EAGAIN when no worker is idle
 */
apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t *queue_info)
{
    apr_uint32_t seen;

    /* Retry the compare-and-swap until it either succeeds or the counter
     * shows that no idle worker is left.
     */
    do {
        seen = queue_info->idlers;
        if (seen <= zero_pt) {
            return APR_EAGAIN;
        }
    } while (apr_atomic_cas32(&queue_info->idlers, seen - 1, seen) != seen);

    return APR_SUCCESS;
}
147
148
/**
 * Reserve an idle worker, blocking on the condition variable when none is
 * available.
 *
 * @param queue_info    the shared idle/recycle bookkeeping
 * @param had_to_block  if non-NULL, set to 1 when the call waits on the
 *                      condition variable (left untouched otherwise)
 * @return APR_EOF once the queue info has been terminated, APR_SUCCESS
 *         otherwise, or the error from a failed mutex/condition call
 */
apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info,
                                          int *had_to_block)
{
    apr_status_t status;

    /* add32(x, -1) is used instead of dec32(x) because it hands back the
     * value the counter had before the decrement.  A previous value at or
     * below the zero point means no worker was idle.
     */
    if (apr_atomic_add32(&queue_info->idlers, -1) <= zero_pt) {
        status = apr_thread_mutex_lock(queue_info->idlers_mutex);
        if (status != APR_SUCCESS) {
            AP_DEBUG_ASSERT(0);
            /* undo the decrement performed above */
            apr_atomic_inc32(&(queue_info->idlers));
            return status;
        }

        /* Look at the counter again now that the mutex is held.  A worker
         * may have gone idle (and signalled) between the decrement and the
         * lock; that signal would have been missed because nobody was
         * waiting yet.  So only wait while the counter is still below the
         * zero point, i.e. while it still records blocked threads; if it
         * has climbed back, returning right away is safe.
         */
        if (queue_info->idlers < zero_pt) {
            if (had_to_block) {
                *had_to_block = 1;
            }
            status = apr_thread_cond_wait(queue_info->wait_for_idler,
                                          queue_info->idlers_mutex);
            if (status != APR_SUCCESS) {
                AP_DEBUG_ASSERT(0);
                apr_thread_mutex_unlock(queue_info->idlers_mutex);
                return status;
            }
        }

        status = apr_thread_mutex_unlock(queue_info->idlers_mutex);
        if (status != APR_SUCCESS) {
            return status;
        }
    }

    return queue_info->terminated ? APR_EOF : APR_SUCCESS;
}
209
210
/**
 * Report how many workers are currently idle.
 *
 * @param queue_info the shared idle/recycle bookkeeping
 * @return the counter's distance above the zero point, or 0 when the
 *         counter is at or below it
 */
apr_uint32_t ap_queue_info_num_idlers(fd_queue_info_t *queue_info)
{
    const apr_uint32_t raw = apr_atomic_read32(&queue_info->idlers);

    if (raw <= zero_pt) {
        return 0;
    }
    return raw - zero_pt;
}
216
217
void ap_queue_info_push_pool(fd_queue_info_t *queue_info,
218
                             apr_pool_t *pool_to_recycle)
219
0
{
220
0
    struct recycled_pool *new_recycle;
221
    /* If we have been given a pool to recycle, atomically link
222
     * it into the queue_info's list of recycled pools
223
     */
224
0
    if (!pool_to_recycle)
225
0
        return;
226
227
0
    if (queue_info->max_recycled_pools >= 0) {
228
0
        apr_uint32_t n = apr_atomic_read32(&queue_info->recycled_pools_count);
229
0
        if (n >= queue_info->max_recycled_pools) {
230
0
            apr_pool_destroy(pool_to_recycle);
231
0
            return;
232
0
        }
233
0
        apr_atomic_inc32(&queue_info->recycled_pools_count);
234
0
    }
235
236
0
    apr_pool_clear(pool_to_recycle);
237
0
    new_recycle = apr_palloc(pool_to_recycle, sizeof *new_recycle);
238
0
    new_recycle->pool = pool_to_recycle;
239
0
    for (;;) {
240
        /*
241
         * Save queue_info->recycled_pool in local variable next because
242
         * new_recycle->next can be changed after apr_atomic_casptr
243
         * function call. For gory details see PR 44402.
244
         */
245
0
        struct recycled_pool *next = queue_info->recycled_pools;
246
0
        new_recycle->next = next;
247
0
        if (apr_atomic_casptr((void *)&queue_info->recycled_pools,
248
0
                              new_recycle, next) == next)
249
0
            break;
250
0
    }
251
0
}
252
253
void ap_queue_info_pop_pool(fd_queue_info_t *queue_info,
254
                            apr_pool_t **recycled_pool)
255
0
{
256
    /* Atomically pop a pool from the recycled list */
257
258
    /* This function is safe only as long as it is single threaded because
259
     * it reaches into the queue and accesses "next" which can change.
260
     * We are OK today because it is only called from the listener thread.
261
     * cas-based pushes do not have the same limitation - any number can
262
     * happen concurrently with a single cas-based pop.
263
     */
264
265
0
    *recycled_pool = NULL;
266
267
268
    /* Atomically pop a pool from the recycled list */
269
0
    for (;;) {
270
0
        struct recycled_pool *first_pool = queue_info->recycled_pools;
271
0
        if (first_pool == NULL) {
272
0
            break;
273
0
        }
274
0
        if (apr_atomic_casptr((void *)&queue_info->recycled_pools,
275
0
                              first_pool->next, first_pool) == first_pool) {
276
0
            *recycled_pool = first_pool->pool;
277
0
            if (queue_info->max_recycled_pools >= 0)
278
0
                apr_atomic_dec32(&queue_info->recycled_pools_count);
279
0
            break;
280
0
        }
281
0
    }
282
0
}
283
284
/**
 * Destroy every pool on the recycled list and set the cap to zero, so
 * that pools pushed afterwards are destroyed instead of being kept.
 *
 * @param queue_info the shared idle/recycle bookkeeping
 */
void ap_queue_info_free_idle_pools(fd_queue_info_t *queue_info)
{
    apr_pool_t *pool;

    queue_info->max_recycled_pools = 0;

    /* Drain the list: pop until the pop reports it is empty. */
    ap_queue_info_pop_pool(queue_info, &pool);
    while (pool != NULL) {
        apr_pool_destroy(pool);
        ap_queue_info_pop_pool(queue_info, &pool);
    }

    apr_atomic_set32(&queue_info->recycled_pools_count, 0);
}
297
298
299
/**
 * Mark the queue info as terminated and wake every thread blocked in
 * ap_queue_info_wait_for_idler().
 *
 * @param queue_info the shared idle/recycle bookkeeping
 * @return the error from locking the mutex, otherwise the result of
 *         unlocking it
 */
apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info)
{
    apr_status_t status = apr_thread_mutex_lock(queue_info->idlers_mutex);

    if (status == APR_SUCCESS) {
        queue_info->terminated = 1;
        apr_thread_cond_broadcast(queue_info->wait_for_idler);
        status = apr_thread_mutex_unlock(queue_info->idlers_mutex);
    }

    return status;
}
313
314
/**
315
 * Detects when the fd_queue_t is full. This utility function is expected
316
 * to be called from within critical sections, and is not threadsafe.
317
 */
318
#define ap_queue_full(queue) ((queue)->bounds == (queue)->nelts)
319
320
/**
321
 * Detects when the fd_queue_t is empty. This utility function is expected
322
 * to be called from within critical sections, and is not threadsafe.
323
 */
324
0
/* True when no socket is queued and the timer ring is empty.  The macro
 * argument is parenthesized at every use so that an expression argument
 * (for example `q + 1`) expands correctly.
 */
#define ap_queue_empty(queue) ((queue)->nelts == 0 && \
                               APR_RING_EMPTY(&(queue)->timers, \
                                              timer_event_t, link))
327
328
/**
329
 * Callback routine that is called to destroy this
330
 * fd_queue_t when its pool is destroyed.
331
 */
332
static apr_status_t ap_queue_destroy(void *data)
333
0
{
334
0
    fd_queue_t *queue = data;
335
336
    /* Ignore errors here, we can't do anything about them anyway.
337
     * XXX: We should at least try to signal an error here, it is
338
     * indicative of a programmer error. -aaron */
339
0
    apr_thread_cond_destroy(queue->not_empty);
340
0
    apr_thread_mutex_destroy(queue->one_big_mutex);
341
342
0
    return APR_SUCCESS;
343
0
}
344
345
/**
346
 * Initialize the fd_queue_t.
347
 */
348
/**
 * Allocate and initialise a fd_queue_t with room for @a capacity sockets.
 *
 * @param pqueue    receives the new queue on success
 * @param capacity  number of slots in the socket ring buffer
 * @param p         pool providing all memory and synchronisation objects;
 *                  ap_queue_destroy() is registered on it as a cleanup
 * @return APR_SUCCESS, or the error from creating the mutex or the
 *         condition variable (in which case *pqueue is not written)
 */
apr_status_t ap_queue_create(fd_queue_t **pqueue, int capacity, apr_pool_t *p)
{
    fd_queue_t *q = apr_pcalloc(p, sizeof *q);
    apr_status_t status;

    status = apr_thread_mutex_create(&q->one_big_mutex,
                                     APR_THREAD_MUTEX_DEFAULT, p);
    if (status != APR_SUCCESS) {
        return status;
    }

    status = apr_thread_cond_create(&q->not_empty, p);
    if (status != APR_SUCCESS) {
        return status;
    }

    APR_RING_INIT(&q->timers, timer_event_t, link);

    q->bounds = capacity;
    q->data = apr_pcalloc(p, capacity * sizeof(fd_queue_elem_t));

    apr_pool_cleanup_register(p, q, ap_queue_destroy,
                              apr_pool_cleanup_null);

    *pqueue = q;
    return APR_SUCCESS;
}
375
376
/**
377
 * Push a new socket onto the queue.
378
 *
379
 * precondition: ap_queue_info_wait_for_idler has already been called
380
 *               to reserve an idle worker thread
381
 */
382
apr_status_t ap_queue_push_socket(fd_queue_t *queue,
383
                                  apr_socket_t *sd, void *sd_baton,
384
                                  apr_pool_t *p)
385
0
{
386
0
    fd_queue_elem_t *elem;
387
0
    apr_status_t rv;
388
389
0
    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
390
0
        return rv;
391
0
    }
392
393
0
    AP_DEBUG_ASSERT(!queue->terminated);
394
0
    AP_DEBUG_ASSERT(!ap_queue_full(queue));
395
396
0
    elem = &queue->data[queue->in++];
397
0
    if (queue->in >= queue->bounds)
398
0
        queue->in -= queue->bounds;
399
0
    elem->sd = sd;
400
0
    elem->sd_baton = sd_baton;
401
0
    elem->p = p;
402
0
    queue->nelts++;
403
404
0
    apr_thread_cond_signal(queue->not_empty);
405
406
0
    return apr_thread_mutex_unlock(queue->one_big_mutex);
407
0
}
408
409
/**
 * Append a timer event to the tail of the queue's timer ring and wake one
 * thread waiting in ap_queue_pop_something().
 *
 * @param queue the queue to push onto
 * @param te    the timer event to link in
 * @return the error from locking the mutex, otherwise the result of
 *         unlocking it
 */
apr_status_t ap_queue_push_timer(fd_queue_t *queue, timer_event_t *te)
{
    apr_status_t status = apr_thread_mutex_lock(queue->one_big_mutex);

    if (status != APR_SUCCESS) {
        return status;
    }

    AP_DEBUG_ASSERT(!queue->terminated);

    APR_RING_INSERT_TAIL(&queue->timers, te, timer_event_t, link);
    apr_thread_cond_signal(queue->not_empty);

    return apr_thread_mutex_unlock(queue->one_big_mutex);
}
425
426
/**
427
 * Retrieves the next available socket from the queue. If there are no
428
 * sockets available, it will block until one becomes available.
429
 * Once retrieved, the socket is placed into the address specified by
430
 * 'sd'.
431
 */
432
apr_status_t ap_queue_pop_something(fd_queue_t *queue,
433
                                    apr_socket_t **sd, void **sd_baton,
434
                                    apr_pool_t **p, timer_event_t **te_out)
435
0
{
436
0
    fd_queue_elem_t *elem;
437
0
    timer_event_t *te;
438
0
    apr_status_t rv;
439
440
0
    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
441
0
        return rv;
442
0
    }
443
444
    /* Keep waiting until we wake up and find that the queue is not empty. */
445
0
    if (ap_queue_empty(queue)) {
446
0
        if (!queue->terminated) {
447
0
            apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
448
0
        }
449
        /* If we wake up and it's still empty, then we were interrupted */
450
0
        if (ap_queue_empty(queue)) {
451
0
            rv = apr_thread_mutex_unlock(queue->one_big_mutex);
452
0
            if (rv != APR_SUCCESS) {
453
0
                return rv;
454
0
            }
455
0
            if (queue->terminated) {
456
0
                return APR_EOF; /* no more elements ever again */
457
0
            }
458
0
            else {
459
0
                return APR_EINTR;
460
0
            }
461
0
        }
462
0
    }
463
464
0
    te = NULL;
465
0
    if (te_out) {
466
0
        if (!APR_RING_EMPTY(&queue->timers, timer_event_t, link)) {
467
0
            te = APR_RING_FIRST(&queue->timers);
468
0
            APR_RING_REMOVE(te, link);
469
0
        }
470
0
        *te_out = te;
471
0
    }
472
0
    if (!te) {
473
0
        elem = &queue->data[queue->out++];
474
0
        if (queue->out >= queue->bounds)
475
0
            queue->out -= queue->bounds;
476
0
        queue->nelts--;
477
478
0
        *sd = elem->sd;
479
0
        if (sd_baton) {
480
0
            *sd_baton = elem->sd_baton;
481
0
        }
482
0
        *p = elem->p;
483
#ifdef AP_DEBUG
484
        elem->sd = NULL;
485
        elem->p = NULL;
486
#endif /* AP_DEBUG */
487
0
    }
488
489
0
    return apr_thread_mutex_unlock(queue->one_big_mutex);
490
0
}
491
492
/**
 * Wake threads blocked in ap_queue_pop_something(), optionally marking
 * the queue as terminated.
 *
 * @param queue the queue whose waiters are woken
 * @param all   non-zero: wake every waiter; zero: wake a single one
 * @param term  non-zero: also set the terminated flag
 * @return APR_EOF if the queue was already terminated, the error from
 *         locking the mutex, otherwise the result of unlocking it
 */
static apr_status_t queue_interrupt(fd_queue_t *queue, int all, int term)
{
    apr_status_t status;

    if (queue->terminated) {
        return APR_EOF;
    }

    status = apr_thread_mutex_lock(queue->one_big_mutex);
    if (status != APR_SUCCESS) {
        return status;
    }

    /* The flag has to be set with one_big_mutex held.  Otherwise it could
     * be set, and everybody woken, in the gap between a would-be popper
     * checking the flag and that popper actually starting to wait.
     */
    if (term) {
        queue->terminated = 1;
    }

    if (all) {
        apr_thread_cond_broadcast(queue->not_empty);
    }
    else {
        apr_thread_cond_signal(queue->not_empty);
    }

    return apr_thread_mutex_unlock(queue->one_big_mutex);
}
518
519
/**
 * Wake every thread waiting on the queue without terminating it.
 *
 * @return the result of queue_interrupt()
 */
apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
{
    const int wake_all = 1;
    const int terminate = 0;

    return queue_interrupt(queue, wake_all, terminate);
}
523
524
/**
 * Wake a single thread waiting on the queue without terminating it.
 *
 * @return the result of queue_interrupt()
 */
apr_status_t ap_queue_interrupt_one(fd_queue_t *queue)
{
    const int wake_all = 0;
    const int terminate = 0;

    return queue_interrupt(queue, wake_all, terminate);
}
528
529
/**
 * Mark the queue as terminated and wake every thread waiting on it.
 *
 * @return the result of queue_interrupt()
 */
apr_status_t ap_queue_term(fd_queue_t *queue)
{
    const int wake_all = 1;
    const int terminate = 1;

    return queue_interrupt(queue, wake_all, terminate);
}
533
534
#endif /* APR_HAS_THREADS */