Coverage Report

Created: 2025-06-22 06:17

/src/h2o/lib/common/multithread.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2015-2016 DeNA Co., Ltd., Kazuho Oku, Tatsuhiko Kubo,
3
 *                         Chul-Woong Yang
4
 *
5
 * Permission is hereby granted, free of charge, to any person obtaining a copy
6
 * of this software and associated documentation files (the "Software"), to
7
 * deal in the Software without restriction, including without limitation the
8
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9
 * sell copies of the Software, and to permit persons to whom the Software is
10
 * furnished to do so, subject to the following conditions:
11
 *
12
 * The above copyright notice and this permission notice shall be included in
13
 * all copies or substantial portions of the Software.
14
 *
15
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21
 * IN THE SOFTWARE.
22
 */
23
#include <assert.h>
24
#include <pthread.h>
25
#ifdef __linux__
26
#include <sys/eventfd.h>
27
#endif
28
#include "cloexec.h"
29
#include "h2o/multithread.h"
30
31
/* A cross-thread message queue bound to one event loop. Sender threads enqueue
 * messages via h2o_multithread_send_message and wake the loop through `async`;
 * the loop thread drains the receivers in queue_cb. */
struct st_h2o_multithread_queue_t {
#if H2O_USE_LIBUV
    /* libuv wakeup handle; uv_async_send() triggers queue_cb on the loop thread */
    uv_async_t async;
#else
    struct {
        int write;          /* fd written by sender threads to wake the loop (eventfd on Linux, pipe otherwise) */
        h2o_socket_t *read; /* read side registered on the loop; its read callback drains pending messages */
    } async;
#endif
    pthread_mutex_t mutex; /* guards `receivers` and each registered receiver's `_messages` list */
    struct {
        h2o_linklist_t active;   /* receivers that have pending messages */
        h2o_linklist_t inactive; /* receivers with no pending messages */
    } receivers;
};
46
47
/* Drains every active receiver of `queue`. Runs on the loop thread whenever the
 * async wakeup fires (on_read below, or uv_async for libuv builds). For each
 * active receiver it detaches the pending messages, moves the receiver back to
 * the inactive list, and invokes the receiver callback with the mutex released
 * so user code never runs under the queue lock. */
static void queue_cb(h2o_multithread_queue_t *queue)
{
    pthread_mutex_lock(&queue->mutex);

    while (!h2o_linklist_is_empty(&queue->receivers.active)) {
        h2o_multithread_receiver_t *receiver =
            H2O_STRUCT_FROM_MEMBER(h2o_multithread_receiver_t, _link, queue->receivers.active.next);
        /* detach all the messages from the receiver */
        h2o_linklist_t messages;
        h2o_linklist_init_anchor(&messages);
        h2o_linklist_insert_list(&messages, &receiver->_messages);
        /* relink the receiver to the inactive list */
        h2o_linklist_unlink(&receiver->_link);
        h2o_linklist_insert(&queue->receivers.inactive, &receiver->_link);

        /* dispatch the messages with the mutex released; the callback must consume
         * (unlink) every message before returning, as asserted below */
        pthread_mutex_unlock(&queue->mutex);
        receiver->cb(receiver, &messages);
        assert(h2o_linklist_is_empty(&messages));
        pthread_mutex_lock(&queue->mutex);
    }

    pthread_mutex_unlock(&queue->mutex);
}
71
72
#ifdef H2O_NO_64BIT_ATOMICS
73
pthread_mutex_t h2o_conn_id_mutex = PTHREAD_MUTEX_INITIALIZER;
74
#endif
75
76
#if H2O_USE_LIBUV
77
#else
78
79
#include <errno.h>
80
#include <fcntl.h>
81
#include <unistd.h>
82
83
/* Read callback for the wakeup descriptor: the bytes themselves carry no
 * information, so they are discarded and the owning queue is drained. */
static void on_read(h2o_socket_t *sock, const char *err)
{
    if (err != NULL)
        h2o_fatal("on_read: %s", err);

    /* drop the wakeup payload in its entirety */
    h2o_buffer_consume(&sock->input, sock->input->size);
    /* sock->data was set to the owning queue in init_async */
    queue_cb(sock->data);
}
92
93
/* Sets up the mechanism used to wake the event loop from other threads: an
 * eventfd on Linux, a non-blocking close-on-exec pipe elsewhere. The read side
 * is wrapped in an h2o socket whose read callback (on_read) drains the queue. */
