Coverage Report

Created: 2025-09-04 07:51

/src/fluent-bit/lib/monkey/mk_server/mk_scheduler.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Monkey HTTP Server
4
 *  ==================
5
 *  Copyright 2001-2017 Eduardo Silva <eduardo@monkey.io>
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <monkey/monkey.h>
21
#include <monkey/mk_info.h>
22
#include <monkey/mk_core.h>
23
#include <monkey/mk_vhost.h>
24
#include <monkey/mk_scheduler.h>
25
#include <monkey/mk_scheduler_tls.h>
26
#include <monkey/mk_server.h>
27
#include <monkey/mk_thread.h>
28
#include <monkey/mk_cache.h>
29
#include <monkey/mk_config.h>
30
#include <monkey/mk_clock.h>
31
#include <monkey/mk_plugin.h>
32
#include <monkey/mk_utils.h>
33
#include <monkey/mk_linuxtrace.h>
34
#include <monkey/mk_server.h>
35
#include <monkey/mk_plugin_stage.h>
36
#include <monkey/mk_http_thread.h>
37
38
#include <signal.h>
39
40
#ifndef _WIN32
41
#include <sys/syscall.h>
42
#endif
43
44
extern struct mk_sched_handler mk_http_handler;
45
extern struct mk_sched_handler mk_http2_handler;
46
47
pthread_mutex_t mutex_worker_init = PTHREAD_MUTEX_INITIALIZER;
48
pthread_mutex_t mutex_worker_exit = PTHREAD_MUTEX_INITIALIZER;
49
50
/*
51
 * Returns the worker id which should take a new incomming connection,
52
 * it returns the worker id with less active connections. Just used
53
 * if config->scheduler_mode is MK_SCHEDULER_FAIR_BALANCING.
54
 */
55
static inline int _next_target(struct mk_server *server)
56
0
{
57
0
    int i;
58
0
    int target = 0;
59
0
    unsigned long long tmp = 0, cur = 0;
60
0
    struct mk_sched_ctx *ctx = server->sched_ctx;
61
0
    struct mk_sched_worker *worker;
62
63
0
    cur = (ctx->workers[0].accepted_connections -
64
0
           ctx->workers[0].closed_connections);
65
0
    if (cur == 0) {
66
0
        return 0;
67
0
    }
68
69
    /* Finds the lowest load worker */
70
0
    for (i = 1; i < server->workers; i++) {
71
0
        worker = &ctx->workers[i];
72
0
        tmp = worker->accepted_connections - worker->closed_connections;
73
0
        if (tmp < cur) {
74
0
            target = i;
75
0
            cur = tmp;
76
77
0
            if (cur == 0)
78
0
                break;
79
0
        }
80
0
    }
81
82
    /*
83
     * If sched_ctx->workers[target] worker is full then the whole server too,
84
     * because it has the lowest load.
85
     */
86
0
    if (mk_unlikely(server->server_capacity > 0 &&
87
0
                    server->server_capacity <= cur)) {
88
0
        MK_TRACE("Too many clients: %i", server->server_capacity);
89
90
        /* Instruct to close the connection anyways, we lie, it will die */
91
0
        return -1;
92
0
    }
93
94
0
    return target;
95
0
}
96
97
struct mk_sched_worker *mk_sched_next_target(struct mk_server *server)
98
0
{
99
0
    int t;
100
0
    struct mk_sched_ctx *ctx = server->sched_ctx;
101
102
0
    t = _next_target(server);
103
0
    if (mk_likely(t != -1)) {
104
0
        return &ctx->workers[t];
105
0
    }
106
107
0
    return NULL;
108
0
}
109
110
/*
111
 * This function is invoked when the core triggers a MK_SCHED_SIGNAL_FREE_ALL
112
 * event through the signal channels, it means the server will stop working
113
 * so this is the last call to release all memory resources in use. Of course
114
 * this takes place in a thread context.
115
 */
