Coverage Report

Created: 2025-11-09 06:08

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/unit/src/nxt_port_memory.c
Line
Count
Source
1
2
/*
3
 * Copyright (C) Max Romanov
4
 * Copyright (C) NGINX, Inc.
5
 */
6
7
#include <nxt_main.h>
8
9
#if (NXT_HAVE_MEMFD_CREATE)
10
11
#include <linux/memfd.h>
12
#include <unistd.h>
13
#include <sys/syscall.h>
14
15
#endif
16
17
#include <nxt_port_memory_int.h>
18
19
20
static void nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port,
21
    void *data);
22
23
24
nxt_inline void
25
nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
26
0
{
27
0
    int  c;
28
29
0
    c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);
30
31
0
    if (i < 0 && c == -i) {
32
0
        if (mmap_handler->hdr != NULL) {
33
0
            nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
34
0
            mmap_handler->hdr = NULL;
35
0
        }
36
37
0
        if (mmap_handler->fd != -1) {
38
0
            nxt_fd_close(mmap_handler->fd);
39
0
        }
40
41
0
        nxt_free(mmap_handler);
42
0
    }
43
0
}
44
45
46
static nxt_port_mmap_t *
47
nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
48
0
{
49
0
    uint32_t  cap;
50
51
0
    cap = port_mmaps->cap;
52
53
0
    if (cap == 0) {
54
0
        cap = i + 1;
55
0
    }
56
57
0
    while (i + 1 > cap) {
58
59
0
        if (cap < 16) {
60
0
            cap = cap * 2;
61
62
0
        } else {
63
0
            cap = cap + cap / 2;
64
0
        }
65
0
    }
66
67
0
    if (cap != port_mmaps->cap) {
68
69
0
        port_mmaps->elts = nxt_realloc(port_mmaps->elts,
70
0
                                       cap * sizeof(nxt_port_mmap_t));
71
0
        if (nxt_slow_path(port_mmaps->elts == NULL)) {
72
0
            return NULL;
73
0
        }
74
75
0
        nxt_memzero(port_mmaps->elts + port_mmaps->cap,
76
0
                    sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap));
77
78
0
        port_mmaps->cap = cap;
79
0
    }
80
81
0
    if (i + 1 > port_mmaps->size) {
82
0
        port_mmaps->size = i + 1;
83
0
    }
84
85
0
    return port_mmaps->elts + i;
86
0
}
87
88
89
void
90
nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
91
0
{
92
0
    uint32_t         i;
93
0
    nxt_port_mmap_t  *port_mmap;
94
95
0
    if (port_mmaps == NULL) {
96
0
        return;
97
0
    }
98
99
0
    port_mmap = port_mmaps->elts;
100
101
0
    for (i = 0; i < port_mmaps->size; i++) {
102
0
        nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
103
0
    }
104
105
0
    port_mmaps->size = 0;
106
107
0
    if (free_elts != 0) {
108
0
        nxt_free(port_mmaps->elts);
109
0
    }
110
0
}
111
112
113
/*
 * Poison released shared-memory chunks with 0xA5 so that stale reads
 * of already-freed buffer memory are easier to spot while debugging.
 */
#define nxt_port_mmap_free_junk(p, size)                                      \
    memset((p), 0xA5, size)
115
116
117
/*
 * Completion handler for shared-memory buffers.  For every buffer in
 * the chain it returns the backing chunks to the segment's free map,
 * drops the mmap handler reference and frees the buffer memory.  If
 * the sender had flagged out-of-shared-memory, it is notified that
 * chunks became available.
 */
