Coverage Report

Created: 2025-09-05 06:02

/src/unit/src/nxt_router.c
Line
Count
Source (jump to first uncovered line)
1
2
/*
3
 * Copyright (C) Igor Sysoev
4
 * Copyright (C) Valentin V. Bartenev
5
 * Copyright (C) NGINX, Inc.
6
 */
7
8
#include <nxt_router.h>
9
#include <nxt_conf.h>
10
#include <nxt_status.h>
11
#if (NXT_TLS)
12
#include <nxt_cert.h>
13
#endif
14
#if (NXT_HAVE_NJS)
15
#include <nxt_script.h>
16
#endif
17
#include <nxt_http.h>
18
#include <nxt_port_memory_int.h>
19
#include <nxt_unit_request.h>
20
#include <nxt_unit_response.h>
21
#include <nxt_router_request.h>
22
#include <nxt_app_queue.h>
23
#include <nxt_port_queue.h>
24
#include <nxt_http_compression.h>
25
26
0
#define NXT_SHARED_PORT_ID  0xFFFFu
27
28
#if (NXT_HAVE_OTEL)
29
#define NXT_OTEL_BATCH_DEFAULT     128
30
#define NXT_OTEL_SAMPLING_DEFAULT  1
31
#endif
32
33
typedef struct {
34
    nxt_str_t         type;
35
    uint32_t          processes;
36
    uint32_t          max_processes;
37
    uint32_t          spare_processes;
38
    nxt_msec_t        timeout;
39
    nxt_msec_t        idle_timeout;
40
    nxt_conf_value_t  *limits_value;
41
    nxt_conf_value_t  *processes_value;
42
    nxt_conf_value_t  *targets_value;
43
} nxt_router_app_conf_t;
44
45
46
typedef struct {
47
    nxt_str_t         pass;
48
    nxt_str_t         application;
49
    int               backlog;
50
} nxt_router_listener_conf_t;
51
52
53
#if (NXT_TLS)
54
55
typedef struct {
56
    nxt_str_t               name;
57
    nxt_socket_conf_t       *socket_conf;
58
    nxt_router_temp_conf_t  *temp_conf;
59
    nxt_tls_init_t          *tls_init;
60
    nxt_bool_t              last;
61
62
    nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
63
} nxt_router_tlssock_t;
64
65
#endif
66
67
68
#if (NXT_HAVE_NJS)
69
70
typedef struct {
71
    nxt_str_t               name;
72
    nxt_router_temp_conf_t  *temp_conf;
73
    nxt_queue_link_t        link;
74
} nxt_router_js_module_t;
75
76
#endif
77
78
79
typedef struct {
80
    nxt_str_t               *name;
81
    nxt_socket_conf_t       *socket_conf;
82
    nxt_router_temp_conf_t  *temp_conf;
83
    nxt_bool_t              last;
84
} nxt_socket_rpc_t;
85
86
87
typedef struct {
88
    nxt_app_t               *app;
89
    nxt_router_temp_conf_t  *temp_conf;
90
    uint8_t                 proto;  /* 1 bit */
91
} nxt_app_rpc_t;
92
93
94
typedef struct {
95
    nxt_app_joint_t         *app_joint;
96
    uint32_t                generation;
97
    uint8_t                 proto;  /* 1 bit */
98
} nxt_app_joint_rpc_t;
99
100
101
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
102
    nxt_mp_t *mp);
103
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
104
static void nxt_router_greet_controller(nxt_task_t *task,
105
    nxt_port_t *controller_port);
106
107
static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
108
109
static void nxt_router_new_port_handler(nxt_task_t *task,
110
    nxt_port_recv_msg_t *msg);
111
static void nxt_router_conf_data_handler(nxt_task_t *task,
112
    nxt_port_recv_msg_t *msg);
113
static void nxt_router_app_restart_handler(nxt_task_t *task,
114
    nxt_port_recv_msg_t *msg);
115
static void nxt_router_status_handler(nxt_task_t *task,
116
    nxt_port_recv_msg_t *msg);
117
static void nxt_router_remove_pid_handler(nxt_task_t *task,
118
    nxt_port_recv_msg_t *msg);
119
120
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
121
static void nxt_router_conf_ready(nxt_task_t *task,
122
    nxt_router_temp_conf_t *tmcf);
123
static void nxt_router_conf_send(nxt_task_t *task,
124
    nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
125
126
static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
127
    nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
128
static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
129
    nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
130
static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task,
131
    nxt_mp_t *mp, nxt_conf_value_t *conf);
132
static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp,
133
    nxt_conf_value_t *conf, nxt_http_forward_header_t *fh);
134
135
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
136
static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
137
static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
138
    nxt_app_t *app);
139
static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
140
    nxt_str_t *name);
141
static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
142
    int i);
143
144
static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
145
    nxt_port_t *port);
146
static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
147
    nxt_port_t *port);
148
static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
149
    nxt_port_t *port, nxt_fd_t fd);
150
static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
151
    nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
152
static void nxt_router_listen_socket_ready(nxt_task_t *task,
153
    nxt_port_recv_msg_t *msg, void *data);
154
static void nxt_router_listen_socket_error(nxt_task_t *task,
155
    nxt_port_recv_msg_t *msg, void *data);
156
#if (NXT_TLS)
157
static void nxt_router_tls_rpc_handler(nxt_task_t *task,
158
    nxt_port_recv_msg_t *msg, void *data);
159
static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
160
    nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
161
    nxt_bool_t last);
162
#endif
163
#if (NXT_HAVE_NJS)
164
static void nxt_router_js_module_rpc_handler(nxt_task_t *task,
165
    nxt_port_recv_msg_t *msg, void *data);
166
static nxt_int_t nxt_router_js_module_insert(nxt_router_temp_conf_t *tmcf,
167
    nxt_conf_value_t *value);
168
#endif
169
static void nxt_router_app_rpc_create(nxt_task_t *task,
170
    nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
171
static void nxt_router_app_prefork_ready(nxt_task_t *task,
172
    nxt_port_recv_msg_t *msg, void *data);
173
static void nxt_router_app_prefork_error(nxt_task_t *task,
174
    nxt_port_recv_msg_t *msg, void *data);
175
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
176
    nxt_router_temp_conf_t *tmcf, nxt_str_t *name, int backlog);
177
static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
178
    nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
179
180
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
181
    nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
182
    const nxt_event_interface_t *interface);
183
static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
184
    nxt_router_engine_conf_t *recf);
185
static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
186
    nxt_router_engine_conf_t *recf);
187
static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
188
    nxt_router_engine_conf_t *recf);
189
static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
190
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
191
    nxt_work_handler_t handler);
192
static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
193
    nxt_router_engine_conf_t *recf);
194
static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
195
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
196
197
static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
198
    nxt_router_temp_conf_t *tmcf);
199
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
200
    nxt_event_engine_t *engine);
201
static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
202
    nxt_router_temp_conf_t *tmcf);
203
204
static void nxt_router_engines_post(nxt_router_t *router,
205
    nxt_router_temp_conf_t *tmcf);
206
static void nxt_router_engine_post(nxt_event_engine_t *engine,
207
    nxt_work_t *jobs);
208
209
static void nxt_router_thread_start(void *data);
210
static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
211
    void *data);
212
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
213
    void *data);
214
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
215
    void *data);
216
static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
217
    void *data);
218
static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
219
    void *data);
220
static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
221
    void *data);
222
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
223
    void *data);
224
static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
225
    nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
226
static void nxt_router_listen_socket_release(nxt_task_t *task,
227
    nxt_socket_conf_t *skcf);
228
229
static void nxt_router_app_port_ready(nxt_task_t *task,
230
    nxt_port_recv_msg_t *msg, void *data);
231
static void nxt_router_app_port_error(nxt_task_t *task,
232
    nxt_port_recv_msg_t *msg, void *data);
233
234
static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
235
static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
236
237
static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
238
    nxt_port_t *port, nxt_apr_action_t action);
239
static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
240
    nxt_request_rpc_data_t *req_rpc_data);
241
static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
242
    void *data);
243
static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
244
    void *data);
245
246
static void nxt_router_app_prepare_request(nxt_task_t *task,
247
    nxt_request_rpc_data_t *req_rpc_data);
248
static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
249
    nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
250
251
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
252
static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
253
    void *data);
254
static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
255
    void *data);
256
static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
257
    void *data);
258
static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
259
260
static const nxt_http_request_state_t  nxt_http_request_send_state;
261
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
262
263
static void nxt_router_app_joint_use(nxt_task_t *task,
264
    nxt_app_joint_t *app_joint, int i);
265
266
static void nxt_router_http_request_release_post(nxt_task_t *task,
267
    nxt_http_request_t *r);
268
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
269
    void *data);
270
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
271
static void nxt_router_get_port_handler(nxt_task_t *task,
272
    nxt_port_recv_msg_t *msg);
273
static void nxt_router_get_mmap_handler(nxt_task_t *task,
274
    nxt_port_recv_msg_t *msg);
275
276
extern const nxt_http_request_state_t  nxt_http_websocket;
277
278
nxt_router_t  *nxt_router;
279
280
static const nxt_str_t http_prefix = nxt_string("HTTP_");
281
static const nxt_str_t empty_prefix = nxt_string("");
282
283
static const nxt_str_t  *nxt_app_msg_prefix[] = {
284
    [NXT_APP_EXTERNAL]  = &empty_prefix,
285
    [NXT_APP_PYTHON]    = &empty_prefix,
286
    [NXT_APP_PHP]       = &http_prefix,
287
    [NXT_APP_PERL]      = &http_prefix,
288
    [NXT_APP_RUBY]      = &http_prefix,
289
    [NXT_APP_JAVA]      = &empty_prefix,
290
    [NXT_APP_WASM]      = &empty_prefix,
291
    [NXT_APP_WASM_WC]   = &empty_prefix,
292
};
293
294
295
static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
296
    .quit         = nxt_signal_quit_handler,
297
    .new_port     = nxt_router_new_port_handler,
298
    .get_port     = nxt_router_get_port_handler,
299
    .change_file  = nxt_port_change_log_file_handler,
300
    .mmap         = nxt_port_mmap_handler,
301
    .get_mmap     = nxt_router_get_mmap_handler,
302
    .data         = nxt_router_conf_data_handler,
303
    .app_restart  = nxt_router_app_restart_handler,
304
    .status       = nxt_router_status_handler,
305
    .remove_pid   = nxt_router_remove_pid_handler,
306
    .access_log   = nxt_router_access_log_reopen_handler,
307
    .rpc_ready    = nxt_port_rpc_handler,
308
    .rpc_error    = nxt_port_rpc_handler,
309
    .oosm         = nxt_router_oosm_handler,
310
};
311
312
313
const nxt_process_init_t  nxt_router_process = {
314
    .name           = "router",
315
    .type           = NXT_PROCESS_ROUTER,
316
    .prefork        = nxt_router_prefork,
317
    .restart        = 1,
318
    .setup          = nxt_process_core_setup,
319
    .start          = nxt_router_start,
320
    .port_handlers  = &nxt_router_process_port_handlers,
321
    .signals        = nxt_process_signals,
322
};
323
324
325
/* Queues of nxt_socket_conf_t */
326
nxt_queue_t  creating_sockets;
327
nxt_queue_t  pending_sockets;
328
nxt_queue_t  updating_sockets;
329
nxt_queue_t  keeping_sockets;
330
nxt_queue_t  deleting_sockets;
331
332
333
static nxt_int_t
334
nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
335
0
{
336
0
    nxt_runtime_stop_app_processes(task, task->thread->runtime);
337
338
0
    return NXT_OK;
339
0
}
340
341
342
static nxt_int_t
343
nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
344
0
{
345
0
    nxt_int_t      ret;
346
0
    nxt_port_t     *controller_port;
347
0
    nxt_router_t   *router;
348
0
    nxt_runtime_t  *rt;
349
350
0
    rt = task->thread->runtime;
351
352
0
    nxt_log(task, NXT_LOG_INFO, "router started");
353
354
#if (NXT_TLS)
355
    rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
356
    if (nxt_slow_path(rt->tls == NULL)) {
357
        return NXT_ERROR;
358
    }
359
360
    ret = rt->tls->library_init(task);
361
    if (nxt_slow_path(ret != NXT_OK)) {
362
        return ret;
363
    }
364
#endif
365
366
0
    ret = nxt_http_init(task);
367
0
    if (nxt_slow_path(ret != NXT_OK)) {
368
0
        return ret;
369
0
    }
370
371
0
    router = nxt_zalloc(sizeof(nxt_router_t));
372
0
    if (nxt_slow_path(router == NULL)) {
373
0
        return NXT_ERROR;
374
0
    }
375
376
0
    nxt_queue_init(&router->engines);
377
0
    nxt_queue_init(&router->sockets);
378
0
    nxt_queue_init(&router->apps);
379
380
0
    nxt_router = router;
381
382
0
    controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
383
0
    if (controller_port != NULL) {
384
0
        nxt_router_greet_controller(task, controller_port);
385
0
    }
386
387
0
    return NXT_OK;
388
0
}
389
390
391
static void
392
nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
393
0
{
394
0
    nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
395
0
                          -1, 0, 0, NULL);
396
0
}
397
398
399
static void
400
nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
401
    void *data)
402
0
{
403
0
    size_t               size;
404
0
    uint32_t             stream;
405
0
    nxt_fd_t             port_fd, queue_fd;
406
0
    nxt_int_t            ret;
407
0
    nxt_app_t            *app;
408
0
    nxt_buf_t            *b;
409
0
    nxt_port_t           *dport;
410
0
    nxt_runtime_t        *rt;
411
0
    nxt_app_joint_rpc_t  *app_joint_rpc;
412
413
0
    app = data;
414
415
0
    nxt_thread_mutex_lock(&app->mutex);
416
417
0
    dport = app->proto_port;
418
419
0
    nxt_thread_mutex_unlock(&app->mutex);
420
421
0
    if (dport != NULL) {
422
0
        nxt_debug(task, "app '%V' %p start process", &app->name, app);
423
424
0
        b = NULL;
425
0
        port_fd = -1;
426
0
        queue_fd = -1;
427
428
0
    } else {
429
0
        if (app->proto_port_requests > 0) {
430
0
            nxt_debug(task, "app '%V' %p wait for prototype process",
431
0
                      &app->name, app);
432
433
0
            app->proto_port_requests++;
434
435
0
            goto skip;
436
0
        }
437
438
0
        nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
439
440
0
        rt = task->thread->runtime;
441
0
        dport = rt->port_by_type[NXT_PROCESS_MAIN];
442
443
0
        size = app->name.length + 1 + app->conf.length;
444
445
0
        b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
446
0
        if (nxt_slow_path(b == NULL)) {
447
0
            goto failed;
448
0
        }
449
450
0
        nxt_buf_cpystr(b, &app->name);
451
0
        *b->mem.free++ = '\0';
452
0
        nxt_buf_cpystr(b, &app->conf);
453
454
0
        port_fd = app->shared_port->pair[0];
455
0
        queue_fd = app->shared_port->queue_fd;
456
0
    }
457
458
0
    app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
459
0
                                                     nxt_router_app_port_ready,
460
0
                                                     nxt_router_app_port_error,
461
0
                                                   sizeof(nxt_app_joint_rpc_t));
462
0
    if (nxt_slow_path(app_joint_rpc == NULL)) {
463
0
        goto failed;
464
0
    }
465
466
0
    stream = nxt_port_rpc_ex_stream(app_joint_rpc);
467
468
0
    ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
469
0
                                 port_fd, queue_fd, stream, port->id, b);
470
0
    if (nxt_slow_path(ret != NXT_OK)) {
471
0
        nxt_port_rpc_cancel(task, port, stream);
472
473
0
        goto failed;
474
0
    }
475
476
0
    app_joint_rpc->app_joint = app->joint;
477
0
    app_joint_rpc->generation = app->generation;
478
0
    app_joint_rpc->proto = (b != NULL);
479
480
0
    if (b != NULL) {
481
0
        app->proto_port_requests++;
482
483
0
        b = NULL;
484
0
    }
485
486
0
    nxt_router_app_joint_use(task, app->joint, 1);
487
488
0
failed:
489
490
0
    if (b != NULL) {
491
0
        nxt_mp_free(b->data, b);
492
0
    }
493
494
0
skip:
495
496
0
    nxt_router_app_use(task, app, -1);
497
0
}
498
499
500
static void
501
nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
502
0
{
503
0
    app_joint->use_count += i;
504
505
0
    if (app_joint->use_count == 0) {
506
0
        nxt_assert(app_joint->app == NULL);
507
508
0
        nxt_free(app_joint);
509
0
    }
510
0
}
511
512
513
static nxt_int_t
514
nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
515
0
{
516
0
    nxt_int_t      res;
517
0
    nxt_port_t     *router_port;
518
0
    nxt_runtime_t  *rt;
519
520
0
    nxt_debug(task, "app '%V' start process", &app->name);
521
522
0
    rt = task->thread->runtime;
523
0
    router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
524
525
0
    nxt_router_app_use(task, app, 1);
526
527
0
    res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
528
0
                        app);
529
530
0
    if (res == NXT_OK) {
531
0
        return res;
532
0
    }
533
534
0
    nxt_thread_mutex_lock(&app->mutex);
535
536
0
    app->pending_processes--;
537
538
0
    nxt_thread_mutex_unlock(&app->mutex);
539
540
0
    nxt_router_app_use(task, app, -1);
541
542
0
    return NXT_ERROR;
543
0
}
544
545
546
nxt_inline nxt_bool_t
547
nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
548
0
{
549
0
    nxt_buf_t       *b, *next;
550
0
    nxt_bool_t      cancelled;
551
0
    nxt_port_t      *app_port;
552
0
    nxt_msg_info_t  *msg_info;
553
554
0
    msg_info = &req_rpc_data->msg_info;
555
556
0
    if (msg_info->buf == NULL) {
557
0
        return 0;
558
0
    }
559
560
0
    app_port = req_rpc_data->app_port;
561
562
0
    if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
563
0
        cancelled = nxt_app_queue_cancel(app_port->queue,
564
0
                                         msg_info->tracking_cookie,
565
0
                                         req_rpc_data->stream);
566
567
0
        if (cancelled) {
568
0
            nxt_debug(task, "stream #%uD: cancelled by router",
569
0
                      req_rpc_data->stream);
570
0
        }
571
572
0
    } else {
573
0
        cancelled = 0;
574
0
    }
575
576
0
    for (b = msg_info->buf; b != NULL; b = next) {
577
0
        next = b->next;
578
0
        b->next = NULL;
579
580
0
        if (b->is_port_mmap_sent) {
581
0
            b->is_port_mmap_sent = cancelled == 0;
582
0
        }
583
584
0
        b->completion_handler(task, b, b->parent);
585
0
    }
586
587
0
    msg_info->buf = NULL;
588
589
0
    return cancelled;
590
0
}
591
592
593
nxt_inline nxt_bool_t
594
nxt_queue_chk_remove(nxt_queue_link_t *lnk)
595
0
{
596
0
    if (lnk->next != NULL) {
597
0
        nxt_queue_remove(lnk);
598
599
0
        lnk->next = NULL;
600
601
0
        return 1;
602
0
    }
603
604
0
    return 0;
605
0
}
606
607
608
nxt_inline void
609
nxt_request_rpc_data_unlink(nxt_task_t *task,
610
    nxt_request_rpc_data_t *req_rpc_data)
611
0
{
612
0
    nxt_app_t           *app;
613
0
    nxt_bool_t          unlinked;
614
0
    nxt_http_request_t  *r;
615
616
0
    nxt_router_msg_cancel(task, req_rpc_data);
617
618
0
    app = req_rpc_data->app;
619
620
0
    if (req_rpc_data->app_port != NULL) {
621
0
        nxt_router_app_port_release(task, app, req_rpc_data->app_port,
622
0
                                    req_rpc_data->apr_action);
623
624
0
        req_rpc_data->app_port = NULL;
625
0
    }
626
627
0
    r = req_rpc_data->request;
628
629
0
    if (r != NULL) {
630
0
        r->timer_data = NULL;
631
632
0
        nxt_router_http_request_release_post(task, r);
633
634
0
        r->req_rpc_data = NULL;
635
0
        req_rpc_data->request = NULL;
636
637
0
        if (app != NULL) {
638
0
            unlinked = 0;
639
640
0
            nxt_thread_mutex_lock(&app->mutex);
641
642
0
            if (r->app_link.next != NULL) {
643
0
                nxt_queue_remove(&r->app_link);
644
0
                r->app_link.next = NULL;
645
646
0
                unlinked = 1;
647
0
            }
648
649
0
            nxt_thread_mutex_unlock(&app->mutex);
650
651
0
            if (unlinked) {
652
0
                nxt_mp_release(r->mem_pool);
653
0
            }
654
0
        }
655
0
    }
656
657
0
    if (app != NULL) {
658
0
        nxt_router_app_use(task, app, -1);
659
660
0
        req_rpc_data->app = NULL;
661
0
    }
662
663
0
    if (req_rpc_data->msg_info.body_fd != -1) {
664
0
        nxt_fd_close(req_rpc_data->msg_info.body_fd);
665
666
0
        req_rpc_data->msg_info.body_fd = -1;
667
0
    }
668
669
0
    if (req_rpc_data->rpc_cancel) {
670
0
        req_rpc_data->rpc_cancel = 0;
671
672
0
        nxt_port_rpc_cancel(task, task->thread->engine->port,
673
0
                            req_rpc_data->stream);
674
0
    }
675
0
}
676
677
678
static void
679
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
680
0
{
681
0
    nxt_int_t      res;
682
0
    nxt_app_t      *app;
683
0
    nxt_port_t     *port, *main_app_port;
684
0
    nxt_runtime_t  *rt;
685
686
0
    nxt_port_new_port_handler(task, msg);
687
688
0
    port = msg->u.new_port;
689
690
0
    if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
691
0
        nxt_router_greet_controller(task, msg->u.new_port);
692
0
    }
693
694
0
    if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
695
0
        nxt_port_rpc_handler(task, msg);
696
697
0
        return;
698
0
    }
699
700
0
    if (port == NULL || port->type != NXT_PROCESS_APP) {
701
702
0
        if (msg->port_msg.stream == 0) {
703
0
            return;
704
0
        }
705
706
0
        msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
707
708
0
    } else {
709
0
        if (msg->fd[1] != -1) {
710
0
            res = nxt_router_port_queue_map(task, port, msg->fd[1]);
711
0
            if (nxt_slow_path(res != NXT_OK)) {
712
0
                return;
713
0
            }
714
715
0
            nxt_fd_close(msg->fd[1]);
716
0
            msg->fd[1] = -1;
717
0
        }
718
0
    }
719
720
0
    if (msg->port_msg.stream != 0) {
721
0
        nxt_port_rpc_handler(task, msg);
722
0
        return;
723
0
    }
724
725
0
    nxt_debug(task, "new port id %d (%d)", port->id, port->type);
726
727
    /*
728
     * Port with "id == 0" is application 'main' port and it always
729
     * should come with non-zero stream.
730
     */
731
0
    nxt_assert(port->id != 0);
732
733
    /* Find 'main' app port and get app reference. */
734
0
    rt = task->thread->runtime;
735
736
    /*
737
     * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
738
     * sent to main port (with id == 0) and processed in main thread.
739
     */
740
0
    main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
741
0
    nxt_assert(main_app_port != NULL);
742
743
0
    app = main_app_port->app;
744
745
0
    if (nxt_fast_path(app != NULL)) {
746
0
        nxt_thread_mutex_lock(&app->mutex);
747
748
        /* TODO here should be find-and-add code because there can be
749
           port waiters in port_hash */
750
0
        nxt_port_hash_add(&app->port_hash, port);
751
0
        app->port_hash_count++;
752
753
0
        nxt_thread_mutex_unlock(&app->mutex);
754
755
0
        port->app = app;
756
0
    }
757
758
0
    port->main_app_port = main_app_port;
759
760
0
    nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
