Coverage Report

Created: 2026-05-04 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/opensips/async.c
Line
Count
Source
1
/*
2
 * Copyright (C) 2014 OpenSIPS Solutions
3
 *
4
 * This file is part of opensips, a free SIP server.
5
 *
6
 * opensips is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 2 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * opensips is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software
18
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19
 *
20
 *
21
 * history:
22
 * ---------
23
 *  2014-10-15  created (bogdan)
24
 */
25
26
#include "mem/shm_mem.h"
27
#include "dprint.h"
28
#include "reactor_defs.h"
29
#include "async.h"
30
#include "route.h"
31
#include "action.h"
32
#include "sr_module.h"
33
#include "profiling.h"
34
35
int async_status = ASYNC_NO_IO;
36
37
extern int return_code; /* from action.c, return code */
38
39
40
/* start/resume functions used for script async ops */
41
async_script_start_function  *async_script_start_f  = NULL;
42
async_script_resume_function *async_script_resume_f = NULL;
43
44
45
/* async context used by Launch Async operation */
46
typedef struct _async_launch_ctx {
47
  /* generic async context - MUST BE FIRST */
48
  async_ctx  async;
49
  /* ref to the report script route (NULL if none) */
50
  struct script_route_ref *report_route;
51
  str report_route_param;
52
} async_launch_ctx;
53
54
55
56
/************* Functions related to ASYNC via script functions ***************/
57
58
int register_async_script_handlers(async_script_start_function *f1,
59
                      async_script_resume_function *f2)
60
0
{
61
0
  if (async_script_start_f) {
62
0
    LM_ERR("aync script handlers already registered\n");
63
0
    return -1;
64
0
  }
65
66
0
  async_script_start_f = f1;
67
0
  async_script_resume_f = f2;
68
69
0
  return 0;
70
0
}
71
72
73
/************* Functions related to internal ASYNC support ***************/
74
75
int register_async_fd(int fd, async_resume_fd *f, void *resume_param)
76
0
{
77
0
  async_ctx *ctx = NULL;
78
79
0
  if ( (ctx=shm_malloc(sizeof(async_ctx)))==NULL) {
80
0
    LM_ERR("failed to allocate new async_ctx\n");
81
0
    return -1;
82
0
  }
83
84
0
  memset(ctx,0,sizeof(async_ctx));
85
86
0
  ASYNC_SET_RESUME_F(ctx, f);
87
0
  ctx->resume_param = resume_param;
88
89
  /* place the FD + resume function (as param) into reactor */
90
0
  if (reactor_add_reader( fd, F_FD_ASYNC, RCT_PRIO_ASYNC, (void*)ctx)<0 ) {
91
0
    LM_ERR("failed to add async FD to reactor\n");
92
0
    shm_free(ctx);
93
0
    return -1;
94
0
  }
95
96
0
  return 0;
97
0
}
98
99
100
int async_fd_resume(int fd, void *param)
101
0
{
102
0
  async_ctx *ctx = (async_ctx *)param;
103
0
  int ret;
104
105
0
  async_status = ASYNC_DONE; /* assume default status as done */
106
107
0
  profiling_proc_enter( LEVEL_SIP, ctx->resume_f_name, 0);
108
109
  /* call the resume function in order to read and handle data */
110
0
  ret = ((async_resume_fd*)ctx->resume_f)( fd, ctx->resume_param );
111
0
  if (async_status==ASYNC_CONTINUE) {
112
    /* leave the fd into the reactor*/
113
0
    goto done;
114
0
  } else if (async_status==ASYNC_CHANGE_FD) {
115
0
    if (ret<0) {
116
0
      LM_ERR("ASYNC_CHANGE_FD: given file descriptor shall be "
117
0
        "positive!\n");
118
0
      goto done;
119
0
    } else if (ret>0 && ret==fd) {
120
      /*trying to add the same fd; shall continue*/
121
0
      LM_CRIT("You are trying to replace the old fd with the same fd!"
122
0
          "Will act as in ASYNC_CONTINUE!\n");
123
0
      goto done;
124
0
    }
125
126
    /* remove the old fd from the reactor */
127
0
    reactor_del_reader(fd, -1, IO_FD_CLOSING);
128
0
    fd=ret;
129
130
    /* insert the new fd inside the reactor */
131
0
    if (reactor_add_reader(fd,F_FD_ASYNC,RCT_PRIO_ASYNC,(void*)ctx)<0 ) {
132
0
      LM_ERR("failed to add async FD to reactor -> act in sync mode\n");
133
0
      do {
134
0
        async_status = ASYNC_DONE;
135
0
        ret = ((async_resume_fd*)ctx->resume_f)(fd,ctx->resume_param);
136
0
        if (async_status == ASYNC_CHANGE_FD)
137
0
          fd=ret;
138
0
      } while(async_status==ASYNC_CONTINUE||async_status==ASYNC_CHANGE_FD);
139
0
      goto done;
140
0
    } else {
141
142
      /* successfully changed fd */
143
0
      goto done;
144
0
    }
145
0
  }
146
147
  /* remove from reactor, we are done */
148
0
  reactor_del_reader(fd, -1, IO_FD_CLOSING);
149
150
0
done:
151
0
  if (async_status == ASYNC_DONE_CLOSE_FD)
152
0
    close(fd);
153
154
0
  profiling_proc_exit( LEVEL_SIP, "async-fd resume handler", async_status );
155
156
0
  return 0;
157
0
}
158
159
160
/************* Functions related to ASYNC Launch support ***************/
161
162
int launch_route_param_get(struct sip_msg *msg, pv_param_t *ip,
163
    pv_value_t *res, void *params, void *extra)
