Coverage Report

Created: 2025-07-11 06:28

/src/opensips/pt_scaling.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2019 OpenSIPS Project
3
 *
4
 * This file is part of opensips, a free SIP server.
5
 *
6
 * opensips is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 2 of the License, or
9
 * (at your option) any later version
10
 *
11
 * opensips is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software
18
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19
 */
20
21
#include <sys/types.h>
22
#include <unistd.h>
23
#include <stdio.h>
24
#include "mem/shm_mem.h"
25
#include "socket_info.h"
26
#include "dprint.h"
27
#include "pt.h"
28
#include "ipc.h"
29
#include "daemonize.h"
30
#include "status_report.h"
31
32
struct process_group {
33
  enum process_type type;
34
  struct socket_info *si_filter;
35
  fork_new_process_f *fork_func;
36
  terminate_process_f *term_func;
37
  struct scaling_profile *prof;
38
  unsigned char history_size;
39
  unsigned char *history_map;
40
  unsigned char history_idx;
41
  unsigned short no_downscale_cycles;
42
  str sr_identifier_name;
43
  struct process_group *next;
44
};
45
46
/* all registered process groups, kept in registration order;
 * static storage, hence implicitly initialized to NULL */
static struct process_group *pg_head;

/* all defined auto-scaling profiles, most recently created first */
static struct scaling_profile *profiles_head;

/* 'status_report' group holding one identifier per process group */
static void *pts_sr_group;
51
52
53
int init_auto_scaling(void)
54
0
{
55
0
  if (!auto_scaling_enabled)
56
0
    return 0;
57
58
0
  pts_sr_group = sr_register_group(CHAR_INT("auto-scaling"), 0/*is_public*/);
59
0
  if (pts_sr_group==NULL) {
60
0
    LM_ERR("Failed to register 'status_report' group for "
61
0
      "'auto-scaling' support\n");
62
0
    return -1;
63
0
  }
64
65
0
  return 0;
66
0
}
67
68
69
int create_auto_scaling_profile( char *name,
70
  unsigned int max_procs, unsigned int up_threshold,
71
  unsigned int up_cycles_needed, unsigned int up_cycles_tocheck,
72
  unsigned int min_procs, unsigned int down_threshold,
73
  unsigned int down_cycles_tocheck, unsigned short down_cycles_delay)