761
0
}
762
763
764
static void
765
nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
766
0
{
767
0
    void                    *p;
768
0
    size_t                  size;
769
0
    nxt_int_t               ret;
770
0
    nxt_port_t              *port;
771
0
    nxt_router_temp_conf_t  *tmcf;
772
773
0
    port = nxt_runtime_port_find(task->thread->runtime,
774
0
                                 msg->port_msg.pid,
775
0
                                 msg->port_msg.reply_port);
776
0
    if (nxt_slow_path(port == NULL)) {
777
0
        nxt_alert(task, "conf_data_handler: reply port not found");
778
0
        return;
779
0
    }
780
781
0
    p = MAP_FAILED;
782
783
    /*
784
     * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
785
     * initialized in 'cleanup' section.
786
     */
787
0
    size = 0;
788
789
0
    tmcf = nxt_router_temp_conf(task);
790
0
    if (nxt_slow_path(tmcf == NULL)) {
791
0
        goto fail;
792
0
    }
793
794
0
    if (nxt_slow_path(msg->fd[0] == -1)) {
795
0
        nxt_alert(task, "conf_data_handler: invalid shm fd");
796
0
        goto fail;
797
0
    }
798
799
0
    if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
800
0
        nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
801
0
                  (int) nxt_buf_mem_used_size(&msg->buf->mem));
802
0
        goto fail;
803
0
    }
804
805
0
    nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
806
807
0
    p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
808
809
0
    nxt_fd_close(msg->fd[0]);
810
0
    msg->fd[0] = -1;
811
812
0
    if (nxt_slow_path(p == MAP_FAILED)) {
813
0
        goto fail;
814
0
    }
815
816
0
    nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
817
818
0
    tmcf->router_conf->router = nxt_router;
819
0
    tmcf->stream = msg->port_msg.stream;
820
0
    tmcf->port = port;
821
822
0
    nxt_port_use(task, tmcf->port, 1);
823
824
0
    ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
825
826
0
    if (nxt_fast_path(ret == NXT_OK)) {
827
0
        nxt_router_conf_apply(task, tmcf, NULL);
828
829
0
    } else {
830
0
        nxt_router_conf_error(task, tmcf);
831
0
    }
832
833
0
    goto cleanup;
834
835
0
fail:
836
837
0
    nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
838
0
                          msg->port_msg.stream, 0, NULL);
839
840
0
    if (tmcf != NULL) {
841
0
        nxt_mp_release(tmcf->mem_pool);
842
0
    }
843
844
0
cleanup:
845
846
0
    if (p != MAP_FAILED) {
847
0
        nxt_mem_munmap(p, size);
848
0
    }
849
850
0
    if (msg->fd[0] != -1) {
851
0
        nxt_fd_close(msg->fd[0]);
852
0
        msg->fd[0] = -1;
853
0
    }
854
0
}
855
856
857
static void
858
nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
859
0
{
860
0
    nxt_app_t            *app;
861
0
    nxt_int_t            ret;
862
0
    nxt_str_t            app_name;
863
0
    nxt_port_t           *reply_port, *shared_port, *old_shared_port;
864
0
    nxt_port_t           *proto_port;
865
0
    nxt_port_msg_type_t  reply;
866
867
0
    reply_port = nxt_runtime_port_find(task->thread->runtime,
868
0
                                       msg->port_msg.pid,
869
0
                                       msg->port_msg.reply_port);
870
0
    if (nxt_slow_path(reply_port == NULL)) {
871
0
        nxt_alert(task, "app_restart_handler: reply port not found");
872
0
        return;
873
0
    }
874
875
0
    app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
876
0
    app_name.start = msg->buf->mem.pos;
877
878
0
    nxt_debug(task, "app_restart_handler: %V", &app_name);
879
880
0
    app = nxt_router_app_find(&nxt_router->apps, &app_name);
881
882
0
    if (nxt_fast_path(app != NULL)) {
883
0
        shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
884
0
                                   NXT_PROCESS_APP);
885
0
        if (nxt_slow_path(shared_port == NULL)) {
886
0
            goto fail;
887
0
        }
888
889
0
        ret = nxt_port_socket_init(task, shared_port, 0);
890
0
        if (nxt_slow_path(ret != NXT_OK)) {
891
0
            nxt_port_use(task, shared_port, -1);
892
0
            goto fail;
893
0
        }
894
895
0
        ret = nxt_router_app_queue_init(task, shared_port);
896
0
        if (nxt_slow_path(ret != NXT_OK)) {
897
0
            nxt_port_write_close(shared_port);
898
0
            nxt_port_read_close(shared_port);
899
0
            nxt_port_use(task, shared_port, -1);
900
0
            goto fail;
901
0
        }
902
903
0
        nxt_port_write_enable(task, shared_port);
904
905
0
        nxt_thread_mutex_lock(&app->mutex);
906
907
0
        proto_port = app->proto_port;
908
909
0
        if (proto_port != NULL) {
910
0
            nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
911
0
                      proto_port->pid);
912
913
0
            app->proto_port = NULL;
914
0
            proto_port->app = NULL;
915
0
        }
916
917
0
        app->generation++;
918
919
0
        shared_port->app = app;
920
921
0
        old_shared_port = app->shared_port;
922
0
        old_shared_port->app = NULL;
923
924
0
        app->shared_port = shared_port;
925
926
0
        nxt_thread_mutex_unlock(&app->mutex);
927
928
0
        nxt_port_close(task, old_shared_port);
929
0
        nxt_port_use(task, old_shared_port, -1);
930
931
0
        if (proto_port != NULL) {
932
0
            (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
933
0
                                         -1, 0, 0, NULL);
934
935
0
            nxt_port_close(task, proto_port);
936
937
0
            nxt_port_use(task, proto_port, -1);
938
0
        }
939
940
0
        reply = NXT_PORT_MSG_RPC_READY_LAST;
941
942
0
    } else {
943
944
0
fail:
945
946
0
        reply = NXT_PORT_MSG_RPC_ERROR;
947
0
    }
948
949
0
    nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
950
0
                          0, NULL);
951
0
}
952
953
954
static void
955
nxt_router_status_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
956
0
{
957
0
    u_char               *p;
958
0
    size_t               alloc;
959
0
    nxt_app_t            *app;
960
0
    nxt_buf_t            *b;
961
0
    nxt_uint_t           type;
962
0
    nxt_port_t           *port;
963
0
    nxt_status_app_t     *app_stat;
964
0
    nxt_event_engine_t   *engine;
965
0
    nxt_status_report_t  *report;
966
967
0
    port = nxt_runtime_port_find(task->thread->runtime,
968
0
                                 msg->port_msg.pid,
969
0
                                 msg->port_msg.reply_port);
970
0
    if (nxt_slow_path(port == NULL)) {
971
0
        nxt_alert(task, "nxt_router_status_handler(): reply port not found");
972
0
        return;
973
0
    }
974
975
0
    alloc = sizeof(nxt_status_report_t);
976
977
0
    nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
978
979
0
        alloc += sizeof(nxt_status_app_t) + app->name.length;
980
981
0
    } nxt_queue_loop;
982
983
0
    b = nxt_buf_mem_alloc(port->mem_pool, alloc, 0);
984
0
    if (nxt_slow_path(b == NULL)) {
985
0
        type = NXT_PORT_MSG_RPC_ERROR;
986
0
        goto fail;
987
0
    }
988
989
0
    report = (nxt_status_report_t *) b->mem.free;
990
0
    b->mem.free = b->mem.end;
991
992
0
    nxt_memzero(report, sizeof(nxt_status_report_t));
993
994
0
    nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) {
995
996
0
        report->accepted_conns += engine->accepted_conns_cnt;
997
0
        report->idle_conns += engine->idle_conns_cnt;
998
0
        report->closed_conns += engine->closed_conns_cnt;
999
0
        report->requests += engine->requests_cnt;
1000
1001
0
    } nxt_queue_loop;
1002
1003
0
    report->apps_count = 0;
1004
0
    app_stat = report->apps;
1005
0
    p = b->mem.end;
1006
1007
0
    nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
1008
0
        p -= app->name.length;
1009
1010
0
        nxt_memcpy(p, app->name.start, app->name.length);
1011
1012
0
        app_stat->name.length = app->name.length;
1013
0
        app_stat->name.start = (u_char *) (p - b->mem.pos);
1014
1015
0
        app_stat->active_requests = app->active_requests;
1016
0
        app_stat->pending_processes = app->pending_processes;
1017
0
        app_stat->processes = app->processes;
1018
0
        app_stat->idle_processes = app->idle_processes;
1019
1020
0
        report->apps_count++;
1021
0
        app_stat++;
1022
0
    } nxt_queue_loop;
1023
1024
0
    type = NXT_PORT_MSG_RPC_READY_LAST;
1025
1026
0
fail:
1027
1028
0
    nxt_port_socket_write(task, port, type, -1, msg->port_msg.stream, 0, b);
1029
0
}
1030
1031
1032
static void
1033
nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
1034
    void *data)
1035
0
{
1036
0
    union {
1037
0
        nxt_pid_t  removed_pid;
1038
0
        void       *data;
1039
0
    } u;
1040
1041
0
    u.data = data;
1042
1043
0
    nxt_port_rpc_remove_peer(task, port, u.removed_pid);
1044
0
}
1045
1046
1047
static void
1048
nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1049
0
{
1050
0
    nxt_event_engine_t  *engine;
1051
1052
0
    nxt_port_remove_pid_handler(task, msg);
1053
1054
0
    nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
1055
0
    {
1056
0
        if (nxt_fast_path(engine->port != NULL)) {
1057
0
            nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
1058
0
                          msg->u.data);
1059
0
        }
1060
0
    }
1061
0
    nxt_queue_loop;
1062
1063
0
    if (msg->port_msg.stream == 0) {
1064
0
        return;
1065
0
    }
1066
1067
0
    msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
1068
1069
0
    nxt_port_rpc_handler(task, msg);
1070
0
}
1071
1072
1073
static nxt_router_temp_conf_t *
1074
nxt_router_temp_conf(nxt_task_t *task)
1075
0
{
1076
0
    nxt_mp_t                *mp, *tmp;
1077
0
    nxt_router_conf_t       *rtcf;
1078
0
    nxt_router_temp_conf_t  *tmcf;
1079
1080
0
    mp = nxt_mp_create(1024, 128, 256, 32);
1081
0
    if (nxt_slow_path(mp == NULL)) {
1082
0
        return NULL;
1083
0
    }
1084
1085
0
    rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1086
0
    if (nxt_slow_path(rtcf == NULL)) {
1087
0
        goto out_free_mp;
1088
0
    }
1089
1090
0
    rtcf->mem_pool = mp;
1091
1092
0
    rtcf->tstr_state = nxt_tstr_state_new(mp, 0);
1093
0
    if (nxt_slow_path(rtcf->tstr_state == NULL)) {
1094
0
        goto out_free_mp;
1095
0
    }
1096
1097
#if (NXT_HAVE_NJS)
1098
    nxt_http_register_js_proto(rtcf->tstr_state->jcf);
1099
#endif
1100
1101
0
    tmp = nxt_mp_create(1024, 128, 256, 32);
1102
0
    if (nxt_slow_path(tmp == NULL)) {
1103
0
        goto out_free_tstr_state;
1104
0
    }
1105
1106
0
    tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1107
0
    if (nxt_slow_path(tmcf == NULL)) {
1108
0
        goto out_free;
1109
0
    }
1110
1111
0
    tmcf->mem_pool = tmp;
1112
0
    tmcf->router_conf = rtcf;
1113
0
    tmcf->count = 1;
1114
0
    tmcf->engine = task->thread->engine;
1115
1116
0
    tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1117
0
                                     sizeof(nxt_router_engine_conf_t));
1118
0
    if (nxt_slow_path(tmcf->engines == NULL)) {
1119
0
        goto out_free;
1120
0
    }
1121
1122
0
    nxt_queue_init(&creating_sockets);
1123
0
    nxt_queue_init(&pending_sockets);
1124
0
    nxt_queue_init(&updating_sockets);
1125
0
    nxt_queue_init(&keeping_sockets);
1126
0
    nxt_queue_init(&deleting_sockets);
1127
1128
#if (NXT_TLS)
1129
    nxt_queue_init(&tmcf->tls);
1130
#endif
1131
1132
#if (NXT_HAVE_NJS)
1133
    nxt_queue_init(&tmcf->js_modules);
1134
#endif
1135
1136
0
    nxt_queue_init(&tmcf->apps);
1137
0
    nxt_queue_init(&tmcf->previous);
1138
1139
0
    return tmcf;
1140
1141
0
out_free:
1142
1143
0
    nxt_mp_destroy(tmp);
1144
1145
0
out_free_tstr_state:
1146
1147
0
    if (rtcf->tstr_state != NULL) {
1148
0
        nxt_tstr_state_release(rtcf->tstr_state);
1149
0
    }
1150
1151
0
out_free_mp:
1152
1153
0
    nxt_mp_destroy(mp);
1154
1155
0
    return NULL;
1156
0
}
1157
1158
1159
nxt_inline nxt_bool_t
1160
nxt_router_app_can_start(nxt_app_t *app)
1161
0
{
1162
0
    return app->processes + app->pending_processes < app->max_processes
1163
0
            && app->pending_processes < app->max_pending_processes;
1164
0
}
1165
1166
1167
nxt_inline nxt_bool_t
1168
nxt_router_app_need_start(nxt_app_t *app)
1169
0
{
1170
0
    return (app->active_requests
1171
0
              > app->port_hash_count + app->pending_processes)
1172
0
           || (app->spare_processes
1173
0
                > app->idle_processes + app->pending_processes);
1174
0
}
1175
1176
1177
void
1178
nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1179
0
{
1180
0
    nxt_int_t                    ret;
1181
0
    nxt_app_t                    *app;
1182
0
    nxt_router_t                 *router;
1183
0
    nxt_runtime_t                *rt;
1184
0
    nxt_queue_link_t             *qlk;
1185
0
    nxt_socket_conf_t            *skcf;
1186
0
    nxt_router_conf_t            *rtcf;
1187
0
    nxt_router_temp_conf_t       *tmcf;
1188
0
    const nxt_event_interface_t  *interface;
1189
#if (NXT_TLS)
1190
    nxt_router_tlssock_t         *tls;
1191
#endif
1192
#if (NXT_HAVE_NJS)
1193
    nxt_router_js_module_t       *js_module;
1194
#endif
1195
1196
0
    tmcf = obj;
1197
1198
0
    qlk = nxt_queue_first(&pending_sockets);
1199
1200
0
    if (qlk != nxt_queue_tail(&pending_sockets)) {
1201
0
        nxt_queue_remove(qlk);
1202
0
        nxt_queue_insert_tail(&creating_sockets, qlk);
1203
1204
0
        skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1205
1206
0
        nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1207
1208
0
        return;
1209
0
    }
1210
1211
#if (NXT_TLS)
1212
    qlk = nxt_queue_last(&tmcf->tls);
1213
1214
    if (qlk != nxt_queue_head(&tmcf->tls)) {
1215
        nxt_queue_remove(qlk);
1216
1217
        tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1218
1219
        nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1220
                           nxt_router_tls_rpc_handler, tls);
1221
        return;
1222
    }
1223
#endif
1224
1225
#if (NXT_HAVE_NJS)
1226
    qlk = nxt_queue_last(&tmcf->js_modules);
1227
1228
    if (qlk != nxt_queue_head(&tmcf->js_modules)) {
1229
        nxt_queue_remove(qlk);
1230
1231
        js_module = nxt_queue_link_data(qlk, nxt_router_js_module_t, link);
1232
1233
        nxt_script_store_get(task, &js_module->name, tmcf->mem_pool,
1234
                             nxt_router_js_module_rpc_handler, js_module);
1235
        return;
1236
    }
1237
#endif
1238
1239
0
    rtcf = tmcf->router_conf;
1240
1241
0
    ret = nxt_tstr_state_done(rtcf->tstr_state, NULL);
1242
0
    if (nxt_slow_path(ret != NXT_OK)) {
1243
0
        goto fail;
1244
0
    }
1245
1246
0
    nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1247
1248
0
        if (nxt_router_app_need_start(app)) {
1249
0
            nxt_router_app_rpc_create(task, tmcf, app);
1250
0
            return;
1251
0
        }
1252
1253
0
    } nxt_queue_loop;
1254
1255
0
    if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1256
0
        nxt_router_access_log_open(task, tmcf);
1257
0
        return;
1258
0
    }
1259
1260
0
    rt = task->thread->runtime;
1261
1262
0
    interface = nxt_service_get(rt->services, "engine", NULL);
1263
1264
0
    router = rtcf->router;
1265
1266
0
    ret = nxt_router_engines_create(task, router, tmcf, interface);
1267
0
    if (nxt_slow_path(ret != NXT_OK)) {
1268
0
        goto fail;
1269
0
    }
1270
1271
0
    ret = nxt_router_threads_create(task, rt, tmcf);
1272
0
    if (nxt_slow_path(ret != NXT_OK)) {
1273
0
        goto fail;
1274
0
    }
1275
1276
0
    nxt_router_apps_sort(task, router, tmcf);
1277
1278
0
    nxt_router_apps_hash_use(task, rtcf, 1);
1279
1280
0
    nxt_router_engines_post(router, tmcf);
1281
1282
0
    nxt_queue_add(&router->sockets, &updating_sockets);
1283
0
    nxt_queue_add(&router->sockets, &creating_sockets);
1284
1285
0
    if (router->access_log != rtcf->access_log) {
1286
0
        nxt_router_access_log_use(&router->lock, rtcf->access_log);
1287
1288
0
        nxt_router_access_log_release(task, &router->lock, router->access_log);
1289
1290
0
        router->access_log = rtcf->access_log;
1291
0
    }
1292
1293
0
    nxt_router_conf_ready(task, tmcf);
1294
1295
0
    return;
1296
1297
0
fail:
1298
1299
0
    nxt_router_conf_error(task, tmcf);
1300
1301
0
    return;
1302
0
}
1303
1304
1305
static void
1306
nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1307
0
{
1308
0
    nxt_joint_job_t  *job;
1309
1310
0
    job = obj;
1311
1312
0
    nxt_router_conf_ready(task, job->tmcf);
1313
0
}
1314
1315
1316
static void
1317
nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1318
0
{
1319
0
    uint32_t               count;
1320
0
    nxt_router_conf_t      *rtcf;
1321
0
    nxt_thread_spinlock_t  *lock;
1322
1323
0
    nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1324
1325
0
    if (--tmcf->count > 0) {
1326
0
        return;
1327
0
    }
1328
1329
0
    nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1330
1331
0
    rtcf = tmcf->router_conf;
1332
1333
0
    lock = &rtcf->router->lock;
1334
1335
0
    nxt_thread_spin_lock(lock);
1336
1337
0
    count = rtcf->count;
1338
1339
0
    nxt_thread_spin_unlock(lock);
1340
1341
0
    nxt_debug(task, "rtcf %p: %D", rtcf, count);
1342
1343
0
    if (count == 0) {
1344
0
        nxt_router_apps_hash_use(task, rtcf, -1);
1345
1346
0
        nxt_router_access_log_release(task, lock, rtcf->access_log);
1347
1348
0
        nxt_mp_destroy(rtcf->mem_pool);
1349
0
    }
1350
1351
0
    nxt_mp_release(tmcf->mem_pool);
1352
0
}
1353
1354
1355
void
1356
nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1357
0
{
1358
0
    nxt_app_t          *app;
1359
0
    nxt_socket_t       s;
1360
0
    nxt_router_t       *router;
1361
0
    nxt_queue_link_t   *qlk;
1362
0
    nxt_socket_conf_t  *skcf;
1363
0
    nxt_router_conf_t  *rtcf;
1364
1365
0
    nxt_alert(task, "failed to apply new conf");
1366
1367
0
    for (qlk = nxt_queue_first(&creating_sockets);
1368
0
         qlk != nxt_queue_tail(&creating_sockets);
1369
0
         qlk = nxt_queue_next(qlk))
1370
0
    {
1371
0
        skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1372
0
        s = skcf->listen->socket;
1373
1374
0
        if (s != -1) {
1375
0
            nxt_socket_close(task, s);
1376
0
        }
1377
1378
0
        nxt_free(skcf->listen);
1379
0
    }
1380
1381
0
    rtcf = tmcf->router_conf;
1382
1383
0
    nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1384
1385
0
        nxt_router_app_unlink(task, app);
1386
1387
0
    } nxt_queue_loop;
1388
1389
0
    router = rtcf->router;
1390
1391
0
    nxt_queue_add(&router->sockets, &keeping_sockets);
1392
0
    nxt_queue_add(&router->sockets, &deleting_sockets);
1393
1394
0
    nxt_queue_add(&router->apps, &tmcf->previous);
1395
1396
    // TODO: new engines and threads
1397
1398
0
    nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1399
1400
0
    nxt_mp_destroy(rtcf->mem_pool);
1401
1402
0
    nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1403
1404
0
    nxt_mp_release(tmcf->mem_pool);
1405
0
}
1406
1407
1408
static void
1409
nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1410
    nxt_port_msg_type_t type)
1411
0
{
1412
0
    nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1413
1414
0
    nxt_port_use(task, tmcf->port, -1);
1415
1416
0
    tmcf->port = NULL;
1417
0
}
1418
1419
1420
static nxt_conf_map_t  nxt_router_conf[] = {
1421
    {
1422
        nxt_string("listen_threads"),
1423
        NXT_CONF_MAP_INT32,
1424
        offsetof(nxt_router_conf_t, threads),
1425
    },
1426
};
1427
1428
1429
static nxt_conf_map_t  nxt_router_app_conf[] = {
1430
    {
1431
        nxt_string("type"),
1432
        NXT_CONF_MAP_STR,
1433
        offsetof(nxt_router_app_conf_t, type),
1434
    },
1435
1436
    {
1437
        nxt_string("limits"),
1438
        NXT_CONF_MAP_PTR,
1439
        offsetof(nxt_router_app_conf_t, limits_value),
1440
    },
1441
1442
    {
1443
        nxt_string("processes"),
1444
        NXT_CONF_MAP_INT32,
1445
        offsetof(nxt_router_app_conf_t, processes),
1446
    },
1447
1448
    {
1449
        nxt_string("processes"),
1450
        NXT_CONF_MAP_PTR,
1451
        offsetof(nxt_router_app_conf_t, processes_value),
1452
    },
1453
1454
    {
1455
        nxt_string("targets"),
1456
        NXT_CONF_MAP_PTR,
1457
        offsetof(nxt_router_app_conf_t, targets_value),
1458
    },
1459
};
1460
1461
1462
static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1463
    {
1464
        nxt_string("timeout"),
1465
        NXT_CONF_MAP_MSEC,
1466
        offsetof(nxt_router_app_conf_t, timeout),
1467
    },
1468
};
1469
1470
1471
static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1472
    {
1473
        nxt_string("spare"),
1474
        NXT_CONF_MAP_INT32,
1475
        offsetof(nxt_router_app_conf_t, spare_processes),
1476
    },
1477
1478
    {
1479
        nxt_string("max"),
1480
        NXT_CONF_MAP_INT32,
1481
        offsetof(nxt_router_app_conf_t, max_processes),
1482
    },
1483
1484
    {
1485
        nxt_string("idle_timeout"),
1486
        NXT_CONF_MAP_MSEC,
1487
        offsetof(nxt_router_app_conf_t, idle_timeout),
1488
    },
