Coverage Report

Created: 2025-07-12 06:14

/src/opensips/pt.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2007 Voice Sistem SRL
3
 * Copyright (C) 2008-2019 OpenSIPS Project
4
 *
5
 * This file is part of opensips, a free SIP server.
6
 *
7
 * opensips is free software; you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 2 of the License, or
10
 * (at your option) any later version
11
 *
12
 * opensips is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU General Public License
18
 * along with this program; if not, write to the Free Software
19
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20
 */
21
22
#include <sys/types.h>
23
#include <sys/wait.h>
24
#include <unistd.h>
25
#include <sched.h>
26
#include <stdio.h>
27
28
#include "lib/dbg/profiling.h"
29
#include "mem/shm_mem.h"
30
#include "net/net_tcp.h"
31
#include "net/net_udp.h"
32
#include "db/db_insertq.h"
33
#include "sr_module.h"
34
#include "dprint.h"
35
#include "pt.h"
36
#include "bin_interface.h"
37
#include "core_stats.h"
38
39
40
/* array with children pids, 0= main proc,
41
 * alloc'ed in shared mem if possible */
42
struct process_table *pt = NULL;
43
44
/* The maximum number of processes that will ever exist in OpenSIPS. This is
45
 * actually the size of the process table
46
 * This is READONLY!! */
47
unsigned int counted_max_processes = 0;
48
49
/* flag per process to control the termination stages */
50
int _termination_in_progress = 0;
51
52
static int internal_fork_child_setup(const struct internal_fork_params *);
53
54
static struct internal_fork_handler default_fh = {
55
  .desc = "internal_fork_child_setup()",
56
  .post_fork.in_child = internal_fork_child_setup,
57
};
58
59
static struct internal_fork_handler *_fork_handlers = &default_fh;
60
61
/* Register handlers to be invoked after internal_fork()
62
 * to do various per-subsystem setup / cleanup tasks.
63
 * Takes a reference to a "stable" structure (i.e. static or
64
 * malloc'ed) which has to be alive until the last internal_fork()
65
 * is called. */