static void init_async(h2o_multithread_queue_t *queue, h2o_loop_t *loop)
{
#if defined(__linux__)
    /* The kernel overhead of an eventfd file descriptor is much lower than that
     * of a pipe, and only one file descriptor is required. */
    char errbuf[128];
    int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (efd == -1)
        h2o_fatal("eventfd: %s", h2o_strerror_r(errno, errbuf, sizeof(errbuf)));
    queue->async.write = efd;
    queue->async.read = h2o_evloop_socket_create(loop, efd, 0);
#else
    char errbuf[128];
    int pipefds[2];
    if (cloexec_pipe(pipefds) != 0)
        h2o_fatal("pipe: %s", h2o_strerror_r(errno, errbuf, sizeof(errbuf)));
    fcntl(pipefds[1], F_SETFL, O_NONBLOCK);
    queue->async.write = pipefds[1];
    queue->async.read = h2o_evloop_socket_create(loop, pipefds[0], 0);
#endif
    queue->async.read->data = queue;
    h2o_socket_read_start(queue->async.read, on_read);
}
123
124
#endif
125
126
/* Allocates and initializes a message queue bound to `loop`. The returned queue
 * must eventually be released with h2o_multithread_destroy_queue. */
h2o_multithread_queue_t *h2o_multithread_create_queue(h2o_loop_t *loop)
{
    h2o_multithread_queue_t *q = h2o_mem_alloc(sizeof(*q));
    memset(q, 0, sizeof(*q));

#if H2O_USE_LIBUV
    uv_async_init(loop, &q->async, (uv_async_cb)queue_cb);
#else
    init_async(q, loop);
#endif
    pthread_mutex_init(&q->mutex, NULL);
    h2o_linklist_init_anchor(&q->receivers.active);
    h2o_linklist_init_anchor(&q->receivers.inactive);

    return q;
}
142
143
#if H2O_USE_LIBUV
144
/* uv_close completion callback: releases the queue memory only after libuv has
 * finished closing the async handle embedded in it. */
static void libuv_destroy_delayed(uv_handle_t *handle)
{
    h2o_multithread_queue_t *queue = H2O_STRUCT_FROM_MEMBER(h2o_multithread_queue_t, async, (uv_async_t *)handle);
    free(queue);
}
149
#endif
150
151
/* Destroys `queue`. All receivers must have been unregistered beforehand (both
 * lists empty, as asserted). On libuv builds the actual free is deferred to the
 * uv_close callback. */
void h2o_multithread_destroy_queue(h2o_multithread_queue_t *queue)
{
    assert(h2o_linklist_is_empty(&queue->receivers.active));
    assert(h2o_linklist_is_empty(&queue->receivers.inactive));
    pthread_mutex_destroy(&queue->mutex);

#if H2O_USE_LIBUV
    /* uv_close is asynchronous; queue is freed in libuv_destroy_delayed */
    uv_close((uv_handle_t *)&queue->async, libuv_destroy_delayed);
#else
    h2o_socket_read_stop(queue->async.read);
    h2o_socket_close(queue->async.read);
#ifndef __linux__
    /* close the write end of the pipe; on Linux the eventfd uses a single file
     * descriptor that h2o_socket_close() above has already closed, so this is
     * skipped there */
    close(queue->async.write);
#endif
    free(queue);
#endif
}
169
170
void h2o_multithread_register_receiver(h2o_multithread_queue_t *queue, h2o_multithread_receiver_t *receiver,
171
                                       h2o_multithread_receiver_cb cb)
172
1
{
173
1
    receiver->queue = queue;
174
1
    receiver->_link = (h2o_linklist_t){NULL};
175
1
    h2o_linklist_init_anchor(&receiver->_messages);
176
1
    receiver->cb = cb;
177
178
1
    pthread_mutex_lock(&queue->mutex);
179
1
    h2o_linklist_insert(&queue->receivers.inactive, &receiver->_link);
180
1
    pthread_mutex_unlock(&queue->mutex);
181
1
}
182
183
/* Detaches `receiver` from `queue`; every queued message must already have been
 * consumed by the receiver callback. */
void h2o_multithread_unregister_receiver(h2o_multithread_queue_t *queue, h2o_multithread_receiver_t *receiver)
{
    assert(queue == receiver->queue);
    assert(h2o_linklist_is_empty(&receiver->_messages));

    pthread_mutex_lock(&queue->mutex);
    h2o_linklist_unlink(&receiver->_link);
    pthread_mutex_unlock(&queue->mutex);
}
191
192
/* Queues `message` for `receiver` and wakes the receiver's event loop; safe to
 * call from any thread (state is protected by the queue mutex). A NULL
 * `message` enqueues nothing but still wakes the loop if the receiver has no
 * pending messages. The loop is only signalled on the empty→non-empty
 * transition to avoid redundant wakeups. */