1489
};
1490
1491
1492
static nxt_conf_map_t  nxt_router_listener_conf[] = {
1493
    {
1494
        nxt_string("pass"),
1495
        NXT_CONF_MAP_STR_COPY,
1496
        offsetof(nxt_router_listener_conf_t, pass),
1497
    },
1498
1499
    {
1500
        nxt_string("application"),
1501
        NXT_CONF_MAP_STR_COPY,
1502
        offsetof(nxt_router_listener_conf_t, application),
1503
    },
1504
1505
    {
1506
        nxt_string("backlog"),
1507
        NXT_CONF_MAP_INT32,
1508
        offsetof(nxt_router_listener_conf_t, backlog),
1509
    },
1510
};
1511
1512
1513
static nxt_conf_map_t  nxt_router_http_conf[] = {
1514
    {
1515
        nxt_string("header_buffer_size"),
1516
        NXT_CONF_MAP_SIZE,
1517
        offsetof(nxt_socket_conf_t, header_buffer_size),
1518
    },
1519
1520
    {
1521
        nxt_string("large_header_buffer_size"),
1522
        NXT_CONF_MAP_SIZE,
1523
        offsetof(nxt_socket_conf_t, large_header_buffer_size),
1524
    },
1525
1526
    {
1527
        nxt_string("large_header_buffers"),
1528
        NXT_CONF_MAP_SIZE,
1529
        offsetof(nxt_socket_conf_t, large_header_buffers),
1530
    },
1531
1532
    {
1533
        nxt_string("body_buffer_size"),
1534
        NXT_CONF_MAP_SIZE,
1535
        offsetof(nxt_socket_conf_t, body_buffer_size),
1536
    },
1537
1538
    {
1539
        nxt_string("max_body_size"),
1540
        NXT_CONF_MAP_SIZE,
1541
        offsetof(nxt_socket_conf_t, max_body_size),
1542
    },
1543
1544
    {
1545
        nxt_string("idle_timeout"),
1546
        NXT_CONF_MAP_MSEC,
1547
        offsetof(nxt_socket_conf_t, idle_timeout),
1548
    },
1549
1550
    {
1551
        nxt_string("header_read_timeout"),
1552
        NXT_CONF_MAP_MSEC,
1553
        offsetof(nxt_socket_conf_t, header_read_timeout),
1554
    },
1555
1556
    {
1557
        nxt_string("body_read_timeout"),
1558
        NXT_CONF_MAP_MSEC,
1559
        offsetof(nxt_socket_conf_t, body_read_timeout),
1560
    },
1561
1562
    {
1563
        nxt_string("send_timeout"),
1564
        NXT_CONF_MAP_MSEC,
1565
        offsetof(nxt_socket_conf_t, send_timeout),
1566
    },
1567
1568
    {
1569
        nxt_string("body_temp_path"),
1570
        NXT_CONF_MAP_STR,
1571
        offsetof(nxt_socket_conf_t, body_temp_path),
1572
    },
1573
1574
    {
1575
        nxt_string("discard_unsafe_fields"),
1576
        NXT_CONF_MAP_INT8,
1577
        offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1578
    },
1579
1580
    {
1581
        nxt_string("log_route"),
1582
        NXT_CONF_MAP_INT8,
1583
        offsetof(nxt_socket_conf_t, log_route),
1584
    },
1585
1586
    {
1587
        nxt_string("server_version"),
1588
        NXT_CONF_MAP_INT8,
1589
        offsetof(nxt_socket_conf_t, server_version),
1590
    },
1591
1592
    {
1593
        nxt_string("chunked_transform"),
1594
        NXT_CONF_MAP_INT8,
1595
        offsetof(nxt_socket_conf_t, chunked_transform),
1596
    },
1597
};
1598
1599
1600
static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1601
    {
1602
        nxt_string("max_frame_size"),
1603
        NXT_CONF_MAP_SIZE,
1604
        offsetof(nxt_websocket_conf_t, max_frame_size),
1605
    },
1606
1607
    {
1608
        nxt_string("read_timeout"),
1609
        NXT_CONF_MAP_MSEC,
1610
        offsetof(nxt_websocket_conf_t, read_timeout),
1611
    },
1612
1613
    {
1614
        nxt_string("keepalive_interval"),
1615
        NXT_CONF_MAP_MSEC,
1616
        offsetof(nxt_websocket_conf_t, keepalive_interval),
1617
    },
1618
1619
};
1620
1621
1622
static nxt_int_t
1623
nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1624
    u_char *start, u_char *end)
1625
0
{
1626
0
    u_char                      *p;
1627
0
    size_t                      size;
1628
0
    nxt_mp_t                    *mp, *app_mp;
1629
0
    uint32_t                    next, next_target;
1630
0
    nxt_int_t                   ret;
1631
0
    nxt_str_t                   name, target;
1632
0
    nxt_app_t                   *app, *prev;
1633
0
    nxt_str_t                   *t, *s, *targets;
1634
0
    nxt_uint_t                  n, i;
1635
0
    nxt_port_t                  *port;
1636
0
    nxt_router_t                *router;
1637
0
    nxt_app_joint_t             *app_joint;
1638
#if (NXT_TLS)
1639
    nxt_tls_init_t              *tls_init;
1640
    nxt_conf_value_t            *certificate;
1641
#endif
1642
#if (NXT_HAVE_NJS)
1643
    nxt_conf_value_t            *js_module;
1644
#endif
1645
#if (NXT_HAVE_OTEL)
1646
    double                      telemetry_sample_fraction, telemetry_batching;
1647
    nxt_str_t                   telemetry_endpoint, telemetry_proto;
1648
    nxt_conf_value_t            *otel, *otel_endpoint, *otel_sampling,
1649
                                *otel_batching, *otel_proto;
1650
#endif
1651
0
    nxt_conf_value_t            *root, *conf, *http, *value, *websocket;
1652
0
    nxt_conf_value_t            *applications, *application, *settings;
1653
0
    nxt_conf_value_t            *listeners, *listener;
1654
0
    nxt_socket_conf_t           *skcf;
1655
0
    nxt_router_conf_t           *rtcf;
1656
0
    nxt_http_routes_t           *routes;
1657
0
    nxt_event_engine_t          *engine;
1658
0
    nxt_app_lang_module_t       *lang;
1659
0
    nxt_router_app_conf_t       apcf;
1660
0
    nxt_router_listener_conf_t  lscf;
1661
1662
0
    static const nxt_str_t  settings_path = nxt_string("/settings");
1663
0
    static const nxt_str_t  http_path = nxt_string("/settings/http");
1664
0
    static const nxt_str_t  applications_path = nxt_string("/applications");
1665
0
    static const nxt_str_t  listeners_path = nxt_string("/listeners");
1666
0
    static const nxt_str_t  routes_path = nxt_string("/routes");
1667
0
    static const nxt_str_t  access_log_path = nxt_string("/access_log");
1668
#if (NXT_TLS)
1669
    static const nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1670
    static const nxt_str_t  conf_commands_path =
1671
                                nxt_string("/tls/conf_commands");
1672
    static const nxt_str_t  conf_cache_path =
1673
                                nxt_string("/tls/session/cache_size");
1674
    static const nxt_str_t  conf_timeout_path =
1675
                                nxt_string("/tls/session/timeout");
1676
    static const nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1677
#endif
1678
#if (NXT_HAVE_NJS)
1679
    static const nxt_str_t  js_module_path = nxt_string("/settings/js_module");
1680
#endif
1681
0
    static const nxt_str_t  static_path = nxt_string("/settings/http/static");
1682
0
    static const nxt_str_t  websocket_path =
1683
0
                                nxt_string("/settings/http/websocket");
1684
0
    static const nxt_str_t  compression_path =
1685
0
                                nxt_string("/settings/http/compression");
1686
0
    static const nxt_str_t  forwarded_path = nxt_string("/forwarded");
1687
0
    static const nxt_str_t  client_ip_path = nxt_string("/client_ip");
1688
#if (NXT_HAVE_OTEL)
1689
    static const nxt_str_t  telemetry_path = nxt_string("/settings/telemetry");
1690
    static const nxt_str_t  telemetry_endpoint_path =
1691
                                nxt_string("/settings/telemetry/endpoint");
1692
    static const nxt_str_t  telemetry_batch_path =
1693
                                nxt_string("/settings/telemetry/batch_size");
1694
    static const nxt_str_t  telemetry_sample_path =
1695
                                nxt_string("/settings/telemetry/sampling_ratio");
1696
    static const nxt_str_t  telemetry_proto_path =
1697
                                nxt_string("/settings/telemetry/protocol");
1698
#endif
1699
1700
0
    root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1701
0
    if (root == NULL) {
1702
0
        nxt_alert(task, "configuration parsing error");
1703
0
        return NXT_ERROR;
1704
0
    }
1705
1706
0
    rtcf = tmcf->router_conf;
1707
0
    mp = rtcf->mem_pool;
1708
1709
0
    settings = nxt_conf_get_path(root, &settings_path);
1710
0
    if (settings != NULL) {
1711
0
        ret = nxt_conf_map_object(mp, settings, nxt_router_conf,
1712
0
                                  nxt_nitems(nxt_router_conf), rtcf);
1713
0
        if (ret != NXT_OK) {
1714
0
            nxt_alert(task, "router_conf map error");
1715
0
            return NXT_ERROR;
1716
0
        }
1717
0
    }
1718
1719
0
    if (rtcf->threads == 0) {
1720
0
        rtcf->threads = nxt_ncpu;
1721
0
    }
1722
1723
0
    conf = nxt_conf_get_path(root, &static_path);
1724
1725
0
    ret = nxt_router_conf_process_static(task, rtcf, conf);
1726
0
    if (nxt_slow_path(ret != NXT_OK)) {
1727
0
        return NXT_ERROR;
1728
0
    }
1729
1730
0
    router = rtcf->router;
1731
1732
0
    applications = nxt_conf_get_path(root, &applications_path);
1733
1734
0
    if (applications != NULL) {
1735
0
        next = 0;
1736
1737
0
        for ( ;; ) {
1738
0
            application = nxt_conf_next_object_member(applications,
1739
0
                                                      &name, &next);
1740
0
            if (application == NULL) {
1741
0
                break;
1742
0
            }
1743
1744
0
            nxt_debug(task, "application \"%V\"", &name);
1745
1746
0
            size = nxt_conf_json_length(application, NULL);
1747
1748
0
            app_mp = nxt_mp_create(4096, 128, 1024, 64);
1749
0
            if (nxt_slow_path(app_mp == NULL)) {
1750
0
                goto fail;
1751
0
            }
1752
1753
0
            app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1754
0
            if (app == NULL) {
1755
0
                goto app_fail;
1756
0
            }
1757
1758
0
            nxt_memzero(app, sizeof(nxt_app_t));
1759
1760
0
            app->mem_pool = app_mp;
1761
1762
0
            app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1763
0
            app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1764
0
                                                  + name.length);
1765
1766
0
            p = nxt_conf_json_print(app->conf.start, application, NULL);
1767
0
            app->conf.length = p - app->conf.start;
1768
1769
0
            nxt_assert(app->conf.length <= size);
1770
1771
0
            nxt_debug(task, "application conf \"%V\"", &app->conf);
1772
1773
0
            prev = nxt_router_app_find(&router->apps, &name);
1774
1775
0
            if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1776
0
                nxt_mp_destroy(app_mp);
1777
1778
0
                nxt_queue_remove(&prev->link);
1779
0
                nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1780
1781
0
                ret = nxt_router_apps_hash_add(rtcf, prev);
1782
0
                if (nxt_slow_path(ret != NXT_OK)) {
1783
0
                    goto fail;
1784
0
                }
1785
1786
0
                continue;
1787
0
            }
1788
1789
0
            apcf.processes = 1;
1790
0
            apcf.max_processes = 1;
1791
0
            apcf.spare_processes = 0;
1792
0
            apcf.timeout = 0;
1793
0
            apcf.idle_timeout = 15000;
1794
0
            apcf.limits_value = NULL;
1795
0
            apcf.processes_value = NULL;
1796
0
            apcf.targets_value = NULL;
1797
1798
0
            app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1799
0
            if (nxt_slow_path(app_joint == NULL)) {
1800
0
                goto app_fail;
1801
0
            }
1802
1803
0
            nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1804
1805
0
            ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1806
0
                                      nxt_nitems(nxt_router_app_conf), &apcf);
1807
0
            if (ret != NXT_OK) {
1808
0
                nxt_alert(task, "application map error");
1809
0
                goto app_fail;
1810
0
            }
1811
1812
0
            if (apcf.limits_value != NULL) {
1813
1814
0
                if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1815
0
                    nxt_alert(task, "application limits is not object");
1816
0
                    goto app_fail;
1817
0
                }
1818
1819
0
                ret = nxt_conf_map_object(mp, apcf.limits_value,
1820
0
                                        nxt_router_app_limits_conf,
1821
0
                                        nxt_nitems(nxt_router_app_limits_conf),
1822
0
                                        &apcf);
1823
0
                if (ret != NXT_OK) {
1824
0
                    nxt_alert(task, "application limits map error");
1825
0
                    goto app_fail;
1826
0
                }
1827
0
            }
1828
1829
0
            if (apcf.processes_value != NULL
1830
0
                && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1831
0
            {
1832
0
                ret = nxt_conf_map_object(mp, apcf.processes_value,
1833
0
                                     nxt_router_app_processes_conf,
1834
0
                                     nxt_nitems(nxt_router_app_processes_conf),
1835
0
                                     &apcf);
1836
0
                if (ret != NXT_OK) {
1837
0
                    nxt_alert(task, "application processes map error");
1838
0
                    goto app_fail;
1839
0
                }
1840
1841
0
            } else {
1842
0
                apcf.max_processes = apcf.processes;
1843
0
                apcf.spare_processes = apcf.processes;
1844
0
            }
1845
1846
0
            if (apcf.targets_value != NULL) {
1847
0
                n = nxt_conf_object_members_count(apcf.targets_value);
1848
1849
0
                targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1850
0
                if (nxt_slow_path(targets == NULL)) {
1851
0
                    goto app_fail;
1852
0
                }
1853
1854
0
                next_target = 0;
1855
1856
0
                for (i = 0; i < n; i++) {
1857
0
                    (void) nxt_conf_next_object_member(apcf.targets_value,
1858
0
                                                       &target, &next_target);
1859
1860
0
                    s = nxt_str_dup(app_mp, &targets[i], &target);
1861
0
                    if (nxt_slow_path(s == NULL)) {
1862
0
                        goto app_fail;
1863
0
                    }
1864
0
                }
1865
1866
0
            } else {
1867
0
                targets = NULL;
1868
0
            }
1869
1870
0
            nxt_debug(task, "application type: %V", &apcf.type);
1871
0
            nxt_debug(task, "application processes: %D", apcf.processes);
1872
0
            nxt_debug(task, "application request timeout: %M", apcf.timeout);
1873
1874
0
            lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1875
1876
0
            if (lang == NULL) {
1877
0
                nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1878
0
                goto app_fail;
1879
0
            }
1880
1881
0
            nxt_debug(task, "application language module: \"%s\"", lang->file);
1882
1883
0
            ret = nxt_thread_mutex_create(&app->mutex);
1884
0
            if (ret != NXT_OK) {
1885
0
                goto app_fail;
1886
0
            }
1887
1888
0
            nxt_queue_init(&app->ports);
1889
0
            nxt_queue_init(&app->spare_ports);
1890
0
            nxt_queue_init(&app->idle_ports);
1891
0
            nxt_queue_init(&app->ack_waiting_req);
1892
1893
0
            app->name.length = name.length;
1894
0
            nxt_memcpy(app->name.start, name.start, name.length);
1895
1896
0
            app->type = lang->type;
1897
0
            app->max_processes = apcf.max_processes;
1898
0
            app->spare_processes = apcf.spare_processes;
1899
0
            app->max_pending_processes = apcf.spare_processes
1900
0
                                         ? apcf.spare_processes : 1;
1901
0
            app->timeout = apcf.timeout;
1902
0
            app->idle_timeout = apcf.idle_timeout;
1903
1904
0
            app->targets = targets;
1905
1906
0
            engine = task->thread->engine;
1907
1908
0
            app->engine = engine;
1909
1910
0
            app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1911
0
            app->adjust_idle_work.task = &engine->task;
1912
0
            app->adjust_idle_work.obj = app;
1913
1914
0
            nxt_queue_insert_tail(&tmcf->apps, &app->link);
1915
1916
0
            ret = nxt_router_apps_hash_add(rtcf, app);
1917
0
            if (nxt_slow_path(ret != NXT_OK)) {
1918
0
                goto app_fail;
1919
0
            }
1920
1921
0
            nxt_router_app_use(task, app, 1);
1922
1923
0
            app->joint = app_joint;
1924
1925
0
            app_joint->use_count = 1;
1926
0
            app_joint->app = app;
1927
1928
0
            app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1929
0
            app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1930
0
            app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1931
0
            app_joint->idle_timer.task = &engine->task;
1932
0
            app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1933
1934
0
            app_joint->free_app_work.handler = nxt_router_free_app;
1935
0
            app_joint->free_app_work.task = &engine->task;
1936
0
            app_joint->free_app_work.obj = app_joint;
1937
1938
0
            port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1939
0
                                NXT_PROCESS_APP);
1940
0
            if (nxt_slow_path(port == NULL)) {
1941
0
                return NXT_ERROR;
1942
0
            }
1943
1944
0
            ret = nxt_port_socket_init(task, port, 0);
1945
0
            if (nxt_slow_path(ret != NXT_OK)) {
1946
0
                nxt_port_use(task, port, -1);
1947
0
                return NXT_ERROR;
1948
0
            }
1949
1950
0
            ret = nxt_router_app_queue_init(task, port);
1951
0
            if (nxt_slow_path(ret != NXT_OK)) {
1952
0
                nxt_port_write_close(port);
1953
0
                nxt_port_read_close(port);
1954
0
                nxt_port_use(task, port, -1);
1955
0
                return NXT_ERROR;
1956
0
            }
1957
1958
0
            nxt_port_write_enable(task, port);
1959
0
            port->app = app;
1960
1961
0
            app->shared_port = port;
1962
1963
0
            nxt_thread_mutex_create(&app->outgoing.mutex);
1964
0
        }
1965
0
    }
1966
1967
0
    conf = nxt_conf_get_path(root, &routes_path);
1968
0
    if (nxt_fast_path(conf != NULL)) {
1969
0
        routes = nxt_http_routes_create(task, tmcf, conf);
1970
0
        if (nxt_slow_path(routes == NULL)) {
1971
0
            return NXT_ERROR;
1972
0
        }
1973
1974
0
        rtcf->routes = routes;
1975
0
    }
1976
1977
0
    ret = nxt_upstreams_create(task, tmcf, root);
1978
0
    if (nxt_slow_path(ret != NXT_OK)) {
1979
0
        return ret;
1980
0
    }
1981
1982
0
    http = nxt_conf_get_path(root, &http_path);
1983
#if 0
1984
    if (http == NULL) {
1985
        nxt_alert(task, "no \"http\" block");
1986
        return NXT_ERROR;
1987
    }
1988
#endif
1989
1990
0
    websocket = nxt_conf_get_path(root, &websocket_path);
1991
1992
0
    listeners = nxt_conf_get_path(root, &listeners_path);
1993
1994
0
    if (listeners != NULL) {
1995
0
        next = 0;
1996
1997
0
        for ( ;; ) {
1998
0
            listener = nxt_conf_next_object_member(listeners, &name, &next);
1999
0
            if (listener == NULL) {
2000
0
                break;
2001
0
            }
2002
2003
0
            nxt_memzero(&lscf, sizeof(lscf));
2004
2005
0
            lscf.backlog = -1;
2006
2007
0
            ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
2008
0
                                      nxt_nitems(nxt_router_listener_conf),
2009
0
                                      &lscf);
2010
0
            if (ret != NXT_OK) {
2011
0
                nxt_alert(task, "listener map error");
2012
0
                goto fail;
2013
0
            }
2014
2015
0
            nxt_debug(task, "application: %V", &lscf.application);
2016
2017
0
            skcf = nxt_router_socket_conf(task, tmcf, &name, lscf.backlog);
2018
0
            if (skcf == NULL) {
2019
0
                goto fail;
2020
0
            }
2021
2022
            // STUB, default values if http block is not defined.
2023
0
            skcf->header_buffer_size = 2048;
2024
0
            skcf->large_header_buffer_size = 8192;
2025
0
            skcf->large_header_buffers = 4;
2026
0
            skcf->discard_unsafe_fields = 1;
2027
0
            skcf->body_buffer_size = 16 * 1024;
2028
0
            skcf->max_body_size = 8 * 1024 * 1024;
2029
0
            skcf->proxy_header_buffer_size = 64 * 1024;
2030
0
            skcf->proxy_buffer_size = 4096;
2031
0
            skcf->proxy_buffers = 256;
2032
0
            skcf->idle_timeout = 180 * 1000;
2033
0
            skcf->header_read_timeout = 30 * 1000;
2034
0
            skcf->body_read_timeout = 30 * 1000;
2035
0
            skcf->send_timeout = 30 * 1000;
2036
0
            skcf->proxy_timeout = 60 * 1000;
2037
0
            skcf->proxy_send_timeout = 30 * 1000;
2038
0
            skcf->proxy_read_timeout = 30 * 1000;
2039
2040
0
            skcf->server_version = 1;
2041
0
            skcf->chunked_transform = 0;
2042
2043
0
            skcf->websocket_conf.max_frame_size = 1024 * 1024;
2044
0
            skcf->websocket_conf.read_timeout = 60 * 1000;
2045
0
            skcf->websocket_conf.keepalive_interval = 30 * 1000;
2046
2047
0
            nxt_str_null(&skcf->body_temp_path);
2048
2049
0
            if (http != NULL) {
2050
0
                nxt_conf_value_t  *comp;
2051
2052
0
                ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
2053
0
                                          nxt_nitems(nxt_router_http_conf),
2054
0
                                          skcf);
2055
0
                if (ret != NXT_OK) {
2056
0
                    nxt_alert(task, "http map error");
2057
0
                    goto fail;
2058
0
                }
2059
2060
0
                comp = nxt_conf_get_path(root, &compression_path);
2061
0
                if (comp != NULL) {
2062
0
                    nxt_http_comp_compression_init(task, rtcf, comp);
2063
0
                }
2064
0
            }
2065
2066
0
            if (websocket != NULL) {
2067
0
                ret = nxt_conf_map_object(mp, websocket,
2068
0
                                          nxt_router_websocket_conf,
2069
0
                                          nxt_nitems(nxt_router_websocket_conf),
2070
0
                                          &skcf->websocket_conf);
2071
0
                if (ret != NXT_OK) {
2072
0
                    nxt_alert(task, "websocket map error");
2073
0
                    goto fail;
2074
0
                }
2075
0
            }
2076
2077
0
            t = &skcf->body_temp_path;
2078
2079
0
            if (t->length == 0) {
2080
0
                t->start = (u_char *) task->thread->runtime->tmp;
2081
0
                t->length = nxt_strlen(t->start);
2082
0
            }
2083
2084
0
            conf = nxt_conf_get_path(listener, &forwarded_path);
2085
2086
0
            if (conf != NULL) {
2087
0
                skcf->forwarded = nxt_router_conf_forward(task, mp, conf);
2088
0
                if (nxt_slow_path(skcf->forwarded == NULL)) {
2089
0
                    return NXT_ERROR;
2090
0
                }
2091
0
            }
2092
2093
0
            conf = nxt_conf_get_path(listener, &client_ip_path);
2094
2095
0
            if (conf != NULL) {
2096
0
                skcf->client_ip = nxt_router_conf_forward(task, mp, conf);
2097
0
                if (nxt_slow_path(skcf->client_ip == NULL)) {
2098
0
                    return NXT_ERROR;
2099
0
                }
2100
0
            }
2101
2102
#if (NXT_TLS)
2103
            certificate = nxt_conf_get_path(listener, &certificate_path);
2104
2105
            if (certificate != NULL) {
2106
                tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
2107
                if (nxt_slow_path(tls_init == NULL)) {
2108
                    return NXT_ERROR;
2109
                }
2110
2111
                tls_init->cache_size = 0;
2112
                tls_init->timeout = 300;
2113
2114
                value = nxt_conf_get_path(listener, &conf_cache_path);
2115
                if (value != NULL) {
2116
                    tls_init->cache_size = nxt_conf_get_number(value);
2117
                }
2118
2119
                value = nxt_conf_get_path(listener, &conf_timeout_path);
2120
                if (value != NULL) {
2121
                    tls_init->timeout = nxt_conf_get_number(value);
2122
                }
2123
2124
                tls_init->conf_cmds = nxt_conf_get_path(listener,
2125
                                                        &conf_commands_path);
2126
2127
                tls_init->tickets_conf = nxt_conf_get_path(listener,
2128
                                                           &conf_tickets);
2129
2130
                n = nxt_conf_array_elements_count_or_1(certificate);
2131
2132
                for (i = 0; i < n; i++) {
2133
                    value = nxt_conf_get_array_element_or_itself(certificate,
2134
                                                                 i);
2135
                    nxt_assert(value != NULL);
2136
2137
                    ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
2138
                                                     tls_init, i == 0);
2139
                    if (nxt_slow_path(ret != NXT_OK)) {
2140
                        goto fail;
2141
                    }
2142
                }
2143
            }
2144
#endif
2145
2146
0
            skcf->listen->handler = nxt_http_conn_init;
2147
0
            skcf->router_conf = rtcf;
2148
0
            skcf->router_conf->count++;
2149
2150
0
            if (lscf.pass.length != 0) {
2151
0
                skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
2152
2153
            /* COMPATIBILITY: listener application. */
2154
0
            } else if (lscf.application.length > 0) {
2155
0
                skcf->action = nxt_http_pass_application(task, rtcf,
2156
0
                                                         &lscf.application);
2157
0
            }
