Coverage Report

Created: 2025-08-29 06:30

/src/unit/src/nxt_h1proto_websocket.c
Line
Count
Source (jump to first uncovered line)
1
2
/*
3
 * Copyright (C) NGINX, Inc.
4
 */
5
6
#include <nxt_main.h>
7
#include <nxt_router.h>
8
#include <nxt_http.h>
9
#include <nxt_h1proto.h>
10
#include <nxt_websocket.h>
11
#include <nxt_websocket_header.h>
12
13
typedef struct {
14
    uint16_t   code;
15
    uint8_t    args;
16
    nxt_str_t  desc;
17
} nxt_ws_error_t;
18
19
static void nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data);
20
static void nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj,
21
    void *data);
22
static void nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task,
23
    nxt_h1proto_t *h1p);
24
static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task,
25
    nxt_h1proto_t *h1p);
26
static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
27
    nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh);
28
static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data);
29
static ssize_t nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
30
static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data);
31
static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj,
32
    void *data);
33
static void hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
34
    const nxt_ws_error_t *err, ...);
35
static void nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data);
36
static void nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data);
37
38
static const nxt_conn_state_t  nxt_h1p_read_ws_frame_header_state;
39
static const nxt_conn_state_t  nxt_h1p_read_ws_frame_payload_state;
40
41
static const nxt_ws_error_t  nxt_ws_err_out_of_memory = {
42
    NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR,
43
    0, nxt_string("Out of memory") };
44
static const nxt_ws_error_t  nxt_ws_err_too_big = {
45
    NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG,
46
    1, nxt_string("Message too big: %uL bytes") };
47
static const nxt_ws_error_t  nxt_ws_err_invalid_close_code = {
48
    NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
49
    1, nxt_string("Close code %ud is not valid") };
50
static const nxt_ws_error_t  nxt_ws_err_going_away = {
51
    NXT_WEBSOCKET_CR_GOING_AWAY,
52
    0, nxt_string("Remote peer is going away") };
53
static const nxt_ws_error_t  nxt_ws_err_not_masked = {
54
    NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
55
    0, nxt_string("Not masked client frame") };
56
static const nxt_ws_error_t  nxt_ws_err_ctrl_fragmented = {
57
    NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
58
    0, nxt_string("Fragmented control frame") };
59
static const nxt_ws_error_t  nxt_ws_err_ctrl_too_big = {
60
    NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
61
    1, nxt_string("Control frame too big: %uL bytes") };
62
static const nxt_ws_error_t  nxt_ws_err_invalid_close_len = {
63
    NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
64
    0, nxt_string("Close frame payload length cannot be 1") };
65
static const nxt_ws_error_t  nxt_ws_err_invalid_opcode = {
66
    NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
67
    1, nxt_string("Unrecognized opcode %ud") };
68
static const nxt_ws_error_t  nxt_ws_err_cont_expected = {
69
    NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
70
    1, nxt_string("Continuation expected, but %ud opcode received") };
71
72
void
73
nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r,
74
    nxt_buf_t *ws_frame)
75
0
{
76
0
    nxt_conn_t            *c;
77
0
    nxt_timer_t           *timer;
78
0
    nxt_h1proto_t         *h1p;
79
0
    nxt_websocket_conf_t  *websocket_conf;
80
81
0
    nxt_debug(task, "h1p ws first frame start");
82
83
0
    h1p = r->proto.h1;
84
0
    c = h1p->conn;
85
86
0
    if (!c->tcp_nodelay) {
87
0
        nxt_conn_tcp_nodelay_on(task, c);
88
0
    }
89
90
0
    websocket_conf = &r->conf->socket_conf->websocket_conf;
91
92
0
    if (nxt_slow_path(websocket_conf->keepalive_interval != 0)) {
93
0
        h1p->websocket_timer = nxt_mp_zget(c->mem_pool,
94
0
                                           sizeof(nxt_h1p_websocket_timer_t));
95
0
        if (nxt_slow_path(h1p->websocket_timer == NULL)) {
96
0
            hxt_h1p_send_ws_error(task, r, &nxt_ws_err_out_of_memory);
97
0
            return;
98
0
        }
99
100
0
        h1p->websocket_timer->keepalive_interval =
101
0
            websocket_conf->keepalive_interval;
102
0
        h1p->websocket_timer->h1p = h1p;
103
104
0
        timer = &h1p->websocket_timer->timer;
105
0
        timer->task = &c->task;
106
0
        timer->work_queue = &task->thread->engine->fast_work_queue;
107
0
        timer->log = &c->log;
108
0
        timer->bias = NXT_TIMER_DEFAULT_BIAS;
109
0
        timer->handler = nxt_h1p_conn_ws_keepalive;
110
0
    }
111
112
0
    nxt_h1p_websocket_frame_start(task, r, ws_frame);
113
0
}
114
115
116
void
117
nxt_h1p_websocket_frame_start(nxt_task_t *task, nxt_http_request_t *r,
118
    nxt_buf_t *ws_frame)
