Coverage Report

Created: 2024-02-25 06:34

/src/kamailio/src/core/async_task.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2014 Daniel-Constantin Mierla (asipto.com)
3
 *
4
 * This file is part of Kamailio, a free SIP server.
5
 *
6
 * Permission to use, copy, modify, and distribute this software for any
7
 * purpose with or without fee is hereby granted, provided that the above
8
 * copyright notice and this permission notice appear in all copies.
9
 *
10
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17
 */
18
/*!
19
* \file
20
* \brief Kamailio core :: Asynchronus tasks
21
* \ingroup core
22
* Module: \ref core
23
*/
24
25
#include <stdio.h>
26
#include <unistd.h>
27
#include <stdlib.h>
28
#include <string.h>
29
30
#include <sys/socket.h>
31
#include <sys/types.h>
32
#include <sys/un.h>
33
#include <netinet/in.h>
34
#include <arpa/inet.h>
35
#include <fcntl.h>
36
#include <errno.h>
37
38
#include "dprint.h"
39
#include "sr_module.h"
40
#include "ut.h"
41
#include "pt.h"
42
#include "cfg/cfg_struct.h"
43
#include "parser/parse_param.h"
44
45
46
#include "async_task.h"
47
48
static async_wgroup_t *_async_wgroup_list = NULL;
49
static async_wgroup_t *_async_wgroup_crt = NULL;
50
51
int async_task_run(async_wgroup_t *awg, int idx);
52
53
/**
54
 *
55
 */
56
int async_task_workers_get(void)
57
0
{
58
0
  return (_async_wgroup_list) ? _async_wgroup_list->workers : 0;
59
0
}
60
61
/**
62
 *
63
 */
64
int async_task_workers_active(void)
65
0
{
66
0
  if(_async_wgroup_list == NULL || _async_wgroup_list->workers <= 0)
67
0
    return 0;
68
69
0
  return 1;
70
0
}
71
72
/**
73
 *
74
 */
75
async_wgroup_t *async_task_workers_get_crt(void)
76
0
{
77
0
  return _async_wgroup_crt;
78
0
}
79
80
/**
81
 *
82
 */
83
int async_task_init_sockets(void)
84
0
{
85
0
  int val;
86
0
  async_wgroup_t *awg;
87
88
0
  for(awg = _async_wgroup_list; awg != NULL; awg = awg->next) {
89
0
    if(socketpair(PF_UNIX, SOCK_DGRAM, 0, awg->sockets) < 0) {
90
0
      LM_ERR("opening tasks dgram socket pair\n");
91
0
      return -1;
92
0
    }
93
94
0
    if(awg->nonblock) {
95
0
      val = fcntl(awg->sockets[1], F_GETFL, 0);
96
0
      if(val < 0) {
97
0
        LM_WARN("failed to get socket flags\n");
98
0
      } else {
99
0
        if(fcntl(awg->sockets[1], F_SETFL, val | O_NONBLOCK) < 0) {
100
0
          LM_WARN("failed to set socket nonblock flag\n");
101
0
        }
102
0
      }
103
0
    }
104
0
  }
105
106
0
  LM_DBG("inter-process event notification sockets initialized\n");
107
0
  return 0;
108
0
}
109
110
/**
111
 *
112
 */
113
void async_task_close_sockets_child(void)
114
0
{
115
0
  async_wgroup_t *awg;
116
117
0
  LM_DBG("closing the notification socket used by children\n");
118
119
0
  for(awg = _async_wgroup_list; awg != NULL; awg = awg->next) {
120
0
    close(awg->sockets[1]);
121
0
  }
122
0
}
123
124
/**
125
 *
126
 */
127
void async_task_close_sockets_parent(void)
128
0
{
129
0
  async_wgroup_t *awg;
130
131
0
  LM_DBG("closing the notification socket used by parent\n");
132
133
0
  for(awg = _async_wgroup_list; awg != NULL; awg = awg->next) {
134
0
    close(awg->sockets[0]);
135
0
  }
136
0
}
137
138
/**
139
 *
140
 */