2158
2159
0
            if (nxt_slow_path(skcf->action == NULL)) {
2160
0
                goto fail;
2161
0
            }
2162
0
        }
2163
0
    }
2164
2165
0
    ret = nxt_http_routes_resolve(task, tmcf);
2166
0
    if (nxt_slow_path(ret != NXT_OK)) {
2167
0
        goto fail;
2168
0
    }
2169
2170
0
    value = nxt_conf_get_path(root, &access_log_path);
2171
2172
0
    if (value != NULL) {
2173
0
        ret = nxt_router_access_log_create(task, rtcf, value);
2174
0
        if (nxt_slow_path(ret != NXT_OK)) {
2175
0
            goto fail;
2176
0
        }
2177
0
    }
2178
2179
#if (NXT_HAVE_NJS)
2180
    js_module = nxt_conf_get_path(root, &js_module_path);
2181
2182
    if (js_module != NULL) {
2183
        if (nxt_conf_type(js_module) == NXT_CONF_ARRAY) {
2184
            n = nxt_conf_array_elements_count(js_module);
2185
2186
            for (i = 0; i < n; i++) {
2187
                value = nxt_conf_get_array_element(js_module, i);
2188
2189
                ret = nxt_router_js_module_insert(tmcf, value);
2190
                if (nxt_slow_path(ret != NXT_OK)) {
2191
                    goto fail;
2192
                }
2193
            }
2194
2195
        } else {
2196
            /* NXT_CONF_STRING */
2197
2198
            ret = nxt_router_js_module_insert(tmcf, js_module);
2199
            if (nxt_slow_path(ret != NXT_OK)) {
2200
                goto fail;
2201
            }
2202
        }
2203
    }
2204
2205
#endif
2206
2207
#if (NXT_HAVE_OTEL)
2208
    otel = nxt_conf_get_path(root, &telemetry_path);
2209
2210
    if (otel) {
2211
        otel_endpoint = nxt_conf_get_path(root, &telemetry_endpoint_path);
2212
        otel_batching = nxt_conf_get_path(root, &telemetry_batch_path);
2213
        otel_sampling = nxt_conf_get_path(root, &telemetry_sample_path);
2214
        otel_proto    = nxt_conf_get_path(root, &telemetry_proto_path);
2215
2216
        nxt_conf_get_string(otel_endpoint, &telemetry_endpoint);
2217
        nxt_conf_get_string(otel_proto, &telemetry_proto);
2218
2219
        telemetry_batching = otel_batching
2220
            ? nxt_conf_get_number(otel_batching)
2221
            : NXT_OTEL_BATCH_DEFAULT;
2222
2223
        telemetry_sample_fraction = otel_sampling
2224
            ? nxt_conf_get_number(otel_sampling)
2225
            : NXT_OTEL_SAMPLING_DEFAULT;
2226
2227
        nxt_otel_rs_init(&nxt_otel_log_callback, &telemetry_endpoint,
2228
                         &telemetry_proto, telemetry_sample_fraction,
2229
                         telemetry_batching);
2230
    } else {
2231
        nxt_otel_rs_uninit();
2232
    }
2233
#endif
2234
2235
0
    nxt_queue_add(&deleting_sockets, &router->sockets);
2236
0
    nxt_queue_init(&router->sockets);
2237
2238
0
    return NXT_OK;
2239
2240
0
app_fail:
2241
2242
0
    nxt_mp_destroy(app_mp);
2243
2244
0
fail:
2245
2246
0
    nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2247
2248
0
        nxt_queue_remove(&app->link);
2249
0
        nxt_thread_mutex_destroy(&app->mutex);
2250
0
        nxt_mp_destroy(app->mem_pool);
2251
2252
0
    } nxt_queue_loop;
2253
2254
0
    return NXT_ERROR;
2255
0
}
2256
2257
2258
#if (NXT_TLS)
2259
2260
static nxt_int_t
2261
nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2262
    nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2263
    nxt_tls_init_t *tls_init, nxt_bool_t last)
2264
{
2265
    nxt_router_tlssock_t  *tls;
2266
2267
    tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2268
    if (nxt_slow_path(tls == NULL)) {
2269
        return NXT_ERROR;
2270
    }
2271
2272
    tls->tls_init = tls_init;
2273
    tls->socket_conf = skcf;
2274
    tls->temp_conf = tmcf;
2275
    tls->last = last;
2276
    nxt_conf_get_string(value, &tls->name);
2277
2278
    nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2279
2280
    return NXT_OK;
2281
}
2282
2283
#endif
2284
2285
2286
#if (NXT_HAVE_NJS)
2287
2288
static void
2289
nxt_router_js_module_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2290
    void *data)
2291
{
2292
    nxt_int_t               ret;
2293
    nxt_str_t               text;
2294
    nxt_router_conf_t       *rtcf;
2295
    nxt_router_temp_conf_t  *tmcf;
2296
    nxt_router_js_module_t  *js_module;
2297
2298
    nxt_debug(task, "auto module rpc handler");
2299
2300
    js_module = data;
2301
    tmcf = js_module->temp_conf;
2302
2303
    if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2304
        goto fail;
2305
    }
2306
2307
    rtcf = tmcf->router_conf;
2308
2309
    ret = nxt_script_file_read(msg->fd[0], &text);
2310
2311
    nxt_fd_close(msg->fd[0]);
2312
2313
    if (nxt_slow_path(ret == NXT_ERROR)) {
2314
        goto fail;
2315
    }
2316
2317
    if (text.length > 0) {
2318
        ret = nxt_js_add_module(rtcf->tstr_state->jcf, &js_module->name, &text);
2319
2320
        nxt_free(text.start);
2321
2322
        if (nxt_slow_path(ret == NXT_ERROR)) {
2323
            goto fail;
2324
        }
2325
    }
2326
2327
    nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2328
                       nxt_router_conf_apply, task, tmcf, NULL);
2329
    return;
2330
2331
fail:
2332
2333
    nxt_router_conf_error(task, tmcf);
2334
}
2335
2336
2337
static nxt_int_t
2338
nxt_router_js_module_insert(nxt_router_temp_conf_t *tmcf,
2339
    nxt_conf_value_t *value)
2340
{
2341
    nxt_router_js_module_t  *js_module;
2342
2343
    js_module = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_js_module_t));
2344
    if (nxt_slow_path(js_module == NULL)) {
2345
        return NXT_ERROR;
2346
    }
2347
2348
    js_module->temp_conf = tmcf;
2349
    nxt_conf_get_string(value, &js_module->name);
2350
2351
    nxt_queue_insert_tail(&tmcf->js_modules, &js_module->link);
2352
2353
    return NXT_OK;
2354
}
2355
2356
#endif
2357
2358
2359
static nxt_int_t
2360
nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2361
    nxt_conf_value_t *conf)
2362
0
{
2363
0
    uint32_t          next, i;
2364
0
    nxt_mp_t          *mp;
2365
0
    nxt_str_t         *type, exten, str, *s;
2366
0
    nxt_int_t         ret;
2367
0
    nxt_uint_t        exts;
2368
0
    nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2369
2370
0
    static const nxt_str_t  mtypes_path = nxt_string("/mime_types");
2371
2372
0
    mp = rtcf->mem_pool;
2373
2374
0
    ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2375
0
    if (nxt_slow_path(ret != NXT_OK)) {
2376
0
        return NXT_ERROR;
2377
0
    }
2378
2379
0
    if (conf == NULL) {
2380
0
        return NXT_OK;
2381
0
    }
2382
2383
0
    mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2384
2385
0
    if (mtypes_conf != NULL) {
2386
0
        next = 0;
2387
2388
0
        for ( ;; ) {
2389
0
            ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2390
2391
0
            if (ext_conf == NULL) {
2392
0
                break;
2393
0
            }
2394
2395
0
            type = nxt_str_dup(mp, NULL, &str);
2396
0
            if (nxt_slow_path(type == NULL)) {
2397
0
                return NXT_ERROR;
2398
0
            }
2399
2400
0
            if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2401
0
                s = nxt_conf_get_string_dup(ext_conf, mp, &exten);
2402
0
                if (nxt_slow_path(s == NULL)) {
2403
0
                    return NXT_ERROR;
2404
0
                }
2405
2406
0
                ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2407
0
                                                      &exten, type);
2408
0
                if (nxt_slow_path(ret != NXT_OK)) {
2409
0
                    return NXT_ERROR;
2410
0
                }
2411
2412
0
                continue;
2413
0
            }
2414
2415
0
            exts = nxt_conf_array_elements_count(ext_conf);
2416
2417
0
            for (i = 0; i < exts; i++) {
2418
0
                value = nxt_conf_get_array_element(ext_conf, i);
2419
2420
0
                s = nxt_conf_get_string_dup(value, mp, &exten);
2421
0
                if (nxt_slow_path(s == NULL)) {
2422
0
                    return NXT_ERROR;
2423
0
                }
2424
2425
0
                ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2426
0
                                                      &exten, type);
2427
0
                if (nxt_slow_path(ret != NXT_OK)) {
2428
0
                    return NXT_ERROR;
2429
0
                }
2430
0
            }
2431
0
        }
2432
0
    }
2433
2434
0
    return NXT_OK;
2435
0
}
2436
2437
2438
static nxt_http_forward_t *
2439
nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf)
2440
0
{
2441
0
    nxt_int_t                   ret;
2442
0
    nxt_conf_value_t            *header_conf, *client_ip_conf, *protocol_conf;
2443
0
    nxt_conf_value_t            *source_conf, *recursive_conf;
2444
0
    nxt_http_forward_t          *forward;
2445
0
    nxt_http_route_addr_rule_t  *source;
2446
2447
0
    static const nxt_str_t  header_path = nxt_string("/header");
2448
0
    static const nxt_str_t  client_ip_path = nxt_string("/client_ip");
2449
0
    static const nxt_str_t  protocol_path = nxt_string("/protocol");
2450
0
    static const nxt_str_t  source_path = nxt_string("/source");
2451
0
    static const nxt_str_t  recursive_path = nxt_string("/recursive");
2452
2453
0
    header_conf = nxt_conf_get_path(conf, &header_path);
2454
2455
0
    if (header_conf != NULL) {
2456
0
        client_ip_conf = nxt_conf_get_path(conf, &header_path);
2457
0
        protocol_conf = NULL;
2458
2459
0
    } else {
2460
0
        client_ip_conf = nxt_conf_get_path(conf, &client_ip_path);
2461
0
        protocol_conf = nxt_conf_get_path(conf, &protocol_path);
2462
0
    }
2463
2464
0
    source_conf = nxt_conf_get_path(conf, &source_path);
2465
0
    recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2466
2467
0
    if (source_conf == NULL
2468
0
        || (protocol_conf == NULL && client_ip_conf == NULL))
2469
0
    {
2470
0
        return NULL;
2471
0
    }
2472
2473
0
    forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t));
2474
0
    if (nxt_slow_path(forward == NULL)) {
2475
0
        return NULL;
2476
0
    }
2477
2478
0
    source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2479
0
    if (nxt_slow_path(source == NULL)) {
2480
0
        return NULL;
2481
0
    }
2482
2483
0
    forward->source = source;
2484
2485
0
    if (recursive_conf != NULL) {
2486
0
        forward->recursive = nxt_conf_get_boolean(recursive_conf);
2487
0
    }
2488
2489
0
    if (client_ip_conf != NULL) {
2490
0
        ret = nxt_router_conf_forward_header(mp, client_ip_conf,
2491
0
                                             &forward->client_ip);
2492
0
        if (nxt_slow_path(ret != NXT_OK)) {
2493
0
            return NULL;
2494
0
        }
2495
0
    }
2496
2497
0
    if (protocol_conf != NULL) {
2498
0
        ret = nxt_router_conf_forward_header(mp, protocol_conf,
2499
0
                                             &forward->protocol);
2500
0
        if (nxt_slow_path(ret != NXT_OK)) {
2501
0
            return NULL;
2502
0
        }
2503
0
    }
2504
2505
0
    return forward;
2506
0
}
2507
2508
2509
static nxt_int_t
2510
nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf,
2511
    nxt_http_forward_header_t *fh)
2512
0
{
2513
0
    char      c;
2514
0
    size_t    i;
2515
0
    uint32_t  hash;
2516
2517
0
    fh->header = nxt_conf_get_string_dup(conf, mp, NULL);
2518
0
    if (nxt_slow_path(fh->header == NULL)) {
2519
0
        return NXT_ERROR;
2520
0
    }
2521
2522
0
    hash = NXT_HTTP_FIELD_HASH_INIT;
2523
2524
0
    for (i = 0; i < fh->header->length; i++) {
2525
0
        c = fh->header->start[i];
2526
0
        hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2527
0
    }
2528
2529
0
    hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2530
2531
0
    fh->header_hash = hash;
2532
2533
0
    return NXT_OK;
2534
0
}
2535
2536
2537
static nxt_app_t *
2538
nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2539
0
{
2540
0
    nxt_app_t  *app;
2541
2542
0
    nxt_queue_each(app, queue, nxt_app_t, link) {
2543
2544
0
        if (nxt_strstr_eq(name, &app->name)) {
2545
0
            return app;
2546
0
        }
2547
2548
0
    } nxt_queue_loop;
2549
2550
0
    return NULL;
2551
0
}
2552
2553
2554
static nxt_int_t
2555
nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2556
0
{
2557
0
    void       *mem;
2558
0
    nxt_int_t  fd;
2559
2560
0
    fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2561
0
    if (nxt_slow_path(fd == -1)) {
2562
0
        return NXT_ERROR;
2563
0
    }
2564
2565
0
    mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2566
0
                       PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2567
0
    if (nxt_slow_path(mem == MAP_FAILED)) {
2568
0
        nxt_fd_close(fd);
2569
2570
0
        return NXT_ERROR;
2571
0
    }
2572
2573
0
    nxt_app_queue_init(mem);
2574
2575
0
    port->queue_fd = fd;
2576
0
    port->queue = mem;
2577
2578
0
    return NXT_OK;
2579
0
}
2580
2581
2582
static nxt_int_t
2583
nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2584
0
{
2585
0
    void       *mem;
2586
0
    nxt_int_t  fd;
2587
2588
0
    fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2589
0
    if (nxt_slow_path(fd == -1)) {
2590
0
        return NXT_ERROR;
2591
0
    }
2592
2593
0
    mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2594
0
                       PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2595
0
    if (nxt_slow_path(mem == MAP_FAILED)) {
2596
0
        nxt_fd_close(fd);
2597
2598
0
        return NXT_ERROR;
2599
0
    }
2600
2601
0
    nxt_port_queue_init(mem);
2602
2603
0
    port->queue_fd = fd;
2604
0
    port->queue = mem;
2605
2606
0
    return NXT_OK;
2607
0
}
2608
2609
2610
static nxt_int_t
2611
nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2612
0
{
2613
0
    void  *mem;
2614
2615
0
    nxt_assert(fd != -1);
2616
2617
0
    mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2618
0
                       PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2619
0
    if (nxt_slow_path(mem == MAP_FAILED)) {
2620
2621
0
        return NXT_ERROR;
2622
0
    }
2623
2624
0
    port->queue = mem;
2625
2626
0
    return NXT_OK;
2627
0
}
2628
2629
2630
/*
 * Level hash prototype for the per-configuration application hash
 * (rtcf->apps_hash): default hash geometry, name comparison through
 * nxt_router_apps_hash_test, and the memory-pool based allocator callbacks.
 */
static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2631
    NXT_LVLHSH_DEFAULT,
2632
    nxt_router_apps_hash_test,
2633
    nxt_mp_lvlhsh_alloc,
2634
    nxt_mp_lvlhsh_free,
2635
};
2636
2637
2638
static nxt_int_t
2639
nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2640
0
{
2641
0
    nxt_app_t  *app;
2642
2643
0
    app = data;
2644
2645
0
    return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2646
0
}
2647
2648
2649
static nxt_int_t
2650
nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2651
0
{
2652
0
    nxt_lvlhsh_query_t  lhq;
2653
2654
0
    lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2655
0
    lhq.replace = 0;
2656
0
    lhq.key = app->name;
2657
0
    lhq.value = app;
2658
0
    lhq.proto = &nxt_router_apps_hash_proto;
2659
0
    lhq.pool = rtcf->mem_pool;
2660
2661
0
    switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2662
2663
0
    case NXT_OK:
2664
0
        return NXT_OK;
2665
2666
0
    case NXT_DECLINED:
2667
0
        nxt_thread_log_alert("router app hash adding failed: "
2668
0
                             "\"%V\" is already in hash", &lhq.key);
2669
        /* Fall through. */
2670
0
    default:
2671
0
        return NXT_ERROR;
2672
0
    }
2673
0
}
2674
2675
2676
static nxt_app_t *
2677
nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2678
0
{
2679
0
    nxt_lvlhsh_query_t  lhq;
2680
2681
0
    lhq.key_hash = nxt_djb_hash(name->start, name->length);
2682
0
    lhq.key = *name;
2683
0
    lhq.proto = &nxt_router_apps_hash_proto;
2684
2685
0
    if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2686
0
        return NULL;
2687
0
    }
2688
2689
0
    return lhq.value;
2690
0
}
2691
2692
2693
static void
2694
nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2695
0
{
2696
0
    nxt_app_t          *app;
2697
0
    nxt_lvlhsh_each_t  lhe;
2698
2699
0
    nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2700
2701
0
    for ( ;; ) {
2702
0
        app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2703
2704
0
        if (app == NULL) {
2705
0
            break;
2706
0
        }
2707
2708
0
        nxt_router_app_use(task, app, i);
2709
0
    }
2710
0
}
2711
2712
2713
/*
 * Per-action application binding created by nxt_router_application_init()
 * and stored in action->u.conf.
 */
typedef struct {
2714
    /* Application the action passes requests to. */
    nxt_app_t  *app;
2715
    /* Index of the selected entry in app->targets; 0 when no target given. */
    nxt_int_t  target;
2716
} nxt_http_app_conf_t;
2717
2718
2719
nxt_int_t
2720
nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2721
    nxt_str_t *target, nxt_http_action_t *action)
2722
0
{
2723
0
    nxt_app_t            *app;
2724
0
    nxt_str_t            *targets;
2725
0
    nxt_uint_t           i;
2726
0
    nxt_http_app_conf_t  *conf;
2727
2728
0
    app = nxt_router_apps_hash_get(rtcf, name);
2729
0
    if (app == NULL) {
2730
0
        return NXT_DECLINED;
2731
0
    }
2732
2733
0
    conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2734
0
    if (nxt_slow_path(conf == NULL)) {
2735
0
        return NXT_ERROR;
2736
0
    }
2737
2738
0
    action->handler = nxt_http_application_handler;
2739
0
    action->u.conf = conf;
2740
2741
0
    conf->app = app;
2742
2743
0
    if (target != NULL && target->length != 0) {
2744
0
        targets = app->targets;
2745
2746
0
        for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2747
2748
0
        conf->target = i;
2749
2750
0
    } else {
2751
0
        conf->target = 0;
2752
0
    }
2753
2754
0
    return NXT_OK;
2755
0
}
2756
2757
2758
static nxt_socket_conf_t *
2759
nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2760
    nxt_str_t *name, int backlog)
2761
0
{
2762
0
    size_t               size;
2763
0
    nxt_int_t            ret;
2764
0
    nxt_bool_t           wildcard;
2765
0
    nxt_sockaddr_t       *sa;
2766
0
    nxt_socket_conf_t    *skcf;
2767
0
    nxt_listen_socket_t  *ls;
2768
2769
0
    sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2770
0
    if (nxt_slow_path(sa == NULL)) {
2771
0
        nxt_alert(task, "invalid listener \"%V\"", name);
2772
0
        return NULL;
2773
0
    }
2774
2775
0
    sa->type = SOCK_STREAM;
2776
2777
0
    nxt_debug(task, "router listener: \"%*s\"",
2778
0
              (size_t) sa->length, nxt_sockaddr_start(sa));
2779
2780
0
    skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2781
0
    if (nxt_slow_path(skcf == NULL)) {
2782
0
        return NULL;
2783
0
    }
2784
2785
0
    size = nxt_sockaddr_size(sa);
2786
2787
0
    ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2788
2789
0
    if (ret != NXT_OK) {
2790
2791
0
        ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2792
0
        if (nxt_slow_path(ls == NULL)) {
2793
0
            return NULL;
2794
0
        }
2795
2796
0
        skcf->listen = ls;
2797
2798
0
        ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2799
0
        nxt_memcpy(ls->sockaddr, sa, size);
2800
2801
0
        nxt_listen_socket_remote_size(ls);
2802
2803
0
        ls->socket = -1;
2804
0
        ls->backlog = backlog > -1 ? backlog : NXT_LISTEN_BACKLOG;
2805
0
        ls->flags = NXT_NONBLOCK;
2806
0
        ls->read_after_accept = 1;
2807
0
    }
2808
2809
0
    switch (sa->u.sockaddr.sa_family) {
2810
0
#if (NXT_HAVE_UNIX_DOMAIN)
2811
0
    case AF_UNIX:
2812
0
        wildcard = 0;
2813
0
        break;
2814
0
#endif
2815
0
#if (NXT_INET6)
2816
0
    case AF_INET6:
2817
0
        wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2818
0
        break;
2819
0
#endif
2820
0
    case AF_INET:
2821
0
    default:
2822
0
        wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2823
0
        break;
2824
0
    }
2825
2826
0
    if (!wildcard) {
2827
0
        skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2828
0
        if (nxt_slow_path(skcf->sockaddr == NULL)) {
2829
0
            return NULL;
2830
0
        }
2831
2832
0
        nxt_memcpy(skcf->sockaddr, sa, size);
2833
0
    }
2834
2835
0
    return skcf;
2836
0
}
2837
2838
2839
static nxt_int_t
2840
nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2841
    nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2842
0
{
2843
0
    nxt_router_t       *router;
2844
0
    nxt_queue_link_t   *qlk;
2845
0
    nxt_socket_conf_t  *skcf;
2846
2847
0
    router = tmcf->router_conf->router;
2848
2849
0
    for (qlk = nxt_queue_first(&router->sockets);
2850
0
         qlk != nxt_queue_tail(&router->sockets);
2851
0
         qlk = nxt_queue_next(qlk))
2852
0
    {
2853
0
        skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2854
2855
0
        if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2856
0
            nskcf->listen = skcf->listen;
2857
2858
0
            nxt_queue_remove(qlk);
2859
0
            nxt_queue_insert_tail(&keeping_sockets, qlk);
2860
2861
0
            nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2862
2863
0
            return NXT_OK;
2864
0
        }
2865
0
    }
2866
2867
0
    nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2868
2869
0
    return NXT_DECLINED;
2870
0
}
2871
2872
2873
static void
2874
nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2875
    nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2876
0
{
2877
0
    size_t            size;
2878
0
    uint32_t          stream;
2879
0
    nxt_int_t         ret;
2880
0
    nxt_buf_t         *b;
2881
0
    nxt_port_t        *main_port, *router_port;
2882
0
    nxt_runtime_t     *rt;
2883
0
    nxt_socket_rpc_t  *rpc;
2884
2885
0
    rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2886
0
    if (rpc == NULL) {
2887
0
        goto fail;
2888
0
    }
2889
2890
0
    rpc->socket_conf = skcf;
2891
0
    rpc->temp_conf = tmcf;
2892
2893
0
    size = nxt_sockaddr_size(skcf->listen->sockaddr);
2894
2895
0
    b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2896
0
    if (b == NULL) {
2897
0
        goto fail;
2898
0
    }
2899
2900
0
    b->completion_handler = nxt_buf_dummy_completion;
2901
2902
0
    b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2903
2904
0
    rt = task->thread->runtime;
2905
0
    main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2906
0
    router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2907
2908
0
    stream = nxt_port_rpc_register_handler(task, router_port,
2909
0
                                           nxt_router_listen_socket_ready,
2910
0
                                           nxt_router_listen_socket_error,
2911
0
                                           main_port->pid, rpc);
2912
0
    if (nxt_slow_path(stream == 0)) {
2913
0
        goto fail;
2914
0
    }
2915
2916
0
    ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2917
0
                                stream, router_port->id, b);
2918
2919
0
    if (nxt_slow_path(ret != NXT_OK)) {
2920
0
        nxt_port_rpc_cancel(task, router_port, stream);
2921
0
        goto fail;
2922
0
    }
2923
2924
0
    return;
2925
2926
0
fail:
2927
2928
0
    nxt_router_conf_error(task, tmcf);