66
void register_fork_handler(struct internal_fork_handler *h)
67
0
{
68
0
  struct internal_fork_handler *hp;
69
70
0
  if (is_main == 0) {
71
0
    LM_BUG("buggy call from non-main process!!!\n");
72
0
    abort();
73
0
  }
74
0
  if (h->_next != NULL) {
75
0
    LM_BUG("buggy call h->_next != NULL!!!\n");
76
0
    abort();
77
0
  }
78
79
0
  for (hp = _fork_handlers; hp->_next != NULL; hp = hp->_next)
80
0
    continue;
81
0
  hp->_next = h;
82
0
};
83
84
static unsigned long count_running_processes(void *x)
85
0
{
86
0
  int i,cnt=0;
87
88
0
  if (pt)
89
0
    for ( i=0 ; i<counted_max_processes ; i++ )
90
0
      if (is_process_running(i))
91
0
        cnt++;
92
93
0
  return cnt;
94
0
}
95
96
97
int init_multi_proc_support(void)
98
0
{
99
0
  int i;
100
  /* at this point we know exactly the possible number of processes, since
101
   * all the other modules already adjusted their extra numbers */
102
0
  counted_max_processes = count_child_processes();
103
104
#ifdef UNIT_TESTS
105
#include "mem/test/test_malloc.h"
106
  counted_max_processes += TEST_MALLOC_PROCS - 1;
107
#endif
108
109
  /* allocate the PID table to accomodate the maximum possible number of
110
   * process we may have during runtime (covering extra procs created 
111
   * due auto-scaling) */
112
0
  pt = shm_malloc(sizeof(struct process_table)*counted_max_processes);
113
0
  if (pt==0){
114
0
    LM_ERR("out of memory\n");
115
0
    return -1;
116
0
  }
117
0
  memset(pt, 0, sizeof(struct process_table)*counted_max_processes);
118
119
0
  for( i=0 ; i<counted_max_processes ; i++ ) {
120
    /* reset fds to prevent bogus ops */
121
0
    pt[i].unix_sock = -1;
122
0
    pt[i].pid = -1;
123
0
    pt[i].ipc_pipe[0] = pt[i].ipc_pipe[1] = -1;
124
0
    pt[i].ipc_sync_pipe[0] = pt[i].ipc_sync_pipe[1] = -1;
125
0
  }
126
127
  /* create the load-related stats (initially marked as hidden */
128
  /* until the proc starts) */
129
0
  if (register_processes_load_stats( counted_max_processes ) != 0) {
130
0
    LM_ERR("failed to create load stats\n");
131
0
    return -1;
132
0
  }
133
134
  /* create the IPC pipes for all possible procs */
135
0
  if (create_ipc_pipes( counted_max_processes )<0) {
136
0
    LM_ERR("failed to create IPC pipes, aborting\n");
137
0
    return -1;
138
0
  }
139
140
  /* create the IPC pipes for all possible procs */
141
0
  if (tcp_create_comm_proc_socks( counted_max_processes )<0) {
142
0
    LM_ERR("failed to create TCP layer communication, aborting\n");
143
0
    return -1;
144
0
  }
145
146
  /* create the pkg_mem stats */
147
  #ifdef PKG_MALLOC
148
  if (init_pkg_stats(counted_max_processes)!=0) {
149
    LM_ERR("failed to init stats for pkg\n");
150
    return -1;
151
  }
152
  #endif
153
154
  /* set the pid for the starter process */
155
0
  set_proc_attrs("starter");
156
157
  /* register the stats for the global load */
158
0
  if ( register_stat2( "load", "load", (stat_var**)pt_get_rt_load,
159
0
  STAT_IS_FUNC, NULL, 0) != 0) {
160
0
    LM_ERR("failed to add RT global load stat\n");
161
0
    return -1;
162
0
  }
163
164
0
  if ( register_stat2( "load", "load1m", (stat_var**)pt_get_1m_load,
165
0
  STAT_IS_FUNC, NULL, 0) != 0) {
166
0
    LM_ERR("failed to add RT global load stat\n");
167
0
    return -1;
168
0
  }
169
170
0
  if ( register_stat2( "load", "load10m", (stat_var**)pt_get_10m_load,
171
0
  STAT_IS_FUNC, NULL, 0) != 0) {
172
0
    LM_ERR("failed to add RT global load stat\n");
173
0
    return -1;
174
0
  }
175
176
  /* register the stats for the extended global load */
177
0
  if ( register_stat2( "load", "load-all", (stat_var**)pt_get_rt_loadall,
178
0
  STAT_IS_FUNC, NULL, 0) != 0) {
179
0
    LM_ERR("failed to add RT global load stat\n");
180
0
    return -1;
181
0
  }
182
183
0
  if ( register_stat2( "load", "load1m-all", (stat_var**)pt_get_1m_loadall,
184
0
  STAT_IS_FUNC, NULL, 0) != 0) {
185
0
    LM_ERR("failed to add RT global load stat\n");
186
0
    return -1;
187
0
  }
188
189
0
  if ( register_stat2( "load", "load10m-all", (stat_var**)pt_get_10m_loadall,
190
0
  STAT_IS_FUNC, NULL, 0) != 0) {
191
0
    LM_ERR("failed to add RT global load stat\n");
192
0
    return -1;
193
0
  }
194
195
0
  if ( register_stat2( "load", "processes_number",
196
0
  (stat_var**)count_running_processes,
197
0
  STAT_IS_FUNC, NULL, 0) != 0) {
198
0
    LM_ERR("failed to add processes_number stat\n");
199
0
    return -1;
200
0
  }
201
202
0
  return 0;
203
0
}
204
205
206
void set_proc_attrs(const char *fmt, ...)
207
0
{
208
0
  va_list ap;
209
210
  /* description */
211
0
  va_start(ap, fmt);
212
0
  vsnprintf( pt[process_no].desc, MAX_PT_DESC, fmt, ap);
213
0
  va_end(ap);
214
215
  /* pid */
216
0
  pt[process_no].pid=getpid();
217
0
}
218
219
220
/* Resets all the values in the process table for a given id (a slot) so that
221
 * it can be reused later 
222
 * WARNING: this should be called only by main process and when it is 100% 
223
 *  that the process mapped on this slot is not running anymore */