141
int async_task_init(void)
142
0
{
143
0
  int nrg = 0;
144
0
  async_wgroup_t *awg;
145
146
0
  LM_DBG("start initializing async task framework\n");
147
0
  if(_async_wgroup_list == NULL || _async_wgroup_list->workers <= 0)
148
0
    return 0;
149
150
  /* overall number of processes */
151
0
  for(awg = _async_wgroup_list; awg != NULL; awg = awg->next) {
152
0
    nrg += awg->workers;
153
0
  }
154
155
  /* advertise new processes to core */
156
0
  register_procs(nrg);
157
158
  /* advertise new processes to cfg framework */
159
0
  cfg_register_child(nrg);
160
161
0
  return 0;
162
0
}
163
164
/**
165
 *
166
 */
167
int async_task_initialized(void)
168
0
{
169
0
  if(_async_wgroup_list == NULL || _async_wgroup_list->workers <= 0)
170
0
    return 0;
171
0
  return 1;
172
0
}
173
174
/**
175
 *
176
 */
177
int async_task_child_init(int rank)
178
0
{
179
0
  int pid;
180
0
  int i;
181
0
  char pname[64];
182
0
  async_wgroup_t *awg;
183
184
0
  if(_async_wgroup_list == NULL || _async_wgroup_list->workers <= 0)
185
0
    return 0;
186
187
0
  LM_DBG("child initializing async task framework\n");
188
189
0
  if(rank == PROC_INIT) {
190
0
    if(async_task_init_sockets() < 0) {
191
0
      LM_ERR("failed to initialize tasks sockets\n");
192
0
      return -1;
193
0
    }
194
0
    return 0;
195
0
  }
196
197
0
  if(rank > 0) {
198
    /* no need to close the socket from sip workers */
199
    /* async_task_close_sockets_parent(); */
200
0
    return 0;
201
0
  }
202
0
  if(rank != PROC_MAIN)
203
0
    return 0;
204
205
0
  for(awg = _async_wgroup_list; awg != NULL; awg = awg->next) {
206
0
    snprintf(pname, 62, "Async Task Worker - %s",
207
0
        (awg->name.s) ? awg->name.s : "unknown");
208
0
    for(i = 0; i < awg->workers; i++) {
209
0
      pid = fork_process(PROC_RPC, pname, 1);
210
0
      if(pid < 0) {
211
0
        return -1; /* error */
212
0
      }
213
0
      if(pid == 0) {
214
        /* child */
215
216
        /* initialize the config framework */
217
0
        if(cfg_child_init()) {
218
0
          return -1;
219
0
        }
220
        /* main function for workers */
221
0
        if(async_task_run(awg, i + 1) < 0) {
222
0
          LM_ERR("failed to initialize task worker process: %d\n", i);
223
0
          return -1;
224
0
        }
225
0
      }
226
0
    }
227
0
  }
228
229
0
  return 0;
230
0
}
231
232
/**
233
 *
234
 */
235
int async_task_set_workers(int n)
236
0
{
237
0
  str gname = str_init("default");
238
239
0
  if(_async_wgroup_list != NULL && _async_wgroup_list->workers > 0) {
240
0
    LM_WARN("task workers already set\n");
241
0
    return 0;
242
0
  }
243
0
  if(n <= 0)
244
0
    return 0;
245
246
0
  if(_async_wgroup_list == NULL) {
247
0
    _async_wgroup_list = (async_wgroup_t *)pkg_malloc(
248
0
        sizeof(async_wgroup_t) + (gname.len + 1) * sizeof(char));
249
0
    if(_async_wgroup_list == NULL) {
250
0
      LM_ERR("failed to create async wgroup\n");
251
0
      return -1;
252
0
    }
253
0
    memset(_async_wgroup_list, 0,
254
0
        sizeof(async_wgroup_t) + (gname.len + 1) * sizeof(char));
255
0
    _async_wgroup_list->name.s =
256
0
        (char *)_async_wgroup_list + sizeof(async_wgroup_t);
257
0
    memcpy(_async_wgroup_list->name.s, gname.s, gname.len);
258
0
    _async_wgroup_list->name.len = gname.len;
259
0
  }
260
0
  _async_wgroup_list->workers = n;
261
262
0
  return 0;
263
0
}
264
265
/**
266
 *
267
 */
