Coverage Report

Created: 2025-07-11 06:28

/src/opensips/async.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2014 OpenSIPS Solutions
3
 *
4
 * This file is part of opensips, a free SIP server.
5
 *
6
 * opensips is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 2 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * opensips is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software
18
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19
 *
20
 *
21
 * history:
22
 * ---------
23
 *  2014-10-15  created (bogdan)
24
 */
25
26
#include "mem/shm_mem.h"
27
#include "dprint.h"
28
#include "reactor_defs.h"
29
#include "async.h"
30
#include "route.h"
31
#include "action.h"
32
#include "sr_module.h"
33
34
/* Status of the current async operation. The code in this file resets it
 * right before invoking a start/resume callback and inspects it right after,
 * in order to decide what to do with the fd (keep it in the reactor, swap it,
 * close it, ...). After a launch, a value >= 0 is interpreted as the fd to
 * be watched. */
int async_status = ASYNC_NO_IO;
35
36
/* In this file it receives the value returned by the launched module
 * function and by the module resume callbacks. */
extern int return_code; /* from action.c, return code */
37
38
39
/* start/resume functions used for script async ops */
40
/* start handler; stays NULL until register_async_script_handlers() sets it */
async_script_start_function  *async_script_start_f  = NULL;
41
/* resume handler; set together with the start handler by
 * register_async_script_handlers() */
async_script_resume_function *async_script_resume_f = NULL;
42
43
44
/* async context used by Launch Async operation */
45
typedef struct _async_launch_ctx {
46
  /* generic async context - MUST BE FIRST */
47
  async_ctx  async;
48
  /* ref to the report script route (NULL if none) */
49
  struct script_route_ref *report_route;
50
  str report_route_param;
51
} async_launch_ctx;
52
53
54
55
/************* Functions related to ASYNC via script functions ***************/
56
57
int register_async_script_handlers(async_script_start_function *f1,
58
                      async_script_resume_function *f2)
59
0
{
60
0
  if (async_script_start_f) {
61
0
    LM_ERR("aync script handlers already registered\n");
62
0
    return -1;
63
0
  }
64
65
0
  async_script_start_f = f1;
66
0
  async_script_resume_f = f2;
67
68
0
  return 0;
69
0
}
70
71
72
/************* Functions related to internal ASYNC support ***************/
73
74
/*
 * Registers a file descriptor for internal (non-script) async handling.
 *
 * A zeroed async_ctx is allocated in shared memory, filled with the resume
 * function and its parameter, and attached to the fd when the fd is added
 * to the reactor as an F_FD_ASYNC reader.
 *
 * @param fd            descriptor to be watched for reading
 * @param f             function to be called when the fd is triggered
 * @param resume_param  opaque value passed back to f
 * @return              0 on success, -1 on allocation or reactor failure
 *                      (the context is released in the latter case)
 */
