Coverage Report

Created: 2025-07-11 06:49

/src/unit/src/nxt_port.c
Line
Count
Source (jump to first uncovered line)
1
2
/*
3
 * Copyright (C) Igor Sysoev
4
 * Copyright (C) NGINX, Inc.
5
 */
6
7
#include <nxt_main.h>
8
#include <nxt_runtime.h>
9
#include <nxt_port.h>
10
#include <nxt_router.h>
11
#include <nxt_app_queue.h>
12
#include <nxt_port_queue.h>
13
14
15
static void nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
16
    nxt_pid_t pid);
17
static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
18
19
static nxt_atomic_uint_t nxt_port_last_id = 1;
20
21
22
static void
23
nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
24
0
{
25
0
    nxt_mp_t    *mp;
26
0
    nxt_port_t  *port;
27
28
0
    port = obj;
29
0
    mp = data;
30
31
0
    nxt_assert(port->pair[0] == -1);
32
0
    nxt_assert(port->pair[1] == -1);
33
34
0
    nxt_assert(port->use_count == 0);
35
0
    nxt_assert(port->app_link.next == NULL);
36
0
    nxt_assert(port->idle_link.next == NULL);
37
38
0
    nxt_assert(nxt_queue_is_empty(&port->messages));
39
0
    nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
40
0
    nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers));
41
42
0
    nxt_thread_mutex_destroy(&port->write_mutex);
43
44
0
    nxt_mp_free(mp, port);
45
0
}
46
47
48
nxt_port_t *
49
nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
50
    nxt_process_type_t type)
51
0
{
52
0
    nxt_mp_t    *mp;
53
0
    nxt_port_t  *port;
54
55
0
    mp = nxt_mp_create(1024, 128, 256, 32);
56
57
0
    if (nxt_slow_path(mp == NULL)) {
58
0
        return NULL;
59
0
    }
60
61
0
    port = nxt_mp_zalloc(mp, sizeof(nxt_port_t));
62
63
0
    if (nxt_fast_path(port != NULL)) {
64
0
        port->id = id;
65
0
        port->pid = pid;
66
0
        port->type = type;
67
0
        port->mem_pool = mp;
68
0
        port->use_count = 1;
69
70
0
        nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
71
72
0
        nxt_queue_init(&port->messages);
73
0
        nxt_thread_mutex_create(&port->write_mutex);
74
75
0
        port->queue_fd = -1;
76
77
0
    } else {
78
0
        nxt_mp_destroy(mp);
79
0
    }
80
81
0
    nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type);
82
83
0
    return port;
84
0
}
85
86
87
void
88
nxt_port_close(nxt_task_t *task, nxt_port_t *port)
89
0
{
90
0
    size_t  size;
91
92
0
    nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid,
93
0
              port->id, port->type);
94
95
0
    if (port->pair[0] != -1) {
96
0
        nxt_port_rpc_close(task, port);
97
98
0
        nxt_fd_close(port->pair[0]);
99
0
        port->pair[0] = -1;
100
0
    }
101
102
0
    if (port->pair[1] != -1) {
103
0
        nxt_fd_close(port->pair[1]);
104
0
        port->pair[1] = -1;
105
106
0
        if (port->app != NULL) {
107
0
            nxt_router_app_port_close(task, port);
108
0
        }
109
0
    }
110
111
0
    if (port->queue_fd != -1) {
112
0
        nxt_fd_close(port->queue_fd);
113
0
        port->queue_fd = -1;
114
0
    }
115
116
0
    if (port->queue != NULL) {
117
0
        size = (port->id == (nxt_port_id_t) -1) ? sizeof(nxt_app_queue_t)
118
0
                                                : sizeof(nxt_port_queue_t);
119
0
        nxt_mem_munmap(port->queue, size);
120
121
0
        port->queue = NULL;
122
0
    }
123
0
}
124
125
126
static void
127
nxt_port_release(nxt_task_t *task, nxt_port_t *port)
128
0
{
129
0
    nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
130
0
              port->id, port->type);
131
132
0
    port->app = NULL;
133
134
0
    if (port->link.next != NULL) {
135
0
        nxt_assert(port->process != NULL);
136
137
0
        nxt_process_port_remove(port);
138
139
0
        nxt_process_use(task, port->process, -1);
140
0
    }
141
142
0
    nxt_mp_release(port->mem_pool);
143
0
}
144
145
146
nxt_port_id_t
147
nxt_port_get_next_id(void)
148
0
{
149
0
    return nxt_atomic_fetch_add(&nxt_port_last_id, 1);
150
0
}
151
152
153
void
154
nxt_port_reset_next_id(void)
155
0
{
156
0
    nxt_port_last_id = 1;
157
0
}
158
159
160
void
161
nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
162
    const nxt_port_handlers_t *handlers)