116
void mk_sched_worker_free(struct mk_server *server)
117
0
{
118
0
    int i;
119
0
    pthread_t tid;
120
0
    struct mk_sched_ctx *ctx = server->sched_ctx;
121
0
    struct mk_sched_worker *worker = NULL;
122
123
0
    pthread_mutex_lock(&mutex_worker_exit);
124
125
    /*
126
     * Fix Me: needs to implement API to make plugins release
127
     * their resources first at WORKER LEVEL
128
     */
129
130
    /* External */
131
0
    mk_plugin_exit_worker();
132
0
    mk_vhost_fdt_worker_exit(server);
133
0
    mk_cache_worker_exit();
134
135
    /* Scheduler stuff */
136
0
    tid = pthread_self();
137
0
    for (i = 0; i < server->workers; i++) {
138
0
        worker = &ctx->workers[i];
139
0
        if (worker->tid == tid) {
140
0
            break;
141
0
        }
142
0
        worker = NULL;
143
0
    }
144
145
0
    mk_bug(!worker);
146
147
    /* FIXME!: there is nothing done here with the worker context */
148
149
    /* Free master array (av queue & busy queue) */
150
0
    mk_mem_free(MK_TLS_GET(mk_tls_sched_cs));
151
0
    mk_mem_free(MK_TLS_GET(mk_tls_sched_cs_incomplete));
152
0
    mk_mem_free(MK_TLS_GET(mk_tls_sched_worker_notif));
153
0
    pthread_mutex_unlock(&mutex_worker_exit);
154
0
}
155
156
struct mk_sched_handler *mk_sched_handler_cap(char cap)
157
0
{
158
0
    if (cap == MK_CAP_HTTP) {
159
0
        return &mk_http_handler;
160
0
    }
161
162
#ifdef MK_HAVE_HTTP2
163
    else if (cap == MK_CAP_HTTP2) {
164
        return &mk_http2_handler;
165
    }
166
#endif
167
168
0
    return NULL;
169
0
}
170
171
/*
172
 * Register a new client connection into the scheduler, this call takes place
173
 * inside the worker/thread context.
174
 */
175
struct mk_sched_conn *mk_sched_add_connection(int remote_fd,
176
                                              struct mk_server_listen *listener,
177
                                              struct mk_sched_worker *sched,
178
                                              struct mk_server *server)