119
0
{
120
0
    size_t         size;
121
0
    nxt_buf_t      *in;
122
0
    nxt_conn_t     *c;
123
0
    nxt_h1proto_t  *h1p;
124
125
0
    nxt_debug(task, "h1p ws frame start");
126
127
0
    h1p = r->proto.h1;
128
129
0
    if (nxt_slow_path(h1p->websocket_closed)) {
130
0
        return;
131
0
    }
132
133
0
    c = h1p->conn;
134
0
    c->read = ws_frame;
135
136
0
    nxt_h1p_complete_buffers(task, h1p, 0);
137
138
0
    in = c->read;
139
0
    c->read_state = &nxt_h1p_read_ws_frame_header_state;
140
141
0
    if (in == NULL) {
142
0
        nxt_conn_read(task->thread->engine, c);
143
0
        nxt_h1p_conn_ws_keepalive_enable(task, h1p);
144
145
0
    } else {
146
0
        size = nxt_buf_mem_used_size(&in->mem);
147
148
0
        nxt_debug(task, "h1p read client ws frame");
149
150
0
        nxt_memmove(in->mem.start, in->mem.pos, size);
151
152
0
        in->mem.pos = in->mem.start;
153
0
        in->mem.free = in->mem.start + size;
154
155
0
        nxt_h1p_conn_ws_frame_header_read(task, c, h1p);
156
0
    }
157
0
}
158
159
160
static void
161
nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data)
162
0
{
163
0
    nxt_buf_t                  *out;
164
0
    nxt_timer_t                *timer;
165
0
    nxt_h1proto_t              *h1p;
166
0
    nxt_http_request_t         *r;
167
0
    nxt_websocket_header_t     *wsh;
168
0
    nxt_h1p_websocket_timer_t  *ws_timer;
169
170
0
    nxt_debug(task, "h1p conn ws keepalive");
171
172
0
    timer = obj;
173
0
    ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
174
0
    h1p = ws_timer->h1p;
175
176
0
    r = h1p->request;
177
0
    if (nxt_slow_path(r == NULL)) {
178
0
        return;
179
0
    }
180
181
0
    out = nxt_http_buf_mem(task, r, 2);
182
0
    if (nxt_slow_path(out == NULL)) {
183
0
        nxt_http_request_error_handler(task, r, r->proto.any);
184
0
        return;
185
0
    }
186
187
0
    out->mem.start[0] = 0;
188
0
    out->mem.start[1] = 0;
189
190
0
    wsh = (nxt_websocket_header_t *) out->mem.start;
191
0
    out->mem.free = nxt_websocket_frame_init(wsh, 0);
192
193
0
    wsh->fin = 1;
194
0
    wsh->opcode = NXT_WEBSOCKET_OP_PING;
195
196
0
    nxt_http_request_send(task, r, out);
197
0
}
198
199
200
static const nxt_conn_state_t  nxt_h1p_read_ws_frame_header_state
201
    nxt_aligned(64) =