164
0
{
165
0
  str *val = (str*)params;
166
167
  /* we do accept here only one param with index */
168
0
  if (ip->pvn.type!=PV_NAME_INTSTR || ip->pvn.u.isname.type!=0
169
0
  || ip->pvn.u.isname.name.n!=1)
170
0
    return pv_get_null(msg, ip, res);
171
172
0
  res->flags = PV_VAL_STR;
173
0
  res->rs.s =val->s;
174
0
  res->rs.len = val->len;
175
176
0
  return 0;
177
0
}
178
179
180
int async_launch_resume(int fd, void *param)
181
0
{
182
0
  struct sip_msg *req;
183
0
  async_launch_ctx *ctx = (async_launch_ctx *)param;
184
0
  int bk_rt;
185
186
0
  LM_DBG("resume for a launch job\n");
187
188
0
  req = get_dummy_sip_msg();
189
0
  if(req == NULL) {
190
0
    LM_ERR("No more memory\n");
191
0
    return -1;
192
0
  }
193
194
0
  profiling_proc_enter( LEVEL_SIP, ctx->async.resume_f_name, 0);
195
196
0
  async_status = ASYNC_DONE; /* assume default status as done */
197
198
  /* call the resume function in order to read and handle data */
199
0
  return_code = ((async_resume_module*)(ctx->async.resume_f))
200
0
    ( fd, req, ctx->async.resume_param );
201
202
0
  if (async_status==ASYNC_CONTINUE) {
203
    /* do not run the report route, leave the fd into the reactor*/
204
0
    goto restore;
205
206
0
  } else if (async_status==ASYNC_DONE_NO_IO) {
207
    /* don't do any change on the fd, since the module handled everything*/
208
0
    goto run_route;
209
210
0
  } else if (async_status==ASYNC_CHANGE_FD) {
211
0
    if (return_code<0) {
212
0
      LM_ERR("ASYNC_CHANGE_FD: given file descriptor must be "
213
0
        "positive!\n");
214
0
      goto restore;
215
0
    } else if (return_code>0 && return_code==fd) {
216
      /*trying to add the same fd; shall continue*/
217
0
      LM_CRIT("You are trying to replace the old fd with the same fd!"
218
0
          "Will act as in ASYNC_CONTINUE!\n");
219
0
      goto restore;
220
0
    }
221
222
    /* remove the old fd from the reactor */
223
0
    reactor_del_reader(fd, -1, IO_FD_CLOSING);
224
0
    fd=return_code;
225
226
    /* insert the new fd inside the reactor */
227
0
    if (reactor_add_reader(fd, F_LAUNCH_ASYNC, RCT_PRIO_ASYNC,
228
0
    (void*)ctx)<0 ) {
229
0
      LM_ERR("failed to add async FD to reactor -> act in sync mode\n");
230
0
      do {
231
0
        async_status = ASYNC_DONE;
232
0
        return_code = ((async_resume_module*)(ctx->async.resume_f))
233
0
          (fd, req, ctx->async.resume_param );
234
0
        if (async_status == ASYNC_CHANGE_FD)
235
0
          fd=return_code;
236
0
      } while(async_status==ASYNC_CONTINUE||async_status==ASYNC_CHANGE_FD);
237
0
      goto run_route;
238
0
    } else {
239
240
      /* successfully changed fd */
241
0
      goto restore;
242
0
    }
243
0
  }
244
245
  /* remove from reactor, we are done */
246
0
  reactor_del_reader(fd, -1, IO_FD_CLOSING);
247
248
0
run_route:
249
0
  if (async_status == ASYNC_DONE_CLOSE_FD)
250
0
    close(fd);
251
252
0
  if (ref_script_route_check_and_update(ctx->report_route)) {
253
0
    LM_DBG("runinng report route for a launch job,"
254
0
      " route <%s>, param [%.*s]\n",
255
0
      ctx->report_route->name.s,
256
0
      ctx->report_route_param.len, ctx->report_route_param.s);
257
0
    if (ctx->report_route_param.s)
258
0
      route_params_push_level(
259
0
        sroutes->request[ctx->report_route->idx].name, 
260
0
        &ctx->report_route_param, NULL,
261
0
        launch_route_param_get);
262
0
    swap_route_type( bk_rt, REQUEST_ROUTE);
263
0
    run_top_route( sroutes->request[ctx->report_route->idx], req);
264
0
    set_route_type( bk_rt );
265
0
    if (ctx->report_route_param.s)
266
0
      route_params_pop_level();
267
268
    /* remove all added AVP */
269
0
    reset_avps( );
270
0
  }
271
272
  /* no need for the context anymore */
273
0
  if (ctx->report_route)
274
0
    shm_free(ctx->report_route);
275
0
  shm_free(ctx);
276
0
  LM_DBG("done with a launch job\n");
277
278
0
restore:
279
  /* clean whatever extra structures were added by script functions */
280
0
  release_dummy_sip_msg(req);
281
282
0
  profiling_proc_exit( LEVEL_SIP, "async-fd resume handler", async_status );
283
284
0
  return 0;
285
0
}
286
287
288
int async_script_launch(struct sip_msg *msg, struct action* a,
289
                struct script_route_ref *report_route,
290
                str *report_route_param, void **params)