224
void reset_process_slot( int p_id )
225
0
{
226
0
  if (is_main==0) {
227
0
    LM_BUG("buggy call from non-main process!!!");
228
0
    return;
229
0
  }
230
231
  /* we cannot simply do a memset here, as we need to preserve the holders
232
   * with the inter-process communication fds */
233
0
  pt[p_id].pid = -1;
234
0
  pt[p_id].type = TYPE_NONE;
235
0
  pt[p_id].pg_filter = NULL;
236
0
  pt[p_id].desc[0] = 0;
237
0
  pt[p_id].flags = 0;
238
239
0
  pt[p_id].ipc_pipe[0] = pt[p_id].ipc_pipe[1] = -1;
240
0
  pt[p_id].ipc_sync_pipe[0] = pt[p_id].ipc_sync_pipe[1] = -1;
241
0
  pt[p_id].unix_sock = -1;
242
243
0
  pt[p_id].log_level = pt[p_id].default_log_level = 0; /*not really needed*/
244
245
  /* purge all load-related data */
246
0
  memset( &pt[p_id].load, 0, sizeof(struct proc_load_info));
247
  /* hide the load stats */
248
0
  pt[p_id].load_rt->flags |= STAT_HIDDEN;
249
0
  pt[p_id].load_1m->flags |= STAT_HIDDEN;
250
0
  pt[p_id].load_10m->flags |= STAT_HIDDEN;
251
  #ifdef PKG_MALLOC
252
  pt[p_id].pkg_total->flags |= STAT_HIDDEN;
253
  pt[p_id].pkg_used->flags |= STAT_HIDDEN;
254
  pt[p_id].pkg_rused->flags |= STAT_HIDDEN;
255
  pt[p_id].pkg_mused->flags |= STAT_HIDDEN;
256
  pt[p_id].pkg_free->flags |= STAT_HIDDEN;
257
  pt[p_id].pkg_frags->flags |= STAT_HIDDEN;
258
  #endif
259
0
}
260
261
262
/* Start-up states a forked child publishes in its pt[].startup_result
 * field (see internal_fork()) */
enum {
	CHLD_STARTING = 0,	/* child still running its start-up sequence */
	CHLD_OK,		/* start-up completed successfully */
	CHLD_FAILED,		/* start-up failed, child is exiting */
};
263
264
static __attribute__((__noreturn__)) void child_startup_failed(void)
265
0
{
266
0
  atomic_store(&pt[process_no].startup_result, CHLD_FAILED);
267
0
  exit(1);
268
0
}
269
270
static int internal_fork_child_setup(const struct internal_fork_params *ifpp)
271
0
{
272
0
  init_log_level();
273
274
0
  tcp_connect_proc_to_tcp_main(process_no, 1);
275
276
  /* free the script if not needed */
277
0
  if (!(ifpp->flags & OSS_PROC_NEEDS_SCRIPT) && sroutes) {
278
0
    free_route_lists(sroutes);
279
0
    sroutes = NULL;
280
0
  }
281
0
  return 0;
282
0
}
283
284
/* This function is to be called only by the main process!
285
 * Returns, on success, the ID (non zero) in the process table of the
286
 * newly forked procees.
287
 * */