static void
nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
{
    u_char                   *p;
    nxt_mp_t                 *mp;
    nxt_buf_t                *b, *next;
    nxt_process_t            *process;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    /*
     * NOTE(review): presumably defers handling to the buffer's owning
     * thread; a nonzero return means it was dispatched elsewhere --
     * confirm nxt_buf_ts_handle() semantics.
     */
    if (nxt_buf_ts_handle(task, obj, data)) {
        return;
    }

    b = obj;

    nxt_assert(data == b->parent);

    mmap_handler = data;

complete_buf:

    hdr = mmap_handler->hdr;

    /* Do not touch the free map of a segment owned by another pid pair. */
    if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
        nxt_debug(task, "mmap buf completion: mmap for other process pair "
                  "%PI->%PI", hdr->src_pid, hdr->dst_pid);

        goto release_buf;
    }

    if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
        /*
         * Chunks until b->mem.pos has been sent to other side,
         * let's release rest (if any).
         */
        p = b->mem.pos - 1;
        c = nxt_port_mmap_chunk_id(hdr, p) + 1;
        p = nxt_port_mmap_chunk_start(hdr, c);

    } else {
        /* Nothing was sent: release every chunk of the buffer. */
        p = b->mem.start;
        c = nxt_port_mmap_chunk_id(hdr, p);
    }

    /* Poison the released region to catch stale readers. */
    nxt_port_mmap_free_junk(p, b->mem.end - p);

    nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), "
              "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start,
              b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c);

    /* Mark every chunk in [p, b->mem.end) as free again. */
    while (p < b->mem.end) {
        nxt_port_mmap_set_chunk_free(hdr->free_map, c);

        p += PORT_MMAP_CHUNK_SIZE;
        c++;
    }

    /*
     * If the sender had set the out-of-shared-memory flag, clear it
     * (atomically, so only one completion does this) and wake the
     * sender up now that chunks are free.
     */
    if (hdr->dst_pid == nxt_pid
        && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
    {
        process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid);

        nxt_process_broadcast_shm_ack(task, process);
    }

release_buf:

    nxt_port_mmap_handler_use(mmap_handler, -1);

    /* Free this buffer, then continue with the next one in the chain. */
    next = b->next;
    mp = b->data;

    nxt_mp_free(mp, b);
    nxt_mp_release(mp);

    if (next != NULL) {
        b = next;
        mmap_handler = b->parent;

        goto complete_buf;
    }
}
201
202
203
/*
 * Registers a shared memory segment received as a file descriptor
 * from "process": maps it, validates the pid pair in its header and
 * stores the resulting handler in the process' incoming array at the
 * slot given by the segment id.  Returns the handler, or NULL on
 * any error.
 */
nxt_port_mmap_handler_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
    nxt_fd_t fd)
{
    void                     *mem;
    struct stat              mmap_stat;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "got new mmap fd #%FD from process %PI",
              fd, process->pid);

    port_mmap = NULL;

    if (fstat(fd, &mmap_stat) == -1) {
        nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);

        return NULL;
    }

    /* Map the whole object, using the size reported by fstat(). */
    mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
                       PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);

        return NULL;
    }

    hdr = mem;

    /* The header must name the sending process and us as the pid pair. */
    if (nxt_slow_path(hdr->src_pid != process->pid
                      || hdr->dst_pid != nxt_pid))
    {
        nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: "
                "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid,
                hdr->dst_pid, nxt_pid);

        /*
         * NOTE(review): mmap_stat.st_size bytes were mapped above, but
         * PORT_MMAP_SIZE is unmapped here (and later via
         * nxt_port_mmap_handler_use()) -- correct only if every segment
         * is exactly PORT_MMAP_SIZE; confirm senders cannot create a
         * different size.
         */
        nxt_mem_munmap(mem, PORT_MMAP_SIZE);

        return NULL;
    }

    mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
    if (nxt_slow_path(mmap_handler == NULL)) {
        nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");

        nxt_mem_munmap(mem, PORT_MMAP_SIZE);

        return NULL;
    }

    mmap_handler->hdr = hdr;
    /* No descriptor is retained for incoming segments; -1 disables the
     * close in nxt_port_mmap_handler_use(). */
    mmap_handler->fd = -1;

    nxt_thread_mutex_lock(&process->incoming.mutex);

    /* The segment id doubles as the slot index in the incoming array. */
    port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
    if (nxt_slow_path(port_mmap == NULL)) {
        nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");

        nxt_mem_munmap(mem, PORT_MMAP_SIZE);

        nxt_free(mmap_handler);
        mmap_handler = NULL;

        goto fail;
    }

    port_mmap->mmap_handler = mmap_handler;
    nxt_port_mmap_handler_use(mmap_handler, 1);

    /* 0xFFFFu marks the segment as not bound to a particular port; see
     * the sent_over check in nxt_port_mmap_get(). */
    hdr->sent_over = 0xFFFFu;