74
0
{
75
0
  struct scaling_profile *p;
76
77
  /* check for duplicates */
78
0
  if ( get_scaling_profile(name) ) {
79
0
    LM_ERR("profile <%s> (case insensitive) already created"
80
0
      " - double definition?? \n", name);
81
0
    return -1;
82
0
  }
83
84
  /* some sanity checks */
85
0
  if (min_procs==0) {
86
0
    down_threshold = 0;
87
0
    down_cycles_tocheck = 0;
88
0
    down_cycles_delay = 0;
89
0
  }
90
0
  if (max_procs==0 || max_procs <= min_procs || max_procs>=1000) {
91
0
    LM_ERR("invalid relation or range for MIN/MAX processes [%d,%d]\n",
92
0
      min_procs, max_procs);
93
0
    return -1;
94
0
  }
95
0
  if (up_threshold==0 || up_threshold <= down_threshold ||
96
0
  up_threshold>100 || down_threshold>100) {
97
0
    LM_ERR("invalid relation or range DOWN/UP thresholds percentages "
98
0
      "[%d,%d]\n", down_threshold, up_threshold);
99
0
    return -1;
100
0
  }
101
0
  if (up_cycles_needed==0 || up_cycles_tocheck==0 ||
102
0
  up_cycles_tocheck<up_cycles_needed) {
103
0
    LM_ERR("invalid relation or values for upscaling check [%d of %d]\n",
104
0
      up_cycles_needed, up_cycles_tocheck);
105
0
    return -1;
106
0
  }
107
108
  /* all good, create it*/
109
110
0
  p = (struct scaling_profile*)pkg_malloc( sizeof(struct scaling_profile) +
111
0
    strlen(name) + 1 );
112
0
  if (p==NULL) {
113
0
    LM_ERR("failed to allocate memory for a new auto-scaling profile\n");
114
0
    return -1;
115
0
  }
116
117
  /* not really need, more to be safe for future expansions */
118
0
  memset( p, 0, sizeof(struct scaling_profile));
119
120
0
  p->max_procs = max_procs;
121
0
  p->up_threshold = up_threshold;
122
0
  p->up_cycles_needed = up_cycles_needed;
123
0
  p->up_cycles_tocheck = up_cycles_tocheck;
124
0
  p->min_procs = min_procs;
125
0
  p->down_threshold = down_threshold;
126
0
  p->down_cycles_tocheck = down_cycles_tocheck;
127
0
  p->down_cycles_delay = down_cycles_delay;
128
0
  p->name = (char*)(p+1);
129
0
  strcpy( p->name, name);
130
131
0
  LM_DBG("profile <%s> created UP [max=%d, th=%d%%, check %d/%d] DOWN "
132
0
    "[min=%d, th=%d%%, check %d, delay=%d]\n", name,
133
0
    max_procs, up_threshold, up_cycles_needed, up_cycles_tocheck,
134
0
    min_procs, down_threshold, down_cycles_tocheck, down_cycles_delay);
135
136
0
  p->next = profiles_head;
137
0
  profiles_head = p;
138
139
0
  return 0;
140
0
}
141
142
143
struct scaling_profile *get_scaling_profile(char *name)
144
0
{
145
0
  struct scaling_profile *p;
146
147
0
  for ( p=profiles_head ; p ; p=p->next )
148
0
    if (strcasecmp(name, p->name)==0)
149
0
      return p;
150
151
0
  return NULL;
152
0
}
153
154
155
/*
 * Sets the str `_s` to the printable name of the process type `_type`:
 * "UDP", "TCP" or "TIMER". Any other type is reported with LM_BUG and
 * mapped to "??".
 * `_type` is parenthesized so that any expression can be passed; it may
 * be evaluated more than once, so it must not have side effects.
 * `_s` must be an lvalue with `.s` and `.len` members.
 */
#define get_sr_type_name(_type, _s) \
  do { \
    if ((_type)==TYPE_UDP) { \
      (_s).s = "UDP"; (_s).len = 3; \
    } else if ((_type)==TYPE_TCP) { \
      (_s).s = "TCP"; (_s).len = 3; \
    } else if ((_type)==TYPE_TIMER) { \
      (_s).s = "TIMER"; (_s).len = 5; \
    } else { \
      LM_BUG("unsupported auto-scaling group %d\n", (int)(_type)); \
      (_s).s = "??"; (_s).len = 2; \
    } \
  } while(0)
168
169
170
int create_process_group(enum process_type type,
171
    struct socket_info *si_filter, struct scaling_profile *prof,
172
    fork_new_process_f *f1, terminate_process_f *f2)
