Coverage Report

Created: 2025-07-12 06:16

/src/unit/src/nxt_sendbuf.c
Line
Count
Source (jump to first uncovered line)
1
2
/*
3
 * Copyright (C) Igor Sysoev
4
 * Copyright (C) NGINX, Inc.
5
 */
6
7
#include <nxt_main.h>
8
9
10
static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
11
    size_t *copied);
12
static nxt_buf_t *nxt_sendbuf_coalesce_completion(nxt_task_t *task,
13
    nxt_work_queue_t *wq, nxt_buf_t *start);
14
15
16
nxt_uint_t
17
nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb,
18
    struct iovec *iov, nxt_uint_t niov_max)
19
0
{
20
0
    u_char      *last;
21
0
    size_t      size, total;
22
0
    nxt_buf_t   *b;
23
0
    nxt_uint_t  n;
24
25
0
    total = sb->size;
26
0
    last = NULL;
27
0
    n = (nxt_uint_t) -1;
28
29
0
    for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
30
31
0
        nxt_prefetch(b->next);
32
33
0
        if (nxt_buf_is_file(b)) {
34
0
            break;
35
0
        }
36
37
0
        if (nxt_buf_is_mem(b)) {
38
39
0
            size = b->mem.free - b->mem.pos;
40
41
0
            if (size != 0) {
42
43
0
                if (total + size > sb->limit) {
44
0
                    size = sb->limit - total;
45
46
0
                    if (size == 0) {
47
0
                        break;
48
0
                    }
49
0
                }
50
51
0
                if (b->mem.pos != last) {
52
53
0
                    if (++n >= niov_max) {
54
0
                        goto done;
55
0
                    }
56
57
0
                    iov[n].iov_base = b->mem.pos;
58
0
                    iov[n].iov_len = size;
59
60
0
                } else {
61
0
                    iov[n].iov_len += size;
62
0
                }
63
64
0
                nxt_debug(task, "sendbuf: %ui, %p, %uz",
65
0
                          n, iov[n].iov_base, iov[n].iov_len);
66
67
0
                total += size;
68
0
                last = b->mem.pos + size;
69
0
            }
70
71
0
        } else {
72
0
            sb->sync = 1;
73
0
            sb->last |= nxt_buf_is_last(b);
74
0
        }
75
0
    }
76
77
0
    n++;
78
79
0
done:
80
81
0
    sb->buf = b;
82
83
0
    return n;
84
0
}
85
86
87
nxt_uint_t
88
nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
89
0
{
90
0
    u_char      *last;
91
0
    size_t      size, total;
92
0
    nxt_buf_t   *b;
93
0
    nxt_uint_t  n;
94
95
0
    total = sb->size;
96
0
    last = NULL;
97
0
    n = (nxt_uint_t) -1;
98
99
0
    for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
100
101
0
        nxt_prefetch(b->next);
102
103
0
        if (nxt_buf_is_file(b)) {
104
0
            break;
105
0
        }
106
107
0
        if (nxt_buf_is_mem(b)) {
108
109
0
            size = b->mem.free - b->mem.pos;
110
111
0
            if (size != 0) {
112
113
0
                if (total + size > sb->limit) {
114
0
                    size = sb->limit - total;
115
116
0
                    sb->limit_reached = 1;
117
118
0
                    if (nxt_slow_path(size == 0)) {
119
0
                        break;
120
0
                    }
121
0
                }
122
123
0
                if (b->mem.pos != last) {
124
125
0
                    if (++n >= sb->nmax) {
126
0
                        sb->nmax_reached = 1;
127
128
0
                        goto done;
129
0
                    }
130
131
0
                    sb->iobuf[n].iov_base = b->mem.pos;
132
0
                    sb->iobuf[n].iov_len = size;
133
134
0
                } else {
135
0
                    sb->iobuf[n].iov_len += size;
136
0
                }
137
138
0
                nxt_debug(task, "sendbuf: %ui, %p, %uz",
139
0
                          n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len);
140
141
0
                total += size;
142
0
                last = b->mem.pos + size;
143
0
            }
144
145
0
        } else {
146
0
            sb->sync = 1;
147
0
            sb->last |= nxt_buf_is_last(b);
148
0
        }
149
0
    }