268
int async_task_set_nonblock(int n)
269
0
{
270
0
  if(n > 0 && _async_wgroup_list != NULL) {
271
0
    _async_wgroup_list->nonblock = 1;
272
0
  }
273
274
0
  return 0;
275
0
}
276
277
/**
278
 *
279
 */
280
int async_task_set_usleep(int n)
281
0
{
282
0
  int v = 0;
283
284
0
  if(_async_wgroup_list != NULL) {
285
0
    v = _async_wgroup_list->usleep;
286
0
    _async_wgroup_list->usleep = n;
287
0
  }
288
289
0
  return v;
290
0
}
291
292
/**
293
 *
294
 */
295
int async_task_set_workers_group(char *data)
296
0
{
297
0
  str sval;
298
0
  param_t *params_list = NULL;
299
0
  param_hooks_t phooks;
300
0
  param_t *pit = NULL;
301
0
  async_wgroup_t awg;
302
0
  async_wgroup_t *newg;
303
304
0
  if(data == NULL) {
305
0
    return -1;
306
0
  }
307
0
  sval.s = data;
308
0
  sval.len = strlen(sval.s);
309
310
0
  if(sval.len <= 0) {
311
0
    LM_ERR("invalid parameter value\n");
312
0
    return -1;
313
0
  }
314
315
0
  if(sval.s[sval.len - 1] == ';') {
316
0
    sval.len--;
317
0
  }
318
0
  if(parse_params(&sval, CLASS_ANY, &phooks, &params_list) < 0) {
319
0
    return -1;
320
0
  }
321
0
  memset(&awg, 0, sizeof(async_wgroup_t));
322
323
0
  for(pit = params_list; pit; pit = pit->next) {
324
0
    if(pit->name.len == 4 && strncasecmp(pit->name.s, "name", 4) == 0) {
325
0
      awg.name = pit->body;
326
0
    } else if(pit->name.len == 7
327
0
          && strncasecmp(pit->name.s, "workers", 7) == 0) {
328
0
      if(str2sint(&pit->body, &awg.workers) < 0) {
329
0
        LM_ERR("invalid workers value: %.*s\n", pit->body.len,
330
0
            pit->body.s);
331
0
        return -1;
332
0
      }
333
0
    } else if(pit->name.len == 6
334
0
          && strncasecmp(pit->name.s, "usleep", 6) == 0) {
335
0
      if(str2sint(&pit->body, &awg.usleep) < 0) {
336
0
        LM_ERR("invalid usleep value: %.*s\n", pit->body.len,
337
0
            pit->body.s);
338
0
        return -1;
339
0
      }
340
0
    } else if(pit->name.len == 8
341
0
          && strncasecmp(pit->name.s, "nonblock", 8) == 0) {
342
0
      if(str2sint(&pit->body, &awg.nonblock) < 0) {
343
0
        LM_ERR("invalid nonblock value: %.*s\n", pit->body.len,
344
0
            pit->body.s);
345
0
        return -1;
346
0
      }
347
0
    }
348
0
  }
349
350
0
  if(awg.name.len <= 0) {
351
0
    LM_ERR("invalid name value: [%.*s]\n", sval.len, sval.s);
352
0
    return -1;
353
0
  }
354
0
  if(awg.workers <= 0) {
355
0
    LM_ERR("invalid workers value: %d\n", awg.workers);
356
0
    return -1;
357
0
  }
358
359
0
  if(awg.name.len == 7 && strncmp(awg.name.s, "default", 7) == 0) {
360
0
    if(async_task_set_workers(awg.workers) < 0) {
361
0
      LM_ERR("failed to create the default group\n");
362
0
      return -1;
363
0
    }
364
0
    async_task_set_nonblock(awg.nonblock);
365
0
    async_task_set_usleep(awg.usleep);
366
0
    return 0;
367
0
  }
368
0
  if(_async_wgroup_list == NULL) {
369
0
    if(async_task_set_workers(1) < 0) {
370
0
      LM_ERR("failed to create the initial default group\n");
371
0
      return -1;
372
0
    }
373
0
  }
374
375
0
  newg = (async_wgroup_t *)pkg_malloc(
376
0
      sizeof(async_wgroup_t) + (awg.name.len + 1) * sizeof(char));
377
0
  if(newg == NULL) {
378
0
    LM_ERR("failed to create async wgroup [%.*s]\n", sval.len, sval.s);
379
0
    return -1;
380
0
  }
381
0
  memset(newg, 0, sizeof(async_wgroup_t) + (awg.name.len + 1) * sizeof(char));
382
0
  newg->name.s = (char *)newg + sizeof(async_wgroup_t);
383
0
  memcpy(newg->name.s, awg.name.s, awg.name.len);
384
0
  newg->name.len = awg.name.len;
385
0
  newg->workers = awg.workers;
386
0
  newg->nonblock = awg.nonblock;
387
0
  newg->usleep = awg.usleep;
388
389
0
  newg->next = _async_wgroup_list->next;
390
0
  _async_wgroup_list->next = newg;
391
392
0
  return 0;
393
0
}
394
395
/**
396
 *
397
 */