int register_async_fd(int fd, async_resume_fd *f, void *resume_param)
{
  async_ctx *actx;

  actx = shm_malloc(sizeof(async_ctx));
  if (actx == NULL) {
    LM_ERR("failed to allocate new async_ctx\n");
    return -1;
  }

  memset(actx, 0, sizeof(async_ctx));
  actx->resume_f = f;
  actx->resume_param = resume_param;

  /* the context travels with the fd, as reactor payload */
  if (reactor_add_reader(fd, F_FD_ASYNC, RCT_PRIO_ASYNC, (void*)actx) >= 0)
    return 0;

  LM_ERR("failed to add async FD to reactor\n");
  shm_free(actx);
  return -1;
}
97
98
99
int async_fd_resume(int fd, void *param)
100
0
{
101
0
  async_ctx *ctx = (async_ctx *)param;
102
0
  int ret;
103
104
0
  async_status = ASYNC_DONE; /* assume default status as done */
105
106
  /* call the resume function in order to read and handle data */
107
0
  ret = ((async_resume_fd*)ctx->resume_f)( fd, ctx->resume_param );
108
0
  if (async_status==ASYNC_CONTINUE) {
109
    /* leave the fd into the reactor*/
110
0
    return 0;
111
0
  } else if (async_status==ASYNC_CHANGE_FD) {
112
0
    if (ret<0) {
113
0
      LM_ERR("ASYNC_CHANGE_FD: given file descriptor shall be "
114
0
        "positive!\n");
115
0
      return 0;
116
0
    } else if (ret>0 && ret==fd) {
117
      /*trying to add the same fd; shall continue*/
118
0
      LM_CRIT("You are trying to replace the old fd with the same fd!"
119
0
          "Will act as in ASYNC_CONTINUE!\n");
120
0
      return 0;
121
0
    }
122
123
    /* remove the old fd from the reactor */
124
0
    reactor_del_reader(fd, -1, IO_FD_CLOSING);
125
0
    fd=ret;
126
127
    /* insert the new fd inside the reactor */
128
0
    if (reactor_add_reader(fd,F_FD_ASYNC,RCT_PRIO_ASYNC,(void*)ctx)<0 ) {
129
0
      LM_ERR("failed to add async FD to reactor -> act in sync mode\n");
130
0
      do {
131
0
        async_status = ASYNC_DONE;
132
0
        ret = ((async_resume_fd*)ctx->resume_f)(fd,ctx->resume_param);
133
0
        if (async_status == ASYNC_CHANGE_FD)
134
0
          fd=ret;
135
0
      } while(async_status==ASYNC_CONTINUE||async_status==ASYNC_CHANGE_FD);
136
0
      goto done;
137
0
    } else {
138
139
      /* successfully changed fd */
140
0
      return 0;
141
0
    }
142
0
  }
143
144
  /* remove from reactor, we are done */
145
0
  reactor_del_reader(fd, -1, IO_FD_CLOSING);
146
147
0
done:
148
0
  if (async_status == ASYNC_DONE_CLOSE_FD)
149
0
    close(fd);
150
151
0
  return 0;
152
0
}
153
154
155
/************* Functions related to ASYNC Launch support ***************/
156
157
int launch_route_param_get(struct sip_msg *msg, pv_param_t *ip,
158
    pv_value_t *res, void *params, void *extra)
