Coverage Report

Created: 2026-06-06 06:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/frr/lib/pullwr.c
Line
Count
Source
1
// SPDX-License-Identifier: GPL-2.0-or-later
2
/*
3
 * Pull-driven write event handler
4
 * Copyright (C) 2019  David Lamparter
5
 */
6
7
#include "zebra.h"
8
9
#include "pullwr.h"
10
#include "memory.h"
11
#include "monotime.h"
12
13
/* defaults */
14
0
#define PULLWR_THRESH 16384  /* size at which we start to call write() */
15
0
#define PULLWR_MAXSPIN  2500  /* max µs to spend grabbing more data */
16
17
struct pullwr {
18
  int fd;
19
  struct event_loop *tm;
20
  /* writer == NULL <=> we're idle */
21
  struct event *writer;
22
23
  void *arg;
24
  void (*fill)(void *, struct pullwr *);
25
  void (*err)(void *, struct pullwr *, bool);
26
27
  /* ring buffer (although it's "un-ringed" on resizing, it WILL wrap
28
   * around if data is trickling in while keeping it at a constant size)
29
   */
30
  size_t bufsz, valid, pos;
31
  uint64_t total_written;
32
  char *buffer;
33
34
  size_t thresh;    /* PULLWR_THRESH */
35
  int64_t maxspin;  /* PULLWR_MAXSPIN */
36
};
37
38
6
DEFINE_MTYPE_STATIC(LIB, PULLWR_HEAD, "pull-driven write controller");
39
6
DEFINE_MTYPE_STATIC(LIB, PULLWR_BUF,  "pull-driven write buffer");
40
6
41
6
static void pullwr_run(struct event *t);
42
6
43
6
struct pullwr *_pullwr_new(struct event_loop *tm, int fd, void *arg,
44
6
         void (*fill)(void *, struct pullwr *),
45
6
         void (*err)(void *, struct pullwr *, bool))
46
6
{
47
0
  struct pullwr *pullwr;
48
49
0
  pullwr = XCALLOC(MTYPE_PULLWR_HEAD, sizeof(*pullwr));
50
0
  pullwr->fd = fd;
51
0
  pullwr->tm = tm;
52
0
  pullwr->arg = arg;
53
0
  pullwr->fill = fill;
54
0
  pullwr->err = err;
55
56
0
  pullwr->thresh = PULLWR_THRESH;
57
0
  pullwr->maxspin = PULLWR_MAXSPIN;
58
59
0
  return pullwr;
60
0
}
61
62
void pullwr_del(struct pullwr *pullwr)
63
0
{
64
0
  EVENT_OFF(pullwr->writer);
65
66
0
  XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
67
0
  XFREE(MTYPE_PULLWR_HEAD, pullwr);
68
0
}
69
70
void pullwr_cfg(struct pullwr *pullwr, int64_t max_spin_usec,
71
    size_t write_threshold)
