Coverage Report

Created: 2024-02-29 06:05

/src/strongswan/src/libstrongswan/processing/scheduler.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2008-2015 Tobias Brunner
3
 * Copyright (C) 2005-2006 Martin Willi
4
 * Copyright (C) 2005 Jan Hutter
5
 *
6
 * Copyright (C) secunet Security Networks AG
7
 *
8
 * This program is free software; you can redistribute it and/or modify it
9
 * under the terms of the GNU General Public License as published by the
10
 * Free Software Foundation; either version 2 of the License, or (at your
11
 * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
12
 *
13
 * This program is distributed in the hope that it will be useful, but
14
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
16
 * for more details.
17
 */
18
19
#include <stdlib.h>
20
21
#include "scheduler.h"
22
23
#include <utils/debug.h>
24
#include <processing/processor.h>
25
#include <processing/jobs/callback_job.h>
26
#include <threading/thread.h>
27
#include <threading/condvar.h>
28
#include <threading/mutex.h>
29
30
/* the initial size of the heap */
31
#define HEAP_SIZE_DEFAULT 64
32
33
typedef struct event_t event_t;
34
35
/**
36
 * Event containing a job and a schedule time
37
 */
38
struct event_t {
39
  /**
40
   * Time to fire the event.
41
   */
42
  timeval_t time;
43
44
  /**
45
   * Every event has its assigned job.
46
   */
47
  job_t *job;
48
};
49
50
/**
51
 * destroy an event and its job
52
 */
53
static void event_destroy(event_t *event)
54
0
{
55
0
  event->job->destroy(event->job);
56
0
  free(event);
57
0
}
58
59
typedef struct private_scheduler_t private_scheduler_t;
60
61
/**
62
 * Private data of a scheduler_t object.
63
 */
64
struct private_scheduler_t {
65
66
  /**
67
   * Public part of a scheduler_t object.
68
   */
69
   scheduler_t public;
70
71
  /**
72
   * The heap in which the events are stored.
73
   */
74
  event_t **heap;
75
76
  /**
77
   * The size of the heap.
78
   */
79
  u_int heap_size;
80
81
  /**
82
   * The number of scheduled events.
83
   */
84
  u_int event_count;
85
86
  /**
87
   * Exclusive access to list
88
   */
89
  mutex_t *mutex;
90
91
  /**
92
   * Condvar to wait for next job.
93
   */
94
  condvar_t *condvar;
95
};
96
97
/**
98
 * Returns the top event without removing it. Returns NULL if the heap is empty.
99
 */
100
static event_t *peek_event(private_scheduler_t *this)
101
0
{
102
0
  return this->event_count > 0 ? this->heap[1] : NULL;
103
0
}
104
105
/**
106
 * Removes the top event from the heap and returns it. Returns NULL if the heap
107
 * is empty.
108
 */
109
static event_t *remove_event(private_scheduler_t *this)
110
3.51k
{
111
3.51k
  event_t *event, *top;
112
113
3.51k
  if (!this->event_count)
114
3.51k
  {
115
3.51k
    return NULL;
116
3.51k
  }
117
118
  /* store the value to return */
119
0
  event = this->heap[1];
120
  /* move the bottom event to the top */
121
0
  top = this->heap[1] = this->heap[this->event_count];
122
123
0
  if (--this->event_count > 1)
124
0
  {
125
0
    u_int position = 1;
126
127
    /* seep down the top event */
128
0
    while ((position << 1) <= this->event_count)
129
0
    {
130
0
      u_int child = position << 1;
131
132
0
      if ((child + 1) <= this->event_count &&
133
0
        timercmp(&this->heap[child + 1]->time,
134
0
             &this->heap[child]->time, <))
135
0
      {
136
        /* the "right" child is smaller */
137
0
        child++;
138
0
      }
139
140
0
      if (!timercmp(&top->time, &this->heap[child]->time, >))
141
0
      {
142
        /* the top event fires before the smaller of the two children,
143
         * stop */
144
0
        break;
145
0
      }
146
147
      /* swap with the smaller child */
148
0
      this->heap[position] = this->heap[child];
149
0
      position = child;
150
0
    }
151
0
    this->heap[position] = top;
152
0
  }
153
0
  return event;
154
3.51k
}
155
156
/**
157
 * Get events from the queue and pass it to the processor
158
 */
159
static job_requeue_t schedule(private_scheduler_t * this)
160
0
{
161
0
  timeval_t now;
162
0
  event_t *event;
163
0
  bool timed = FALSE, oldstate;
164
165
0
  this->mutex->lock(this->mutex);
166
167
0
  time_monotonic(&now);
168
169
0
  if ((event = peek_event(this)) != NULL)
170
0
  {
171
0
    if (!timercmp(&now, &event->time, <))
172
0
    {
173
0
      remove_event(this);
174
0
      this->mutex->unlock(this->mutex);
175
0
      DBG2(DBG_JOB, "got event, queuing job for execution");
176
0
      lib->processor->queue_job(lib->processor, event->job);
177
0
      free(event);
178
0
      return JOB_REQUEUE_DIRECT;
179
0
    }
180
0
    timersub(&event->time, &now, &now);
181
0
    if (now.tv_sec)
182
0
    {
183
0
      DBG2(DBG_JOB, "next event in %ds %dms, waiting",
184
0
         now.tv_sec, now.tv_usec/1000);
185
0
    }
186
0
    else
187
0
    {
188
0
      DBG2(DBG_JOB, "next event in %dms, waiting", now.tv_usec/1000);
189
0
    }
190
0
    timed = TRUE;
191
0
  }
192
0
  thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex);
193
0
  oldstate = thread_cancelability(TRUE);
194
195
0
  if (timed)
196
0
  {
197
0
    this->condvar->timed_wait_abs(this->condvar, this->mutex, event->time);
198
0
  }
199
0
  else
200
0
  {
201
0
    DBG2(DBG_JOB, "no events, waiting");
202
0
    this->condvar->wait(this->condvar, this->mutex);
203
0
  }
204
0
  thread_cancelability(oldstate);
205
0
  thread_cleanup_pop(TRUE);
206
0
  return JOB_REQUEUE_DIRECT;
207
0
}
208
209
METHOD(scheduler_t, get_job_load, u_int,
210
  private_scheduler_t *this)