173
0
{
174
0
  struct process_group *pg, *it;
175
0
  int h_size;
176
0
  str type_s;
177
178
  /* how much of a history do we need in order to cover both up and down
179
   * tranzitions ? */
180
0
  h_size = (prof->up_cycles_tocheck > prof->down_cycles_tocheck) ?
181
0
    prof->up_cycles_tocheck : prof->down_cycles_tocheck;
182
183
0
  get_sr_type_name( type, type_s);
184
185
0
  pg = (struct process_group*)shm_malloc( sizeof(struct process_group) +
186
0
    sizeof(char)*h_size +
187
0
    type_s.len+(si_filter?1+(si_filter->sock_str.len):0)
188
0
    );
189
0
  if (pg==NULL) {
190
0
    LM_ERR("failed to allocate memory for a new process group\n");
191
0
    return -1;
192
0
  }
193
0
  memset( pg, 0, sizeof(struct process_group) + sizeof(char)*h_size );
194
195
0
  pg->type = type;
196
0
  pg->si_filter = si_filter;
197
0
  pg->prof = prof;
198
0
  pg->fork_func = f1;
199
0
  pg->term_func = f2;
200
0
  pg->next = NULL;
201
202
0
  pg->history_size = h_size;
203
0
  pg->history_map = (unsigned char*)(pg+1);
204
0
  pg->history_idx = 0;
205
0
  pg->no_downscale_cycles = pg->prof->down_cycles_delay;
206
207
0
  pg->sr_identifier_name.s = (char*)(pg->history_map + h_size);
208
0
  memcpy( pg->sr_identifier_name.s, type_s.s, type_s.len);
209
0
  pg->sr_identifier_name.len = type_s.len;
210
0
  if (si_filter) {
211
0
    pg->sr_identifier_name.s[pg->sr_identifier_name.len++] = '/';
212
0
    memcpy( pg->sr_identifier_name.s + pg->sr_identifier_name.len,
213
0
      si_filter->sock_str.s, si_filter->sock_str.len);
214
0
    pg->sr_identifier_name.len += si_filter->sock_str.len;
215
0
  }
216
217
0
  LM_DBG("registering group of processes type %d, socket filter %p, "
218
0
    "name <%.*s> scaling profile <%s>\n", type, si_filter,
219
0
    pg->sr_identifier_name.len, pg->sr_identifier_name.s, prof->name);
220
221
0
  if (sr_register_identifier( pts_sr_group,
222
0
    pg->sr_identifier_name.s, pg->sr_identifier_name.len,
223
0
    SR_STATUS_READY, CHAR_INT("active"), 100)<0
224
0
  ) {
225
0
    LM_ERR("failed to register auto-scaling identifier for group"
226
0
      "name <%.*s>, disabling\n",
227
0
    pg->sr_identifier_name.len, pg->sr_identifier_name.s);
228
0
    pg->sr_identifier_name.s = NULL;
229
0
    pg->sr_identifier_name.len = 0;
230
0
  }
231
232
  /* add at the end of list, to avoid changing the head of the list due
233
   * forking */
234
0
  for( it=pg_head ; it && it->next ; it=it->next);
235
0
  if (it==NULL)
236
0
    pg_head = pg;
237
0
  else
238
0
    it->next = pg;
239
240
0
  return 0;
241
0
}
242
243
244
static void _pt_raise_event(struct process_group *pg, int p_id, int load,
245
                                char *scale)