72
0
{
73
0
  pullwr->maxspin = max_spin_usec ?: PULLWR_MAXSPIN;
74
0
  pullwr->thresh = write_threshold ?: PULLWR_THRESH;
75
0
}
76
77
void pullwr_bump(struct pullwr *pullwr)
78
0
{
79
0
  if (pullwr->writer)
80
0
    return;
81
82
0
  event_add_timer(pullwr->tm, pullwr_run, pullwr, 0, &pullwr->writer);
83
0
}
84
85
static size_t pullwr_iov(struct pullwr *pullwr, struct iovec *iov)
86
0
{
87
0
  size_t len1;
88
89
0
  if (pullwr->valid == 0)
90
0
    return 0;
91
92
0
  if (pullwr->pos + pullwr->valid <= pullwr->bufsz) {
93
0
    iov[0].iov_base = pullwr->buffer + pullwr->pos;
94
0
    iov[0].iov_len = pullwr->valid;
95
0
    return 1;
96
0
  }
97
98
0
  len1 = pullwr->bufsz - pullwr->pos;
99
100
0
  iov[0].iov_base = pullwr->buffer + pullwr->pos;
101
0
  iov[0].iov_len = len1;
102
0
  iov[1].iov_base = pullwr->buffer;
103
0
  iov[1].iov_len = pullwr->valid - len1;
104
0
  return 2;
105
0
}
106
107
static void pullwr_resize(struct pullwr *pullwr, size_t need)
108
0
{
109
0
  struct iovec iov[2];
110
0
  size_t niov, newsize;
111
0
  char *newbuf;
112
113
  /* the buffer is maintained at pullwr->thresh * 2 since we'll be
114
   * trying to fill it as long as it's anywhere below pullwr->thresh.
115
   * That means we frequently end up a little short of it and then write
116
   * something that goes over the threshold.  So, just use double.
117
   */
118
0
  if (need) {
119
    /* resize up */
120
0
    if (pullwr->bufsz - pullwr->valid >= need)
121
0
      return;
122
123
0
    newsize = MAX((pullwr->valid + need) * 2, pullwr->thresh * 2);
124
0
    newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
125
0
  } else if (!pullwr->valid) {
126
    /* resize down, buffer empty */
127
0
    newsize = 0;
128
0
    newbuf = NULL;
129
0
  } else {
130
    /* resize down */
131
0
    if (pullwr->bufsz - pullwr->valid < pullwr->thresh)
132
0
      return;
133
0
    newsize = MAX(pullwr->valid, pullwr->thresh * 2);
134
0
    newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
135
0
  }
136
137
0
  niov = pullwr_iov(pullwr, iov);
138
0
  if (niov >= 1) {
139
0
    memcpy(newbuf, iov[0].iov_base, iov[0].iov_len);
140
0
    if (niov >= 2)
141
0
      memcpy(newbuf + iov[0].iov_len,
142
0
        iov[1].iov_base, iov[1].iov_len);
143
0
  }
144
145
0
  XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
146
0
  pullwr->buffer = newbuf;
147
0
  pullwr->bufsz = newsize;
148
0
  pullwr->pos = 0;
149
0
}
150
151
void pullwr_write(struct pullwr *pullwr, const void *data, size_t len)
152
0
{
153
0
  pullwr_resize(pullwr, len);
154
155
0
  if (pullwr->pos + pullwr->valid > pullwr->bufsz) {
156
0
    size_t pos;
157
158
0
    pos = (pullwr->pos + pullwr->valid) % pullwr->bufsz;
159
0
    memcpy(pullwr->buffer + pos, data, len);
160
0
  } else {
161
0
    size_t max1, len1;
162
0
    max1 = pullwr->bufsz - (pullwr->pos + pullwr->valid);
163
0
    max1 = MIN(max1, len);
164
165
0
    memcpy(pullwr->buffer + pullwr->pos + pullwr->valid,
166
0
        data, max1);
167
0
    len1 = len - max1;
168
169
0
    if (len1)
170
0
      memcpy(pullwr->buffer, (char *)data + max1, len1);
171
172
0
  }
173
0
  pullwr->valid += len;
174
175
0
  pullwr_bump(pullwr);
176
0
}
177
178
static void pullwr_run(struct event *t)
179
0
{
180
0
  struct pullwr *pullwr = EVENT_ARG(t);
181
0
  struct iovec iov[2];
182
0
  size_t niov, lastvalid;
183
0
  ssize_t nwr;
184
0
  struct timeval t0;
185
0
  bool maxspun = false;
186
0
187
0
  monotime(&t0);
188
0
189
0
  do {
190
0
    lastvalid = pullwr->valid - 1;
191
0
    while (pullwr->valid < pullwr->thresh
192
0
        && pullwr->valid != lastvalid
193
0
        && !maxspun) {
194
0
      lastvalid = pullwr->valid;
195
0
      pullwr->fill(pullwr->arg, pullwr);
196
0
197
0
      /* check after doing at least one fill() call so we
198
0
       * don't spin without making progress on slow boxes
199
0
       */
200
0
      if (!maxspun && monotime_since(&t0, NULL)
201
0
          >= pullwr->maxspin)
202
0
        maxspun = true;
203
0
    }
204
0
205
0
    if (pullwr->valid == 0) {
206
0
      /* we made a fill() call above that didn't feed any
207
0
       * data in, and we have nothing more queued, so we go
208
0
       * into idle, i.e. no calling event_add_write()
209
0
       */
210
0
      pullwr_resize(pullwr, 0);
211
0
      return;
212
0
    }
213
0
214
0
    niov = pullwr_iov(pullwr, iov);
215
0
    assert(niov);
216
0
217
0
    nwr = writev(pullwr->fd, iov, niov);
218
0
    if (nwr < 0) {
219
0
      if (errno == EAGAIN || errno == EWOULDBLOCK)
220
0
        break;
221
0
      pullwr->err(pullwr->arg, pullwr, false);
222
0
      return;
223
0
    }
224
0
225
0
    if (nwr == 0) {
226
0
      pullwr->err(pullwr->arg, pullwr, true);
227
0
      return;
228
0
    }
229
0
230
0
    pullwr->total_written += nwr;
231
0
    pullwr->valid -= nwr;
232
0
    pullwr->pos += nwr;
233
0
    pullwr->pos %= pullwr->bufsz;
234
0
  } while (pullwr->valid == 0 && !maxspun);
235
0
  /* pullwr->valid != 0 implies we did an incomplete write, i.e. socket
236
0
   * is full and we go wait until it's available for writing again.
237
0
   */
238
0
239
0
  event_add_write(pullwr->tm, pullwr_run, pullwr, pullwr->fd,
240
0
      &pullwr->writer);
241
0
242
0
  /* if we hit the time limit, just keep the buffer, we'll probably need
243
0
   * it anyway & another run is already coming up.
244
0
   */
245
0
  if (!maxspun)
246
0
    pullwr_resize(pullwr, 0);
247
0
}
248
249
void pullwr_stats(struct pullwr *pullwr, uint64_t *total_written,
250
      size_t *pending, size_t *kernel_pending)
251
0
{
252
0
  int tmp;
253
254
0
  *total_written = pullwr->total_written;
255
0
  *pending = pullwr->valid;
256
257
0
  if (ioctl(pullwr->fd, TIOCOUTQ, &tmp) != 0)
258
0
    tmp = 0;
259
0
  *kernel_pending = tmp;
260
0
}