150
151
0
    n++;
152
153
0
done:
154
155
0
    sb->buf = b;
156
0
    sb->size = total;
157
0
    sb->niov = n;
158
159
0
    return n;
160
0
}
161
162
163
size_t
164
nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb)
165
0
{
166
0
    size_t     file_start, total;
167
0
    nxt_fd_t   fd;
168
0
    nxt_off_t  size, last;
169
0
    nxt_buf_t  *b;
170
171
0
    b = sb->buf;
172
0
    fd = b->file->fd;
173
174
0
    total = sb->size;
175
176
0
    for ( ;; ) {
177
178
0
        nxt_prefetch(b->next);
179
180
0
        size = b->file_end - b->file_pos;
181
182
0
        if (total + size >= sb->limit) {
183
0
            total = sb->limit;
184
0
            break;
185
0
        }
186
187
0
        total += size;
188
0
        last = b->file_pos + size;
189
190
0
        b = b->next;
191
192
0
        if (b == NULL || !nxt_buf_is_file(b)) {
193
0
            break;
194
0
        }
195
196
0
        if (b->file_pos != last || b->file->fd != fd) {
197
0
            break;
198
0
        }
199
0
    }
200
201
0
    sb->buf = b;
202
203
0
    file_start = sb->size;
204
0
    sb->size = total;
205
206
0
    return total - file_start;
207
0
}
208
209
210
ssize_t
211
nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *b,
212
    size_t limit)
213
0
{
214
0
    size_t      size, bsize, copied;
215
0
    ssize_t     n;
216
0
    nxt_bool_t  flush;
217
218
0
    size = nxt_buf_mem_used_size(&b->mem);
219
0
    bsize = nxt_buf_mem_size(bm);
220
221
0
    if (bsize != 0) {
222
223
0
        if (size > bsize && bm->pos == bm->free) {
224
            /*
225
             * A data buffer size is larger than the internal
226
             * buffer size and the internal buffer is empty.
227
             */
228
0
            goto no_buffer;
229
0
        }
230
231
0
        if (bm->pos == NULL) {
232
0
            bm->pos = nxt_malloc(bsize);
233
0
            if (nxt_slow_path(bm->pos == NULL)) {
234
0
                return NXT_ERROR;
235
0
            }
236
237
0
            bm->start = bm->pos;
238
0
            bm->free = bm->pos;
239
0
            bm->end += (uintptr_t) bm->pos;
240
0
        }
241
242
0
        copied = 0;
243
244
0
        flush = nxt_sendbuf_copy(bm, b, &copied);
245
246
0
        nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush);
247
248
0
        if (flush == 0) {
249
0
            return copied;
250
0
        }
251
252
0
        size = nxt_buf_mem_used_size(bm);
253
254
0
        if (size == 0 && nxt_buf_is_sync(b)) {
255
0
            goto done;
256
0
        }
257
258
0
        n = c->io->send(c, bm->pos, nxt_min(size, limit));
259
260
0
        nxt_log_debug(c->socket.log, "sendbuf sent:%z", n);
261
262
0
        if (n > 0) {
263
0
            bm->pos += n;
264
265
0
            if (bm->pos == bm->free) {
266
0
                bm->pos = bm->start;
267
0
                bm->free = bm->start;
268
0
            }
269
270
0
            n = 0;
271
0
        }
272
273
0
        return (copied != 0) ? (ssize_t) copied : n;
274
0
    }
275
276
    /* No internal buffering. */
277
278
0
    if (size == 0 && nxt_buf_is_sync(b)) {
279
0
        goto done;
280
0
    }
281
282
0
no_buffer:
283
284
0
    return c->io->send(c, b->mem.pos, nxt_min(size, limit));
285
286
0
done:
287
288
0
    nxt_log_debug(c->socket.log, "sendbuf done");