2929
0
}
2930
2931
2932
static void
2933
nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2934
    void *data)
2935
0
{
2936
0
    nxt_int_t         ret;
2937
0
    nxt_socket_t      s;
2938
0
    nxt_socket_rpc_t  *rpc;
2939
2940
0
    rpc = data;
2941
2942
0
    s = msg->fd[0];
2943
2944
0
    ret = nxt_socket_nonblocking(task, s);
2945
0
    if (nxt_slow_path(ret != NXT_OK)) {
2946
0
        goto fail;
2947
0
    }
2948
2949
0
    nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2950
2951
0
    ret = nxt_listen_socket(task, s, rpc->socket_conf->listen->backlog);
2952
0
    if (nxt_slow_path(ret != NXT_OK)) {
2953
0
        goto fail;
2954
0
    }
2955
2956
0
    rpc->socket_conf->listen->socket = s;
2957
2958
0
    nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2959
0
                       nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2960
2961
0
    return;
2962
2963
0
fail:
2964
2965
0
    nxt_socket_close(task, s);
2966
2967
0
    nxt_router_conf_error(task, rpc->temp_conf);
2968
0
}
2969
2970
2971
/*
 * RPC error handler for a listen socket request; "data" is the
 * nxt_socket_rpc_t registered with the request.
 *
 * The only active behavior is reporting a configuration error for the
 * temporary configuration the request belongs to; "msg" is not used by the
 * compiled code.
 */
static void
2972
nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2973
    void *data)
2974
0
{
2975
0
    nxt_socket_rpc_t        *rpc;
2976
0
    nxt_router_temp_conf_t  *tmcf;
2977
2978
0
    rpc = data;
2979
0
    tmcf = rpc->temp_conf;
2980
2981
/*
 * Disabled code: it would decode an error code byte and message text from
 * the reply and format them, with the listener address, into a debug log
 * line.
 */
#if 0
2982
    u_char                  *p;
2983
    size_t                  size;
2984
    uint8_t                 error;
2985
    nxt_buf_t               *in, *out;
2986
    nxt_sockaddr_t          *sa;
2987
2988
    static nxt_str_t  socket_errors[] = {
2989
        nxt_string("ListenerSystem"),
2990
        nxt_string("ListenerNoIPv6"),
2991
        nxt_string("ListenerPort"),
2992
        nxt_string("ListenerInUse"),
2993
        nxt_string("ListenerNoAddress"),
2994
        nxt_string("ListenerNoAccess"),
2995
        nxt_string("ListenerPath"),
2996
    };
2997
2998
    sa = rpc->socket_conf->listen->sockaddr;
2999
3000
    in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
3001
3002
    if (nxt_slow_path(in == NULL)) {
3003
        return;
3004
    }
3005
3006
    p = in->mem.pos;
3007
3008
    error = *p++;
3009
3010
    size = nxt_length("listen socket error: ")
3011
           + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
3012
           + sa->length + socket_errors[error].length + (in->mem.free - p);
3013
3014
    out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
3015
    if (nxt_slow_path(out == NULL)) {
3016
        return;
3017
    }
3018
3019
    out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
3020
                        "listen socket error: "
3021
                        "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
3022
                        (size_t) sa->length, nxt_sockaddr_start(sa),
3023
                        &socket_errors[error], in->mem.free - p, p);
3024
3025
    nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
3026
#endif
3027
3028
0
    nxt_router_conf_error(task, tmcf);
3029
0
}
3030
3031
3032
#if (NXT_TLS)
3033
3034
static void
3035
nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3036
    void *data)
3037
{
3038
    nxt_mp_t                *mp;
3039
    nxt_int_t               ret;
3040
    nxt_tls_conf_t          *tlscf;
3041
    nxt_router_tlssock_t    *tls;
3042
    nxt_tls_bundle_conf_t   *bundle;
3043
    nxt_router_temp_conf_t  *tmcf;
3044
3045
    nxt_debug(task, "tls rpc handler");
3046
3047
    tls = data;
3048
    tmcf = tls->temp_conf;
3049
3050
    if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
3051
        goto fail;
3052
    }
3053
3054
    mp = tmcf->router_conf->mem_pool;
3055
3056
    if (tls->socket_conf->tls == NULL) {
3057
        tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
3058
        if (nxt_slow_path(tlscf == NULL)) {
3059
            goto fail;
3060
        }
3061
3062
        tlscf->no_wait_shutdown = 1;
3063
        tls->socket_conf->tls = tlscf;
3064
3065
    } else {
3066
        tlscf = tls->socket_conf->tls;
3067
    }
3068
3069
    tls->tls_init->conf = tlscf;
3070
3071
    bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
3072
    if (nxt_slow_path(bundle == NULL)) {
3073
        goto fail;
3074
    }
3075
3076
    if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
3077
        goto fail;
3078
    }
3079
3080
    bundle->chain_file = msg->fd[0];
3081
    bundle->next = tlscf->bundle;
3082
    tlscf->bundle = bundle;
3083
3084
    ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
3085
                                                  tls->last);
3086
    if (nxt_slow_path(ret != NXT_OK)) {
3087
        goto fail;
3088
    }
3089
3090
    nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3091
                       nxt_router_conf_apply, task, tmcf, NULL);
3092
    return;
3093
3094
fail:
3095
3096
    nxt_router_conf_error(task, tmcf);
3097
}
3098
3099
#endif
3100
3101
3102
static void
3103
nxt_router_app_rpc_create(nxt_task_t *task,
3104
    nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
3105
0
{
3106
0
    size_t         size;
3107
0
    uint32_t       stream;
3108
0
    nxt_fd_t       port_fd, queue_fd;
3109
0
    nxt_int_t      ret;
3110
0
    nxt_buf_t      *b;
3111
0
    nxt_port_t     *router_port, *dport;
3112
0
    nxt_runtime_t  *rt;
3113
0
    nxt_app_rpc_t  *rpc;
3114
3115
0
    rt = task->thread->runtime;
3116
3117
0
    dport = app->proto_port;
3118
3119
0
    if (dport == NULL) {
3120
0
        nxt_debug(task, "app '%V' prototype prefork", &app->name);
3121
3122
0
        size = app->name.length + 1 + app->conf.length;
3123
3124
0
        b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
3125
0
        if (nxt_slow_path(b == NULL)) {
3126
0
            goto fail;
3127
0
        }
3128
3129
0
        b->completion_handler = nxt_buf_dummy_completion;
3130
3131
0
        nxt_buf_cpystr(b, &app->name);
3132
0
        *b->mem.free++ = '\0';
3133
0
        nxt_buf_cpystr(b, &app->conf);
3134
3135
0
        dport = rt->port_by_type[NXT_PROCESS_MAIN];
3136
3137
0
        port_fd = app->shared_port->pair[0];
3138
0
        queue_fd = app->shared_port->queue_fd;
3139
3140
0
    } else {
3141
0
        nxt_debug(task, "app '%V' prefork", &app->name);
3142
3143
0
        b = NULL;
3144
0
        port_fd = -1;
3145
0
        queue_fd = -1;
3146
0
    }
3147
3148
0
    router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3149
3150
0
    rpc = nxt_port_rpc_register_handler_ex(task, router_port,
3151
0
                                           nxt_router_app_prefork_ready,
3152
0
                                           nxt_router_app_prefork_error,
3153
0
                                           sizeof(nxt_app_rpc_t));
3154
0
    if (nxt_slow_path(rpc == NULL)) {
3155
0
        goto fail;
3156
0
    }
3157
3158
0
    rpc->app = app;
3159
0
    rpc->temp_conf = tmcf;
3160
0
    rpc->proto = (b != NULL);
3161
3162
0
    stream = nxt_port_rpc_ex_stream(rpc);
3163
3164
0
    ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
3165
0
                                 port_fd, queue_fd, stream, router_port->id, b);
3166
0
    if (nxt_slow_path(ret != NXT_OK)) {
3167
0
        nxt_port_rpc_cancel(task, router_port, stream);
3168
0
        goto fail;
3169
0
    }
3170
3171
0
    if (b == NULL) {
3172
0
        nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
3173
3174
0
        app->pending_processes++;
3175
0
    }
3176
3177
0
    return;
3178
3179
0
fail:
3180
3181
0
    nxt_router_conf_error(task, tmcf);
3182
0
}
3183
3184
3185
static void
3186
nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3187
    void *data)
3188
0
{
3189
0
    nxt_app_t           *app;
3190
0
    nxt_port_t          *port;
3191
0
    nxt_app_rpc_t       *rpc;
3192
0
    nxt_event_engine_t  *engine;
3193
3194
0
    rpc = data;
3195
0
    app = rpc->app;
3196
3197
0
    port = msg->u.new_port;
3198
3199
0
    nxt_assert(port != NULL);
3200
0
    nxt_assert(port->id == 0);
3201
3202
0
    if (rpc->proto) {
3203
0
        nxt_assert(app->proto_port == NULL);
3204
0
        nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
3205
3206
0
        nxt_port_inc_use(port);
3207
3208
0
        app->proto_port = port;
3209
0
        port->app = app;
3210
3211
0
        nxt_router_app_rpc_create(task, rpc->temp_conf, app);
3212
3213
0
        return;
3214
0
    }
3215
3216
0
    nxt_assert(port->type == NXT_PROCESS_APP);
3217
3218
0
    port->app = app;
3219
0
    port->main_app_port = port;
3220
3221
0
    app->pending_processes--;
3222
0
    app->processes++;
3223
0
    app->idle_processes++;
3224
3225
0
    engine = task->thread->engine;
3226
3227
0
    nxt_queue_insert_tail(&app->ports, &port->app_link);
3228
0
    nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
3229
3230
0
    nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
3231
0
              &app->name, port->pid, port->id);
3232
3233
0
    nxt_port_hash_add(&app->port_hash, port);
3234
0
    app->port_hash_count++;
3235
3236
0
    port->idle_start = 0;
3237
3238
0
    nxt_port_inc_use(port);
3239
3240
0
    nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
3241
3242
0
    nxt_work_queue_add(&engine->fast_work_queue,
3243
0
                       nxt_router_conf_apply, task, rpc->temp_conf, NULL);
3244
0
}
3245
3246
3247
static void
3248
nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3249
    void *data)
3250
0
{
3251
0
    nxt_app_t               *app;
3252
0
    nxt_app_rpc_t           *rpc;
3253
0
    nxt_router_temp_conf_t  *tmcf;
3254
3255
0
    rpc = data;
3256
0
    app = rpc->app;
3257
0
    tmcf = rpc->temp_conf;
3258
3259
0
    if (rpc->proto) {
3260
0
        nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
3261
0
                &app->name);
3262
3263
0
    } else {
3264
0
        nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
3265
0
                &app->name);
3266
3267
0
        app->pending_processes--;
3268
0
    }
3269
3270
0
    nxt_router_conf_error(task, tmcf);
3271
0
}
3272
3273
3274
static nxt_int_t
3275
nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
3276
    nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
3277
0
{
3278
0
    nxt_int_t                 ret;
3279
0
    nxt_uint_t                n, threads;
3280
0
    nxt_queue_link_t          *qlk;
3281
0
    nxt_router_engine_conf_t  *recf;
3282
3283
0
    threads = tmcf->router_conf->threads;
3284
3285
0
    tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
3286
0
                                     sizeof(nxt_router_engine_conf_t));
3287
0
    if (nxt_slow_path(tmcf->engines == NULL)) {
3288
0
        return NXT_ERROR;
3289
0
    }
3290
3291
0
    n = 0;
3292
3293
0
    for (qlk = nxt_queue_first(&router->engines);
3294
0
         qlk != nxt_queue_tail(&router->engines);
3295
0
         qlk = nxt_queue_next(qlk))
3296
0
    {
3297
0
        recf = nxt_array_zero_add(tmcf->engines);
3298
0
        if (nxt_slow_path(recf == NULL)) {
3299
0
            return NXT_ERROR;
3300
0
        }
3301
3302
0
        recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
3303
3304
0
        if (n < threads) {
3305
0
            recf->action = NXT_ROUTER_ENGINE_KEEP;
3306
0
            ret = nxt_router_engine_conf_update(tmcf, recf);
3307
3308
0
        } else {
3309
0
            recf->action = NXT_ROUTER_ENGINE_DELETE;
3310
0
            ret = nxt_router_engine_conf_delete(tmcf, recf);
3311
0
        }
3312
3313
0
        if (nxt_slow_path(ret != NXT_OK)) {
3314
0
            return ret;
3315
0
        }
3316
3317
0
        n++;
3318
0
    }
3319
3320
0
    tmcf->new_threads = n;
3321
3322
0
    while (n < threads) {
3323
0
        recf = nxt_array_zero_add(tmcf->engines);
3324
0
        if (nxt_slow_path(recf == NULL)) {
3325
0
            return NXT_ERROR;
3326
0
        }
3327
3328
0
        recf->action = NXT_ROUTER_ENGINE_ADD;
3329
3330
0
        recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3331
0
        if (nxt_slow_path(recf->engine == NULL)) {
3332
0
            return NXT_ERROR;
3333
0
        }
3334
3335
0
        ret = nxt_router_engine_conf_create(tmcf, recf);
3336
0
        if (nxt_slow_path(ret != NXT_OK)) {
3337
0
            return ret;
3338
0
        }
3339
3340
0
        n++;
3341
0
    }
3342
3343
0
    return NXT_OK;
3344
0
}
3345
3346
3347
static nxt_int_t
3348
nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3349
    nxt_router_engine_conf_t *recf)
3350
0
{
3351
0
    nxt_int_t  ret;
3352
3353
0
    ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3354
0
                                          nxt_router_listen_socket_create);
3355
0
    if (nxt_slow_path(ret != NXT_OK)) {
3356
0
        return ret;
3357
0
    }
3358
3359
0
    ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3360
0
                                          nxt_router_listen_socket_create);
3361
0
    if (nxt_slow_path(ret != NXT_OK)) {
3362
0
        return ret;
3363
0
    }
3364
3365
0
    return ret;
3366
0
}
3367
3368
3369
static nxt_int_t
3370
nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3371
    nxt_router_engine_conf_t *recf)
3372
0
{
3373
0
    nxt_int_t  ret;
3374
3375
0
    ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3376
0
                                          nxt_router_listen_socket_create);
3377
0
    if (nxt_slow_path(ret != NXT_OK)) {
3378
0
        return ret;
3379
0
    }
3380
3381
0
    ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3382
0
                                          nxt_router_listen_socket_update);
3383
0
    if (nxt_slow_path(ret != NXT_OK)) {
3384
0
        return ret;
3385
0
    }
3386
3387
0
    ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3388
0
    if (nxt_slow_path(ret != NXT_OK)) {
3389
0
        return ret;
3390
0
    }
3391
3392
0
    return ret;
3393
0
}
3394
3395
3396
static nxt_int_t
3397
nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3398
    nxt_router_engine_conf_t *recf)
3399
0
{
3400
0
    nxt_int_t  ret;
3401
3402
0
    ret = nxt_router_engine_quit(tmcf, recf);
3403
0
    if (nxt_slow_path(ret != NXT_OK)) {
3404
0
        return ret;
3405
0
    }
3406
3407
0
    ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3408
0
    if (nxt_slow_path(ret != NXT_OK)) {
3409
0
        return ret;
3410
0
    }
3411
3412
0
    return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3413
0
}
3414
3415
3416
static nxt_int_t
3417
nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3418
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3419
    nxt_work_handler_t handler)
3420
0
{
3421
0
    nxt_int_t                ret;
3422
0
    nxt_joint_job_t          *job;
3423
0
    nxt_queue_link_t         *qlk;
3424
0
    nxt_socket_conf_t        *skcf;
3425
0
    nxt_socket_conf_joint_t  *joint;
3426
3427
0
    for (qlk = nxt_queue_first(sockets);
3428
0
         qlk != nxt_queue_tail(sockets);
3429
0
         qlk = nxt_queue_next(qlk))
3430
0
    {
3431
0
        job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3432
0
        if (nxt_slow_path(job == NULL)) {
3433
0
            return NXT_ERROR;
3434
0
        }
3435
3436
0
        job->work.next = recf->jobs;
3437
0
        recf->jobs = &job->work;
3438
3439
0
        job->task = tmcf->engine->task;
3440
0
        job->work.handler = handler;
3441
0
        job->work.task = &job->task;
3442
0
        job->work.obj = job;
3443
0
        job->tmcf = tmcf;
3444
3445
0
        tmcf->count++;
3446
3447
0
        joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3448
0
                             sizeof(nxt_socket_conf_joint_t));
3449
0
        if (nxt_slow_path(joint == NULL)) {
3450
0
            return NXT_ERROR;
3451
0
        }
3452
3453
0
        job->work.data = joint;
3454
3455
0
        ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3456
0
        if (nxt_slow_path(ret != NXT_OK)) {
3457
0
            return ret;
3458
0
        }
3459
3460
0
        joint->count = 1;
3461
3462
0
        skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3463
0
        skcf->count++;
3464
0
        joint->socket_conf = skcf;
3465
3466
0
        joint->engine = recf->engine;
3467
0
    }
3468
3469
0
    return NXT_OK;
3470
0
}
3471
3472
3473
static nxt_int_t
3474
nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3475
    nxt_router_engine_conf_t *recf)
3476
0
{
3477
0
    nxt_joint_job_t  *job;
3478
3479
0
    job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3480
0
    if (nxt_slow_path(job == NULL)) {
3481
0
        return NXT_ERROR;
3482
0
    }
3483
3484
0
    job->work.next = recf->jobs;
3485
0
    recf->jobs = &job->work;
3486
3487
0
    job->task = tmcf->engine->task;
3488
0
    job->work.handler = nxt_router_worker_thread_quit;
3489
0
    job->work.task = &job->task;
3490
0
    job->work.obj = NULL;
3491
0
    job->work.data = NULL;
3492
0
    job->tmcf = NULL;
3493
3494
0
    return NXT_OK;
3495
0
}
3496
3497
3498
static nxt_int_t
3499
nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3500
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3501
0
{
3502
0
    nxt_joint_job_t   *job;
3503
0
    nxt_queue_link_t  *qlk;
3504
3505
0
    for (qlk = nxt_queue_first(sockets);
3506
0
         qlk != nxt_queue_tail(sockets);
3507
0
         qlk = nxt_queue_next(qlk))
3508
0
    {
3509
0
        job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3510
0
        if (nxt_slow_path(job == NULL)) {
3511
0
            return NXT_ERROR;
3512
0
        }
3513
3514
0
        job->work.next = recf->jobs;
3515
0
        recf->jobs = &job->work;
3516
3517
0
        job->task = tmcf->engine->task;
3518
0
        job->work.handler = nxt_router_listen_socket_delete;
3519
0
        job->work.task = &job->task;
3520
0
        job->work.obj = job;
3521
0
        job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3522
0
        job->tmcf = tmcf;
3523
3524
0
        tmcf->count++;
3525
0
    }
3526
3527
0
    return NXT_OK;
3528
0
}
3529
3530
3531
static nxt_int_t
3532
nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3533
    nxt_router_temp_conf_t *tmcf)
3534
0
{
3535
0
    nxt_int_t                 ret;
3536
0
    nxt_uint_t                i, threads;
3537
0
    nxt_router_engine_conf_t  *recf;
3538
3539
0
    recf = tmcf->engines->elts;
3540
0
    threads = tmcf->router_conf->threads;
3541
3542
0
    for (i = tmcf->new_threads; i < threads; i++) {
3543
0
        ret = nxt_router_thread_create(task, rt, recf[i].engine);
3544
0
        if (nxt_slow_path(ret != NXT_OK)) {
3545
0
            return ret;
3546
0
        }
3547
0
    }
3548
3549
0
    return NXT_OK;
3550
0
}
3551
3552
3553
static nxt_int_t
3554
nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3555
    nxt_event_engine_t *engine)
3556
0
{
3557
0
    nxt_int_t            ret;
3558
0
    nxt_thread_link_t    *link;
3559
0
    nxt_thread_handle_t  handle;
3560
3561
0
    link = nxt_zalloc(sizeof(nxt_thread_link_t));
3562
3563
0
    if (nxt_slow_path(link == NULL)) {
3564
0
        return NXT_ERROR;
3565
0
    }
3566
3567
0
    link->start = nxt_router_thread_start;
3568
0
    link->engine = engine;
3569
0
    link->work.handler = nxt_router_thread_exit_handler;
3570
0
    link->work.task = task;
3571
0
    link->work.data = link;
3572
3573
0
    nxt_queue_insert_tail(&rt->engines, &engine->link);
3574
3575
0
    ret = nxt_thread_create(&handle, link);
3576
3577
0
    if (nxt_slow_path(ret != NXT_OK)) {
3578
0
        nxt_queue_remove(&engine->link);
3579
0
    }
3580
3581
0
    return ret;
3582
0
}
3583
3584
3585
static void
3586
nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
3587
    nxt_router_temp_conf_t *tmcf)
3588
0
{
3589
0
    nxt_app_t  *app;
3590
3591
0
    nxt_queue_each(app, &router->apps, nxt_app_t, link) {
3592
3593
0
        nxt_router_app_unlink(task, app);
3594
3595
0
    } nxt_queue_loop;
3596
3597
0
    nxt_queue_add(&router->apps, &tmcf->previous);
3598
0
    nxt_queue_add(&router->apps, &tmcf->apps);
3599
0
}
3600
3601
3602
static void
3603
nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
3604
0
{
3605
0
    nxt_uint_t                n;
3606
0
    nxt_event_engine_t        *engine;
3607
0
    nxt_router_engine_conf_t  *recf;
3608
3609
0
    recf = tmcf->engines->elts;
3610
3611
0
    for (n = tmcf->engines->nelts; n != 0; n--) {
3612
0
        engine = recf->engine;
3613
3614
0
        switch (recf->action) {
3615
3616
0
        case NXT_ROUTER_ENGINE_KEEP:
3617
0
            break;
3618
3619
0
        case NXT_ROUTER_ENGINE_ADD:
3620
0
            nxt_queue_insert_tail(&router->engines, &engine->link0);
3621
0
            break;
3622
3623
0
        case NXT_ROUTER_ENGINE_DELETE:
3624
0
            nxt_queue_remove(&engine->link0);
3625
0
            break;
3626
0
        }
3627
3628
0
        nxt_router_engine_post(engine, recf->jobs);
3629
3630
0
        recf++;
3631
0
    }
3632
0
}
3633
3634
3635
static void
3636
nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3637
0
{
3638
0
    nxt_work_t  *work, *next;
3639
3640
0
    for (work = jobs; work != NULL; work = next) {
3641
0
        next = work->next;
3642
0
        work->next = NULL;
3643
3644
0
        nxt_event_engine_post(engine, work);
3645
0
    }
3646
0
}
3647
3648
3649
static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3650
    .rpc_error       = nxt_port_rpc_handler,
3651
    .mmap            = nxt_port_mmap_handler,
3652
    .data            = nxt_port_rpc_handler,
3653
    .oosm            = nxt_router_oosm_handler,
3654
    .req_headers_ack = nxt_port_rpc_handler,
3655
};
3656
3657
3658
static void
3659
nxt_router_thread_start(void *data)
3660
0
{
3661
0
    nxt_int_t           ret;
3662
0
    nxt_port_t          *port;
3663
0
    nxt_task_t          *task;
3664
0
    nxt_work_t          *work;
3665
0
    nxt_thread_t        *thread;
3666
0
    nxt_thread_link_t   *link;
3667
0
    nxt_event_engine_t  *engine;
3668
3669
0
    link = data;
3670
0
    engine = link->engine;
3671
0
    task = &engine->task;
3672
3673
0
    thread = nxt_thread();
3674
3675
0
    nxt_event_engine_thread_adopt(engine);
3676
3677
    /* STUB */
3678
0
    thread->runtime = engine->task.thread->runtime;
3679
3680
0
    engine->task.thread = thread;
3681
0
    engine->task.log = thread->log;
3682
0
    thread->engine = engine;
3683
0
    thread->task = &engine->task;
3684
#if 0
3685
    thread->fiber = &engine->fibers->fiber;
3686
#endif
3687
3688
0
    engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3689
0
    if (nxt_slow_path(engine->mem_pool == NULL)) {
3690
0
        return;
3691
0
    }
3692
3693
0
    port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3694
0
                        NXT_PROCESS_ROUTER);
3695
0
    if (nxt_slow_path(port == NULL)) {
3696
0
        return;
3697
0
    }