fail:

    nxt_thread_mutex_unlock(&process->incoming.mutex);

    return mmap_handler;
}
284
285
286
/*
 * Creates a brand new outgoing shared memory segment: opens an
 * anonymous shm object of PORT_MMAP_SIZE, maps it, initializes the
 * segment header and pre-marks the first "n" chunks busy in the
 * selected free map ("tracking" picks free_tracking_map).  Appends
 * the segment to "mmaps" and returns its handler, or NULL on error.
 * Caller is expected to hold mmaps->mutex (see nxt_port_mmap_get()).
 */
static nxt_port_mmap_handler_t *
nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
    nxt_bool_t tracking, nxt_int_t n)
{
    void                     *mem;
    nxt_fd_t                 fd;
    nxt_int_t                i;
    nxt_free_map_t           *free_map;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
    if (nxt_slow_path(mmap_handler == NULL)) {
        nxt_alert(task, "failed to allocate mmap_handler");

        return NULL;
    }

    /* Reserve the next slot; this also grows mmaps->size by one. */
    port_mmap = nxt_port_mmap_at(mmaps, mmaps->size);
    if (nxt_slow_path(port_mmap == NULL)) {
        nxt_alert(task, "failed to add port mmap to mmaps array");

        nxt_free(mmap_handler);
        return NULL;
    }

    fd = nxt_shm_open(task, PORT_MMAP_SIZE);
    if (nxt_slow_path(fd == -1)) {
        goto remove_fail;
    }

    mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
                       MAP_SHARED, fd, 0);

    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_fd_close(fd);
        goto remove_fail;
    }

    mmap_handler->hdr = mem;
    mmap_handler->fd = fd;
    port_mmap->mmap_handler = mmap_handler;
    nxt_port_mmap_handler_use(mmap_handler, 1);

    /* Init segment header. */
    hdr = mmap_handler->hdr;

    /* All-ones free maps: every chunk starts out free. */
    nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
    nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

    hdr->id = mmaps->size - 1;
    hdr->src_pid = nxt_pid;
    /* 0xFFFFu: not yet bound to a port (see nxt_port_mmap_get()). */
    hdr->sent_over = 0xFFFFu;

    /* Mark the first n chunks as busy for the caller. */
    free_map = tracking ? hdr->free_tracking_map : hdr->free_map;

    for (i = 0; i < n; i++) {
        nxt_port_mmap_set_chunk_busy(free_map, i);
    }

    /* Mark as busy chunk followed the last available chunk. */
    nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
    nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);

    nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...",
            hdr->id, nxt_pid);

    return mmap_handler;

remove_fail:

    nxt_free(mmap_handler);

    /* Undo the slot reservation made by nxt_port_mmap_at() above. */
    mmaps->size--;

    return NULL;
}
365
366
367
/*
 * Creates an anonymous shared memory object of "size" bytes and
 * returns its file descriptor, or -1 on failure.  One of three
 * backends is used depending on build-time support: memfd_create(),
 * shm_open(SHM_ANON), or named shm_open() with the name unlinked
 * right after creation.
 */
nxt_int_t
nxt_shm_open(nxt_task_t *task, size_t size)
{
    nxt_fd_t  fd;

#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)

    u_char    *p, name[64];

    /* Build a unique object name: prefix + "unit.<pid>.<random hex>". */
    p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
                    nxt_pid, nxt_random(&task->thread->random));
    *p = '\0';

