/src/unit/src/nxt_http_websocket.c
Line | Count | Source (jump to first uncovered line) |
1 | | |
2 | | /* |
3 | | * Copyright (C) NGINX, Inc. |
4 | | */ |
5 | | |
6 | | #include <nxt_main.h> |
7 | | #include <nxt_router.h> |
8 | | #include <nxt_http.h> |
9 | | #include <nxt_router_request.h> |
10 | | #include <nxt_port_memory_int.h> |
11 | | #include <nxt_websocket.h> |
12 | | #include <nxt_websocket_header.h> |
13 | | |
14 | | |
15 | | static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data); |
16 | | static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, |
17 | | void *data); |
18 | | |
19 | | |
20 | | const nxt_http_request_state_t nxt_http_websocket |
21 | | nxt_aligned(64) = |
22 | | { |
23 | | .ready_handler = nxt_http_websocket_client, |
24 | | .error_handler = nxt_http_websocket_error_handler, |
25 | | }; |
26 | | |
27 | | |
28 | | static void |
29 | | nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) |
30 | 0 | { |
31 | 0 | size_t frame_size, used_size, copy_size, buf_free_size; |
32 | 0 | size_t chunk_copy_size; |
33 | 0 | nxt_buf_t *out, *buf, **out_tail, *b, *next; |
34 | 0 | nxt_int_t res; |
35 | 0 | nxt_http_request_t *r; |
36 | 0 | nxt_request_rpc_data_t *req_rpc_data; |
37 | 0 | nxt_websocket_header_t *wsh; |
38 | |
|
39 | 0 | r = obj; |
40 | 0 | req_rpc_data = r->req_rpc_data; |
41 | |
|
42 | 0 | if (nxt_slow_path(req_rpc_data == NULL)) { |
43 | 0 | nxt_debug(task, "websocket client frame for destroyed request"); |
44 | |
|
45 | 0 | return; |
46 | 0 | } |
47 | | |
48 | 0 | nxt_debug(task, "http websocket client frame"); |
49 | |
|
50 | 0 | wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos; |
51 | |
|
52 | 0 | frame_size = nxt_websocket_frame_header_size(wsh) |
53 | 0 | + nxt_websocket_frame_payload_len(wsh); |
54 | |
|
55 | 0 | buf = NULL; |
56 | 0 | buf_free_size = 0; |
57 | 0 | out = NULL; |
58 | 0 | out_tail = &out; |
59 | |
|
60 | 0 | b = r->ws_frame; |
61 | |
|
62 | 0 | while (b != NULL && frame_size > 0) { |
63 | 0 | used_size = nxt_buf_mem_used_size(&b->mem); |
64 | 0 | copy_size = nxt_min(used_size, frame_size); |
65 | |
|
66 | 0 | while (copy_size > 0) { |
67 | 0 | if (buf == NULL || buf_free_size == 0) { |
68 | 0 | buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE); |
69 | |
|
70 | 0 | buf = nxt_port_mmap_get_buf(task, &req_rpc_data->app->outgoing, |
71 | 0 | buf_free_size); |
72 | |
|
73 | 0 | *out_tail = buf; |
74 | 0 | out_tail = &buf->next; |
75 | 0 | } |
76 | |
|
77 | 0 | chunk_copy_size = nxt_min(buf_free_size, copy_size); |
78 | |
|
79 | 0 | buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos, |
80 | 0 | chunk_copy_size); |
81 | |
|
82 | 0 | copy_size -= chunk_copy_size; |
83 | 0 | b->mem.pos += chunk_copy_size; |
84 | 0 | buf_free_size -= chunk_copy_size; |
85 | 0 | } |
86 | |
|
87 | 0 | frame_size -= copy_size; |
88 | 0 | next = b->next; |
89 | 0 | b->next = NULL; |
90 | |
|
91 | 0 | if (nxt_buf_mem_used_size(&b->mem) == 0) { |
92 | 0 | nxt_work_queue_add(&task->thread->engine->fast_work_queue, |
93 | 0 | b->completion_handler, task, b, b->parent); |
94 | |
|
95 | 0 | r->ws_frame = next; |
96 | 0 | } |
97 | |
|
98 | 0 | b = next; |
99 | 0 | } |
100 | |
|
101 | 0 | res = nxt_port_socket_write(task, req_rpc_data->app_port, |
102 | 0 | NXT_PORT_MSG_WEBSOCKET, -1, |
103 | 0 | req_rpc_data->stream, |
104 | 0 | task->thread->engine->port->id, out); |
105 | 0 | if (nxt_slow_path(res != NXT_OK)) { |
106 | | // TODO: handle |
107 | 0 | } |
108 | |
|
109 | 0 | b = r->ws_frame; |
110 | |
|
111 | 0 | if (b != NULL) { |
112 | 0 | used_size = nxt_buf_mem_used_size(&b->mem); |
113 | |
|
114 | 0 | if (used_size > 0) { |
115 | 0 | nxt_memmove(b->mem.start, b->mem.pos, used_size); |
116 | |
|
117 | 0 | b->mem.pos = b->mem.start; |
118 | 0 | b->mem.free = b->mem.start + used_size; |
119 | 0 | } |
120 | 0 | } |
121 | |
|
122 | 0 | nxt_http_request_ws_frame_start(task, r, r->ws_frame); |
123 | 0 | } |
124 | | |
125 | | |
126 | | static void |
127 | | nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data) |
128 | 0 | { |
129 | 0 | nxt_http_request_t *r; |
130 | 0 | nxt_request_rpc_data_t *req_rpc_data; |
131 | |
|
132 | 0 | nxt_debug(task, "http websocket error handler"); |
133 | |
|
134 | 0 | r = obj; |
135 | 0 | req_rpc_data = r->req_rpc_data; |
136 | |
|
137 | 0 | if (req_rpc_data == NULL) { |
138 | 0 | nxt_debug(task, " req_rpc_data is NULL"); |
139 | 0 | goto close_handler; |
140 | 0 | } |
141 | | |
142 | 0 | if (req_rpc_data->app_port == NULL) { |
143 | 0 | nxt_debug(task, " app_port is NULL"); |
144 | 0 | goto close_handler; |
145 | 0 | } |
146 | | |
147 | 0 | (void) nxt_port_socket_write(task, req_rpc_data->app_port, |
148 | 0 | NXT_PORT_MSG_WEBSOCKET_LAST, |
149 | 0 | -1, req_rpc_data->stream, |
150 | 0 | task->thread->engine->port->id, NULL); |
151 | |
|
152 | 0 | close_handler: |
153 | |
|
154 | 0 | nxt_http_request_close_handler(task, obj, data); |
155 | 0 | } |