3698
3699
0
    ret = nxt_port_socket_init(task, port, 0);
3700
0
    if (nxt_slow_path(ret != NXT_OK)) {
3701
0
        nxt_port_use(task, port, -1);
3702
0
        return;
3703
0
    }
3704
3705
0
    ret = nxt_router_port_queue_init(task, port);
3706
0
    if (nxt_slow_path(ret != NXT_OK)) {
3707
0
        nxt_port_use(task, port, -1);
3708
0
        return;
3709
0
    }
3710
3711
0
    engine->port = port;
3712
3713
0
    nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3714
3715
0
    work = nxt_zalloc(sizeof(nxt_work_t));
3716
0
    if (nxt_slow_path(work == NULL)) {
3717
0
        return;
3718
0
    }
3719
3720
0
    work->handler = nxt_router_rt_add_port;
3721
0
    work->task = link->work.task;
3722
0
    work->obj = work;
3723
0
    work->data = port;
3724
3725
0
    nxt_event_engine_post(link->work.task->thread->engine, work);
3726
3727
0
    nxt_event_engine_start(engine);
3728
0
}
3729
3730
3731
static void
3732
nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3733
0
{
3734
0
    nxt_int_t      res;
3735
0
    nxt_port_t     *port;
3736
0
    nxt_runtime_t  *rt;
3737
3738
0
    rt = task->thread->runtime;
3739
0
    port = data;
3740
3741
0
    nxt_free(obj);
3742
3743
0
    res = nxt_port_hash_add(&rt->ports, port);
3744
3745
0
    if (nxt_fast_path(res == NXT_OK)) {
3746
0
        nxt_port_use(task, port, 1);
3747
0
    }
3748
0
}
3749
3750
3751
static void
3752
nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3753
0
{
3754
0
    nxt_joint_job_t          *job;
3755
0
    nxt_socket_conf_t        *skcf;
3756
0
    nxt_listen_event_t       *lev;
3757
0
    nxt_listen_socket_t      *ls;
3758
0
    nxt_thread_spinlock_t    *lock;
3759
0
    nxt_socket_conf_joint_t  *joint;
3760
3761
0
    job = obj;
3762
0
    joint = data;
3763
3764
0
    nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3765
3766
0
    skcf = joint->socket_conf;
3767
0
    ls = skcf->listen;
3768
3769
0
    lev = nxt_listen_event(task, ls);
3770
0
    if (nxt_slow_path(lev == NULL)) {
3771
0
        nxt_router_listen_socket_release(task, skcf);
3772
0
        return;
3773
0
    }
3774
3775
0
    lev->socket.data = joint;
3776
3777
0
    lock = &skcf->router_conf->router->lock;
3778
3779
0
    nxt_thread_spin_lock(lock);
3780
0
    ls->count++;
3781
0
    nxt_thread_spin_unlock(lock);
3782
3783
0
    job->work.next = NULL;
3784
0
    job->work.handler = nxt_router_conf_wait;
3785
3786
0
    nxt_event_engine_post(job->tmcf->engine, &job->work);
3787
0
}
3788
3789
3790
/*
 * Searches "listen_connections" for the listen event whose socket
 * descriptor equals the descriptor of skcf's listen socket.
 *
 * Returns the matching event, or NULL if there is none.
 */
nxt_inline nxt_listen_event_t *
nxt_router_listen_event(nxt_queue_t *listen_connections,
    nxt_socket_conf_t *skcf)
{
    nxt_socket_t        wanted;
    nxt_queue_link_t    *node;
    nxt_listen_event_t  *lev;

    wanted = skcf->listen->socket;

    node = nxt_queue_first(listen_connections);

    while (node != nxt_queue_tail(listen_connections)) {
        lev = nxt_queue_link_data(node, nxt_listen_event_t, link);

        if (lev->socket.fd == wanted) {
            return lev;
        }

        node = nxt_queue_next(node);
    }

    return NULL;
}
3813
3814
3815
static void
3816
nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3817
0
{
3818
0
    nxt_joint_job_t          *job;
3819
0
    nxt_event_engine_t       *engine;
3820
0
    nxt_listen_event_t       *lev;
3821
0
    nxt_socket_conf_joint_t  *joint, *old;
3822
3823
0
    job = obj;
3824
0
    joint = data;
3825
3826
0
    engine = task->thread->engine;
3827
3828
0
    nxt_queue_insert_tail(&engine->joints, &joint->link);
3829
3830
0
    lev = nxt_router_listen_event(&engine->listen_connections,
3831
0
                                  joint->socket_conf);
3832
3833
0
    old = lev->socket.data;
3834
0
    lev->socket.data = joint;
3835
0
    lev->listen = joint->socket_conf->listen;
3836
3837
0
    job->work.next = NULL;
3838
0
    job->work.handler = nxt_router_conf_wait;
3839
3840
0
    nxt_event_engine_post(job->tmcf->engine, &job->work);
3841
3842
    /*
3843
     * The task is allocated from configuration temporary
3844
     * memory pool so it can be freed after engine post operation.
3845
     */
3846
3847
0
    nxt_router_conf_release(&engine->task, old);
3848
0
}
3849
3850
3851
static void
3852
nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3853
0
{
3854
0
    nxt_socket_conf_t        *skcf;
3855
0
    nxt_listen_event_t       *lev;
3856
0
    nxt_event_engine_t       *engine;
3857
0
    nxt_socket_conf_joint_t  *joint;
3858
3859
0
    skcf = data;
3860
3861
0
    engine = task->thread->engine;
3862
3863
0
    lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3864
3865
0
    nxt_fd_event_delete(engine, &lev->socket);
3866
3867
0
    nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3868
0
              lev->socket.fd);
3869
3870
0
    joint = lev->socket.data;
3871
0
    joint->close_job = obj;
3872
3873
0
    lev->timer.handler = nxt_router_listen_socket_close;
3874
0
    lev->timer.work_queue = &engine->fast_work_queue;
3875
3876
0
    nxt_timer_add(engine, &lev->timer, 0);
3877
0
}
3878
3879
3880
static void
3881
nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3882
0
{
3883
0
    nxt_event_engine_t  *engine;
3884
3885
0
    nxt_debug(task, "router worker thread quit");
3886
3887
0
    engine = task->thread->engine;
3888
3889
0
    engine->shutdown = 1;
3890
3891
0
    if (nxt_queue_is_empty(&engine->joints)) {
3892
0
        nxt_thread_exit(task->thread);
3893
0
    }
3894
0
}
3895
3896
3897
static void
3898
nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3899
0
{
3900
0
    nxt_timer_t              *timer;
3901
0
    nxt_joint_job_t          *job;
3902
0
    nxt_listen_event_t       *lev;
3903
0
    nxt_socket_conf_joint_t  *joint;
3904
3905
0
    timer = obj;
3906
0
    lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3907
3908
0
    nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3909
0
              lev->socket.fd);
3910
3911
0
    nxt_queue_remove(&lev->link);
3912
3913
0
    joint = lev->socket.data;
3914
0
    lev->socket.data = NULL;
3915
3916
    /* 'task' refers to lev->task and we cannot use after nxt_free() */
3917
0
    task = &task->thread->engine->task;
3918
3919
0
    nxt_router_listen_socket_release(task, joint->socket_conf);
3920
3921
0
    job = joint->close_job;
3922
0
    job->work.next = NULL;
3923
0
    job->work.handler = nxt_router_conf_wait;
3924
3925
0
    nxt_event_engine_post(job->tmcf->engine, &job->work);
3926
3927
0
    nxt_router_listen_event_release(task, lev, joint);
3928
0
}
3929
3930
3931
static void
3932
nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3933
0
{
3934
0
#if (NXT_HAVE_UNIX_DOMAIN)
3935
0
    size_t                 size;
3936
0
    nxt_buf_t              *b;
3937
0
    nxt_port_t             *main_port;
3938
0
    nxt_runtime_t          *rt;
3939
0
    nxt_sockaddr_t         *sa;
3940
0
#endif
3941
0
    nxt_listen_socket_t    *ls;
3942
0
    nxt_thread_spinlock_t  *lock;
3943
3944
0
    ls = skcf->listen;
3945
0
    lock = &skcf->router_conf->router->lock;
3946
3947
0
    nxt_thread_spin_lock(lock);
3948
3949
0
    nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3950
0
              task->thread->engine, ls->count);
3951
3952
0
    if (--ls->count != 0) {
3953
0
        ls = NULL;
3954
0
    }
3955
3956
0
    nxt_thread_spin_unlock(lock);
3957
3958
0
    if (ls == NULL) {
3959
0
        return;
3960
0
    }
3961
3962
0
    nxt_socket_close(task, ls->socket);
3963
3964
0
#if (NXT_HAVE_UNIX_DOMAIN)
3965
0
    sa = ls->sockaddr;
3966
0
    if (sa->u.sockaddr.sa_family != AF_UNIX
3967
0
        || sa->u.sockaddr_un.sun_path[0] == '\0')
3968
0
    {
3969
0
        goto out_free_ls;
3970
0
    }
3971
3972
0
    size = nxt_sockaddr_size(ls->sockaddr);
3973
3974
0
    b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
3975
0
    if (b == NULL) {
3976
0
        goto out_free_ls;
3977
0
    }
3978
3979
0
    b->mem.free = nxt_cpymem(b->mem.free, ls->sockaddr, size);
3980
3981
0
    rt = task->thread->runtime;
3982
0
    main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3983
3984
0
    (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET_UNLINK,
3985
0
                                 -1, 0, 0, b);
3986
3987
0
out_free_ls:
3988
0
#endif
3989
0
    nxt_free(ls);
3990
0
}
3991
3992
3993
void
3994
nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3995
    nxt_socket_conf_joint_t *joint)
3996
0
{
3997
0
    nxt_event_engine_t  *engine;
3998
3999
0
    nxt_debug(task, "listen event count: %D", lev->count);
4000
4001
0
    engine = task->thread->engine;
4002
4003
0
    if (--lev->count == 0) {
4004
0
        if (lev->next != NULL) {
4005
0
            nxt_sockaddr_cache_free(engine, lev->next);
4006
4007
0
            nxt_conn_free(task, lev->next);
4008
0
        }
4009
4010
0
        nxt_free(lev);
4011
0
    }
4012
4013
0
    if (joint != NULL) {
4014
0
        nxt_router_conf_release(task, joint);
4015
0
    }
4016
4017
0
    if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
4018
0
        nxt_thread_exit(task->thread);
4019
0
    }
4020
0
}
4021
4022
4023
void
4024
nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
4025
0
{
4026
0
    nxt_socket_conf_t      *skcf;
4027
0
    nxt_router_conf_t      *rtcf;
4028
0
    nxt_thread_spinlock_t  *lock;
4029
4030
0
    nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
4031
4032
0
    if (--joint->count != 0) {
4033
0
        return;
4034
0
    }
4035
4036
0
    nxt_queue_remove(&joint->link);
4037
4038
    /*
4039
     * The joint content can not be safely used after the critical
4040
     * section protected by the spinlock because its memory pool may
4041
     * be already destroyed by another thread.
4042
     */
4043
0
    skcf = joint->socket_conf;
4044
0
    rtcf = skcf->router_conf;
4045
0
    lock = &rtcf->router->lock;
4046
4047
0
    nxt_thread_spin_lock(lock);
4048
4049
0
    nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
4050
0
              rtcf, rtcf->count);
4051
4052
0
    if (--skcf->count != 0) {
4053
0
        skcf = NULL;
4054
0
        rtcf = NULL;
4055
4056
0
    } else {
4057
0
        nxt_queue_remove(&skcf->link);
4058
4059
0
        if (--rtcf->count != 0) {
4060
0
            rtcf = NULL;
4061
0
        }
4062
0
    }
4063
4064
0
    nxt_thread_spin_unlock(lock);
4065
4066
#if (NXT_TLS)
4067
    if (skcf != NULL && skcf->tls != NULL) {
4068
        task->thread->runtime->tls->server_free(task, skcf->tls);
4069
    }
4070
#endif
4071
4072
    /* TODO remove engine->port */
4073
4074
0
    if (rtcf != NULL) {
4075
0
        nxt_debug(task, "old router conf is destroyed");
4076
4077
0
        nxt_router_apps_hash_use(task, rtcf, -1);
4078
4079
0
        nxt_router_access_log_release(task, lock, rtcf->access_log);
4080
4081
0
        nxt_tstr_state_release(rtcf->tstr_state);
4082
4083
0
        nxt_mp_thread_adopt(rtcf->mem_pool);
4084
4085
0
        nxt_mp_destroy(rtcf->mem_pool);
4086
0
    }
4087
0
}
4088
4089
4090
static void
4091
nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
4092
0
{
4093
0
    nxt_port_t           *port;
4094
0
    nxt_thread_link_t    *link;
4095
0
    nxt_event_engine_t   *engine;
4096
0
    nxt_thread_handle_t  handle;
4097
4098
0
    handle = (nxt_thread_handle_t) (uintptr_t) obj;
4099
0
    link = data;
4100
4101
0
    nxt_thread_wait(handle);
4102
4103
0
    engine = link->engine;
4104
4105
0
    nxt_queue_remove(&engine->link);
4106
4107
0
    port = engine->port;
4108
4109
    // TODO notify all apps
4110
4111
0
    port->engine = task->thread->engine;
4112
0
    nxt_mp_thread_adopt(port->mem_pool);
4113
0
    nxt_port_use(task, port, -1);
4114
4115
0
    nxt_mp_thread_adopt(engine->mem_pool);
4116
0
    nxt_mp_destroy(engine->mem_pool);
4117
4118
0
    nxt_event_engine_free(engine);
4119
4120
0
    nxt_free(link);
4121
0
}
4122
4123
4124
static void
4125
nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4126
    void *data)
4127
0
{
4128
0
    size_t                  b_size, count;
4129
0
    nxt_int_t               ret;
4130
0
    nxt_app_t               *app;
4131
0
    nxt_buf_t               *b, *next;
4132
0
    nxt_port_t              *app_port;
4133
0
    nxt_unit_field_t        *f;
4134
0
    nxt_http_field_t        *field;
4135
0
    nxt_http_request_t      *r;
4136
0
    nxt_unit_response_t     *resp;
4137
0
    nxt_request_rpc_data_t  *req_rpc_data;
4138
4139
0
    req_rpc_data = data;
4140
4141
0
    r = req_rpc_data->request;
4142
0
    if (nxt_slow_path(r == NULL)) {
4143
0
        return;
4144
0
    }
4145
4146
0
    if (r->error) {
4147
0
        nxt_request_rpc_data_unlink(task, req_rpc_data);
4148
0
        return;
4149
0
    }
4150
4151
0
    app = req_rpc_data->app;
4152
0
    nxt_assert(app != NULL);
4153
4154
0
    if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
4155
0
        nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
4156
4157
0
        return;
4158
0
    }
4159
4160
0
    b = (msg->size == 0) ? NULL : msg->buf;
4161
4162
0
    if (msg->port_msg.last != 0) {
4163
0
        nxt_debug(task, "router data create last buf");
4164
4165
0
        nxt_buf_chain_add(&b, nxt_http_buf_last(r));
4166
4167
0
        req_rpc_data->rpc_cancel = 0;
4168
4169
0
        if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
4170
0
            req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
4171
0
        }
4172
4173
0
        nxt_request_rpc_data_unlink(task, req_rpc_data);
4174
4175
0
    } else {
4176
0
        if (app->timeout != 0) {
4177
0
            r->timer.handler = nxt_router_app_timeout;
4178
0
            r->timer_data = req_rpc_data;
4179
0
            nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4180
0
        }
4181
0
    }
4182
4183
0
    if (b == NULL) {
4184
0
        return;
4185
0
    }
4186
4187
0
    if (msg->buf == b) {
4188
        /* Disable instant buffer completion/re-using by port. */
4189
0
        msg->buf = NULL;
4190
0
    }
4191
4192
0
    if (r->header_sent) {
4193
0
        nxt_buf_chain_add(&r->out, b);
4194
4195
0
        ret = nxt_http_comp_compress_app_response(task, r, &r->out);
4196
0
        if (ret == NXT_ERROR) {
4197
0
            goto fail;
4198
0
        }
4199
4200
0
        nxt_http_request_send_body(task, r, NULL);
4201
0
    } else {
4202
0
        b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
4203
4204
0
        if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
4205
0
            nxt_alert(task, "response buffer too small: %z", b_size);
4206
0
            goto fail;
4207
0
        }
4208
4209
0
        resp = (void *) b->mem.pos;
4210
0
        count = (b_size - sizeof(nxt_unit_response_t))
4211
0
                    / sizeof(nxt_unit_field_t);
4212
4213
0
        if (nxt_slow_path(count < resp->fields_count)) {
4214
0
            nxt_alert(task, "response buffer too small for fields count: %D",
4215
0
                      resp->fields_count);
4216
0
            goto fail;
4217
0
        }
4218
4219
0
        field = NULL;
4220
4221
0
        for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
4222
0
            if (f->skip) {
4223
0
                continue;
4224
0
            }
4225
4226
0
            field = nxt_list_add(r->resp.fields);
4227
4228
0
            if (nxt_slow_path(field == NULL)) {
4229
0
                goto fail;
4230
0
            }
4231
4232
0
            field->hash = f->hash;
4233
0
            field->skip = 0;
4234
0
            field->hopbyhop = 0;
4235
4236
0
            field->name_length = f->name_length;
4237
0
            field->value_length = f->value_length;
4238
0
            field->name = nxt_unit_sptr_get(&f->name);
4239
0
            field->value = nxt_unit_sptr_get(&f->value);
4240
4241
0
            ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
4242
0
            if (nxt_slow_path(ret != NXT_OK)) {
4243
0
                goto fail;
4244
0
            }
4245
4246
0
            nxt_debug(task, "header%s: %*s: %*s",
4247
0
                      (field->skip ? " skipped" : ""),
4248
0
                      (size_t) field->name_length, field->name,
4249
0
                      (size_t) field->value_length, field->value);
4250
4251
0
            if (field->skip) {
4252
0
                r->resp.fields->last->nelts--;
4253
0
            }
4254
0
        }
4255
4256
0
        r->status = resp->status;
4257
4258
0
        if (resp->piggyback_content_length != 0) {
4259
0
            b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
4260
0
            b->mem.free = b->mem.pos + resp->piggyback_content_length;
4261
4262
0
        } else {
4263
0
            b->mem.pos = b->mem.free;
4264
0
        }
4265
4266
0
        if (nxt_buf_mem_used_size(&b->mem) == 0) {
4267
0
            next = b->next;
4268
0
            b->next = NULL;
4269
4270
0
            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4271
0
                               b->completion_handler, task, b, b->parent);
4272
4273
0
            b = next;
4274
0
        }
4275
4276
0
        if (b != NULL) {
4277
0
            nxt_buf_chain_add(&r->out, b);
4278
0
        }
4279
4280
0
        ret = nxt_http_comp_check_compression(task, r);
4281
0
        if (ret != NXT_OK) {
4282
0
            goto fail;
4283
0
        }
4284
4285
0
        nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
4286
4287
0
        if (r->websocket_handshake
4288
0
            && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
4289
0
        {
4290
0
            app_port = req_rpc_data->app_port;
4291
0
            if (nxt_slow_path(app_port == NULL)) {
4292
0
                goto fail;
4293
0
            }
4294
4295
0
            nxt_thread_mutex_lock(&app->mutex);
4296
4297
0
            app_port->main_app_port->active_websockets++;
4298
4299
0
            nxt_thread_mutex_unlock(&app->mutex);
4300
4301
0
            nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE);
4302
0
            req_rpc_data->apr_action = NXT_APR_CLOSE;
4303
4304
0
            nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
4305
4306
0
            r->state = &nxt_http_websocket;
4307
4308
0
        } else {
4309
0
            r->state = &nxt_http_request_send_state;
4310
0
        }
4311
0
    }
4312
4313
0
    return;
4314
4315
0
fail:
4316
4317
0
    nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4318
4319
0
    nxt_request_rpc_data_unlink(task, req_rpc_data);
4320
0
}
4321
4322
4323
static void
4324
nxt_router_req_headers_ack_handler(nxt_task_t *task,
4325
    nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4326
0
{
4327
0
    int                 res;
4328
0
    nxt_app_t           *app;
4329
0
    nxt_buf_t           *b;
4330
0
    nxt_bool_t          start_process, unlinked;
4331
0
    nxt_port_t          *app_port, *main_app_port, *idle_port;
4332
0
    nxt_queue_link_t    *idle_lnk;
4333
0
    nxt_http_request_t  *r;
4334
4335
0
    nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4336
0
              req_rpc_data->stream,
4337
0
              msg->port_msg.pid, msg->port_msg.reply_port);
4338
4339
0
    nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4340
0
                             msg->port_msg.pid);
4341
4342
0
    app = req_rpc_data->app;
4343
0
    r = req_rpc_data->request;
4344
4345
0
    start_process = 0;
4346
0
    unlinked = 0;
4347
4348
0
    nxt_thread_mutex_lock(&app->mutex);
4349
4350
0
    if (r->app_link.next != NULL) {
4351
0
        nxt_queue_remove(&r->app_link);
4352
0
        r->app_link.next = NULL;
4353
4354
0
        unlinked = 1;
4355
0
    }
4356
4357
0
    app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4358
0
                                  msg->port_msg.reply_port);
4359
0
    if (nxt_slow_path(app_port == NULL)) {
4360
0
        nxt_thread_mutex_unlock(&app->mutex);
4361
4362
0
        nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4363
4364
0
        if (unlinked) {
4365
0
            nxt_mp_release(r->mem_pool);
4366
0
        }
4367
4368
0
        return;
4369
0
    }
4370
4371
0
    main_app_port = app_port->main_app_port;
4372
4373
0
    if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4374
0
        app->idle_processes--;
4375
4376
0
        nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4377
0
                  &app->name, main_app_port->pid, main_app_port->id,
4378
0
                  (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4379
4380
        /* Check port was in 'spare_ports' using idle_start field. */
4381
0
        if (main_app_port->idle_start == 0
4382
0
            && app->idle_processes >= app->spare_processes)
4383
0
        {
4384
            /*
4385
             * If there is a vacant space in spare ports,
4386
             * move the last idle to spare_ports.
4387
             */
4388
0
            nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4389
4390
0
            idle_lnk = nxt_queue_last(&app->idle_ports);
4391
0
            idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4392
0
            nxt_queue_remove(idle_lnk);
4393
4394
0
            nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4395
4396
0
            idle_port->idle_start = 0;
4397
4398
0
            nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4399
0
                      "to spare_ports",
4400
0
                      &app->name, idle_port->pid, idle_port->id);
4401
0
        }
4402
4403
0
        if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4404
0
            app->pending_processes++;
4405
0
            start_process = 1;
4406
0
        }
4407
0
    }
4408
4409
0
    main_app_port->active_requests++;
4410
4411
0
    nxt_port_inc_use(app_port);
4412
4413
0
    nxt_thread_mutex_unlock(&app->mutex);
4414
4415
0
    if (unlinked) {
4416
0
        nxt_mp_release(r->mem_pool);
4417
0
    }
4418
4419
0
    if (start_process) {
4420
0
        nxt_router_start_app_process(task, app);
4421
0
    }
4422
4423
0
    nxt_port_use(task, req_rpc_data->app_port, -1);
4424
4425
0
    req_rpc_data->app_port = app_port;
4426
4427
0
    b = req_rpc_data->msg_info.buf;
4428
4429
0
    if (b != NULL) {
4430
        /* First buffer is already sent.  Start from second. */
4431
0
        b = b->next;
4432
4433
0
        req_rpc_data->msg_info.buf->next = NULL;
4434
0
    }
4435
4436
0
    if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4437
0
        nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4438
0
                  req_rpc_data->msg_info.body_fd);
4439
4440
0
        if (req_rpc_data->msg_info.body_fd != -1) {
4441
0
            lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4442
0
        }
4443
4444
0
        res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4445
0
                                    req_rpc_data->msg_info.body_fd,
4446
0
                                    req_rpc_data->stream,
4447
0
                                    task->thread->engine->port->id, b);
4448
4449
0
        if (nxt_slow_path(res != NXT_OK)) {
4450
0
            nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4451
0
        }
4452
0
    }
4453
4454
0
    if (app->timeout != 0) {
4455
0
        r->timer.handler = nxt_router_app_timeout;
4456
0
        r->timer_data = req_rpc_data;
4457
0
        nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4458
0
    }