289
290
0
    return 0;
291
0
}
292
293
294
static nxt_bool_t
295
nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied)
296
0
{
297
0
    size_t      size, bsize;
298
0
    nxt_bool_t  flush;
299
300
0
    flush = 0;
301
302
0
    do {
303
0
        nxt_prefetch(b->next);
304
305
0
        if (nxt_buf_is_mem(b)) {
306
0
            bsize = bm->end - bm->free;
307
0
            size = b->mem.free - b->mem.pos;
308
0
            size = nxt_min(size, bsize);
309
310
0
            nxt_memcpy(bm->free, b->mem.pos, size);
311
312
0
            *copied += size;
313
0
            bm->free += size;
314
315
0
            if (bm->free == bm->end) {
316
0
                return 1;
317
0
            }
318
0
        }
319
320
0
        flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b);
321
322
0
        b = b->next;
323
324
0
    } while (b != NULL);
325
326
0
    return flush;
327
0
}
328
329
330
nxt_buf_t *
331
nxt_sendbuf_update(nxt_buf_t *b, size_t sent)
332
0
{
333
0
    size_t  size;
334
335
0
    while (b != NULL) {
336
337
0
        nxt_prefetch(b->next);
338
339
0
        if (!nxt_buf_is_sync(b)) {
340
341
0
            size = nxt_buf_used_size(b);
342
343
0
            if (size != 0) {
344
345
0
                if (sent == 0) {
346
0
                    break;
347
0
                }
348
349
0
                if (sent < size) {
350
351
0
                    if (nxt_buf_is_mem(b)) {
352
0
                        b->mem.pos += sent;
353
0
                    }
354
355
0
                    if (nxt_buf_is_file(b)) {
356
0
                        b->file_pos += sent;
357
0
                    }
358
359
0
                    break;
360
0
                }
361
362
                /* b->mem.free is NULL in file-only buffer. */
363
0
                b->mem.pos = b->mem.free;
364
365
0
                if (nxt_buf_is_file(b)) {
366
0
                    b->file_pos = b->file_end;
367
0
                }
368
369
0
                sent -= size;
370
0
            }
371
0
        }
372
373
0
        b = b->next;
374
0
    }
375
376
0
    return b;
377
0
}
378
379
380
nxt_buf_t *
381
nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
382
0
{
383
0
    while (b != NULL) {
384
385
0
        if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
386
0
            break;
387
0
        }
388
389
0
        b = nxt_sendbuf_coalesce_completion(task, wq, b);
390
0
    }
391
392
0
    return b;
393
0
}
394
395
396
void
397
nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
398
0
{
399
0
    while (b != NULL) {
400
0
        b = nxt_sendbuf_coalesce_completion(task, wq, b);
401
0
    }
402
0
}
403
404
405
static nxt_buf_t *
406
nxt_sendbuf_coalesce_completion(nxt_task_t *task, nxt_work_queue_t *wq,
407
    nxt_buf_t *start)
408
0
{
409
0
    nxt_buf_t           *b, *next, **last, *rest, **last_rest;
410
0
    nxt_work_handler_t  handler;
411
412
0
    rest = NULL;
413
0
    last_rest = &rest;
414
0
    last = &start->next;
415
0
    b = start;
416
0
    handler = b->completion_handler;
417
418
0
    for ( ;; ) {
419
0
        next = b->next;
420
0
        if (next == NULL) {
421
0
            break;
422
0
        }
423
424
0
        b->next = NULL;
425
0
        b = next;
426
427
0
        if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
428
0
            *last_rest = b;
429
0
            break;
430
0
        }
431
432
0
        if (handler == b->completion_handler) {
433
0
            *last = b;
434
0
            last = &b->next;
435
436
0
        } else {
437
0
            *last_rest = b;
438
0
            last_rest = &b->next;
439
0
        }
440
0
    }
441
442
0
    nxt_work_queue_add(wq, handler, task, start, start->parent);
443
444
0
    return rest;
445
0
}