179
0
{
180
0
    int ret;
181
0
    int size;
182
0
    struct mk_sched_handler *handler;
183
0
    struct mk_sched_conn *conn;
184
0
    struct mk_event *event;
185
186
    /* Before to continue, we need to run plugin stage 10 */
187
0
    ret = mk_plugin_stage_run_10(remote_fd, server);
188
189
    /* Close connection, otherwise continue */
190
0
    if (ret == MK_PLUGIN_RET_CLOSE_CONX) {
191
0
        listener->network->network->close(listener->network, remote_fd);
192
0
        MK_LT_SCHED(remote_fd, "PLUGIN_CLOSE");
193
0
        return NULL;
194
0
    }
195
196
0
    handler = listener->protocol;
197
0
    if (handler->sched_extra_size > 0) {
198
0
        void *data;
199
0
        size = (sizeof(struct mk_sched_conn) + handler->sched_extra_size);
200
0
        data = mk_mem_alloc_z(size);
201
0
        conn = (struct mk_sched_conn *) data;
202
0
    }
203
0
    else {
204
0
        conn = mk_mem_alloc_z(sizeof(struct mk_sched_conn));
205
0
    }
206
207
0
    if (!conn) {
208
0
        mk_err("[server] Could not register client");
209
0
        return NULL;
210
0
    }
211
212
0
    event = &conn->event;
213
0
    event->fd           = remote_fd;
214
0
    event->type         = MK_EVENT_CONNECTION;
215
0
    event->mask         = MK_EVENT_EMPTY;
216
0
    event->status       = MK_EVENT_NONE;
217
0
    conn->arrive_time   = server->clock_context->log_current_utime;
218
0
    conn->protocol      = handler;
219
0
    conn->net           = listener->network->network;
220
0
    conn->is_timeout_on = MK_FALSE;
221
0
    conn->server_listen = listener;
222
223
    /* Stream channel */
224
0
    conn->channel.type  = MK_CHANNEL_SOCKET;    /* channel type     */
225
0
    conn->channel.fd    = remote_fd;            /* socket conn      */
226
0
    conn->channel.io    = conn->net;            /* network layer    */
227
0
    conn->channel.event = event;                /* parent event ref */
228
0
    mk_list_init(&conn->channel.streams);
229
230
    /*
231
     * Register the connections into the timeout_queue:
232
     *
233
     * When a new connection arrives, we cannot assume it contains some data
234
     * to read, meaning the event loop may not get notifications and the protocol
235
     * handler will never be called. So in order to avoid DDoS we always register
236
     * this session in the timeout_queue for further lookup.
237
     *
238
     * The protocol handler is in charge to remove the session from the
239
     * timeout_queue.
240
     */
241
0
    mk_sched_conn_timeout_add(conn, sched);
242
243
    /* Linux trace message */
244
0
    MK_LT_SCHED(remote_fd, "REGISTERED");
245
246
0
    return conn;
247
0
}
248
249
static void mk_sched_thread_lists_init()
250
0
{
251
0
    struct mk_list *sched_cs_incomplete;
252
253
    /* mk_tls_sched_cs_incomplete */
254
0
    sched_cs_incomplete = mk_mem_alloc(sizeof(struct mk_list));
255
0
    mk_list_init(sched_cs_incomplete);
256
0
    MK_TLS_SET(mk_tls_sched_cs_incomplete, sched_cs_incomplete);
257
0
}
258
259
/* Register thread information. The caller thread is the thread information's owner */
260
static int mk_sched_register_thread(struct mk_server *server)
261
0
{
262
0
    struct mk_sched_ctx *ctx = server->sched_ctx;
263
0
    struct mk_sched_worker *worker;
264
265
    /*
266
     * If this thread slept inside this section, some other thread may touch
267
     * server->worker_id.
268
     * So protect it with a mutex, only one thread may handle server->worker_id.
269
     *
270
     * Note : Let's use the platform agnostic atomics we implemented in cmetrics here
271
     * instead of a lock.
272
     */
273
0
    worker = &ctx->workers[server->worker_id];
274
0
    worker->idx = server->worker_id++;
275
0
    worker->tid = pthread_self();
276
277
0
#if defined(__linux__)
278
    /*
279
     * Under Linux does not exists the difference between process and
280
     * threads, everything is a thread in the kernel task struct, and each
281
     * one has it's own numerical identificator: PID .
282
     *
283
     * Here we want to know what's the PID associated to this running
284
     * task (which is different from parent Monkey PID), it can be
285
     * retrieved with gettid() but Glibc does not export to userspace
286
     * the syscall, we need to call it directly through syscall(2).
287
     */
288
0
    worker->pid = syscall(__NR_gettid);
289
#elif defined(__APPLE__)
290
    uint64_t tid;
291
    pthread_threadid_np(NULL, &tid);
292
    worker->pid = tid;
293
#else
294
    worker->pid = 0xdeadbeef;
295
#endif
296
297
    /* Initialize lists */
298
0
    mk_list_init(&worker->timeout_queue);
299
0
    worker->request_handler = NULL;
300
301
0
    return worker->idx;
302
0
}
303
304
static void mk_signal_thread_sigpipe_safe()
305
0
{
306
0
#ifndef _WIN32
307
0
    sigset_t old;
308
0
    sigset_t set;
309
310
0
    sigemptyset(&set);
311
0
    sigaddset(&set, SIGPIPE);
312
0
    pthread_sigmask(SIG_BLOCK, &set, &old);
313
0
#endif
314
0
}
315
316
/* created thread, all these calls are in the thread context */
317
void *mk_sched_launch_worker_loop(void *data)
318
0
{
319
0
    int ret;
320
0
    int wid;
321
0
    unsigned long len;
322
0
    char *thread_name = 0;
323
0
    struct mk_list *head;
324
0
    struct mk_sched_worker_cb *wcb;
325
0
    struct mk_sched_worker *sched = NULL;
326
0
    struct mk_sched_notif *notif = NULL;
327
0
    struct mk_sched_thread_conf *thinfo = data;
328
0
    struct mk_sched_ctx *ctx;
329
0
    struct mk_server *server;
330
331
0
    server = thinfo->server;
332
0
    ctx = server->sched_ctx;
333
334
    /* Avoid SIGPIPE signals on this thread */
335
0
    mk_signal_thread_sigpipe_safe();
336
337
    /* Init specific thread cache */
338
0
    mk_sched_thread_lists_init();
339
0
    mk_cache_worker_init();
340
341
    /* Virtual hosts: initialize per thread-vhost data */
342
0
    mk_vhost_fdt_worker_init(server);
343
344
    /* Register working thread */
345
0
    wid = mk_sched_register_thread(server);
346
0
    sched = &ctx->workers[wid];
347
0
    sched->loop = mk_event_loop_create(MK_EVENT_QUEUE_SIZE);
348
0
    if (!sched->loop) {
349
0
        mk_err("Error creating Scheduler loop");
350
0
        exit(EXIT_FAILURE);
351
0
    }
352
353
354
0
    sched->mem_pagesize = mk_utils_get_system_page_size();
355
356
    /*
357
     * Create the notification instance and link it to the worker
358
     * thread-scope list.
359
     */
360
0
    notif = mk_mem_alloc_z(sizeof(struct mk_sched_notif));
361
0
    MK_TLS_SET(mk_tls_sched_worker_notif, notif);
362
363
    /* Register the scheduler channel to signal active workers */
364
0
    ret = mk_event_channel_create(sched->loop,
365
0
                                  &sched->signal_channel_r,
366
0
                                  &sched->signal_channel_w,
367
0
                                  notif);
368
0
    if (ret < 0) {
369
0
        exit(EXIT_FAILURE);
370
0
    }
371
372
0
    mk_list_init(&sched->event_free_queue);
373
0
    mk_list_init(&sched->threads);
374
0
    mk_list_init(&sched->threads_purge);
375
376
    /*
377
     * ULONG_MAX BUG test only
378
     * =======================
379
     * to test the workaround we can use the following value:
380
     *
381
     *  thinfo->closed_connections = 1000;
382
     */
383
384
    //thinfo->ctx = thconf->ctx;
385
386
    /* Rename worker */
387
0
    mk_string_build(&thread_name, &len, "monkey: wrk/%i", sched->idx);
388
0
    mk_utils_worker_rename(thread_name);
389
0
    mk_mem_free(thread_name);
390
391
    /* Export known scheduler node to context thread */
392
0
    MK_TLS_SET(mk_tls_sched_worker_node, sched);
393
0
    mk_plugin_core_thread(server);
394
395
0
    if (server->scheduler_mode == MK_SCHEDULER_REUSEPORT) {
396
0
        sched->listeners = mk_server_listen_init(server);
397
0
        if (!sched->listeners) {
398
0
            exit(EXIT_FAILURE);
399
0
        }
400
0
    }
401
402
    /* Unlock the conditional initializator */
403
0
    pthread_mutex_lock(&server->pth_mutex);
404
0
    server->pth_init = MK_TRUE;
405
0
    pthread_cond_signal(&server->pth_cond);
406
0
    pthread_mutex_unlock(&server->pth_mutex);
407
408
    /* Invoke custom worker-callbacks defined by the scheduler (lib) */
409
0
    mk_list_foreach(head, &server->sched_worker_callbacks) {
410
0
        wcb = mk_list_entry(head, struct mk_sched_worker_cb, _head);
411
0
        wcb->cb_func(wcb->data);
412
0
    }
413
414
0
    mk_mem_free(thinfo);
415
416
    /* init server thread loop */
417
0
    mk_server_worker_loop(server);
418
419
0
    return 0;
420
0
}
421
422
/* Create thread which will be listening for incomings requests */
423
int mk_sched_launch_thread(struct mk_server *server, pthread_t *tout)
424
0
{
425
0
    pthread_t tid;
426
0
    pthread_attr_t attr;
427
0
    struct mk_sched_thread_conf *thconf;
428
429
0
    server->pth_init = MK_FALSE;
430
431
    /*
432
     * This lock is used for the 'pth_cond' conditional. Once the worker
433
     * thread is ready it will signal the condition.
434
     */
435
0
    pthread_mutex_lock(&server->pth_mutex);
436
437
    /* Thread data */
438
0
    thconf = mk_mem_alloc_z(sizeof(struct mk_sched_thread_conf));
439
0
    thconf->server = server;
440
441
0
    pthread_attr_init(&attr);
442
0
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
443
0
    if (pthread_create(&tid, &attr, mk_sched_launch_worker_loop,
444
0
                       (void *) thconf) != 0) {
445
0
        mk_libc_error("pthread_create");
446
0
        pthread_mutex_unlock(&server->pth_mutex);
447
0
        return -1;
448
0
    }
449
450
0
    *tout = tid;
451
452
    /* Block until the child thread is ready */
453
0
    while (!server->pth_init) {
454
0
        pthread_cond_wait(&server->pth_cond, &server->pth_mutex);
455
0
    }
456
457
0
    pthread_mutex_unlock(&server->pth_mutex);
458
459
0
    return 0;
460
0
}
461
462
/*
463
 * The scheduler nodes are an array of struct mk_sched_worker type,
464
 * each worker thread belongs to a scheduler node, on this function we
465
 * allocate a scheduler node per number of workers defined.
466
 */