void h2o_multithread_send_message(h2o_multithread_receiver_t *receiver, h2o_multithread_message_t *message)
{
    int do_send = 0;

    pthread_mutex_lock(&receiver->queue->mutex);
    if (message != NULL) {
        assert(!h2o_linklist_is_linked(&message->link));
        if (h2o_linklist_is_empty(&receiver->_messages)) {
            /* first pending message: move receiver to the active list and signal */
            h2o_linklist_unlink(&receiver->_link);
            h2o_linklist_insert(&receiver->queue->receivers.active, &receiver->_link);
            do_send = 1;
        }
        h2o_linklist_insert(&receiver->_messages, &message->link);
    } else {
        if (h2o_linklist_is_empty(&receiver->_messages))
            do_send = 1;
    }
    pthread_mutex_unlock(&receiver->queue->mutex);

    if (do_send) {
#if H2O_USE_LIBUV
        uv_async_send(&receiver->queue->async);
#else
#ifdef __linux__
        /* eventfd wakeup: write an 8-byte counter increment, retrying on EINTR */
        uint64_t tmp = 1;
        while (write(receiver->queue->async.write, &tmp, sizeof(tmp)) == -1 && errno == EINTR)
#else
        /* pipe wakeup: write a single byte, retrying on EINTR */
        while (write(receiver->queue->async.write, "", 1) == -1 && errno == EINTR)
#endif
            ;
#endif
    }
}
225
226
/* Thin wrapper around pthread_create that aborts the process (h2o_fatal) on
 * failure instead of returning an error code. */
void h2o_multithread_create_thread(pthread_t *tid, const pthread_attr_t *attr, void *(*func)(void *), void *arg)
{
    int err = pthread_create(tid, attr, func, arg);
    if (err != 0) {
        char errbuf[128];
        h2o_fatal("pthread_create: %s", h2o_strerror_r(err, errbuf, sizeof(errbuf)));
    }
}
234
235
/* Returns the event loop that `queue` is bound to, or NULL when `queue` is NULL. */
h2o_loop_t *h2o_multithread_get_loop(h2o_multithread_queue_t *queue)
{
    if (queue == NULL)
        return NULL;

#if H2O_USE_LIBUV
    /* the loop is recorded on the embedded libuv handle */
    return ((uv_handle_t *)&queue->async)->loop;
#else
    /* the wakeup socket was created on the owning loop (see init_async) */
    return h2o_socket_get_loop(queue->async.read);
#endif
}
245
246
/* Initializes a counting semaphore with `capacity` slots available. */
void h2o_sem_init(h2o_sem_t *sem, ssize_t capacity)
{
    pthread_mutex_init(&sem->_mutex, NULL);
    pthread_cond_init(&sem->_cond, NULL);
    sem->_capacity = capacity;
    sem->_cur = capacity; /* all slots start free */
}
253
254
/* Destroys the semaphore; every acquired slot must have been released first
 * (i.e. the current count equals the capacity). */
void h2o_sem_destroy(h2o_sem_t *sem)
{
    assert(sem->_cur == sem->_capacity);
    pthread_cond_destroy(&sem->_cond);
    pthread_mutex_destroy(&sem->_mutex);
}
260
261
/* Acquires one slot, blocking while none are available. */
void h2o_sem_wait(h2o_sem_t *sem)
{
    pthread_mutex_lock(&sem->_mutex);
    /* re-check in a loop to cope with spurious wakeups */
    while (!(sem->_cur > 0))
        pthread_cond_wait(&sem->_cond, &sem->_mutex);
    sem->_cur -= 1;
    pthread_mutex_unlock(&sem->_mutex);
}
269
270
/* Releases one slot and wakes up a single waiter, if any is blocked. */
void h2o_sem_post(h2o_sem_t *sem)
{
    pthread_mutex_lock(&sem->_mutex);
    sem->_cur += 1;
    pthread_cond_signal(&sem->_cond);
    pthread_mutex_unlock(&sem->_mutex);
}
277
278
/* Changes the semaphore's capacity, shifting the current count by the same
 * delta, and broadcasts so every waiter re-evaluates availability. */
void h2o_sem_set_capacity(h2o_sem_t *sem, ssize_t new_capacity)
{
    pthread_mutex_lock(&sem->_mutex);
    ssize_t delta = new_capacity - sem->_capacity;
    sem->_cur += delta;
    sem->_capacity = new_capacity;
    pthread_cond_broadcast(&sem->_cond);
    pthread_mutex_unlock(&sem->_mutex);
}
286
287
/* barrier */
288
289
/* Initializes a barrier that releases once `count` threads have called
 * h2o_barrier_wait. `_out_of_wait` tracks threads still inside wait() so that
 * h2o_barrier_dispose knows when teardown is safe. */