288
int internal_fork(const struct internal_fork_params *ifpp)
289
0
{
290
0
  int new_idx;
291
0
  pid_t pid;
292
0
  unsigned int seed;
293
294
0
  if (is_main==0) {
295
0
    LM_BUG("buggy call from non-main process!!!");
296
0
    return -1;
297
0
  }
298
299
0
  new_idx = 1; /* start from 1 as 0 (attendent) is always running */
300
0
  for( ; new_idx<counted_max_processes ; new_idx++)
301
0
    if ( (pt[new_idx].flags&OSS_PROC_IS_RUNNING)==0 ) break;
302
0
  if (new_idx==counted_max_processes) {
303
0
    LM_BUG("no free process slot found while trying to fork again\n");
304
0
    return -1;
305
0
  }
306
307
0
  seed = rand();
308
309
0
  LM_DBG("forking new process \"%s\" on slot %d\n", ifpp->proc_desc, new_idx);
310
311
  /* set TCP communication */
312
0
  if (tcp_activate_comm_proc_socks(new_idx)<0){
313
0
    LM_ERR("failed to connect future proc %d to TCP main\n",
314
0
      process_no);
315
0
    return -1;
316
0
  }
317
318
  /* set the IPC pipes */
319
0
  if ( (ifpp->flags & OSS_PROC_NO_IPC) ) {
320
    /* advertise no IPC to the rest of the procs */
321
0
    pt[new_idx].ipc_pipe[0] = -1;
322
0
    pt[new_idx].ipc_pipe[1] = -1;
323
0
    pt[new_idx].ipc_sync_pipe[0] = -1;
324
0
    pt[new_idx].ipc_sync_pipe[1] = -1;
325
    /* NOTE: the IPC fds will remain open in the other processes,
326
     * but they will not be known */
327
0
  } else {
328
    /* activate the IPC pipes */
329
0
    pt[new_idx].ipc_pipe[0]=pt[new_idx].ipc_pipe_holder[0];
330
0
    pt[new_idx].ipc_pipe[1]=pt[new_idx].ipc_pipe_holder[1];
331
0
    pt[new_idx].ipc_sync_pipe[0]=pt[new_idx].ipc_sync_pipe_holder[0];
332
0
    pt[new_idx].ipc_sync_pipe[1]=pt[new_idx].ipc_sync_pipe_holder[1];
333
0
  }
334
335
0
  pt[new_idx].pid = 0;
336
337
0
  atomic_init(&pt[new_idx].startup_result, CHLD_STARTING);
338
339
0
  if ( (pid=fork())<0 ){
340
0
    LM_CRIT("cannot fork \"%s\" process (%d: %s)\n",ifpp->proc_desc,
341
0
        errno, strerror(errno));
342
0
    reset_process_slot( new_idx );
343
0
    return -1;
344
0
  }
345
346
0
  if (pid==0){
347
0
    const struct internal_fork_handler *cfhp;
348
    /* child process */
349
0
    is_main = 0; /* a child is not main process */
350
    /* set uid */
351
0
    process_no = new_idx;
352
    /* set attributes, pid etc */
353
0
    set_proc_attrs(ifpp->proc_desc);
354
355
0
    pt[process_no].flags |= ifpp->flags;
356
0
    pt[process_no].type = ifpp->type;
357
    /* activate its load & pkg statistics, but only if IPC present */
358
0
    if ( (ifpp->flags & OSS_PROC_NO_IPC)==0 ) {
359
0
      pt[process_no].load_rt->flags &= (~STAT_HIDDEN);
360
0
      pt[process_no].load_1m->flags &= (~STAT_HIDDEN);
361
0
      pt[process_no].load_10m->flags &= (~STAT_HIDDEN);
362
      #ifdef PKG_MALLOC
363
      pt[process_no].pkg_total->flags &= (~STAT_HIDDEN);
364
      pt[process_no].pkg_used->flags &= (~STAT_HIDDEN);
365
      pt[process_no].pkg_rused->flags &= (~STAT_HIDDEN);
366
      pt[process_no].pkg_mused->flags &= (~STAT_HIDDEN);
367
      pt[process_no].pkg_free->flags &= (~STAT_HIDDEN);
368
      pt[process_no].pkg_frags->flags &= (~STAT_HIDDEN);
369
      #endif
370
0
    }
371
    /* each children need a unique seed */
372
0
    seed_child(seed);
373
374
0
    for (cfhp = _fork_handlers; cfhp != NULL; cfhp = cfhp->_next) {
375
0
      if (cfhp->post_fork.in_child == NULL)
376
0
        continue;
377
0
      if (cfhp->post_fork.in_child(ifpp) != 0) {
378
0
        LM_CRIT("failed to run %s for process %d\n", cfhp->desc,
379
0
            process_no);
380
0
        child_startup_failed();
381
0
      }
382
0
    }
383
0
    atomic_store(&pt[process_no].startup_result, CHLD_OK);
384
0
    return 0;
385
0
  }else{
386
    /* parent process */
387
    /* wait for the child to complete the critical sectoin of the
388
     * start-up */
389
0
    while (atomic_load(&pt[new_idx].startup_result) == CHLD_STARTING) {
390
0
      int status;
391
0
      sched_yield();
392
0
      pid_t result = waitpid(pid, &status, WNOHANG);
393
0
      if (result < 0) {
394
0
        if (errno == EINTR)
395
0
          continue;
396
0
        goto child_is_down;
397
0
      }
398
0
      if (result == 0) {
399
        // Child has not exited yet
400
0
        continue;
401
0
      }
402
      // Child has exited, oops
403
0
      goto child_is_down;
404
0
    }
405
0
    if (atomic_load(&pt[new_idx].startup_result) != CHLD_OK) {
406
0
      goto child_is_down;
407
0
    }
408
0
    pt[new_idx].flags |= OSS_PROC_IS_RUNNING;
409
0
    tcp_connect_proc_to_tcp_main( new_idx, 0);
410
0
    return new_idx;
411
0
child_is_down:
412
0
    LM_CRIT("failed to initialize child process %d\n", new_idx);
413
0
    reset_process_slot( new_idx );
414
0
    return -1;
415
0
  }
416
0
}
417
418
419
/* counts the number of processes created by OpenSIPS at startup. processes
420
 * that also do child_init() (the per-process module init)
421
 *
422
 * used for proper status return code
423
 */
