Coverage Report

Created: 2025-08-28 06:29

/src/frr/lib/workqueue.c
Line
Count
Source (jump to first uncovered line)
1
// SPDX-License-Identifier: GPL-2.0-or-later
2
/*
3
 * Quagga Work Queue Support.
4
 *
5
 * Copyright (C) 2005 Sun Microsystems, Inc.
6
 */
7
8
#include <zebra.h>
9
#include "frrevent.h"
10
#include "memory.h"
11
#include "workqueue.h"
12
#include "linklist.h"
13
#include "command.h"
14
#include "log.h"
15
16
/* Memory types for the allocations made in this file: the work queue
 * structure itself, each queued item, and the duplicated queue name.
 */
DEFINE_MTYPE(LIB, WORK_QUEUE, "Work queue");
17
DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_ITEM, "Work queue item");
18
DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_NAME, "Work queue name string");
19
20
/* Master list of all work queues: work_queue_new() adds to it,
 * work_queue_free_and_null() removes from it, and the
 * "show work-queues" command walks it.
 */
21
static struct list _work_queues;
22
/* Pointer to the master list; exists primarily to avoid an otherwise
 * harmless warning when the list is passed to ALL_LIST_ELEMENTS_RO.
 */
25
static struct list *work_queues = &_work_queues;
26
27
0
/* Initial value of a queue's cycles.granularity, and the value it is
 * reset to whenever it would otherwise be zero.
 */
#define WORK_QUEUE_MIN_GRANULARITY 1
28
29
static struct work_queue_item *work_queue_item_new(struct work_queue *wq)
30
0
{
31
0
  struct work_queue_item *item;
32
0
  assert(wq);
33
34
0
  item = XCALLOC(MTYPE_WORK_QUEUE_ITEM, sizeof(struct work_queue_item));
35
36
0
  return item;
37
0
}
38
39
static void work_queue_item_free(struct work_queue_item *item)
40
0
{
41
0
  XFREE(MTYPE_WORK_QUEUE_ITEM, item);
42
0
  return;
43
0
}
44
45
static void work_queue_item_remove(struct work_queue *wq,
46
           struct work_queue_item *item)
47
0
{
48
0
  assert(item && item->data);
49
50
  /* call private data deletion callback if needed */
51
0
  if (wq->spec.del_item_data)
52
0
    wq->spec.del_item_data(wq, item->data);
53
54
0
  work_queue_item_dequeue(wq, item);
55
56
0
  work_queue_item_free(item);
57
58
0
  return;
59
0
}
60
61
/* create new work queue */
62
struct work_queue *work_queue_new(struct event_loop *m, const char *queue_name)
63
0
{
64
0
  struct work_queue *new;
65
66
0
  new = XCALLOC(MTYPE_WORK_QUEUE, sizeof(struct work_queue));
67
68
0
  new->name = XSTRDUP(MTYPE_WORK_QUEUE_NAME, queue_name);
69
0
  new->master = m;
70
0
  SET_FLAG(new->flags, WQ_UNPLUGGED);
71
72
0
  STAILQ_INIT(&new->items);
73
74
0
  listnode_add(work_queues, new);
75
76
0
  new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
77
78
  /* Default values, can be overridden by caller */
79
0
  new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
80
0
  new->spec.yield = EVENT_YIELD_TIME_SLOT;
81
0
  new->spec.retry = WORK_QUEUE_DEFAULT_RETRY;
82
83
0
  return new;
84
0
}
85
86
void work_queue_free_and_null(struct work_queue **wqp)
87
0
{
88
0
  struct work_queue *wq = *wqp;
89
90
0
  EVENT_OFF(wq->thread);
91
92
0
  while (!work_queue_empty(wq)) {
93
0
    struct work_queue_item *item = work_queue_last_item(wq);
94
95
0
    work_queue_item_remove(wq, item);
96
0
  }
97
98
0
  listnode_delete(work_queues, wq);
99
100
0
  XFREE(MTYPE_WORK_QUEUE_NAME, wq->name);
101
0
  XFREE(MTYPE_WORK_QUEUE, wq);
102
103
0
  *wqp = NULL;
104
0
}
105
106
bool work_queue_is_scheduled(struct work_queue *wq)
107
0
{
108
0
  return event_is_scheduled(wq->thread);
109
0
}
110
111
static int work_queue_schedule(struct work_queue *wq, unsigned int delay)
112
0
{
113
  /* if appropriate, schedule work queue thread */
114
0
  if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) &&