467
int mk_sched_init(struct mk_server *server)
468
0
{
469
0
    int size;
470
0
    struct mk_sched_ctx *ctx;
471
472
0
    ctx = mk_mem_alloc_z(sizeof(struct mk_sched_ctx));
473
0
    if (!ctx) {
474
0
        mk_libc_error("malloc");
475
0
        return -1;
476
0
    }
477
478
0
    size = (sizeof(struct mk_sched_worker) * server->workers);
479
0
    ctx->workers = mk_mem_alloc(size);
480
0
    if (!ctx->workers) {
481
0
        mk_libc_error("malloc");
482
0
        mk_mem_free(ctx);
483
0
        return -1;
484
0
    }
485
0
    memset(ctx->workers, '\0', size);
486
487
    /* Initialize helpers */
488
0
    pthread_mutex_init(&server->pth_mutex, NULL);
489
0
    pthread_cond_init(&server->pth_cond, NULL);
490
0
    server->pth_init = MK_FALSE;
491
492
    /* Map context into server context */
493
0
    server->sched_ctx = ctx;
494
495
    /* The mk_thread_prepare call was replaced by mk_http_thread_initialize_tls
496
     * which is called earlier.
497
     */
498
499
0
    return 0;
500
0
}
501
502
int mk_sched_exit(struct mk_server *server)
503
0
{
504
0
    struct mk_sched_ctx *ctx;
505
506
0
    ctx = server->sched_ctx;
507
0
    mk_sched_worker_cb_free(server);
508
0
    mk_mem_free(ctx->workers);
509
0
    mk_mem_free(ctx);
510
511
0
    return 0;
512
0
}
513
514
void mk_sched_set_request_list(struct rb_root *list)
515
0
{
516
0
    MK_TLS_SET(mk_tls_sched_cs, list);
517
0
}
518
519
int mk_sched_remove_client(struct mk_sched_conn *conn,
520
                           struct mk_sched_worker *sched,
521
                           struct mk_server *server)