4459
0
}
4460
4461
4462
static const nxt_http_request_state_t  nxt_http_request_send_state
4463
    nxt_aligned(64) =
4464
{
4465
    .error_handler = nxt_http_request_error_handler,
4466
};
4467
4468
4469
static void
4470
nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4471
0
{
4472
0
    nxt_buf_t           *out;
4473
0
    nxt_http_request_t  *r;
4474
4475
0
    r = obj;
4476
4477
0
    out = r->out;
4478
4479
0
    if (out != NULL) {
4480
0
        r->out = NULL;
4481
0
        nxt_http_request_send(task, r, out);
4482
0
    }
4483
0
}
4484
4485
4486
static void
4487
nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4488
    void *data)
4489
0
{
4490
0
    nxt_request_rpc_data_t  *req_rpc_data;
4491
4492
0
    req_rpc_data = data;
4493
4494
0
    req_rpc_data->rpc_cancel = 0;
4495
4496
    /* TODO cancel message and return if cancelled. */
4497
    // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4498
4499
0
    if (req_rpc_data->request != NULL) {
4500
0
        nxt_http_request_error(task, req_rpc_data->request,
4501
0
                               NXT_HTTP_SERVICE_UNAVAILABLE);
4502
0
    }
4503
4504
0
    nxt_request_rpc_data_unlink(task, req_rpc_data);
4505
0
}
4506
4507
4508
static void
4509
nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4510
    void *data)
4511
0
{
4512
0
    uint32_t             n;
4513
0
    nxt_app_t            *app;
4514
0
    nxt_bool_t           start_process, restarted;
4515
0
    nxt_port_t           *port;
4516
0
    nxt_app_joint_t      *app_joint;
4517
0
    nxt_app_joint_rpc_t  *app_joint_rpc;
4518
4519
0
    nxt_assert(data != NULL);
4520
4521
0
    app_joint_rpc = data;
4522
0
    app_joint = app_joint_rpc->app_joint;
4523
0
    port = msg->u.new_port;
4524
4525
0
    nxt_assert(app_joint != NULL);
4526
0
    nxt_assert(port != NULL);
4527
0
    nxt_assert(port->id == 0);
4528
4529
0
    app = app_joint->app;
4530
4531
0
    nxt_router_app_joint_use(task, app_joint, -1);
4532
4533
0
    if (nxt_slow_path(app == NULL)) {
4534
0
        nxt_debug(task, "new port ready for released app, send QUIT");
4535
4536
0
        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4537
4538
0
        return;
4539
0
    }
4540
4541
0
    nxt_thread_mutex_lock(&app->mutex);
4542
4543
0
    restarted = (app->generation != app_joint_rpc->generation);
4544
4545
0
    if (app_joint_rpc->proto) {
4546
0
        nxt_assert(app->proto_port == NULL);
4547
0
        nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
4548
4549
0
        n = app->proto_port_requests;
4550
0
        app->proto_port_requests = 0;
4551
4552
0
        if (nxt_slow_path(restarted)) {
4553
0
            nxt_thread_mutex_unlock(&app->mutex);
4554
4555
0
            nxt_debug(task, "proto port ready for restarted app, send QUIT");
4556
4557
0
            nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4558
0
                                  NULL);
4559
4560
0
        } else {
4561
0
            port->app = app;
4562
0
            app->proto_port = port;
4563
4564
0
            nxt_thread_mutex_unlock(&app->mutex);
4565
4566
0
            nxt_port_use(task, port, 1);
4567
0
        }
4568
4569
0
        port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER];
4570
4571
0
        while (n > 0) {
4572
0
            nxt_router_app_use(task, app, 1);
4573
4574
0
            nxt_router_start_app_process_handler(task, port, app);
4575
4576
0
            n--;
4577
0
        }
4578
4579
0
        return;
4580
0
    }
4581
4582
0
    nxt_assert(port->type == NXT_PROCESS_APP);
4583
0
    nxt_assert(app->pending_processes != 0);
4584
4585
0
    app->pending_processes--;
4586
4587
0
    if (nxt_slow_path(restarted)) {
4588
0
        nxt_debug(task, "new port ready for restarted app, send QUIT");
4589
4590
0
        start_process = !task->thread->engine->shutdown
4591
0
                        && nxt_router_app_can_start(app)
4592
0
                        && nxt_router_app_need_start(app);
4593
4594
0
        if (start_process) {
4595
0
            app->pending_processes++;
4596
0
        }
4597
4598
0
        nxt_thread_mutex_unlock(&app->mutex);
4599
4600
0
        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4601
4602
0
        if (start_process) {
4603
0
            nxt_router_start_app_process(task, app);
4604
0
        }
4605
4606
0
        return;
4607
0
    }
4608
4609
0
    port->app = app;
4610
0
    port->main_app_port = port;
4611
4612
0
    app->processes++;
4613
0
    nxt_port_hash_add(&app->port_hash, port);
4614
0
    app->port_hash_count++;
4615
4616
0
    nxt_thread_mutex_unlock(&app->mutex);
4617
4618
0
    nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4619
0
              &app->name, port->pid, app->processes, app->pending_processes);
4620
4621
0
    nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
4622
4623
0
    nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
4624
0
}
4625
4626
4627
static void
4628
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4629
    void *data)
4630
0
{
4631
0
    nxt_app_t            *app;
4632
0
    nxt_app_joint_t      *app_joint;
4633
0
    nxt_queue_link_t     *link;
4634
0
    nxt_http_request_t   *r;
4635
0
    nxt_app_joint_rpc_t  *app_joint_rpc;
4636
4637
0
    nxt_assert(data != NULL);
4638
4639
0
    app_joint_rpc = data;
4640
0
    app_joint = app_joint_rpc->app_joint;
4641
4642
0
    nxt_assert(app_joint != NULL);
4643
4644
0
    app = app_joint->app;
4645
4646
0
    nxt_router_app_joint_use(task, app_joint, -1);
4647
4648
0
    if (nxt_slow_path(app == NULL)) {
4649
0
        nxt_debug(task, "start error for released app");
4650
4651
0
        return;
4652
0
    }
4653
4654
0
    nxt_debug(task, "app '%V' %p start error", &app->name, app);
4655
4656
0
    link = NULL;
4657
4658
0
    nxt_thread_mutex_lock(&app->mutex);
4659
4660
0
    nxt_assert(app->pending_processes != 0);
4661
4662
0
    app->pending_processes--;
4663
4664
0
    if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4665
0
        link = nxt_queue_first(&app->ack_waiting_req);
4666
4667
0
        nxt_queue_remove(link);
4668
0
        link->next = NULL;
4669
0
    }
4670
4671
0
    nxt_thread_mutex_unlock(&app->mutex);
4672
4673
0
    while (link != NULL) {
4674
0
        r = nxt_container_of(link, nxt_http_request_t, app_link);
4675
4676
0
        nxt_event_engine_post(r->engine, &r->err_work);
4677
4678
0
        link = NULL;
4679
4680
0
        nxt_thread_mutex_lock(&app->mutex);
4681
4682
0
        if (app->processes == 0 && app->pending_processes == 0
4683
0
            && !nxt_queue_is_empty(&app->ack_waiting_req))
4684
0
        {
4685
0
            link = nxt_queue_first(&app->ack_waiting_req);
4686
4687
0
            nxt_queue_remove(link);
4688
0
            link->next = NULL;
4689
0
        }
4690
4691
0
        nxt_thread_mutex_unlock(&app->mutex);
4692
0
    }
4693
0
}
4694
4695
4696
/*
 * Detaches the first port in app->ports from the application and
 * returns it, or returns NULL when the queue is empty.
 *
 * Under the application mutex the port is removed from the ports queue,
 * from the idle/spare queue (if it is in one) and from the port hash,
 * and the related counters are adjusted.  The caller is responsible for
 * decreasing the port use count.
 */
nxt_inline nxt_port_t *
nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
{
    nxt_port_t        *victim;
    nxt_queue_link_t  *head;

    victim = NULL;

    nxt_thread_mutex_lock(&app->mutex);

    if (!nxt_queue_is_empty(&app->ports)) {
        head = nxt_queue_first(&app->ports);
        victim = nxt_queue_link_data(head, nxt_port_t, app_link);

        /* Caller is responsible to decrease port use count. */
        nxt_queue_chk_remove(&victim->app_link);

        if (nxt_queue_chk_remove(&victim->idle_link)) {
            app->idle_processes--;

            nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
                      &app->name, victim->pid, victim->id,
                      (victim->idle_start ? "idle_ports" : "spare_ports"));
        }

        nxt_port_hash_remove(&app->port_hash, victim);
        app->port_hash_count--;

        victim->app = NULL;
        app->processes--;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    return victim;
}
4732
4733
4734
static void
4735
nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4736
0
{
4737
0
    int  c;
4738
4739
0
    c = nxt_atomic_fetch_add(&app->use_count, i);
4740
4741
0
    if (i < 0 && c == -i) {
4742
4743
0
        if (task->thread->engine != app->engine) {
4744
0
            nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4745
4746
0
        } else {
4747
0
            nxt_router_free_app(task, app->joint, NULL);
4748
0
        }
4749
0
    }
4750
0
}
4751
4752
4753
static void
4754
nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4755
0
{
4756
0
    nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4757
4758
0
    nxt_queue_remove(&app->link);
4759
4760
0
    nxt_router_app_use(task, app, -1);
4761
0
}
4762
4763
4764
static void
4765
nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
4766
    nxt_apr_action_t action)
4767
0
{
4768
0
    int         inc_use;
4769
0
    uint32_t    got_response, dec_requests;
4770
0
    nxt_bool_t  adjust_idle_timer;
4771
0
    nxt_port_t  *main_app_port;
4772
4773
0
    nxt_assert(port != NULL);
4774
4775
0
    inc_use = 0;
4776
0
    got_response = 0;
4777
0
    dec_requests = 0;
4778
4779
0
    switch (action) {
4780
0
    case NXT_APR_NEW_PORT:
4781
0
        break;
4782
0
    case NXT_APR_REQUEST_FAILED:
4783
0
        dec_requests = 1;
4784
0
        inc_use = -1;
4785
0
        break;
4786
0
    case NXT_APR_GOT_RESPONSE:
4787
0
        got_response = 1;
4788
0
        inc_use = -1;
4789
0
        break;
4790
0
    case NXT_APR_UPGRADE:
4791
0
        got_response = 1;
4792
0
        break;
4793
0
    case NXT_APR_CLOSE:
4794
0
        inc_use = -1;
4795
0
        break;
4796
0
    }
4797
4798
0
    nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4799
0
              port->pid, port->id,
4800
0
              (int) inc_use, (int) got_response);
4801
4802
0
    if (port->id == NXT_SHARED_PORT_ID) {
4803
0
        nxt_thread_mutex_lock(&app->mutex);
4804
4805
0
        app->active_requests -= got_response + dec_requests;
4806
4807
0
        nxt_thread_mutex_unlock(&app->mutex);
4808
4809
0
        goto adjust_use;
4810
0
    }
4811
4812
0
    main_app_port = port->main_app_port;
4813
4814
0
    nxt_thread_mutex_lock(&app->mutex);
4815
4816
0
    main_app_port->active_requests -= got_response + dec_requests;
4817
0
    app->active_requests -= got_response + dec_requests;
4818
4819
0
    if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
4820
0
        nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4821
4822
0
        nxt_port_inc_use(main_app_port);
4823
0
    }
4824
4825
0
    adjust_idle_timer = 0;
4826
4827
0
    if (main_app_port->pair[1] != -1
4828
0
        && main_app_port->active_requests == 0
4829
0
        && main_app_port->active_websockets == 0
4830
0
        && main_app_port->idle_link.next == NULL)
4831
0
    {
4832
0
        if (app->idle_processes == app->spare_processes
4833
0
            && app->adjust_idle_work.data == NULL)
4834
0
        {
4835
0
            adjust_idle_timer = 1;
4836
0
            app->adjust_idle_work.data = app;
4837
0
            app->adjust_idle_work.next = NULL;
4838
0
        }
4839
4840
0
        if (app->idle_processes < app->spare_processes) {
4841
0
            nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4842
4843
0
            nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4844
0
                      &app->name, main_app_port->pid, main_app_port->id);
4845
0
        } else {
4846
0
            nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4847
4848
0
            main_app_port->idle_start = task->thread->engine->timers.now;
4849
4850
0
            nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4851
0
                      &app->name, main_app_port->pid, main_app_port->id);
4852
0
        }
4853
4854
0
        app->idle_processes++;
4855
0
    }
4856
4857
0
    nxt_thread_mutex_unlock(&app->mutex);
4858
4859
0
    if (adjust_idle_timer) {
4860
0
        nxt_router_app_use(task, app, 1);
4861
0
        nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4862
0
    }
4863
4864
    /* ? */
4865
0
    if (main_app_port->pair[1] == -1) {
4866
0
        nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4867
0
                  &app->name, app, main_app_port, main_app_port->pid);
4868
4869
0
        goto adjust_use;
4870
0
    }
4871
4872
0
    nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4873
0
              &app->name, app);
4874
4875
0
adjust_use:
4876
4877
0
    nxt_port_use(task, port, inc_use);
4878
0
}
4879
4880
4881
void
4882
nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4883
0
{
4884
0
    nxt_app_t         *app;
4885
0
    nxt_bool_t        unchain, start_process;
4886
0
    nxt_port_t        *idle_port;
4887
0
    nxt_queue_link_t  *idle_lnk;
4888
4889
0
    app = port->app;
4890
4891
0
    nxt_assert(app != NULL);
4892
4893
0
    nxt_thread_mutex_lock(&app->mutex);
4894
4895
0
    if (port == app->proto_port) {
4896
0
        app->proto_port = NULL;
4897
0
        port->app = NULL;
4898
4899
0
        nxt_thread_mutex_unlock(&app->mutex);
4900
4901
0
        nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name,
4902
0
                  port->pid);
4903
4904
0
        nxt_port_use(task, port, -1);
4905
4906
0
        return;
4907
0
    }
4908
4909
0
    nxt_port_hash_remove(&app->port_hash, port);
4910
0
    app->port_hash_count--;
4911
4912
0
    if (port->id != 0) {
4913
0
        nxt_thread_mutex_unlock(&app->mutex);
4914
4915
0
        nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4916
0
                  port->pid, port->id);
4917
4918
0
        return;
4919
0
    }
4920
4921
0
    unchain = nxt_queue_chk_remove(&port->app_link);
4922
4923
0
    if (nxt_queue_chk_remove(&port->idle_link)) {
4924
0
        app->idle_processes--;
4925
4926
0
        nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4927
0
                  &app->name, port->pid, port->id,
4928
0
                  (port->idle_start ? "idle_ports" : "spare_ports"));
4929
4930
0
        if (port->idle_start == 0
4931
0
            && app->idle_processes >= app->spare_processes)
4932
0
        {
4933
0
            nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4934
4935
0
            idle_lnk = nxt_queue_last(&app->idle_ports);
4936
0
            idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4937
0
            nxt_queue_remove(idle_lnk);
4938
4939
0
            nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4940
4941
0
            idle_port->idle_start = 0;
4942
4943
0
            nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4944
0
                      "to spare_ports",
4945
0
                      &app->name, idle_port->pid, idle_port->id);
4946
0
        }
4947
0
    }
4948
4949
0
    app->processes--;
4950
4951
0
    start_process = !task->thread->engine->shutdown
4952
0
                    && nxt_router_app_can_start(app)
4953
0
                    && nxt_router_app_need_start(app);
4954
4955
0
    if (start_process) {
4956
0
        app->pending_processes++;
4957
0
    }
4958
4959
0
    nxt_thread_mutex_unlock(&app->mutex);
4960
4961
0
    nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4962
4963
0
    if (unchain) {
4964
0
        nxt_port_use(task, port, -1);
4965
0
    }
4966
4967
0
    if (start_process) {
4968
0
        nxt_router_start_app_process(task, app);
4969
0
    }
4970
0
}
4971
4972
4973
static void
4974
nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4975
0
{
4976
0
    nxt_app_t           *app;
4977
0
    nxt_bool_t          queued;
4978
0
    nxt_port_t          *port;
4979
0
    nxt_msec_t          timeout, threshold;
4980
0
    nxt_queue_link_t    *lnk;
4981
0
    nxt_event_engine_t  *engine;
4982
4983
0
    app = obj;
4984
0
    queued = (data == app);
4985
4986
0
    nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4987
0
              &app->name, queued);
4988
4989
0
    engine = task->thread->engine;
4990
4991
0
    nxt_assert(app->engine == engine);
4992
4993
0
    threshold = engine->timers.now + app->joint->idle_timer.bias;
4994
0
    timeout = 0;
4995
4996
0
    nxt_thread_mutex_lock(&app->mutex);
4997
4998
0
    if (queued) {
4999
0
        app->adjust_idle_work.data = NULL;
5000
0
    }
5001
5002
0
    nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
5003
0
              &app->name,
5004
0
              (int) app->idle_processes, (int) app->spare_processes);
5005
5006
0
    while (app->idle_processes > app->spare_processes) {
5007
5008
0
        nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
5009
5010
0
        lnk = nxt_queue_first(&app->idle_ports);
5011
0
        port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
5012
5013
0
        timeout = port->idle_start + app->idle_timeout;
5014
5015
0
        nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
5016
0
                  &app->name, port->pid,
5017
0
                  port->idle_start, timeout, threshold);
5018
5019
0
        if (timeout > threshold) {
5020
0
            break;
5021
0
        }
5022
5023
0
        nxt_queue_remove(lnk);
5024
0
        lnk->next = NULL;
5025
5026
0
        nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
5027
0
                  &app->name, port->pid, port->id);
5028
5029
0
        nxt_queue_chk_remove(&port->app_link);
5030
5031
0
        nxt_port_hash_remove(&app->port_hash, port);
5032
0
        app->port_hash_count--;
5033
5034
0
        app->idle_processes--;
5035
0
        app->processes--;
5036
0
        port->app = NULL;
5037
5038
0
        nxt_thread_mutex_unlock(&app->mutex);
5039
5040
0
        nxt_debug(task, "app '%V' send QUIT to idle port %PI",
5041
0
                  &app->name, port->pid);
5042
5043
0
        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
5044
5045
0
        nxt_port_use(task, port, -1);
5046
5047
0
        nxt_thread_mutex_lock(&app->mutex);
5048
0
    }
5049
5050
0
    nxt_thread_mutex_unlock(&app->mutex);
5051
5052
0
    if (timeout > threshold) {
5053
0
        nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
5054
5055
0
    } else {
5056
0
        nxt_timer_disable(engine, &app->joint->idle_timer);
5057
0
    }
5058
5059
0
    if (queued) {
5060
0
        nxt_router_app_use(task, app, -1);
5061
0
    }
5062
0
}
5063
5064
5065
static void
5066
nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
5067
0
{
5068
0
    nxt_timer_t      *timer;
5069
0
    nxt_app_joint_t  *app_joint;
5070
5071
0
    timer = obj;
5072
0
    app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5073
5074
0
    if (nxt_fast_path(app_joint->app != NULL)) {
5075
0
        nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
5076
0
    }
5077
0
}
5078
5079
5080
static void
5081
nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
5082
0
{
5083
0
    nxt_timer_t      *timer;
5084
0
    nxt_app_joint_t  *app_joint;
5085
5086
0
    timer = obj;
5087
0
    app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5088
5089
0
    nxt_router_app_joint_use(task, app_joint, -1);
5090
0
}
5091
5092
5093
static void
5094
nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
5095
0
{
5096
0
    nxt_app_t        *app;
5097
0
    nxt_port_t       *port, *proto_port;
5098
0
    nxt_app_joint_t  *app_joint;
5099
5100
0
    app_joint = obj;
5101
0
    app = app_joint->app;
5102
5103
0
    for ( ;; ) {
5104
0
        port = nxt_router_app_get_port_for_quit(task, app);
5105
0
        if (port == NULL) {
5106
0
            break;
5107
0
        }
5108
5109
0
        nxt_port_use(task, port, -1);
5110
0
    }
5111
5112
0
    nxt_thread_mutex_lock(&app->mutex);
5113
5114
0
    for ( ;; ) {
5115
0
        port = nxt_port_hash_retrieve(&app->port_hash);
5116
0
        if (port == NULL) {
5117
0
            break;
5118
0
        }
5119
5120
0
        app->port_hash_count--;
5121
5122
0
        port->app = NULL;
5123
5124
0
        nxt_port_close(task, port);
5125
5126
0
        nxt_port_use(task, port, -1);
5127
0
    }
5128
5129
0
    proto_port = app->proto_port;
5130
5131
0
    if (proto_port != NULL) {
5132
0
        nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
5133
0
                  proto_port->pid);
5134
5135
0
        app->proto_port = NULL;
5136
0
        proto_port->app = NULL;
5137
0
    }
5138
5139
0
    nxt_thread_mutex_unlock(&app->mutex);
5140
5141
0
    if (proto_port != NULL) {
5142
0
        nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
5143
0
                              -1, 0, 0, NULL);
5144
5145
0
        nxt_port_close(task, proto_port);
5146
5147
0
        nxt_port_use(task, proto_port, -1);
5148
0
    }
5149
5150
0
    nxt_assert(app->proto_port == NULL);
5151
0
    nxt_assert(app->processes == 0);
5152
0
    nxt_assert(app->active_requests == 0);
5153
0
    nxt_assert(app->port_hash_count == 0);
5154
0
    nxt_assert(app->idle_processes == 0);
5155
0
    nxt_assert(nxt_queue_is_empty(&app->ports));
5156
0
    nxt_assert(nxt_queue_is_empty(&app->spare_ports));
5157
0
    nxt_assert(nxt_queue_is_empty(&app->idle_ports));
5158
5159
0
    nxt_port_mmaps_destroy(&app->outgoing, 1);
5160
5161
0
    nxt_thread_mutex_destroy(&app->outgoing.mutex);
5162
5163
0
    if (app->shared_port != NULL) {
5164
0
        app->shared_port->app = NULL;
5165
0
        nxt_port_close(task, app->shared_port);
5166
0
        nxt_port_use(task, app->shared_port, -1);
5167
5168
0
        app->shared_port = NULL;
5169
0
    }
5170
5171
0
    nxt_thread_mutex_destroy(&app->mutex);
5172
0
    nxt_mp_destroy(app->mem_pool);
5173
5174
0
    app_joint->app = NULL;
5175
5176
0
    if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
5177
0
        app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
5178
0
        nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
5179
5180
0
    } else {
5181
0
        nxt_router_app_joint_use(task, app_joint, -1);
5182
0
    }
5183
0
}
5184
5185
5186
static void
5187
nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
5188
    nxt_request_rpc_data_t *req_rpc_data)
5189
0
{
5190
0
    nxt_bool_t          start_process;
5191
0
    nxt_port_t          *port;
5192
0
    nxt_http_request_t  *r;
5193
5194
0
    start_process = 0;
5195
5196
0
    nxt_thread_mutex_lock(&app->mutex);
5197
5198
0
    port = app->shared_port;
5199
0
    nxt_port_inc_use(port);
5200
5201
0
    app->active_requests++;
5202
5203
0
    if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
5204
0
        app->pending_processes++;
5205
0
        start_process = 1;
5206
0
    }
5207
5208
0
    r = req_rpc_data->request;
5209
5210
    /*
5211
     * Put request into application-wide list to be able to cancel request
5212
     * if something goes wrong with application processes.
5213
     */
5214
0
    nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
5215
5216
0
    nxt_thread_mutex_unlock(&app->mutex);
5217
5218
    /*
5219
     * Retain request memory pool while request is linked in ack_waiting_req
5220
     * to guarantee request structure memory is accessble.
5221
     */
5222
0
    nxt_mp_retain(r->mem_pool);
5223
5224
0
    req_rpc_data->app_port = port;
5225
0
    req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
5226
5227
0
    if (start_process) {
5228
0
        nxt_router_start_app_process(task, app);
5229
0
    }
5230
0
}
5231
5232
5233
void
5234
nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
5235
    nxt_http_action_t *action)
5236
0
{
5237
0
    nxt_event_engine_t      *engine;
5238
0
    nxt_http_app_conf_t     *conf;
5239
0
    nxt_request_rpc_data_t  *req_rpc_data;
5240
5241
0
    conf = action->u.conf;
5242
0
    engine = task->thread->engine;
5243
5244
0
    r->app_target = conf->target;
5245
5246
0
    req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
5247
0
                                          nxt_router_response_ready_handler,
5248
0
                                          nxt_router_response_error_handler,
5249
0
                                          sizeof(nxt_request_rpc_data_t));