424
int count_init_child_processes(void)
425
0
{
426
0
  int ret=0;
427
428
  /* listening children to be create at startup */
429
0
  ret += udp_count_processes(NULL);
430
0
  ret += tcp_count_processes(NULL);
431
0
  ret += timer_count_processes(NULL) - 2/*for keeper & trigger*/;
432
433
  /* attendent */
434
0
  ret++;
435
436
  /* count number of module procs going to be initialised */
437
0
  ret += count_module_procs(PROC_FLAG_INITCHILD);
438
439
0
  LM_DBG("%d children are going to be inited\n",ret);
440
0
  return ret;
441
0
}
442
443
/* Counts the processes known by OpenSIPS at startup: the UDP, TCP and timer
 * processes (each together with the "extra" count reported by its counting
 * function - presumably the processes auto-scaling may add, TODO confirm),
 * the attendant, and all the processes requested by modules.
 * Note that the number of processes might change during init, if one of the
 * modules decides that it will no longer use a process (e.g. the rtpproxy
 * timeout process).
 */
int count_child_processes(void)
{
	unsigned int base = 0;	/* processes counted directly */
	unsigned int extras = 0;	/* sum of all the "extra" counts */
	unsigned int more;

	/* UDP based listeners */
	base += udp_count_processes(&more);
	extras += more;

	/* TCP based listeners */
	base += tcp_count_processes(&more);
	extras += more;

	/* timer related processes */
	base += timer_count_processes(&more);
	extras += more;

	/* the attendant */
	base += 1;

	/* processes requested by modules */
	base += count_module_procs(0);

	return base + extras;
}
477
478
479
void dynamic_process_final_exit(void)
480
0
{
481
  /* prevent any more IPC */
482
0
  pt[process_no].ipc_pipe[0] = -1;
483
0
  pt[process_no].ipc_pipe[1] = -1;
484
0
  pt[process_no].ipc_sync_pipe[0] = -1;
485
0
  pt[process_no].ipc_sync_pipe[1] = -1;
486
487
  /* clear the per-process connection from the DB queues */
488
0
  ql_force_process_disconnect(process_no);
489
490
  /* if a TCP proc by chance, reset the tcp-related data */
491
0
  tcp_reset_worker_slot();
492
493
0
  pt_become_idle();
494
495
  /* mark myself as DYNAMIC (just in case) to have an err-less termination */
496
0
  pt[process_no].flags |= OSS_PROC_SELFEXIT;
497
0
  LM_INFO("doing self termination\n");
498
499
  /* the process slot in the proc table will be purge on SIGCHLD by main */
500
0
  exit(0);
501
0
}
502
503
int run_post_fork_handlers(void)
504
0
{
505
0
  const struct internal_fork_handler *cfhp;
506
507
0
  for (cfhp = _fork_handlers; cfhp != NULL; cfhp = cfhp->_next) {
508
0
    if (cfhp->post_fork.in_parent == NULL)
509
0
      continue;
510
0
    if (cfhp->post_fork.in_parent() != 0) {
511
0
      LM_CRIT("failed to run %s for process %d\n", cfhp->desc,
512
0
          process_no);
513
0
      return (-1);
514
0
    }
515
0
  }
516
0
  return (0);
517
0
}