202
{
203
    .ready_handler = nxt_h1p_conn_ws_frame_header_read,
204
    .close_handler = nxt_h1p_conn_ws_error,
205
    .error_handler = nxt_h1p_conn_ws_error,
206
207
    .io_read_handler = nxt_h1p_ws_io_read_handler,
208
209
    .timer_handler = nxt_h1p_conn_ws_timeout,
210
    .timer_value = nxt_h1p_conn_request_timer_value,
211
    .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
212
    .timer_autoreset = 1,
213
};
214
215
216
static void
217
nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data)
218
0
{
219
0
    size_t                  size, hsize, frame_size, max_frame_size;
220
0
    uint64_t                payload_len;
221
0
    nxt_conn_t              *c;
222
0
    nxt_h1proto_t           *h1p;
223
0
    nxt_http_request_t      *r;
224
0
    nxt_event_engine_t      *engine;
225
0
    nxt_websocket_header_t  *wsh;
226
227
0
    c = obj;
228
0
    h1p = data;
229
230
0
    nxt_h1p_conn_ws_keepalive_disable(task, h1p);
231
232
0
    size = nxt_buf_mem_used_size(&c->read->mem);
233
234
0
    engine = task->thread->engine;
235
236
0
    if (size < 2) {
237
0
        nxt_debug(task, "h1p conn ws frame header read %z", size);
238
239
0
        nxt_conn_read(engine, c);
240
0
        nxt_h1p_conn_ws_keepalive_enable(task, h1p);
241
242
0
        return;
243
0
    }
244
245
0
    wsh = (nxt_websocket_header_t *) c->read->mem.pos;
246
247
0
    hsize = nxt_websocket_frame_header_size(wsh);
248
249
0
    if (size < hsize) {
250
0
        nxt_debug(task, "h1p conn ws frame header read %z < %z", size, hsize);
251
252
0
        nxt_conn_read(engine, c);
253
0
        nxt_h1p_conn_ws_keepalive_enable(task, h1p);
254
255
0
        return;
256
0
    }
257
258
0
    r = h1p->request;
259
0
    if (nxt_slow_path(r == NULL)) {
260
0
        return;
261
0
    }
262
263
0
    r->ws_frame = c->read;
264
265
0
    if (nxt_slow_path(wsh->mask == 0)) {
266
0
        hxt_h1p_send_ws_error(task, r, &nxt_ws_err_not_masked);
267
0
        return;
268
0
    }
269
270
0
    if ((wsh->opcode & NXT_WEBSOCKET_OP_CTRL) != 0) {
271
0
        if (nxt_slow_path(wsh->fin == 0)) {
272
0
            hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_fragmented);
273
0
            return;
274
0
        }
275
276
0
        if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_PING
277
0
                          && wsh->opcode != NXT_WEBSOCKET_OP_PONG
278
0
                          && wsh->opcode != NXT_WEBSOCKET_OP_CLOSE))
279
0
        {
280
0
            hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
281
0
                                  wsh->opcode);
282
0
            return;
283
0
        }
284
285
0
        if (nxt_slow_path(wsh->payload_len > 125)) {
286
0
            hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_too_big,
287
0
                                  nxt_websocket_frame_payload_len(wsh));
288
0
            return;
289
0
        }
290
291
0
        if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE
292
0
                          && wsh->payload_len == 1))
293
0
        {
294
0
            hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_len);
295
0
            return;
296
0
        }
297
298
0
    } else {
299
0
        if (h1p->websocket_cont_expected) {
300
0
            if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_CONT)) {
301
0
                hxt_h1p_send_ws_error(task, r, &nxt_ws_err_cont_expected,
302
0
                                      wsh->opcode);
303
0
                return;
304
0
            }
305
306
0
        } else {
307
0
            if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_BINARY
308
0
                              && wsh->opcode != NXT_WEBSOCKET_OP_TEXT))
309
0
            {
310
0
                hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
311
0
                                      wsh->opcode);
312
0
                return;
313
0
            }
314
0
        }
315
316
0
        h1p->websocket_cont_expected = !wsh->fin;
317
0
    }
318
319
0
    max_frame_size = r->conf->socket_conf->websocket_conf.max_frame_size;
320
321
0
    payload_len = nxt_websocket_frame_payload_len(wsh);