#endif

#if (NXT_HAVE_MEMFD_CREATE)

    /* Raw syscall: no libc wrapper is assumed to exist. */
    fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);

    if (nxt_slow_path(fd == -1)) {
        nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);

        return -1;
    }

    nxt_debug(task, "memfd_create(%s): %FD", name, fd);

#elif (NXT_HAVE_SHM_OPEN_ANON)

    fd = shm_open(SHM_ANON, O_RDWR, 0600);
    if (nxt_slow_path(fd == -1)) {
        nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);

        return -1;
    }

    nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);

#elif (NXT_HAVE_SHM_OPEN)

    /* Just in case. */
    shm_unlink((char *) name);

    fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, 0600);
    if (nxt_slow_path(fd == -1)) {
        nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);

        return -1;
    }

    nxt_debug(task, "shm_open(%s): %FD", name, fd);

    /* Unlink immediately: the object lives on through the descriptor. */
    if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
        nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
                nxt_errno);
    }

#else

#error No working shared memory implementation.

#endif

    /* Size the object; a fresh shm object starts with length zero. */
    if (nxt_slow_path(ftruncate(fd, size) == -1)) {
        nxt_alert(task, "ftruncate() failed %E", nxt_errno);

        nxt_fd_close(fd);

        return -1;
    }

    return fd;
}
440
441
442
/*
 * Finds a run of "n" contiguous free chunks in one of the existing
 * outgoing segments and marks them busy, storing the first chunk id
 * in *c.  Segments already bound to a specific port (sent_over set)
 * are skipped.  When no segment has room, every exhausted segment is
 * flagged oosm and a new segment is created.  Runs with mmaps->mutex
 * held around the whole search.
 */
static nxt_port_mmap_handler_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c,
    nxt_int_t n, nxt_bool_t tracking)
{
    nxt_int_t                i, res, nchunks;
    nxt_free_map_t           *free_map;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_t          *end_port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_thread_mutex_lock(&mmaps->mutex);

    /* No segments yet: fall through to creating the first one. */
    if (nxt_slow_path(mmaps->elts == NULL)) {
        goto end;
    }

    end_port_mmap = mmaps->elts + mmaps->size;

    for (port_mmap = mmaps->elts;
         port_mmap < end_port_mmap;
         port_mmap++)
    {
        mmap_handler = port_mmap->mmap_handler;
        hdr = mmap_handler->hdr;

        /* Skip segments bound to a particular port. */
        if (hdr->sent_over != 0xFFFFu) {
            continue;
        }

        *c = 0;

        free_map = tracking ? hdr->free_tracking_map : hdr->free_map;

        /*
         * NOTE(review): presumably acquires the first free chunk at or
         * after *c and marks it busy -- confirm
         * nxt_port_mmap_get_free_chunk() semantics.
         */
        while (nxt_port_mmap_get_free_chunk(free_map, c)) {
            nchunks = 1;

            /* Try to extend the run to n contiguous chunks. */
            while (nchunks < n) {
                res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks);

                if (res == 0) {
                    /* Chunk *c + nchunks is taken: roll the run back. */
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(free_map, *c + i);
                    }

                    /* Restart the search past the busy chunk. */
                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks == n) {
                goto unlock_return;
            }
        }

        /* Segment exhausted: ask receivers to ack freed chunks. */
        hdr->oosm = 1;
    }

    /* TODO introduce port_mmap limit and release wait. */

end:

    *c = 0;
    mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n);

unlock_return:

    nxt_thread_mutex_unlock(&mmaps->mutex);

    return mmap_handler;
}
516
517
518
static nxt_port_mmap_handler_t *
519
nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
520
0
{
521
0
    nxt_process_t            *process;
522
0
    nxt_port_mmap_handler_t  *mmap_handler;
523
524
0
    process = nxt_runtime_process_find(task->thread->runtime, spid);
525
0
    if (nxt_slow_path(process == NULL)) {
526
0
        return NULL;
527
0
    }
528
529
0
    nxt_thread_mutex_lock(&process->incoming.mutex);
530
531
0
    if (nxt_fast_path(process->incoming.size > id)) {
532
0
        mmap_handler = process->incoming.elts[id].mmap_handler;
533
534
0
    } else {
535
0
        mmap_handler = NULL;
536
537
0
        nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid);
538
0
    }