115
0
      !event_is_scheduled(wq->thread) && !work_queue_empty(wq)) {
116
    /* Schedule timer if there's a delay, otherwise just schedule
117
     * as an 'event'
118
     */
119
0
    if (delay > 0) {
120
0
      event_add_timer_msec(wq->master, work_queue_run, wq,
121
0
               delay, &wq->thread);
122
0
      event_ignore_late_timer(wq->thread);
123
0
    } else
124
0
      event_add_event(wq->master, work_queue_run, wq, 0,
125
0
          &wq->thread);
126
127
    /* set thread yield time, if needed */
128
0
    if (event_is_scheduled(wq->thread) &&
129
0
        wq->spec.yield != EVENT_YIELD_TIME_SLOT)
130
0
      event_set_yield_time(wq->thread, wq->spec.yield);
131
0
    return 1;
132
0
  } else
133
0
    return 0;
134
0
}
135
136
void work_queue_add(struct work_queue *wq, void *data)
137
0
{
138
0
  struct work_queue_item *item;
139
140
0
  assert(wq);
141
142
0
  item = work_queue_item_new(wq);
143
144
0
  item->data = data;
145
0
  work_queue_item_enqueue(wq, item);
146
147
0
  work_queue_schedule(wq, wq->spec.hold);
148
149
0
  return;
150
0
}
151
152
/* Re-queue an item on wq: it is dequeued and then enqueued again.
 *
 * The item itself and its data are kept as they are.
 */
static void work_queue_item_requeue(struct work_queue *wq,
            struct work_queue_item *item)
{
  /* unlink from its current position ... */
  work_queue_item_dequeue(wq, item);

  /* ... and put it back at the end of the list */
  work_queue_item_enqueue(wq, item);
}
160
161
DEFUN (show_work_queues,
162
       show_work_queues_cmd,
163
       "show work-queues",
164
       SHOW_STR
165
       "Work Queue information\n")
