Coverage Report

Created: 2026-01-10 06:07

/src/haproxy/src/task.c
Line    Count    Source
1
/*
2
 * Task management functions.
3
 *
4
 * Copyright 2000-2009 Willy Tarreau <w@1wt.eu>
5
 *
6
 * This program is free software; you can redistribute it and/or
7
 * modify it under the terms of the GNU General Public License
8
 * as published by the Free Software Foundation; either version
9
 * 2 of the License, or (at your option) any later version.
10
 *
11
 */
12
13
#include <string.h>
14
15
#include <import/eb32tree.h>
16
17
#include <haproxy/api.h>
18
#include <haproxy/activity.h>
19
#include <haproxy/cfgparse.h>
20
#include <haproxy/clock.h>
21
#include <haproxy/fd.h>
22
#include <haproxy/list.h>
23
#include <haproxy/pool.h>
24
#include <haproxy/task.h>
25
#include <haproxy/tools.h>
26
27
extern struct task *process_stream(struct task *t, void *context, unsigned int state);
28
extern void stream_update_timings(struct task *t, uint64_t lat, uint64_t cpu);
29
30
DECLARE_TYPED_POOL(pool_head_task,    "task",    struct task, 0, 64);
31
DECLARE_TYPED_POOL(pool_head_tasklet, "tasklet", struct tasklet, 0, 64);
32
33
/* This is the memory pool containing all the signal structs. These
34
 * structs are used to store each required signal between two tasks.
35
 */
36
DECLARE_TYPED_POOL(pool_head_notification, "notification", struct notification);
37
38
/* The lock protecting all wait queues at once. For now we have no better
39
 * alternative since a task may have to be removed from a queue and placed
40
 * into another one. Storing the WQ index into the task doesn't seem to be
41
 * sufficient either.
42
 */
43
__decl_aligned_rwlock(wq_lock);
44
45
/* used to detect if the scheduler looks stuck (for warnings) */
46
static struct {
47
  int sched_stuck THREAD_ALIGNED();
48
} sched_ctx[MAX_THREADS];
49
50
/* Flags the task <t> for immediate destruction and puts it into its first
51
 * thread's shared tasklet list if not yet queued/running. This will bypass
52
 * the priority scheduling and make the task show up as fast as possible in
53
 * the other thread's queue. Note that this operation isn't idempotent and is
54
 * not supposed to be run on the same task from multiple threads at once. It's
55
 * the caller's responsibility to make sure it is the only one able to kill the
56
 * task.
57
 */
58
void task_kill(struct task *t)
59
0
{
60
0
  unsigned int state = t->state;
61
0
  unsigned int thr;
62
63
0
  BUG_ON(state & TASK_KILLED);
64
65
0
  while (1) {
66
0
    while (state & (TASK_RUNNING | TASK_QUEUED)) {
67
      /* task already in the queue and about to be executed,
68
       * or even currently running. Just add the flag and be
69
       * done with it, the process loop will detect it and kill
70
       * it. The CAS will fail if we arrive too late.
71
       */
72
0
      if (_HA_ATOMIC_CAS(&t->state, &state, state | TASK_KILLED))
73
0
        return;
74
0
    }
75
76
    /* We'll have to wake it up, but we must also secure it so that
77
     * it doesn't vanish under us. TASK_QUEUED guarantees nobody will
78
     * add past us.
79
     */
80
0
    if (_HA_ATOMIC_CAS(&t->state, &state, state | TASK_QUEUED | TASK_KILLED)) {
81
      /* Bypass the tree and go directly into the shared tasklet list.
82
       * Note: that's a task so it must be accounted for as such. Pick
83
       * the task's first thread for the job.
84
       */
85
0
      thr = t->tid >= 0 ? t->tid : tid;
86
87
      /* Beware: tasks that have never run don't have their ->list empty yet! */
88
0
      MT_LIST_APPEND(&ha_thread_ctx[thr].shared_tasklet_list,
89
0
                     list_to_mt_list(&((struct tasklet *)t)->list));
90
0
      _HA_ATOMIC_INC(&ha_thread_ctx[thr].rq_total);
91
0
      _HA_ATOMIC_INC(&ha_thread_ctx[thr].tasks_in_list);
92
0
      wake_thread(thr);
93
0
      return;
94
0
    }
95
0
  }
96
0
}
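/* Illustrative usage sketch (editorial addition, not part of task.c): a
 * caller that owns a struct task can simply flag it for destruction and
 * forget the pointer; the scheduler frees it once it sees TASK_KILLED.
 * The helper name release_my_task() is hypothetical.
 */
static inline void release_my_task(struct task **pt)
{
  if (*pt) {
    task_kill(*pt);  /* flag for destruction and wake the owning thread */
    *pt = NULL;      /* the scheduler performs the actual free */
  }
}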
97
98
/* Equivalent of task_kill for tasklets. Mark the tasklet <t> for destruction.
99
 * It will be deleted on the next scheduler invocation. This function is
100
 * thread-safe: a thread can kill a tasklet of another thread.
101
 */
102
void tasklet_kill(struct tasklet *t)
103
0
{
104
0
  unsigned int state = t->state;
105
0
  unsigned int thr;
106
107
0
  BUG_ON(state & TASK_KILLED);
108
109
0
  while (1) {
110
0
    while (state & (TASK_QUEUED)) {
111
      /* Tasklet already in the list ready to be executed. Add
112
       * the killed flag and wait for the process loop to
113
       * detect it.
114
       */
115
0
      if (_HA_ATOMIC_CAS(&t->state, &state, state | TASK_KILLED))
116
0
        return;
117
0
    }
118
119
    /* Mark the tasklet as killed and wake the thread to process it
120
     * as soon as possible.
121
     */
122
0
    if (_HA_ATOMIC_CAS(&t->state, &state, state | TASK_QUEUED | TASK_KILLED)) {
123
0
      thr = t->tid >= 0 ? t->tid : tid;
124
0
      MT_LIST_APPEND(&ha_thread_ctx[thr].shared_tasklet_list,
125
0
                     list_to_mt_list(&t->list));
126
0
      _HA_ATOMIC_INC(&ha_thread_ctx[thr].rq_total);
127
0
      wake_thread(thr);
128
0
      return;
129
0
    }
130
0
  }
131
0
}
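/* Illustrative usage sketch (editorial addition, not part of task.c): since
 * tasklet_kill() is thread-safe, an owner may release its I/O tasklet from
 * any thread without checking whether it is currently queued. The helper
 * name release_my_tasklet() is hypothetical.
 */
static inline void release_my_tasklet(struct tasklet **ptl)
{
  if (*ptl) {
    tasklet_kill(*ptl);  /* deleted on the next scheduler invocation */
    *ptl = NULL;
  }
}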
132
133
/* Do not call this one, please use tasklet_wakeup_on() instead, as this one is
134
 * the slow path of tasklet_wakeup_on() which performs some preliminary checks
135
 * and sets TASK_QUEUED before calling this one. A negative <thr> designates
136
 * the current thread.
137
 */
138
void __tasklet_wakeup_on(struct tasklet *tl, int thr)
139
0
{
140
0
  if (likely(thr < 0)) {
141
    /* this tasklet runs on the caller thread */
142
0
    if (tl->state & TASK_HEAVY) {
143
0
      LIST_APPEND(&th_ctx->tasklets[TL_HEAVY], &tl->list);
144
0
      th_ctx->tl_class_mask |= 1 << TL_HEAVY;
145
0
    }
146
0
    else if (tl->state & TASK_SELF_WAKING) {
147
0
      LIST_APPEND(&th_ctx->tasklets[TL_BULK], &tl->list);
148
0
      th_ctx->tl_class_mask |= 1 << TL_BULK;
149
0
    }
150
0
    else if ((struct task *)tl == th_ctx->current) {
151
0
      _HA_ATOMIC_OR(&tl->state, TASK_SELF_WAKING);
152
0
      LIST_APPEND(&th_ctx->tasklets[TL_BULK], &tl->list);
153
0
      th_ctx->tl_class_mask |= 1 << TL_BULK;
154
0
    }
155
0
    else if (th_ctx->current_queue < 0) {
156
0
      LIST_APPEND(&th_ctx->tasklets[TL_URGENT], &tl->list);
157
0
      th_ctx->tl_class_mask |= 1 << TL_URGENT;
158
0
    }
159
0
    else {
160
0
      LIST_APPEND(&th_ctx->tasklets[th_ctx->current_queue], &tl->list);
161
0
      th_ctx->tl_class_mask |= 1 << th_ctx->current_queue;
162
0
    }
163
0
    _HA_ATOMIC_INC(&th_ctx->rq_total);
164
0
  } else {
165
    /* this tasklet runs on a specific thread. */
166
0
    MT_LIST_APPEND(&ha_thread_ctx[thr].shared_tasklet_list, list_to_mt_list(&tl->list));
167
0
    _HA_ATOMIC_INC(&ha_thread_ctx[thr].rq_total);
168
0
    wake_thread(thr);
169
0
  }
170
0
}
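/* Illustrative usage sketch (editorial addition, not part of task.c): regular
 * code never calls __tasklet_wakeup_on() directly; it uses tasklet_wakeup()
 * for the calling thread or tasklet_wakeup_on() for an explicit thread, both
 * referenced in the comment above, which set TASK_QUEUED and only fall back
 * to this slow path when needed. The names my_ctx and my_schedule_io are
 * hypothetical.
 */
struct my_ctx {
  struct tasklet *tl;
  int target_thr;  /* < 0 means "run on the calling thread" */
};

static void my_schedule_io(struct my_ctx *ctx)
{
  if (ctx->target_thr < 0)
    tasklet_wakeup(ctx->tl);                      /* local thread */
  else
    tasklet_wakeup_on(ctx->tl, ctx->target_thr);  /* specific thread */
}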
171
172
/* Do not call this one, please use tasklet_wakeup_after() instead, as this one is
173
 * the slow path of tasklet_wakeup_after() which performs some preliminary checks
174
 * and sets TASK_QUEUED before calling this one.
175
 */
176
struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl)
177
0
{
178
0
  BUG_ON(tl->tid >= 0 && tid != tl->tid);
179
  /* this tasklet runs on the caller thread */
180
0
  if (!head) {
181
0
    if (tl->state & TASK_HEAVY) {
182
0
      LIST_INSERT(&th_ctx->tasklets[TL_HEAVY], &tl->list);
183
0
      th_ctx->tl_class_mask |= 1 << TL_HEAVY;
184
0
    }
185
0
    else if (tl->state & TASK_SELF_WAKING) {
186
0
      LIST_INSERT(&th_ctx->tasklets[TL_BULK], &tl->list);
187
0
      th_ctx->tl_class_mask |= 1 << TL_BULK;
188
0
    }
189
0
    else if ((struct task *)tl == th_ctx->current) {
190
0
      _HA_ATOMIC_OR(&tl->state, TASK_SELF_WAKING);
191
0
      LIST_INSERT(&th_ctx->tasklets[TL_BULK], &tl->list);
192
0
      th_ctx->tl_class_mask |= 1 << TL_BULK;
193
0
    }
194
0
    else if (th_ctx->current_queue < 0) {
195
0
      LIST_INSERT(&th_ctx->tasklets[TL_URGENT], &tl->list);
196
0
      th_ctx->tl_class_mask |= 1 << TL_URGENT;
197
0
    }
198
0
    else {
199
0
      LIST_INSERT(&th_ctx->tasklets[th_ctx->current_queue], &tl->list);
200
0
      th_ctx->tl_class_mask |= 1 << th_ctx->current_queue;
201
0
    }
202
0
  }
203
0
  else {
204
0
    LIST_APPEND(head, &tl->list);
205
0
  }
206
0
  _HA_ATOMIC_INC(&th_ctx->rq_total);
207
0
  return &tl->list;
208
0
}
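/* Illustrative usage sketch (editorial addition, not part of task.c): assuming
 * the inline wrapper tasklet_wakeup_after() keeps the same (head, tasklet)
 * argument order and return value as the slow path above, a caller can chain
 * wakeups so that tasklets run in submission order. Names are hypothetical.
 */
static void my_wake_in_order(struct tasklet *first, struct tasklet *second)
{
  struct list *pos;

  pos = tasklet_wakeup_after(NULL, first);  /* queued by class, as above */
  tasklet_wakeup_after(pos, second);        /* appended right after <first> */
}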
209
210
/* Puts the task <t> in run queue at a position depending on t->nice. <t> is
211
 * returned. The nice value assigns boosts in 1/32nds of the run queue size. A
212
 * nice value of -1024 sets the task to -tasks_run_queue*32, while a nice value
213
 * of 1024 sets the task to tasks_run_queue*32. The state flags are cleared, so
214
 * the caller will have to set its flags after this call.
215
 * The task must not already be in the run queue. If unsure, use the safer
216
 * task_wakeup() function.
217
 */
218
void __task_wakeup(struct task *t)
219
0
{
220
0
  struct eb_root *root = &th_ctx->rqueue;
221
0
  int thr __maybe_unused = t->tid >= 0 ? t->tid : tid;
222
223
#ifdef USE_THREAD
224
  if (thr != tid) {
225
    root = &ha_thread_ctx[thr].rqueue_shared;
226
227
    _HA_ATOMIC_INC(&ha_thread_ctx[thr].rq_total);
228
    HA_SPIN_LOCK(TASK_RQ_LOCK, &ha_thread_ctx[thr].rqsh_lock);
229
230
    t->rq.key = _HA_ATOMIC_ADD_FETCH(&ha_thread_ctx[thr].rqueue_ticks, 1);
231
    __ha_barrier_store();
232
  } else
233
#endif
234
0
  {
235
0
    _HA_ATOMIC_INC(&th_ctx->rq_total);
236
0
    t->rq.key = _HA_ATOMIC_ADD_FETCH(&th_ctx->rqueue_ticks, 1);
237
0
  }
238
239
0
  if (likely(t->nice)) {
240
0
    int offset;
241
242
0
    _HA_ATOMIC_INC(&tg_ctx->niced_tasks);
243
0
    offset = t->nice * (int)global.tune.runqueue_depth;
244
0
    t->rq.key += offset;
245
0
  }
246
247
0
  if (_HA_ATOMIC_LOAD(&th_ctx->flags) & TH_FL_TASK_PROFILING)
248
0
    t->wake_date = now_mono_time();
249
250
0
  eb32_insert(root, &t->rq);
251
252
#ifdef USE_THREAD
253
  if (thr != tid) {
254
    HA_SPIN_UNLOCK(TASK_RQ_LOCK, &ha_thread_ctx[thr].rqsh_lock);
255
256
    /* If all threads that are supposed to handle this task are sleeping,
257
     * wake one.
258
     */
259
    wake_thread(thr);
260
  }
261
#endif
262
0
  return;
263
0
}
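/* Illustrative usage sketch (editorial addition, not part of task.c): callers
 * use task_wakeup(), which ORs the TASK_WOKEN_* flags and only falls through
 * to __task_wakeup() when the task is neither queued nor running. As a worked
 * example of the nice offset above, with tune.runqueue-depth set to 200 a
 * nice of -10 shifts the queueing key by -2000 positions. The helper name
 * my_wake_boosted() is hypothetical.
 */
static void my_wake_boosted(struct task *t)
{
  t->nice = -10;                   /* mild priority boost */
  task_wakeup(t, TASK_WOKEN_MSG);  /* queue it into the run queue */
}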
264
265
/*
266
 * __task_queue()
267
 *
268
 * Inserts a task into wait queue <wq> at the position given by its expiration
269
 * date. It does not matter if the task was already in the wait queue or not,
270
 * as it will be unlinked. The task MUST NOT have an infinite expiration timer.
271
 * Last, tasks must not be queued further than the end of the tree, which is
272
 * between <now_ms> and <now_ms> + 2^31 ms (now+24days in 32bit).
273
 *
274
 * This function should not be used directly, it is meant to be called by the
275
 * inline version of task_queue() which performs a few cheap preliminary tests
276
 * before deciding to call __task_queue(). Moreover this function doesn't care
277
 * at all about locking so the caller must be careful when deciding whether to
278
 * lock or not around this call.
279
 */
280
void __task_queue(struct task *task, struct eb_root *wq)
281
0
{
282
#ifdef USE_THREAD
283
  BUG_ON((wq == &tg_ctx->timers && task->tid >= 0) ||
284
         (wq == &th_ctx->timers && task->tid < 0) ||
285
         (wq != &tg_ctx->timers && wq != &th_ctx->timers));
286
#endif
287
  /* if this happens the process is doomed anyway, so better catch it now
288
   * so that we have the caller in the stack.
289
   */
290
0
  BUG_ON(task->expire == TICK_ETERNITY);
291
292
0
  if (likely(task_in_wq(task)))
293
0
    __task_unlink_wq(task);
294
295
  /* the task is not in the queue now */
296
0
  task->wq.key = task->expire;
297
#ifdef DEBUG_CHECK_INVALID_EXPIRATION_DATES
298
  if (tick_is_lt(task->wq.key, now_ms))
299
    /* we're queuing too far away or in the past (most likely) */
300
    return;
301
#endif
302
303
0
  eb32_insert(wq, &task->wq);
304
0
}
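/* Illustrative usage sketch (editorial addition, not part of task.c): ordinary
 * code programs a timer by setting <expire> and calling the inline
 * task_queue(), which performs the cheap checks mentioned above before
 * reaching __task_queue(). The 5000 ms timeout is an arbitrary example.
 */
static void my_arm_timeout(struct task *t)
{
  t->expire = tick_add(now_ms, 5000);  /* fire in about 5 seconds */
  task_queue(t);                       /* (re)position it in the wait queue */
}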
305
306
/*
307
 * Extracts all expired timers from the timer queue and wakes up all
308
 * associated tasks.
309
 */
310
void wake_expired_tasks()
311
0
{
312
0
  struct thread_ctx * const tt = th_ctx; // thread's tasks
313
0
  int max_processed = global.tune.runqueue_depth;
314
0
  struct task *task;
315
0
  struct eb32_node *eb;
316
0
  __decl_thread(int key);
317
318
0
  while (1) {
319
0
    if (max_processed-- <= 0)
320
0
      goto leave;
321
322
0
    eb = eb32_lookup_ge(&tt->timers, now_ms - TIMER_LOOK_BACK);
323
0
    if (!eb) {
324
      /* we might have reached the end of the tree, typically because
325
      * <now_ms> is in the first half and we're first scanning the last
326
      * half. Let's loop back to the beginning of the tree now.
327
      */
328
0
      eb = eb32_first(&tt->timers);
329
0
      if (likely(!eb))
330
0
        break;
331
0
    }
332
333
    /* It is possible that this task was left at an earlier place in the
334
     * tree because a recent call to task_queue() has not moved it. This
335
     * happens when the new expiration date is later than the old one.
336
     * Since it is very unlikely that we reach a timeout anyway, it's a
337
     * lot cheaper to proceed like this because we almost never update
338
     * the tree. We may also find disabled expiration dates there. Since
339
     * we have detached the task from the tree, we simply call task_queue
340
     * to take care of this. Note that we might occasionally requeue it at
341
     * the same place, before <eb>, so we have to check if this happens,
342
     * and adjust <eb>, otherwise we may skip it which is not what we want.
343
     * We may also not requeue the task (and not point eb at it) if its
344
     * expiration time is not set. We also make sure we leave the real
345
     * expiration date for the next task in the queue so that when calling
346
     * next_timer_expiry() we're guaranteed to see the next real date and
347
     * not the next apparent date. This is in order to avoid useless
348
     * wakeups.
349
     */
350
351
0
    task = eb32_entry(eb, struct task, wq);
352
0
    if (tick_is_expired(task->expire, now_ms)) {
353
      /* expired task, wake it up */
354
0
      __task_unlink_wq(task);
355
0
      _task_wakeup(task, TASK_WOKEN_TIMER, 0);
356
0
    }
357
0
    else if (task->expire != eb->key) {
358
      /* task is not expired but its key doesn't match so let's
359
       * update it and skip to next apparently expired task.
360
       */
361
0
      __task_unlink_wq(task);
362
0
      if (tick_isset(task->expire))
363
0
        __task_queue(task, &tt->timers);
364
0
    }
365
0
    else {
366
      /* task not expired and correctly placed. It may not be eternal. */
367
0
      BUG_ON(task->expire == TICK_ETERNITY);
368
0
      break;
369
0
    }
370
0
  }
371
372
#ifdef USE_THREAD
373
  if (eb_is_empty(&tg_ctx->timers))
374
    goto leave;
375
376
  HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &wq_lock);
377
  eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
378
  if (!eb) {
379
    eb = eb32_first(&tg_ctx->timers);
380
    if (likely(!eb)) {
381
      HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
382
      goto leave;
383
    }
384
  }
385
  key = eb->key;
386
387
  if (tick_is_lt(now_ms, key)) {
388
    HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
389
    goto leave;
390
  }
391
392
  /* There's really something of interest here, let's visit the queue */
393
394
  if (HA_RWLOCK_TRYRDTOSK(TASK_WQ_LOCK, &wq_lock)) {
395
    /* if we failed to grab the lock it means another thread is
396
     * already doing the same here, so let it do the job.
397
     */
398
    HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
399
    goto leave;
400
  }
401
402
  while (1) {
403
  lookup_next:
404
    if (max_processed-- <= 0)
405
      break;
406
    eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
407
    if (!eb) {
408
      /* we might have reached the end of the tree, typically because
409
      * <now_ms> is in the first half and we're first scanning the last
410
      * half. Let's loop back to the beginning of the tree now.
411
      */
412
      eb = eb32_first(&tg_ctx->timers);
413
      if (likely(!eb))
414
        break;
415
    }
416
417
    task = eb32_entry(eb, struct task, wq);
418
419
    /* Check for any competing run of the task (quite rare but may
420
     * involve a dangerous concurrent access on task->expire). In
421
     * order to protect against this, we'll take an exclusive access
422
     * on TASK_RUNNING before checking/touching task->expire. If the
423
     * task is already RUNNING on another thread, it will deal by
424
     * itself with the requeuing so we must not do anything and
425
     * simply quit the loop for now, because we cannot wait with the
426
     * WQ lock held as this would prevent the running thread from
427
     * requeuing the task. One annoying effect of holding RUNNING
428
     * here is that a concurrent task_wakeup() will refrain from
429
     * waking it up. This forces us to check for a wakeup after
430
     * releasing the flag.
431
     */
432
    if (HA_ATOMIC_FETCH_OR(&task->state, TASK_RUNNING) & TASK_RUNNING)
433
      break;
434
435
    if (tick_is_expired(task->expire, now_ms)) {
436
      /* expired task, wake it up */
437
      HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
438
      __task_unlink_wq(task);
439
      HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
440
      task_drop_running(task, TASK_WOKEN_TIMER);
441
    }
442
    else if (task->expire != eb->key) {
443
      /* task is not expired but its key doesn't match so let's
444
       * update it and skip to next apparently expired task.
445
       */
446
      HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
447
      __task_unlink_wq(task);
448
      if (tick_isset(task->expire))
449
        __task_queue(task, &tg_ctx->timers);
450
      HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
451
      task_drop_running(task, 0);
452
      goto lookup_next;
453
    }
454
    else {
455
      /* task not expired and correctly placed. It may not be eternal. */
456
      BUG_ON(task->expire == TICK_ETERNITY);
457
      task_drop_running(task, 0);
458
      break;
459
    }
460
  }
461
462
  HA_RWLOCK_SKUNLOCK(TASK_WQ_LOCK, &wq_lock);
463
#endif
464
0
leave:
465
0
  return;
466
0
}
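/* Illustrative sketch (editorial addition, not part of task.c): a task woken
 * by the code above receives TASK_WOKEN_TIMER in <state> and typically
 * re-arms its expiration date before returning itself, so that the
 * scheduler's exit path (task_queue() in run_tasks_from_lists() below)
 * re-inserts it into the wait queue. Names are hypothetical.
 */
static struct task *my_periodic_task(struct task *t, void *context, unsigned int state)
{
  if (state & TASK_WOKEN_TIMER) {
    /* ... periodic work on <context> would go here ... */
  }
  t->expire = tick_add(now_ms, 1000);  /* run again in about one second */
  return t;                            /* keep the task alive */
}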
467
468
/* Checks the next timer for the current thread by looking into its own timer
469
 * list and the global one. It may return TICK_ETERNITY if no timer is present.
470
 * Note that the next timer might very well be slightly in the past.
471
 */
472
int next_timer_expiry()
473
0
{
474
0
  struct thread_ctx * const tt = th_ctx; // thread's tasks
475
0
  struct eb32_node *eb;
476
0
  int ret = TICK_ETERNITY;
477
0
  __decl_thread(int key = TICK_ETERNITY);
478
479
  /* first check in the thread-local timers */
480
0
  eb = eb32_lookup_ge(&tt->timers, now_ms - TIMER_LOOK_BACK);
481
0
  if (!eb) {
482
    /* we might have reached the end of the tree, typically because
483
     * <now_ms> is in the first half and we're first scanning the last
484
     * half. Let's loop back to the beginning of the tree now.
485
     */
486
0
    eb = eb32_first(&tt->timers);
487
0
  }
488
489
0
  if (eb)
490
0
    ret = eb->key;
491
492
#ifdef USE_THREAD
493
  if (!eb_is_empty(&tg_ctx->timers)) {
494
    HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &wq_lock);
495
    eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
496
    if (!eb)
497
      eb = eb32_first(&tg_ctx->timers);
498
    if (eb)
499
      key = eb->key;
500
    HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
501
    if (eb)
502
      ret = tick_first(ret, key);
503
  }
504
#endif
505
0
  return ret;
506
0
}
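/* Illustrative sketch (editorial addition, not part of task.c): conceptually,
 * the polling loop uses the two functions above together: wake whatever is
 * already due, then bound the poller's sleep time by the next deadline. This
 * is a simplified view of what the real polling loop does. The helper name
 * my_compute_poll_timeout() is hypothetical.
 */
static int my_compute_poll_timeout(void)
{
  int next;

  wake_expired_tasks();        /* wake/requeue everything already expired */
  next = next_timer_expiry();  /* earliest deadline, or TICK_ETERNITY */
  if (!tick_isset(next))
    return -1;                 /* nothing scheduled: wait for events only */
  return TICKS_TO_MS(tick_remain(now_ms, next));
}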
507
508
/* Walks over tasklet lists th_ctx->tasklets[0..TL_CLASSES-1] and runs at most
509
 * budget[TL_*] of them. Returns the number of entries effectively processed
510
 * (tasks and tasklets merged). The count of tasks in the list for the current
511
 * thread is adjusted.
512
 */
513
unsigned int run_tasks_from_lists(unsigned int budgets[])
514
0
{
515
0
  struct task *(*process)(struct task *t, void *ctx, unsigned int state);
516
0
  struct list *tl_queues = th_ctx->tasklets;
517
0
  struct task *t;
518
0
  uint8_t budget_mask = (1 << TL_CLASSES) - 1;
519
0
  struct sched_activity *profile_entry = NULL;
520
0
  unsigned int done = 0;
521
0
  unsigned int queue;
522
0
  unsigned int state;
523
0
  void *ctx;
524
525
0
  for (queue = 0; queue < TL_CLASSES;) {
526
0
    th_ctx->current_queue = queue;
527
528
    /* global.tune.sched.low-latency is set */
529
0
    if (global.tune.options & GTUNE_SCHED_LOW_LATENCY) {
530
0
      if (unlikely(th_ctx->tl_class_mask & budget_mask & ((1 << queue) - 1))) {
531
        /* a lower queue index has tasks again and still has a
532
         * budget to run them. Let's switch to it now.
533
         */
534
0
        queue = (th_ctx->tl_class_mask & 1) ? 0 :
535
0
          (th_ctx->tl_class_mask & 2) ? 1 : 2;
536
0
        continue;
537
0
      }
538
539
0
      if (unlikely(queue > TL_URGENT &&
540
0
             budget_mask & (1 << TL_URGENT) &&
541
0
             !MT_LIST_ISEMPTY(&th_ctx->shared_tasklet_list))) {
542
        /* an urgent tasklet arrived from another thread */
543
0
        break;
544
0
      }
545
546
0
      if (unlikely(queue > TL_NORMAL &&
547
0
             budget_mask & (1 << TL_NORMAL) &&
548
0
             (!eb_is_empty(&th_ctx->rqueue) || !eb_is_empty(&th_ctx->rqueue_shared)))) {
549
        /* a task was woken up by a bulk tasklet or another thread */
550
0
        break;
551
0
      }
552
0
    }
553
554
0
    if (LIST_ISEMPTY(&tl_queues[queue])) {
555
0
      th_ctx->tl_class_mask &= ~(1 << queue);
556
0
      queue++;
557
0
      continue;
558
0
    }
559
560
0
    if (!budgets[queue]) {
561
0
      budget_mask &= ~(1 << queue);
562
0
      queue++;
563
0
      continue;
564
0
    }
565
566
0
    budgets[queue]--;
567
0
    activity[tid].ctxsw++;
568
569
0
    t = (struct task *)LIST_ELEM(tl_queues[queue].n, struct tasklet *, list);
570
0
    ctx = t->context;
571
0
    process = t->process;
572
0
    t->calls++;
573
574
0
    th_ctx->lock_wait_total = 0;
575
0
    th_ctx->mem_wait_total = 0;
576
0
    th_ctx->locked_total = 0;
577
0
    th_ctx->sched_wake_date = t->wake_date;
578
0
    if (th_ctx->sched_wake_date || (t->state & TASK_F_WANTS_TIME)) {
579
      /* take the most accurate clock we have, either
580
       * mono_time() or last now_ns (monotonic but only
581
       * incremented once per poll loop).
582
       */
583
0
      th_ctx->sched_call_date = now_mono_time();
584
0
      if (unlikely(!th_ctx->sched_call_date))
585
0
        th_ctx->sched_call_date = now_ns;
586
0
    }
587
588
0
    if (th_ctx->sched_wake_date) {
589
0
      t->wake_date = 0;
590
0
      profile_entry = sched_activity_entry(sched_activity, t->process, t->caller);
591
0
      th_ctx->sched_profile_entry = profile_entry;
592
0
      HA_ATOMIC_ADD(&profile_entry->lat_time, (uint32_t)(th_ctx->sched_call_date - th_ctx->sched_wake_date));
593
0
      HA_ATOMIC_INC(&profile_entry->calls);
594
0
    }
595
596
0
    __ha_barrier_store();
597
598
0
    th_ctx->current = t;
599
0
    _HA_ATOMIC_AND(&th_ctx->flags, ~TH_FL_STUCK); // this thread is still running
600
601
0
    _HA_ATOMIC_DEC(&th_ctx->rq_total);
602
0
    LIST_DEL_INIT(&((struct tasklet *)t)->list);
603
0
    __ha_barrier_store();
604
605
606
    /* We must be the exclusive owner of the TASK_RUNNING bit, and
607
     * have to be careful that the task is not being manipulated by
608
     * another thread that found it expired in wake_expired_tasks().
609
     * The TASK_RUNNING bit will be set during these operations;
610
     * they are extremely rare and do not last long, so the best
611
     * thing to do here is to wait.
612
     */
613
0
    state = _HA_ATOMIC_LOAD(&t->state);
614
0
    do {
615
0
      while (unlikely(state & TASK_RUNNING)) {
616
0
        __ha_cpu_relax();
617
0
        state = _HA_ATOMIC_LOAD(&t->state);
618
0
      }
619
0
    } while (!_HA_ATOMIC_CAS(&t->state, &state, (state & TASK_PERSISTENT) | TASK_RUNNING));
620
621
0
    __ha_barrier_atomic_store();
622
623
    /* keep the task counter up to date */
624
0
    if (!(state & TASK_F_TASKLET))
625
0
      _HA_ATOMIC_DEC(&ha_thread_ctx[tid].tasks_in_list);
626
627
    /* From this point, we know that the task or tasklet was properly
628
     * dequeued, flagged and accounted for. Let's now check if it was
629
     * killed. If TASK_KILLED arrived before we've read the state, we
630
     * directly free the task/tasklet. Otherwise for tasks it will be
631
     * seen after processing and it's freed on the exit path.
632
     */
633
634
0
    if (unlikely((state & TASK_KILLED) || process == NULL)) {
635
      /* Task or tasklet has been killed, let's remove it */
636
0
      if (state & TASK_F_TASKLET)
637
0
        pool_free(pool_head_tasklet, t);
638
0
      else {
639
0
        task_unlink_wq(t);
640
0
        __task_free(t);
641
0
      }
642
      /* We don't want max_processed to be decremented if
643
       * we're just freeing a destroyed task; we should only
644
       * do so if we really ran a task.
645
       */
646
0
      goto next;
647
0
    }
648
649
    /* OK now the task or tasklet is well alive and is going to be run */
650
0
    if (state & TASK_F_TASKLET) {
651
      /* this is a tasklet */
652
653
0
      t = process(t, ctx, state);
654
0
      if (t != NULL)
655
0
        _HA_ATOMIC_AND(&t->state, ~TASK_RUNNING);
656
0
    } else {
657
      /* This is a regular task */
658
659
0
      if (process == process_stream)
660
0
        t = process_stream(t, ctx, state);
661
0
      else
662
0
        t = process(t, ctx, state);
663
664
      /* If there is a pending state, we have to wake up the task
665
       * immediately, otherwise we defer it to the wait queue.
666
       */
667
0
      if (t != NULL) {
668
0
        state = _HA_ATOMIC_LOAD(&t->state);
669
0
        if (unlikely(state & TASK_KILLED)) {
670
0
          task_unlink_wq(t);
671
0
          __task_free(t);
672
0
        }
673
0
        else {
674
0
          task_queue(t);
675
0
          task_drop_running(t, 0);
676
0
        }
677
0
      }
678
0
    }
679
0
    done++;
680
0
  next:
681
0
    th_ctx->current = NULL;
682
0
    sched_ctx[tid].sched_stuck = 0; // scheduler is not stuck (don't warn)
683
0
    __ha_barrier_store();
684
685
    /* stats are only registered for non-zero wake dates */
686
0
    if (unlikely(th_ctx->sched_wake_date)) {
687
0
      HA_ATOMIC_ADD(&profile_entry->cpu_time, (uint32_t)(now_mono_time() - th_ctx->sched_call_date));
688
0
      if (th_ctx->lock_wait_total)
689
0
        HA_ATOMIC_ADD(&profile_entry->lkw_time, th_ctx->lock_wait_total);
690
0
      if (th_ctx->mem_wait_total)
691
0
        HA_ATOMIC_ADD(&profile_entry->mem_time, th_ctx->mem_wait_total);
692
0
      if (th_ctx->locked_total)
693
0
        HA_ATOMIC_ADD(&profile_entry->lkd_time, th_ctx->locked_total);
694
0
    }
695
0
  }
696
0
  th_ctx->current_queue = -1;
697
0
  th_ctx->sched_wake_date = TICK_ETERNITY;
698
699
0
  return done;
700
0
}
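/* Illustrative sketch (editorial addition, not part of task.c): the budget
 * array indexed by TL_* class is normally filled by process_runnable_tasks()
 * below; a caller-side view of the contract looks like this, with example
 * values only. The helper name my_run_one_round() is hypothetical.
 */
static unsigned int my_run_one_round(void)
{
  unsigned int budgets[TL_CLASSES] = {
    [TL_URGENT] = 64,  /* urgent/cross-thread tasklets */
    [TL_NORMAL] = 48,  /* regular tasks picked from the run queues */
    [TL_BULK]   = 16,  /* self-waking tasklets */
    [TL_HEAVY]  = 1,   /* e.g. SSL handshakes, one at a time */
  };

  return run_tasks_from_lists(budgets);  /* entries effectively processed */
}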
701
702
/* The run queue is chronologically sorted in a tree. An insertion counter is
703
 * used to assign a position to each task. This counter may be combined with
704
 * other variables (eg: nice value) to set the final position in the tree. The
705
 * counter may wrap without a problem, of course. We then limit the number of
706
 * tasks processed to 200 in any case, so that general latency remains low and
707
 * so that task positions have a chance to be considered. The function scans
708
 * both the global and local run queues and picks the most urgent task between
709
 * the two. We need to grab the global runqueue lock to touch it so it's taken
710
 * on the very first access to the global run queue and is released as soon as
711
 * it reaches the end.
712
 *
713
 * The function adjusts <next> if a new event is closer.
714
 */
715
void process_runnable_tasks()
716
0
{
717
0
  struct thread_ctx * const tt = th_ctx;
718
0
  struct eb32_node *lrq; // next local run queue entry
719
0
  struct eb32_node *grq; // next global run queue entry
720
0
  struct task *t;
721
0
  const unsigned int default_weights[TL_CLASSES] = {
722
0
    [TL_URGENT] = 64, // ~50% of CPU bandwidth for I/O
723
0
    [TL_NORMAL] = 48, // ~37% of CPU bandwidth for tasks
724
0
    [TL_BULK]   = 16, // ~13% of CPU bandwidth for self-wakers
725
0
    [TL_HEAVY]  = 1,  // never more than 1 heavy task at once
726
0
  };
727
0
  unsigned int max[TL_CLASSES]; // max to be run per class
728
0
  unsigned int max_total;       // sum of max above
729
0
  struct mt_list *tmp_list;
730
0
  unsigned int queue;
731
0
  int max_processed;
732
0
  int lpicked, gpicked;
733
0
  int heavy_queued = 0;
734
0
  int budget;
735
736
0
  _HA_ATOMIC_AND(&th_ctx->flags, ~TH_FL_STUCK); // this thread is still running
737
738
0
  if (!thread_has_tasks()) {
739
0
    activity[tid].empty_rq++;
740
0
    return;
741
0
  }
742
743
0
  max_processed = global.tune.runqueue_depth;
744
745
0
  if (likely(tg_ctx->niced_tasks))
746
0
    max_processed = (max_processed + 3) / 4;
747
748
0
  if (max_processed < th_ctx->rq_total && th_ctx->rq_total <= 2*max_processed) {
749
    /* If the run queue exceeds the budget by up to 50%, let's cut it
750
     * into two identical halves to improve latency.
751
     */
752
0
    max_processed = th_ctx->rq_total / 2;
753
0
  }
754
755
0
 not_done_yet:
756
0
  max[TL_URGENT] = max[TL_NORMAL] = max[TL_BULK] = 0;
757
758
  /* urgent tasklets list gets a default weight of ~50% */
759
0
  if ((tt->tl_class_mask & (1 << TL_URGENT)) ||
760
0
      !MT_LIST_ISEMPTY(&tt->shared_tasklet_list))
761
0
    max[TL_URGENT] = default_weights[TL_URGENT];
762
763
  /* normal tasklets list gets a default weight of ~37% */
764
0
  if ((tt->tl_class_mask & (1 << TL_NORMAL)) ||
765
0
      !eb_is_empty(&th_ctx->rqueue) || !eb_is_empty(&th_ctx->rqueue_shared))
766
0
    max[TL_NORMAL] = default_weights[TL_NORMAL];
767
768
  /* bulk tasklets list gets a default weight of ~13% */
769
0
  if ((tt->tl_class_mask & (1 << TL_BULK)))
770
0
    max[TL_BULK] = default_weights[TL_BULK];
771
772
  /* heavy tasks are processed only once and never refilled in a
773
   * call round. That budget is not lost either as we don't reset
774
   * it unless consumed.
775
   */
776
0
  if (!heavy_queued) {
777
0
    if ((tt->tl_class_mask & (1 << TL_HEAVY)))
778
0
      max[TL_HEAVY] = default_weights[TL_HEAVY];
779
0
    else
780
0
      max[TL_HEAVY] = 0;
781
0
    heavy_queued = 1;
782
0
  }
783
784
  /* Now compute a fair share of the weights. Total may slightly exceed
785
   * 100% due to rounding; this is not a problem. Note that while in
786
   * theory the sum cannot be zero as we cannot get there without tasklets
787
   * to process, in practice it may rarely happen when multiple writers
788
   * conflict and rollback on MT_LIST_TRY_APPEND(shared_tasklet_list), causing
789
   * a first MT_LIST_ISEMPTY() to succeed for thread_has_task() and the
790
   * one above to finally fail. This is extremely rare and not a problem.
791
   */
792
0
  max_total = max[TL_URGENT] + max[TL_NORMAL] + max[TL_BULK] + max[TL_HEAVY];
793
0
  if (!max_total)
794
0
    goto leave;
795
796
0
  for (queue = 0; queue < TL_CLASSES; queue++)
797
0
    max[queue]  = ((unsigned)max_processed * max[queue] + max_total - 1) / max_total;
798
799
  /* The heavy queue must never process more than very few tasks at once
800
   * anyway. We set the limit to 1 if running on low_latency scheduling,
801
   * given that we know that other values can have an impact on latency
802
   * (~500us end-to-end connection achieved at 130kcps in SSL), 1 + one
803
   * per 1024 tasks if there is at least one non-heavy task while still
804
   * respecting the ratios above, or 1 + one per 128 tasks if only heavy
805
   * tasks are present. This allows draining excess SSL handshakes more
806
   * efficiently if the queue becomes congested.
807
   */
808
0
  if (max[TL_HEAVY] > 1) {
809
0
    if (global.tune.options & GTUNE_SCHED_LOW_LATENCY)
810
0
      budget = 1;
811
0
    else if (tt->tl_class_mask & ~(1 << TL_HEAVY))
812
0
      budget = 1 + tt->rq_total / 1024;
813
0
    else
814
0
      budget = 1 + tt->rq_total / 128;
815
816
0
    if (max[TL_HEAVY] > budget)
817
0
      max[TL_HEAVY] = budget;
818
0
  }
819
820
0
  lrq = grq = NULL;
821
822
  /* pick up to max[TL_NORMAL] regular tasks from prio-ordered run queues */
823
  /* Note: the grq lock is always held when grq is not null */
824
0
  lpicked = gpicked = 0;
825
0
  budget = max[TL_NORMAL] - tt->tasks_in_list;
826
0
  while (lpicked + gpicked < budget) {
827
0
    if (!eb_is_empty(&th_ctx->rqueue_shared) && !grq) {
828
#ifdef USE_THREAD
829
      HA_SPIN_LOCK(TASK_RQ_LOCK, &th_ctx->rqsh_lock);
830
      grq = eb32_lookup_ge(&th_ctx->rqueue_shared, _HA_ATOMIC_LOAD(&tt->rqueue_ticks) - TIMER_LOOK_BACK);
831
      if (unlikely(!grq)) {
832
        grq = eb32_first(&th_ctx->rqueue_shared);
833
        if (!grq)
834
          HA_SPIN_UNLOCK(TASK_RQ_LOCK, &th_ctx->rqsh_lock);
835
      }
836
#endif
837
0
    }
838
839
    /* If a global task is available for this thread, it's in grq
840
     * now and the global RQ is locked.
841
     */
842
843
0
    if (!lrq) {
844
0
      lrq = eb32_lookup_ge(&tt->rqueue, _HA_ATOMIC_LOAD(&tt->rqueue_ticks) - TIMER_LOOK_BACK);
845
0
      if (unlikely(!lrq))
846
0
        lrq = eb32_first(&tt->rqueue);
847
0
    }
848
849
0
    if (!lrq && !grq)
850
0
      break;
851
852
0
    if (likely(!grq || (lrq && (int)(lrq->key - grq->key) <= 0))) {
853
0
      t = eb32_entry(lrq, struct task, rq);
854
0
      lrq = eb32_next(lrq);
855
0
      eb32_delete(&t->rq);
856
0
      lpicked++;
857
0
    }
858
#ifdef USE_THREAD
859
    else {
860
      t = eb32_entry(grq, struct task, rq);
861
      grq = eb32_next(grq);
862
      eb32_delete(&t->rq);
863
864
      if (unlikely(!grq)) {
865
        grq = eb32_first(&th_ctx->rqueue_shared);
866
        if (!grq)
867
          HA_SPIN_UNLOCK(TASK_RQ_LOCK, &th_ctx->rqsh_lock);
868
      }
869
      gpicked++;
870
    }
871
#endif
872
0
    if (t->nice)
873
0
      _HA_ATOMIC_DEC(&tg_ctx->niced_tasks);
874
875
    /* Add it to the local task list */
876
0
    LIST_APPEND(&tt->tasklets[TL_NORMAL], &((struct tasklet *)t)->list);
877
0
  }
878
879
  /* release the rqueue lock */
880
0
  if (grq) {
881
0
    HA_SPIN_UNLOCK(TASK_RQ_LOCK, &th_ctx->rqsh_lock);
882
0
    grq = NULL;
883
0
  }
884
885
0
  if (lpicked + gpicked) {
886
0
    tt->tl_class_mask |= 1 << TL_NORMAL;
887
0
    _HA_ATOMIC_ADD(&tt->tasks_in_list, lpicked + gpicked);
888
0
    activity[tid].tasksw += lpicked + gpicked;
889
0
  }
890
891
  /* Merge the list of tasklets woken up by other threads into the
892
   * main list.
893
   */
894
0
  tmp_list = MT_LIST_BEHEAD(&tt->shared_tasklet_list);
895
0
  if (tmp_list) {
896
0
    LIST_SPLICE_END_DETACHED(&tt->tasklets[TL_URGENT], (struct list *)tmp_list);
897
0
    if (!LIST_ISEMPTY(&tt->tasklets[TL_URGENT]))
898
0
      tt->tl_class_mask |= 1 << TL_URGENT;
899
0
  }
900
901
  /* execute tasklets in each queue */
902
0
  max_processed -= run_tasks_from_lists(max);
903
904
  /* some tasks may have woken other ones up */
905
0
  if (max_processed > 0 && thread_has_tasks())
906
0
    goto not_done_yet;
907
908
0
 leave:
909
0
  if (tt->tl_class_mask)
910
0
    activity[tid].long_rq++;
911
0
}
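/* Illustrative worked example (editorial addition, not part of task.c): with
 * max_processed = 200 and all four classes active, max_total = 64+48+16+1 =
 * 129 and the per-class budgets computed above are ceil(200 * weight / 129),
 * i.e. 100 (urgent), 75 (normal), 25 (bulk) and 2 (heavy), the heavy budget
 * being further clamped as described. The total may slightly exceed 200,
 * which is harmless.
 */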
912
913
/* Pings the scheduler to verify that tasks continue running for thread <thr>.
914
 * Returns 1 if the scheduler made progress since last call, 0 if it looks
915
 * stuck. It also re-marks the thread as stuck for the next visit.
916
 */
917
int is_sched_alive(int thr)
918
0
{
919
0
  return !HA_ATOMIC_XCHG(&sched_ctx[thr].sched_stuck, 1);
920
0
}
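/* Illustrative usage sketch (editorial addition, not part of task.c): a
 * watchdog samples this at a fixed period; if run_tasks_from_lists() has not
 * cleared sched_stuck since the previous call, the thread looks stuck. The
 * helper name my_check_thread() is hypothetical.
 */
static void my_check_thread(int thr)
{
  if (!is_sched_alive(thr)) {
    /* no task completed on <thr> since the previous check; a real
     * watchdog would dump the thread's state here.
     */
  }
}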
921
922
/*
923
 * Delete every task before running the master polling loop
924
 */
925
void mworker_cleantasks()
926
0
{
927
0
  struct task *t;
928
0
  int i;
929
0
  struct eb32_node *tmp_wq = NULL;
930
0
  struct eb32_node *tmp_rq = NULL;
931
932
#ifdef USE_THREAD
933
  /* cleanup the global run queue */
934
  tmp_rq = eb32_first(&th_ctx->rqueue_shared);
935
  while (tmp_rq) {
936
    t = eb32_entry(tmp_rq, struct task, rq);
937
    tmp_rq = eb32_next(tmp_rq);
938
    task_destroy(t);
939
  }
940
  /* cleanup the timers queue */
941
  tmp_wq = eb32_first(&tg_ctx->timers);
942
  while (tmp_wq) {
943
    t = eb32_entry(tmp_wq, struct task, wq);
944
    tmp_wq = eb32_next(tmp_wq);
945
    task_destroy(t);
946
  }
947
#endif
948
  /* clean the per thread run queue */
949
0
  for (i = 0; i < global.nbthread; i++) {
950
0
    tmp_rq = eb32_first(&ha_thread_ctx[i].rqueue);
951
0
    while (tmp_rq) {
952
0
      t = eb32_entry(tmp_rq, struct task, rq);
953
0
      tmp_rq = eb32_next(tmp_rq);
954
0
      task_destroy(t);
955
0
    }
956
    /* cleanup the per thread timers queue */
957
0
    tmp_wq = eb32_first(&ha_thread_ctx[i].timers);
958
0
    while (tmp_wq) {
959
0
      t = eb32_entry(tmp_wq, struct task, wq);
960
0
      tmp_wq = eb32_next(tmp_wq);
961
0
      task_destroy(t);
962
0
    }
963
0
  }
964
0
}
965
966
/* perform minimal initializations */
967
static void init_task()
968
0
{
969
0
  int i, q;
970
971
0
  for (i = 0; i < MAX_TGROUPS; i++)
972
0
    memset(&ha_tgroup_ctx[i].timers, 0, sizeof(ha_tgroup_ctx[i].timers));
973
974
0
  for (i = 0; i < MAX_THREADS; i++) {
975
0
    for (q = 0; q < TL_CLASSES; q++)
976
0
      LIST_INIT(&ha_thread_ctx[i].tasklets[q]);
977
0
    MT_LIST_INIT(&ha_thread_ctx[i].shared_tasklet_list);
978
0
  }
979
0
}
980
981
/* config parser for global "tune.sched.low-latency", accepts "on" or "off" */
982
static int cfg_parse_tune_sched_low_latency(char **args, int section_type, struct proxy *curpx,
983
                                      const struct proxy *defpx, const char *file, int line,
984
                                      char **err)
985
0
{
986
0
  if (too_many_args(1, args, err, NULL))
987
0
    return -1;
988
989
0
  if (strcmp(args[1], "on") == 0)
990
0
    global.tune.options |= GTUNE_SCHED_LOW_LATENCY;
991
0
  else if (strcmp(args[1], "off") == 0)
992
0
    global.tune.options &= ~GTUNE_SCHED_LOW_LATENCY;
993
0
  else {
994
0
    memprintf(err, "'%s' expects either 'on' or 'off' but got '%s'.", args[0], args[1]);
995
0
    return -1;
996
0
  }
997
0
  return 0;
998
0
}
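/* Illustrative configuration example (editorial addition, not part of task.c):
 * the keyword registered below is used in the "global" section of the
 * configuration, e.g.:
 *
 *     global
 *         tune.sched.low-latency on
 *
 * which sets GTUNE_SCHED_LOW_LATENCY and lets run_tasks_from_lists() switch
 * back to lower-index queues as soon as they have work again.
 */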
999
1000
/* config keyword parsers */
1001
static struct cfg_kw_list cfg_kws = {ILH, {
1002
  { CFG_GLOBAL, "tune.sched.low-latency", cfg_parse_tune_sched_low_latency },
1003
  { 0, NULL, NULL }
1004
}};
1005
1006
INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws);
1007
INITCALL0(STG_PREPARE, init_task);
1008
1009
/*
1010
 * Local variables:
1011
 *  c-indent-level: 8
1012
 *  c-basic-offset: 8
1013
 * End:
1014
 */