/src/unit/src/nxt_port_memory.c
Line | Count | Source |
1 | | |
2 | | /* |
3 | | * Copyright (C) Max Romanov |
4 | | * Copyright (C) NGINX, Inc. |
5 | | */ |
6 | | |
7 | | #include <nxt_main.h> |
8 | | |
9 | | #if (NXT_HAVE_MEMFD_CREATE) |
10 | | |
11 | | #include <linux/memfd.h> |
12 | | #include <unistd.h> |
13 | | #include <sys/syscall.h> |
14 | | |
15 | | #endif |
16 | | |
17 | | #include <nxt_port_memory_int.h> |
18 | | |
19 | | |
20 | | static void nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port, |
21 | | void *data); |
22 | | |
23 | | |
24 | | nxt_inline void |
25 | | nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i) |
26 | 0 | { |
27 | 0 | int c; |
28 | |
|
29 | 0 | c = nxt_atomic_fetch_add(&mmap_handler->use_count, i); |
30 | |
|
31 | 0 | if (i < 0 && c == -i) { |
32 | 0 | if (mmap_handler->hdr != NULL) { |
33 | 0 | nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE); |
34 | 0 | mmap_handler->hdr = NULL; |
35 | 0 | } |
36 | |
|
37 | 0 | if (mmap_handler->fd != -1) { |
38 | 0 | nxt_fd_close(mmap_handler->fd); |
39 | 0 | } |
40 | |
|
41 | 0 | nxt_free(mmap_handler); |
42 | 0 | } |
43 | 0 | } |
44 | | |
45 | | |
46 | | static nxt_port_mmap_t * |
47 | | nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i) |
48 | 0 | { |
49 | 0 | uint32_t cap; |
50 | |
|
51 | 0 | cap = port_mmaps->cap; |
52 | |
|
53 | 0 | if (cap == 0) { |
54 | 0 | cap = i + 1; |
55 | 0 | } |
56 | |
|
57 | 0 | while (i + 1 > cap) { |
58 | |
|
59 | 0 | if (cap < 16) { |
60 | 0 | cap = cap * 2; |
61 | |
|
62 | 0 | } else { |
63 | 0 | cap = cap + cap / 2; |
64 | 0 | } |
65 | 0 | } |
66 | |
|
67 | 0 | if (cap != port_mmaps->cap) { |
68 | |
|
69 | 0 | port_mmaps->elts = nxt_realloc(port_mmaps->elts, |
70 | 0 | cap * sizeof(nxt_port_mmap_t)); |
71 | 0 | if (nxt_slow_path(port_mmaps->elts == NULL)) { |
72 | 0 | return NULL; |
73 | 0 | } |
74 | | |
75 | 0 | nxt_memzero(port_mmaps->elts + port_mmaps->cap, |
76 | 0 | sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap)); |
77 | |
|
78 | 0 | port_mmaps->cap = cap; |
79 | 0 | } |
80 | | |
81 | 0 | if (i + 1 > port_mmaps->size) { |
82 | 0 | port_mmaps->size = i + 1; |
83 | 0 | } |
84 | |
|
85 | 0 | return port_mmaps->elts + i; |
86 | 0 | } |
87 | | |
88 | | |
89 | | void |
90 | | nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts) |
91 | 0 | { |
92 | 0 | uint32_t i; |
93 | 0 | nxt_port_mmap_t *port_mmap; |
94 | |
|
95 | 0 | if (port_mmaps == NULL) { |
96 | 0 | return; |
97 | 0 | } |
98 | | |
99 | 0 | port_mmap = port_mmaps->elts; |
100 | |
|
101 | 0 | for (i = 0; i < port_mmaps->size; i++) { |
102 | 0 | nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1); |
103 | 0 | } |
104 | |
|
105 | 0 | port_mmaps->size = 0; |
106 | |
|
107 | 0 | if (free_elts != 0) { |
108 | 0 | nxt_free(port_mmaps->elts); |
109 | 0 | } |
110 | 0 | } |
111 | | |
112 | | |
/*
 * Fills "size" bytes starting at "p" with the 0xA5 byte pattern.  It is
 * applied to chunks right before they are marked free.  Both arguments are
 * parenthesized so that any expression can be passed safely.
 */
#define nxt_port_mmap_free_junk(p, size)                                      \
    memset((p), 0xA5, (size))