539
540
0
    nxt_thread_mutex_unlock(&process->incoming.mutex);
541
542
0
    return mmap_handler;
543
0
}
544
545
546
nxt_buf_t *
547
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size)
548
0
{
549
0
    nxt_mp_t                 *mp;
550
0
    nxt_buf_t                *b;
551
0
    nxt_int_t                nchunks;
552
0
    nxt_chunk_id_t           c;
553
0
    nxt_port_mmap_header_t   *hdr;
554
0
    nxt_port_mmap_handler_t  *mmap_handler;
555
556
0
    nxt_debug(task, "request %z bytes shm buffer", size);
557
558
0
    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
559
560
0
    if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) {
561
0
        nxt_alert(task, "requested buffer (%z) too big", size);
562
563
0
        return NULL;
564
0
    }
565
566
0
    b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
567
0
    if (nxt_slow_path(b == NULL)) {
568
0
        return NULL;
569
0
    }
570
571
0
    b->completion_handler = nxt_port_mmap_buf_completion;
572
0
    nxt_buf_set_port_mmap(b);
573
574
0
    mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0);
575
0
    if (nxt_slow_path(mmap_handler == NULL)) {
576
0
        mp = task->thread->engine->mem_pool;
577
0
        nxt_mp_free(mp, b);
578
0
        nxt_mp_release(mp);
579
0
        return NULL;
580
0
    }
581
582
0
    b->parent = mmap_handler;
583
584
0
    nxt_port_mmap_handler_use(mmap_handler, 1);
585
586
0
    hdr = mmap_handler->hdr;
587
588
0
    b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
589
0
    b->mem.pos = b->mem.start;
590
0
    b->mem.free = b->mem.start;
591
0
    b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
592
593
0
    nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
594
0
              b, b->mem.start, b->mem.end - b->mem.start,
595
0
              hdr->src_pid, hdr->dst_pid, hdr->id, c);
596
597
0
    return b;
598
0
}
599
600
601
/*
 * Tries to grow a shared-memory buffer in place by claiming the chunks
 * immediately following b->mem.end until "size" extra bytes fit.  If a
 * busy chunk is hit, the buffer is still grown by whatever contiguous
 * chunks were taken -- unless even "min_size" cannot be satisfied, in
 * which case all newly taken chunks are released and NXT_ERROR is
 * returned.
 */
nxt_int_t
nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
    size_t min_size)
{
    size_t                   nchunks, free_size;
    nxt_chunk_id_t           c, start;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "request increase %z bytes shm buffer", size);

    if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
        nxt_log(task, NXT_LOG_WARN,
                "failed to increase, not a mmap buffer");
        return NXT_ERROR;
    }

    free_size = nxt_buf_mem_free_size(&b->mem);

    /* Enough room already -- nothing to claim. */
    if (nxt_slow_path(size <= free_size)) {
        return NXT_OK;
    }

    mmap_handler = b->parent;
    hdr = mmap_handler->hdr;

    /* First chunk right after the buffer's current end. */
    start = nxt_port_mmap_chunk_id(hdr, b->mem.end);

    size -= free_size;

    /* Number of additional whole chunks needed. */
    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;

    c = start;

    /* Try to acquire as much chunks as required. */
    while (nchunks > 0) {

        /* Stop at the first chunk someone else already owns. */
        if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
            break;
        }

        c++;
        nchunks--;
    }

    /* Chunks [start, c) were acquired. */
    if (nchunks != 0
        && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start))
    {
        /*
         * Not even min_size is reachable: roll back every chunk taken.
         * NOTE(review): if nxt_chunk_id_t is unsigned and start == 0,
         * the c-- below could wrap when no chunk was acquired (c ==
         * start); this relies on start > 0 here -- confirm.
         */
        c--;
        while (c >= start) {
            nxt_port_mmap_set_chunk_free(hdr->free_map, c);
            c--;
        }

        nxt_debug(task, "failed to increase, %uz chunks busy", nchunks);

        return NXT_ERROR;

    } else {
        /* Grow the buffer over the freshly claimed chunks. */
        b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);

        return NXT_OK;
    }
}
665
666
667
static nxt_buf_t *
668
nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
669
    nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