159
0
{
160
0
  str *val = (str*)params;
161
162
  /* we do accept here only one param with index */
163
0
  if (ip->pvn.type!=PV_NAME_INTSTR || ip->pvn.u.isname.type!=0
164
0
  || ip->pvn.u.isname.name.n!=1)
165
0
    return pv_get_null(msg, ip, res);
166
167
0
  res->flags = PV_VAL_STR;
168
0
  res->rs.s =val->s;
169
0
  res->rs.len = val->len;
170
171
0
  return 0;
172
0
}
173
174
175
int async_launch_resume(int fd, void *param)
176
0
{
177
0
  struct sip_msg *req;
178
0
  async_launch_ctx *ctx = (async_launch_ctx *)param;
179
0
  int bk_rt;
180
181
0
  LM_DBG("resume for a launch job\n");
182
183
0
  req = get_dummy_sip_msg();
184
0
  if(req == NULL) {
185
0
    LM_ERR("No more memory\n");
186
0
    return -1;
187
0
  }
188
189
0
  async_status = ASYNC_DONE; /* assume default status as done */
190
191
  /* call the resume function in order to read and handle data */
192
0
  return_code = ((async_resume_module*)(ctx->async.resume_f))
193
0
    ( fd, req, ctx->async.resume_param );
194
195
0
  if (async_status==ASYNC_CONTINUE) {
196
    /* do not run the report route, leave the fd into the reactor*/
197
0
    goto restore;
198
199
0
  } else if (async_status==ASYNC_DONE_NO_IO) {
200
    /* don't do any change on the fd, since the module handled everything*/
201
0
    goto run_route;
202
203
0
  } else if (async_status==ASYNC_CHANGE_FD) {
204
0
    if (return_code<0) {
205
0
      LM_ERR("ASYNC_CHANGE_FD: given file descriptor must be "
206
0
        "positive!\n");
207
0
      goto restore;
208
0
    } else if (return_code>0 && return_code==fd) {
209
      /*trying to add the same fd; shall continue*/
210
0
      LM_CRIT("You are trying to replace the old fd with the same fd!"
211
0
          "Will act as in ASYNC_CONTINUE!\n");
212
0
      goto restore;
213
0
    }
214
215
    /* remove the old fd from the reactor */
216
0
    reactor_del_reader(fd, -1, IO_FD_CLOSING);
217
0
    fd=return_code;
218
219
    /* insert the new fd inside the reactor */
220
0
    if (reactor_add_reader(fd, F_LAUNCH_ASYNC, RCT_PRIO_ASYNC,
221
0
    (void*)ctx)<0 ) {
222
0
      LM_ERR("failed to add async FD to reactor -> act in sync mode\n");
223
0
      do {
224
0
        async_status = ASYNC_DONE;
225
0
        return_code = ((async_resume_module*)(ctx->async.resume_f))
226
0
          (fd, req, ctx->async.resume_param );
227
0
        if (async_status == ASYNC_CHANGE_FD)
228
0
          fd=return_code;
229
0
      } while(async_status==ASYNC_CONTINUE||async_status==ASYNC_CHANGE_FD);
230
0
      goto run_route;
231
0
    } else {
232
233
      /* successfully changed fd */
234
0
      goto restore;
235
0
    }
236
0
  }
237
238
  /* remove from reactor, we are done */
239
0
  reactor_del_reader(fd, -1, IO_FD_CLOSING);
240
241
0
run_route:
242
0
  if (async_status == ASYNC_DONE_CLOSE_FD)
243
0
    close(fd);
244
245
0
  if (ref_script_route_check_and_update(ctx->report_route)) {
246
0
    LM_DBG("runinng report route for a launch job,"
247
0
      " route <%s>, param [%.*s]\n",
248
0
      ctx->report_route->name.s,
249
0
      ctx->report_route_param.len, ctx->report_route_param.s);
250
0
    if (ctx->report_route_param.s)
251
0
      route_params_push_level(
252
0
        sroutes->request[ctx->report_route->idx].name, 
253
0
        &ctx->report_route_param, NULL,
254
0
        launch_route_param_get);
255
0
    swap_route_type( bk_rt, REQUEST_ROUTE);
256
0
    run_top_route( sroutes->request[ctx->report_route->idx], req);
257
0
    set_route_type( bk_rt );
258
0
    if (ctx->report_route_param.s)
259
0
      route_params_pop_level();
260
261
    /* remove all added AVP */
262
0
    reset_avps( );
263
0
  }
264
265
  /* no need for the context anymore */
266
0
  if (ctx->report_route)
267
0
    shm_free(ctx->report_route);
268
0
  shm_free(ctx);
269
0
  LM_DBG("done with a launch job\n");
270
271
0
restore:
272
  /* clean whatever extra structures were added by script functions */
273
0
  release_dummy_sip_msg(req);
274
275
0
  return 0;
276
0
}
277
278
279
int async_script_launch(struct sip_msg *msg, struct action* a,
280
                struct script_route_ref *report_route,
281
                str *report_route_param, void **params)