522
0
{
523
0
    struct mk_event *event;
524
525
    /*
526
     * Close socket and change status: we must invoke mk_epoll_del()
527
     * because when the socket is closed is cleaned from the queue by
528
     * the Kernel at its leisure, and we may get false events if we rely
529
     * on that.
530
     */
531
0
    event = &conn->event;
532
0
    MK_TRACE("[FD %i] Scheduler remove", event->fd);
533
534
0
    mk_event_del(sched->loop, event);
535
536
    /* Invoke plugins in stage 50 */
537
0
    mk_plugin_stage_run_50(event->fd, server);
538
539
0
    sched->closed_connections++;
540
541
    /* Unlink from the red-black tree */
542
    //rb_erase(&conn->_rb_head, &sched->rb_queue);
543
0
    mk_sched_conn_timeout_del(conn);
544
545
    /* Close at network layer level */
546
0
    conn->net->close(conn->net->plugin, event->fd);
547
548
    /* Release and return */
549
0
    mk_channel_clean(&conn->channel);
550
0
    mk_sched_event_free(&conn->event);
551
0
    conn->status = MK_SCHED_CONN_CLOSED;
552
553
0
    MK_LT_SCHED(remote_fd, "DELETE_CLIENT");
554
0
    return 0;
555
0
}
556
557
/* FIXME: nobody is using this function, check back later */
558
struct mk_sched_conn *mk_sched_get_connection(struct mk_sched_worker *sched,
559
                                                 int remote_fd)