void h2o_barrier_init(h2o_barrier_t *barrier, size_t count)
{
    pthread_mutex_init(&barrier->_mutex, NULL);
    pthread_cond_init(&barrier->_cond, NULL);
    barrier->_out_of_wait = count;
    barrier->_count = count;
}
296
297
/* Blocks until all participants (count set by h2o_barrier_init, optionally
 * raised by h2o_barrier_add) have arrived; the last arriver wakes the rest. */
void h2o_barrier_wait(h2o_barrier_t *barrier)
{
    pthread_mutex_lock(&barrier->_mutex);
    barrier->_count--;
    if (barrier->_count == 0) {
        /* last participant: release everyone */
        pthread_cond_broadcast(&barrier->_cond);
    } else {
        /* loop guards against spurious wakeups */
        while (barrier->_count != 0)
            pthread_cond_wait(&barrier->_cond, &barrier->_mutex);
    }
    pthread_mutex_unlock(&barrier->_mutex);
    /* This is needed to synchronize h2o_barrier_dispose with the exit of this function, so make sure that we can't destroy the
     * mutex or the condition before all threads have exited wait(). */
    __sync_sub_and_fetch(&barrier->_out_of_wait, 1);
}
312
313
/* Returns non-zero once every participant has arrived at the barrier
 * (add-of-zero is used as an atomic read of `_count`). */
int h2o_barrier_done(h2o_barrier_t *barrier)
{
    return __sync_add_and_fetch(&barrier->_count, 0) == 0;
}
317
318
/* Atomically raises the number of participants the barrier waits for by `delta`. */
void h2o_barrier_add(h2o_barrier_t *barrier, size_t delta)
{
    __sync_add_and_fetch(&barrier->_count, delta);
}
322
323
/* Destroys the barrier. Spins (yielding the CPU) until `_out_of_wait` reaches
 * zero, i.e. every thread has fully left h2o_barrier_wait, so the mutex and
 * condition variable are guaranteed to be no longer in use. */
void h2o_barrier_dispose(h2o_barrier_t *barrier)
{
    for (;;) {
        /* add-of-zero performs an atomic read */
        if (__sync_add_and_fetch(&barrier->_out_of_wait, 0) == 0)
            break;
        sched_yield();
    }
    pthread_mutex_destroy(&barrier->_mutex);
    pthread_cond_destroy(&barrier->_cond);
}
331
332
/* Timer callback that flushes the accumulated error report: invokes the
 * user-supplied `_report_errors` with the lifetime and per-interval success
 * counts, then resets the per-interval state for the next burst of errors. */
void h2o_error_reporter__on_timeout(h2o_timer_t *_timer)
{
    h2o_error_reporter_t *reporter = H2O_STRUCT_FROM_MEMBER(h2o_error_reporter_t, _timer, _timer);

    pthread_mutex_lock(&reporter->_mutex);

    /* add-of-zero performs an atomic read of the 64-bit success counter */
    uint64_t total_successes = __sync_fetch_and_add(&reporter->_total_successes, 0),
             cur_successes = total_successes - reporter->prev_successes;

    reporter->_report_errors(reporter, total_successes, cur_successes);

    reporter->prev_successes = total_successes;
    reporter->cur_errors = 0;

    pthread_mutex_unlock(&reporter->_mutex);
}
348
349
/* Records one error occurrence on `reporter`. The first error of an interval
 * snapshots the success counter and arms `_timer` on `loop` to fire after
 * `delay_ticks`, which flushes the report (h2o_error_reporter__on_timeout).
 * Stores `new_data` as the reporter's opaque data word and returns the value it
 * replaces. */
uintptr_t h2o_error_reporter_record_error(h2o_loop_t *loop, h2o_error_reporter_t *reporter, uint64_t delay_ticks,
                                          uintptr_t new_data)
{
    uintptr_t old_data;

    pthread_mutex_lock(&reporter->_mutex);

    if (reporter->cur_errors == 0) {
        /* FIX: use the generic __sync_fetch_and_add (atomic read via add-of-zero),
         * matching h2o_error_reporter__on_timeout; the size-suffixed
         * __sync_fetch_and_add_8 is an internal libgcc symbol and is not a portable
         * builtin across compilers */
        reporter->prev_successes = __sync_fetch_and_add(&reporter->_total_successes, 0);
        assert(!h2o_timer_is_linked(&reporter->_timer));
        h2o_timer_link(loop, delay_ticks, &reporter->_timer);
    }
    ++reporter->cur_errors;
    old_data = reporter->data;
    reporter->data = new_data;

    pthread_mutex_unlock(&reporter->_mutex);

    return old_data;
}