Line | Count | Source (jump to first uncovered line) |
1 | | // SPDX-License-Identifier: GPL-2.0-or-later |
2 | | /* |
3 | | * Quagga Work Queue Support. |
4 | | * |
5 | | * Copyright (C) 2005 Sun Microsystems, Inc. |
6 | | */ |
7 | | |
8 | | #include <zebra.h> |
9 | | #include "frrevent.h" |
10 | | #include "memory.h" |
11 | | #include "workqueue.h" |
12 | | #include "linklist.h" |
13 | | #include "command.h" |
14 | | #include "log.h" |
15 | | |
16 | | DEFINE_MTYPE(LIB, WORK_QUEUE, "Work queue"); |
17 | | DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_ITEM, "Work queue item"); |
18 | | DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_NAME, "Work queue name string"); |
19 | | |
20 | | /* master list of work_queues */ |
21 | | static struct list _work_queues; |
22 | | /* pointer primarily to avoid an otherwise harmless warning on |
23 | | * ALL_LIST_ELEMENTS_RO |
24 | | */ |
25 | | static struct list *work_queues = &_work_queues; |
26 | | |
27 | 0 | #define WORK_QUEUE_MIN_GRANULARITY 1 |
28 | | |
29 | | static struct work_queue_item *work_queue_item_new(struct work_queue *wq) |
30 | 0 | { |
31 | 0 | struct work_queue_item *item; |
32 | 0 | assert(wq); |
33 | | |
34 | 0 | item = XCALLOC(MTYPE_WORK_QUEUE_ITEM, sizeof(struct work_queue_item)); |
35 | |
|
36 | 0 | return item; |
37 | 0 | } |
38 | | |
39 | | static void work_queue_item_free(struct work_queue_item *item) |
40 | 0 | { |
41 | 0 | XFREE(MTYPE_WORK_QUEUE_ITEM, item); |
42 | 0 | return; |
43 | 0 | } |
44 | | |
45 | | static void work_queue_item_remove(struct work_queue *wq, |
46 | | struct work_queue_item *item) |
47 | 0 | { |
48 | 0 | assert(item && item->data); |
49 | | |
50 | | /* call private data deletion callback if needed */ |
51 | 0 | if (wq->spec.del_item_data) |
52 | 0 | wq->spec.del_item_data(wq, item->data); |
53 | |
|
54 | 0 | work_queue_item_dequeue(wq, item); |
55 | |
|
56 | 0 | work_queue_item_free(item); |
57 | |
|
58 | 0 | return; |
59 | 0 | } |
60 | | |
61 | | /* create new work queue */ |
62 | | struct work_queue *work_queue_new(struct event_loop *m, const char *queue_name) |
63 | 0 | { |
64 | 0 | struct work_queue *new; |
65 | |
|
66 | 0 | new = XCALLOC(MTYPE_WORK_QUEUE, sizeof(struct work_queue)); |
67 | |
|
68 | 0 | new->name = XSTRDUP(MTYPE_WORK_QUEUE_NAME, queue_name); |
69 | 0 | new->master = m; |
70 | 0 | SET_FLAG(new->flags, WQ_UNPLUGGED); |
71 | |
|
72 | 0 | STAILQ_INIT(&new->items); |
73 | |
|
74 | 0 | listnode_add(work_queues, new); |
75 | |
|
76 | 0 | new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; |
77 | | |
78 | | /* Default values, can be overridden by caller */ |
79 | 0 | new->spec.hold = WORK_QUEUE_DEFAULT_HOLD; |
80 | 0 | new->spec.yield = EVENT_YIELD_TIME_SLOT; |
81 | 0 | new->spec.retry = WORK_QUEUE_DEFAULT_RETRY; |
82 | |
|
83 | 0 | return new; |
84 | 0 | } |
85 | | |
86 | | void work_queue_free_and_null(struct work_queue **wqp) |
87 | 0 | { |
88 | 0 | struct work_queue *wq = *wqp; |
89 | |
|
90 | 0 | EVENT_OFF(wq->thread); |
91 | |
|
92 | 0 | while (!work_queue_empty(wq)) { |
93 | 0 | struct work_queue_item *item = work_queue_last_item(wq); |
94 | |
|
95 | 0 | work_queue_item_remove(wq, item); |
96 | 0 | } |
97 | |
|
98 | 0 | listnode_delete(work_queues, wq); |
99 | |
|
100 | 0 | XFREE(MTYPE_WORK_QUEUE_NAME, wq->name); |
101 | 0 | XFREE(MTYPE_WORK_QUEUE, wq); |
102 | |
|
103 | 0 | *wqp = NULL; |
104 | 0 | } |
105 | | |
/* Report whether the queue's processing thread is currently scheduled
 * on the event loop (i.e. a run is pending).
 */
