Coverage Report

Created: 2025-08-29 06:30

/src/unit/src/nxt_http_websocket.c
Line
Count
Source (jump to first uncovered line)
1
2
/*
3
 * Copyright (C) NGINX, Inc.
4
 */
5
6
#include <nxt_main.h>
7
#include <nxt_router.h>
8
#include <nxt_http.h>
9
#include <nxt_router_request.h>
10
#include <nxt_port_memory_int.h>
11
#include <nxt_websocket.h>
12
#include <nxt_websocket_header.h>
13
14
15
static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data);
16
static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj,
17
    void *data);
18
19
20
const nxt_http_request_state_t  nxt_http_websocket
21
    nxt_aligned(64) =
22
{
23
    .ready_handler = nxt_http_websocket_client,
24
    .error_handler = nxt_http_websocket_error_handler,
25
};
26
27
28
static void
29
nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
30
0
{
31
0
    size_t                  frame_size, used_size, copy_size, buf_free_size;
32
0
    size_t                  chunk_copy_size;
33
0
    nxt_buf_t               *out, *buf, **out_tail, *b, *next;
34
0
    nxt_int_t               res;
35
0
    nxt_http_request_t      *r;
36
0
    nxt_request_rpc_data_t  *req_rpc_data;
37
0
    nxt_websocket_header_t  *wsh;
38
39
0
    r = obj;
40
0
    req_rpc_data = r->req_rpc_data;
41
42
0
    if (nxt_slow_path(req_rpc_data == NULL)) {
43
0
        nxt_debug(task, "websocket client frame for destroyed request");
44
45
0
        return;
46
0
    }
47
48
0
    nxt_debug(task, "http websocket client frame");
49
50
0
    wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
51
52
0
    frame_size = nxt_websocket_frame_header_size(wsh)
53
0
                  + nxt_websocket_frame_payload_len(wsh);
54
55
0
    buf = NULL;
56
0
    buf_free_size = 0;
57
0
    out = NULL;
58
0
    out_tail = &out;
59
60
0
    b = r->ws_frame;
61
62
0
    while (b != NULL && frame_size > 0) {
63
0
        used_size = nxt_buf_mem_used_size(&b->mem);
64
0
        copy_size = nxt_min(used_size, frame_size);
65
66
0
        while (copy_size > 0) {
67
0
            if (buf == NULL || buf_free_size == 0) {
68
0
                buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
69
70
0
                buf = nxt_port_mmap_get_buf(task, &req_rpc_data->app->outgoing,
71
0
                                            buf_free_size);
72
73
0
                *out_tail = buf;
74
0
                out_tail = &buf->next;
75
0
            }
76
77
0
            chunk_copy_size = nxt_min(buf_free_size, copy_size);
78
79
0
            buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos,
80
0
                                       chunk_copy_size);
81
82
0
            copy_size -= chunk_copy_size;
83
0
            b->mem.pos += chunk_copy_size;
84
0
            buf_free_size -= chunk_copy_size;
85
0
        }
86
87
0
        frame_size -= copy_size;
88
0
        next = b->next;
89
0
        b->next = NULL;
90
91
0
        if (nxt_buf_mem_used_size(&b->mem) == 0) {
92
0
            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
93
0
                               b->completion_handler, task, b, b->parent);
94
95
0
            r->ws_frame = next;
96
0
        }
97
98
0
        b = next;
99
0
    }
100
101
0
    res = nxt_port_socket_write(task, req_rpc_data->app_port,
102
0
                                NXT_PORT_MSG_WEBSOCKET, -1,
103
0
                                req_rpc_data->stream,
104
0
                                task->thread->engine->port->id, out);
105
0
    if (nxt_slow_path(res != NXT_OK)) {
106
        // TODO: handle
107
0
    }
108
109
0
    b = r->ws_frame;
110
111
0
    if (b != NULL) {
112
0
        used_size = nxt_buf_mem_used_size(&b->mem);
113
114
0
        if (used_size > 0) {
115
0
            nxt_memmove(b->mem.start, b->mem.pos, used_size);
116
117
0
            b->mem.pos = b->mem.start;
118
0
            b->mem.free = b->mem.start + used_size;
119
0
        }
120
0
    }
121
122
0
    nxt_http_request_ws_frame_start(task, r, r->ws_frame);
123
0
}
124
125
126
static void
127
nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
128
0
{
129
0
    nxt_http_request_t      *r;
130
0
    nxt_request_rpc_data_t  *req_rpc_data;
131
132
0
    nxt_debug(task, "http websocket error handler");
133
134
0
    r = obj;
135
0
    req_rpc_data = r->req_rpc_data;
136
137
0
    if (req_rpc_data == NULL) {
138
0
        nxt_debug(task, "  req_rpc_data is NULL");
139
0
        goto close_handler;
140
0
    }
141
142
0
    if (req_rpc_data->app_port == NULL) {
143
0
        nxt_debug(task, "  app_port is NULL");
144
0
        goto close_handler;
145
0
    }
146
147
0
    (void) nxt_port_socket_write(task, req_rpc_data->app_port,
148
0
                                 NXT_PORT_MSG_WEBSOCKET_LAST,
149
0
                                 -1, req_rpc_data->stream,
150
0
                                 task->thread->engine->port->id, NULL);
151
152
0
close_handler:
153
154
0
    nxt_http_request_close_handler(task, obj, data);
155
0
}