163
0
{
164
0
    port->pid = nxt_pid;
165
0
    port->handler = nxt_port_handler;
166
0
    port->data = (nxt_port_handler_t *) (handlers);
167
168
0
    nxt_port_read_enable(task, port);
169
0
}
170
171
172
static void
173
nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
174
0
{
175
0
    nxt_port_handler_t  *handlers;
176
177
0
    if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) {
178
179
0
        nxt_debug(task, "port %d: message type:%uD fds:%d,%d",
180
0
                  msg->port->socket.fd, msg->port_msg.type,
181
0
                  msg->fd[0], msg->fd[1]);
182
183
0
        handlers = msg->port->data;
184
0
        handlers[msg->port_msg.type](task, msg);
185
186
0
        return;
187
0
    }
188
189
0
    nxt_alert(task, "port %d: unknown message type:%uD",
190
0
              msg->port->socket.fd, msg->port_msg.type);
191
0
}
192
193
194
void
195
nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
196
0
{
197
0
    nxt_runtime_quit(task, 0);
198
0
}
199
200
201
/* TODO join with process_ready and move to nxt_main_process.c */
202
nxt_inline void
203
nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
204
    nxt_port_t *new_port, uint32_t stream)
205
0
{
206
0
    nxt_port_t     *port;
207
0
    nxt_process_t  *process;
208
209
0
    nxt_debug(task, "new port %d for process %PI",
210
0
              new_port->pair[1], new_port->pid);
211
212
0
    nxt_runtime_process_each(rt, process) {
213
214
0
        if (process->pid == new_port->pid || process->pid == nxt_pid) {
215
0
            continue;
216
0
        }
217
218
0
        port = nxt_process_port_first(process);
219
220
0
        if (nxt_proc_send_matrix[port->type][new_port->type]) {
221
0
            (void) nxt_port_send_port(task, port, new_port, stream);
222
0
        }
223
224
0
    } nxt_runtime_process_loop;
225
0
}
226
227
228
nxt_int_t
229
nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
230
    uint32_t stream)
231
0
{
232
0
    nxt_buf_t                *b;
233
0
    nxt_port_msg_new_port_t  *msg;
234
235
0
    b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
236
0
                             sizeof(nxt_port_data_t));
237
0
    if (nxt_slow_path(b == NULL)) {
238
0
        return NXT_ERROR;
239
0
    }
240
241
0
    nxt_debug(task, "send port %FD to process %PI",
242
0
              new_port->pair[1], port->pid);
243
244
0
    b->mem.free += sizeof(nxt_port_msg_new_port_t);
245
0
    msg = (nxt_port_msg_new_port_t *) b->mem.pos;
246
247
0
    msg->id = new_port->id;
248
0
    msg->pid = new_port->pid;
249
0
    msg->max_size = port->max_size;
250
0
    msg->max_share = port->max_share;
251
0
    msg->type = new_port->type;
252
253
0
    return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT,
254
0
                                  new_port->pair[1], new_port->queue_fd,
255
0
                                  stream, 0, b);
256
0
}
257
258
259
void
260
nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
261
0
{
262
0
    nxt_port_t               *port;
263
0
    nxt_runtime_t            *rt;
264
0
    nxt_port_msg_new_port_t  *new_port_msg;
265
266
0
    rt = task->thread->runtime;
267
268
0
    new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
269
270
    /* TODO check b size and make plain */
271
272
0
    nxt_debug(task, "new port %d received for process %PI:%d",
273
0
              msg->fd[0], new_port_msg->pid, new_port_msg->id);
274
275
0
    port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
276
0
    if (port != NULL) {
277
0
        nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
278
0
              new_port_msg->id);
279
280
0
        msg->u.new_port = port;
281
282
0
        nxt_fd_close(msg->fd[0]);
283
0
        msg->fd[0] = -1;
284
0
        return;
285
0
    }
286
287
0
    port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid,
288
0
                                           new_port_msg->id,
289
0
                                           new_port_msg->type);
290
0
    if (nxt_slow_path(port == NULL)) {
291
0
        return;
292
0
    }
293
294
0
    nxt_fd_nonblocking(task, msg->fd[0]);
295
296
0
    port->pair[0] = -1;
297
0
    port->pair[1] = msg->fd[0];
298
0
    port->max_size = new_port_msg->max_size;
299
0
    port->max_share = new_port_msg->max_share;
300
301
0
    port->socket.task = task;
302
303
0
    nxt_port_write_enable(task, port);