246
0
{
247
0
  static str pt_ev_type = str_init("group_type");
248
0
  static str pt_ev_filter = str_init("group_filter");
249
0
  static str pt_ev_load = str_init("group_load");
250
0
  static str pt_ev_scale = str_init("scale");
251
0
  static str pt_ev_p_id = str_init("process_id");
252
0
  static str pt_ev_pid = str_init("pid");
253
0
  evi_params_p list = NULL;
254
0
  str s;
255
256
0
  if (!evi_probe_event(EVI_PROC_AUTO_SCALE_ID))
257
0
    return;
258
259
0
  list = evi_get_params();
260
0
  if (!list) {
261
0
    LM_ERR("cannot create event params\n");
262
0
    return;
263
0
  }
264
265
0
  get_sr_type_name( pg->type, s);
266
267
0
  if (evi_param_add_str(list, &pt_ev_type, &s) < 0) {
268
0
    LM_ERR("cannot add group type\n");
269
0
    goto error;
270
0
  }
271
272
0
  if (pg->si_filter==NULL) {
273
0
    s.s = "none"; s.len = 4;
274
0
  } else {
275
0
    s = pg->si_filter->sock_str;
276
0
  }
277
0
  if (evi_param_add_str(list, &pt_ev_filter, &s) < 0) {
278
0
    LM_ERR("cannot add group filter\n");
279
0
    goto error;
280
0
  }
281
282
0
  if (evi_param_add_int(list, &pt_ev_load, &load) < 0) {
283
0
    LM_ERR("cannot add group load\n");
284
0
    goto error;
285
0
  }
286
287
0
  s.s = scale; s.len = strlen(s.s);
288
0
  if (evi_param_add_str(list, &pt_ev_scale, &s) < 0) {
289
0
    LM_ERR("cannot add scaling type\n");
290
0
    goto error;
291
0
  }
292
293
0
  if (evi_param_add_int(list, &pt_ev_p_id, &p_id) < 0) {
294
0
    LM_ERR("cannot add process id\n");
295
0
    goto error;
296
0
  }
297
298
0
  if (evi_param_add_int(list, &pt_ev_pid, &(pt[p_id].pid)) < 0) {
299
0
    LM_ERR("cannot add process pid\n");
300
0
    goto error;
301
0
  }
302
303
0
  if (evi_dispatch_event(EVI_PROC_AUTO_SCALE_ID, list)) {
304
0
    LM_ERR("unable to send auto scaling event\n");
305
0
  }
306
0
  return;
307
308
0
error:
309
0
  evi_free_params(list);
310
0
}
311
312
313
static void rescale_group_history(struct process_group *pg, unsigned int idx,
314
    int org_size, int offset)
