Coverage Report

Created: 2025-06-22 06:07

/src/unit/src/nxt_conn_read.c
Line
Count
Source (jump to first uncovered line)
1
2
/*
3
 * Copyright (C) Igor Sysoev
4
 * Copyright (C) NGINX, Inc.
5
 */
6
7
#include <nxt_main.h>
8
9
10
void
11
nxt_conn_wait(nxt_conn_t *c)
12
0
{
13
0
    nxt_event_engine_t      *engine;
14
0
    const nxt_conn_state_t  *state;
15
16
0
    nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d",
17
0
              c->socket.fd, c->socket.read_ready);
18
19
0
    engine = c->socket.task->thread->engine;
20
0
    state = c->read_state;
21
22
0
    if (c->socket.read_ready) {
23
0
        nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler,
24
0
                           c->socket.task, c, c->socket.data);
25
0
        return;
26
0
    }
27
28
0
    c->socket.read_handler = state->ready_handler;
29
0
    c->socket.error_handler = state->error_handler;
30
31
0
    nxt_conn_timer(engine, c, state, &c->read_timer);
32
33
0
    nxt_fd_event_enable_read(engine, &c->socket);
34
0
}
35
36
37
void
38
nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
39
0
{
40
0
    ssize_t                 n;
41
0
    nxt_conn_t              *c;
42
0
    nxt_event_engine_t      *engine;
43
0
    nxt_work_handler_t      handler;
44
0
    const nxt_conn_state_t  *state;
45
46
0
    c = obj;
47
48
0
    nxt_debug(task, "conn read fd:%d rdy:%d cl:%d er:%d bl:%d",
49
0
              c->socket.fd, c->socket.read_ready, c->socket.closed,
50
0
              c->socket.error, c->block_read);
51
52
0
    if (c->socket.error != 0 || c->block_read) {
53
0
        return;
54
0
    }
55
56
0
    engine = task->thread->engine;
57
58
    /*
59
     * Here c->io->read() is assigned instead of direct nxt_conn_io_read()
60
     * because the function can be called by nxt_kqueue_conn_io_read().
61
     */
62
0
    c->socket.read_handler = c->io->read;
63
0
    state = c->read_state;
64
0
    c->socket.error_handler = state->error_handler;
65
66
0
    if (c->socket.read_ready) {
67
68
0
        if (state->io_read_handler == NULL) {
69
0
            n = c->io->recvbuf(c, c->read);
70
71
0
        } else {
72
0
            n = state->io_read_handler(task, c);
73
            /* The state can be changed by io_read_handler. */
74
0
            state = c->read_state;
75
0
        }
76
77
0
        if (n > 0) {
78
0
            c->nbytes = n;
79
80
0
            nxt_recvbuf_update(c->read, n);
81
82
0
            nxt_fd_event_block_read(engine, &c->socket);
83
84
0
            if (state->timer_autoreset) {
85
0
                nxt_timer_disable(engine, &c->read_timer);
86
0
            }
87
88
0
            nxt_work_queue_add(c->read_work_queue,
89
0
                               state->ready_handler, task, c, data);
90
0
            return;
91
0
        }
92
93
0
        if (n != NXT_AGAIN) {
94
            /* n == 0 or n == NXT_ERROR. */
95
0
            handler = (n == 0) ? state->close_handler : state->error_handler;
96
97
0
            nxt_fd_event_block_read(engine, &c->socket);
98
0
            nxt_timer_disable(engine, &c->read_timer);
99
100
0
            nxt_work_queue_add(&engine->fast_work_queue,
101
0
                               handler, task, c, data);
102
0
            return;
103
0
        }
104
105
        /* n == NXT_AGAIN. */
106
107
0
        if (c->socket.read_ready) {
108
            /*
109
             * SSL/TLS library can return NXT_AGAIN if renegotiation
110
             * occured during read operation, it toggled write event
111
             * internally so only read timer should be set.
112
             */
113
0
            if (!c->read_timer.enabled) {
114
0
                nxt_conn_timer(engine, c, state, &c->read_timer);
115
0
            }
116
117
0
            return;
118
0
        }
119
0
    }
120
121
0
    if (nxt_fd_event_is_disabled(c->socket.read)) {
122
0
        nxt_fd_event_enable_read(engine, &c->socket);
123
0
    }
124
125
0
    if (state->timer_autoreset || !c->read_timer.enabled) {
126
0
        nxt_conn_timer(engine, c, state, &c->read_timer);
127
0
    }
128
0
}
129
130
131
ssize_t
132
nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
133
0
{
134
0
    ssize_t                 n;
135
0
    nxt_err_t               err;
136
0
    nxt_uint_t              niov;
137
0
    struct iovec            iov[NXT_IOBUF_MAX];
138
0
    nxt_recvbuf_coalesce_t  rb;
139
140
0
    rb.buf = b;
141
0
    rb.iobuf = iov;
142
0
    rb.nmax = NXT_IOBUF_MAX;
143
0
    rb.size = 0;
144
145
0
    niov = nxt_recvbuf_mem_coalesce(&rb);
146
147
0
    if (niov == 1) {
148
        /* Disposal of surplus kernel iovec copy-in operation. */
149
0
        return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0);
150
0
    }