115 | | |
116 | | |
117 | | static void |
118 | | nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) |
119 | 0 | { |
120 | 0 | u_char *p; |
121 | 0 | nxt_mp_t *mp; |
122 | 0 | nxt_buf_t *b, *next; |
123 | 0 | nxt_process_t *process; |
124 | 0 | nxt_chunk_id_t c; |
125 | 0 | nxt_port_mmap_header_t *hdr; |
126 | 0 | nxt_port_mmap_handler_t *mmap_handler; |
127 | |
|
128 | 0 | if (nxt_buf_ts_handle(task, obj, data)) { |
129 | 0 | return; |
130 | 0 | } |
131 | | |
132 | 0 | b = obj; |
133 | |
|
134 | 0 | nxt_assert(data == b->parent); |
135 | |
|
136 | 0 | mmap_handler = data; |
137 | |
|
138 | 0 | complete_buf: |
139 | |
|
140 | 0 | hdr = mmap_handler->hdr; |
141 | |
|
142 | 0 | if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) { |
143 | 0 | nxt_debug(task, "mmap buf completion: mmap for other process pair " |
144 | 0 | "%PI->%PI", hdr->src_pid, hdr->dst_pid); |
145 | |
|
146 | 0 | goto release_buf; |
147 | 0 | } |
148 | | |
149 | 0 | if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) { |
150 | | /* |
151 | | * Chunks until b->mem.pos has been sent to other side, |
152 | | * let's release rest (if any). |
153 | | */ |
154 | 0 | p = b->mem.pos - 1; |
155 | 0 | c = nxt_port_mmap_chunk_id(hdr, p) + 1; |
156 | 0 | p = nxt_port_mmap_chunk_start(hdr, c); |
157 | |
|
158 | 0 | } else { |
159 | 0 | p = b->mem.start; |
160 | 0 | c = nxt_port_mmap_chunk_id(hdr, p); |
161 | 0 | } |
162 | |
|
163 | 0 | nxt_port_mmap_free_junk(p, b->mem.end - p); |
164 | |
|
165 | 0 | nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), " |
166 | 0 | "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start, |
167 | 0 | b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c); |
168 | |
|
169 | 0 | while (p < b->mem.end) { |
170 | 0 | nxt_port_mmap_set_chunk_free(hdr->free_map, c); |
171 | |
|
172 | 0 | p += PORT_MMAP_CHUNK_SIZE; |
173 | 0 | c++; |
174 | 0 | } |
175 | |
|
176 | 0 | if (hdr->dst_pid == nxt_pid |
177 | 0 | && nxt_atomic_cmp_set(&hdr->oosm, 1, 0)) |
178 | 0 | { |
179 | 0 | process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid); |
180 | |
|
181 | 0 | nxt_process_broadcast_shm_ack(task, process); |
182 | 0 | } |
183 | |
|
184 | 0 | release_buf: |
185 | |
|
186 | 0 | nxt_port_mmap_handler_use(mmap_handler, -1); |
187 | |
|
188 | 0 | next = b->next; |
189 | 0 | mp = b->data; |
190 | |
|
191 | 0 | nxt_mp_free(mp, b); |
192 | 0 | nxt_mp_release(mp); |
193 | |
|
194 | 0 | if (next != NULL) { |
195 | 0 | b = next; |
196 | 0 | mmap_handler = b->parent; |
197 | |
|
198 | 0 | goto complete_buf; |
199 | 0 | } |
200 | 0 | } |
201 | | |
202 | | |
203 | | nxt_port_mmap_handler_t * |
204 | | nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, |
205 | | nxt_fd_t fd) |
206 | 0 | { |
207 | 0 | void *mem; |
208 | 0 | struct stat mmap_stat; |
209 | 0 | nxt_port_mmap_t *port_mmap; |
210 | 0 | nxt_port_mmap_header_t *hdr; |
211 | 0 | nxt_port_mmap_handler_t *mmap_handler; |
212 | |
|
213 | 0 | nxt_debug(task, "got new mmap fd #%FD from process %PI", |
214 | 0 | fd, process->pid); |
215 | |
|
216 | 0 | port_mmap = NULL; |
217 | |
|
218 | 0 | if (fstat(fd, &mmap_stat) == -1) { |
219 | 0 | nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno); |
220 | |
|
221 | 0 | return NULL; |
222 | 0 | } |
223 | | |
224 | 0 | mem = nxt_mem_mmap(NULL, mmap_stat.st_size, |
225 | 0 | PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); |
226 | |
|
227 | 0 | if (nxt_slow_path(mem == MAP_FAILED)) { |
228 | 0 | nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); |
229 | |
|
230 | 0 | return NULL; |
231 | 0 | } |
232 | | |
233 | 0 | hdr = mem; |
234 | |
|
235 | 0 | if (nxt_slow_path(hdr->src_pid != process->pid |
236 | 0 | || hdr->dst_pid != nxt_pid)) |
237 | 0 | { |
238 | 0 | nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: " |
239 | 0 | "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid, |
240 | 0 | hdr->dst_pid, nxt_pid); |
241 | |
|
242 | 0 | nxt_mem_munmap(mem, PORT_MMAP_SIZE); |
243 | |
|
244 | 0 | return NULL; |
245 | 0 | } |
246 | | |
247 | 0 | mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); |
248 | 0 | if (nxt_slow_path(mmap_handler == NULL)) { |
249 | 0 | nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); |
250 | |
|
251 | 0 | nxt_mem_munmap(mem, PORT_MMAP_SIZE); |
252 | |
|
253 | 0 | return NULL; |
254 | 0 | } |
255 | | |
256 | 0 | mmap_handler->hdr = hdr; |
257 | 0 | mmap_handler->fd = -1; |
258 | |
|
259 | 0 | nxt_thread_mutex_lock(&process->incoming.mutex); |
260 | |
|
261 | 0 | port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); |
262 | 0 | if (nxt_slow_path(port_mmap == NULL)) { |
263 | 0 | nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); |
264 | |
|
265 | 0 | nxt_mem_munmap(mem, PORT_MMAP_SIZE); |
266 | |
|
267 | 0 | nxt_free(mmap_handler); |
268 | 0 | mmap_handler = NULL; |
269 | |
|
270 | 0 | goto fail; |
271 | 0 | } |
272 | | |
273 | 0 | port_mmap->mmap_handler = mmap_handler; |
274 | 0 | nxt_port_mmap_handler_use(mmap_handler, 1); |
275 | |
|
276 | 0 | hdr->sent_over = 0xFFFFu; |
277 | |
|
278 | 0 | fail: |
279 | |
|
280 | 0 | nxt_thread_mutex_unlock(&process->incoming.mutex); |
281 | |
|
282 | 0 | return mmap_handler; |
283 | 0 | } |
284 | | |
285 | | |
286 | | static nxt_port_mmap_handler_t * |
287 | | nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps, |
288 | | nxt_bool_t tracking, nxt_int_t n) |
289 | 0 | { |
290 | 0 | void *mem; |
291 | 0 | nxt_fd_t fd; |
292 | 0 | nxt_int_t i; |
293 | 0 | nxt_free_map_t *free_map; |
294 | 0 | nxt_port_mmap_t *port_mmap; |
295 | 0 | nxt_port_mmap_header_t *hdr; |
296 | 0 | nxt_port_mmap_handler_t *mmap_handler; |
297 | |
|
298 | 0 | mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); |
299 | 0 | if (nxt_slow_path(mmap_handler == NULL)) { |
300 | 0 | nxt_alert(task, "failed to allocate mmap_handler"); |
301 | |
|
302 | 0 | return NULL; |
303 | 0 | } |
304 | | |
305 | 0 | port_mmap = nxt_port_mmap_at(mmaps, mmaps->size); |
306 | 0 | if (nxt_slow_path(port_mmap == NULL)) { |
307 | 0 | nxt_alert(task, "failed to add port mmap to mmaps array"); |
308 | |
|
309 | 0 | nxt_free(mmap_handler); |
310 | 0 | return NULL; |
311 | 0 | } |
312 | | |
313 | 0 | fd = nxt_shm_open(task, PORT_MMAP_SIZE); |
314 | 0 | if (nxt_slow_path(fd == -1)) { |
315 | 0 | goto remove_fail; |
316 | 0 | } |
317 | | |
318 | 0 | mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, |
319 | 0 | MAP_SHARED, fd, 0); |
320 | |
|
321 | 0 | if (nxt_slow_path(mem == MAP_FAILED)) { |
322 | 0 | nxt_fd_close(fd); |
323 | 0 | goto remove_fail; |
324 | 0 | } |
325 | | |
326 | 0 | mmap_handler->hdr = mem; |
327 | 0 | mmap_handler->fd = fd; |
328 | 0 | port_mmap->mmap_handler = mmap_handler; |
329 | 0 | nxt_port_mmap_handler_use(mmap_handler, 1); |
330 | | |
331 | | /* Init segment header. */ |
332 | 0 | hdr = mmap_handler->hdr; |
333 | |
|
334 | 0 | nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); |
335 | 0 | nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); |
336 | |
|
337 | 0 | hdr->id = mmaps->size - 1; |
338 | 0 | hdr->src_pid = nxt_pid; |
339 | 0 | hdr->sent_over = 0xFFFFu; |
340 | | |
341 | | /* Mark first chunk as busy */ |
342 | 0 | free_map = tracking ? hdr->free_tracking_map : hdr->free_map; |
343 | |
|
344 | 0 | for (i = 0; i < n; i++) { |
345 | 0 | nxt_port_mmap_set_chunk_busy(free_map, i); |
346 | 0 | } |
347 | | |
348 | | /* Mark as busy chunk followed the last available chunk. */ |
349 | 0 | nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); |
350 | 0 | nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); |
351 | |
|
352 | 0 | nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...", |
353 | 0 | hdr->id, nxt_pid); |
354 | |
|
355 | 0 | return mmap_handler; |
356 | | |
357 | 0 | remove_fail: |
358 | |
|
359 | 0 | nxt_free(mmap_handler); |
360 | |
|
361 | 0 | mmaps->size--; |
362 | |
|
363 | 0 | return NULL; |
364 | 0 | } |
365 | | |
366 | | |
367 | | nxt_int_t |
368 | | nxt_shm_open(nxt_task_t *task, size_t size) |
369 | 0 | { |
370 | 0 | nxt_fd_t fd; |
371 | |
|
372 | 0 | #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN) |
373 | |
|
374 | 0 | u_char *p, name[64]; |
375 | |
|
376 | 0 | p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD", |
377 | 0 | nxt_pid, nxt_random(&task->thread->random)); |
378 | 0 | *p = '\0'; |
379 | |
|
380 | 0 | #endif |
381 | |
|
382 | 0 | #if (NXT_HAVE_MEMFD_CREATE) |
383 | |
|
384 | 0 | fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); |
385 | |
|
386 | 0 | if (nxt_slow_path(fd == -1)) { |
387 | 0 | nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno); |
388 | |
|
389 | 0 | return -1; |
390 | 0 | } |
391 | | |
392 | 0 | nxt_debug(task, "memfd_create(%s): %FD", name, fd); |
393 | |
|
394 | | #elif (NXT_HAVE_SHM_OPEN_ANON) |
395 | | |
396 | | fd = shm_open(SHM_ANON, O_RDWR, 0600); |
397 | | if (nxt_slow_path(fd == -1)) { |
398 | | nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno); |
399 | | |
400 | | return -1; |
401 | | } |
402 | | |
403 | | nxt_debug(task, "shm_open(SHM_ANON): %FD", fd); |
404 | | |
405 | | #elif (NXT_HAVE_SHM_OPEN) |
406 | | |
407 | | /* Just in case. */ |
408 | | shm_unlink((char *) name); |
409 | | |
410 | | fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, 0600); |
411 | | if (nxt_slow_path(fd == -1)) { |
412 | | nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno); |
413 | | |
414 | | return -1; |
415 | | } |
416 | | |
417 | | nxt_debug(task, "shm_open(%s): %FD", name, fd); |
418 | | |
419 | | if (nxt_slow_path(shm_unlink((char *) name) == -1)) { |
420 | | nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, |
421 | | nxt_errno); |
422 | | } |
423 | | |
424 | | #else |
425 | | |
426 | | #error No working shared memory implementation. |
427 | | |
428 | | #endif |
429 | |
|
430 | 0 | if (nxt_slow_path(ftruncate(fd, size) == -1)) { |
431 | 0 | nxt_alert(task, "ftruncate() failed %E", nxt_errno); |
432 | |
|
433 | 0 | nxt_fd_close(fd); |
434 | |
|
435 | 0 | return -1; |
436 | 0 | } |
437 | | |
438 | 0 | return fd; |
439 | 0 | } |
440 | | |
441 | | |
442 | | static nxt_port_mmap_handler_t * |
443 | | nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c, |
444 | | nxt_int_t n, nxt_bool_t tracking) |
445 | 0 | { |
446 | 0 | nxt_int_t i, res, nchunks; |
447 | 0 | nxt_free_map_t *free_map; |
448 | 0 | nxt_port_mmap_t *port_mmap; |
449 | 0 | nxt_port_mmap_t *end_port_mmap; |
450 | 0 | nxt_port_mmap_header_t *hdr; |
451 | 0 | nxt_port_mmap_handler_t *mmap_handler; |
452 | |
|
453 | 0 | nxt_thread_mutex_lock(&mmaps->mutex); |
454 | |
|
455 | 0 | if (nxt_slow_path(mmaps->elts == NULL)) { |
456 | 0 | goto end; |
457 | 0 | } |
458 | | |
459 | 0 | end_port_mmap = mmaps->elts + mmaps->size; |
460 | |
|
461 | 0 | for (port_mmap = mmaps->elts; |
462 | 0 | port_mmap < end_port_mmap; |
463 | 0 | port_mmap++) |
464 | 0 | { |
465 | 0 | mmap_handler = port_mmap->mmap_handler; |
466 | 0 | hdr = mmap_handler->hdr; |
467 | |
|
468 | 0 | if (hdr->sent_over != 0xFFFFu) { |
469 | 0 | continue; |
470 | 0 | } |
471 | | |
472 | 0 | *c = 0; |
473 | |
|
474 | 0 | free_map = tracking ? hdr->free_tracking_map : hdr->free_map; |
475 | |
|
476 | 0 | while (nxt_port_mmap_get_free_chunk(free_map, c)) { |
477 | 0 | nchunks = 1; |
478 | |
|
479 | 0 | while (nchunks < n) { |
480 | 0 | res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks); |
481 | |
|
482 | 0 | if (res == 0) { |
483 | 0 | for (i = 0; i < nchunks; i++) { |
484 | 0 | nxt_port_mmap_set_chunk_free(free_map, *c + i); |
485 | 0 | } |
486 | |
|
487 | 0 | *c += nchunks + 1; |
488 | 0 | nchunks = 0; |
489 | 0 | break; |
490 | 0 | } |
491 | | |
492 | 0 | nchunks++; |
493 | 0 | } |
494 | |
|
495 | 0 | if (nchunks == n) { |
496 | 0 | goto unlock_return; |
497 | 0 | } |
498 | 0 | } |
499 | | |
500 | 0 | hdr->oosm = 1; |
501 | 0 | } |
502 | | |
503 | | /* TODO introduce port_mmap limit and release wait. */ |
504 | | |
505 | 0 | end: |
506 | |
|
507 | 0 | *c = 0; |
508 | 0 | mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n); |
509 | |
|
510 | 0 | unlock_return: |
511 | |
|
512 | 0 | nxt_thread_mutex_unlock(&mmaps->mutex); |
513 | |
|
514 | 0 | return mmap_handler; |
515 | 0 | } |
516 | | |
517 | | |
518 | | static nxt_port_mmap_handler_t * |
519 | | nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) |
520 | 0 | { |
521 | 0 | nxt_process_t *process; |
522 | 0 | nxt_port_mmap_handler_t *mmap_handler; |
523 | |
|
524 | 0 | process = nxt_runtime_process_find(task->thread->runtime, spid); |
525 | 0 | if (nxt_slow_path(process == NULL)) { |
526 | 0 | return NULL; |
527 | 0 | } |
528 | | |
529 | 0 | nxt_thread_mutex_lock(&process->incoming.mutex); |
530 | |
|
531 | 0 | if (nxt_fast_path(process->incoming.size > id)) { |
532 | 0 | mmap_handler = process->incoming.elts[id].mmap_handler; |
533 | |
|
534 | 0 | } else { |
535 | 0 | mmap_handler = NULL; |
536 | |
|
537 | 0 | nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid); |
538 | 0 | } |
539 | |
|
540 | 0 | nxt_thread_mutex_unlock(&process->incoming.mutex); |
541 | |
|
542 | 0 | return mmap_handler; |
543 | 0 | } |
544 | | |
545 | | |
546 | | nxt_buf_t * |
547 | | nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size) |
548 | 0 | { |
549 | 0 | nxt_mp_t *mp; |
550 | 0 | nxt_buf_t *b; |
551 | 0 | nxt_int_t nchunks; |
552 | 0 | nxt_chunk_id_t c; |
553 | 0 | nxt_port_mmap_header_t *hdr; |
554 | 0 | nxt_port_mmap_handler_t *mmap_handler; |
555 | |
|
556 | 0 | nxt_debug(task, "request %z bytes shm buffer", size); |
557 | |
|
558 | 0 | nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; |
559 | |
|
560 | 0 | if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) { |
561 | 0 | nxt_alert(task, "requested buffer (%z) too big", size); |
562 | |
|
563 | 0 | return NULL; |
564 | 0 | } |
565 | | |
566 | 0 | b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0); |
567 | 0 | if (nxt_slow_path(b == NULL)) { |
568 | 0 | return NULL; |
569 | 0 | } |
570 | | |
571 | 0 | b->completion_handler = nxt_port_mmap_buf_completion; |
572 | 0 | nxt_buf_set_port_mmap(b); |
573 | |
|
574 | 0 | mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0); |
575 | 0 | if (nxt_slow_path(mmap_handler == NULL)) { |
576 | 0 | mp = task->thread->engine->mem_pool; |
577 | 0 | nxt_mp_free(mp, b); |
578 | 0 | nxt_mp_release(mp); |
579 | 0 | return NULL; |
580 | 0 | } |
581 | | |
582 | 0 | b->parent = mmap_handler; |
583 | |
|
584 | 0 | nxt_port_mmap_handler_use(mmap_handler, 1); |
585 | |
|
586 | 0 | hdr = mmap_handler->hdr; |
587 | |
|
588 | 0 | b->mem.start = nxt_port_mmap_chunk_start(hdr, c); |
589 | 0 | b->mem.pos = b->mem.start; |
590 | 0 | b->mem.free = b->mem.start; |
591 | 0 | b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; |
592 | |
|
593 | 0 | nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d", |
594 | 0 | b, b->mem.start, b->mem.end - b->mem.start, |
595 | 0 | hdr->src_pid, hdr->dst_pid, hdr->id, c); |
596 | |
|
597 | 0 | return b; |
598 | 0 | } |
599 | | |
600 | | |
601 | | nxt_int_t |
602 | | nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, |
603 | | size_t min_size) |
604 | 0 | { |
605 | 0 | size_t nchunks, free_size; |
606 | 0 | nxt_chunk_id_t c, start; |
607 | 0 | nxt_port_mmap_header_t *hdr; |
608 | 0 | nxt_port_mmap_handler_t *mmap_handler; |
609 | |
|
610 | 0 | nxt_debug(task, "request increase %z bytes shm buffer", size); |
611 | |
|
612 | 0 | if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) { |
613 | 0 | nxt_log(task, NXT_LOG_WARN, |
614 | 0 | "failed to increase, not a mmap buffer"); |
615 | 0 | return NXT_ERROR; |
616 | 0 | } |
617 | | |
618 | 0 | free_size = nxt_buf_mem_free_size(&b->mem); |
619 | |
|
620 | 0 | if (nxt_slow_path(size <= free_size)) { |
621 | 0 | return NXT_OK; |
622 | 0 | } |
623 | | |
624 | 0 | mmap_handler = b->parent; |
625 | 0 | hdr = mmap_handler->hdr; |
626 | |
|
627 | 0 | start = nxt_port_mmap_chunk_id(hdr, b->mem.end); |
628 | |
|
629 | 0 | size -= free_size; |
630 | |
|
631 | 0 | nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; |
632 | |
|
633 | 0 | c = start; |
634 | | |
635 | | /* Try to acquire as much chunks as required. */ |
636 | 0 | while (nchunks > 0) { |
637 | |
|
638 | 0 | if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { |
639 | 0 | break; |
640 | 0 | } |
641 | | |
642 | 0 | c++; |
643 | 0 | nchunks--; |
644 | 0 | } |
645 | |
|
646 | 0 | if (nchunks != 0 |
647 | 0 | && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) |
648 | 0 | { |
649 | 0 | c--; |
650 | 0 | while (c >= start) { |
651 | 0 | nxt_port_mmap_set_chunk_free(hdr->free_map, c); |
652 | 0 | c--; |
653 | 0 | } |
654 | |
|
655 | 0 | nxt_debug(task, "failed to increase, %uz chunks busy", nchunks); |
656 | |
|
657 | 0 | return NXT_ERROR; |
658 | |
|
659 | 0 | } else { |
660 | 0 | b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); |
661 | |
|
662 | 0 | return NXT_OK; |
663 | 0 | } |
664 | 0 | } |
665 | | |
666 | | |
667 | | static nxt_buf_t * |
668 | | nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, |
669 | | nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) |
670 | 0 | { |
671 | 0 | size_t nchunks; |
672 | 0 | nxt_buf_t *b; |
673 | 0 | nxt_port_mmap_header_t *hdr; |
674 | 0 | nxt_port_mmap_handler_t *mmap_handler; |
675 | |
|
676 | 0 | mmap_handler = nxt_port_get_port_incoming_mmap(task, spid, |
677 | 0 | mmap_msg->mmap_id); |
678 | 0 | if (nxt_slow_path(mmap_handler == NULL)) { |
679 | 0 | return NULL; |
680 | 0 | } |
681 | | |
682 | 0 | b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); |
683 | 0 | if (nxt_slow_path(b == NULL)) { |
684 | 0 | return NULL; |
685 | 0 | } |
686 | | |
687 | 0 | b->completion_handler = nxt_port_mmap_buf_completion; |
688 | |
|
689 | 0 | nxt_buf_set_port_mmap(b); |
690 | |
|
691 | 0 | nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; |
692 | 0 | if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { |
693 | 0 | nchunks++; |
694 | 0 | } |
695 | |
|
696 | 0 | hdr = mmap_handler->hdr; |
697 | |
|
698 | 0 | b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); |
699 | 0 | b->mem.pos = b->mem.start; |
700 | 0 | b->mem.free = b->mem.start + mmap_msg->size; |
701 | 0 | b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; |
702 | |
|
703 | 0 | b->parent = mmap_handler; |
704 | 0 | nxt_port_mmap_handler_use(mmap_handler, 1); |
705 | |
|
706 | 0 | nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d", |
707 | 0 | b, b->mem.start, b->mem.end - b->mem.start, |
708 | 0 | hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id); |
709 | |
|
710 | 0 | return b; |
711 | 0 | } |
712 | | |
713 | | |
714 | | void |
715 | | nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, |
716 | | nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf) |
717 | 0 | { |
718 | 0 | size_t bsize; |
719 | 0 | nxt_buf_t *bmem; |
720 | 0 | nxt_uint_t i; |
721 | 0 | nxt_port_mmap_msg_t *mmap_msg; |
722 | 0 | nxt_port_mmap_header_t *hdr; |
723 | 0 | nxt_port_mmap_handler_t *mmap_handler; |
724 | |
|
725 | 0 | nxt_debug(task, "prepare %z bytes message for transfer to process %PI " |
726 | 0 | "via shared memory", sb->size, port->pid); |
727 | |
|
728 | 0 | bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); |
729 | 0 | mmap_msg = mmsg_buf; |
730 | |
|
731 | 0 | bmem = msg->buf; |
732 | |
|
733 | 0 | for (i = 0; i < sb->niov; i++, mmap_msg++) { |
734 | | |
735 | | /* Lookup buffer which starts current iov_base. */ |
736 | 0 | while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) { |
737 | 0 | bmem = bmem->next; |
738 | 0 | } |
739 | |
|
740 | 0 | if (nxt_slow_path(bmem == NULL)) { |
741 | 0 | nxt_log_error(NXT_LOG_ERR, task->log, |
742 | 0 | "failed to find buf for iobuf[%d]", i); |
743 | 0 | return; |
744 | | /* TODO clear b and exit */ |
745 | 0 | } |
746 | | |
747 | 0 | mmap_handler = bmem->parent; |
748 | 0 | hdr = mmap_handler->hdr; |
749 | |
|
750 | 0 | mmap_msg->mmap_id = hdr->id; |
751 | 0 | mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); |
752 | 0 | mmap_msg->size = sb->iobuf[i].iov_len; |
753 | |
|
754 | 0 | nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI", |
755 | 0 | mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, |
756 | 0 | port->pid); |
757 | 0 | } |
758 | | |
759 | 0 | sb->iobuf[0].iov_base = mmsg_buf; |
760 | 0 | sb->iobuf[0].iov_len = bsize; |
761 | 0 | sb->niov = 1; |
762 | 0 | sb->size = bsize; |
763 | |
|
764 | 0 | msg->port_msg.mmap = 1; |
765 | 0 | } |
766 | | |
767 | | |
768 | | void |
769 | | nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) |
770 | 0 | { |
771 | 0 | nxt_buf_t *b, **pb; |
772 | 0 | nxt_port_mmap_msg_t *end, *mmap_msg; |
773 | |
|
774 | 0 | pb = &msg->buf; |
775 | 0 | msg->size = 0; |
776 | |
|
777 | 0 | for (b = msg->buf; b != NULL; b = b->next) { |
778 | |
|
779 | 0 | mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; |
780 | 0 | end = (nxt_port_mmap_msg_t *) b->mem.free; |
781 | |
|
782 | 0 | while (mmap_msg < end) { |
783 | 0 | nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", |
784 | 0 | mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, |
785 | 0 | msg->port_msg.pid); |
786 | |
|
787 | 0 | *pb = nxt_port_mmap_get_incoming_buf(task, msg->port, |
788 | 0 | msg->port_msg.pid, mmap_msg); |
789 | 0 | if (nxt_slow_path(*pb == NULL)) { |
790 | 0 | nxt_log_error(NXT_LOG_ERR, task->log, |
791 | 0 | "failed to get mmap buffer"); |
792 | |
|
793 | 0 | break; |
794 | 0 | } |
795 | | |
796 | 0 | msg->size += mmap_msg->size; |
797 | 0 | pb = &(*pb)->next; |
798 | 0 | mmap_msg++; |
799 | | |
800 | | /* Mark original buf as complete. */ |
801 | 0 | b->mem.pos += sizeof(nxt_port_mmap_msg_t); |
802 | 0 | } |
803 | 0 | } |
804 | 0 | } |
805 | | |
806 | | |
807 | | nxt_port_method_t |
808 | | nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) |
809 | 0 | { |
810 | 0 | nxt_port_method_t m; |
811 | |
|
812 | 0 | m = NXT_PORT_METHOD_ANY; |
813 | |
|
814 | 0 | for (/* void */; b != NULL; b = b->next) { |
815 | 0 | if (nxt_buf_used_size(b) == 0) { |
816 | | /* empty buffers does not affect method */ |
817 | 0 | continue; |
818 | 0 | } |
819 | | |
820 | 0 | if (nxt_buf_is_port_mmap(b)) { |
821 | 0 | if (m == NXT_PORT_METHOD_PLAIN) { |
822 | 0 | nxt_log_error(NXT_LOG_ERR, task->log, |
823 | 0 | "mixing plain and mmap buffers, " |
824 | 0 | "using plain mode"); |
825 | |
|
826 | 0 | break; |
827 | 0 | } |
828 | | |
829 | 0 | if (m == NXT_PORT_METHOD_ANY) { |
830 | 0 | nxt_debug(task, "using mmap mode"); |
831 | |
|
832 | 0 | m = NXT_PORT_METHOD_MMAP; |
833 | 0 | } |
834 | 0 | } else { |
835 | 0 | if (m == NXT_PORT_METHOD_MMAP) { |
836 | 0 | nxt_log_error(NXT_LOG_ERR, task->log, |
837 | 0 | "mixing mmap and plain buffers, " |
838 | 0 | "switching to plain mode"); |
839 | |
|
840 | 0 | m = NXT_PORT_METHOD_PLAIN; |
841 | |
|
842 | 0 | break; |
843 | 0 | } |
844 | | |
845 | 0 | if (m == NXT_PORT_METHOD_ANY) { |
846 | 0 | nxt_debug(task, "using plain mode"); |
847 | |
|
848 | 0 | m = NXT_PORT_METHOD_PLAIN; |
849 | 0 | } |
850 | 0 | } |
851 | 0 | } |
852 | |
|
853 | 0 | return m; |
854 | 0 | } |
855 | | |
856 | | |
857 | | void |
858 | | nxt_process_broadcast_shm_ack(nxt_task_t *task, nxt_process_t *process) |
859 | 0 | { |
860 | 0 | nxt_port_t *port; |
861 | |
|
862 | 0 | if (nxt_slow_path(process == NULL || nxt_queue_is_empty(&process->ports))) |
863 | 0 | { |
864 | 0 | return; |
865 | 0 | } |
866 | | |
867 | 0 | port = nxt_process_port_first(process); |
868 | |
|
869 | 0 | if (port->type == NXT_PROCESS_APP) { |
870 | 0 | nxt_port_post(task, port, nxt_port_broadcast_shm_ack, process); |
871 | 0 | } |
872 | 0 | } |
873 | | |
874 | | |
875 | | static void |
876 | | nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port, void *data) |
877 | 0 | { |
878 | 0 | nxt_process_t *process; |
879 | |
|
880 | 0 | process = data; |
881 | |
|
882 | 0 | nxt_queue_each(port, &process->ports, nxt_port_t, link) { |
883 | 0 | (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK, |
884 | 0 | -1, 0, 0, NULL); |
885 | 0 | } nxt_queue_loop; |
886 | 0 | } |