315
0
{
316
0
  unsigned int k;
317
0
  unsigned char old;
318
319
0
  k = idx;
320
0
  do {
321
0
    old = pg->history_map[k] ;
322
0
    pg->history_map[k] = (pg->history_map[k]*org_size)/(org_size+offset);
323
0
    LM_DBG("rescaling old %d to %d [idx %d]\n",
324
0
      old, pg->history_map[k], k);
325
326
0
    k = k ? (k-1) : (pg->history_size-1) ;
327
0
  } while(k!=idx);
328
0
}
329
330
331
void do_workers_auto_scaling(void)
332
0
{
333
0
  struct process_group *pg;
334
0
  unsigned int i, k, idx;
335
0
  unsigned int load;
336
0
  unsigned int procs_no;
337
0
  unsigned char cnt_under, cnt_over;
338
0
  int p_id, last_idx_in_pg;
339
340
  /* iterate all the groups we have */
341
0
  for ( pg=pg_head ; pg ; pg=pg->next ) {
342
343
0
    load = 0;
344
0
    procs_no = 0;
345
0
    last_idx_in_pg = -1;
346
347
    /* find the processes belonging to this group */
348
0
    for ( i=0 ; i<counted_max_processes ; i++) {
349
350
      /* skip processes:
351
       * - not running
352
       * - runing, but marked for termination
353
       * - not part of the group
354
       * - with a different group filter (socket interface) */
355
0
      if (!is_process_running(i) || pt[i].flags&OSS_PROC_TO_TERMINATE ||
356
0
      pt[i].type != pg->type || pg->si_filter!=pt[i].pg_filter)
357
0
        continue;
358
359
0
      load += get_stat_val( pt[i].load_rt );
360
0
      last_idx_in_pg = i;
361
0
      procs_no++;
362
363
0
    }
364
365
0
    if (!procs_no) {
366
0
      LM_BUG("no process beloging to group %d\n", pg->type);
367
0
      continue;
368
0
    }
369
370
    /* set the current value */
371
0
    idx = (pg->history_idx+1)%pg->history_size;
372
0
    pg->history_map[idx] = (unsigned char) ( load / procs_no );
373
374
    //LM_DBG("group %d (with %d procs) has average load of %d\n",
375
    //  pg->type, procs_no, pg->history_map[idx]);
376
377
    /* do the check over the history */
378
0
    cnt_over = 0;
379
0
    cnt_under = 0;
380
0
    k = idx;
381
0
    i = 1;
382
0
    do {
383
0
      if ( pg->history_map[k] > pg->prof->up_threshold &&
384
0
      i <= pg->prof->up_cycles_tocheck )
385
0
        cnt_over++;
386
0
      else if ( pg->history_map[k] < pg->prof->down_threshold &&
387
0
      i <= pg->prof->down_cycles_tocheck )
388
0
        cnt_under++;
389
390
0
      i++;
391
0
      k = k ? (k-1) : (pg->history_size-1) ;
392
0
    } while(k!=idx);
393
394
    /* decide what to do */
395
0
    if ( cnt_over >= pg->prof->up_cycles_needed ) {
396
0
      if ( procs_no < pg->prof->max_procs ) {
397
0
        LM_NOTICE("score %d/%d -> forking new proc in group %d "
398
0
          "(with %d procs)\n", cnt_over, pg->prof->up_cycles_tocheck,
399
0
          pg->type, procs_no);
400
        /* we need to fork one more process here */
401
0
        if ( (p_id=pg->fork_func(pg->si_filter))<0 ||
402
0
        wait_for_one_child()<0 ) {
403
0
          LM_ERR("failed to fork new process for group %d "
404
0
            "(current %d procs)\n",pg->type,procs_no);
405
0
        } else {
406
0
          sr_add_report_fmt( pts_sr_group,
407
0
            pg->sr_identifier_name.s, pg->sr_identifier_name.len,
408
0
            0, "Forking new process id %d at load %d "
409
0
            "in group with %d processes",
410
0
            p_id, pg->history_map[idx], procs_no);
411
0
          _pt_raise_event( pg, p_id, pg->history_map[idx] ,"up");
412
0
          rescale_group_history( pg, idx, procs_no, +1);
413
0
          pg->no_downscale_cycles = pg->prof->down_cycles_delay;
414
0
        }
415
0
      }
416
0
    } else if ( pg->prof->down_cycles_tocheck != 0 &&
417
0
    cnt_under == pg->prof->down_cycles_tocheck ) {
418
0
      if ( procs_no > pg->prof->min_procs &&
419
0
      pg->no_downscale_cycles==0) {
420
        /* try to estimate the load after downscaling */
421
0
        load = 0;
422
0
        k = idx;
423
0
        i = 0;
424
0
        do {
425
0
          load += pg->history_map[k];
426
0
          k = k ? (k-1) : (pg->history_size-1) ;
427
0
          i++;
428
0
        } while( k != idx && i <= pg->prof->down_cycles_tocheck );
429
0
        load = (load*procs_no) /
430
0
          (pg->prof->down_cycles_tocheck * (procs_no-1));
431
0
        if ( load < pg->prof->up_threshold ) {
432
          /* down scale one more process here */
433
0
          LM_NOTICE("score %d/%d -> ripping proc %d from group %d "
434
0
            "(with %d procs), estimated load -> %d\n", cnt_under,
435
0
            pg->prof->down_cycles_tocheck, last_idx_in_pg,
436
0
            pg->type, procs_no, load );
437
0
          pt[last_idx_in_pg].flags |= OSS_PROC_TO_TERMINATE;
438
0
          ipc_send_rpc( last_idx_in_pg, pg->term_func, NULL);
439
0
          sr_add_report_fmt( pts_sr_group,
440
0
            pg->sr_identifier_name.s, pg->sr_identifier_name.len,
441
0
            0, "Ripping process id %d at load %d "
442
0
            "in group with %d processes",
443
0
            last_idx_in_pg, pg->history_map[idx], procs_no);
444
0
          _pt_raise_event( pg, last_idx_in_pg,
445
0
            pg->history_map[idx], "down");
446
0
        }
447
0
      }
448
0
    }
449
450
0
    pg->history_idx++;
451
0
    if (pg->no_downscale_cycles) pg->no_downscale_cycles--;
452
0
  }
453
0
}
454