670
0
{
671
0
    size_t                   nchunks;
672
0
    nxt_buf_t                *b;
673
0
    nxt_port_mmap_header_t   *hdr;
674
0
    nxt_port_mmap_handler_t  *mmap_handler;
675
676
0
    mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
677
0
                                                   mmap_msg->mmap_id);
678
0
    if (nxt_slow_path(mmap_handler == NULL)) {
679
0
        return NULL;
680
0
    }
681
682
0
    b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
683
0
    if (nxt_slow_path(b == NULL)) {
684
0
        return NULL;
685
0
    }
686
687
0
    b->completion_handler = nxt_port_mmap_buf_completion;
688
689
0
    nxt_buf_set_port_mmap(b);
690
691
0
    nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
692
0
    if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
693
0
        nchunks++;
694
0
    }
695
696
0
    hdr = mmap_handler->hdr;
697
698
0
    b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
699
0
    b->mem.pos = b->mem.start;
700
0
    b->mem.free = b->mem.start + mmap_msg->size;
701
0
    b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
702
703
0
    b->parent = mmap_handler;
704
0
    nxt_port_mmap_handler_use(mmap_handler, 1);
705
706
0
    nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
707
0
              b, b->mem.start, b->mem.end - b->mem.start,
708
0
              hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);
709
710
0
    return b;
711
0
}
712
713
714
/*
 * Rewrites an outgoing message so that only chunk descriptors
 * (nxt_port_mmap_msg_t: mmap id, chunk id, size) travel over the
 * socket while the payload stays in shared memory.  Each iovec of
 * "sb" is matched to the mmap buffer it points into, a descriptor is
 * emitted into "mmsg_buf", and finally sb is collapsed to a single
 * iovec covering the descriptor array.
 */
void
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
    nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf)
{
    size_t                   bsize;
    nxt_buf_t                *bmem;
    nxt_uint_t               i;
    nxt_port_mmap_msg_t      *mmap_msg;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
                    "via shared memory", sb->size, port->pid);

    /* One descriptor per iovec. */
    bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
    mmap_msg = mmsg_buf;

    bmem = msg->buf;

    for (i = 0; i < sb->niov; i++, mmap_msg++) {

        /* Lookup buffer which starts current iov_base. */
        while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
            bmem = bmem->next;
        }

        if (nxt_slow_path(bmem == NULL)) {
            nxt_log_error(NXT_LOG_ERR, task->log,
                          "failed to find buf for iobuf[%d]", i);
            return;
            /* TODO clear b and exit */
        }

        mmap_handler = bmem->parent;
        hdr = mmap_handler->hdr;

        /* Describe this iovec's location inside its segment. */
        mmap_msg->mmap_id = hdr->id;
        mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
        mmap_msg->size = sb->iobuf[i].iov_len;

        nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
                  mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                  port->pid);
    }

    /* Replace the payload iovecs with the descriptor array. */
    sb->iobuf[0].iov_base = mmsg_buf;
    sb->iobuf[0].iov_len = bsize;
    sb->niov = 1;
    sb->size = bsize;

    /* Tell the receiver to decode via nxt_port_mmap_read(). */
    msg->port_msg.mmap = 1;
}
766
767
768
/*
 * Decodes an incoming descriptor-only message (see
 * nxt_port_mmap_write()): every nxt_port_mmap_msg_t packed in the
 * received buffers is resolved to a shared-memory backed buffer, and
 * the assembled chain replaces msg->buf.  msg->size accumulates the
 * total payload size.
 */