282
0
{
283
0
  struct sip_msg *req;
284
0
  struct usr_avp *report_avps = NULL, **bak_avps = NULL;
285
0
  async_launch_ctx *ctx;
286
0
  int fd = -1;
287
0
  int bk_rt;
288
289
  /* run the function (the action) and get back from it the FD,
290
   * resume function and param */
291
0
  if ( a->type!=AMODULE_T || a->elem[0].type!=ACMD_ST ||
292
0
  a->elem[0].u.data==NULL ) {
293
0
    LM_CRIT("BUG - invalid action for async I/O - it must be"
294
0
      " a MODULE_T ACMD_ST \n");
295
0
    return -1;
296
0
  }
297
298
0
  if ( (ctx=shm_malloc(sizeof(async_launch_ctx) + ( (report_route&&report_route_param)?report_route_param->len:0)))==NULL) {
299
0
    LM_ERR("failed to allocate new ctx, forcing sync mode\n");
300
0
    return -1;
301
0
  }
302
303
0
  memset(ctx,0,sizeof(async_launch_ctx));
304
305
0
  async_status = ASYNC_NO_IO; /*assume defauly status "no IO done" */
306
307
0
  return_code = ((const acmd_export_t*)(a->elem[0].u.data_const))->function(msg,
308
0
      (async_ctx*)ctx,
309
0
      params[0], params[1], params[2],
310
0
      params[3], params[4], params[5],
311
0
      params[6], params[7]);
312
  /* what to do now ? */
313
0
  if (async_status>=0) {
314
    /* async I/O was successfully launched */
315
0
    fd = async_status;
316
0
  } else if (async_status==ASYNC_NO_FD) {
317
    /* async was successfully launched but without a FD resume
318
     * in this case, we need to push the async ctx back to the
319
     * function, so it can trigger the resume later, by itself */
320
0
  } else if (async_status==ASYNC_NO_IO) {
321
    /* no IO, so simply continue with the script */
322
0
    shm_free(ctx);
323
0
    return 1;
324
0
  } else if (async_status==ASYNC_SYNC) {
325
    /* IO already done in SYNC'ed way */
326
0
    goto report;
327
0
  } else if (async_status==ASYNC_CHANGE_FD) {
328
0
    LM_ERR("Incorrect ASYNC_CHANGE_FD status usage!"
329
0
        "You should use this status only from the"
330
0
        "resume function in case something went wrong"
331
0
        "and you have other alternatives!\n");
332
0
    shm_free(ctx);
333
0
    return -1;
334
0
  } else {
335
    /* generic error, go for resume route, report it to script */
336
0
    shm_free(ctx);
337
0
    return -1;
338
0
  }
339
340
  /* ctx is to be used from this point further */
341
342
0
  if (report_route) {
343
0
    ctx->report_route = dup_ref_script_route_in_shm( report_route, 0);
344
0
    if (!ref_script_route_is_valid(ctx->report_route)) {
345
0
      LM_ERR("failed dup resume route -> ignoring it\n");
346
0
      if (ctx->report_route) {
347
0
        shm_free(ctx->report_route);
348
0
        ctx->report_route = NULL;
349
0
      }
350
0
    } else {
351
0
      if (report_route_param) {
352
0
        ctx->report_route_param.s = (char *)(ctx+1);
353
0
        ctx->report_route_param.len = report_route_param->len;
354
0
        memcpy(ctx->report_route_param.s, report_route_param->s,
355
0
          report_route_param->len);
356
0
      } else {
357
0
        ctx->report_route_param.s = NULL;
358
0
        ctx->report_route_param.len = 0;
359
0
      }
360
0
    }
361
0
  }
362
363
0
  if (async_status!=ASYNC_NO_FD) {
364
0
    LM_DBG("placing launch job into reactor\n");
365
    /* place the FD + resume function (as param) into reactor */
366
0
    if (reactor_add_reader(fd,F_LAUNCH_ASYNC,RCT_PRIO_ASYNC,(void*)ctx)<0){
367
0
      LM_ERR("failed to add async FD to reactor -> act in sync mode\n");
368
0
      goto sync;
369
0
    }
370
0
  }
371
372
  /* done, return to the script */
373
0
  return 1;
374
0
sync:
375
  /* run the resume function */
376
0
  LM_DBG("running launch job in sync mode\n");
377
0
  do {
378
0
    async_status = ASYNC_DONE;
379
0
    return_code = ((async_resume_module*)(ctx->async.resume_f))
380
0
      ( fd, msg, ctx->async.resume_param );
381
0
    if (async_status == ASYNC_CHANGE_FD)
382
0
      fd = return_code;
383
0
  } while(async_status==ASYNC_CONTINUE||async_status==ASYNC_CHANGE_FD);
384
  /* the IO completed, so report now */
385
0
report:
386
0
  if (ctx->report_route)
387
0
    shm_free(ctx->report_route);
388
0
  shm_free(ctx);
389
0
  if (!ref_script_route_check_and_update(report_route))
390
0
    return 1;
391
392
  /* run the report route inline */
393
0
  req = get_dummy_sip_msg();
394
0
  if(req == NULL) {
395
0
    LM_ERR("No more memory\n");
396
0
    return -1;
397
0
  }
398
399
0
  bak_avps = set_avp_list(&report_avps);
400
0
  swap_route_type( bk_rt, REQUEST_ROUTE);
401
402
0
  run_top_route( sroutes->request[report_route->idx], req);
403
404
0
  set_route_type( bk_rt );
405
0
  destroy_avp_list(&report_avps);
406
0
  set_avp_list(bak_avps);
407
408
0
  release_dummy_sip_msg(req);
409
410
0
  return 1;
411
0
}
412
413
414