/src/fluent-bit/lib/monkey/mk_server/mk_scheduler.c
Line | Count | Source |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Monkey HTTP Server |
4 | | * ================== |
5 | | * Copyright 2001-2017 Eduardo Silva <eduardo@monkey.io> |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <monkey/monkey.h> |
21 | | #include <monkey/mk_info.h> |
22 | | #include <monkey/mk_core.h> |
23 | | #include <monkey/mk_vhost.h> |
24 | | #include <monkey/mk_scheduler.h> |
25 | | #include <monkey/mk_scheduler_tls.h> |
26 | | #include <monkey/mk_server.h> |
27 | | #include <monkey/mk_thread.h> |
28 | | #include <monkey/mk_cache.h> |
29 | | #include <monkey/mk_config.h> |
30 | | #include <monkey/mk_clock.h> |
31 | | #include <monkey/mk_plugin.h> |
32 | | #include <monkey/mk_utils.h> |
33 | | #include <monkey/mk_linuxtrace.h> |
34 | | #include <monkey/mk_server.h> |
35 | | #include <monkey/mk_plugin_stage.h> |
36 | | #include <monkey/mk_http_thread.h> |
37 | | |
38 | | #include <signal.h> |
39 | | |
40 | | #ifndef _WIN32 |
41 | | #include <sys/syscall.h> |
42 | | #endif |
43 | | |
44 | | extern struct mk_sched_handler mk_http_handler; |
45 | | extern struct mk_sched_handler mk_http2_handler; |
46 | | |
47 | | pthread_mutex_t mutex_worker_init = PTHREAD_MUTEX_INITIALIZER; |
48 | | pthread_mutex_t mutex_worker_exit = PTHREAD_MUTEX_INITIALIZER; |
49 | | |
50 | | /* |
51 | | * Returns the id of the worker that should take a new incoming |
52 | | * connection: the one with the fewest active connections. Only used |
53 | | * when config->scheduler_mode is MK_SCHEDULER_FAIR_BALANCING. |
54 | | */ |
55 | | static inline int _next_target(struct mk_server *server) |
56 | 0 | { |
57 | 0 | int i; |
58 | 0 | int target = 0; |
59 | 0 | unsigned long long tmp = 0, cur = 0; |
60 | 0 | struct mk_sched_ctx *ctx = server->sched_ctx; |
61 | 0 | struct mk_sched_worker *worker; |
62 | |
63 | 0 | cur = (ctx->workers[0].accepted_connections - |
64 | 0 | ctx->workers[0].closed_connections); |
65 | 0 | if (cur == 0) { |
66 | 0 | return 0; |
67 | 0 | } |
68 | | |
69 | | /* Find the worker with the lowest load */ |
70 | 0 | for (i = 1; i < server->workers; i++) { |
71 | 0 | worker = &ctx->workers[i]; |
72 | 0 | tmp = worker->accepted_connections - worker->closed_connections; |
73 | 0 | if (tmp < cur) { |
74 | 0 | target = i; |
75 | 0 | cur = tmp; |
76 | |
77 | 0 | if (cur == 0) |
78 | 0 | break; |
79 | 0 | } |
80 | 0 | } |
81 | | |
82 | | /* |
83 | | * If the sched_ctx->workers[target] worker is full then so is the whole |
84 | | * server, because that worker has the lowest load. |
85 | | */ |
86 | 0 | if (mk_unlikely(server->server_capacity > 0 && |
87 | 0 | server->server_capacity <= cur)) { |
88 | 0 | MK_TRACE("Too many clients: %i", server->server_capacity); |
89 | | |
90 | | /* Instruct the caller to close the connection anyway: we lie, it will die */ |
91 | 0 | return -1; |
92 | 0 | } |
93 | | |
94 | 0 | return target; |
95 | 0 | } |
96 | | |
97 | | struct mk_sched_worker *mk_sched_next_target(struct mk_server *server) |
98 | 0 | { |
99 | 0 | int t; |
100 | 0 | struct mk_sched_ctx *ctx = server->sched_ctx; |
101 | |
102 | 0 | t = _next_target(server); |
103 | 0 | if (mk_likely(t != -1)) { |
104 | 0 | return &ctx->workers[t]; |
105 | 0 | } |
106 | | |
107 | 0 | return NULL; |
108 | 0 | } |
109 | | |
110 | | /* |
111 | | * This function is invoked when the core triggers a MK_SCHED_SIGNAL_FREE_ALL |
112 | | * event through the signal channels: it means the server is shutting |
113 | | * down, so this is the last call to release all memory resources in use. |
114 | | * This takes place in a thread context. |
115 | | */ |
116 | | void mk_sched_worker_free(struct mk_server *server) |
117 | 0 | { |
118 | 0 | int i; |
119 | 0 | pthread_t tid; |
120 | 0 | struct mk_sched_ctx *ctx = server->sched_ctx; |
121 | 0 | struct mk_sched_worker *worker = NULL; |
122 | |
123 | 0 | pthread_mutex_lock(&mutex_worker_exit); |
124 | | |
125 | | /* |
126 | | * FIXME: an API is needed so that plugins release their |
127 | | * resources first at the WORKER LEVEL. |
128 | | */ |
129 | | |
130 | | /* External */ |
131 | 0 | mk_plugin_exit_worker(); |
132 | 0 | mk_vhost_fdt_worker_exit(server); |
133 | 0 | mk_cache_worker_exit(); |
134 | | |
135 | | /* Scheduler stuff */ |
136 | 0 | tid = pthread_self(); |
137 | 0 | for (i = 0; i < server->workers; i++) { |
138 | 0 | worker = &ctx->workers[i]; |
139 | 0 | if (worker->tid == tid) { |
140 | 0 | break; |
141 | 0 | } |
142 | 0 | worker = NULL; |
143 | 0 | } |
144 | |
145 | 0 | mk_bug(!worker); |
146 | | |
147 | | /* FIXME!: there is nothing done here with the worker context */ |
148 | | |
149 | | /* Free master array (av queue & busy queue) */ |
150 | 0 | mk_mem_free(MK_TLS_GET(mk_tls_sched_cs)); |
151 | 0 | mk_mem_free(MK_TLS_GET(mk_tls_sched_cs_incomplete)); |
152 | 0 | mk_mem_free(MK_TLS_GET(mk_tls_sched_worker_notif)); |
153 | 0 | pthread_mutex_unlock(&mutex_worker_exit); |
154 | 0 | } |
155 | | |
156 | | struct mk_sched_handler *mk_sched_handler_cap(char cap) |
157 | 0 | { |
158 | 0 | if (cap == MK_CAP_HTTP) { |
159 | 0 | return &mk_http_handler; |
160 | 0 | } |
161 | | |
162 | | #ifdef MK_HAVE_HTTP2 |
163 | | else if (cap == MK_CAP_HTTP2) { |
164 | | return &mk_http2_handler; |
165 | | } |
166 | | #endif |
167 | | |
168 | 0 | return NULL; |
169 | 0 | } |
170 | | |
171 | | /* |
172 | | * Register a new client connection into the scheduler, this call takes place |
173 | | * inside the worker/thread context. |
174 | | */ |
175 | | struct mk_sched_conn *mk_sched_add_connection(int remote_fd, |
176 | | struct mk_server_listen *listener, |
177 | | struct mk_sched_worker *sched, |
178 | | struct mk_server *server) |
179 | 0 | { |
180 | 0 | int ret; |
181 | 0 | int size; |
182 | 0 | struct mk_sched_handler *handler; |
183 | 0 | struct mk_sched_conn *conn; |
184 | 0 | struct mk_event *event; |
185 | | |
186 | | /* Before continuing, we need to run plugin stage 10 */ |
187 | 0 | ret = mk_plugin_stage_run_10(remote_fd, server); |
188 | | |
189 | | /* Close connection, otherwise continue */ |
190 | 0 | if (ret == MK_PLUGIN_RET_CLOSE_CONX) { |
191 | 0 | listener->network->network->close(listener->network, remote_fd); |
192 | 0 | MK_LT_SCHED(remote_fd, "PLUGIN_CLOSE"); |
193 | 0 | return NULL; |
194 | 0 | } |
195 | | |
196 | 0 | handler = listener->protocol; |
197 | 0 | if (handler->sched_extra_size > 0) { |
198 | 0 | void *data; |
199 | 0 | size = (sizeof(struct mk_sched_conn) + handler->sched_extra_size); |
200 | 0 | data = mk_mem_alloc_z(size); |
201 | 0 | conn = (struct mk_sched_conn *) data; |
202 | 0 | } |
203 | 0 | else { |
204 | 0 | conn = mk_mem_alloc_z(sizeof(struct mk_sched_conn)); |
205 | 0 | } |
206 | |
207 | 0 | if (!conn) { |
208 | 0 | mk_err("[server] Could not register client"); |
209 | 0 | return NULL; |
210 | 0 | } |
211 | | |
212 | 0 | event = &conn->event; |
213 | 0 | event->fd = remote_fd; |
214 | 0 | event->type = MK_EVENT_CONNECTION; |
215 | 0 | event->mask = MK_EVENT_EMPTY; |
216 | 0 | event->status = MK_EVENT_NONE; |
217 | 0 | conn->arrive_time = server->clock_context->log_current_utime; |
218 | 0 | conn->protocol = handler; |
219 | 0 | conn->net = listener->network->network; |
220 | 0 | conn->is_timeout_on = MK_FALSE; |
221 | 0 | conn->server_listen = listener; |
222 | | |
223 | | /* Stream channel */ |
224 | 0 | conn->channel.type = MK_CHANNEL_SOCKET; /* channel type */ |
225 | 0 | conn->channel.fd = remote_fd; /* socket conn */ |
226 | 0 | conn->channel.io = conn->net; /* network layer */ |
227 | 0 | conn->channel.event = event; /* parent event ref */ |
228 | 0 | mk_list_init(&conn->channel.streams); |
229 | | |
230 | | /* |
231 | | * Register the connection into the timeout_queue: |
232 | | * |
233 | | * When a new connection arrives, we cannot assume it contains some data |
234 | | * to read, meaning the event loop may not get notifications and the protocol |
235 | | * handler will never be called. So in order to avoid DDoS we always register |
236 | | * this session in the timeout_queue for further lookup. |
237 | | * |
238 | | * The protocol handler is in charge of removing the session from the |
239 | | * timeout_queue. |
240 | | */ |
241 | 0 | mk_sched_conn_timeout_add(conn, sched); |
242 | | |
243 | | /* Linux trace message */ |
244 | 0 | MK_LT_SCHED(remote_fd, "REGISTERED"); |
245 | |
246 | 0 | return conn; |
247 | 0 | } |
248 | | |
249 | | static void mk_sched_thread_lists_init() |
250 | 0 | { |
251 | 0 | struct mk_list *sched_cs_incomplete; |
252 | | |
253 | | /* mk_tls_sched_cs_incomplete */ |
254 | 0 | sched_cs_incomplete = mk_mem_alloc(sizeof(struct mk_list)); |
255 | 0 | mk_list_init(sched_cs_incomplete); |
256 | 0 | MK_TLS_SET(mk_tls_sched_cs_incomplete, sched_cs_incomplete); |
257 | 0 | } |
258 | | |
259 | | /* Register thread information; the calling thread owns this information */ |
260 | | static int mk_sched_register_thread(struct mk_server *server) |
261 | 0 | { |
262 | 0 | struct mk_sched_ctx *ctx = server->sched_ctx; |
263 | 0 | struct mk_sched_worker *worker; |
264 | | |
265 | | /* |
266 | | * If this thread sleeps inside this section, some other thread may touch |
267 | | * server->worker_id, so protect it with a mutex: only one thread may |
268 | | * handle server->worker_id at a time. |
269 | | * |
270 | | * Note: the platform-agnostic atomics implemented in cmetrics could be |
271 | | * used here instead of a lock. |
272 | | */ |
273 | 0 | worker = &ctx->workers[server->worker_id]; |
274 | 0 | worker->idx = server->worker_id++; |
275 | 0 | worker->tid = pthread_self(); |
276 | |
277 | 0 | #if defined(__linux__) |
278 | | /* |
279 | | * Under Linux there is no real difference between processes and |
280 | | * threads: everything is a task in the kernel task struct, and each |
281 | | * one has its own numerical identifier: a PID. |
282 | | * |
283 | | * Here we want to know the PID associated with this running task |
284 | | * (which is different from the parent Monkey PID). It can be |
285 | | * retrieved with gettid(), but glibc does not export the syscall to |
286 | | * userspace, so we need to invoke it directly through syscall(2). |
287 | | */ |
288 | 0 | worker->pid = syscall(__NR_gettid); |
289 | | #elif defined(__APPLE__) |
290 | | uint64_t tid; |
291 | | pthread_threadid_np(NULL, &tid); |
292 | | worker->pid = tid; |
293 | | #else |
294 | | worker->pid = 0xdeadbeef; |
295 | | #endif |
296 | | |
297 | | /* Initialize lists */ |
298 | 0 | mk_list_init(&worker->timeout_queue); |
299 | 0 | worker->request_handler = NULL; |
300 | |
301 | 0 | return worker->idx; |
302 | 0 | } |
303 | | |
304 | | static void mk_signal_thread_sigpipe_safe() |
305 | 0 | { |
306 | 0 | #ifndef _WIN32 |
307 | 0 | sigset_t old; |
308 | 0 | sigset_t set; |
309 | |
310 | 0 | sigemptyset(&set); |
311 | 0 | sigaddset(&set, SIGPIPE); |
312 | 0 | pthread_sigmask(SIG_BLOCK, &set, &old); |
313 | 0 | #endif |
314 | 0 | } |
315 | | |
316 | | /* Created thread: all these calls run in the thread context */ |
317 | | void *mk_sched_launch_worker_loop(void *data) |
318 | 0 | { |
319 | 0 | int ret; |
320 | 0 | int wid; |
321 | 0 | unsigned long len; |
322 | 0 | char *thread_name = 0; |
323 | 0 | struct mk_list *head; |
324 | 0 | struct mk_sched_worker_cb *wcb; |
325 | 0 | struct mk_sched_worker *sched = NULL; |
326 | 0 | struct mk_sched_notif *notif = NULL; |
327 | 0 | struct mk_sched_thread_conf *thinfo = data; |
328 | 0 | struct mk_sched_ctx *ctx; |
329 | 0 | struct mk_server *server; |
330 | |
331 | 0 | server = thinfo->server; |
332 | 0 | ctx = server->sched_ctx; |
333 | | |
334 | | /* Avoid SIGPIPE signals on this thread */ |
335 | 0 | mk_signal_thread_sigpipe_safe(); |
336 | | |
337 | | /* Init specific thread cache */ |
338 | 0 | mk_sched_thread_lists_init(); |
339 | 0 | mk_cache_worker_init(); |
340 | | |
341 | | /* Virtual hosts: initialize per-thread vhost data */ |
342 | 0 | mk_vhost_fdt_worker_init(server); |
343 | | |
344 | | /* Register working thread */ |
345 | 0 | wid = mk_sched_register_thread(server); |
346 | 0 | sched = &ctx->workers[wid]; |
347 | 0 | sched->loop = mk_event_loop_create(MK_EVENT_QUEUE_SIZE); |
348 | 0 | if (!sched->loop) { |
349 | 0 | mk_err("Error creating Scheduler loop"); |
350 | 0 | exit(EXIT_FAILURE); |
351 | 0 | } |
352 | | |
353 | | |
354 | 0 | sched->mem_pagesize = mk_utils_get_system_page_size(); |
355 | | |
356 | | /* |
357 | | * Create the notification instance and link it to the worker |
358 | | * thread-scope list. |
359 | | */ |
360 | 0 | notif = mk_mem_alloc_z(sizeof(struct mk_sched_notif)); |
361 | 0 | MK_TLS_SET(mk_tls_sched_worker_notif, notif); |
362 | | |
363 | | /* Register the scheduler channel to signal active workers */ |
364 | 0 | ret = mk_event_channel_create(sched->loop, |
365 | 0 | &sched->signal_channel_r, |
366 | 0 | &sched->signal_channel_w, |
367 | 0 | notif); |
368 | 0 | if (ret < 0) { |
369 | 0 | exit(EXIT_FAILURE); |
370 | 0 | } |
371 | | |
372 | 0 | mk_list_init(&sched->event_free_queue); |
373 | 0 | mk_list_init(&sched->threads); |
374 | 0 | mk_list_init(&sched->threads_purge); |
375 | | |
376 | | /* |
377 | | * ULONG_MAX BUG test only |
378 | | * ======================= |
379 | | * to test the workaround we can use the following value: |
380 | | * |
381 | | * thinfo->closed_connections = 1000; |
382 | | */ |
383 | | |
384 | | //thinfo->ctx = thconf->ctx; |
385 | | |
386 | | /* Rename worker */ |
387 | 0 | mk_string_build(&thread_name, &len, "monkey: wrk/%i", sched->idx); |
388 | 0 | mk_utils_worker_rename(thread_name); |
389 | 0 | mk_mem_free(thread_name); |
390 | | |
391 | | /* Export known scheduler node to context thread */ |
392 | 0 | MK_TLS_SET(mk_tls_sched_worker_node, sched); |
393 | 0 | mk_plugin_core_thread(server); |
394 | |
395 | 0 | if (server->scheduler_mode == MK_SCHEDULER_REUSEPORT) { |
396 | 0 | sched->listeners = mk_server_listen_init(server); |
397 | 0 | if (!sched->listeners) { |
398 | 0 | exit(EXIT_FAILURE); |
399 | 0 | } |
400 | 0 | } |
401 | | |
402 | | /* Unlock the conditional initializer */ |
403 | 0 | pthread_mutex_lock(&server->pth_mutex); |
404 | 0 | server->pth_init = MK_TRUE; |
405 | 0 | pthread_cond_signal(&server->pth_cond); |
406 | 0 | pthread_mutex_unlock(&server->pth_mutex); |
407 | | |
408 | | /* Invoke custom worker-callbacks defined by the scheduler (lib) */ |
409 | 0 | mk_list_foreach(head, &server->sched_worker_callbacks) { |
410 | 0 | wcb = mk_list_entry(head, struct mk_sched_worker_cb, _head); |
411 | 0 | wcb->cb_func(wcb->data); |
412 | 0 | } |
413 | |
414 | 0 | mk_mem_free(thinfo); |
415 | | |
416 | | /* init server thread loop */ |
417 | 0 | mk_server_worker_loop(server); |
418 | |
419 | 0 | return 0; |
420 | 0 | } |
421 | | |
422 | | /* Create a thread that will listen for incoming requests */ |
423 | | int mk_sched_launch_thread(struct mk_server *server, pthread_t *tout) |
424 | 0 | { |
425 | 0 | pthread_t tid; |
426 | 0 | pthread_attr_t attr; |
427 | 0 | struct mk_sched_thread_conf *thconf; |
428 | |
429 | 0 | server->pth_init = MK_FALSE; |
430 | | |
431 | | /* |
432 | | * This lock is used for the 'pth_cond' condition variable. Once the |
433 | | * worker thread is ready it will signal the condition. |
434 | | */ |
435 | 0 | pthread_mutex_lock(&server->pth_mutex); |
436 | | |
437 | | /* Thread data */ |
438 | 0 | thconf = mk_mem_alloc_z(sizeof(struct mk_sched_thread_conf)); |
439 | 0 | thconf->server = server; |
440 | |
441 | 0 | pthread_attr_init(&attr); |
442 | 0 | pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); |
443 | 0 | if (pthread_create(&tid, &attr, mk_sched_launch_worker_loop, |
444 | 0 | (void *) thconf) != 0) { |
445 | 0 | mk_libc_error("pthread_create"); |
446 | 0 | pthread_mutex_unlock(&server->pth_mutex); |
447 | 0 | return -1; |
448 | 0 | } |
449 | | |
450 | 0 | *tout = tid; |
451 | | |
452 | | /* Block until the child thread is ready */ |
453 | 0 | while (!server->pth_init) { |
454 | 0 | pthread_cond_wait(&server->pth_cond, &server->pth_mutex); |
455 | 0 | } |
456 | |
457 | 0 | pthread_mutex_unlock(&server->pth_mutex); |
458 | |
459 | 0 | return 0; |
460 | 0 | } |
461 | | |
462 | | /* |
463 | | * The scheduler nodes are an array of struct mk_sched_worker; each |
464 | | * worker thread belongs to one scheduler node. In this function we |
465 | | * allocate one scheduler node per configured worker. |
466 | | */ |
467 | | int mk_sched_init(struct mk_server *server) |
468 | 0 | { |
469 | 0 | int size; |
470 | 0 | struct mk_sched_ctx *ctx; |
471 | |
472 | 0 | ctx = mk_mem_alloc_z(sizeof(struct mk_sched_ctx)); |
473 | 0 | if (!ctx) { |
474 | 0 | mk_libc_error("malloc"); |
475 | 0 | return -1; |
476 | 0 | } |
477 | | |
478 | 0 | size = (sizeof(struct mk_sched_worker) * server->workers); |
479 | 0 | ctx->workers = mk_mem_alloc(size); |
480 | 0 | if (!ctx->workers) { |
481 | 0 | mk_libc_error("malloc"); |
482 | 0 | mk_mem_free(ctx); |
483 | 0 | return -1; |
484 | 0 | } |
485 | 0 | memset(ctx->workers, '\0', size); |
486 | | |
487 | | /* Initialize helpers */ |
488 | 0 | pthread_mutex_init(&server->pth_mutex, NULL); |
489 | 0 | pthread_cond_init(&server->pth_cond, NULL); |
490 | 0 | server->pth_init = MK_FALSE; |
491 | | |
492 | | /* Map context into server context */ |
493 | 0 | server->sched_ctx = ctx; |
494 | | |
495 | | /* The mk_thread_prepare call was replaced by mk_http_thread_initialize_tls |
496 | | * which is called earlier. |
497 | | */ |
498 | |
499 | 0 | return 0; |
500 | 0 | } |
501 | | |
502 | | int mk_sched_exit(struct mk_server *server) |
503 | 0 | { |
504 | 0 | struct mk_sched_ctx *ctx; |
505 | |
506 | 0 | ctx = server->sched_ctx; |
507 | 0 | mk_sched_worker_cb_free(server); |
508 | 0 | mk_mem_free(ctx->workers); |
509 | 0 | mk_mem_free(ctx); |
510 | |
511 | 0 | return 0; |
512 | 0 | } |
513 | | |
514 | | void mk_sched_set_request_list(struct rb_root *list) |
515 | 0 | { |
516 | 0 | MK_TLS_SET(mk_tls_sched_cs, list); |
517 | 0 | } |
518 | | |
519 | | int mk_sched_remove_client(struct mk_sched_conn *conn, |
520 | | struct mk_sched_worker *sched, |
521 | | struct mk_server *server) |
522 | 0 | { |
523 | 0 | struct mk_event *event; |
524 | | |
525 | | /* |
526 | | * Close socket and change status: we must invoke mk_epoll_del() |
527 | | * because when the socket is closed it is cleaned from the queue by |
528 | | * the kernel at its leisure, and we may get false events if we rely |
529 | | * on that. |
530 | | */ |
531 | 0 | event = &conn->event; |
532 | 0 | MK_TRACE("[FD %i] Scheduler remove", event->fd); |
533 | |
534 | 0 | mk_event_del(sched->loop, event); |
535 | | |
536 | | /* Invoke plugins in stage 50 */ |
537 | 0 | mk_plugin_stage_run_50(event->fd, server); |
538 | |
539 | 0 | sched->closed_connections++; |
540 | | |
541 | | /* Unlink from the red-black tree */ |
542 | | //rb_erase(&conn->_rb_head, &sched->rb_queue); |
543 | 0 | mk_sched_conn_timeout_del(conn); |
544 | | |
545 | | /* Close at network layer level */ |
546 | 0 | conn->net->close(conn->net->plugin, event->fd); |
547 | | |
548 | | /* Release and return */ |
549 | 0 | mk_channel_clean(&conn->channel); |
550 | 0 | mk_sched_event_free(&conn->event); |
551 | 0 | conn->status = MK_SCHED_CONN_CLOSED; |
552 | |
553 | 0 | MK_LT_SCHED(event->fd, "DELETE_CLIENT"); |
554 | 0 | return 0; |
555 | 0 | } |
556 | | |
557 | | /* FIXME: nobody is using this function, check back later */ |
558 | | struct mk_sched_conn *mk_sched_get_connection(struct mk_sched_worker *sched, |
559 | | int remote_fd) |
560 | 0 | { |
561 | 0 | (void) sched; |
562 | 0 | (void) remote_fd; |
563 | 0 | return NULL; |
564 | 0 | } |
565 | | |
566 | | /* |
567 | | * For a given connection, remove all associated resources: this can be |
568 | | * used in any context, such as timeouts, I/O errors, finished requests, |
569 | | * exceptions, etc. |
570 | | */ |
571 | | int mk_sched_drop_connection(struct mk_sched_conn *conn, |
572 | | struct mk_sched_worker *sched, |
573 | | struct mk_server *server) |
574 | 0 | { |
575 | 0 | mk_sched_threads_destroy_conn(sched, conn); |
576 | 0 | return mk_sched_remove_client(conn, sched, server); |
577 | 0 | } |
578 | | |
579 | | int mk_sched_check_timeouts(struct mk_sched_worker *sched, |
580 | | struct mk_server *server) |
581 | 0 | { |
582 | 0 | int client_timeout; |
583 | 0 | struct mk_sched_conn *conn; |
584 | 0 | struct mk_list *head; |
585 | 0 | struct mk_list *temp; |
586 | | |
587 | | /* PENDING CONN TIMEOUT */ |
588 | 0 | mk_list_foreach_safe(head, temp, &sched->timeout_queue) { |
589 | 0 | conn = mk_list_entry(head, struct mk_sched_conn, timeout_head); |
590 | 0 | if (conn->event.type & MK_EVENT_IDLE) { |
591 | 0 | continue; |
592 | 0 | } |
593 | | |
594 | 0 | client_timeout = conn->arrive_time + server->timeout; |
595 | | |
596 | | /* Check timeout */ |
597 | 0 | if (client_timeout <= server->clock_context->log_current_utime) { |
598 | 0 | MK_TRACE("Scheduler, closing fd %i due to TIMEOUT", |
599 | 0 | conn->event.fd); |
600 | 0 | MK_LT_SCHED(conn->event.fd, "TIMEOUT_CONN_PENDING"); |
601 | 0 | conn->protocol->cb_close(conn, sched, MK_SCHED_CONN_TIMEOUT, |
602 | 0 | server); |
603 | 0 | mk_sched_drop_connection(conn, sched, server); |
604 | 0 | } |
605 | 0 | } |
606 | |
607 | 0 | return 0; |
608 | 0 | } |
609 | | |
610 | | static int sched_thread_cleanup(struct mk_sched_worker *sched, |
611 | | struct mk_list *list) |
612 | 0 | { |
613 | 0 | int c = 0; |
614 | 0 | struct mk_list *tmp; |
615 | 0 | struct mk_list *head; |
616 | 0 | struct mk_http_thread *mth; |
617 | 0 | (void) sched; |
618 | |
619 | 0 | mk_list_foreach_safe(head, tmp, list) { |
620 | 0 | mth = mk_list_entry(head, struct mk_http_thread, _head); |
621 | 0 | mk_http_thread_destroy(mth); |
622 | 0 | c++; |
623 | 0 | } |
624 | |
625 | 0 | return c; |
626 | |
627 | 0 | } |
628 | | |
629 | | int mk_sched_threads_purge(struct mk_sched_worker *sched) |
630 | 0 | { |
631 | 0 | int c = 0; |
632 | |
633 | 0 | c = sched_thread_cleanup(sched, &sched->threads_purge); |
634 | 0 | return c; |
635 | 0 | } |
636 | | |
637 | | int mk_sched_threads_destroy_all(struct mk_sched_worker *sched) |
638 | 0 | { |
639 | 0 | int c = 0; |
640 | |
641 | 0 | c = sched_thread_cleanup(sched, &sched->threads_purge); |
642 | 0 | c += sched_thread_cleanup(sched, &sched->threads); |
643 | |
644 | 0 | return c; |
645 | 0 | } |
646 | | |
647 | | /* |
648 | | * Destroy the thread contexts associated to the particular |
649 | | * connection. |
650 | | * |
651 | | * Return the number of contexts destroyed. |
652 | | */ |
653 | | int mk_sched_threads_destroy_conn(struct mk_sched_worker *sched, |
654 | | struct mk_sched_conn *conn) |
655 | 0 | { |
656 | 0 | int c = 0; |
657 | 0 | struct mk_list *tmp; |
658 | 0 | struct mk_list *head; |
659 | 0 | struct mk_http_thread *mth; |
660 | 0 | (void) sched; |
661 | |
662 | 0 | mk_list_foreach_safe(head, tmp, &sched->threads) { |
663 | 0 | mth = mk_list_entry(head, struct mk_http_thread, _head); |
664 | 0 | if (mth->session->conn == conn) { |
665 | 0 | mk_http_thread_destroy(mth); |
666 | 0 | c++; |
667 | 0 | } |
668 | 0 | } |
669 | 0 | return c; |
670 | 0 | } |
671 | | |
672 | | /* |
673 | | * Scheduler events handler: look up the event handler and invoke |
674 | | * the proper callbacks. |
675 | | */ |
676 | | int mk_sched_event_read(struct mk_sched_conn *conn, |
677 | | struct mk_sched_worker *sched, |
678 | | struct mk_server *server) |
679 | 0 | { |
680 | 0 | int ret = 0; |
681 | |
682 | | #ifdef MK_HAVE_TRACE |
683 | | MK_TRACE("[FD %i] Connection Handler / read", conn->event.fd); |
684 | | #endif |
685 | | |
686 | | /* |
687 | | * When the event loop notifies that there is readable data on the |
688 | | * socket, we need to invoke the protocol handler associated with this |
689 | | * connection, passing as a reference the 'read()' function that |
690 | | * handles the 'read I/O' operations, e.g: |
691 | | * |
692 | | * - plain sockets through liana just use read(2) |
693 | | * - SSL through mbedtls should use mk_mbedtls_read(..) |
694 | | */ |
695 | 0 | ret = conn->protocol->cb_read(conn, sched, server); |
696 | 0 | if (ret == -1) { |
697 | 0 | if (errno == EAGAIN) { |
698 | 0 | MK_TRACE("[FD %i] EAGAIN: need to read more data", conn->event.fd); |
699 | 0 | return 1; |
700 | 0 | } |
701 | 0 | return -1; |
702 | 0 | } |
703 | | |
704 | 0 | return ret; |
705 | 0 | } |
706 | | |
707 | | int mk_sched_event_write(struct mk_sched_conn *conn, |
708 | | struct mk_sched_worker *sched, |
709 | | struct mk_server *server) |
710 | 0 | { |
711 | 0 | int ret = -1; |
712 | 0 | size_t count; |
713 | 0 | struct mk_event *event; |
714 | |
715 | 0 | MK_TRACE("[FD %i] Connection Handler / write", conn->event.fd); |
716 | |
717 | 0 | ret = mk_channel_write(&conn->channel, &count); |
718 | 0 | if (ret == MK_CHANNEL_FLUSH || ret == MK_CHANNEL_BUSY) { |
719 | 0 | return 0; |
720 | 0 | } |
721 | 0 | else if (ret == MK_CHANNEL_DONE || ret == MK_CHANNEL_EMPTY) { |
722 | 0 | if (conn->protocol->cb_done) { |
723 | 0 | ret = conn->protocol->cb_done(conn, sched, server); |
724 | 0 | } |
725 | 0 | if (ret == -1) { |
726 | 0 | return -1; |
727 | 0 | } |
728 | 0 | else if (ret == 0) { |
729 | 0 | event = &conn->event; |
730 | 0 | mk_event_add(sched->loop, event->fd, |
731 | 0 | MK_EVENT_CONNECTION, |
732 | 0 | MK_EVENT_READ, |
733 | 0 | conn); |
734 | 0 | } |
735 | 0 | return 0; |
736 | 0 | } |
737 | 0 | else if (ret & MK_CHANNEL_ERROR) { |
738 | 0 | return -1; |
739 | 0 | } |
740 | | |
741 | | /* avoid making gcc cry :_( */ |
742 | 0 | return -1; |
743 | 0 | } |
744 | | |
745 | | int mk_sched_event_close(struct mk_sched_conn *conn, |
746 | | struct mk_sched_worker *sched, |
747 | | int type, struct mk_server *server) |
748 | 0 | { |
749 | 0 | MK_TRACE("[FD %i] Connection Handler, closed", conn->event.fd); |
750 | 0 | mk_event_del(sched->loop, &conn->event); |
751 | |
752 | 0 | if (type != MK_EP_SOCKET_DONE) { |
753 | 0 | conn->protocol->cb_close(conn, sched, type, server); |
754 | 0 | } |
755 | | /* |
756 | | * Remove the socket from the scheduler and make sure |
757 | | * to disable all notifications. |
758 | | */ |
759 | 0 | mk_sched_drop_connection(conn, sched, server); |
760 | 0 | return 0; |
761 | 0 | } |
762 | | |
763 | | void mk_sched_event_free(struct mk_event *event) |
764 | 0 | { |
765 | 0 | struct mk_sched_worker *sched = mk_sched_get_thread_conf(); |
766 | |
767 | 0 | if ((event->type & MK_EVENT_IDLE) != 0) { |
768 | 0 | return; |
769 | 0 | } |
770 | | |
771 | 0 | event->type |= MK_EVENT_IDLE; |
772 | 0 | mk_list_add(&event->_head, &sched->event_free_queue); |
773 | 0 | } |
774 | | |
775 | | /* Register a new callback function to invoke when a worker is created */ |
776 | | int mk_sched_worker_cb_add(struct mk_server *server, |
777 | | void (*cb_func) (void *), |
778 | | void *data) |
779 | 0 | { |
780 | 0 | struct mk_sched_worker_cb *wcb; |
781 | |
782 | 0 | wcb = mk_mem_alloc(sizeof(struct mk_sched_worker_cb)); |
783 | 0 | if (!wcb) { |
784 | 0 | return -1; |
785 | 0 | } |
786 | | |
787 | 0 | wcb->cb_func = cb_func; |
788 | 0 | wcb->data = data; |
789 | 0 | mk_list_add(&wcb->_head, &server->sched_worker_callbacks); |
790 | 0 | return 0; |
791 | 0 | } |
792 | | |
793 | | void mk_sched_worker_cb_free(struct mk_server *server) |
794 | 0 | { |
795 | 0 | struct mk_list *tmp; |
796 | 0 | struct mk_list *head; |
797 | 0 | struct mk_sched_worker_cb *wcb; |
798 | |
799 | 0 | mk_list_foreach_safe(head, tmp, &server->sched_worker_callbacks) { |
800 | 0 | wcb = mk_list_entry(head, struct mk_sched_worker_cb, _head); |
801 | 0 | mk_list_del(&wcb->_head); |
802 | 0 | mk_mem_free(wcb); |
803 | 0 | } |
804 | 0 | } |
805 | | |
806 | | int mk_sched_send_signal(struct mk_sched_worker *worker, uint64_t val) |
807 | 0 | { |
808 | 0 | ssize_t n; |
809 | | |
810 | | /* When using libevent, _mk_event_channel_create creates a unix socket |
811 | | * instead of a pipe, and Windows doesn't allow us to call read / write |
812 | | * on a socket, so we use recv / send instead. |
813 | | */ |
814 | |
815 | | #ifdef _WIN32 |
816 | | n = send(worker->signal_channel_w, &val, sizeof(uint64_t), 0); |
817 | | #else |
818 | 0 | n = write(worker->signal_channel_w, &val, sizeof(uint64_t)); |
819 | 0 | #endif |
820 | |
821 | 0 | if (n < 0) { |
822 | 0 | mk_libc_error("write"); |
823 | |
824 | 0 | return 0; |
825 | 0 | } |
826 | | |
827 | 0 | return 1; |
828 | 0 | } |
829 | | |
830 | | int mk_sched_broadcast_signal(struct mk_server *server, uint64_t val) |
831 | 0 | { |
832 | 0 | int i; |
833 | 0 | int count = 0; |
834 | 0 | struct mk_sched_ctx *ctx; |
835 | 0 | struct mk_sched_worker *worker; |
836 | |
837 | 0 | ctx = server->sched_ctx; |
838 | 0 | for (i = 0; i < server->workers; i++) { |
839 | 0 | worker = &ctx->workers[i]; |
840 | |
841 | 0 | count += mk_sched_send_signal(worker, val); |
842 | 0 | } |
843 | |
844 | 0 | return count; |
845 | 0 | } |
846 | | |
847 | | /* |
848 | | * Wait for all workers to finish: this function assumes that previously a |
849 | | * MK_SCHED_SIGNAL_FREE_ALL was sent to the worker channels. |
850 | | */ |
851 | | int mk_sched_workers_join(struct mk_server *server) |
852 | 0 | { |
853 | 0 | int i; |
854 | 0 | int count = 0; |
855 | 0 | struct mk_sched_ctx *ctx; |
856 | 0 | struct mk_sched_worker *worker; |
857 | |
858 | 0 | ctx = server->sched_ctx; |
859 | 0 | for (i = 0; i < server->workers; i++) { |
860 | 0 | worker = &ctx->workers[i]; |
861 | 0 | pthread_join(worker->tid, NULL); |
862 | 0 | count++; |
863 | 0 | } |
864 | |
865 | 0 | return count; |
866 | 0 | } |