/src/unit/src/nxt_h1proto_websocket.c
Line | Count | Source (jump to first uncovered line) |
1 | | |
2 | | /* |
3 | | * Copyright (C) NGINX, Inc. |
4 | | */ |
5 | | |
6 | | #include <nxt_main.h> |
7 | | #include <nxt_router.h> |
8 | | #include <nxt_http.h> |
9 | | #include <nxt_h1proto.h> |
10 | | #include <nxt_websocket.h> |
11 | | #include <nxt_websocket_header.h> |
12 | | |
13 | | typedef struct { |
14 | | uint16_t code; |
15 | | uint8_t args; |
16 | | nxt_str_t desc; |
17 | | } nxt_ws_error_t; |
18 | | |
19 | | static void nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data); |
20 | | static void nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, |
21 | | void *data); |
22 | | static void nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, |
23 | | nxt_h1proto_t *h1p); |
24 | | static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, |
25 | | nxt_h1proto_t *h1p); |
26 | | static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c, |
27 | | nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh); |
28 | | static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data); |
29 | | static ssize_t nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c); |
30 | | static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data); |
31 | | static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, |
32 | | void *data); |
33 | | static void hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r, |
34 | | const nxt_ws_error_t *err, ...); |
35 | | static void nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data); |
36 | | static void nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data); |
37 | | |
38 | | static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state; |
39 | | static const nxt_conn_state_t nxt_h1p_read_ws_frame_payload_state; |
40 | | |
41 | | static const nxt_ws_error_t nxt_ws_err_out_of_memory = { |
42 | | NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR, |
43 | | 0, nxt_string("Out of memory") }; |
44 | | static const nxt_ws_error_t nxt_ws_err_too_big = { |
45 | | NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG, |
46 | | 1, nxt_string("Message too big: %uL bytes") }; |
47 | | static const nxt_ws_error_t nxt_ws_err_invalid_close_code = { |
48 | | NXT_WEBSOCKET_CR_PROTOCOL_ERROR, |
49 | | 1, nxt_string("Close code %ud is not valid") }; |
50 | | static const nxt_ws_error_t nxt_ws_err_going_away = { |
51 | | NXT_WEBSOCKET_CR_GOING_AWAY, |
52 | | 0, nxt_string("Remote peer is going away") }; |
53 | | static const nxt_ws_error_t nxt_ws_err_not_masked = { |
54 | | NXT_WEBSOCKET_CR_PROTOCOL_ERROR, |
55 | | 0, nxt_string("Not masked client frame") }; |
56 | | static const nxt_ws_error_t nxt_ws_err_ctrl_fragmented = { |
57 | | NXT_WEBSOCKET_CR_PROTOCOL_ERROR, |
58 | | 0, nxt_string("Fragmented control frame") }; |
59 | | static const nxt_ws_error_t nxt_ws_err_ctrl_too_big = { |
60 | | NXT_WEBSOCKET_CR_PROTOCOL_ERROR, |
61 | | 1, nxt_string("Control frame too big: %uL bytes") }; |
62 | | static const nxt_ws_error_t nxt_ws_err_invalid_close_len = { |
63 | | NXT_WEBSOCKET_CR_PROTOCOL_ERROR, |
64 | | 0, nxt_string("Close frame payload length cannot be 1") }; |
65 | | static const nxt_ws_error_t nxt_ws_err_invalid_opcode = { |
66 | | NXT_WEBSOCKET_CR_PROTOCOL_ERROR, |
67 | | 1, nxt_string("Unrecognized opcode %ud") }; |
68 | | static const nxt_ws_error_t nxt_ws_err_cont_expected = { |
69 | | NXT_WEBSOCKET_CR_PROTOCOL_ERROR, |
70 | | 1, nxt_string("Continuation expected, but %ud opcode received") }; |
71 | | |
72 | | void |
73 | | nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r, |
74 | | nxt_buf_t *ws_frame) |
75 | 0 | { |
76 | 0 | nxt_conn_t *c; |
77 | 0 | nxt_timer_t *timer; |
78 | 0 | nxt_h1proto_t *h1p; |
79 | 0 | nxt_websocket_conf_t *websocket_conf; |
80 | |
|
81 | 0 | nxt_debug(task, "h1p ws first frame start"); |
82 | |
|
83 | 0 | h1p = r->proto.h1; |
84 | 0 | c = h1p->conn; |
85 | |
|
86 | 0 | if (!c->tcp_nodelay) { |
87 | 0 | nxt_conn_tcp_nodelay_on(task, c); |
88 | 0 | } |
89 | |
|
90 | 0 | websocket_conf = &r->conf->socket_conf->websocket_conf; |
91 | |
|
92 | 0 | if (nxt_slow_path(websocket_conf->keepalive_interval != 0)) { |
93 | 0 | h1p->websocket_timer = nxt_mp_zget(c->mem_pool, |
94 | 0 | sizeof(nxt_h1p_websocket_timer_t)); |
95 | 0 | if (nxt_slow_path(h1p->websocket_timer == NULL)) { |
96 | 0 | hxt_h1p_send_ws_error(task, r, &nxt_ws_err_out_of_memory); |
97 | 0 | return; |
98 | 0 | } |
99 | | |
100 | 0 | h1p->websocket_timer->keepalive_interval = |
101 | 0 | websocket_conf->keepalive_interval; |
102 | 0 | h1p->websocket_timer->h1p = h1p; |
103 | |
|
104 | 0 | timer = &h1p->websocket_timer->timer; |
105 | 0 | timer->task = &c->task; |
106 | 0 | timer->work_queue = &task->thread->engine->fast_work_queue; |
107 | 0 | timer->log = &c->log; |
108 | 0 | timer->bias = NXT_TIMER_DEFAULT_BIAS; |
109 | 0 | timer->handler = nxt_h1p_conn_ws_keepalive; |
110 | 0 | } |
111 | | |
112 | 0 | nxt_h1p_websocket_frame_start(task, r, ws_frame); |
113 | 0 | } |
114 | | |
115 | | |
116 | | void |
117 | | nxt_h1p_websocket_frame_start(nxt_task_t *task, nxt_http_request_t *r, |
118 | | nxt_buf_t *ws_frame) |
119 | 0 | { |
120 | 0 | size_t size; |
121 | 0 | nxt_buf_t *in; |
122 | 0 | nxt_conn_t *c; |
123 | 0 | nxt_h1proto_t *h1p; |
124 | |
|
125 | 0 | nxt_debug(task, "h1p ws frame start"); |
126 | |
|
127 | 0 | h1p = r->proto.h1; |
128 | |
|
129 | 0 | if (nxt_slow_path(h1p->websocket_closed)) { |
130 | 0 | return; |
131 | 0 | } |
132 | | |
133 | 0 | c = h1p->conn; |
134 | 0 | c->read = ws_frame; |
135 | |
|
136 | 0 | nxt_h1p_complete_buffers(task, h1p, 0); |
137 | |
|
138 | 0 | in = c->read; |
139 | 0 | c->read_state = &nxt_h1p_read_ws_frame_header_state; |
140 | |
|
141 | 0 | if (in == NULL) { |
142 | 0 | nxt_conn_read(task->thread->engine, c); |
143 | 0 | nxt_h1p_conn_ws_keepalive_enable(task, h1p); |
144 | |
|
145 | 0 | } else { |
146 | 0 | size = nxt_buf_mem_used_size(&in->mem); |
147 | |
|
148 | 0 | nxt_debug(task, "h1p read client ws frame"); |
149 | |
|
150 | 0 | nxt_memmove(in->mem.start, in->mem.pos, size); |
151 | |
|
152 | 0 | in->mem.pos = in->mem.start; |
153 | 0 | in->mem.free = in->mem.start + size; |
154 | |
|
155 | 0 | nxt_h1p_conn_ws_frame_header_read(task, c, h1p); |
156 | 0 | } |
157 | 0 | } |
158 | | |
159 | | |
160 | | static void |
161 | | nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data) |
162 | 0 | { |
163 | 0 | nxt_buf_t *out; |
164 | 0 | nxt_timer_t *timer; |
165 | 0 | nxt_h1proto_t *h1p; |
166 | 0 | nxt_http_request_t *r; |
167 | 0 | nxt_websocket_header_t *wsh; |
168 | 0 | nxt_h1p_websocket_timer_t *ws_timer; |
169 | |
|
170 | 0 | nxt_debug(task, "h1p conn ws keepalive"); |
171 | |
|
172 | 0 | timer = obj; |
173 | 0 | ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer); |
174 | 0 | h1p = ws_timer->h1p; |
175 | |
|
176 | 0 | r = h1p->request; |
177 | 0 | if (nxt_slow_path(r == NULL)) { |
178 | 0 | return; |
179 | 0 | } |
180 | | |
181 | 0 | out = nxt_http_buf_mem(task, r, 2); |
182 | 0 | if (nxt_slow_path(out == NULL)) { |
183 | 0 | nxt_http_request_error_handler(task, r, r->proto.any); |
184 | 0 | return; |
185 | 0 | } |
186 | | |
187 | 0 | out->mem.start[0] = 0; |
188 | 0 | out->mem.start[1] = 0; |
189 | |
|
190 | 0 | wsh = (nxt_websocket_header_t *) out->mem.start; |
191 | 0 | out->mem.free = nxt_websocket_frame_init(wsh, 0); |
192 | |
|
193 | 0 | wsh->fin = 1; |
194 | 0 | wsh->opcode = NXT_WEBSOCKET_OP_PING; |
195 | |
|
196 | 0 | nxt_http_request_send(task, r, out); |
197 | 0 | } |
198 | | |
199 | | |
200 | | static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state |
201 | | nxt_aligned(64) = |
202 | | { |
203 | | .ready_handler = nxt_h1p_conn_ws_frame_header_read, |
204 | | .close_handler = nxt_h1p_conn_ws_error, |
205 | | .error_handler = nxt_h1p_conn_ws_error, |
206 | | |
207 | | .io_read_handler = nxt_h1p_ws_io_read_handler, |
208 | | |
209 | | .timer_handler = nxt_h1p_conn_ws_timeout, |
210 | | .timer_value = nxt_h1p_conn_request_timer_value, |
211 | | .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout), |
212 | | .timer_autoreset = 1, |
213 | | }; |
214 | | |
215 | | |
216 | | static void |
217 | | nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data) |
218 | 0 | { |
219 | 0 | size_t size, hsize, frame_size, max_frame_size; |
220 | 0 | uint64_t payload_len; |
221 | 0 | nxt_conn_t *c; |
222 | 0 | nxt_h1proto_t *h1p; |
223 | 0 | nxt_http_request_t *r; |
224 | 0 | nxt_event_engine_t *engine; |
225 | 0 | nxt_websocket_header_t *wsh; |
226 | |
|
227 | 0 | c = obj; |
228 | 0 | h1p = data; |
229 | |
|
230 | 0 | nxt_h1p_conn_ws_keepalive_disable(task, h1p); |
231 | |
|
232 | 0 | size = nxt_buf_mem_used_size(&c->read->mem); |
233 | |
|
234 | 0 | engine = task->thread->engine; |
235 | |
|
236 | 0 | if (size < 2) { |
237 | 0 | nxt_debug(task, "h1p conn ws frame header read %z", size); |
238 | |
|
239 | 0 | nxt_conn_read(engine, c); |
240 | 0 | nxt_h1p_conn_ws_keepalive_enable(task, h1p); |
241 | |
|
242 | 0 | return; |
243 | 0 | } |
244 | | |
245 | 0 | wsh = (nxt_websocket_header_t *) c->read->mem.pos; |
246 | |
|
247 | 0 | hsize = nxt_websocket_frame_header_size(wsh); |
248 | |
|
249 | 0 | if (size < hsize) { |
250 | 0 | nxt_debug(task, "h1p conn ws frame header read %z < %z", size, hsize); |
251 | |
|
252 | 0 | nxt_conn_read(engine, c); |
253 | 0 | nxt_h1p_conn_ws_keepalive_enable(task, h1p); |
254 | |
|
255 | 0 | return; |
256 | 0 | } |
257 | | |
258 | 0 | r = h1p->request; |
259 | 0 | if (nxt_slow_path(r == NULL)) { |
260 | 0 | return; |
261 | 0 | } |
262 | | |
263 | 0 | r->ws_frame = c->read; |
264 | |
|
265 | 0 | if (nxt_slow_path(wsh->mask == 0)) { |
266 | 0 | hxt_h1p_send_ws_error(task, r, &nxt_ws_err_not_masked); |
267 | 0 | return; |
268 | 0 | } |
269 | | |
270 | 0 | if ((wsh->opcode & NXT_WEBSOCKET_OP_CTRL) != 0) { |
271 | 0 | if (nxt_slow_path(wsh->fin == 0)) { |
272 | 0 | hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_fragmented); |
273 | 0 | return; |
274 | 0 | } |
275 | | |
276 | 0 | if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_PING |
277 | 0 | && wsh->opcode != NXT_WEBSOCKET_OP_PONG |
278 | 0 | && wsh->opcode != NXT_WEBSOCKET_OP_CLOSE)) |
279 | 0 | { |
280 | 0 | hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode, |
281 | 0 | wsh->opcode); |
282 | 0 | return; |
283 | 0 | } |
284 | | |
285 | 0 | if (nxt_slow_path(wsh->payload_len > 125)) { |
286 | 0 | hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_too_big, |
287 | 0 | nxt_websocket_frame_payload_len(wsh)); |
288 | 0 | return; |
289 | 0 | } |
290 | | |
291 | 0 | if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE |
292 | 0 | && wsh->payload_len == 1)) |
293 | 0 | { |
294 | 0 | hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_len); |
295 | 0 | return; |
296 | 0 | } |
297 | |
|
298 | 0 | } else { |
299 | 0 | if (h1p->websocket_cont_expected) { |
300 | 0 | if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_CONT)) { |
301 | 0 | hxt_h1p_send_ws_error(task, r, &nxt_ws_err_cont_expected, |
302 | 0 | wsh->opcode); |
303 | 0 | return; |
304 | 0 | } |
305 | |
|
306 | 0 | } else { |
307 | 0 | if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_BINARY |
308 | 0 | && wsh->opcode != NXT_WEBSOCKET_OP_TEXT)) |
309 | 0 | { |
310 | 0 | hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode, |
311 | 0 | wsh->opcode); |
312 | 0 | return; |
313 | 0 | } |
314 | 0 | } |
315 | | |
316 | 0 | h1p->websocket_cont_expected = !wsh->fin; |
317 | 0 | } |
318 | | |
319 | 0 | max_frame_size = r->conf->socket_conf->websocket_conf.max_frame_size; |
320 | |
|
321 | 0 | payload_len = nxt_websocket_frame_payload_len(wsh); |
322 | |
|
323 | 0 | if (nxt_slow_path(hsize > max_frame_size |
324 | 0 | || payload_len > (max_frame_size - hsize))) |
325 | 0 | { |
326 | 0 | hxt_h1p_send_ws_error(task, r, &nxt_ws_err_too_big, payload_len); |
327 | 0 | return; |
328 | 0 | } |
329 | | |
330 | 0 | c->read_state = &nxt_h1p_read_ws_frame_payload_state; |
331 | |
|
332 | 0 | frame_size = payload_len + hsize; |
333 | |
|
334 | 0 | nxt_debug(task, "h1p conn ws frame header read: %z, %z", size, frame_size); |
335 | |
|
336 | 0 | if (frame_size <= size) { |
337 | 0 | nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh); |
338 | |
|
339 | 0 | return; |
340 | 0 | } |
341 | | |
342 | 0 | if (frame_size < (size_t) nxt_buf_mem_size(&c->read->mem)) { |
343 | 0 | c->read->mem.end = c->read->mem.start + frame_size; |
344 | |
|
345 | 0 | } else { |
346 | 0 | nxt_buf_t *b = nxt_buf_mem_alloc(c->mem_pool, frame_size - size, 0); |
347 | |
|
348 | 0 | c->read->next = b; |
349 | 0 | c->read = b; |
350 | 0 | } |
351 | |
|
352 | 0 | nxt_conn_read(engine, c); |
353 | 0 | nxt_h1p_conn_ws_keepalive_enable(task, h1p); |
354 | 0 | } |
355 | | |
356 | | |
357 | | static void |
358 | | nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, nxt_h1proto_t *h1p) |
359 | 0 | { |
360 | 0 | nxt_timer_t *timer; |
361 | |
|
362 | 0 | if (h1p->websocket_timer == NULL) { |
363 | 0 | return; |
364 | 0 | } |
365 | | |
366 | 0 | timer = &h1p->websocket_timer->timer; |
367 | |
|
368 | 0 | if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) { |
369 | 0 | nxt_debug(task, "h1p ws keepalive disable: scheduled ws shutdown"); |
370 | 0 | return; |
371 | 0 | } |
372 | | |
373 | 0 | nxt_timer_disable(task->thread->engine, timer); |
374 | 0 | } |
375 | | |
376 | | |
377 | | static void |
378 | | nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, nxt_h1proto_t *h1p) |
379 | 0 | { |
380 | 0 | nxt_timer_t *timer; |
381 | |
|
382 | 0 | if (h1p->websocket_timer == NULL) { |
383 | 0 | return; |
384 | 0 | } |
385 | | |
386 | 0 | timer = &h1p->websocket_timer->timer; |
387 | |
|
388 | 0 | if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) { |
389 | 0 | nxt_debug(task, "h1p ws keepalive enable: scheduled ws shutdown"); |
390 | 0 | return; |
391 | 0 | } |
392 | | |
393 | 0 | nxt_timer_add(task->thread->engine, timer, |
394 | 0 | h1p->websocket_timer->keepalive_interval); |
395 | 0 | } |
396 | | |
397 | | |
398 | | static void |
399 | | nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c, |
400 | | nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh) |
401 | 0 | { |
402 | 0 | size_t hsize; |
403 | 0 | uint8_t *p, *mask; |
404 | 0 | uint16_t code; |
405 | 0 | nxt_http_request_t *r; |
406 | |
|
407 | 0 | r = h1p->request; |
408 | |
|
409 | 0 | c->read = NULL; |
410 | |
|
411 | 0 | if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_PING)) { |
412 | 0 | nxt_h1p_conn_ws_pong(task, r, NULL); |
413 | 0 | return; |
414 | 0 | } |
415 | | |
416 | 0 | if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE)) { |
417 | 0 | if (wsh->payload_len >= 2) { |
418 | 0 | hsize = nxt_websocket_frame_header_size(wsh); |
419 | 0 | mask = nxt_pointer_to(wsh, hsize - 4); |
420 | 0 | p = nxt_pointer_to(wsh, hsize); |
421 | |
|
422 | 0 | code = ((p[0] ^ mask[0]) << 8) + (p[1] ^ mask[1]); |
423 | |
|
424 | 0 | if (nxt_slow_path(code < 1000 || code >= 5000 |
425 | 0 | || (code > 1003 && code < 1007) |
426 | 0 | || (code > 1014 && code < 3000))) |
427 | 0 | { |
428 | 0 | hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_code, |
429 | 0 | code); |
430 | 0 | return; |
431 | 0 | } |
432 | 0 | } |
433 | | |
434 | 0 | h1p->websocket_closed = 1; |
435 | 0 | } |
436 | | |
437 | 0 | r->state->ready_handler(task, r, NULL); |
438 | 0 | } |
439 | | |
440 | | |
441 | | static void |
442 | | nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data) |
443 | 0 | { |
444 | 0 | nxt_h1proto_t *h1p; |
445 | 0 | nxt_http_request_t *r; |
446 | |
|
447 | 0 | h1p = data; |
448 | |
|
449 | 0 | nxt_debug(task, "h1p conn ws error"); |
450 | |
|
451 | 0 | r = h1p->request; |
452 | |
|
453 | 0 | h1p->keepalive = 0; |
454 | |
|
455 | 0 | if (nxt_fast_path(r != NULL)) { |
456 | 0 | r->state->error_handler(task, r, h1p); |
457 | 0 | } |
458 | 0 | } |
459 | | |
460 | | |
461 | | static ssize_t |
462 | | nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c) |
463 | 0 | { |
464 | 0 | size_t size; |
465 | 0 | ssize_t n; |
466 | 0 | nxt_buf_t *b; |
467 | |
|
468 | 0 | b = c->read; |
469 | |
|
470 | 0 | if (b == NULL) { |
471 | | /* Enough for control frame. */ |
472 | 0 | size = 10 + 125; |
473 | |
|
474 | 0 | b = nxt_buf_mem_alloc(c->mem_pool, size, 0); |
475 | 0 | if (nxt_slow_path(b == NULL)) { |
476 | 0 | c->socket.error = NXT_ENOMEM; |
477 | 0 | return NXT_ERROR; |
478 | 0 | } |
479 | 0 | } |
480 | | |
481 | 0 | n = c->io->recvbuf(c, b); |
482 | |
|
483 | 0 | if (n > 0) { |
484 | 0 | c->read = b; |
485 | |
|
486 | 0 | } else { |
487 | 0 | c->read = NULL; |
488 | 0 | nxt_mp_free(c->mem_pool, b); |
489 | 0 | } |
490 | |
|
491 | 0 | return n; |
492 | 0 | } |
493 | | |
494 | | |
495 | | static void |
496 | | nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data) |
497 | 0 | { |
498 | 0 | nxt_conn_t *c; |
499 | 0 | nxt_timer_t *timer; |
500 | 0 | nxt_h1proto_t *h1p; |
501 | 0 | nxt_http_request_t *r; |
502 | |
|
503 | 0 | timer = obj; |
504 | |
|
505 | 0 | nxt_debug(task, "h1p conn ws timeout"); |
506 | |
|
507 | 0 | c = nxt_read_timer_conn(timer); |
508 | 0 | c->block_read = 1; |
509 | | /* |
510 | | * Disable SO_LINGER off during socket closing |
511 | | * to send "408 Request Timeout" error response. |
512 | | */ |
513 | 0 | c->socket.timedout = 0; |
514 | |
|
515 | 0 | h1p = c->socket.data; |
516 | 0 | h1p->keepalive = 0; |
517 | |
|
518 | 0 | r = h1p->request; |
519 | 0 | if (nxt_slow_path(r == NULL)) { |
520 | 0 | return; |
521 | 0 | } |
522 | | |
523 | 0 | hxt_h1p_send_ws_error(task, r, &nxt_ws_err_going_away); |
524 | 0 | } |
525 | | |
526 | | |
527 | | static const nxt_conn_state_t nxt_h1p_read_ws_frame_payload_state |
528 | | nxt_aligned(64) = |
529 | | { |
530 | | .ready_handler = nxt_h1p_conn_ws_frame_payload_read, |
531 | | .close_handler = nxt_h1p_conn_ws_error, |
532 | | .error_handler = nxt_h1p_conn_ws_error, |
533 | | |
534 | | .timer_handler = nxt_h1p_conn_ws_timeout, |
535 | | .timer_value = nxt_h1p_conn_request_timer_value, |
536 | | .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout), |
537 | | .timer_autoreset = 1, |
538 | | }; |
539 | | |
540 | | |
541 | | static void |
542 | | nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, void *data) |
543 | 0 | { |
544 | 0 | nxt_conn_t *c; |
545 | 0 | nxt_h1proto_t *h1p; |
546 | 0 | nxt_http_request_t *r; |
547 | 0 | nxt_event_engine_t *engine; |
548 | 0 | nxt_websocket_header_t *wsh; |
549 | |
|
550 | 0 | c = obj; |
551 | 0 | h1p = data; |
552 | |
|
553 | 0 | nxt_h1p_conn_ws_keepalive_disable(task, h1p); |
554 | |
|
555 | 0 | nxt_debug(task, "h1p conn ws frame read"); |
556 | |
|
557 | 0 | if (nxt_buf_mem_free_size(&c->read->mem) == 0) { |
558 | 0 | r = h1p->request; |
559 | 0 | if (nxt_slow_path(r == NULL)) { |
560 | 0 | return; |
561 | 0 | } |
562 | | |
563 | 0 | wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos; |
564 | |
|
565 | 0 | nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh); |
566 | |
|
567 | 0 | return; |
568 | 0 | } |
569 | | |
570 | 0 | engine = task->thread->engine; |
571 | |
|
572 | 0 | nxt_conn_read(engine, c); |
573 | 0 | nxt_h1p_conn_ws_keepalive_enable(task, h1p); |
574 | 0 | } |
575 | | |
576 | | |
577 | | static void |
578 | | hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r, |
579 | | const nxt_ws_error_t *err, ...) |
580 | 0 | { |
581 | 0 | u_char *p; |
582 | 0 | va_list args; |
583 | 0 | nxt_buf_t *out; |
584 | 0 | nxt_str_t desc; |
585 | 0 | nxt_websocket_header_t *wsh; |
586 | 0 | u_char buf[125]; |
587 | |
|
588 | 0 | if (nxt_slow_path(err->args)) { |
589 | 0 | va_start(args, err); |
590 | 0 | p = nxt_vsprintf(buf, buf + sizeof(buf), (char *) err->desc.start, |
591 | 0 | args); |
592 | 0 | va_end(args); |
593 | |
|
594 | 0 | desc.start = buf; |
595 | 0 | desc.length = p - buf; |
596 | |
|
597 | 0 | } else { |
598 | 0 | desc = err->desc; |
599 | 0 | } |
600 | |
|
601 | 0 | nxt_log(task, NXT_LOG_INFO, "websocket error %d: %V", err->code, &desc); |
602 | |
|
603 | 0 | out = nxt_http_buf_mem(task, r, 2 + sizeof(err->code) + desc.length); |
604 | 0 | if (nxt_slow_path(out == NULL)) { |
605 | 0 | nxt_http_request_error_handler(task, r, r->proto.any); |
606 | 0 | return; |
607 | 0 | } |
608 | | |
609 | 0 | out->mem.start[0] = 0; |
610 | 0 | out->mem.start[1] = 0; |
611 | |
|
612 | 0 | wsh = (nxt_websocket_header_t *) out->mem.start; |
613 | 0 | p = nxt_websocket_frame_init(wsh, sizeof(err->code) + desc.length); |
614 | |
|
615 | 0 | wsh->fin = 1; |
616 | 0 | wsh->opcode = NXT_WEBSOCKET_OP_CLOSE; |
617 | |
|
618 | 0 | *p++ = (err->code >> 8) & 0xFF; |
619 | 0 | *p++ = err->code & 0xFF; |
620 | |
|
621 | 0 | out->mem.free = nxt_cpymem(p, desc.start, desc.length); |
622 | 0 | out->next = nxt_http_buf_last(r); |
623 | |
|
624 | 0 | if (out->next != NULL) { |
625 | 0 | out->next->completion_handler = nxt_h1p_conn_ws_error_sent; |
626 | 0 | } |
627 | |
|
628 | 0 | nxt_http_request_send(task, r, out); |
629 | 0 | } |
630 | | |
631 | | |
632 | | static void |
633 | | nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data) |
634 | 0 | { |
635 | 0 | nxt_http_request_t *r; |
636 | |
|
637 | 0 | r = data; |
638 | |
|
639 | 0 | nxt_debug(task, "h1p conn ws error sent"); |
640 | |
|
641 | 0 | r->state->error_handler(task, r, r->proto.any); |
642 | 0 | } |
643 | | |
644 | | |
645 | | static void |
646 | | nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data) |
647 | 0 | { |
648 | 0 | uint8_t payload_len, i; |
649 | 0 | nxt_buf_t *b, *out, *next; |
650 | 0 | nxt_http_request_t *r; |
651 | 0 | nxt_websocket_header_t *wsh; |
652 | 0 | uint8_t mask[4]; |
653 | |
|
654 | 0 | nxt_debug(task, "h1p conn ws pong"); |
655 | |
|
656 | 0 | r = obj; |
657 | 0 | b = r->ws_frame; |
658 | |
|
659 | 0 | wsh = (nxt_websocket_header_t *) b->mem.pos; |
660 | 0 | payload_len = wsh->payload_len; |
661 | |
|
662 | 0 | b->mem.pos += 2; |
663 | |
|
664 | 0 | nxt_memcpy(mask, b->mem.pos, 4); |
665 | |
|
666 | 0 | b->mem.pos += 4; |
667 | |
|
668 | 0 | out = nxt_http_buf_mem(task, r, 2 + payload_len); |
669 | 0 | if (nxt_slow_path(out == NULL)) { |
670 | 0 | nxt_http_request_error_handler(task, r, r->proto.any); |
671 | 0 | return; |
672 | 0 | } |
673 | | |
674 | 0 | out->mem.start[0] = 0; |
675 | 0 | out->mem.start[1] = 0; |
676 | |
|
677 | 0 | wsh = (nxt_websocket_header_t *) out->mem.start; |
678 | 0 | out->mem.free = nxt_websocket_frame_init(wsh, payload_len); |
679 | |
|
680 | 0 | wsh->fin = 1; |
681 | 0 | wsh->opcode = NXT_WEBSOCKET_OP_PONG; |
682 | |
|
683 | 0 | for (i = 0; i < payload_len; i++) { |
684 | 0 | while (nxt_buf_mem_used_size(&b->mem) == 0) { |
685 | 0 | next = b->next; |
686 | 0 | b->next = NULL; |
687 | |
|
688 | 0 | nxt_work_queue_add(&task->thread->engine->fast_work_queue, |
689 | 0 | b->completion_handler, task, b, b->parent); |
690 | |
|
691 | 0 | b = next; |
692 | 0 | } |
693 | |
|
694 | 0 | *out->mem.free++ = *b->mem.pos++ ^ mask[i % 4]; |
695 | 0 | } |
696 | |
|
697 | 0 | r->ws_frame = b; |
698 | |
|
699 | 0 | nxt_http_request_send(task, r, out); |
700 | |
|
701 | 0 | nxt_http_request_ws_frame_start(task, r, r->ws_frame); |
702 | 0 | } |