322
323
0
    if (nxt_slow_path(hsize > max_frame_size
324
0
                      || payload_len > (max_frame_size - hsize)))
325
0
    {
326
0
        hxt_h1p_send_ws_error(task, r, &nxt_ws_err_too_big, payload_len);
327
0
        return;
328
0
    }
329
330
0
    c->read_state = &nxt_h1p_read_ws_frame_payload_state;
331
332
0
    frame_size = payload_len + hsize;
333
334
0
    nxt_debug(task, "h1p conn ws frame header read: %z, %z", size, frame_size);
335
336
0
    if (frame_size <= size) {
337
0
        nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);
338
339
0
        return;
340
0
    }
341
342
0
    if (frame_size < (size_t) nxt_buf_mem_size(&c->read->mem)) {
343
0
        c->read->mem.end = c->read->mem.start + frame_size;
344
345
0
    } else {
346
0
        nxt_buf_t *b = nxt_buf_mem_alloc(c->mem_pool, frame_size - size, 0);
347
348
0
        c->read->next = b;
349
0
        c->read = b;
350
0
    }
351
352
0
    nxt_conn_read(engine, c);
353
0
    nxt_h1p_conn_ws_keepalive_enable(task, h1p);
354
0
}
355
356
357
static void
358
nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, nxt_h1proto_t *h1p)
359
0
{
360
0
    nxt_timer_t  *timer;
361
362
0
    if (h1p->websocket_timer == NULL) {
363
0
        return;
364
0
    }
365
366
0
    timer = &h1p->websocket_timer->timer;
367
368
0
    if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
369
0
        nxt_debug(task, "h1p ws keepalive disable: scheduled ws shutdown");
370
0
        return;
371
0
    }
372
373
0
    nxt_timer_disable(task->thread->engine, timer);
374
0
}
375
376
377
static void
378
nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, nxt_h1proto_t *h1p)
379
0
{
380
0
    nxt_timer_t  *timer;
381
382
0
    if (h1p->websocket_timer == NULL) {
383
0
        return;
384
0
    }
385
386
0
    timer = &h1p->websocket_timer->timer;
387
388
0
    if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
389
0
        nxt_debug(task, "h1p ws keepalive enable: scheduled ws shutdown");
390
0
        return;
391
0
    }
392
393
0
    nxt_timer_add(task->thread->engine, timer,
394
0
                  h1p->websocket_timer->keepalive_interval);
395
0
}
396
397
398
static void
399
nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
400
    nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh)
401
0
{
402
0
    size_t              hsize;
403
0
    uint8_t             *p, *mask;
404
0
    uint16_t            code;
405
0
    nxt_http_request_t  *r;
406
407
0
    r = h1p->request;
408
409
0
    c->read = NULL;
410
411
0
    if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_PING)) {
412
0
        nxt_h1p_conn_ws_pong(task, r, NULL);
413
0
        return;
414
0
    }
415
416
0
    if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE)) {
417
0
        if (wsh->payload_len >= 2) {
418
0
            hsize = nxt_websocket_frame_header_size(wsh);
419
0
            mask = nxt_pointer_to(wsh, hsize - 4);
420
0
            p = nxt_pointer_to(wsh, hsize);
421
422
0
            code = ((p[0] ^ mask[0]) << 8) + (p[1] ^ mask[1]);
423
424
0
            if (nxt_slow_path(code < 1000 || code >= 5000
425
0
                              || (code > 1003 && code < 1007)
426
0
                              || (code > 1014 && code < 3000)))
427
0
            {
428
0
                hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_code,
429
0
                                      code);
430
0
                return;
431
0
            }
432
0
        }
433
434
0
        h1p->websocket_closed = 1;
435
0
    }
436
437
0
    r->state->ready_handler(task, r, NULL);