151
152
0
    for ( ;; ) {
153
0
        n = readv(c->socket.fd, iov, niov);
154
155
0
        err = (n == -1) ? nxt_socket_errno : 0;
156
157
0
        nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n);
158
159
0
        if (n > 0) {
160
0
            if ((size_t) n < rb.size) {
161
0
                c->socket.read_ready = 0;
162
0
            }
163
164
0
            return n;
165
0
        }
166
167
0
        if (n == 0) {
168
0
            c->socket.closed = 1;
169
0
            c->socket.read_ready = 0;
170
0
            return n;
171
0
        }
172
173
        /* n == -1 */
174
175
0
        switch (err) {
176
177
0
        case NXT_EAGAIN:
178
0
            nxt_debug(c->socket.task, "readv() %E", err);
179
0
            c->socket.read_ready = 0;
180
0
            return NXT_AGAIN;
181
182
0
        case NXT_EINTR:
183
0
            nxt_debug(c->socket.task, "readv() %E", err);
184
0
            continue;
185
186
0
        default:
187
0
            c->socket.error = err;
188
0
            nxt_log(c->socket.task, nxt_socket_error_level(err),
189
0
                    "readv(%d, %ui) failed %E", c->socket.fd, niov, err);
190
191
0
            return NXT_ERROR;
192
0
        }
193
0
    }
194
0
}
195
196
197
ssize_t
198
nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags)
199
0
{
200
0
    ssize_t    n;
201
0
    nxt_err_t  err;
202
203
0
    for ( ;; ) {
204
0
        n = recv(c->socket.fd, buf, size, flags);
205
206
0
        err = (n == -1) ? nxt_socket_errno : 0;
207
208
0
        nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z",
209
0
                  c->socket.fd, buf, size, flags, n);
210
211
0
        if (n > 0) {
212
0
            if ((size_t) n < size && (flags & MSG_PEEK) == 0) {
213
0
                c->socket.read_ready = 0;
214
0
            }
215
216
0
            return n;
217
0
        }
218
219
0
        if (n == 0) {
220
0
            c->socket.closed = 1;
221
222
0
            if ((flags & MSG_PEEK) == 0) {
223
0
                c->socket.read_ready = 0;
224
0
            }
225
226
0
            return n;
227
0
        }
228
229
        /* n == -1 */
230
231
0
        switch (err) {
232
233
0
        case NXT_EAGAIN:
234
0
            nxt_debug(c->socket.task, "recv() %E", err);
235
0
            c->socket.read_ready = 0;
236
237
0
            return NXT_AGAIN;
238
239
0
        case NXT_EINTR:
240
0
            nxt_debug(c->socket.task, "recv() %E", err);
241
0
            continue;
242
243
0
        default:
244
0
            c->socket.error = err;
245
0
            nxt_log(c->socket.task, nxt_socket_error_level(err),
246
0
                    "recv(%d, %p, %uz, %ui) failed %E",
247
0
                    c->socket.fd, buf, size, flags, err);
248
249
0
            return NXT_ERROR;
250
0
        }
251
0
    }
252
0
}