/src/opensips/pt_scaling.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (C) 2019 OpenSIPS Project |
3 | | * |
4 | | * This file is part of opensips, a free SIP server. |
5 | | * |
6 | | * opensips is free software; you can redistribute it and/or modify |
7 | | * it under the terms of the GNU General Public License as published by |
8 | | * the Free Software Foundation; either version 2 of the License, or |
9 | | * (at your option) any later version |
10 | | * |
11 | | * opensips is distributed in the hope that it will be useful, |
12 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
14 | | * GNU General Public License for more details. |
15 | | * |
16 | | * You should have received a copy of the GNU General Public License |
17 | | * along with this program; if not, write to the Free Software |
18 | | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
19 | | */ |
20 | | |
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <limits.h>
#include "mem/shm_mem.h"
#include "socket_info.h"
#include "dprint.h"
#include "pt.h"
#include "ipc.h"
#include "daemonize.h"
#include "status_report.h"
31 | | |
/*
 * A group of processes of the same type (optionally restricted to one
 * socket filter) that is auto-scaled according to a scaling profile.
 * create_process_group() allocates it in shared memory as one chunk:
 * the struct itself, followed by the load history buffer (history_map),
 * followed by the status-report identifier name. Field order/layout is
 * relied upon by that (pg+1) trailing-storage scheme.
 */
struct process_group {
	enum process_type type;            /* type of the processes in the group */
	struct socket_info *si_filter;     /* group filter, matched against pt[].pg_filter; NULL for none */
	fork_new_process_f *fork_func;     /* called with si_filter to fork one more process (up-scaling) */
	terminate_process_f *term_func;    /* sent via ipc_send_rpc() to a process to terminate it (down-scaling) */
	struct scaling_profile *prof;      /* scaling rules used by this group */
	unsigned char history_size;        /* number of slots in history_map; unsigned char, so at most 255 */
	unsigned char *history_map;        /* ring buffer: average group load recorded for each cycle */
	unsigned char history_idx;         /* ring position; the next sample goes to (history_idx+1) % history_size */
	unsigned short no_downscale_cycles; /* remaining cycles during which down-scaling is not allowed */
	str sr_identifier_name;            /* status-report identifier: "TYPE" or "TYPE/socket" (NULL if registration failed) */
	struct process_group *next;        /* next group in the pg_head list */
};
45 | | |
/* all groups of auto-scaled processes; create_process_group() appends
 * new groups at the tail, so the head never changes once set */
static struct process_group *pg_head;

/* all defined auto-scaling profiles; new profiles are pushed at the front */
static struct scaling_profile *profiles_head;