438
0
}
439
440
441
static void
442
nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data)
443
0
{
444
0
    nxt_h1proto_t       *h1p;
445
0
    nxt_http_request_t  *r;
446
447
0
    h1p = data;
448
449
0
    nxt_debug(task, "h1p conn ws error");
450
451
0
    r = h1p->request;
452
453
0
    h1p->keepalive = 0;
454
455
0
    if (nxt_fast_path(r != NULL)) {
456
0
        r->state->error_handler(task, r, h1p);
457
0
    }
458
0
}
459
460
461
static ssize_t
462
nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
463
0
{
464
0
    size_t     size;
465
0
    ssize_t    n;
466
0
    nxt_buf_t  *b;
467
468
0
    b = c->read;
469
470
0
    if (b == NULL) {
471
        /* Enough for control frame. */
472
0
        size = 10 + 125;
473
474
0
        b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
475
0
        if (nxt_slow_path(b == NULL)) {
476
0
            c->socket.error = NXT_ENOMEM;
477
0
            return NXT_ERROR;
478
0
        }
479
0
    }
480
481
0
    n = c->io->recvbuf(c, b);
482
483
0
    if (n > 0) {
484
0
        c->read = b;
485
486
0
    } else {
487
0
        c->read = NULL;
488
0
        nxt_mp_free(c->mem_pool, b);
489
0
    }
490
491
0
    return n;
492
0
}
493
494
495
static void
496
nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data)
497
0
{
498
0
    nxt_conn_t          *c;
499
0
    nxt_timer_t         *timer;
500
0
    nxt_h1proto_t       *h1p;
501
0
    nxt_http_request_t  *r;
502
503
0
    timer = obj;
504
505
0
    nxt_debug(task, "h1p conn ws timeout");
506
507
0
    c = nxt_read_timer_conn(timer);
508
0
    c->block_read = 1;
509
    /*
510
     * Disable SO_LINGER off during socket closing
511
     * to send "408 Request Timeout" error response.
512
     */
513
0
    c->socket.timedout = 0;
514
515
0
    h1p = c->socket.data;
516
0
    h1p->keepalive = 0;
517
518
0
    r = h1p->request;
519
0
    if (nxt_slow_path(r == NULL)) {
520
0
        return;
521
0
    }
522
523
0
    hxt_h1p_send_ws_error(task, r, &nxt_ws_err_going_away);
524
0
}
525
526
527
static const nxt_conn_state_t  nxt_h1p_read_ws_frame_payload_state
528
    nxt_aligned(64) =
529
{
530
    .ready_handler = nxt_h1p_conn_ws_frame_payload_read,
531
    .close_handler = nxt_h1p_conn_ws_error,
532
    .error_handler = nxt_h1p_conn_ws_error,
533
534
    .timer_handler = nxt_h1p_conn_ws_timeout,
535
    .timer_value = nxt_h1p_conn_request_timer_value,
536
    .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
537
    .timer_autoreset = 1,
538
};
539
540
541
static void
542
nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, void *data)
543
0
{
544
0
    nxt_conn_t              *c;
545
0
    nxt_h1proto_t           *h1p;
546
0
    nxt_http_request_t      *r;
547
0
    nxt_event_engine_t      *engine;
548
0
    nxt_websocket_header_t  *wsh;
549
550
0
    c = obj;
551
0
    h1p = data;
552
553
0
    nxt_h1p_conn_ws_keepalive_disable(task, h1p);
554
555
0
    nxt_debug(task, "h1p conn ws frame read");
556
557
0
    if (nxt_buf_mem_free_size(&c->read->mem) == 0) {
558
0
        r = h1p->request;
559
0
        if (nxt_slow_path(r == NULL)) {
560
0
            return;
561
0
        }
562
563
0
        wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
564
565
0
        nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);
566
567
0
        return;
568
0
    }
569
570
0
    engine = task->thread->engine;
571
572
0
    nxt_conn_read(engine, c);
573
0
    nxt_h1p_conn_ws_keepalive_enable(task, h1p);
574
0
}
575
576
577
static void
578
hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
579
    const nxt_ws_error_t *err, ...)