560
0
{
561
0
    (void) sched;
562
0
    (void) remote_fd;
563
0
    return NULL;
564
0
}
565
566
/*
567
 * For a given connection number, remove all resources associated: it can be
568
 * used on any context such as: timeout, I/O errors, request finished,
569
 * exceptions, etc.
570
 */
571
int mk_sched_drop_connection(struct mk_sched_conn *conn,
572
                             struct mk_sched_worker *sched,
573
                             struct mk_server *server)
574
0
{
575
0
    mk_sched_threads_destroy_conn(sched, conn);
576
0
    return mk_sched_remove_client(conn, sched, server);
577
0
}
578
579
int mk_sched_check_timeouts(struct mk_sched_worker *sched,
580
                            struct mk_server *server)
581
0
{
582
0
    int client_timeout;
583
0
    struct mk_sched_conn *conn;
584
0
    struct mk_list *head;
585
0
    struct mk_list *temp;
586
587
    /* PENDING CONN TIMEOUT */
588
0
    mk_list_foreach_safe(head, temp, &sched->timeout_queue) {
589
0
        conn = mk_list_entry(head, struct mk_sched_conn, timeout_head);
590
0
        if (conn->event.type & MK_EVENT_IDLE) {
591
0
            continue;
592
0
        }
593
594
0
        client_timeout = conn->arrive_time + server->timeout;
595
596
        /* Check timeout */
597
0
        if (client_timeout <= server->clock_context->log_current_utime) {
598
0
            MK_TRACE("Scheduler, closing fd %i due TIMEOUT",
599
0
                     conn->event.fd);
600
0
            MK_LT_SCHED(conn->event.fd, "TIMEOUT_CONN_PENDING");
601
0
            conn->protocol->cb_close(conn, sched, MK_SCHED_CONN_TIMEOUT,
602
0
                                     server);
603
0
            mk_sched_drop_connection(conn, sched, server);
604
0
        }
605
0
    }
606
607
0
    return 0;
608
0
}
609
610
static int sched_thread_cleanup(struct mk_sched_worker *sched,
611
                                struct mk_list *list)
612
0
{
613
0
    int c = 0;
614
0
    struct mk_list *tmp;
615
0
    struct mk_list *head;
616
0
    struct mk_http_thread *mth;
617
0
    (void) sched;
618
619
0
    mk_list_foreach_safe(head, tmp, list) {
620
0
        mth = mk_list_entry(head, struct mk_http_thread, _head);
621
0
        mk_http_thread_destroy(mth);
622
0
        c++;
623
0
    }
624
625
0
    return c;
626
627
0
}
628
629
int mk_sched_threads_purge(struct mk_sched_worker *sched)
630
0
{
631
0
    int c = 0;
632
633
0
    c = sched_thread_cleanup(sched, &sched->threads_purge);
634
0
    return c;
635
0
}
636
637
int mk_sched_threads_destroy_all(struct mk_sched_worker *sched)
638
0
{
639
0
    int c = 0;
640
641
0
    c = sched_thread_cleanup(sched, &sched->threads_purge);
642
0
    c += sched_thread_cleanup(sched, &sched->threads);
643
644
0
    return c;
645
0
}
646
647
/*
648
 * Destroy the thread contexts associated to the particular
649
 * connection.
650
 *
651
 * Return the number of contexts destroyed.
652
 */