/* the "auto-scaling" status-report group, set by init_auto_scaling() */
static void *pts_sr_group;
51 | | |
52 | | |
53 | | int init_auto_scaling(void) |
54 | 0 | { |
55 | 0 | if (!auto_scaling_enabled) |
56 | 0 | return 0; |
57 | | |
58 | 0 | pts_sr_group = sr_register_group(CHAR_INT("auto-scaling"), 0/*is_public*/); |
59 | 0 | if (pts_sr_group==NULL) { |
60 | 0 | LM_ERR("Failed to register 'status_report' group for " |
61 | 0 | "'auto-scaling' support\n"); |
62 | 0 | return -1; |
63 | 0 | } |
64 | | |
65 | 0 | return 0; |
66 | 0 | } |
67 | | |
68 | | |
69 | | int create_auto_scaling_profile( char *name, |
70 | | unsigned int max_procs, unsigned int up_threshold, |
71 | | unsigned int up_cycles_needed, unsigned int up_cycles_tocheck, |
72 | | unsigned int min_procs, unsigned int down_threshold, |
73 | | unsigned int down_cycles_tocheck, unsigned short down_cycles_delay) |
74 | 0 | { |
75 | 0 | struct scaling_profile *p; |
76 | | |
77 | | /* check for duplicates */ |
78 | 0 | if ( get_scaling_profile(name) ) { |
79 | 0 | LM_ERR("profile <%s> (case insensitive) already created" |
80 | 0 | " - double definition?? \n", name); |
81 | 0 | return -1; |
82 | 0 | } |
83 | | |
84 | | /* some sanity checks */ |
85 | 0 | if (min_procs==0) { |
86 | 0 | down_threshold = 0; |
87 | 0 | down_cycles_tocheck = 0; |
88 | 0 | down_cycles_delay = 0; |
89 | 0 | } |
90 | 0 | if (max_procs==0 || max_procs <= min_procs || max_procs>=1000) { |
91 | 0 | LM_ERR("invalid relation or range for MIN/MAX processes [%d,%d]\n", |
92 | 0 | min_procs, max_procs); |
93 | 0 | return -1; |
94 | 0 | } |
95 | 0 | if (up_threshold==0 || up_threshold <= down_threshold || |
96 | 0 | up_threshold>100 || down_threshold>100) { |
97 | 0 | LM_ERR("invalid relation or range DOWN/UP thresholds percentages " |
98 | 0 | "[%d,%d]\n", down_threshold, up_threshold); |
99 | 0 | return -1; |
100 | 0 | } |
101 | 0 | if (up_cycles_needed==0 || up_cycles_tocheck==0 || |
102 | 0 | up_cycles_tocheck<up_cycles_needed) { |
103 | 0 | LM_ERR("invalid relation or values for upscaling check [%d of %d]\n", |
104 | 0 | up_cycles_needed, up_cycles_tocheck); |
105 | 0 | return -1; |
106 | 0 | } |
107 | | |
108 | | /* all good, create it*/ |
109 | | |
110 | 0 | p = (struct scaling_profile*)pkg_malloc( sizeof(struct scaling_profile) + |
111 | 0 | strlen(name) + 1 ); |
112 | 0 | if (p==NULL) { |
113 | 0 | LM_ERR("failed to allocate memory for a new auto-scaling profile\n"); |
114 | 0 | return -1; |
115 | 0 | } |
116 | | |
117 | | /* not really need, more to be safe for future expansions */ |
118 | 0 | memset( p, 0, sizeof(struct scaling_profile)); |
119 | |
|
120 | 0 | p->max_procs = max_procs; |
121 | 0 | p->up_threshold = up_threshold; |
122 | 0 | p->up_cycles_needed = up_cycles_needed; |
123 | 0 | p->up_cycles_tocheck = up_cycles_tocheck; |
124 | 0 | p->min_procs = min_procs; |
125 | 0 | p->down_threshold = down_threshold; |
126 | 0 | p->down_cycles_tocheck = down_cycles_tocheck; |
127 | 0 | p->down_cycles_delay = down_cycles_delay; |
128 | 0 | p->name = (char*)(p+1); |
129 | 0 | strcpy( p->name, name); |
130 | |
|
131 | 0 | LM_DBG("profile <%s> created UP [max=%d, th=%d%%, check %d/%d] DOWN " |
132 | 0 | "[min=%d, th=%d%%, check %d, delay=%d]\n", name, |
133 | 0 | max_procs, up_threshold, up_cycles_needed, up_cycles_tocheck, |
134 | 0 | min_procs, down_threshold, down_cycles_tocheck, down_cycles_delay); |
135 | |
|
136 | 0 | p->next = profiles_head; |
137 | 0 | profiles_head = p; |
138 | |
|
139 | 0 | return 0; |
140 | 0 | } |
141 | | |
142 | | |
143 | | struct scaling_profile *get_scaling_profile(char *name) |
144 | 0 | { |
145 | 0 | struct scaling_profile *p; |
146 | |
|
147 | 0 | for ( p=profiles_head ; p ; p=p->next ) |
148 | 0 | if (strcasecmp(name, p->name)==0) |
149 | 0 | return p; |
150 | | |
151 | 0 | return NULL; |
152 | 0 | } |
153 | | |
154 | | |
/*
 * Set the str _s to the printable name of process type _type ("UDP",
 * "TCP" or "TIMER"). That name is used for status-report identifiers and
 * auto-scaling events. Any other type is reported as a bug and mapped
 * to "??".
 */