580
0
{
581
0
    u_char                  *p;
582
0
    va_list                 args;
583
0
    nxt_buf_t               *out;
584
0
    nxt_str_t               desc;
585
0
    nxt_websocket_header_t  *wsh;
586
0
    u_char                  buf[125];
587
588
0
    if (nxt_slow_path(err->args)) {
589
0
        va_start(args, err);
590
0
        p = nxt_vsprintf(buf, buf + sizeof(buf), (char *) err->desc.start,
591
0
                         args);
592
0
        va_end(args);
593
594
0
        desc.start = buf;
595
0
        desc.length = p - buf;
596
597
0
    } else {
598
0
        desc = err->desc;
599
0
    }
600
601
0
    nxt_log(task, NXT_LOG_INFO, "websocket error %d: %V", err->code, &desc);
602
603
0
    out = nxt_http_buf_mem(task, r, 2 + sizeof(err->code) + desc.length);
604
0
    if (nxt_slow_path(out == NULL)) {
605
0
        nxt_http_request_error_handler(task, r, r->proto.any);
606
0
        return;
607
0
    }
608
609
0
    out->mem.start[0] = 0;
610
0
    out->mem.start[1] = 0;
611
612
0
    wsh = (nxt_websocket_header_t *) out->mem.start;
613
0
    p = nxt_websocket_frame_init(wsh, sizeof(err->code) + desc.length);
614
615
0
    wsh->fin = 1;
616
0
    wsh->opcode = NXT_WEBSOCKET_OP_CLOSE;
617
618
0
    *p++ = (err->code >> 8) & 0xFF;
619
0
    *p++ = err->code & 0xFF;
620
621
0
    out->mem.free = nxt_cpymem(p, desc.start, desc.length);
622
0
    out->next = nxt_http_buf_last(r);
623
624
0
    if (out->next != NULL) {
625
0
        out->next->completion_handler = nxt_h1p_conn_ws_error_sent;
626
0
    }
627
628
0
    nxt_http_request_send(task, r, out);
629
0
}
630
631
632
static void
633
nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data)
634
0
{
635
0
    nxt_http_request_t  *r;
636
637
0
    r = data;
638
639
0
    nxt_debug(task, "h1p conn ws error sent");
640
641
0
    r->state->error_handler(task, r, r->proto.any);
642
0
}
643
644
645
static void
646
nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data)
647
0
{
648
0
    uint8_t                 payload_len, i;
649
0
    nxt_buf_t               *b, *out, *next;
650
0
    nxt_http_request_t      *r;
651
0
    nxt_websocket_header_t  *wsh;
652
0
    uint8_t                 mask[4];
653
654
0
    nxt_debug(task, "h1p conn ws pong");
655
656
0
    r = obj;
657
0
    b = r->ws_frame;
658
659
0
    wsh = (nxt_websocket_header_t *) b->mem.pos;
660
0
    payload_len = wsh->payload_len;
661
662
0
    b->mem.pos += 2;
663
664
0
    nxt_memcpy(mask, b->mem.pos, 4);
665
666
0
    b->mem.pos += 4;
667
668
0
    out = nxt_http_buf_mem(task, r, 2 + payload_len);
669
0
    if (nxt_slow_path(out == NULL)) {
670
0
        nxt_http_request_error_handler(task, r, r->proto.any);
671
0
        return;
672
0
    }
673
674
0
    out->mem.start[0] = 0;
675
0
    out->mem.start[1] = 0;
676
677
0
    wsh = (nxt_websocket_header_t *) out->mem.start;
678
0
    out->mem.free = nxt_websocket_frame_init(wsh, payload_len);
679
680
0
    wsh->fin = 1;
681
0
    wsh->opcode = NXT_WEBSOCKET_OP_PONG;
682
683
0
    for (i = 0; i < payload_len; i++) {
684
0
        while (nxt_buf_mem_used_size(&b->mem) == 0) {
685
0
            next = b->next;
686
0
            b->next = NULL;
687
688
0
            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
689
0
                               b->completion_handler, task, b, b->parent);
690
691
0
            b = next;
692
0
        }
693
694
0
        *out->mem.free++ = *b->mem.pos++ ^ mask[i % 4];
695
0
    }
696
697
0
    r->ws_frame = b;
698
699
0
    nxt_http_request_send(task, r, out);
700
701
0
    nxt_http_request_ws_frame_start(task, r, r->ws_frame);
702
0
}