166
0
{
167
0
  struct listnode *node;
168
0
  struct work_queue *wq;
169
170
0
  vty_out(vty, "%c %8s %5s %8s %8s %21s\n", ' ', "List", "(ms) ",
171
0
    "Q. Runs", "Yields", "Cycle Counts   ");
172
0
  vty_out(vty, "%c %8s %5s %8s %8s %7s %6s %8s %6s %s\n", 'P', "Items",
173
0
    "Hold", "Total", "Total", "Best", "Gran.", "Total", "Avg.",
174
0
    "Name");
175
176
0
  for (ALL_LIST_ELEMENTS_RO(work_queues, node, wq)) {
177
0
    vty_out(vty, "%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s\n",
178
0
      (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
179
0
      work_queue_item_count(wq), wq->spec.hold, wq->runs,
180
0
      wq->yields, wq->cycles.best, wq->cycles.granularity,
181
0
      wq->cycles.total,
182
0
      (wq->runs) ? (unsigned int)(wq->cycles.total / wq->runs)
183
0
           : 0,
184
0
      wq->name);
185
0
  }
186
187
0
  return CMD_SUCCESS;
188
0
}
189
190
void workqueue_cmd_init(void)
191
2
{
192
2
  install_element(VIEW_NODE, &show_work_queues_cmd);
193
2
}
194
195
/* 'plug' a queue: Stop it from being scheduled,
196
 * ie: prevent the queue from draining.
197
 */
198
void work_queue_plug(struct work_queue *wq)
199
0
{
200
0
  EVENT_OFF(wq->thread);
201
202
0
  UNSET_FLAG(wq->flags, WQ_UNPLUGGED);
203
0
}
204
205
/* unplug queue, schedule it again, if appropriate
206
 * Ie: Allow the queue to be drained again
207
 */
208
void work_queue_unplug(struct work_queue *wq)
209
0
{
210
0
  SET_FLAG(wq->flags, WQ_UNPLUGGED);
211
212
  /* if thread isnt already waiting, add one */
213
0
  work_queue_schedule(wq, wq->spec.hold);
214
0
}
215
216
/* timer thread to process a work queue
217
 * will reschedule itself if required,
218
 * otherwise work_queue_item_add
219
 */
220
void work_queue_run(struct event *thread)
221
0
{
222
0
  struct work_queue *wq;
223
0
  struct work_queue_item *item, *titem;
224
0
  wq_item_status ret = WQ_SUCCESS;
225
0
  unsigned int cycles = 0;
226
0
  char yielded = 0;
227
228
0
  wq = EVENT_ARG(thread);
229
230
0
  assert(wq);
231
232
  /* calculate cycle granularity:
233
   * list iteration == 1 run
234
   * listnode processing == 1 cycle
235
   * granularity == # cycles between checks whether we should yield.
236
   *
237
   * granularity should be > 0, and can increase slowly after each run to
238
   * provide some hysteris, but not past cycles.best or 2*cycles.
239
   *
240
   * Best: starts low, can only increase
241
   *
242
   * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
243
   *              if we run to end of time slot, can increase otherwise
244
   *              by a small factor.
245
   *
246
   * We could use just the average and save some work, however we want to
247
   * be
248
   * able to adjust quickly to CPU pressure. Average wont shift much if
249
   * daemon has been running a long time.
250
   */
251
0
  if (wq->cycles.granularity == 0)
252
0
    wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
253
254
0
  STAILQ_FOREACH_SAFE (item, &wq->items, wq, titem) {
255
0
    assert(item->data);
256
257
    /* dont run items which are past their allowed retries */
258
0
    if (item->ran > wq->spec.max_retries) {
259
0
      work_queue_item_remove(wq, item);
260
0
      continue;
261
0
    }
262
263
    /* run and take care of items that want to be retried
264
     * immediately */
265
0
    do {
266
0
      ret = wq->spec.workfunc(wq, item->data);
267
0
      item->ran++;
268
0
    } while ((ret == WQ_RETRY_NOW)
269
0
       && (item->ran < wq->spec.max_retries));
270
271
0
    switch (ret) {
272
0
    case WQ_QUEUE_BLOCKED: {
273
      /* decrement item->ran again, cause this isn't an item
274
       * specific error, and fall through to WQ_RETRY_LATER
275
       */
276
0
      item->ran--;
277
0
    }
278
0
    case WQ_RETRY_LATER: {
279
0
      goto stats;
280
0
    }
281
0
    case WQ_REQUEUE: {
282
0
      item->ran--;
283
0
      work_queue_item_requeue(wq, item);
284
      /* If a single node is being used with a meta-queue
285
       * (e.g., zebra),
286
       * update the next node as we don't want to exit the
287
       * thread and
288
       * reschedule it after every node. By definition,
289
       * WQ_REQUEUE is
290
       * meant to continue the processing; the yield logic
291
       * will kick in
292
       * to terminate the thread when time has exceeded.
293
       */
294
0
      if (titem == NULL)
295
0
        titem = item;
296
0
      break;
297
0
    }
298
0
    case WQ_RETRY_NOW:
299
    /* a RETRY_NOW that gets here has exceeded max_tries, same as
300
     * ERROR */
301
    /* fallthru */
302
0
    case WQ_SUCCESS:
303
0
    default: {
304
0
      work_queue_item_remove(wq, item);
305
0
      break;
306
0
    }
307
0
    }
308
309
    /* completed cycle */
310
0
    cycles++;
311
312
    /* test if we should yield */
313
0
    if (!(cycles % wq->cycles.granularity) &&
314
0
        event_should_yield(thread)) {
315
0
      yielded = 1;
316
0
      goto stats;
317
0
    }
318
0
  }
319
320
0
stats:
321
322
0
#define WQ_HYSTERESIS_FACTOR 4
323
324
  /* we yielded, check whether granularity should be reduced */
325
0
  if (yielded && (cycles < wq->cycles.granularity)) {
326
0
    wq->cycles.granularity =
327
0
      ((cycles > 0) ? cycles : WORK_QUEUE_MIN_GRANULARITY);
328
0
  }
329
  /* otherwise, should granularity increase? */
330
0
  else if (cycles >= (wq->cycles.granularity)) {
331
0
    if (cycles > wq->cycles.best)
332
0
      wq->cycles.best = cycles;
333
334
    /* along with yielded check, provides hysteresis for granularity
335
     */
336
0
    if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
337
0
            * WQ_HYSTERESIS_FACTOR))
338
0
      wq->cycles.granularity *=
339
0
        WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
340
0
    else if (cycles
341
0
       > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
342
0
      wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
343
0
  }
344
0
#undef WQ_HYSTERIS_FACTOR
345
346
0
  wq->runs++;
347
0
  wq->cycles.total += cycles;
348
0
  if (yielded)
349
0
    wq->yields++;
350
351
  /* Is the queue done yet? If it is, call the completion callback. */
352
0
  if (!work_queue_empty(wq)) {
353
0
    if (ret == WQ_RETRY_LATER ||
354
0
        ret == WQ_QUEUE_BLOCKED)
355
0
      work_queue_schedule(wq, wq->spec.retry);
356
0
    else
357
0
      work_queue_schedule(wq, 0);
358
359
0
  } else if (wq->spec.completion_func)
360
0
    wq->spec.completion_func(wq);
361
0
}