/src/unit/src/nxt_conn_read.c
Line | Count | Source (jump to first uncovered line) |
1 | | |
2 | | /* |
3 | | * Copyright (C) Igor Sysoev |
4 | | * Copyright (C) NGINX, Inc. |
5 | | */ |
6 | | |
7 | | #include <nxt_main.h> |
8 | | |
9 | | |
10 | | void |
11 | | nxt_conn_wait(nxt_conn_t *c) |
12 | 0 | { |
13 | 0 | nxt_event_engine_t *engine; |
14 | 0 | const nxt_conn_state_t *state; |
15 | |
|
16 | 0 | nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d", |
17 | 0 | c->socket.fd, c->socket.read_ready); |
18 | |
|
19 | 0 | engine = c->socket.task->thread->engine; |
20 | 0 | state = c->read_state; |
21 | |
|
22 | 0 | if (c->socket.read_ready) { |
23 | 0 | nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler, |
24 | 0 | c->socket.task, c, c->socket.data); |
25 | 0 | return; |
26 | 0 | } |
27 | | |
28 | 0 | c->socket.read_handler = state->ready_handler; |
29 | 0 | c->socket.error_handler = state->error_handler; |
30 | |
|
31 | 0 | nxt_conn_timer(engine, c, state, &c->read_timer); |
32 | |
|
33 | 0 | nxt_fd_event_enable_read(engine, &c->socket); |
34 | 0 | } |
35 | | |
36 | | |
37 | | void |
38 | | nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) |
39 | 0 | { |
40 | 0 | ssize_t n; |
41 | 0 | nxt_conn_t *c; |
42 | 0 | nxt_event_engine_t *engine; |
43 | 0 | nxt_work_handler_t handler; |
44 | 0 | const nxt_conn_state_t *state; |
45 | |
|
46 | 0 | c = obj; |
47 | |
|
48 | 0 | nxt_debug(task, "conn read fd:%d rdy:%d cl:%d er:%d bl:%d", |
49 | 0 | c->socket.fd, c->socket.read_ready, c->socket.closed, |
50 | 0 | c->socket.error, c->block_read); |
51 | |
|
52 | 0 | if (c->socket.error != 0 || c->block_read) { |
53 | 0 | return; |
54 | 0 | } |
55 | | |
56 | 0 | engine = task->thread->engine; |
57 | | |
58 | | /* |
59 | | * Here c->io->read() is assigned instead of direct nxt_conn_io_read() |
60 | | * because the function can be called by nxt_kqueue_conn_io_read(). |
61 | | */ |
62 | 0 | c->socket.read_handler = c->io->read; |
63 | 0 | state = c->read_state; |
64 | 0 | c->socket.error_handler = state->error_handler; |
65 | |
|
66 | 0 | if (c->socket.read_ready) { |
67 | |
|
68 | 0 | if (state->io_read_handler == NULL) { |
69 | 0 | n = c->io->recvbuf(c, c->read); |
70 | |
|
71 | 0 | } else { |
72 | 0 | n = state->io_read_handler(task, c); |
73 | | /* The state can be changed by io_read_handler. */ |
74 | 0 | state = c->read_state; |
75 | 0 | } |
76 | |
|
77 | 0 | if (n > 0) { |
78 | 0 | c->nbytes = n; |
79 | |
|
80 | 0 | nxt_recvbuf_update(c->read, n); |
81 | |
|
82 | 0 | nxt_fd_event_block_read(engine, &c->socket); |
83 | |
|
84 | 0 | if (state->timer_autoreset) { |
85 | 0 | nxt_timer_disable(engine, &c->read_timer); |
86 | 0 | } |
87 | |
|
88 | 0 | nxt_work_queue_add(c->read_work_queue, |
89 | 0 | state->ready_handler, task, c, data); |
90 | 0 | return; |
91 | 0 | } |
92 | | |
93 | 0 | if (n != NXT_AGAIN) { |
94 | | /* n == 0 or n == NXT_ERROR. */ |
95 | 0 | handler = (n == 0) ? state->close_handler : state->error_handler; |
96 | |
|
97 | 0 | nxt_fd_event_block_read(engine, &c->socket); |
98 | 0 | nxt_timer_disable(engine, &c->read_timer); |
99 | |
|
100 | 0 | nxt_work_queue_add(&engine->fast_work_queue, |
101 | 0 | handler, task, c, data); |
102 | 0 | return; |
103 | 0 | } |
104 | | |
105 | | /* n == NXT_AGAIN. */ |
106 | | |
107 | 0 | if (c->socket.read_ready) { |
108 | | /* |
109 | | * SSL/TLS library can return NXT_AGAIN if renegotiation |
110 | | * occured during read operation, it toggled write event |
111 | | * internally so only read timer should be set. |
112 | | */ |
113 | 0 | if (!c->read_timer.enabled) { |
114 | 0 | nxt_conn_timer(engine, c, state, &c->read_timer); |
115 | 0 | } |
116 | |
|
117 | 0 | return; |
118 | 0 | } |
119 | 0 | } |
120 | | |
121 | 0 | if (nxt_fd_event_is_disabled(c->socket.read)) { |
122 | 0 | nxt_fd_event_enable_read(engine, &c->socket); |
123 | 0 | } |
124 | |
|
125 | 0 | if (state->timer_autoreset || !c->read_timer.enabled) { |
126 | 0 | nxt_conn_timer(engine, c, state, &c->read_timer); |
127 | 0 | } |
128 | 0 | } |
129 | | |
130 | | |
131 | | ssize_t |
132 | | nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) |
133 | 0 | { |
134 | 0 | ssize_t n; |
135 | 0 | nxt_err_t err; |
136 | 0 | nxt_uint_t niov; |
137 | 0 | struct iovec iov[NXT_IOBUF_MAX]; |
138 | 0 | nxt_recvbuf_coalesce_t rb; |
139 | |
|
140 | 0 | rb.buf = b; |
141 | 0 | rb.iobuf = iov; |
142 | 0 | rb.nmax = NXT_IOBUF_MAX; |
143 | 0 | rb.size = 0; |
144 | |
|
145 | 0 | niov = nxt_recvbuf_mem_coalesce(&rb); |
146 | |
|
147 | 0 | if (niov == 1) { |
148 | | /* Disposal of surplus kernel iovec copy-in operation. */ |
149 | 0 | return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0); |
150 | 0 | } |
151 | | |
152 | 0 | for ( ;; ) { |
153 | 0 | n = readv(c->socket.fd, iov, niov); |
154 | |
|
155 | 0 | err = (n == -1) ? nxt_socket_errno : 0; |
156 | |
|
157 | 0 | nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n); |
158 | |
|
159 | 0 | if (n > 0) { |
160 | 0 | if ((size_t) n < rb.size) { |
161 | 0 | c->socket.read_ready = 0; |
162 | 0 | } |
163 | |
|
164 | 0 | return n; |
165 | 0 | } |
166 | | |
167 | 0 | if (n == 0) { |
168 | 0 | c->socket.closed = 1; |
169 | 0 | c->socket.read_ready = 0; |
170 | 0 | return n; |
171 | 0 | } |
172 | | |
173 | | /* n == -1 */ |
174 | | |
175 | 0 | switch (err) { |
176 | | |
177 | 0 | case NXT_EAGAIN: |
178 | 0 | nxt_debug(c->socket.task, "readv() %E", err); |
179 | 0 | c->socket.read_ready = 0; |
180 | 0 | return NXT_AGAIN; |
181 | | |
182 | 0 | case NXT_EINTR: |
183 | 0 | nxt_debug(c->socket.task, "readv() %E", err); |
184 | 0 | continue; |
185 | | |
186 | 0 | default: |
187 | 0 | c->socket.error = err; |
188 | 0 | nxt_log(c->socket.task, nxt_socket_error_level(err), |
189 | 0 | "readv(%d, %ui) failed %E", c->socket.fd, niov, err); |
190 | |
|
191 | 0 | return NXT_ERROR; |
192 | 0 | } |
193 | 0 | } |
194 | 0 | } |
195 | | |
196 | | |
197 | | ssize_t |
198 | | nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags) |
199 | 0 | { |
200 | 0 | ssize_t n; |
201 | 0 | nxt_err_t err; |
202 | |
|
203 | 0 | for ( ;; ) { |
204 | 0 | n = recv(c->socket.fd, buf, size, flags); |
205 | |
|
206 | 0 | err = (n == -1) ? nxt_socket_errno : 0; |
207 | |
|
208 | 0 | nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z", |
209 | 0 | c->socket.fd, buf, size, flags, n); |
210 | |
|
211 | 0 | if (n > 0) { |
212 | 0 | if ((size_t) n < size && (flags & MSG_PEEK) == 0) { |
213 | 0 | c->socket.read_ready = 0; |
214 | 0 | } |
215 | |
|
216 | 0 | return n; |
217 | 0 | } |
218 | | |
219 | 0 | if (n == 0) { |
220 | 0 | c->socket.closed = 1; |
221 | |
|
222 | 0 | if ((flags & MSG_PEEK) == 0) { |
223 | 0 | c->socket.read_ready = 0; |
224 | 0 | } |
225 | |
|
226 | 0 | return n; |
227 | 0 | } |
228 | | |
229 | | /* n == -1 */ |
230 | | |
231 | 0 | switch (err) { |
232 | | |
233 | 0 | case NXT_EAGAIN: |
234 | 0 | nxt_debug(c->socket.task, "recv() %E", err); |
235 | 0 | c->socket.read_ready = 0; |
236 | |
|
237 | 0 | return NXT_AGAIN; |
238 | | |
239 | 0 | case NXT_EINTR: |
240 | 0 | nxt_debug(c->socket.task, "recv() %E", err); |
241 | 0 | continue; |
242 | | |
243 | 0 | default: |
244 | 0 | c->socket.error = err; |
245 | 0 | nxt_log(c->socket.task, nxt_socket_error_level(err), |
246 | 0 | "recv(%d, %p, %uz, %ui) failed %E", |
247 | 0 | c->socket.fd, buf, size, flags, err); |
248 | |
|
249 | 0 | return NXT_ERROR; |
250 | 0 | } |
251 | 0 | } |
252 | 0 | } |