Line | Count | Source (jump to first uncovered line) |
1 | | |
2 | | /* |
3 | | * Copyright (C) Igor Sysoev |
4 | | * Copyright (C) NGINX, Inc. |
5 | | */ |
6 | | |
7 | | #include <nxt_main.h> |
8 | | #include <nxt_runtime.h> |
9 | | #include <nxt_port.h> |
10 | | #include <nxt_router.h> |
11 | | #include <nxt_app_queue.h> |
12 | | #include <nxt_port_queue.h> |
13 | | |
14 | | |
15 | | static void nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg, |
16 | | nxt_pid_t pid); |
17 | | static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); |
18 | | |
19 | | static nxt_atomic_uint_t nxt_port_last_id = 1; |
20 | | |
21 | | |
22 | | static void |
23 | | nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) |
24 | 0 | { |
25 | 0 | nxt_mp_t *mp; |
26 | 0 | nxt_port_t *port; |
27 | |
|
28 | 0 | port = obj; |
29 | 0 | mp = data; |
30 | |
|
31 | 0 | nxt_assert(port->pair[0] == -1); |
32 | 0 | nxt_assert(port->pair[1] == -1); |
33 | |
|
34 | 0 | nxt_assert(port->use_count == 0); |
35 | 0 | nxt_assert(port->app_link.next == NULL); |
36 | 0 | nxt_assert(port->idle_link.next == NULL); |
37 | |
|
38 | 0 | nxt_assert(nxt_queue_is_empty(&port->messages)); |
39 | 0 | nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams)); |
40 | 0 | nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers)); |
41 | |
|
42 | 0 | nxt_thread_mutex_destroy(&port->write_mutex); |
43 | |
|
44 | 0 | nxt_mp_free(mp, port); |
45 | 0 | } |
46 | | |
47 | | |
48 | | nxt_port_t * |
49 | | nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, |
50 | | nxt_process_type_t type) |
51 | 0 | { |
52 | 0 | nxt_mp_t *mp; |
53 | 0 | nxt_port_t *port; |
54 | |
|
55 | 0 | mp = nxt_mp_create(1024, 128, 256, 32); |
56 | |
|
57 | 0 | if (nxt_slow_path(mp == NULL)) { |
58 | 0 | return NULL; |
59 | 0 | } |
60 | | |
61 | 0 | port = nxt_mp_zalloc(mp, sizeof(nxt_port_t)); |
62 | |
|
63 | 0 | if (nxt_fast_path(port != NULL)) { |
64 | 0 | port->id = id; |
65 | 0 | port->pid = pid; |
66 | 0 | port->type = type; |
67 | 0 | port->mem_pool = mp; |
68 | 0 | port->use_count = 1; |
69 | |
|
70 | 0 | nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp); |
71 | |
|
72 | 0 | nxt_queue_init(&port->messages); |
73 | 0 | nxt_thread_mutex_create(&port->write_mutex); |
74 | |
|
75 | 0 | port->queue_fd = -1; |
76 | |
|
77 | 0 | } else { |
78 | 0 | nxt_mp_destroy(mp); |
79 | 0 | } |
80 | |
|
81 | 0 | nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type); |
82 | |
|
83 | 0 | return port; |
84 | 0 | } |
85 | | |
86 | | |
87 | | void |
88 | | nxt_port_close(nxt_task_t *task, nxt_port_t *port) |
89 | 0 | { |
90 | 0 | size_t size; |
91 | |
|
92 | 0 | nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid, |
93 | 0 | port->id, port->type); |
94 | |
|
95 | 0 | if (port->pair[0] != -1) { |
96 | 0 | nxt_port_rpc_close(task, port); |
97 | |
|
98 | 0 | nxt_fd_close(port->pair[0]); |
99 | 0 | port->pair[0] = -1; |
100 | 0 | } |
101 | |
|
102 | 0 | if (port->pair[1] != -1) { |
103 | 0 | nxt_fd_close(port->pair[1]); |
104 | 0 | port->pair[1] = -1; |
105 | |
|
106 | 0 | if (port->app != NULL) { |
107 | 0 | nxt_router_app_port_close(task, port); |
108 | 0 | } |
109 | 0 | } |
110 | |
|
111 | 0 | if (port->queue_fd != -1) { |
112 | 0 | nxt_fd_close(port->queue_fd); |
113 | 0 | port->queue_fd = -1; |
114 | 0 | } |
115 | |
|
116 | 0 | if (port->queue != NULL) { |
117 | 0 | size = (port->id == (nxt_port_id_t) -1) ? sizeof(nxt_app_queue_t) |
118 | 0 | : sizeof(nxt_port_queue_t); |
119 | 0 | nxt_mem_munmap(port->queue, size); |
120 | |
|
121 | 0 | port->queue = NULL; |
122 | 0 | } |
123 | 0 | } |
124 | | |
125 | | |
126 | | static void |
127 | | nxt_port_release(nxt_task_t *task, nxt_port_t *port) |
128 | 0 | { |
129 | 0 | nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid, |
130 | 0 | port->id, port->type); |
131 | |
|
132 | 0 | port->app = NULL; |
133 | |
|
134 | 0 | if (port->link.next != NULL) { |
135 | 0 | nxt_assert(port->process != NULL); |
136 | |
|
137 | 0 | nxt_process_port_remove(port); |
138 | |
|
139 | 0 | nxt_process_use(task, port->process, -1); |
140 | 0 | } |
141 | |
|
142 | 0 | nxt_mp_release(port->mem_pool); |
143 | 0 | } |
144 | | |
145 | | |
146 | | nxt_port_id_t |
147 | | nxt_port_get_next_id(void) |
148 | 0 | { |
149 | 0 | return nxt_atomic_fetch_add(&nxt_port_last_id, 1); |
150 | 0 | } |
151 | | |
152 | | |
153 | | void |
154 | | nxt_port_reset_next_id(void) |
155 | 0 | { |
156 | 0 | nxt_port_last_id = 1; |
157 | 0 | } |
158 | | |
159 | | |
160 | | void |
161 | | nxt_port_enable(nxt_task_t *task, nxt_port_t *port, |
162 | | const nxt_port_handlers_t *handlers) |
163 | 0 | { |
164 | 0 | port->pid = nxt_pid; |
165 | 0 | port->handler = nxt_port_handler; |
166 | 0 | port->data = (nxt_port_handler_t *) (handlers); |
167 | |
|
168 | 0 | nxt_port_read_enable(task, port); |
169 | 0 | } |
170 | | |
171 | | |
172 | | static void |
173 | | nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) |
174 | 0 | { |
175 | 0 | nxt_port_handler_t *handlers; |
176 | |
|
177 | 0 | if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) { |
178 | |
|
179 | 0 | nxt_debug(task, "port %d: message type:%uD fds:%d,%d", |
180 | 0 | msg->port->socket.fd, msg->port_msg.type, |
181 | 0 | msg->fd[0], msg->fd[1]); |
182 | |
|
183 | 0 | handlers = msg->port->data; |
184 | 0 | handlers[msg->port_msg.type](task, msg); |
185 | |
|
186 | 0 | return; |
187 | 0 | } |
188 | | |
189 | 0 | nxt_alert(task, "port %d: unknown message type:%uD", |
190 | 0 | msg->port->socket.fd, msg->port_msg.type); |
191 | 0 | } |
192 | | |
193 | | |
194 | | void |
195 | | nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) |
196 | 0 | { |
197 | 0 | nxt_runtime_quit(task, 0); |
198 | 0 | } |
199 | | |
200 | | |
201 | | /* TODO join with process_ready and move to nxt_main_process.c */ |
202 | | nxt_inline void |
203 | | nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, |
204 | | nxt_port_t *new_port, uint32_t stream) |
205 | 0 | { |
206 | 0 | nxt_port_t *port; |
207 | 0 | nxt_process_t *process; |
208 | |
|
209 | 0 | nxt_debug(task, "new port %d for process %PI", |
210 | 0 | new_port->pair[1], new_port->pid); |
211 | |
|
212 | 0 | nxt_runtime_process_each(rt, process) { |
213 | |
|
214 | 0 | if (process->pid == new_port->pid || process->pid == nxt_pid) { |
215 | 0 | continue; |
216 | 0 | } |
217 | | |
218 | 0 | port = nxt_process_port_first(process); |
219 | |
|
220 | 0 | if (nxt_proc_send_matrix[port->type][new_port->type]) { |
221 | 0 | (void) nxt_port_send_port(task, port, new_port, stream); |
222 | 0 | } |
223 | |
|
224 | 0 | } nxt_runtime_process_loop; |
225 | 0 | } |
226 | | |
227 | | |
228 | | nxt_int_t |
229 | | nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, |
230 | | uint32_t stream) |
231 | 0 | { |
232 | 0 | nxt_buf_t *b; |
233 | 0 | nxt_port_msg_new_port_t *msg; |
234 | |
|
235 | 0 | b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, |
236 | 0 | sizeof(nxt_port_data_t)); |
237 | 0 | if (nxt_slow_path(b == NULL)) { |
238 | 0 | return NXT_ERROR; |
239 | 0 | } |
240 | | |
241 | 0 | nxt_debug(task, "send port %FD to process %PI", |
242 | 0 | new_port->pair[1], port->pid); |
243 | |
|
244 | 0 | b->mem.free += sizeof(nxt_port_msg_new_port_t); |
245 | 0 | msg = (nxt_port_msg_new_port_t *) b->mem.pos; |
246 | |
|
247 | 0 | msg->id = new_port->id; |
248 | 0 | msg->pid = new_port->pid; |
249 | 0 | msg->max_size = port->max_size; |
250 | 0 | msg->max_share = port->max_share; |
251 | 0 | msg->type = new_port->type; |
252 | |
|
253 | 0 | return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT, |
254 | 0 | new_port->pair[1], new_port->queue_fd, |
255 | 0 | stream, 0, b); |
256 | 0 | } |
257 | | |
258 | | |
259 | | void |
260 | | nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) |
261 | 0 | { |
262 | 0 | nxt_port_t *port; |
263 | 0 | nxt_runtime_t *rt; |
264 | 0 | nxt_port_msg_new_port_t *new_port_msg; |
265 | |
|
266 | 0 | rt = task->thread->runtime; |
267 | |
|
268 | 0 | new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos; |
269 | | |
270 | | /* TODO check b size and make plain */ |
271 | |
|
272 | 0 | nxt_debug(task, "new port %d received for process %PI:%d", |
273 | 0 | msg->fd[0], new_port_msg->pid, new_port_msg->id); |
274 | |
|
275 | 0 | port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id); |
276 | 0 | if (port != NULL) { |
277 | 0 | nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid, |
278 | 0 | new_port_msg->id); |
279 | |
|
280 | 0 | msg->u.new_port = port; |
281 | |
|
282 | 0 | nxt_fd_close(msg->fd[0]); |
283 | 0 | msg->fd[0] = -1; |
284 | 0 | return; |
285 | 0 | } |
286 | | |
287 | 0 | port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid, |
288 | 0 | new_port_msg->id, |
289 | 0 | new_port_msg->type); |
290 | 0 | if (nxt_slow_path(port == NULL)) { |
291 | 0 | return; |
292 | 0 | } |
293 | | |
294 | 0 | nxt_fd_nonblocking(task, msg->fd[0]); |
295 | |
|
296 | 0 | port->pair[0] = -1; |
297 | 0 | port->pair[1] = msg->fd[0]; |
298 | 0 | port->max_size = new_port_msg->max_size; |
299 | 0 | port->max_share = new_port_msg->max_share; |
300 | |
|
301 | 0 | port->socket.task = task; |
302 | |
|
303 | 0 | nxt_port_write_enable(task, port); |
304 | |
|
305 | 0 | msg->u.new_port = port; |
306 | 0 | } |
307 | | |
308 | | /* TODO move to nxt_main_process.c */ |
309 | | void |
310 | | nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) |
311 | 0 | { |
312 | 0 | nxt_port_t *port; |
313 | 0 | nxt_process_t *process; |
314 | 0 | nxt_runtime_t *rt; |
315 | |
|
316 | 0 | rt = task->thread->runtime; |
317 | |
|
318 | 0 | process = nxt_runtime_process_find(rt, msg->port_msg.pid); |
319 | 0 | if (nxt_slow_path(process == NULL)) { |
320 | 0 | return; |
321 | 0 | } |
322 | | |
323 | 0 | nxt_assert(process->state != NXT_PROCESS_STATE_READY); |
324 | |
|
325 | 0 | process->state = NXT_PROCESS_STATE_READY; |
326 | |
|
327 | 0 | nxt_assert(!nxt_queue_is_empty(&process->ports)); |
328 | |
|
329 | 0 | port = nxt_process_port_first(process); |
330 | |
|
331 | 0 | nxt_debug(task, "process %PI ready", msg->port_msg.pid); |
332 | |
|
333 | 0 | if (msg->fd[0] != -1) { |
334 | 0 | port->queue_fd = msg->fd[0]; |
335 | 0 | port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), |
336 | 0 | PROT_READ | PROT_WRITE, MAP_SHARED, |
337 | 0 | msg->fd[0], 0); |
338 | 0 | } |
339 | |
|
340 | 0 | nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); |
341 | 0 | } |
342 | | |
343 | | |
344 | | void |
345 | | nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) |
346 | 0 | { |
347 | 0 | nxt_runtime_t *rt; |
348 | 0 | nxt_process_t *process; |
349 | |
|
350 | 0 | rt = task->thread->runtime; |
351 | |
|
352 | 0 | if (nxt_slow_path(msg->fd[0] == -1)) { |
353 | 0 | nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message"); |
354 | |
|
355 | 0 | return; |
356 | 0 | } |
357 | | |
358 | 0 | process = nxt_runtime_process_find(rt, msg->port_msg.pid); |
359 | 0 | if (nxt_slow_path(process == NULL)) { |
360 | 0 | nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI", |
361 | 0 | msg->port_msg.pid); |
362 | |
|
363 | 0 | goto fail_close; |
364 | 0 | } |
365 | | |
366 | 0 | nxt_port_incoming_port_mmap(task, process, msg->fd[0]); |
367 | |
|
368 | 0 | fail_close: |
369 | |
|
370 | 0 | nxt_fd_close(msg->fd[0]); |
371 | 0 | } |
372 | | |
373 | | |
374 | | void |
375 | | nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, |
376 | | nxt_fd_t fd) |
377 | 0 | { |
378 | 0 | nxt_buf_t *b; |
379 | 0 | nxt_port_t *port; |
380 | 0 | nxt_process_t *process; |
381 | |
|
382 | 0 | nxt_debug(task, "change log file #%ui fd:%FD", slot, fd); |
383 | |
|
384 | 0 | nxt_runtime_process_each(rt, process) { |
385 | |
|
386 | 0 | if (nxt_pid == process->pid) { |
387 | 0 | continue; |
388 | 0 | } |
389 | | |
390 | 0 | port = nxt_process_port_first(process); |
391 | |
|
392 | 0 | b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, |
393 | 0 | sizeof(nxt_uint_t), 0); |
394 | 0 | if (nxt_slow_path(b == NULL)) { |
395 | 0 | continue; |
396 | 0 | } |
397 | | |
398 | 0 | b->mem.free = nxt_cpymem(b->mem.free, &slot, sizeof(nxt_uint_t)); |
399 | |
|
400 | 0 | (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, |
401 | 0 | fd, 0, 0, b); |
402 | |
|
403 | 0 | } nxt_runtime_process_loop; |
404 | 0 | } |
405 | | |
406 | | |
407 | | void |
408 | | nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) |
409 | 0 | { |
410 | 0 | nxt_buf_t *b; |
411 | 0 | nxt_uint_t slot; |
412 | 0 | nxt_file_t *log_file; |
413 | 0 | nxt_runtime_t *rt; |
414 | |
|
415 | 0 | rt = task->thread->runtime; |
416 | |
|
417 | 0 | b = msg->buf; |
418 | 0 | slot = *(nxt_uint_t *) b->mem.pos; |
419 | |
|
420 | 0 | log_file = nxt_list_elt(rt->log_files, slot); |
421 | |
|
422 | 0 | nxt_debug(task, "change log file %FD:%FD", msg->fd[0], log_file->fd); |
423 | | |
424 | | /* |
425 | | * The old log file descriptor must be closed at the moment when no |
426 | | * other threads use it. dup2() allows to use the old file descriptor |
427 | | * for new log file. This change is performed atomically in the kernel. |
428 | | */ |
429 | 0 | if (nxt_file_redirect(log_file, msg->fd[0]) == NXT_OK) { |
430 | 0 | if (slot == 0) { |
431 | 0 | (void) nxt_file_stderr(log_file); |
432 | 0 | } |
433 | 0 | } |
434 | 0 | } |
435 | | |
436 | | |
437 | | void |
438 | | nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) |
439 | 0 | { |
440 | 0 | size_t dump_size; |
441 | 0 | nxt_buf_t *b; |
442 | |
|
443 | 0 | b = msg->buf; |
444 | 0 | dump_size = b->mem.free - b->mem.pos; |
445 | |
|
446 | 0 | if (dump_size > 300) { |
447 | 0 | dump_size = 300; |
448 | 0 | } |
449 | |
|
450 | 0 | nxt_debug(task, "data: %*s", dump_size, b->mem.pos); |
451 | 0 | } |
452 | | |
453 | | |
454 | | void |
455 | | nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process) |
456 | 0 | { |
457 | 0 | nxt_pid_t pid; |
458 | 0 | nxt_buf_t *buf; |
459 | 0 | nxt_port_t *port; |
460 | 0 | nxt_runtime_t *rt; |
461 | 0 | nxt_process_t *p; |
462 | 0 | nxt_process_type_t ptype; |
463 | |
|
464 | 0 | pid = process->pid; |
465 | |
|
466 | 0 | ptype = nxt_process_type(process); |
467 | |
|
468 | 0 | rt = task->thread->runtime; |
469 | |
|
470 | 0 | nxt_runtime_process_each(rt, p) { |
471 | |
|
472 | 0 | if (p->pid == nxt_pid |
473 | 0 | || p->pid == pid |
474 | 0 | || nxt_queue_is_empty(&p->ports)) |
475 | 0 | { |
476 | 0 | continue; |
477 | 0 | } |
478 | | |
479 | 0 | port = nxt_process_port_first(p); |
480 | |
|
481 | 0 | if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) { |
482 | 0 | continue; |
483 | 0 | } |
484 | | |
485 | 0 | buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, |
486 | 0 | sizeof(pid)); |
487 | |
|
488 | 0 | if (nxt_slow_path(buf == NULL)) { |
489 | 0 | continue; |
490 | 0 | } |
491 | | |
492 | 0 | buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); |
493 | |
|
494 | 0 | nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, |
495 | 0 | process->stream, 0, buf); |
496 | |
|
497 | 0 | } nxt_runtime_process_loop; |
498 | 0 | } |
499 | | |
500 | | |
501 | | void |
502 | | nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) |
503 | 0 | { |
504 | 0 | nxt_pid_t pid; |
505 | 0 | nxt_buf_t *buf; |
506 | |
|
507 | 0 | buf = msg->buf; |
508 | |
|
509 | 0 | nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); |
510 | |
|
511 | 0 | nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t)); |
512 | |
|
513 | 0 | nxt_port_remove_pid(task, msg, pid); |
514 | 0 | } |
515 | | |
516 | | |
517 | | static void |
518 | | nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg, |
519 | | nxt_pid_t pid) |
520 | 0 | { |
521 | 0 | nxt_runtime_t *rt; |
522 | 0 | nxt_process_t *process; |
523 | |
|
524 | 0 | msg->u.removed_pid = pid; |
525 | |
|
526 | 0 | nxt_debug(task, "port remove pid %PI handler", pid); |
527 | |
|
528 | 0 | rt = task->thread->runtime; |
529 | |
|
530 | 0 | nxt_port_rpc_remove_peer(task, msg->port, pid); |
531 | |
|
532 | 0 | process = nxt_runtime_process_find(rt, pid); |
533 | |
|
534 | 0 | if (process) { |
535 | 0 | nxt_process_close_ports(task, process); |
536 | 0 | } |
537 | 0 | } |
538 | | |
539 | | |
540 | | void |
541 | | nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) |
542 | 0 | { |
543 | 0 | nxt_debug(task, "port empty handler"); |
544 | 0 | } |
545 | | |
546 | | |
547 | | typedef struct { |
548 | | nxt_work_t work; |
549 | | nxt_port_t *port; |
550 | | nxt_port_post_handler_t handler; |
551 | | } nxt_port_work_t; |
552 | | |
553 | | |
554 | | static void |
555 | | nxt_port_post_handler(nxt_task_t *task, void *obj, void *data) |
556 | 0 | { |
557 | 0 | nxt_port_t *port; |
558 | 0 | nxt_port_work_t *pw; |
559 | 0 | nxt_port_post_handler_t handler; |
560 | |
|
561 | 0 | pw = obj; |
562 | 0 | port = pw->port; |
563 | 0 | handler = pw->handler; |
564 | |
|
565 | 0 | nxt_free(pw); |
566 | |
|
567 | 0 | handler(task, port, data); |
568 | |
|
569 | 0 | nxt_port_use(task, port, -1); |
570 | 0 | } |
571 | | |
572 | | |
573 | | nxt_int_t |
574 | | nxt_port_post(nxt_task_t *task, nxt_port_t *port, |
575 | | nxt_port_post_handler_t handler, void *data) |
576 | 0 | { |
577 | 0 | nxt_port_work_t *pw; |
578 | |
|
579 | 0 | if (task->thread->engine == port->engine) { |
580 | 0 | handler(task, port, data); |
581 | |
|
582 | 0 | return NXT_OK; |
583 | 0 | } |
584 | | |
585 | 0 | pw = nxt_zalloc(sizeof(nxt_port_work_t)); |
586 | |
|
587 | 0 | if (nxt_slow_path(pw == NULL)) { |
588 | 0 | return NXT_ERROR; |
589 | 0 | } |
590 | | |
591 | 0 | nxt_atomic_fetch_add(&port->use_count, 1); |
592 | |
|
593 | 0 | pw->work.handler = nxt_port_post_handler; |
594 | 0 | pw->work.task = &port->engine->task; |
595 | 0 | pw->work.obj = pw; |
596 | 0 | pw->work.data = data; |
597 | |
|
598 | 0 | pw->port = port; |
599 | 0 | pw->handler = handler; |
600 | |
|
601 | 0 | nxt_event_engine_post(port->engine, &pw->work); |
602 | |
|
603 | 0 | return NXT_OK; |
604 | 0 | } |
605 | | |
606 | | |
607 | | static void |
608 | | nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data) |
609 | 0 | { |
610 | | /* no op */ |
611 | 0 | } |
612 | | |
613 | | |
614 | | void |
615 | | nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i) |
616 | 0 | { |
617 | 0 | int c; |
618 | |
|
619 | 0 | c = nxt_atomic_fetch_add(&port->use_count, i); |
620 | |
|
621 | 0 | if (i < 0 && c == -i) { |
622 | |
|
623 | 0 | if (port->engine == NULL || task->thread->engine == port->engine) { |
624 | 0 | nxt_port_release(task, port); |
625 | |
|
626 | 0 | return; |
627 | 0 | } |
628 | | |
629 | 0 | nxt_port_post(task, port, nxt_port_release_handler, NULL); |
630 | 0 | } |
631 | 0 | } |