5250
0
    if (nxt_slow_path(req_rpc_data == NULL)) {
5251
0
        nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
5252
0
        return;
5253
0
    }
5254
5255
    /*
5256
     * At this point we have request req_rpc_data allocated and registered
5257
     * in port handlers.  Need to fixup request memory pool.  Counterpart
5258
     * release will be called via following call chain:
5259
     *    nxt_request_rpc_data_unlink() ->
5260
     *        nxt_router_http_request_release_post() ->
5261
     *            nxt_router_http_request_release()
5262
     */
5263
0
    nxt_mp_retain(r->mem_pool);
5264
5265
0
    r->timer.task = &engine->task;
5266
0
    r->timer.work_queue = &engine->fast_work_queue;
5267
0
    r->timer.log = engine->task.log;
5268
0
    r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
5269
5270
0
    r->engine = engine;
5271
0
    r->err_work.handler = nxt_router_http_request_error;
5272
0
    r->err_work.task = task;
5273
0
    r->err_work.obj = r;
5274
5275
0
    req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
5276
0
    req_rpc_data->app = conf->app;
5277
0
    req_rpc_data->msg_info.body_fd = -1;
5278
0
    req_rpc_data->rpc_cancel = 1;
5279
5280
0
    nxt_router_app_use(task, conf->app, 1);
5281
5282
0
    req_rpc_data->request = r;
5283
0
    r->req_rpc_data = req_rpc_data;
5284
5285
0
    if (r->last != NULL) {
5286
0
        r->last->completion_handler = nxt_router_http_request_done;
5287
0
    }
5288
5289
0
    nxt_router_app_port_get(task, conf->app, req_rpc_data);
5290
0
    nxt_router_app_prepare_request(task, req_rpc_data);
5291
0
}
5292
5293
5294
static void
5295
nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
5296
0
{
5297
0
    nxt_http_request_t  *r;
5298
5299
0
    r = obj;
5300
5301
0
    nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
5302
5303
0
    nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5304
5305
0
    if (r->req_rpc_data != NULL) {
5306
0
        nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5307
0
    }
5308
5309
0
    nxt_mp_release(r->mem_pool);
5310
0
}
5311
5312
5313
static void
5314
nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
5315
0
{
5316
0
    nxt_http_request_t  *r;
5317
5318
0
    r = data;
5319
5320
0
    nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
5321
5322
0
    if (r->req_rpc_data != NULL) {
5323
0
        nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5324
0
    }
5325
5326
0
    nxt_http_request_close_handler(task, r, r->proto.any);
5327
0
}
5328
5329
5330
static void
5331
nxt_router_app_prepare_request(nxt_task_t *task,
5332
    nxt_request_rpc_data_t *req_rpc_data)
5333
0
{
5334
0
    nxt_app_t         *app;
5335
0
    nxt_buf_t         *buf, *body;
5336
0
    nxt_int_t         res;
5337
0
    nxt_port_t        *port, *reply_port;
5338
5339
0
    int                   notify;
5340
0
    struct {
5341
0
        nxt_port_msg_t       pm;
5342
0
        nxt_port_mmap_msg_t  mm;
5343
0
    } msg;
5344
5345
5346
0
    app = req_rpc_data->app;
5347
5348
0
    nxt_assert(app != NULL);
5349
5350
0
    port = req_rpc_data->app_port;
5351
5352
0
    nxt_assert(port != NULL);
5353
0
    nxt_assert(port->queue != NULL);
5354
5355
0
    reply_port = task->thread->engine->port;
5356
5357
0
    buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5358
0
                                 nxt_app_msg_prefix[app->type]);
5359
0
    if (nxt_slow_path(buf == NULL)) {
5360
0
        nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5361
0
                  req_rpc_data->stream, &app->name);
5362
5363
0
        nxt_http_request_error(task, req_rpc_data->request,
5364
0
                               NXT_HTTP_INTERNAL_SERVER_ERROR);
5365
5366
0
        return;
5367
0
    }
5368
5369
0
    nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5370
0
                    nxt_buf_used_size(buf),
5371
0
                    port->socket.fd);
5372
5373
0
    req_rpc_data->msg_info.buf = buf;
5374
5375
0
    body = req_rpc_data->request->body;
5376
5377
0
    if (body != NULL && nxt_buf_is_file(body)) {
5378
0
        req_rpc_data->msg_info.body_fd = body->file->fd;
5379
5380
0
        body->file->fd = -1;
5381
5382
0
    } else {
5383
0
        req_rpc_data->msg_info.body_fd = -1;
5384
0
    }
5385
5386
0
    msg.pm.stream = req_rpc_data->stream;
5387
0
    msg.pm.pid = reply_port->pid;
5388
0
    msg.pm.reply_port = reply_port->id;
5389
0
    msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5390
0
    msg.pm.last = 0;
5391
0
    msg.pm.mmap = 1;
5392
0
    msg.pm.nf = 0;
5393
0
    msg.pm.mf = 0;
5394
5395
0
    nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5396
0
    nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5397
5398
0
    msg.mm.mmap_id = hdr->id;
5399
0
    msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5400
0
    msg.mm.size = nxt_buf_used_size(buf);
5401
5402
0
    res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5403
0
                             req_rpc_data->stream, &notify,
5404
0
                             &req_rpc_data->msg_info.tracking_cookie);
5405
0
    if (nxt_fast_path(res == NXT_OK)) {
5406
0
        if (notify != 0) {
5407
0
            (void) nxt_port_socket_write(task, port,
5408
0
                                         NXT_PORT_MSG_READ_QUEUE,
5409
0
                                         -1, req_rpc_data->stream,
5410
0
                                         reply_port->id, NULL);
5411
5412
0
        } else {
5413
0
            nxt_debug(task, "queue is not empty");
5414
0
        }
5415
5416
0
        buf->is_port_mmap_sent = 1;
5417
0
        buf->mem.pos = buf->mem.free;
5418
5419
0
    } else {
5420
0
        nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5421
0
                  req_rpc_data->stream, &app->name);
5422
5423
0
        nxt_http_request_error(task, req_rpc_data->request,
5424
0
                               NXT_HTTP_INTERNAL_SERVER_ERROR);
5425
0
    }
5426
0
}
5427
5428
5429
struct nxt_fields_iter_s {
5430
    nxt_list_part_t   *part;
5431
    nxt_http_field_t  *field;
5432
};
5433
5434
typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5435
5436
5437
static nxt_http_field_t *
5438
nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5439
0
{
5440
0
    if (part == NULL) {
5441
0
        return NULL;
5442
0
    }
5443
5444
0
    while (part->nelts == 0) {
5445
0
        part = part->next;
5446
0
        if (part == NULL) {
5447
0
            return NULL;
5448
0
        }
5449
0
    }
5450
5451
0
    i->part = part;
5452
0
    i->field = nxt_list_data(i->part);
5453
5454
0
    return i->field;
5455
0
}
5456
5457
5458
static nxt_http_field_t *
5459
nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5460
0
{
5461
0
    return nxt_fields_part_first(nxt_list_part(fields), i);
5462
0
}
5463
5464
5465
static nxt_http_field_t *
5466
nxt_fields_next(nxt_fields_iter_t *i)
5467
0
{
5468
0
    nxt_http_field_t  *end = nxt_list_data(i->part);
5469
5470
0
    end += i->part->nelts;
5471
0
    i->field++;
5472
5473
0
    if (i->field < end) {
5474
0
        return i->field;
5475
0
    }
5476
5477
0
    return nxt_fields_part_first(i->part->next, i);
5478
0
}
5479
5480
5481
static nxt_buf_t *
5482
nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5483
    nxt_app_t *app, const nxt_str_t *prefix)
5484
0
{
5485
0
    void                *target_pos, *query_pos;
5486
0
    u_char              *pos, *end, *p, c;
5487
0
    size_t              fields_count, req_size, size, free_size;
5488
0
    size_t              copy_size;
5489
0
    nxt_off_t           content_length;
5490
0
    nxt_buf_t           *b, *buf, *out, **tail;
5491
0
    nxt_http_field_t    *field, *dup;
5492
0
    nxt_unit_field_t    *dst_field;
5493
0
    nxt_fields_iter_t   iter, dup_iter;
5494
0
    nxt_unit_request_t  *req;
5495
5496
0
    req_size = sizeof(nxt_unit_request_t)
5497
0
               + r->method->length + 1
5498
0
               + r->version.length + 1
5499
0
               + r->remote->address_length + 1
5500
0
               + r->local->address_length + 1
5501
0
               + nxt_sockaddr_port_length(r->local) + 1
5502
0
               + r->server_name.length + 1
5503
0
               + r->target.length + 1
5504
0
               + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5505
5506
0
    content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5507
0
    fields_count = 0;
5508
5509
0
    nxt_list_each(field, r->fields) {
5510
0
        fields_count++;
5511
5512
0
        req_size += field->name_length + prefix->length + 1
5513
0
                    + field->value_length + 1;
5514
0
    } nxt_list_loop;
5515
5516
0
    req_size += fields_count * sizeof(nxt_unit_field_t);
5517
5518
0
    if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5519
0
        nxt_alert(task, "headers to big to fit in shared memory (%d)",
5520
0
                  (int) req_size);
5521
5522
0
        return NULL;
5523
0
    }
5524
5525
0
    out = nxt_port_mmap_get_buf(task, &app->outgoing,
5526
0
              nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5527
0
    if (nxt_slow_path(out == NULL)) {
5528
0
        return NULL;
5529
0
    }
5530
5531
0
    req = (nxt_unit_request_t *) out->mem.free;
5532
0
    out->mem.free += req_size;
5533
5534
0
    req->app_target = r->app_target;
5535
5536
0
    req->content_length = content_length;
5537
5538
0
    p = (u_char *) (req->fields + fields_count);
5539
5540
0
    nxt_debug(task, "fields_count=%d", (int) fields_count);
5541
5542
0
    req->method_length = r->method->length;
5543
0
    nxt_unit_sptr_set(&req->method, p);
5544
0
    p = nxt_cpymem(p, r->method->start, r->method->length);
5545
0
    *p++ = '\0';
5546
5547
0
    req->version_length = r->version.length;
5548
0
    nxt_unit_sptr_set(&req->version, p);
5549
0
    p = nxt_cpymem(p, r->version.start, r->version.length);
5550
0
    *p++ = '\0';
5551
5552
0
    req->remote_length = r->remote->address_length;
5553
0
    nxt_unit_sptr_set(&req->remote, p);
5554
0
    p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5555
0
                   r->remote->address_length);
5556
0
    *p++ = '\0';
5557
5558
0
    req->local_addr_length = r->local->address_length;
5559
0
    nxt_unit_sptr_set(&req->local_addr, p);
5560
0
    p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5561
0
    *p++ = '\0';
5562
5563
0
    req->local_port_length = nxt_sockaddr_port_length(r->local);
5564
0
    nxt_unit_sptr_set(&req->local_port, p);
5565
0
    p = nxt_cpymem(p, nxt_sockaddr_port(r->local),
5566
0
                   nxt_sockaddr_port_length(r->local));
5567
0
    *p++ = '\0';
5568
5569
0
    req->tls = r->tls;
5570
0
    req->websocket_handshake = r->websocket_handshake;
5571
5572
0
    req->server_name_length = r->server_name.length;
5573
0
    nxt_unit_sptr_set(&req->server_name, p);
5574
0
    p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5575
0
    *p++ = '\0';
5576
5577
0
    target_pos = p;
5578
0
    req->target_length = (uint32_t) r->target.length;
5579
0
    nxt_unit_sptr_set(&req->target, p);
5580
0
    p = nxt_cpymem(p, r->target.start, r->target.length);
5581
0
    *p++ = '\0';
5582
5583
0
    req->path_length = (uint32_t) r->path->length;
5584
0
    if (r->path->start == r->target.start) {
5585
0
        nxt_unit_sptr_set(&req->path, target_pos);
5586
5587
0
    } else {
5588
0
        nxt_unit_sptr_set(&req->path, p);
5589
0
        p = nxt_cpymem(p, r->path->start, r->path->length);
5590
0
        *p++ = '\0';
5591
0
    }
5592
5593
0
    req->query_length = (uint32_t) r->args->length;
5594
0
    if (r->args->start != NULL) {
5595
0
        query_pos = nxt_pointer_to(target_pos,
5596
0
                                   r->args->start - r->target.start);
5597
5598
0
        nxt_unit_sptr_set(&req->query, query_pos);
5599
5600
0
    } else {
5601
0
        req->query.offset = 0;
5602
0
    }
5603
5604
0
    req->content_length_field = NXT_UNIT_NONE_FIELD;
5605
0
    req->content_type_field   = NXT_UNIT_NONE_FIELD;
5606
0
    req->cookie_field         = NXT_UNIT_NONE_FIELD;
5607
0
    req->authorization_field  = NXT_UNIT_NONE_FIELD;
5608
5609
0
    dst_field = req->fields;
5610
5611
0
    for (field = nxt_fields_first(r->fields, &iter);
5612
0
         field != NULL;
5613
0
         field = nxt_fields_next(&iter))
5614
0
    {
5615
0
        if (field->skip) {
5616
0
            continue;
5617
0
        }
5618
5619
0
        dst_field->hash = field->hash;
5620
0
        dst_field->skip = 0;
5621
0
        dst_field->name_length = field->name_length + prefix->length;
5622
0
        dst_field->value_length = field->value_length;
5623
5624
0
        if (field == r->content_length) {
5625
0
            req->content_length_field = dst_field - req->fields;
5626
5627
0
        } else if (field == r->content_type) {
5628
0
            req->content_type_field = dst_field - req->fields;
5629
5630
0
        } else if (field == r->cookie) {
5631
0
            req->cookie_field = dst_field - req->fields;
5632
5633
0
        } else if (field == r->authorization) {
5634
0
            req->authorization_field = dst_field - req->fields;
5635
0
        }
5636
5637
0
        nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5638
0
                  (int) field->hash, (int) field->skip,
5639
0
                  (int) field->name_length, field->name,
5640
0
                  (int) field->value_length, field->value);
5641
5642
0
        if (prefix->length != 0) {
5643
0
            nxt_unit_sptr_set(&dst_field->name, p);
5644
0
            p = nxt_cpymem(p, prefix->start, prefix->length);
5645
5646
0
            end = field->name + field->name_length;
5647
0
            for (pos = field->name; pos < end; pos++) {
5648
0
                c = *pos;
5649
5650
0
                if (c >= 'a' && c <= 'z') {
5651
0
                    *p++ = (c & ~0x20);
5652
0
                    continue;
5653
0
                }
5654
5655
0
                if (c == '-') {
5656
0
                    *p++ = '_';
5657
0
                    continue;
5658
0
                }
5659
5660
0
                *p++ = c;
5661
0
            }
5662
5663
0
        } else {
5664
0
            nxt_unit_sptr_set(&dst_field->name, p);
5665
0
            p = nxt_cpymem(p, field->name, field->name_length);
5666
0
        }
5667
5668
0
        *p++ = '\0';
5669
5670
0
        nxt_unit_sptr_set(&dst_field->value, p);
5671
0
        p = nxt_cpymem(p, field->value, field->value_length);
5672
5673
0
        if (prefix->length != 0) {
5674
0
            dup_iter = iter;
5675
5676
0
            for (dup = nxt_fields_next(&dup_iter);
5677
0
                 dup != NULL;
5678
0
                 dup = nxt_fields_next(&dup_iter))
5679
0
            {
5680
0
                if (dup->name_length != field->name_length
5681
0
                    || dup->skip
5682
0
                    || dup->hash != field->hash
5683
0
                    || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5684
0
                {
5685
0
                    continue;
5686
0
                }
5687
5688
0
                p = nxt_cpymem(p, ", ", 2);
5689
0
                p = nxt_cpymem(p, dup->value, dup->value_length);
5690
5691
0
                dst_field->value_length += 2 + dup->value_length;
5692
5693
0
                dup->skip = 1;
5694
0
            }
5695
0
        }
5696
5697
0
        *p++ = '\0';
5698
5699
0
        dst_field++;
5700
0
    }
5701
5702
0
    req->fields_count = (uint32_t) (dst_field - req->fields);
5703
5704
0
    nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5705
5706
0
    buf = out;
5707
0
    tail = &buf->next;
5708
5709
0
    for (b = r->body; b != NULL; b = b->next) {
5710
0
        size = nxt_buf_mem_used_size(&b->mem);
5711
0
        pos = b->mem.pos;
5712
5713
0
        while (size > 0) {
5714
0
            if (buf == NULL) {
5715
0
                free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5716
5717
0
                buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5718
0
                if (nxt_slow_path(buf == NULL)) {
5719
0
                    while (out != NULL) {
5720
0
                        buf = out->next;
5721
0
                        out->next = NULL;
5722
0
                        out->completion_handler(task, out, out->parent);
5723
0
                        out = buf;
5724
0
                    }
5725
0
                    return NULL;
5726
0
                }
5727
5728
0
                *tail = buf;
5729
0
                tail = &buf->next;
5730
5731
0
            } else {
5732
0
                free_size = nxt_buf_mem_free_size(&buf->mem);
5733
0
                if (free_size < size
5734
0
                    && nxt_port_mmap_increase_buf(task, buf, size, 1)
5735
0
                       == NXT_OK)
5736
0
                {
5737
0
                    free_size = nxt_buf_mem_free_size(&buf->mem);
5738
0
                }
5739
0
            }
5740
5741
0
            if (free_size > 0) {
5742
0
                copy_size = nxt_min(free_size, size);
5743
5744
0
                buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5745
5746
0
                size -= copy_size;
5747
0
                pos += copy_size;
5748
5749
0
                if (size == 0) {
5750
0
                    break;
5751
0
                }
5752
0
            }
5753
5754
0
            buf = NULL;
5755
0
        }
5756
0
    }
5757
5758
0
    return out;
5759
0
}
5760
5761
5762
static void
5763
nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5764
0
{
5765
0
    nxt_timer_t              *timer;
5766
0
    nxt_http_request_t       *r;
5767
0
    nxt_request_rpc_data_t   *req_rpc_data;
5768
5769
0
    timer = obj;
5770
5771
0
    nxt_debug(task, "router app timeout");
5772
5773
0
    r = nxt_timer_data(timer, nxt_http_request_t, timer);
5774
0
    req_rpc_data = r->timer_data;
5775
5776
0
    nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5777
5778
0
    nxt_request_rpc_data_unlink(task, req_rpc_data);
5779
0
}
5780
5781
5782
static void
5783
nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5784
0
{
5785
0
    r->timer.handler = nxt_router_http_request_release;
5786
0
    nxt_timer_add(task->thread->engine, &r->timer, 0);
5787
0
}
5788
5789
5790
static void
5791
nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5792
0
{
5793
0
    nxt_http_request_t  *r;
5794
5795
0
    nxt_debug(task, "http request pool release");
5796
5797
0
    r = nxt_timer_data(obj, nxt_http_request_t, timer);
5798
5799
0
    nxt_mp_release(r->mem_pool);
5800
0
}
5801
5802
5803
static void
5804
nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5805
0
{
5806
0
    size_t                   mi;
5807
0
    uint32_t                 i;
5808
0
    nxt_bool_t               ack;
5809
0
    nxt_process_t            *process;
5810
0
    nxt_free_map_t           *m;
5811
0
    nxt_port_mmap_handler_t  *mmap_handler;
5812
5813
0
    nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5814
5815
0
    process = nxt_runtime_process_find(task->thread->runtime,
5816
0
                                       msg->port_msg.pid);
5817
0
    if (nxt_slow_path(process == NULL)) {
5818
0
        return;
5819
0
    }
5820
5821
0
    ack = 0;
5822
5823
    /*
5824
     * To mitigate possible racing condition (when OOSM message received
5825
     * after some of the memory was already freed), need to try to find
5826
     * first free segment in shared memory and send ACK if found.
5827
     */
5828
5829
0
    nxt_thread_mutex_lock(&process->incoming.mutex);
5830
5831
0
    for (i = 0; i < process->incoming.size; i++) {
5832
0
        mmap_handler = process->incoming.elts[i].mmap_handler;
5833
5834
0
        if (nxt_slow_path(mmap_handler == NULL)) {
5835
0
            continue;
5836
0
        }
5837
5838
0
        m = mmap_handler->hdr->free_map;
5839
5840
0
        for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5841
0
            if (m[mi] != 0) {
5842
0
                ack = 1;
5843
5844
0
                nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5845
0
                          i, mi, m[mi]);
5846
5847
0
                break;
5848
0
            }
5849
0
        }
5850
0
    }
5851
5852
0
    nxt_thread_mutex_unlock(&process->incoming.mutex);
5853
5854
0
    if (ack) {
5855
0
        nxt_process_broadcast_shm_ack(task, process);
5856
0
    }
5857
0
}
5858
5859
5860
static void
5861
nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5862
0
{
5863
0
    nxt_fd_t                 fd;
5864
0
    nxt_port_t               *port;
5865
0
    nxt_runtime_t            *rt;
5866
0
    nxt_port_mmaps_t         *mmaps;
5867
0
    nxt_port_msg_get_mmap_t  *get_mmap_msg;
5868
0
    nxt_port_mmap_handler_t  *mmap_handler;
5869
5870
0
    rt = task->thread->runtime;
5871
5872
0
    port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5873
0
                                 msg->port_msg.reply_port);
5874
0
    if (nxt_slow_path(port == NULL)) {
5875
0
        nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5876
0
                  msg->port_msg.pid, msg->port_msg.reply_port);
5877
5878
0
        return;
5879
0
    }
5880
5881
0
    if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5882
0
                      < (int) sizeof(nxt_port_msg_get_mmap_t)))
5883
0
    {
5884
0
        nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5885
0
                  (int) nxt_buf_used_size(msg->buf));
5886
5887
0
        return;
5888
0
    }
5889
5890
0
    get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5891
5892
0
    nxt_assert(port->type == NXT_PROCESS_APP);
5893
5894
0
    if (nxt_slow_path(port->app == NULL)) {
5895
0
        nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5896
0
                  port->pid, port->id);
5897
5898
        // FIXME
5899
0
        nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5900
0
                              -1, msg->port_msg.stream, 0, NULL);
5901
5902
0
        return;
5903
0
    }
5904
5905
0
    mmaps = &port->app->outgoing;
5906
0
    nxt_thread_mutex_lock(&mmaps->mutex);
5907
5908
0
    if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5909
0
        nxt_thread_mutex_unlock(&mmaps->mutex);
5910
5911
0
        nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5912
0
                  (int) get_mmap_msg->id);
5913
5914
        // FIXME
5915
0
        nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5916
0
                              -1, msg->port_msg.stream, 0, NULL);
5917
0
        return;
5918
0
    }
5919
5920
0
    mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5921
5922
0
    fd = mmap_handler->fd;
5923
5924
0
    nxt_thread_mutex_unlock(&mmaps->mutex);
5925
5926
0
    nxt_debug(task, "get mmap %PI:%d found",
5927
0
              msg->port_msg.pid, (int) get_mmap_msg->id);
5928
5929
0
    (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5930
0
}
5931
5932
5933
static void
5934
nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5935
0
{
5936
0
    nxt_port_t               *port, *reply_port;
5937
0
    nxt_runtime_t            *rt;
5938
0
    nxt_port_msg_get_port_t  *get_port_msg;
5939
5940
0
    rt = task->thread->runtime;
5941
5942
0
    reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5943
0
                                       msg->port_msg.reply_port);
5944
0
    if (nxt_slow_path(reply_port == NULL)) {
5945
0
        nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5946
0
                  msg->port_msg.pid, msg->port_msg.reply_port);
5947
5948
0
        return;
5949
0
    }
5950
5951
0
    if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5952
0
                      < (int) sizeof(nxt_port_msg_get_port_t)))
5953
0
    {
5954
0
        nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5955
0
                  (int) nxt_buf_used_size(msg->buf));
5956
5957
0
        return;
5958
0
    }
5959
5960
0
    get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5961
5962
0
    port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5963
0
    if (nxt_slow_path(port == NULL)) {
5964
0
        nxt_alert(task, "get_port_handler: port %PI:%d not found",
5965
0
                  get_port_msg->pid, get_port_msg->id);
5966
5967
0
        return;
5968
0
    }
5969
5970
0
    nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5971
0
              get_port_msg->id);
5972
5973
0
    (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5974
0
}