304
305
0
    msg->u.new_port = port;
306
0
}
307
308
/* TODO move to nxt_main_process.c */
309
void
310
nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
311
0
{
312
0
    nxt_port_t     *port;
313
0
    nxt_process_t  *process;
314
0
    nxt_runtime_t  *rt;
315
316
0
    rt = task->thread->runtime;
317
318
0
    process = nxt_runtime_process_find(rt, msg->port_msg.pid);
319
0
    if (nxt_slow_path(process == NULL)) {
320
0
        return;
321
0
    }
322
323
0
    nxt_assert(process->state != NXT_PROCESS_STATE_READY);
324
325
0
    process->state = NXT_PROCESS_STATE_READY;
326
327
0
    nxt_assert(!nxt_queue_is_empty(&process->ports));
328
329
0
    port = nxt_process_port_first(process);
330
331
0
    nxt_debug(task, "process %PI ready", msg->port_msg.pid);
332
333
0
    if (msg->fd[0] != -1) {
334
0
        port->queue_fd = msg->fd[0];
335
0
        port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
336
0
                                   PROT_READ | PROT_WRITE, MAP_SHARED,
337
0
                                   msg->fd[0], 0);
338
0
    }
339
340
0
    nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
341
0
}
342
343
344
void
345
nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
346
0
{
347
0
    nxt_runtime_t  *rt;
348
0
    nxt_process_t  *process;
349
350
0
    rt = task->thread->runtime;
351
352
0
    if (nxt_slow_path(msg->fd[0] == -1)) {
353
0
        nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message");
354
355
0
        return;
356
0
    }
357
358
0
    process = nxt_runtime_process_find(rt, msg->port_msg.pid);
359
0
    if (nxt_slow_path(process == NULL)) {
360
0
        nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI",
361
0
                msg->port_msg.pid);
362
363
0
        goto fail_close;
364
0
    }
365
366
0
    nxt_port_incoming_port_mmap(task, process, msg->fd[0]);
367
368
0
fail_close:
369
370
0
    nxt_fd_close(msg->fd[0]);
371
0
}
372
373
374
void
375
nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
376
    nxt_fd_t fd)
377
0
{
378
0
    nxt_buf_t      *b;
379
0
    nxt_port_t     *port;
380
0
    nxt_process_t  *process;
381
382
0
    nxt_debug(task, "change log file #%ui fd:%FD", slot, fd);
383
384
0
    nxt_runtime_process_each(rt, process) {
385
386
0
        if (nxt_pid == process->pid) {
387
0
            continue;
388
0
        }
389
390
0
        port = nxt_process_port_first(process);
391
392
0
        b = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
393
0
                              sizeof(nxt_uint_t), 0);
394
0
        if (nxt_slow_path(b == NULL)) {
395
0
            continue;
396
0
        }
397
398
0
        b->mem.free = nxt_cpymem(b->mem.free, &slot, sizeof(nxt_uint_t));
399
400
0
        (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
401
0
                                     fd, 0, 0, b);
402
403
0
    } nxt_runtime_process_loop;
404
0
}
405
406
407
void
408
nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
409
0
{
410
0
    nxt_buf_t      *b;
411
0
    nxt_uint_t     slot;
412
0
    nxt_file_t     *log_file;
413
0
    nxt_runtime_t  *rt;
414
415
0
    rt = task->thread->runtime;
416
417
0
    b = msg->buf;
418
0
    slot = *(nxt_uint_t *) b->mem.pos;
419
420
0
    log_file = nxt_list_elt(rt->log_files, slot);
421
422
0
    nxt_debug(task, "change log file %FD:%FD", msg->fd[0], log_file->fd);
423
424
    /*
425
     * The old log file descriptor must be closed at the moment when no
426
     * other threads use it.  dup2() allows to use the old file descriptor
427
     * for new log file.  This change is performed atomically in the kernel.
428
     */
429
0
    if (nxt_file_redirect(log_file, msg->fd[0]) == NXT_OK) {
430
0
        if (slot == 0) {
431
0
            (void) nxt_file_stderr(log_file);
432
0
        }
433
0
    }
434
0
}
435
436
437
void
438
nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
439
0
{
440
0
    size_t     dump_size;
441
0
    nxt_buf_t  *b;
442
443
0
    b = msg->buf;
444
0
    dump_size = b->mem.free - b->mem.pos;
445
446
0
    if (dump_size > 300) {
447
0
        dump_size = 300;
448
0
    }
449
450
0
    nxt_debug(task, "data: %*s", dump_size, b->mem.pos);
451
0
}
452
453
454
void
455
nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process)
456
0
{
457
0
    nxt_pid_t           pid;
458
0
    nxt_buf_t           *buf;
459
0
    nxt_port_t          *port;
460
0
    nxt_runtime_t       *rt;
461
0
    nxt_process_t       *p;
462
0
    nxt_process_type_t  ptype;
463
464
0
    pid = process->pid;
465
466
0
    ptype = nxt_process_type(process);
467
468
0
    rt = task->thread->runtime;
469
470
0
    nxt_runtime_process_each(rt, p) {
471
472
0
        if (p->pid == nxt_pid
473
0
            || p->pid == pid
474
0
            || nxt_queue_is_empty(&p->ports))
475
0
        {
476
0
            continue;
477
0
        }
478
479
0
        port = nxt_process_port_first(p);
480
481
0
        if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) {
482
0
            continue;
483
0
        }
484
485
0
        buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
486
0
                                   sizeof(pid));
