/src/h2o/lib/common/multithread.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (c) 2015-2016 DeNA Co., Ltd., Kazuho Oku, Tatsuhiko Kubo, |
3 | | * Chul-Woong Yang |
4 | | * |
5 | | * Permission is hereby granted, free of charge, to any person obtaining a copy |
6 | | * of this software and associated documentation files (the "Software"), to |
7 | | * deal in the Software without restriction, including without limitation the |
8 | | * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
9 | | * sell copies of the Software, and to permit persons to whom the Software is |
10 | | * furnished to do so, subject to the following conditions: |
11 | | * |
12 | | * The above copyright notice and this permission notice shall be included in |
13 | | * all copies or substantial portions of the Software. |
14 | | * |
15 | | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
16 | | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
17 | | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
18 | | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
19 | | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
20 | | * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
21 | | * IN THE SOFTWARE. |
22 | | */ |
23 | | #include <assert.h> |
24 | | #include <pthread.h> |
25 | | #ifdef __linux__ |
26 | | #include <sys/eventfd.h> |
27 | | #endif |
28 | | #include "cloexec.h" |
29 | | #include "h2o/multithread.h" |
30 | | |
31 | | struct st_h2o_multithread_queue_t { |
32 | | #if H2O_USE_LIBUV |
33 | | uv_async_t async; |
34 | | #else |
35 | | struct { |
36 | | int write; |
37 | | h2o_socket_t *read; |
38 | | } async; |
39 | | #endif |
40 | | pthread_mutex_t mutex; |
41 | | struct { |
42 | | h2o_linklist_t active; |
43 | | h2o_linklist_t inactive; |
44 | | } receivers; |
45 | | }; |
46 | | |
47 | | static void queue_cb(h2o_multithread_queue_t *queue) |
48 | 0 | { |
49 | 0 | pthread_mutex_lock(&queue->mutex); |
50 | |
|
51 | 0 | while (!h2o_linklist_is_empty(&queue->receivers.active)) { |
52 | 0 | h2o_multithread_receiver_t *receiver = |
53 | 0 | H2O_STRUCT_FROM_MEMBER(h2o_multithread_receiver_t, _link, queue->receivers.active.next); |
54 | | /* detach all the messages from the receiver */ |
55 | 0 | h2o_linklist_t messages; |
56 | 0 | h2o_linklist_init_anchor(&messages); |
57 | 0 | h2o_linklist_insert_list(&messages, &receiver->_messages); |
58 | | /* relink the receiver to the inactive list */ |
59 | 0 | h2o_linklist_unlink(&receiver->_link); |
60 | 0 | h2o_linklist_insert(&queue->receivers.inactive, &receiver->_link); |
61 | | |
62 | | /* dispatch the messages */ |
63 | 0 | pthread_mutex_unlock(&queue->mutex); |
64 | 0 | receiver->cb(receiver, &messages); |
65 | 0 | assert(h2o_linklist_is_empty(&messages)); |
66 | 0 | pthread_mutex_lock(&queue->mutex); |
67 | 0 | } |
68 | | |
69 | 0 | pthread_mutex_unlock(&queue->mutex); |
70 | 0 | } |
71 | | |
72 | | #ifdef H2O_NO_64BIT_ATOMICS |
73 | | pthread_mutex_t h2o_conn_id_mutex = PTHREAD_MUTEX_INITIALIZER; |
74 | | #endif |
75 | | |
76 | | #if H2O_USE_LIBUV |
77 | | #else |
78 | | |
79 | | #include <errno.h> |
80 | | #include <fcntl.h> |
81 | | #include <unistd.h> |
82 | | |
83 | | static void on_read(h2o_socket_t *sock, const char *err) |
84 | 0 | { |
85 | 0 | if (err != NULL) { |
86 | 0 | h2o_fatal("on_read: %s", err); |
87 | 0 | } |
88 | | |
89 | 0 | h2o_buffer_consume(&sock->input, sock->input->size); |
90 | 0 | queue_cb(sock->data); |
91 | 0 | } |
92 | | |
93 | | static void init_async(h2o_multithread_queue_t *queue, h2o_loop_t *loop) |
94 | 1 | { |
95 | 1 | #if defined(__linux__) |
96 | | /** |
97 | | * The kernel overhead of an eventfd file descriptor is |
98 | | * much lower than that of a pipe, and only one file descriptor is required |
99 | | */ |
100 | 1 | int fd; |
101 | 1 | char buf[128]; |
102 | | |
103 | 1 | fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); |
104 | 1 | if (fd == -1) { |
105 | 0 | h2o_fatal("eventfd: %s", h2o_strerror_r(errno, buf, sizeof(buf))); |
106 | 0 | } |
107 | 1 | queue->async.write = fd; |
108 | 1 | queue->async.read = h2o_evloop_socket_create(loop, fd, 0); |
109 | | #else |
110 | | int fds[2]; |
111 | | char buf[128]; |
112 | | |
113 | | if (cloexec_pipe(fds) != 0) { |
114 | | h2o_fatal("pipe: %s", h2o_strerror_r(errno, buf, sizeof(buf))); |
115 | | } |
116 | | fcntl(fds[1], F_SETFL, O_NONBLOCK); |
117 | | queue->async.write = fds[1]; |
118 | | queue->async.read = h2o_evloop_socket_create(loop, fds[0], 0); |
119 | | #endif |
120 | 1 | queue->async.read->data = queue; |
121 | 1 | h2o_socket_read_start(queue->async.read, on_read); |
122 | 1 | } |
123 | | |
124 | | #endif |
125 | | |
126 | | h2o_multithread_queue_t *h2o_multithread_create_queue(h2o_loop_t *loop) |
127 | 1 | { |
128 | 1 | h2o_multithread_queue_t *queue = h2o_mem_alloc(sizeof(*queue)); |
129 | 1 | memset(queue, 0, sizeof(*queue)); |
130 | | |
131 | | #if H2O_USE_LIBUV |
132 | | uv_async_init(loop, &queue->async, (uv_async_cb)queue_cb); |
133 | | #else |
134 | 1 | init_async(queue, loop); |
135 | 1 | #endif |
136 | 1 | pthread_mutex_init(&queue->mutex, NULL); |
137 | 1 | h2o_linklist_init_anchor(&queue->receivers.active); |
138 | 1 | h2o_linklist_init_anchor(&queue->receivers.inactive); |
139 | | |
140 | 1 | return queue; |
141 | 1 | } |
142 | | |
143 | | #if H2O_USE_LIBUV |
144 | | static void libuv_destroy_delayed(uv_handle_t *handle) |
145 | | { |
146 | | h2o_multithread_queue_t *queue = H2O_STRUCT_FROM_MEMBER(h2o_multithread_queue_t, async, (uv_async_t *)handle); |
147 | | free(queue); |
148 | | } |
149 | | #endif |
150 | | |
151 | | void h2o_multithread_destroy_queue(h2o_multithread_queue_t *queue) |
152 | 0 | { |
153 | 0 | assert(h2o_linklist_is_empty(&queue->receivers.active)); |
154 | 0 | assert(h2o_linklist_is_empty(&queue->receivers.inactive)); |
155 | 0 | pthread_mutex_destroy(&queue->mutex); |
156 | |
|
157 | | #if H2O_USE_LIBUV |
158 | | uv_close((uv_handle_t *)&queue->async, libuv_destroy_delayed); |
159 | | #else |
160 | 0 | h2o_socket_read_stop(queue->async.read); |
161 | 0 | h2o_socket_close(queue->async.read); |
162 | | #ifndef __linux__ |
163 | | /* only one file descriptor is required for eventfd and already closed by h2o_socket_close() */ |
164 | | close(queue->async.write); |
165 | | #endif |
166 | 0 | free(queue); |
167 | 0 | #endif |
168 | 0 | } |
169 | | |
170 | | void h2o_multithread_register_receiver(h2o_multithread_queue_t *queue, h2o_multithread_receiver_t *receiver, |
171 | | h2o_multithread_receiver_cb cb) |
172 | 1 | { |
173 | 1 | receiver->queue = queue; |
174 | 1 | receiver->_link = (h2o_linklist_t){NULL}; |
175 | 1 | h2o_linklist_init_anchor(&receiver->_messages); |
176 | 1 | receiver->cb = cb; |
177 | | |
178 | 1 | pthread_mutex_lock(&queue->mutex); |
179 | 1 | h2o_linklist_insert(&queue->receivers.inactive, &receiver->_link); |
180 | 1 | pthread_mutex_unlock(&queue->mutex); |
181 | 1 | } |
182 | | |
183 | | void h2o_multithread_unregister_receiver(h2o_multithread_queue_t *queue, h2o_multithread_receiver_t *receiver) |
184 | 0 | { |
185 | 0 | assert(queue == receiver->queue); |
186 | 0 | assert(h2o_linklist_is_empty(&receiver->_messages)); |
187 | 0 | pthread_mutex_lock(&queue->mutex); |
188 | 0 | h2o_linklist_unlink(&receiver->_link); |
189 | 0 | pthread_mutex_unlock(&queue->mutex); |
190 | 0 | } |
191 | | |
192 | | void h2o_multithread_send_message(h2o_multithread_receiver_t *receiver, h2o_multithread_message_t *message) |
193 | 0 | { |
194 | 0 | int do_send = 0; |
195 | |
|
196 | 0 | pthread_mutex_lock(&receiver->queue->mutex); |
197 | 0 | if (message != NULL) { |
198 | 0 | assert(!h2o_linklist_is_linked(&message->link)); |
199 | 0 | if (h2o_linklist_is_empty(&receiver->_messages)) { |
200 | 0 | h2o_linklist_unlink(&receiver->_link); |
201 | 0 | h2o_linklist_insert(&receiver->queue->receivers.active, &receiver->_link); |
202 | 0 | do_send = 1; |
203 | 0 | } |
204 | 0 | h2o_linklist_insert(&receiver->_messages, &message->link); |
205 | 0 | } else { |
206 | 0 | if (h2o_linklist_is_empty(&receiver->_messages)) |
207 | 0 | do_send = 1; |
208 | 0 | } |
209 | 0 | pthread_mutex_unlock(&receiver->queue->mutex); |
210 | |
|
211 | 0 | if (do_send) { |
212 | | #if H2O_USE_LIBUV |
213 | | uv_async_send(&receiver->queue->async); |
214 | | #else |
215 | 0 | #ifdef __linux__ |
216 | 0 | uint64_t tmp = 1; |
217 | 0 | while (write(receiver->queue->async.write, &tmp, sizeof(tmp)) == -1 && errno == EINTR) |
218 | | #else |
219 | | while (write(receiver->queue->async.write, "", 1) == -1 && errno == EINTR) |
220 | | #endif |
221 | 0 | ; |
222 | 0 | #endif |
223 | 0 | } |
224 | 0 | } |
225 | | |
226 | | void h2o_multithread_create_thread(pthread_t *tid, const pthread_attr_t *attr, void *(*func)(void *), void *arg) |
227 | 0 | { |
228 | 0 | int ret; |
229 | 0 | if ((ret = pthread_create(tid, attr, func, arg)) != 0) { |
230 | 0 | char buf[128]; |
231 | 0 | h2o_fatal("pthread_create: %s", h2o_strerror_r(ret, buf, sizeof(buf))); |
232 | 0 | } |
233 | 0 | } |
234 | | |
235 | | h2o_loop_t *h2o_multithread_get_loop(h2o_multithread_queue_t *queue) |
236 | 0 | { |
237 | 0 | if (queue == NULL) |
238 | 0 | return NULL; |
239 | | #if H2O_USE_LIBUV |
240 | | return ((uv_handle_t *)&queue->async)->loop; |
241 | | #else |
242 | 0 | return h2o_socket_get_loop(queue->async.read); |
243 | 0 | #endif |
244 | 0 | } |
245 | | |
246 | | void h2o_sem_init(h2o_sem_t *sem, ssize_t capacity) |
247 | 0 | { |
248 | 0 | pthread_mutex_init(&sem->_mutex, NULL); |
249 | 0 | pthread_cond_init(&sem->_cond, NULL); |
250 | 0 | sem->_cur = capacity; |
251 | 0 | sem->_capacity = capacity; |
252 | 0 | } |
253 | | |
254 | | void h2o_sem_destroy(h2o_sem_t *sem) |
255 | 0 | { |
256 | 0 | assert(sem->_cur == sem->_capacity); |
257 | 0 | pthread_cond_destroy(&sem->_cond); |
258 | 0 | pthread_mutex_destroy(&sem->_mutex); |
259 | 0 | } |
260 | | |
261 | | void h2o_sem_wait(h2o_sem_t *sem) |
262 | 0 | { |
263 | 0 | pthread_mutex_lock(&sem->_mutex); |
264 | 0 | while (sem->_cur <= 0) |
265 | 0 | pthread_cond_wait(&sem->_cond, &sem->_mutex); |
266 | 0 | --sem->_cur; |
267 | 0 | pthread_mutex_unlock(&sem->_mutex); |
268 | 0 | } |
269 | | |
270 | | void h2o_sem_post(h2o_sem_t *sem) |
271 | 0 | { |
272 | 0 | pthread_mutex_lock(&sem->_mutex); |
273 | 0 | ++sem->_cur; |
274 | 0 | pthread_cond_signal(&sem->_cond); |
275 | 0 | pthread_mutex_unlock(&sem->_mutex); |
276 | 0 | } |
277 | | |
278 | | void h2o_sem_set_capacity(h2o_sem_t *sem, ssize_t new_capacity) |
279 | 0 | { |
280 | 0 | pthread_mutex_lock(&sem->_mutex); |
281 | 0 | sem->_cur += new_capacity - sem->_capacity; |
282 | 0 | sem->_capacity = new_capacity; |
283 | 0 | pthread_cond_broadcast(&sem->_cond); |
284 | 0 | pthread_mutex_unlock(&sem->_mutex); |
285 | 0 | } |
286 | | |
287 | | /* barrier */ |
288 | | |
289 | | void h2o_barrier_init(h2o_barrier_t *barrier, size_t count) |
290 | 1 | { |
291 | 1 | pthread_mutex_init(&barrier->_mutex, NULL); |
292 | 1 | pthread_cond_init(&barrier->_cond, NULL); |
293 | 1 | barrier->_count = count; |
294 | 1 | barrier->_out_of_wait = count; |
295 | 1 | } |
296 | | |
297 | | void h2o_barrier_wait(h2o_barrier_t *barrier) |
298 | 2 | { |
299 | 2 | pthread_mutex_lock(&barrier->_mutex); |
300 | 2 | barrier->_count--; |
301 | 2 | if (barrier->_count == 0) { |
302 | 1 | pthread_cond_broadcast(&barrier->_cond); |
303 | 1 | } else { |
304 | 2 | while (barrier->_count != 0) |
305 | 1 | pthread_cond_wait(&barrier->_cond, &barrier->_mutex); |
306 | 1 | } |
307 | 2 | pthread_mutex_unlock(&barrier->_mutex); |
308 | | /* This is needed to synchronize h2o_barrier_dispose with the exit of this function, so make sure that we can't destroy the |
309 | | * mutex or the condition before all threads have exited wait(). */ |
310 | 2 | __sync_sub_and_fetch(&barrier->_out_of_wait, 1); |
311 | 2 | } |
312 | | |
313 | | int h2o_barrier_done(h2o_barrier_t *barrier) |
314 | 0 | { |
315 | 0 | return __sync_add_and_fetch(&barrier->_count, 0) == 0; |
316 | 0 | } |
317 | | |
318 | | void h2o_barrier_add(h2o_barrier_t *barrier, size_t delta) |
319 | 0 | { |
320 | 0 | __sync_add_and_fetch(&barrier->_count, delta); |
321 | 0 | } |
322 | | |
323 | | void h2o_barrier_dispose(h2o_barrier_t *barrier) |
324 | 0 | { |
325 | 0 | while (__sync_add_and_fetch(&barrier->_out_of_wait, 0) != 0) { |
326 | 0 | sched_yield(); |
327 | 0 | } |
328 | 0 | pthread_mutex_destroy(&barrier->_mutex); |
329 | 0 | pthread_cond_destroy(&barrier->_cond); |
330 | 0 | } |
331 | | |
332 | | void h2o_error_reporter__on_timeout(h2o_timer_t *_timer) |
333 | 0 | { |
334 | 0 | h2o_error_reporter_t *reporter = H2O_STRUCT_FROM_MEMBER(h2o_error_reporter_t, _timer, _timer); |
335 | |
|
336 | 0 | pthread_mutex_lock(&reporter->_mutex); |
337 | |
|
338 | 0 | uint64_t total_successes = __sync_fetch_and_add(&reporter->_total_successes, 0), |
339 | 0 | cur_successes = total_successes - reporter->prev_successes; |
340 | |
|
341 | 0 | reporter->_report_errors(reporter, total_successes, cur_successes); |
342 | |
|
343 | 0 | reporter->prev_successes = total_successes; |
344 | 0 | reporter->cur_errors = 0; |
345 | |
|
346 | 0 | pthread_mutex_unlock(&reporter->_mutex); |
347 | 0 | } |
348 | | |
349 | | uintptr_t h2o_error_reporter_record_error(h2o_loop_t *loop, h2o_error_reporter_t *reporter, uint64_t delay_ticks, |
350 | | uintptr_t new_data) |
351 | 0 | { |
352 | 0 | uintptr_t old_data; |
353 | |
|
354 | 0 | pthread_mutex_lock(&reporter->_mutex); |
355 | |
|
356 | 0 | if (reporter->cur_errors == 0) { |
357 | 0 | reporter->prev_successes = __sync_fetch_and_add_8(&reporter->_total_successes, 0); |
358 | 0 | assert(!h2o_timer_is_linked(&reporter->_timer)); |
359 | 0 | h2o_timer_link(loop, delay_ticks, &reporter->_timer); |
360 | 0 | } |
361 | 0 | ++reporter->cur_errors; |
362 | 0 | old_data = reporter->data; |
363 | 0 | reporter->data = new_data; |
364 | |
|
365 | 0 | pthread_mutex_unlock(&reporter->_mutex); |
366 | |
|
367 | 0 | return old_data; |
368 | 0 | } |