void
nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, **pb;
    nxt_port_mmap_msg_t  *end, *mmap_msg;

    /* pb is the link where the next resolved buffer gets appended;
     * the first append overwrites msg->buf itself. */
    pb = &msg->buf;
    msg->size = 0;

    /* Walk the original chain (b was saved before *pb rewrites it). */
    for (b = msg->buf; b != NULL; b = b->next) {

        mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
        end = (nxt_port_mmap_msg_t *) b->mem.free;

        while (mmap_msg < end) {
            nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
                      mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                      msg->port_msg.pid);

            *pb = nxt_port_mmap_get_incoming_buf(task, msg->port,
                                                 msg->port_msg.pid, mmap_msg);
            if (nxt_slow_path(*pb == NULL)) {
                nxt_log_error(NXT_LOG_ERR, task->log,
                              "failed to get mmap buffer");

                break;
            }

            msg->size += mmap_msg->size;
            pb = &(*pb)->next;
            mmap_msg++;

            /* Mark original buf as complete. */
            b->mem.pos += sizeof(nxt_port_mmap_msg_t);
        }
    }
}
805
806
807
nxt_port_method_t
808
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
809
0
{
810
0
    nxt_port_method_t  m;
811
812
0
    m = NXT_PORT_METHOD_ANY;
813
814
0
    for (/* void */; b != NULL; b = b->next) {
815
0
        if (nxt_buf_used_size(b) == 0) {
816
            /* empty buffers does not affect method */
817
0
            continue;
818
0
        }
819
820
0
        if (nxt_buf_is_port_mmap(b)) {
821
0
            if (m == NXT_PORT_METHOD_PLAIN) {
822
0
                nxt_log_error(NXT_LOG_ERR, task->log,
823
0
                              "mixing plain and mmap buffers, "
824
0
                              "using plain mode");
825
826
0
                break;
827
0
            }
828
829
0
            if (m == NXT_PORT_METHOD_ANY) {
830
0
                nxt_debug(task, "using mmap mode");
831
832
0
                m = NXT_PORT_METHOD_MMAP;
833
0
            }
834
0
        } else {
835
0
            if (m == NXT_PORT_METHOD_MMAP) {
836
0
                nxt_log_error(NXT_LOG_ERR, task->log,
837
0
                              "mixing mmap and plain buffers, "
838
0
                              "switching to plain mode");
839
840
0
                m = NXT_PORT_METHOD_PLAIN;
841
842
0
                break;
843
0
            }
844
845
0
            if (m == NXT_PORT_METHOD_ANY) {
846
0
                nxt_debug(task, "using plain mode");
847
848
0
                m = NXT_PORT_METHOD_PLAIN;
849
0
            }
850
0
        }
851
0
    }
852
853
0
    return m;
854
0
}
855
856
857
void
858
nxt_process_broadcast_shm_ack(nxt_task_t *task, nxt_process_t *process)
859
0
{
860
0
    nxt_port_t  *port;
861
862
0
    if (nxt_slow_path(process == NULL || nxt_queue_is_empty(&process->ports)))
863
0
    {
864
0
        return;
865
0
    }
866
867
0
    port = nxt_process_port_first(process);
868
869
0
    if (port->type == NXT_PROCESS_APP) {
870
0
        nxt_port_post(task, port, nxt_port_broadcast_shm_ack, process);
871
0
    }
872
0
}
873
874
875
/*
 * Posted handler (see nxt_process_broadcast_shm_ack()): sends an
 * empty NXT_PORT_MSG_SHM_ACK message to every port of the target
 * process passed in "data".
 */
static void
nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_process_t  *process;

    process = data;

    /* The loop variable intentionally shadows the "port" parameter. */
    nxt_queue_each(port, &process->ports, nxt_port_t, link) {
        (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
                                     -1, 0, 0, NULL);
    } nxt_queue_loop;
}