291
0
{
292
0
  struct sip_msg *req;
293
0
  struct usr_avp *report_avps = NULL, **bak_avps = NULL;
294
0
  async_launch_ctx *ctx;
295
0
  int fd = -1;
296
0
  int bk_rt;
297
298
  /* run the function (the action) and get back from it the FD,
299
   * resume function and param */
300
0
  if ( a->type!=AMODULE_T || a->elem[0].type!=ACMD_ST ||
301
0
  a->elem[0].u.data==NULL ) {
302
0
    LM_CRIT("BUG - invalid action for async I/O - it must be"
303
0
      " a MODULE_T ACMD_ST \n");
304
0
    return -1;
305
0
  }
306
307
0
  if ( (ctx=shm_malloc(sizeof(async_launch_ctx) + ( (report_route&&report_route_param)?report_route_param->len:0)))==NULL) {
308
0
    LM_ERR("failed to allocate new ctx, forcing sync mode\n");
309
0
    return -1;
310
0
  }
311
312
0
  memset(ctx,0,sizeof(async_launch_ctx));
313
314
0
  async_status = ASYNC_NO_IO; /*assume defauly status "no IO done" */
315
316
0
  return_code = ((const acmd_export_t*)(a->elem[0].u.data_const))->function(msg,
317
0
      (async_ctx*)ctx,
318
0
      params[0], params[1], params[2],
319
0
      params[3], params[4], params[5],
320
0
      params[6], params[7]);
321
  /* what to do now ? */
322
0
  if (async_status>=0) {
323
    /* async I/O was successfully launched */
324
0
    fd = async_status;
325
0
  } else if (async_status==ASYNC_NO_FD) {
326
    /* async was successfully launched but without a FD resume
327
     * in this case, we need to push the async ctx back to the
328
     * function, so it can trigger the resume later, by itself */
329
0
  } else if (async_status==ASYNC_NO_IO) {
330
    /* no IO, so simply continue with the script */
331
0
    shm_free(ctx);
332
0
    return 1;
333
0
  } else if (async_status==ASYNC_SYNC) {
334
    /* IO already done in SYNC'ed way */
335
0
    goto report;
336
0
  } else if (async_status==ASYNC_CHANGE_FD) {
337
0
    LM_ERR("Incorrect ASYNC_CHANGE_FD status usage!"
338
0
        "You should use this status only from the"
339
0
        "resume function in case something went wrong"
340
0
        "and you have other alternatives!\n");
341
0
    shm_free(ctx);
342
0
    return -1;
343
0
  } else {
344
    /* generic error, go for resume route, report it to script */
345
0
    shm_free(ctx);
346
0
    return -1;
347
0
  }
348
349
  /* ctx is to be used from this point further */
350
351
0
  if (report_route) {
352
0
    ctx->report_route = dup_ref_script_route_in_shm( report_route, 0);
353
0
    if (!ref_script_route_is_valid(ctx->report_route)) {
354
0
      LM_ERR("failed dup resume route -> ignoring it\n");
355
0
      if (ctx->report_route) {
356
0
        shm_free(ctx->report_route);
357
0
        ctx->report_route = NULL;
358
0
      }
359
0
    } else {
360
0
      if (report_route_param) {
361
0
        ctx->report_route_param.s = (char *)(ctx+1);
362
0
        ctx->report_route_param.len = report_route_param->len;
363
0
        memcpy(ctx->report_route_param.s, report_route_param->s,
364
0
          report_route_param->len);
365
0
      } else {
366
0
        ctx->report_route_param.s = NULL;
367
0
        ctx->report_route_param.len = 0;
368
0
      }
369
0
    }
370
0
  }
371
372
0
  if (async_status!=ASYNC_NO_FD) {
373
0
    LM_DBG("placing launch job into reactor\n");
374
    /* place the FD + resume function (as param) into reactor */
375
0
    if (reactor_add_reader(fd,F_LAUNCH_ASYNC,RCT_PRIO_ASYNC,(void*)ctx)<0){
376
0
      LM_ERR("failed to add async FD to reactor -> act in sync mode\n");
377
0
      goto sync;
378
0
    }
379
0
  }
380
381
0
  async_status = ASYNC_DONE;
382
383
  /* done, return to the script */
384
0
  return 1;
385
0
sync:
386
  /* run the resume function */
387
0
  LM_DBG("running launch job in sync mode\n");
388
0
  do {
389
0
    async_status = ASYNC_DONE;
390
0
    return_code = ((async_resume_module*)(ctx->async.resume_f))
391
0
      ( fd, msg, ctx->async.resume_param );
392
0
    if (async_status == ASYNC_CHANGE_FD)
393
0
      fd = return_code;
394
0
  } while(async_status==ASYNC_CONTINUE||async_status==ASYNC_CHANGE_FD);
395
  /* the IO completed, so report now */
396
0
report:
397
0
  if (ctx->report_route)
398
0
    shm_free(ctx->report_route);
399
0
  shm_free(ctx);
400
0
  if (!ref_script_route_check_and_update(report_route))
401
0
    return 1;
402
403
  /* run the report route inline */
404
0
  req = get_dummy_sip_msg();
405
0
  if(req == NULL) {
406
0
    LM_ERR("No more memory\n");
407
0
    return -1;
408
0
  }
409
410
0
  if (report_route_param)
411
0
    route_params_push_level(
412
0
      sroutes->request[report_route->idx].name,
413
0
      report_route_param, NULL,
414
0
      launch_route_param_get);
415
416
0
  bak_avps = set_avp_list(&report_avps);
417
418
0
  swap_route_type( bk_rt, REQUEST_ROUTE);
419
0
  run_top_route( sroutes->request[report_route->idx], req);
420
0
  set_route_type( bk_rt );
421
422
0
  if (report_route_param)
423
0
    route_params_pop_level();
424
425
0
  destroy_avp_list(&report_avps);
426
0
  set_avp_list(bak_avps);
427
428
0
  release_dummy_sip_msg(req);
429
430
0
  return 1;
431
0
}
432
433
434