653
int mk_sched_threads_destroy_conn(struct mk_sched_worker *sched,
654
                                  struct mk_sched_conn *conn)
655
0
{
656
0
    int c = 0;
657
0
    struct mk_list *tmp;
658
0
    struct mk_list *head;
659
0
    struct mk_http_thread *mth;
660
0
    (void) sched;
661
662
0
    mk_list_foreach_safe(head, tmp, &sched->threads) {
663
0
        mth = mk_list_entry(head, struct mk_http_thread, _head);
664
0
        if (mth->session->conn == conn) {
665
0
            mk_http_thread_destroy(mth);
666
0
            c++;
667
0
        }
668
0
    }
669
0
    return c;
670
0
}
671
672
/*
673
 * Scheduler events handler: lookup for event handler and invoke
674
 * proper callbacks.
675
 */
676
int mk_sched_event_read(struct mk_sched_conn *conn,
677
                        struct mk_sched_worker *sched,
678
                        struct mk_server *server)
679
0
{
680
0
    int ret = 0;
681
682
#ifdef MK_HAVE_TRACE
683
    MK_TRACE("[FD %i] Connection Handler / read", conn->event.fd);
684
#endif
685
686
    /*
687
     * When the event loop notify that there is some readable information
688
     * from the socket, we need to invoke the protocol handler associated
689
     * to this connection and also pass as a reference the 'read()' function
690
     * that handle 'read I/O' operations, e.g:
691
     *
692
     *  - plain sockets through liana will use just read(2)
693
     *  - ssl though mbedtls should use mk_mbedtls_read(..)
694
     */
695
0
    ret = conn->protocol->cb_read(conn, sched, server);
696
0
    if (ret == -1) {
697
0
        if (errno == EAGAIN) {
698
0
            MK_TRACE("[FD %i] EAGAIN: need to read more data", conn->event.fd);
699
0
            return 1;
700
0
        }
701
0
        return -1;
702
0
    }
703
704
0
    return ret;
705
0
}
706
707
int mk_sched_event_write(struct mk_sched_conn *conn,
708
                         struct mk_sched_worker *sched,
709
                         struct mk_server *server)
710
0
{
711
0
    int ret = -1;
712
0
    size_t count;
713
0
    struct mk_event *event;
714
715
0
    MK_TRACE("[FD %i] Connection Handler / write", conn->event.fd);
716
717
0
    ret = mk_channel_write(&conn->channel, &count);
718
0
    if (ret == MK_CHANNEL_FLUSH || ret == MK_CHANNEL_BUSY) {
719
0
        return 0;
720
0
    }
721
0
    else if (ret == MK_CHANNEL_DONE || ret == MK_CHANNEL_EMPTY) {
722
0
        if (conn->protocol->cb_done) {
723
0
            ret = conn->protocol->cb_done(conn, sched, server);
724
0
        }
725
0
        if (ret == -1) {
726
0
            return -1;
727
0
        }
728
0
        else if (ret == 0) {
729
0
            event = &conn->event;
730
0
            mk_event_add(sched->loop, event->fd,
731
0
                         MK_EVENT_CONNECTION,
732
0
                         MK_EVENT_READ,
733
0
                         conn);
734
0
        }
735
0
        return 0;
736
0
    }
737
0
    else if (ret & MK_CHANNEL_ERROR) {
738
0
        return -1;
739
0
    }
740
741
    /* avoid to make gcc cry :_( */
742
0
    return -1;
743
0
}
744
745
int mk_sched_event_close(struct mk_sched_conn *conn,
746
                         struct mk_sched_worker *sched,
747
                         int type, struct mk_server *server)