#define get_sr_type_name(_type, _s) \
	do { \
		switch (_type) { \
		case TYPE_UDP: \
			(_s).s = "UDP"; \
			(_s).len = 3; \
			break; \
		case TYPE_TCP: \
			(_s).s = "TCP"; \
			(_s).len = 3; \
			break; \
		case TYPE_TIMER: \
			(_s).s = "TIMER"; \
			(_s).len = 5; \
			break; \
		default: \
			LM_BUG("unsupported auto-scaling group %d\n", _type); \
			(_s).s = "??"; \
			(_s).len = 2; \
			break; \
		} \
	} while (0)
168 | | |
169 | | |
170 | | int create_process_group(enum process_type type, |
171 | | struct socket_info *si_filter, struct scaling_profile *prof, |
172 | | fork_new_process_f *f1, terminate_process_f *f2) |
173 | 0 | { |
174 | 0 | struct process_group *pg, *it; |
175 | 0 | int h_size; |
176 | 0 | str type_s; |
177 | | |
178 | | /* how much of a history do we need in order to cover both up and down |
179 | | * tranzitions ? */ |
180 | 0 | h_size = (prof->up_cycles_tocheck > prof->down_cycles_tocheck) ? |
181 | 0 | prof->up_cycles_tocheck : prof->down_cycles_tocheck; |
182 | |
|
183 | 0 | get_sr_type_name( type, type_s); |
184 | |
|
185 | 0 | pg = (struct process_group*)shm_malloc( sizeof(struct process_group) + |
186 | 0 | sizeof(char)*h_size + |
187 | 0 | type_s.len+(si_filter?1+(si_filter->sock_str.len):0) |
188 | 0 | ); |
189 | 0 | if (pg==NULL) { |
190 | 0 | LM_ERR("failed to allocate memory for a new process group\n"); |
191 | 0 | return -1; |
192 | 0 | } |
193 | 0 | memset( pg, 0, sizeof(struct process_group) + sizeof(char)*h_size ); |
194 | |
|
195 | 0 | pg->type = type; |
196 | 0 | pg->si_filter = si_filter; |
197 | 0 | pg->prof = prof; |
198 | 0 | pg->fork_func = f1; |
199 | 0 | pg->term_func = f2; |
200 | 0 | pg->next = NULL; |
201 | |
|
202 | 0 | pg->history_size = h_size; |
203 | 0 | pg->history_map = (unsigned char*)(pg+1); |
204 | 0 | pg->history_idx = 0; |
205 | 0 | pg->no_downscale_cycles = pg->prof->down_cycles_delay; |
206 | |
|
207 | 0 | pg->sr_identifier_name.s = (char*)(pg->history_map + h_size); |
208 | 0 | memcpy( pg->sr_identifier_name.s, type_s.s, type_s.len); |
209 | 0 | pg->sr_identifier_name.len = type_s.len; |
210 | 0 | if (si_filter) { |
211 | 0 | pg->sr_identifier_name.s[pg->sr_identifier_name.len++] = '/'; |
212 | 0 | memcpy( pg->sr_identifier_name.s + pg->sr_identifier_name.len, |
213 | 0 | si_filter->sock_str.s, si_filter->sock_str.len); |
214 | 0 | pg->sr_identifier_name.len += si_filter->sock_str.len; |
215 | 0 | } |
216 | |
|
217 | 0 | LM_DBG("registering group of processes type %d, socket filter %p, " |
218 | 0 | "name <%.*s> scaling profile <%s>\n", type, si_filter, |
219 | 0 | pg->sr_identifier_name.len, pg->sr_identifier_name.s, prof->name); |
220 | |
|
221 | 0 | if (sr_register_identifier( pts_sr_group, |
222 | 0 | pg->sr_identifier_name.s, pg->sr_identifier_name.len, |
223 | 0 | SR_STATUS_READY, CHAR_INT("active"), 100)<0 |
224 | 0 | ) { |
225 | 0 | LM_ERR("failed to register auto-scaling identifier for group" |
226 | 0 | "name <%.*s>, disabling\n", |
227 | 0 | pg->sr_identifier_name.len, pg->sr_identifier_name.s); |
228 | 0 | pg->sr_identifier_name.s = NULL; |
229 | 0 | pg->sr_identifier_name.len = 0; |
230 | 0 | } |
231 | | |
232 | | /* add at the end of list, to avoid changing the head of the list due |
233 | | * forking */ |
234 | 0 | for( it=pg_head ; it && it->next ; it=it->next); |
235 | 0 | if (it==NULL) |
236 | 0 | pg_head = pg; |
237 | 0 | else |
238 | 0 | it->next = pg; |
239 | |
|
240 | 0 | return 0; |
241 | 0 | } |
242 | | |
243 | | |
244 | | static void _pt_raise_event(struct process_group *pg, int p_id, int load, |
245 | | char *scale) |
246 | 0 | { |
247 | 0 | static str pt_ev_type = str_init("group_type"); |
248 | 0 | static str pt_ev_filter = str_init("group_filter"); |
249 | 0 | static str pt_ev_load = str_init("group_load"); |
250 | 0 | static str pt_ev_scale = str_init("scale"); |
251 | 0 | static str pt_ev_p_id = str_init("process_id"); |
252 | 0 | static str pt_ev_pid = str_init("pid"); |
253 | 0 | evi_params_p list = NULL; |
254 | 0 | str s; |
255 | |
|
256 | 0 | if (!evi_probe_event(EVI_PROC_AUTO_SCALE_ID)) |
257 | 0 | return; |
258 | | |
259 | 0 | list = evi_get_params(); |
260 | 0 | if (!list) { |
261 | 0 | LM_ERR("cannot create event params\n"); |
262 | 0 | return; |
263 | 0 | } |
264 | | |
265 | 0 | get_sr_type_name( pg->type, s); |
266 | |
|
267 | 0 | if (evi_param_add_str(list, &pt_ev_type, &s) < 0) { |
268 | 0 | LM_ERR("cannot add group type\n"); |
269 | 0 | goto error; |
270 | 0 | } |
271 | | |
272 | 0 | if (pg->si_filter==NULL) { |
273 | 0 | s.s = "none"; s.len = 4; |
274 | 0 | } else { |
275 | 0 | s = pg->si_filter->sock_str; |
276 | 0 | } |
277 | 0 | if (evi_param_add_str(list, &pt_ev_filter, &s) < 0) { |
278 | 0 | LM_ERR("cannot add group filter\n"); |
279 | 0 | goto error; |
280 | 0 | } |
281 | | |
282 | 0 | if (evi_param_add_int(list, &pt_ev_load, &load) < 0) { |
283 | 0 | LM_ERR("cannot add group load\n"); |
284 | 0 | goto error; |
285 | 0 | } |
286 | | |
287 | 0 | s.s = scale; s.len = strlen(s.s); |
288 | 0 | if (evi_param_add_str(list, &pt_ev_scale, &s) < 0) { |
289 | 0 | LM_ERR("cannot add scaling type\n"); |
290 | 0 | goto error; |
291 | 0 | } |
292 | | |
293 | 0 | if (evi_param_add_int(list, &pt_ev_p_id, &p_id) < 0) { |
294 | 0 | LM_ERR("cannot add process id\n"); |
295 | 0 | goto error; |
296 | 0 | } |
297 | | |
298 | 0 | if (evi_param_add_int(list, &pt_ev_pid, &(pt[p_id].pid)) < 0) { |
299 | 0 | LM_ERR("cannot add process pid\n"); |
300 | 0 | goto error; |
301 | 0 | } |
302 | | |
303 | 0 | if (evi_dispatch_event(EVI_PROC_AUTO_SCALE_ID, list)) { |
304 | 0 | LM_ERR("unable to send auto scaling event\n"); |
305 | 0 | } |
306 | 0 | return; |
307 | | |
308 | 0 | error: |
309 | 0 | evi_free_params(list); |
310 | 0 | } |
311 | | |
312 | | |
313 | | static void rescale_group_history(struct process_group *pg, unsigned int idx, |
314 | | int org_size, int offset) |
315 | 0 | { |
316 | 0 | unsigned int k; |
317 | 0 | unsigned char old; |
318 | |
|
319 | 0 | k = idx; |
320 | 0 | do { |
321 | 0 | old = pg->history_map[k] ; |
322 | 0 | pg->history_map[k] = (pg->history_map[k]*org_size)/(org_size+offset); |
323 | 0 | LM_DBG("rescaling old %d to %d [idx %d]\n", |
324 | 0 | old, pg->history_map[k], k); |
325 | |
|
326 | 0 | k = k ? (k-1) : (pg->history_size-1) ; |
327 | 0 | } while(k!=idx); |
328 | 0 | } |
329 | | |
330 | | |
331 | | void do_workers_auto_scaling(void) |
332 | 0 | { |
333 | 0 | struct process_group *pg; |
334 | 0 | unsigned int i, k, idx; |
335 | 0 | unsigned int load; |
336 | 0 | unsigned int procs_no; |
337 | 0 | unsigned char cnt_under, cnt_over; |
338 | 0 | int p_id, last_idx_in_pg; |
339 | | |
340 | | /* iterate all the groups we have */ |
341 | 0 | for ( pg=pg_head ; pg ; pg=pg->next ) { |
342 | |
|
343 | 0 | load = 0; |
344 | 0 | procs_no = 0; |
345 | 0 | last_idx_in_pg = -1; |
346 | | |
347 | | /* find the processes belonging to this group */ |
348 | 0 | for ( i=0 ; i<counted_max_processes ; i++) { |
349 | | |
350 | | /* skip processes: |
351 | | * - not running |
352 | | * - runing, but marked for termination |
353 | | * - not part of the group |
354 | | * - with a different group filter (socket interface) */ |
355 | 0 | if (!is_process_running(i) || pt[i].flags&OSS_PROC_TO_TERMINATE || |
356 | 0 | pt[i].type != pg->type || pg->si_filter!=pt[i].pg_filter) |
357 | 0 | continue; |
358 | | |
359 | 0 | load += get_stat_val( pt[i].load_rt ); |
360 | 0 | last_idx_in_pg = i; |
361 | 0 | procs_no++; |
362 | |
|
363 | 0 | } |
364 | |
|
365 | 0 | if (!procs_no) { |
366 | 0 | LM_BUG("no process beloging to group %d\n", pg->type); |
367 | 0 | continue; |
368 | 0 | } |
369 | | |
370 | | /* set the current value */ |
371 | 0 | idx = (pg->history_idx+1)%pg->history_size; |
372 | 0 | pg->history_map[idx] = (unsigned char) ( load / procs_no ); |
373 | | |
374 | | //LM_DBG("group %d (with %d procs) has average load of %d\n", |
375 | | // pg->type, procs_no, pg->history_map[idx]); |
376 | | |
377 | | /* do the check over the history */ |
378 | 0 | cnt_over = 0; |
379 | 0 | cnt_under = 0; |
380 | 0 | k = idx; |
381 | 0 | i = 1; |
382 | 0 | do { |
383 | 0 | if ( pg->history_map[k] > pg->prof->up_threshold && |
384 | 0 | i <= pg->prof->up_cycles_tocheck ) |
385 | 0 | cnt_over++; |
386 | 0 | else if ( pg->history_map[k] < pg->prof->down_threshold && |
387 | 0 | i <= pg->prof->down_cycles_tocheck ) |
388 | 0 | cnt_under++; |
389 | |
|
390 | 0 | i++; |
391 | 0 | k = k ? (k-1) : (pg->history_size-1) ; |
392 | 0 | } while(k!=idx); |
393 | | |
394 | | /* decide what to do */ |
395 | 0 | if ( cnt_over >= pg->prof->up_cycles_needed ) { |
396 | 0 | if ( procs_no < pg->prof->max_procs ) { |
397 | 0 | LM_NOTICE("score %d/%d -> forking new proc in group %d " |
398 | 0 | "(with %d procs)\n", cnt_over, pg->prof->up_cycles_tocheck, |
399 | 0 | pg->type, procs_no); |
400 | | /* we need to fork one more process here */ |
401 | 0 | if ( (p_id=pg->fork_func(pg->si_filter))<0 || |
402 | 0 | wait_for_one_child()<0 ) { |
403 | 0 | LM_ERR("failed to fork new process for group %d " |
404 | 0 | "(current %d procs)\n",pg->type,procs_no); |
405 | 0 | } else { |
406 | 0 | sr_add_report_fmt( pts_sr_group, |
407 | 0 | pg->sr_identifier_name.s, pg->sr_identifier_name.len, |
408 | 0 | 0, "Forking new process id %d at load %d " |
409 | 0 | "in group with %d processes", |
410 | 0 | p_id, pg->history_map[idx], procs_no); |
411 | 0 | _pt_raise_event( pg, p_id, pg->history_map[idx] ,"up"); |
412 | 0 | rescale_group_history( pg, idx, procs_no, +1); |
413 | 0 | pg->no_downscale_cycles = pg->prof->down_cycles_delay; |
414 | 0 | } |
415 | 0 | } |
416 | 0 | } else if ( pg->prof->down_cycles_tocheck != 0 && |
417 | 0 | cnt_under == pg->prof->down_cycles_tocheck ) { |
418 | 0 | if ( procs_no > pg->prof->min_procs && |
419 | 0 | pg->no_downscale_cycles==0) { |
420 | | /* try to estimate the load after downscaling */ |
421 | 0 | load = 0; |
422 | 0 | k = idx; |
423 | 0 | i = 0; |
424 | 0 | do { |
425 | 0 | load += pg->history_map[k]; |
426 | 0 | k = k ? (k-1) : (pg->history_size-1) ; |
427 | 0 | i++; |
428 | 0 | } while( k != idx && i <= pg->prof->down_cycles_tocheck ); |
429 | 0 | load = (load*procs_no) / |
430 | 0 | (pg->prof->down_cycles_tocheck * (procs_no-1)); |
431 | 0 | if ( load < pg->prof->up_threshold ) { |
432 | | /* down scale one more process here */ |
433 | 0 | LM_NOTICE("score %d/%d -> ripping proc %d from group %d " |
434 | 0 | "(with %d procs), estimated load -> %d\n", cnt_under, |
435 | 0 | pg->prof->down_cycles_tocheck, last_idx_in_pg, |
436 | 0 | pg->type, procs_no, load ); |
437 | 0 | pt[last_idx_in_pg].flags |= OSS_PROC_TO_TERMINATE; |
438 | 0 | ipc_send_rpc( last_idx_in_pg, pg->term_func, NULL); |
439 | 0 | sr_add_report_fmt( pts_sr_group, |
440 | 0 | pg->sr_identifier_name.s, pg->sr_identifier_name.len, |
441 | 0 | 0, "Ripping process id %d at load %d " |
442 | 0 | "in group with %d processes", |
443 | 0 | last_idx_in_pg, pg->history_map[idx], procs_no); |
444 | 0 | _pt_raise_event( pg, last_idx_in_pg, |
445 | 0 | pg->history_map[idx], "down"); |
446 | 0 | } |
447 | 0 | } |
448 | 0 | } |
449 | |
|
450 | 0 | pg->history_idx++; |
451 | 0 | if (pg->no_downscale_cycles) pg->no_downscale_cycles--; |
452 | 0 | } |
453 | 0 | } |
454 | | |