211
0
{
212
0
  int count;
213
214
0
  this->mutex->lock(this->mutex);
215
0
  count = this->event_count;
216
0
  this->mutex->unlock(this->mutex);
217
0
  return count;
218
0
}
219
220
/**
 * Implementation of scheduler_t.schedule_job_tv.
 *
 * Wraps the job in a newly allocated event due at the given time, inserts it
 * into the heap (doubling the heap's capacity when it is full) and signals
 * the condvar.
 */
METHOD(scheduler_t, schedule_job_tv, void,
  private_scheduler_t *this, job_t *job, timeval_t tv)
{
  event_t *entry;
  u_int slot, parent;

  entry = malloc_thing(event_t);
  entry->time = tv;
  entry->job = job;
  job->status = JOB_STATUS_QUEUED;

  this->mutex->lock(this->mutex);

  this->event_count++;
  if (this->event_count > this->heap_size)
  {
    /* out of room: grow to twice the capacity, plus the unused slot 0 */
    this->heap_size <<= 1;
    this->heap = (event_t**)realloc(this->heap,
                  (this->heap_size + 1) * sizeof(event_t*));
  }

  /* start at the new bottom slot and move later-firing parents down until
   * the right place for the new event is found */
  for (slot = this->event_count; slot > 1; slot = parent)
  {
    parent = slot >> 1;
    if (!timercmp(&this->heap[parent]->time, &entry->time, >))
    {
      /* parent does not fire after the new event, stop here */
      break;
    }
    this->heap[slot] = this->heap[parent];
  }
  this->heap[slot] = entry;

  this->condvar->signal(this->condvar);
  this->mutex->unlock(this->mutex);
}
257
258
/**
 * Implementation of scheduler_t.schedule_job.
 *
 * Schedules the job s seconds after the current monotonic time.
 */
METHOD(scheduler_t, schedule_job, void,
  private_scheduler_t *this, job_t *job, uint32_t s)
{
  timeval_t due;

  time_monotonic(&due);
  due.tv_sec = due.tv_sec + s;

  schedule_job_tv(this, job, due);
}
268
269
/**
 * Implementation of scheduler_t.schedule_job_ms.
 *
 * Schedules the job ms milliseconds after the current monotonic time.
 */
METHOD(scheduler_t, schedule_job_ms, void,
  private_scheduler_t *this, job_t *job, uint32_t ms)
{
  timeval_t now, delay, due;

  /* split the delay into whole seconds and the remainder in microseconds */
  delay.tv_sec = ms / 1000;
  delay.tv_usec = (ms % 1000) * 1000;

  time_monotonic(&now);
  timeradd(&now, &delay, &due);

  schedule_job_tv(this, job, due);
}
282
283
/**
 * Implementation of scheduler_t.flush.
 *
 * Removes every event from the heap, destroying each together with its job,
 * then signals the condvar. All of this happens with the mutex held.
 */
METHOD(scheduler_t, flush, void,
  private_scheduler_t *this)
{
  event_t *pending;

  this->mutex->lock(this->mutex);
  for (pending = remove_event(this); pending != NULL;
     pending = remove_event(this))
  {
    event_destroy(pending);
  }
  this->condvar->signal(this->condvar);
  this->mutex->unlock(this->mutex);
}
296
297
/**
 * Implementation of scheduler_t.destroy.
 *
 * Flushes all pending events, then releases the condvar, the mutex, the heap
 * array and the scheduler object itself.
 */
METHOD(scheduler_t, destroy, void,
  private_scheduler_t *this)
{
  condvar_t *condvar;
  mutex_t *mutex;

  /* drop pending events first, flush() still needs mutex and condvar */
  flush(this);

  condvar = this->condvar;
  condvar->destroy(condvar);
  mutex = this->mutex;
  mutex->destroy(mutex);

  free(this->heap);
  free(this);
}
306
307
/*
308
 * Described in header.
309
 */
310
scheduler_t * scheduler_create()
311
3.51k
{
312
3.51k
  private_scheduler_t *this;
313
3.51k
  callback_job_t *job;
314
315
3.51k
  INIT(this,
316
3.51k
    .public = {
317
3.51k
      .get_job_load = _get_job_load,
318
3.51k
      .schedule_job = _schedule_job,
319
3.51k
      .schedule_job_ms = _schedule_job_ms,
320
3.51k
      .schedule_job_tv = _schedule_job_tv,
321
3.51k
      .flush = _flush,
322
3.51k
      .destroy = _destroy,
323
3.51k
    },
324
3.51k
    .heap_size = HEAP_SIZE_DEFAULT,
325
3.51k
    .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
326
3.51k
    .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
327
3.51k
  );
328
329
3.51k
  this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*));
330
331
3.51k
  job = callback_job_create_with_prio((callback_job_cb_t)schedule, this,
332
3.51k
                    NULL, return_false, JOB_PRIO_CRITICAL);
333
3.51k
  lib->processor->queue_job(lib->processor, (job_t*)job);
334
335
3.51k
  return &this->public;
336
3.51k
}