/src/unit/src/nxt_sendbuf.c
Line | Count | Source (jump to first uncovered line) |
1 | | |
2 | | /* |
3 | | * Copyright (C) Igor Sysoev |
4 | | * Copyright (C) NGINX, Inc. |
5 | | */ |
6 | | |
7 | | #include <nxt_main.h> |
8 | | |
9 | | |
10 | | static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, |
11 | | size_t *copied); |
12 | | static nxt_buf_t *nxt_sendbuf_coalesce_completion(nxt_task_t *task, |
13 | | nxt_work_queue_t *wq, nxt_buf_t *start); |
14 | | |
15 | | |
16 | | nxt_uint_t |
17 | | nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb, |
18 | | struct iovec *iov, nxt_uint_t niov_max) |
19 | 0 | { |
20 | 0 | u_char *last; |
21 | 0 | size_t size, total; |
22 | 0 | nxt_buf_t *b; |
23 | 0 | nxt_uint_t n; |
24 | |
|
25 | 0 | total = sb->size; |
26 | 0 | last = NULL; |
27 | 0 | n = (nxt_uint_t) -1; |
28 | |
|
29 | 0 | for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) { |
30 | |
|
31 | 0 | nxt_prefetch(b->next); |
32 | |
|
33 | 0 | if (nxt_buf_is_file(b)) { |
34 | 0 | break; |
35 | 0 | } |
36 | | |
37 | 0 | if (nxt_buf_is_mem(b)) { |
38 | |
|
39 | 0 | size = b->mem.free - b->mem.pos; |
40 | |
|
41 | 0 | if (size != 0) { |
42 | |
|
43 | 0 | if (total + size > sb->limit) { |
44 | 0 | size = sb->limit - total; |
45 | |
|
46 | 0 | if (size == 0) { |
47 | 0 | break; |
48 | 0 | } |
49 | 0 | } |
50 | | |
51 | 0 | if (b->mem.pos != last) { |
52 | |
|
53 | 0 | if (++n >= niov_max) { |
54 | 0 | goto done; |
55 | 0 | } |
56 | | |
57 | 0 | iov[n].iov_base = b->mem.pos; |
58 | 0 | iov[n].iov_len = size; |
59 | |
|
60 | 0 | } else { |
61 | 0 | iov[n].iov_len += size; |
62 | 0 | } |
63 | | |
64 | 0 | nxt_debug(task, "sendbuf: %ui, %p, %uz", |
65 | 0 | n, iov[n].iov_base, iov[n].iov_len); |
66 | |
|
67 | 0 | total += size; |
68 | 0 | last = b->mem.pos + size; |
69 | 0 | } |
70 | |
|
71 | 0 | } else { |
72 | 0 | sb->sync = 1; |
73 | 0 | sb->last |= nxt_buf_is_last(b); |
74 | 0 | } |
75 | 0 | } |
76 | | |
77 | 0 | n++; |
78 | |
|
79 | 0 | done: |
80 | |
|
81 | 0 | sb->buf = b; |
82 | |
|
83 | 0 | return n; |
84 | 0 | } |
85 | | |
86 | | |
87 | | nxt_uint_t |
88 | | nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb) |
89 | 0 | { |
90 | 0 | u_char *last; |
91 | 0 | size_t size, total; |
92 | 0 | nxt_buf_t *b; |
93 | 0 | nxt_uint_t n; |
94 | |
|
95 | 0 | total = sb->size; |
96 | 0 | last = NULL; |
97 | 0 | n = (nxt_uint_t) -1; |
98 | |
|
99 | 0 | for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) { |
100 | |
|
101 | 0 | nxt_prefetch(b->next); |
102 | |
|
103 | 0 | if (nxt_buf_is_file(b)) { |
104 | 0 | break; |
105 | 0 | } |
106 | | |
107 | 0 | if (nxt_buf_is_mem(b)) { |
108 | |
|
109 | 0 | size = b->mem.free - b->mem.pos; |
110 | |
|
111 | 0 | if (size != 0) { |
112 | |
|
113 | 0 | if (total + size > sb->limit) { |
114 | 0 | size = sb->limit - total; |
115 | |
|
116 | 0 | sb->limit_reached = 1; |
117 | |
|
118 | 0 | if (nxt_slow_path(size == 0)) { |
119 | 0 | break; |
120 | 0 | } |
121 | 0 | } |
122 | | |
123 | 0 | if (b->mem.pos != last) { |
124 | |
|
125 | 0 | if (++n >= sb->nmax) { |
126 | 0 | sb->nmax_reached = 1; |
127 | |
|
128 | 0 | goto done; |
129 | 0 | } |
130 | | |
131 | 0 | sb->iobuf[n].iov_base = b->mem.pos; |
132 | 0 | sb->iobuf[n].iov_len = size; |
133 | |
|
134 | 0 | } else { |
135 | 0 | sb->iobuf[n].iov_len += size; |
136 | 0 | } |
137 | | |
138 | 0 | nxt_debug(task, "sendbuf: %ui, %p, %uz", |
139 | 0 | n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len); |
140 | |
|
141 | 0 | total += size; |
142 | 0 | last = b->mem.pos + size; |
143 | 0 | } |
144 | |
|
145 | 0 | } else { |
146 | 0 | sb->sync = 1; |
147 | 0 | sb->last |= nxt_buf_is_last(b); |
148 | 0 | } |
149 | 0 | } |
150 | | |
151 | 0 | n++; |
152 | |
|
153 | 0 | done: |
154 | |
|
155 | 0 | sb->buf = b; |
156 | 0 | sb->size = total; |
157 | 0 | sb->niov = n; |
158 | |
|
159 | 0 | return n; |
160 | 0 | } |
161 | | |
162 | | |
163 | | size_t |
164 | | nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb) |
165 | 0 | { |
166 | 0 | size_t file_start, total; |
167 | 0 | nxt_fd_t fd; |
168 | 0 | nxt_off_t size, last; |
169 | 0 | nxt_buf_t *b; |
170 | |
|
171 | 0 | b = sb->buf; |
172 | 0 | fd = b->file->fd; |
173 | |
|
174 | 0 | total = sb->size; |
175 | |
|
176 | 0 | for ( ;; ) { |
177 | |
|
178 | 0 | nxt_prefetch(b->next); |
179 | |
|
180 | 0 | size = b->file_end - b->file_pos; |
181 | |
|
182 | 0 | if (total + size >= sb->limit) { |
183 | 0 | total = sb->limit; |
184 | 0 | break; |
185 | 0 | } |
186 | | |
187 | 0 | total += size; |
188 | 0 | last = b->file_pos + size; |
189 | |
|
190 | 0 | b = b->next; |
191 | |
|
192 | 0 | if (b == NULL || !nxt_buf_is_file(b)) { |
193 | 0 | break; |
194 | 0 | } |
195 | | |
196 | 0 | if (b->file_pos != last || b->file->fd != fd) { |
197 | 0 | break; |
198 | 0 | } |
199 | 0 | } |
200 | |
|
201 | 0 | sb->buf = b; |
202 | |
|
203 | 0 | file_start = sb->size; |
204 | 0 | sb->size = total; |
205 | |
|
206 | 0 | return total - file_start; |
207 | 0 | } |
208 | | |
209 | | |
210 | | ssize_t |
211 | | nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *b, |
212 | | size_t limit) |
213 | 0 | { |
214 | 0 | size_t size, bsize, copied; |
215 | 0 | ssize_t n; |
216 | 0 | nxt_bool_t flush; |
217 | |
|
218 | 0 | size = nxt_buf_mem_used_size(&b->mem); |
219 | 0 | bsize = nxt_buf_mem_size(bm); |
220 | |
|
221 | 0 | if (bsize != 0) { |
222 | |
|
223 | 0 | if (size > bsize && bm->pos == bm->free) { |
224 | | /* |
225 | | * A data buffer size is larger than the internal |
226 | | * buffer size and the internal buffer is empty. |
227 | | */ |
228 | 0 | goto no_buffer; |
229 | 0 | } |
230 | | |
231 | 0 | if (bm->pos == NULL) { |
232 | 0 | bm->pos = nxt_malloc(bsize); |
233 | 0 | if (nxt_slow_path(bm->pos == NULL)) { |
234 | 0 | return NXT_ERROR; |
235 | 0 | } |
236 | | |
237 | 0 | bm->start = bm->pos; |
238 | 0 | bm->free = bm->pos; |
239 | 0 | bm->end += (uintptr_t) bm->pos; |
240 | 0 | } |
241 | | |
242 | 0 | copied = 0; |
243 | |
|
244 | 0 | flush = nxt_sendbuf_copy(bm, b, &copied); |
245 | |
|
246 | 0 | nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush); |
247 | |
|
248 | 0 | if (flush == 0) { |
249 | 0 | return copied; |
250 | 0 | } |
251 | | |
252 | 0 | size = nxt_buf_mem_used_size(bm); |
253 | |
|
254 | 0 | if (size == 0 && nxt_buf_is_sync(b)) { |
255 | 0 | goto done; |
256 | 0 | } |
257 | | |
258 | 0 | n = c->io->send(c, bm->pos, nxt_min(size, limit)); |
259 | |
|
260 | 0 | nxt_log_debug(c->socket.log, "sendbuf sent:%z", n); |
261 | |
|
262 | 0 | if (n > 0) { |
263 | 0 | bm->pos += n; |
264 | |
|
265 | 0 | if (bm->pos == bm->free) { |
266 | 0 | bm->pos = bm->start; |
267 | 0 | bm->free = bm->start; |
268 | 0 | } |
269 | |
|
270 | 0 | n = 0; |
271 | 0 | } |
272 | |
|
273 | 0 | return (copied != 0) ? (ssize_t) copied : n; |
274 | 0 | } |
275 | | |
276 | | /* No internal buffering. */ |
277 | | |
278 | 0 | if (size == 0 && nxt_buf_is_sync(b)) { |
279 | 0 | goto done; |
280 | 0 | } |
281 | | |
282 | 0 | no_buffer: |
283 | |
|
284 | 0 | return c->io->send(c, b->mem.pos, nxt_min(size, limit)); |
285 | | |
286 | 0 | done: |
287 | |
|
288 | 0 | nxt_log_debug(c->socket.log, "sendbuf done"); |
289 | |
|
290 | 0 | return 0; |
291 | 0 | } |
292 | | |
293 | | |
294 | | static nxt_bool_t |
295 | | nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied) |
296 | 0 | { |
297 | 0 | size_t size, bsize; |
298 | 0 | nxt_bool_t flush; |
299 | |
|
300 | 0 | flush = 0; |
301 | |
|
302 | 0 | do { |
303 | 0 | nxt_prefetch(b->next); |
304 | |
|
305 | 0 | if (nxt_buf_is_mem(b)) { |
306 | 0 | bsize = bm->end - bm->free; |
307 | 0 | size = b->mem.free - b->mem.pos; |
308 | 0 | size = nxt_min(size, bsize); |
309 | |
|
310 | 0 | nxt_memcpy(bm->free, b->mem.pos, size); |
311 | |
|
312 | 0 | *copied += size; |
313 | 0 | bm->free += size; |
314 | |
|
315 | 0 | if (bm->free == bm->end) { |
316 | 0 | return 1; |
317 | 0 | } |
318 | 0 | } |
319 | | |
320 | 0 | flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b); |
321 | |
|
322 | 0 | b = b->next; |
323 | |
|
324 | 0 | } while (b != NULL); |
325 | | |
326 | 0 | return flush; |
327 | 0 | } |
328 | | |
329 | | |
330 | | nxt_buf_t * |
331 | | nxt_sendbuf_update(nxt_buf_t *b, size_t sent) |
332 | 0 | { |
333 | 0 | size_t size; |
334 | |
|
335 | 0 | while (b != NULL) { |
336 | |
|
337 | 0 | nxt_prefetch(b->next); |
338 | |
|
339 | 0 | if (!nxt_buf_is_sync(b)) { |
340 | |
|
341 | 0 | size = nxt_buf_used_size(b); |
342 | |
|
343 | 0 | if (size != 0) { |
344 | |
|
345 | 0 | if (sent == 0) { |
346 | 0 | break; |
347 | 0 | } |
348 | | |
349 | 0 | if (sent < size) { |
350 | |
|
351 | 0 | if (nxt_buf_is_mem(b)) { |
352 | 0 | b->mem.pos += sent; |
353 | 0 | } |
354 | |
|
355 | 0 | if (nxt_buf_is_file(b)) { |
356 | 0 | b->file_pos += sent; |
357 | 0 | } |
358 | |
|
359 | 0 | break; |
360 | 0 | } |
361 | | |
362 | | /* b->mem.free is NULL in file-only buffer. */ |
363 | 0 | b->mem.pos = b->mem.free; |
364 | |
|
365 | 0 | if (nxt_buf_is_file(b)) { |
366 | 0 | b->file_pos = b->file_end; |
367 | 0 | } |
368 | |
|
369 | 0 | sent -= size; |
370 | 0 | } |
371 | 0 | } |
372 | | |
373 | 0 | b = b->next; |
374 | 0 | } |
375 | |
|
376 | 0 | return b; |
377 | 0 | } |
378 | | |
379 | | |
380 | | nxt_buf_t * |
381 | | nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) |
382 | 0 | { |
383 | 0 | while (b != NULL) { |
384 | |
|
385 | 0 | if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) { |
386 | 0 | break; |
387 | 0 | } |
388 | | |
389 | 0 | b = nxt_sendbuf_coalesce_completion(task, wq, b); |
390 | 0 | } |
391 | |
|
392 | 0 | return b; |
393 | 0 | } |
394 | | |
395 | | |
396 | | void |
397 | | nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) |
398 | 0 | { |
399 | 0 | while (b != NULL) { |
400 | 0 | b = nxt_sendbuf_coalesce_completion(task, wq, b); |
401 | 0 | } |
402 | 0 | } |
403 | | |
404 | | |
405 | | static nxt_buf_t * |
406 | | nxt_sendbuf_coalesce_completion(nxt_task_t *task, nxt_work_queue_t *wq, |
407 | | nxt_buf_t *start) |
408 | 0 | { |
409 | 0 | nxt_buf_t *b, *next, **last, *rest, **last_rest; |
410 | 0 | nxt_work_handler_t handler; |
411 | |
|
412 | 0 | rest = NULL; |
413 | 0 | last_rest = &rest; |
414 | 0 | last = &start->next; |
415 | 0 | b = start; |
416 | 0 | handler = b->completion_handler; |
417 | |
|
418 | 0 | for ( ;; ) { |
419 | 0 | next = b->next; |
420 | 0 | if (next == NULL) { |
421 | 0 | break; |
422 | 0 | } |
423 | | |
424 | 0 | b->next = NULL; |
425 | 0 | b = next; |
426 | |
|
427 | 0 | if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) { |
428 | 0 | *last_rest = b; |
429 | 0 | break; |
430 | 0 | } |
431 | | |
432 | 0 | if (handler == b->completion_handler) { |
433 | 0 | *last = b; |
434 | 0 | last = &b->next; |
435 | |
|
436 | 0 | } else { |
437 | 0 | *last_rest = b; |
438 | 0 | last_rest = &b->next; |
439 | 0 | } |
440 | 0 | } |
441 | |
|
442 | 0 | nxt_work_queue_add(wq, handler, task, start, start->parent); |
443 | |
|
444 | 0 | return rest; |
445 | 0 | } |