487
488
0
        if (nxt_slow_path(buf == NULL)) {
489
0
            continue;
490
0
        }
491
492
0
        buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
493
494
0
        nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
495
0
                              process->stream, 0, buf);
496
497
0
    } nxt_runtime_process_loop;
498
0
}
499
500
501
void
502
nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
503
0
{
504
0
    nxt_pid_t  pid;
505
0
    nxt_buf_t  *buf;
506
507
0
    buf = msg->buf;
508
509
0
    nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
510
511
0
    nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t));
512
513
0
    nxt_port_remove_pid(task, msg, pid);
514
0
}
515
516
517
static void
518
nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
519
    nxt_pid_t pid)
520
0
{
521
0
    nxt_runtime_t  *rt;
522
0
    nxt_process_t  *process;
523
524
0
    msg->u.removed_pid = pid;
525
526
0
    nxt_debug(task, "port remove pid %PI handler", pid);
527
528
0
    rt = task->thread->runtime;
529
530
0
    nxt_port_rpc_remove_peer(task, msg->port, pid);
531
532
0
    process = nxt_runtime_process_find(rt, pid);
533
534
0
    if (process) {
535
0
        nxt_process_close_ports(task, process);
536
0
    }
537
0
}
538
539
540
void
541
nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
542
0
{
543
0
    nxt_debug(task, "port empty handler");
544
0
}
545
546
547
typedef struct {
548
    nxt_work_t               work;
549
    nxt_port_t               *port;
550
    nxt_port_post_handler_t  handler;
551
} nxt_port_work_t;
552
553
554
static void
555
nxt_port_post_handler(nxt_task_t *task, void *obj, void *data)
556
0
{
557
0
    nxt_port_t               *port;
558
0
    nxt_port_work_t          *pw;
559
0
    nxt_port_post_handler_t  handler;
560
561
0
    pw = obj;
562
0
    port = pw->port;
563
0
    handler = pw->handler;
564
565
0
    nxt_free(pw);
566
567
0
    handler(task, port, data);
568
569
0
    nxt_port_use(task, port, -1);
570
0
}
571
572
573
nxt_int_t
574
nxt_port_post(nxt_task_t *task, nxt_port_t *port,
575
    nxt_port_post_handler_t handler, void *data)
576
0
{
577
0
    nxt_port_work_t  *pw;
578
579
0
    if (task->thread->engine == port->engine) {
580
0
        handler(task, port, data);
581
582
0
        return NXT_OK;
583
0
    }
584
585
0
    pw = nxt_zalloc(sizeof(nxt_port_work_t));
586
587
0
    if (nxt_slow_path(pw == NULL)) {
588
0
        return NXT_ERROR;
589
0
    }
590
591
0
    nxt_atomic_fetch_add(&port->use_count, 1);
592
593
0
    pw->work.handler = nxt_port_post_handler;
594
0
    pw->work.task = &port->engine->task;
595
0
    pw->work.obj = pw;
596
0
    pw->work.data = data;
597
598
0
    pw->port = port;
599
0
    pw->handler = handler;
600
601
0
    nxt_event_engine_post(port->engine, &pw->work);
602
603
0
    return NXT_OK;
604
0
}
605
606
607
static void
608
nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data)
609
0
{
610
    /* no op */
611
0
}
612
613
614
void
615
nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i)
616
0
{
617
0
    int  c;
618
619
0
    c = nxt_atomic_fetch_add(&port->use_count, i);
620
621
0
    if (i < 0 && c == -i) {
622
623
0
        if (port->engine == NULL || task->thread->engine == port->engine) {
624
0
            nxt_port_release(task, port);
625
626
0
            return;
627
0
        }
628
629
0
        nxt_port_post(task, port, nxt_port_release_handler, NULL);
630
0
    }
631
0
}