748
0
{
749
0
    MK_TRACE("[FD %i] Connection Handler, closed", conn->event.fd);
750
0
    mk_event_del(sched->loop, &conn->event);
751
752
0
    if (type != MK_EP_SOCKET_DONE) {
753
0
        conn->protocol->cb_close(conn, sched, type, server);
754
0
    }
755
    /*
756
     * Remove the socket from the scheduler and make sure
757
     * to disable all notifications.
758
     */
759
0
    mk_sched_drop_connection(conn, sched, server);
760
0
    return 0;
761
0
}
762
763
void mk_sched_event_free(struct mk_event *event)
764
0
{
765
0
    struct mk_sched_worker *sched = mk_sched_get_thread_conf();
766
767
0
    if ((event->type & MK_EVENT_IDLE) != 0) {
768
0
        return;
769
0
    }
770
771
0
    event->type |= MK_EVENT_IDLE;
772
0
    mk_list_add(&event->_head, &sched->event_free_queue);
773
0
}
774
775
/* Register a new callback function to invoke when a worker is created */
776
int mk_sched_worker_cb_add(struct mk_server *server,
777
                           void (*cb_func) (void *),
778
                           void *data)
779
0
{
780
0
    struct mk_sched_worker_cb *wcb;
781
782
0
    wcb = mk_mem_alloc(sizeof(struct mk_sched_worker_cb));
783
0
    if (!wcb) {
784
0
        return -1;
785
0
    }
786
787
0
    wcb->cb_func = cb_func;
788
0
    wcb->data    = data;
789
0
    mk_list_add(&wcb->_head, &server->sched_worker_callbacks);
790
0
    return 0;
791
0
}
792
793
void mk_sched_worker_cb_free(struct mk_server *server)
794
0
{
795
0
    struct mk_list *tmp;
796
0
    struct mk_list *head;
797
0
    struct mk_sched_worker_cb *wcb;
798
799
0
    mk_list_foreach_safe(head, tmp, &server->sched_worker_callbacks) {
800
0
        wcb = mk_list_entry(head, struct mk_sched_worker_cb, _head);
801
0
        mk_list_del(&wcb->_head);
802
0
        mk_mem_free(wcb);
803
0
    }
804
0
}
805
806
int mk_sched_send_signal(struct mk_sched_worker *worker, uint64_t val)
807
0
{
808
0
    ssize_t n;
809
810
    /* When using libevent _mk_event_channel_create creates a unix socket
811
     * instead of a pipe and windows doesn't us calling read / write on a
812
     * socket instead of recv / send
813
     */
814
815
#ifdef _WIN32
816
    n = send(worker->signal_channel_w, &val, sizeof(uint64_t), 0);
817
#else
818
0
    n = write(worker->signal_channel_w, &val, sizeof(uint64_t));
819
0
#endif
820
821
0
    if (n < 0) {
822
0
        mk_libc_error("write");
823
824
0
        return 0;
825
0
    }
826
827
0
    return 1;
828
0
}
829
830
int mk_sched_broadcast_signal(struct mk_server *server, uint64_t val)
831
0
{
832
0
    int i;
833
0
    int count = 0;
834
0
    struct mk_sched_ctx *ctx;
835
0
    struct mk_sched_worker *worker;
836
837
0
    ctx = server->sched_ctx;
838
0
    for (i = 0; i < server->workers; i++) {
839
0
        worker = &ctx->workers[i];
840
841
0
        count += mk_sched_send_signal(worker, val);
842
0
    }
843
844
0
    return count;
845
0
}
846
847
/*
848
 * Wait for all workers to finish: this function assumes that previously a
849
 * MK_SCHED_SIGNAL_FREE_ALL was sent to the worker channels.
850
 */
851
int mk_sched_workers_join(struct mk_server *server)
852
0
{
853
0
    int i;
854
0
    int count = 0;
855
0
    struct mk_sched_ctx *ctx;
856
0
    struct mk_sched_worker *worker;
857
858
0
    ctx = server->sched_ctx;
859
0
    for (i = 0; i < server->workers; i++) {
860
0
        worker = &ctx->workers[i];
861
0
        pthread_join(worker->tid, NULL);
862
0
        count++;
863
0
    }
864
865
0
    return count;
866
0
}