398
int async_task_push(async_task_t *task)
399
0
{
400
0
  int len;
401
402
0
  if(_async_wgroup_list == NULL || _async_wgroup_list->workers <= 0) {
403
0
    LM_WARN("async task pushed, but no async workers - ignoring\n");
404
0
    return 0;
405
0
  }
406
407
0
  len = write(_async_wgroup_list->sockets[1], &task, sizeof(async_task_t *));
408
0
  if(len <= 0) {
409
0
    LM_ERR("failed to pass the task to async workers\n");
410
0
    return -1;
411
0
  }
412
0
  LM_DBG("task sent [%p]\n", task);
413
0
  return 0;
414
0
}
415
416
/**
417
 *
418
 */
419
int async_task_group_push(str *gname, async_task_t *task)
420
0
{
421
0
  int len;
422
0
  async_wgroup_t *awg = NULL;
423
424
0
  if(_async_wgroup_list == NULL) {
425
0
    LM_WARN("async task pushed, but no async group - ignoring\n");
426
0
    return 0;
427
0
  }
428
0
  for(awg = _async_wgroup_list; awg != NULL; awg = awg->next) {
429
0
    if(awg->name.len == gname->len
430
0
        && memcmp(awg->name.s, gname->s, gname->len) == 0) {
431
0
      break;
432
0
    }
433
0
  }
434
0
  if(awg == NULL) {
435
0
    LM_WARN("group [%.*s] not found - ignoring\n", gname->len, gname->s);
436
0
    return 0;
437
0
  }
438
0
  len = write(awg->sockets[1], &task, sizeof(async_task_t *));
439
0
  if(len <= 0) {
440
0
    LM_ERR("failed to pass the task [%p] to group [%.*s]\n", task,
441
0
        gname->len, gname->s);
442
0
    return -1;
443
0
  }
444
0
  LM_DBG("task [%p] sent to group [%.*s]\n", task, gname->len, gname->s);
445
0
  return 0;
446
0
}
447
448
/**
449
 *
450
 */
451
int async_task_run(async_wgroup_t *awg, int idx)
452
0
{
453
0
  async_task_t *ptask;
454
0
  int received;
455
456
0
  LM_DBG("async task worker [%.*s] idx [%d] ready\n", awg->name.len,
457
0
      awg->name.s, idx);
458
459
0
  _async_wgroup_crt = awg;
460
461
0
  for(;;) {
462
0
    if(unlikely(awg->usleep))
463
0
      sleep_us(awg->usleep);
464
0
    if((received = recvfrom(awg->sockets[0], &ptask, sizeof(async_task_t *),
465
0
          0, NULL, 0))
466
0
        < 0) {
467
0
      LM_ERR("failed to received task (%d: %s)\n", errno,
468
0
          strerror(errno));
469
0
      continue;
470
0
    }
471
0
    if(received != sizeof(async_task_t *)) {
472
0
      LM_ERR("invalid task size %d\n", received);
473
0
      continue;
474
0
    }
475
0
    if(ptask->exec != NULL) {
476
0
      LM_DBG("task executed [%p] (%p/%p)\n", (void *)ptask,
477
0
          (void *)ptask->exec, (void *)ptask->param);
478
0
      ptask->exec(ptask->param);
479
0
    } else {
480
0
      LM_DBG("task with no callback function - ignoring\n");
481
0
    }
482
0
    shm_free(ptask);
483
0
  }
484
485
0
  return 0;
486
0
}