bool work_queue_is_scheduled(struct work_queue *wq)
{
	return event_is_scheduled(wq->thread);
}
110 | | |
111 | | static int work_queue_schedule(struct work_queue *wq, unsigned int delay) |
112 | 0 | { |
113 | | /* if appropriate, schedule work queue thread */ |
114 | 0 | if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) && |
115 | 0 | !event_is_scheduled(wq->thread) && !work_queue_empty(wq)) { |
116 | | /* Schedule timer if there's a delay, otherwise just schedule |
117 | | * as an 'event' |
118 | | */ |
119 | 0 | if (delay > 0) { |
120 | 0 | event_add_timer_msec(wq->master, work_queue_run, wq, |
121 | 0 | delay, &wq->thread); |
122 | 0 | event_ignore_late_timer(wq->thread); |
123 | 0 | } else |
124 | 0 | event_add_event(wq->master, work_queue_run, wq, 0, |
125 | 0 | &wq->thread); |
126 | | |
127 | | /* set thread yield time, if needed */ |
128 | 0 | if (event_is_scheduled(wq->thread) && |
129 | 0 | wq->spec.yield != EVENT_YIELD_TIME_SLOT) |
130 | 0 | event_set_yield_time(wq->thread, wq->spec.yield); |
131 | 0 | return 1; |
132 | 0 | } else |
133 | 0 | return 0; |
134 | 0 | } |
135 | | |
136 | | void work_queue_add(struct work_queue *wq, void *data) |
137 | 0 | { |
138 | 0 | struct work_queue_item *item; |
139 | |
|
140 | 0 | assert(wq); |
141 | | |
142 | 0 | item = work_queue_item_new(wq); |
143 | |
|
144 | 0 | item->data = data; |
145 | 0 | work_queue_item_enqueue(wq, item); |
146 | |
|
147 | 0 | work_queue_schedule(wq, wq->spec.hold); |
148 | |
|
149 | 0 | return; |
150 | 0 | } |
151 | | |
/* Move an item to the tail of its queue, so other items get a turn. */
static void work_queue_item_requeue(struct work_queue *wq,
				    struct work_queue_item *item)
{
	work_queue_item_dequeue(wq, item);
	work_queue_item_enqueue(wq, item); /* re-attach at end of list */
}
160 | | |
161 | | DEFUN (show_work_queues, |
162 | | show_work_queues_cmd, |
163 | | "show work-queues", |
164 | | SHOW_STR |
165 | | "Work Queue information\n") |
166 | 0 | { |
167 | 0 | struct listnode *node; |
168 | 0 | struct work_queue *wq; |
169 | |
|
170 | 0 | vty_out(vty, "%c %8s %5s %8s %8s %21s\n", ' ', "List", "(ms) ", |
171 | 0 | "Q. Runs", "Yields", "Cycle Counts "); |
172 | 0 | vty_out(vty, "%c %8s %5s %8s %8s %7s %6s %8s %6s %s\n", 'P', "Items", |
173 | 0 | "Hold", "Total", "Total", "Best", "Gran.", "Total", "Avg.", |
174 | 0 | "Name"); |
175 | |
|
176 | 0 | for (ALL_LIST_ELEMENTS_RO(work_queues, node, wq)) { |
177 | 0 | vty_out(vty, "%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s\n", |
178 | 0 | (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'), |
179 | 0 | work_queue_item_count(wq), wq->spec.hold, wq->runs, |
180 | 0 | wq->yields, wq->cycles.best, wq->cycles.granularity, |
181 | 0 | wq->cycles.total, |
182 | 0 | (wq->runs) ? (unsigned int)(wq->cycles.total / wq->runs) |
183 | 0 | : 0, |
184 | 0 | wq->name); |
185 | 0 | } |
186 | | |
187 | 0 | return CMD_SUCCESS; |
188 | 0 | } |
189 | | |
/* Register the work-queue CLI commands ('show work-queues'). */
void workqueue_cmd_init(void)
{
	install_element(VIEW_NODE, &show_work_queues_cmd);
}
194 | | |
/* 'plug' a queue: Stop it from being scheduled,
 * ie: prevent the queue from draining.
 */
void work_queue_plug(struct work_queue *wq)
{
	/* cancel any pending run before marking the queue plugged */
	EVENT_OFF(wq->thread);

	UNSET_FLAG(wq->flags, WQ_UNPLUGGED);
}
204 | | |
/* unplug queue, schedule it again, if appropriate
 * Ie: Allow the queue to be drained again
 */
void work_queue_unplug(struct work_queue *wq)
{
	SET_FLAG(wq->flags, WQ_UNPLUGGED);

	/* if thread isnt already waiting, add one */
	work_queue_schedule(wq, wq->spec.hold);
}
215 | | |
216 | | /* timer thread to process a work queue |
217 | | * will reschedule itself if required, |
218 | | * otherwise work_queue_item_add |
219 | | */ |
220 | | void work_queue_run(struct event *thread) |
221 | 0 | { |
222 | 0 | struct work_queue *wq; |
223 | 0 | struct work_queue_item *item, *titem; |
224 | 0 | wq_item_status ret = WQ_SUCCESS; |
225 | 0 | unsigned int cycles = 0; |
226 | 0 | char yielded = 0; |
227 | |
|
228 | 0 | wq = EVENT_ARG(thread); |
229 | |
|
230 | 0 | assert(wq); |
231 | | |
232 | | /* calculate cycle granularity: |
233 | | * list iteration == 1 run |
234 | | * listnode processing == 1 cycle |
235 | | * granularity == # cycles between checks whether we should yield. |
236 | | * |
237 | | * granularity should be > 0, and can increase slowly after each run to |
238 | | * provide some hysteris, but not past cycles.best or 2*cycles. |
239 | | * |
240 | | * Best: starts low, can only increase |
241 | | * |
242 | | * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased |
243 | | * if we run to end of time slot, can increase otherwise |
244 | | * by a small factor. |
245 | | * |
246 | | * We could use just the average and save some work, however we want to |
247 | | * be |
248 | | * able to adjust quickly to CPU pressure. Average wont shift much if |
249 | | * daemon has been running a long time. |
250 | | */ |
251 | 0 | if (wq->cycles.granularity == 0) |
252 | 0 | wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; |
253 | |
|
254 | 0 | STAILQ_FOREACH_SAFE (item, &wq->items, wq, titem) { |
255 | 0 | assert(item->data); |
256 | | |
257 | | /* dont run items which are past their allowed retries */ |
258 | 0 | if (item->ran > wq->spec.max_retries) { |
259 | 0 | work_queue_item_remove(wq, item); |
260 | 0 | continue; |
261 | 0 | } |
262 | | |
263 | | /* run and take care of items that want to be retried |
264 | | * immediately */ |
265 | 0 | do { |
266 | 0 | ret = wq->spec.workfunc(wq, item->data); |
267 | 0 | item->ran++; |
268 | 0 | } while ((ret == WQ_RETRY_NOW) |
269 | 0 | && (item->ran < wq->spec.max_retries)); |
270 | |
|
271 | 0 | switch (ret) { |
272 | 0 | case WQ_QUEUE_BLOCKED: { |
273 | | /* decrement item->ran again, cause this isn't an item |
274 | | * specific error, and fall through to WQ_RETRY_LATER |
275 | | */ |
276 | 0 | item->ran--; |
277 | 0 | } |
278 | 0 | case WQ_RETRY_LATER: { |
279 | 0 | goto stats; |
280 | 0 | } |
281 | 0 | case WQ_REQUEUE: { |
282 | 0 | item->ran--; |
283 | 0 | work_queue_item_requeue(wq, item); |
284 | | /* If a single node is being used with a meta-queue |
285 | | * (e.g., zebra), |
286 | | * update the next node as we don't want to exit the |
287 | | * thread and |
288 | | * reschedule it after every node. By definition, |
289 | | * WQ_REQUEUE is |
290 | | * meant to continue the processing; the yield logic |
291 | | * will kick in |
292 | | * to terminate the thread when time has exceeded. |
293 | | */ |
294 | 0 | if (titem == NULL) |
295 | 0 | titem = item; |
296 | 0 | break; |
297 | 0 | } |
298 | 0 | case WQ_RETRY_NOW: |
299 | | /* a RETRY_NOW that gets here has exceeded max_tries, same as |
300 | | * ERROR */ |
301 | | /* fallthru */ |
302 | 0 | case WQ_SUCCESS: |
303 | 0 | default: { |
304 | 0 | work_queue_item_remove(wq, item); |
305 | 0 | break; |
306 | 0 | } |
307 | 0 | } |
308 | | |
309 | | /* completed cycle */ |
310 | 0 | cycles++; |
311 | | |
312 | | /* test if we should yield */ |
313 | 0 | if (!(cycles % wq->cycles.granularity) && |
314 | 0 | event_should_yield(thread)) { |
315 | 0 | yielded = 1; |
316 | 0 | goto stats; |
317 | 0 | } |
318 | 0 | } |
319 | | |
320 | 0 | stats: |
321 | |
|
322 | 0 | #define WQ_HYSTERESIS_FACTOR 4 |
323 | | |
324 | | /* we yielded, check whether granularity should be reduced */ |
325 | 0 | if (yielded && (cycles < wq->cycles.granularity)) { |
326 | 0 | wq->cycles.granularity = |
327 | 0 | ((cycles > 0) ? cycles : WORK_QUEUE_MIN_GRANULARITY); |
328 | 0 | } |
329 | | /* otherwise, should granularity increase? */ |
330 | 0 | else if (cycles >= (wq->cycles.granularity)) { |
331 | 0 | if (cycles > wq->cycles.best) |
332 | 0 | wq->cycles.best = cycles; |
333 | | |
334 | | /* along with yielded check, provides hysteresis for granularity |
335 | | */ |
336 | 0 | if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR |
337 | 0 | * WQ_HYSTERESIS_FACTOR)) |
338 | 0 | wq->cycles.granularity *= |
339 | 0 | WQ_HYSTERESIS_FACTOR; /* quick ramp-up */ |
340 | 0 | else if (cycles |
341 | 0 | > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR)) |
342 | 0 | wq->cycles.granularity += WQ_HYSTERESIS_FACTOR; |
343 | 0 | } |
344 | 0 | #undef WQ_HYSTERIS_FACTOR |
345 | |
|
346 | 0 | wq->runs++; |
347 | 0 | wq->cycles.total += cycles; |
348 | 0 | if (yielded) |
349 | 0 | wq->yields++; |
350 | | |
351 | | /* Is the queue done yet? If it is, call the completion callback. */ |
352 | 0 | if (!work_queue_empty(wq)) { |
353 | 0 | if (ret == WQ_RETRY_LATER || |
354 | 0 | ret == WQ_QUEUE_BLOCKED) |
355 | 0 | work_queue_schedule(wq, wq->spec.retry); |
356 | 0 | else |
357 | 0 | work_queue_schedule(wq, 0); |
358 | |
|
359 | 0 | } else if (wq->spec.completion_func) |
360 | 0 | wq->spec.completion_func(wq); |
361 | 0 | } |