Coverage Report

Created: 2025-07-11 06:28

/src/opensips/evi/event_interface.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2011 OpenSIPS Solutions
3
 *
4
 * This file is part of opensips, a free SIP server.
5
 *
6
 * opensips is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 2 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * opensips is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software
18
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19
 *
20
 *
21
 * history:
22
 * ---------
23
 *  2011-05-xx  created (razvancrainea)
24
 */
25
26
#include "event_interface.h"
27
#include "evi_modules.h"
28
#include "../mem/shm_mem.h"
29
#include "../mi/mi.h"
30
#include "../pvar.h"
31
#include "../timer.h"
32
#include "../ut.h"
33
#include "../ipc.h"
34
35
36
int events_no = 0;
37
int max_alloc_events = 10;
38
static int events_rec_level = MAX_REC_LEV;
39
40
/* holds all exported events */
41
evi_event_t *events = NULL;
42
43
event_id_t evi_publish_event(str event_name)
44
0
{
45
0
  int idx;
46
47
0
  if (event_name.len > MAX_EVENT_NAME) {
48
0
    LM_ERR("event name too long [%d>%d]\n", event_name.len, MAX_EVENT_NAME);
49
0
    return EVI_ERROR;
50
0
  }
51
52
0
  idx = evi_get_id(&event_name);
53
0
  if (idx != EVI_ERROR) {
54
0
    LM_WARN("Event \"%.*s\" was previously published\n",
55
0
        event_name.len, event_name.s);
56
0
    return idx;
57
0
  }
58
59
  /* check if the event was already registered */
60
0
  if (!events) {
61
    /* first event */
62
0
    events = shm_malloc(max_alloc_events * sizeof(evi_event_t));
63
0
    if (!events) {
64
0
      LM_ERR("no more shm memory to hold %d events\n", max_alloc_events);
65
0
      return EVI_ERROR;
66
0
    }
67
0
  } else if (events_no == max_alloc_events) {
68
0
    max_alloc_events *= 2;
69
0
    events = shm_realloc(events, max_alloc_events * sizeof(evi_event_t));
70
0
    if (!events) {
71
0
      LM_ERR("no more shm memory to hold %d events\n", max_alloc_events);
72
0
      return EVI_ERROR;
73
0
    }
74
0
  }
75
76
0
  events[events_no].lock = lock_alloc();
77
0
  if (!events[events_no].lock) {
78
0
    LM_ERR("Failed to allocate subscribers lock\n");
79
0
    return EVI_ERROR;
80
0
  }
81
0
  events[events_no].lock = lock_init(events[events_no].lock);
82
0
  if (!events[events_no].lock) {
83
0
    LM_ERR("Failed to create subscribers lock\n");
84
0
    return EVI_ERROR;
85
0
  }
86
87
0
  events[events_no].id = events_no;
88
0
  events[events_no].name.s = event_name.s;
89
0
  events[events_no].name.len = event_name.len;
90
0
  events[events_no].subscribers = NULL;
91
0
  LM_INFO("Registered event <%.*s(%d)>\n", event_name.len, event_name.s, events_no);
92
93
0
  return events_no++;
94
0
}
95
96
int evi_raise_event(event_id_t id, evi_params_t* params)
97
0
{
98
0
  int status;
99
0
  struct sip_msg* req= NULL;
100
0
  struct usr_avp *event_avps = 0;
101
0
  struct usr_avp **bak_avps = 0;
102
103
  /*
104
   * because these might be nested, a different message has
105
   * to be generated each time
106
   */
107
0
  req = get_dummy_sip_msg();
108
0
  if(req == NULL)
109
0
  {
110
0
    LM_ERR("No more memory\n");
111
0
    return -1;
112
0
  }
113
114
0
  bak_avps = set_avp_list(&event_avps);
115
116
0
  status = evi_raise_event_msg(req, id, params);
117
118
  /* clean whatever extra structures were added by script functions */
119
0
  release_dummy_sip_msg(req);
120
121
  /* remove all avps added */
122
0
  destroy_avp_list(&event_avps);
123
0
  set_avp_list(bak_avps);
124
125
0
  return status;
126
0
}
127
/* this function checks the subscribers of an event and remove them if 
128
they are past their expiry - dont want to print expired events as well */
129
0
void evi_remove_expired_subs(event_id_t id) {
130
0
  evi_subs_p subs, prev;
131
0
  long now;
132
133
0
  lock_get(events[id].lock);
134
0
  now = time(0);
135
0
  subs = events[id].subscribers;
136
0
  prev = NULL;
137
0
  while (subs) {
138
0
    if (!subs->reply_sock) {
139
0
      LM_ERR("unknown destination\n");
140
0
      goto next_sub;
141
0
    }
142
    /* check expire */
143
0
    if (!(subs->reply_sock->flags & EVI_PENDING) &&
144
0
      subs->reply_sock->flags & EVI_EXPIRE &&
145
0
      subs->reply_sock->subscription_time +
146
0
      subs->reply_sock->expire < now) {
147
0
      LM_DBG("removing expired subscriber %.*s:%.*s:%d for event %.*s\n",
148
0
        subs->trans_mod->proto.len,subs->trans_mod->proto.s,
149
0
        subs->reply_sock->address.len, subs->reply_sock->address.s,
150
0
        subs->reply_sock->port,events[id].name.len,events[id].name.s);
151
0
      if (subs->trans_mod && subs->trans_mod->free)
152
0
        subs->trans_mod->free(subs->reply_sock);
153
0
      else
154
0
        shm_free(subs->reply_sock);
155
0
      if (!prev) {
156
0
        events[id].subscribers = subs->next;
157
0
        shm_free(subs);
158
0
        subs = events[id].subscribers;
159
0
      } else {
160
0
        prev->next = subs->next;
161
0
        shm_free(subs);
162
0
        subs = prev->next;
163
0
      }
164
0
      continue;
165
0
    }
166
0
next_sub:
167
0
    prev = subs;
168
0
    subs = subs->next;
169
0
  }
170
0
  lock_release(events[id].lock);
171
0
}
172
/* XXX: this function should release its parameters before exiting */
173
int evi_raise_event_msg(struct sip_msg *msg, event_id_t id, evi_params_t* params)
174
0
{
175
0
  evi_subs_p subs, prev;
176
0
  evi_async_ctx_t async_status = {NULL, NULL};
177
0
  long now;
178
0
  int flags, pflags = 0;
179
0
  int ret = 0;
180
181
0
  if (id < 0 || id >= events_no) {
182
0
    LM_ERR("invalid event %d\n", id);
183
0
    goto free;
184
0
  }
185
186
0
  if (events_rec_level == 0) {
187
0
    LM_ERR("Too many nested events %d\n", MAX_REC_LEV);
188
0
    goto free;
189
0
  }
190
0
  events_rec_level--;
191
0
  if (params)
192
0
    pflags = params->flags;
193
194
0
  lock_get(events[id].lock);
195
0
  now = time(0);
196
0
  subs = events[id].subscribers;
197
0
  prev = NULL;
198
0
  while (subs) {
199
0
    if (!subs->reply_sock) {
200
0
      LM_ERR("unknown destination\n");
201
0
      continue;
202
0
    }
203
    /* check expire */
204
0
    if (!(subs->reply_sock->flags & EVI_PENDING) &&
205
0
        subs->reply_sock->flags & EVI_EXPIRE &&
206
0
        subs->reply_sock->subscription_time +
207
0
        subs->reply_sock->expire < now) {
208
0
      if (subs->trans_mod && subs->trans_mod->free)
209
0
        subs->trans_mod->free(subs->reply_sock);
210
0
      else
211
0
        shm_free(subs->reply_sock);
212
0
      if (!prev) {
213
0
        events[id].subscribers = subs->next;
214
0
        shm_free(subs);
215
0
        subs = events[id].subscribers;
216
0
      } else {
217
0
        prev->next = subs->next;
218
0
        shm_free(subs);
219
0
        subs = prev->next;
220
0
      }
221
0
      continue;
222
0
    }
223
224
0
    if (!subs->trans_mod) {
225
0
      LM_ERR("unknown transfer protocol\n");
226
0
      goto next;
227
0
    }
228
229
0
    LM_DBG("found subscriber %.*s\n",
230
0
        subs->reply_sock->address.len, subs->reply_sock->address.s);
231
0
    if (!subs->trans_mod->raise) {
232
0
      LM_ERR("\"%.*s\" protocol cannot raise events\n",
233
0
          subs->trans_mod->proto.len, subs->trans_mod->proto.s);
234
0
      goto next;
235
0
    }
236
    /* we use this var to make sure nested calls don't reset the flag */
237
0
    flags = subs->reply_sock->flags;
238
0
    subs->reply_sock->flags |= EVI_PENDING;
239
    /* make sure nested events don't deadlock */
240
0
    lock_release(events[id].lock);
241
242
0
    ret += (subs->trans_mod->raise)(msg, &events[id].name,
243
0
          subs->reply_sock, params, &async_status);
244
245
0
    lock_get(events[id].lock);
246
0
    subs->reply_sock->flags = flags;
247
0
next:
248
0
    prev = subs;
249
0
    subs = subs->next;
250
0
  }
251
0
  lock_release(events[id].lock);
252
253
0
  events_rec_level++;
254
0
  if (params)
255
0
    params->flags = pflags;
256
0
free:
257
  /* done sending events - free parameters */
258
0
  if (params) {
259
    /* make sure no one is messing with our flags */
260
0
    if (params->flags & EVI_FREE_LIST)
261
0
      evi_free_params(params);
262
0
  }
263
0
  return ret;
264
265
0
}
266
267
int evi_probe_event(event_id_t id)
268
0
{
269
0
  if (id < 0 || id >= events_no) {
270
0
    LM_ERR("invalid event %d\n", id);
271
0
    return -1;
272
0
  }
273
274
  /* check for subscribers */
275
0
  if (!events[id].subscribers)
276
0
    return 0;
277
278
  /* returns the number of transport module loaded */
279
0
  return get_trans_mod_no();
280
0
}
281
282
283
/* returns the id of an event */
284
event_id_t evi_get_id(str *name)
285
0
{
286
0
  int i;
287
0
  for (i = 0; i < events_no; i++)
288
0
    if (events[i].name.len == name->len &&
289
0
        !memcmp(events[i].name.s, name->s, name->len))
290
0
      return i;
291
0
  return EVI_ERROR;
292
0
}
293
294
295
/* returns an event id */
296
evi_event_p evi_get_event(str *name)
297
0
{
298
0
  event_id_t id = evi_get_id(name);
299
0
  return id == EVI_ERROR ? NULL : &events[id];
300
0
}
301
302
/*
303
 * Subscribes an event
304
 * Returns:
305
 *  1 - success
306
 *  0 - internal error
307
 * -1 - param error
308
 */
309
int evi_event_subscribe(str event_name,
310
    str sock_str, unsigned expire, unsigned unsubscribe)
311
0
{
312
0
  evi_subs_t *subscriber = NULL;
313
0
  evi_event_p event;
314
0
  const evi_export_t *trans_mod = NULL;
315
0
  evi_reply_sock *sock;
316
317
0
  event = evi_get_event(&event_name);
318
0
  if (!event) {
319
0
    LM_ERR("invalid event name <%.*s>\n",
320
0
        event_name.len, event_name.s);
321
0
    goto bad_param;
322
0
  }
323
324
  /* transport module name */
325
0
  trans_mod = get_trans_mod(&sock_str);
326
0
  if (!trans_mod) {
327
0
    LM_ERR("couldn't find a protocol to support %.*s\n",
328
0
        sock_str.len, sock_str.s);
329
0
    goto bad_param;
330
0
  }
331
0
  sock_str.s += trans_mod->proto.len + 1;
332
0
  sock_str.len -= (trans_mod->proto.len + 1);
333
334
  /* parse reply socket */
335
0
  sock = trans_mod->parse(sock_str);
336
0
  if (!sock)
337
0
    goto bad_param;
338
  /* reset unrequired flags */
339
0
  if (!expire && !unsubscribe)
340
0
    sock->flags &= ~EVI_EXPIRE;
341
342
  /* tries to match other socket */
343
0
  if (trans_mod->match) {
344
0
    lock_get(event->lock);
345
0
    for (subscriber = event->subscribers; subscriber;
346
0
        subscriber = subscriber->next) {
347
0
      if (subscriber->trans_mod != trans_mod)
348
0
        continue;
349
0
      if (trans_mod->match(sock, subscriber->reply_sock)) {
350
        /* update subscription time */
351
0
        subscriber->reply_sock->subscription_time = time(0);
352
        /* update expire if required */
353
0
        if (EVI_EXPIRE & sock->flags) {
354
0
          subscriber->reply_sock->expire = expire;
355
0
          subscriber->reply_sock->flags = sock->flags;
356
0
        }
357
0
        if (trans_mod->free)
358
0
          trans_mod->free(sock);
359
0
        else
360
0
          shm_free(sock);
361
0
        break;
362
0
      }
363
0
    }
364
0
    lock_release(event->lock);
365
0
  }
366
367
  /* if no socket matches - create a new one */
368
0
  if (!subscriber) {
369
0
    subscriber = shm_malloc(sizeof(evi_subs_t));
370
0
    if (!subscriber) {
371
0
      LM_ERR("no more shm memory\n");
372
0
      if (trans_mod && sock) {
373
        /* if the module has it's own free function */
374
0
        if (trans_mod->free)
375
0
          trans_mod->free(sock);
376
0
        else
377
0
          shm_free(sock);
378
0
      }
379
0
      return 0;
380
0
    }
381
382
0
    sock->subscription_time = time(0);
383
0
    subscriber->trans_mod = trans_mod;
384
0
    subscriber->reply_sock = sock;
385
386
0
    if (EVI_EXPIRE & sock->flags)
387
0
      subscriber->reply_sock->expire = expire;
388
0
    subscriber->reply_sock->flags |= trans_mod->flags;
389
390
    /* guard subscribers list */
391
0
    lock_get(event->lock);
392
0
    subscriber->next = event->subscribers;
393
0
    event->subscribers = subscriber;
394
0
    lock_release(event->lock);
395
0
    LM_DBG("added new subscriber for event %d\n", event->id);
396
0
  }
397
398
0
  return 1;
399
0
bad_param:
400
0
  return -1;
401
0
}
402
403
int evi_raise_script_event(struct sip_msg *msg, event_id_t id, void * _a, void * _v)
404
0
{
405
0
  pv_spec_p vals = (pv_spec_p)_v;
406
0
  pv_spec_p attrs = (pv_spec_p)_a;
407
0
  struct usr_avp *v_avp = NULL;
408
0
  struct usr_avp *a_avp = NULL;
409
0
  int err = evi_probe_event(id);
410
0
  int_str val, attr;
411
0
  str *at;
412
0
  evi_params_p params = NULL;
413
414
0
  if (err < 0)
415
0
    return err;
416
0
  else if (!err)
417
0
    return 1;
418
419
0
  if (!vals)
420
0
    goto raise;
421
0
  if (!(params = evi_get_params())) {
422
0
    LM_ERR("cannot create parameters list\n");
423
0
    goto raise;
424
0
  }
425
426
  /* handle parameters */
427
0
  while ((v_avp = search_first_avp(vals->pvp.pvn.u.isname.type,
428
0
          vals->pvp.pvn.u.isname.name.n, &val, v_avp))) {
429
0
    at = NULL;
430
    /* check attribute */
431
0
    if (attrs) {
432
0
      err = -1;
433
0
      a_avp = search_first_avp(attrs->pvp.pvn.u.isname.type,
434
0
          attrs->pvp.pvn.u.isname.name.n, &attr, a_avp);
435
0
      if (!a_avp) {
436
0
        LM_ERR("missing attribute\n");
437
0
        goto error;
438
0
      }
439
0
      if (!(a_avp->flags & AVP_VAL_STR)) {
440
0
        LM_ERR("invalid attribute name - must be string\n");
441
0
        goto error;
442
0
      }
443
0
      at = &attr.s;
444
0
    }
445
446
0
    if (v_avp->flags & AVP_VAL_STR)
447
0
      err = evi_param_add_str(params, at, &val.s);
448
0
    else
449
0
      err = evi_param_add_int(params, at, &val.n);
450
0
    if (err) {
451
0
      LM_ERR("error while adding parameter\n");
452
0
      goto error;
453
0
    }
454
0
  }
455
456
  /* check if there were too many attribute names */
457
0
  if (attrs && a_avp && search_first_avp(attrs->pvp.pvn.u.isname.type,
458
0
        attrs->pvp.pvn.u.isname.name.n, &attr, a_avp)) {
459
    /* only signal error - continue */
460
0
    LM_ERR("too many attribute names\n");
461
0
  }
462
463
0
raise:
464
0
  err = evi_raise_event_msg(msg, id, params);
465
0
  return err ? err : 1;
466
0
error:
467
0
  evi_free_params(params);
468
0
  return -1;
469
0
}
470
471
472
static mi_response_t *mi_event_subscribe(const mi_params_t *params, unsigned int expire)
473
0
{
474
0
  int ret;
475
0
  str event_name, transport_sock;
476
477
0
  if (get_mi_string_param(params, "event", &event_name.s,
478
0
    &event_name.len) < 0 || !event_name.s || !event_name.len)
479
0
    return init_mi_param_error();
480
481
0
  if (get_mi_string_param(params, "socket", &transport_sock.s,
482
0
    &transport_sock.len) < 0 || !transport_sock.s || !transport_sock.len)
483
0
    return init_mi_param_error();
484
485
0
  ret = evi_event_subscribe(event_name, transport_sock, expire, 1);
486
0
  if (ret < 0)
487
0
    return init_mi_error(400, MI_SSTR("Bad parameter value"));
488
489
0
  return ret ? init_mi_result_ok() : 0;
490
0
}
491
492
mi_response_t *w_mi_event_subscribe(const mi_params_t *params,
493
                struct mi_handler *async_hdl)
494
0
{
495
0
  return mi_event_subscribe(params, DEFAULT_EXPIRE);
496
0
}
497
498
mi_response_t *w_mi_event_subscribe_1(const mi_params_t *params,
499
                struct mi_handler *async_hdl)
500
0
{
501
0
  int expire;
502
503
0
  if (get_mi_int_param(params, "expire", &expire) < 0)
504
0
    return init_mi_param_error();
505
506
0
  if (expire < 0)
507
0
    return init_mi_error_extra(JSONRPC_INVAL_PARAMS_CODE,
508
0
      MI_SSTR(JSONRPC_INVAL_PARAMS_MSG),
509
0
      MI_SSTR("Negative expire value"));
510
511
0
  return mi_event_subscribe(params, expire);
512
0
}
513
514
515
/* used to list all the registered events */
516
mi_response_t *mi_events_list(const mi_params_t *params,
517
                struct mi_handler *async_hdl)
518
0
{
519
0
  mi_response_t *resp;
520
0
  mi_item_t *resp_obj;
521
0
  mi_item_t *events_arr, *event_item;
522
0
  unsigned i;
523
524
0
  resp = init_mi_result_object(&resp_obj);
525
0
  if (!resp)
526
0
    return 0;
527
528
0
  events_arr = add_mi_array(resp_obj, MI_SSTR("Events"));
529
0
  if (!events_arr) {
530
0
    free_mi_response(resp);
531
0
    return 0;
532
0
  }
533
534
0
  for (i = 0; i < events_no; i++) {
535
0
    event_item = add_mi_object(events_arr, NULL, 0);
536
0
    if (!event_item)
537
0
      goto error;
538
539
0
    if (add_mi_string(event_item, MI_SSTR("name"),
540
0
      events[i].name.s, events[i].name.len) < 0)
541
0
      goto error;
542
543
0
    if (add_mi_number(event_item, MI_SSTR("id"), events[i].id) < 0)
544
0
      goto error;
545
0
  }
546
547
0
  return resp;
548
549
0
error:
550
0
  free_mi_response(resp);
551
0
  return 0;
552
0
}
553
554
static int evi_print_subscriber(mi_item_t *subs_obj, evi_subs_p subs)
555
0
{
556
0
  evi_reply_sock *sock;
557
0
  str socket;
558
0
  long now;
559
0
  int expiry_countdown;
560
561
0
  if (!subs || !subs->trans_mod || !subs->trans_mod->print) {
562
0
    LM_ERR("subscriber does not have a print method exported\n");
563
0
    return -1;
564
0
  }
565
566
0
  sock = subs->reply_sock;
567
0
  if (!sock) {
568
0
    LM_DBG("no socket specified\n");
569
0
    if (add_mi_string(subs_obj, MI_SSTR("protocol"),
570
0
      subs->trans_mod->proto.s, subs->trans_mod->proto.len) < 0)
571
0
      return -1;
572
0
    return 0;
573
0
  }
574
575
0
  socket = subs->trans_mod->print(sock);
576
0
  LM_DBG("print subscriber socket <%.*s> %d\n",
577
0
      socket.len, socket.s, socket.len);
578
0
  if (add_mi_string_fmt(subs_obj, MI_SSTR("socket"), "%.*s:%.*s",
579
0
      subs->trans_mod->proto.len, subs->trans_mod->proto.s,
580
0
      socket.len, socket.s) < 0)
581
0
    return -1;
582
0
  if (sock->flags & EVI_EXPIRE) {
583
    /* indicate the expiration time used for the event*/
584
0
    if (add_mi_number(subs_obj, MI_SSTR("expire"), sock->expire) < 0)
585
0
        return -1;
586
0
    now = time(0);
587
    /* calculate the remaining time for the subscriber to be expired */
588
0
    expiry_countdown = sock->expire - (now - subs->reply_sock->subscription_time);
589
0
    if (expiry_countdown > 0) {
590
0
      if (add_mi_number(subs_obj, MI_SSTR("ttl"), expiry_countdown) < 0)
591
0
        return -1;
592
0
    }else{
593
      /* Mark event as expired if time-to-expire is reached*/
594
0
      if (add_mi_string(subs_obj, MI_SSTR("ttl"), MI_SSTR("expired")) < 0)
595
0
        return -1;
596
0
    }
597
0
  } else {
598
0
    if (add_mi_string(subs_obj, MI_SSTR("expire"), MI_SSTR("never")) < 0)
599
0
      return -1;
600
0
  }
601
  /* XXX - does subscription time make sense? */
602
603
0
  return 0;
604
0
}
605
606
static int evi_print_event(mi_item_t *ev_obj, evi_event_t *ev, evi_subs_p subs)
607
0
{
608
0
  mi_item_t *subs_arr, *subs_item;
609
610
  /* add event only if there are subscribers */
611
0
  if (!subs && !ev->subscribers)
612
0
    return 0;
613
614
0
  if (add_mi_string(ev_obj, MI_SSTR("name"), ev->name.s, ev->name.len) < 0)
615
0
    goto error;
616
617
0
  if (add_mi_number(ev_obj, MI_SSTR("id"), ev->id) < 0)
618
0
    goto error;
619
620
0
  if (subs) {
621
0
    subs_item = add_mi_object(ev_obj, MI_SSTR("Subscriber"));
622
0
    if (!subs_item)
623
0
      goto error;
624
625
0
    if (evi_print_subscriber(subs_item, subs) < 0) {
626
0
      LM_ERR("cannot print subscriber info\n");
627
0
      goto error;
628
0
    }
629
0
  } else {
630
0
    subs_arr = add_mi_array(ev_obj, MI_SSTR("Subscribers"));
631
0
    if (!subs_arr)
632
0
      goto error;
633
634
0
    for (subs = ev->subscribers; subs; subs = subs->next) {
635
0
      subs_item = add_mi_object(subs_arr, NULL, 0);
636
0
      if (!subs_item)
637
0
        goto error;
638
639
0
      if (evi_print_subscriber(subs_item, subs) < 0) {
640
0
        LM_ERR("cannot print subscriber info\n");
641
0
        goto error;
642
0
      }
643
0
    }
644
0
  }
645
0
  return 0;
646
647
0
error:
648
0
  return -1;
649
0
}
650
651
static evi_subs_p evi_get_subscriber(evi_event_p event, str sock_str)
652
0
{
653
0
  const evi_export_t * trans_mod;
654
0
  evi_subs_p subscriber = NULL;
655
0
  evi_reply_sock * sock;
656
657
  /* transport module name */
658
0
  trans_mod = get_trans_mod(&sock_str);
659
0
  if (!trans_mod) {
660
0
    LM_DBG("couldn't find a protocol to support %.*s\n",
661
0
        sock_str.len, sock_str.s);
662
0
    return NULL;
663
0
  }
664
0
  sock_str.s += trans_mod->proto.len + 1;
665
0
  sock_str.len -= (trans_mod->proto.len + 1);
666
667
  /* parse reply socket */
668
0
  sock = trans_mod->parse(sock_str);
669
0
  if (!sock)
670
0
    return NULL;
671
672
  /* tries to match other socket */
673
0
  if (trans_mod->match) {
674
0
    lock_get(event->lock);
675
0
    for (subscriber = event->subscribers; subscriber;
676
0
        subscriber = subscriber->next) {
677
0
      if (subscriber->trans_mod != trans_mod)
678
0
        continue;
679
0
      if (trans_mod->match(sock, subscriber->reply_sock)) {
680
0
        if (trans_mod->free)
681
0
          trans_mod->free(sock);
682
0
        else
683
0
          shm_free(sock);
684
0
        break;
685
0
      }
686
0
    }
687
0
    lock_release(event->lock);
688
0
  }
689
0
  return subscriber;
690
0
}
691
692
static mi_response_t *mi_subscribers_list(evi_event_p event, evi_subs_p subs)
693
0
{
694
0
  mi_response_t *resp;
695
0
  mi_item_t *resp_obj;
696
0
  mi_item_t *event_obj;
697
698
0
  resp = init_mi_result_object(&resp_obj);
699
0
  if (!resp)
700
0
    return 0;
701
702
0
  event_obj = add_mi_object(resp_obj, MI_SSTR("Event"));
703
0
  if (!event_obj)
704
0
    goto error;
705
706
0
  if (evi_print_event(event_obj, event, subs) < 0) {
707
0
    LM_ERR("cannot print event %.*s info\n",
708
0
      event->name.len, event->name.s);
709
0
    goto error;
710
0
  }
711
712
0
  return resp;
713
714
0
error:
715
0
  free_mi_response(resp);
716
0
  return NULL;
717
0
}
718
719
/* used to list all subscribers */
720
mi_response_t *w_mi_subscribers_list(const mi_params_t *params,
721
                struct mi_handler *async_hdl)
722
0
{
723
0
  mi_response_t *resp;
724
0
  mi_item_t *resp_obj;
725
0
  mi_item_t *events_arr, *event_item;
726
0
  int i;
727
728
0
  resp = init_mi_result_object(&resp_obj);
729
0
  if (!resp)
730
0
    return 0;
731
732
0
  events_arr = add_mi_array(resp_obj, MI_SSTR("Events"));
733
0
  if (!events_arr)
734
0
    goto error;
735
736
0
  for (i = 0; i < events_no; i++) {
737
    /* before printing the subs list check for any expired subscribers and remove them*/
738
0
    evi_remove_expired_subs(events[i].id);
739
0
    if (!events[i].subscribers)
740
0
      continue;
741
742
0
    event_item = add_mi_object(events_arr, NULL, 0);
743
0
    if (!event_item)
744
0
      goto error;
745
746
0
    if (evi_print_event(event_item, &events[i], NULL) < 0) {
747
0
      LM_ERR("cannot print event %.*s info\n",
748
0
        events[i].name.len, events[i].name.s);
749
0
      goto error;
750
0
    }
751
0
  }
752
753
0
  return resp;
754
755
0
error:
756
0
  free_mi_response(resp);
757
0
  return 0;
758
0
}
759
760
mi_response_t *w_mi_subscribers_list_1(const mi_params_t *params,
761
                struct mi_handler *async_hdl)
762
0
{
763
0
  str event_s;
764
0
  evi_event_p event;
765
0
  int evid;
766
0
  if (get_mi_string_param(params, "event", &event_s.s, &event_s.len) < 0)
767
0
    return init_mi_param_error();
768
769
0
  event = evi_get_event(&event_s);
770
0
  if (!event)
771
0
    return init_mi_error(404, MI_SSTR("Event not published"));
772
  /* get the event id & before printing the subs list check for any expired subscribers and remove them*/
773
0
  evid = evi_get_id(&event_s);
774
0
  if (evid == -1)
775
0
    return init_mi_error(404, MI_SSTR("Can't get the id"));
776
777
0
  evi_remove_expired_subs(evid);
778
779
0
  return mi_subscribers_list(event, NULL);
780
0
}
781
782
mi_response_t *w_mi_subscribers_list_2(const mi_params_t *params,
783
                struct mi_handler *async_hdl)
784
0
{
785
0
  str event_s;
786
0
  str subs_s;
787
0
  evi_event_p event;
788
0
  evi_subs_p subs;
789
0
  int evid;
790
0
  if (get_mi_string_param(params, "event", &event_s.s, &event_s.len) < 0)
791
0
    return init_mi_param_error();
792
793
0
  event = evi_get_event(&event_s);
794
0
  if (!event)
795
0
    return init_mi_error(404, MI_SSTR("Event not published"));
796
  /* get the event id & before printing the subs list check for any expired subscribers and remove them*/
797
0
  evid = evi_get_id(&event_s);
798
0
  if (evid == -1)
799
0
    return init_mi_error(404, MI_SSTR("Can't get the id"));
800
801
0
  evi_remove_expired_subs(evid);
802
803
0
  if (get_mi_string_param(params, "socket", &subs_s.s, &subs_s.len) < 0)
804
0
    return init_mi_param_error();
805
806
0
  subs = evi_get_subscriber(event, subs_s);
807
0
  if (!subs)
808
0
    return init_mi_error(404, MI_SSTR("Subscriber does not exist"));
809
810
0
  return mi_subscribers_list(event, subs);
811
0
}
812
813
evi_params_p mi_raise_event_json_params(str *params)
814
0
{
815
0
  int err;
816
0
  cJSON *param;
817
0
  cJSON *jparams;
818
0
  str name, jstring;
819
0
  evi_params_p eparams = NULL;
820
0
  char *tmp = pkg_malloc(params->len + 1);
821
0
  if (!tmp) {
822
0
    LM_ERR("could not create temporary buffer!\n");
823
0
    return NULL;
824
0
  }
825
0
  memcpy(tmp, params->s, params->len);
826
0
  tmp[params->len] = 0;
827
0
  jparams = cJSON_Parse(tmp);
828
0
  pkg_free(tmp);
829
0
  if (!jparams) {
830
0
    LM_DBG("could not parse json '%.*s'\n", params->len, params->s);
831
0
    return NULL;
832
0
  } else
833
0
    LM_DBG("treating params as json '%.*s'\n", params->len, params->s);
834
835
0
  if (!(jparams->type &cJSON_Object)) {
836
0
    LM_ERR("params json is not an object\n");
837
0
    return NULL;
838
0
  }
839
  /* parse params as json */
840
0
  if (!(eparams = evi_get_params())) {
841
0
    LM_ERR("cannot create parameters list\n");
842
0
    goto error;
843
0
  }
844
0
  for (param = jparams->child; param; param = param->next) {
845
0
    name.s = param->string;
846
0
    name.len = strlen(name.s);
847
0
    switch (param->type) {
848
0
      case cJSON_Number:
849
0
        err = evi_param_add_int(eparams, &name, &param->valueint);
850
0
        break;
851
0
      case cJSON_String:
852
0
        jstring.s = param->valuestring;
853
0
        jstring.len = strlen(jstring.s);
854
0
        err = evi_param_add_str(eparams, &name, &jstring);
855
0
        break;
856
0
      default:
857
0
        jstring.s = cJSON_PrintUnformatted(param);
858
0
        jstring.len = strlen(jstring.s);
859
0
        err = evi_param_add_str(eparams, &name, &jstring);
860
0
        cJSON_PurgeString(jstring.s);
861
0
        break;
862
0
    }
863
0
    if (err) {
864
0
      LM_ERR("could not add parameter %s\n", name.s);
865
0
      goto error_free;
866
0
    }
867
0
  }
868
0
  cJSON_Delete(jparams);
869
0
  return eparams;
870
0
error_free:
871
0
  evi_free_params(eparams);
872
0
error:
873
0
  cJSON_Delete(jparams);
874
0
  return NULL;
875
0
}
876
877
evi_params_p mi_raise_event_array_params(mi_item_t *array, int no)
878
0
{
879
0
  int i;
880
0
  str param;
881
0
  evi_params_p eparams = NULL;
882
883
0
  LM_DBG("treating params as array\n");
884
885
  /* parse params as json */
886
0
  if (!(eparams = evi_get_params())) {
887
0
    LM_ERR("cannot create parameters list\n");
888
0
    return NULL;
889
0
  }
890
891
0
  for (i = 0; i < no; i++) {
892
0
    if (get_mi_arr_param_string(array, i, &param.s, &param.len) < 0) {
893
0
      LM_ERR("cannot fetch array element %d\n", i);
894
0
      goto error;
895
0
    }
896
0
    if (evi_param_add_str(eparams, NULL, &param)) {
897
0
      LM_ERR("cannot add new params %d\n", i);
898
0
      goto error;
899
0
    }
900
0
  }
901
902
0
  return eparams;
903
0
error:
904
0
  evi_free_params(eparams);
905
0
  return NULL;
906
0
}
907
908
struct rpc_raise_event_dispatch {
909
  event_id_t id;
910
  evi_params_p params;
911
};
912
913
914
void rpc_raise_event(int sender, void *param)
915
0
{
916
0
  struct rpc_raise_event_dispatch *p = (struct rpc_raise_event_dispatch *)param;
917
0
  if (evi_raise_event(p->id, p->params))
918
0
    LM_ERR("cannot raise event RPC\n");
919
0
  evi_free_shm_params(p->params);
920
0
  shm_free(p);
921
0
}
922
923
924
int evi_dispatch_event( event_id_t id, evi_params_p params)
925
0
{
926
0
  evi_params_p sparams = NULL;
927
0
  struct rpc_raise_event_dispatch *djob;
928
929
0
  if (params) {
930
0
    sparams = evi_dup_shm_params(params);
931
0
    evi_free_params(params);
932
0
    if (!sparams) {
933
0
      LM_ERR("could not shm duplicate evi params!\n");
934
0
      goto error;
935
0
    }
936
0
  }
937
938
0
  djob = shm_malloc(sizeof (*djob));
939
0
  if (!djob) {
940
0
    LM_ERR("could not allocate new job!\n");
941
0
    goto error;
942
0
  }
943
0
  djob->id = id;
944
0
  djob->params = sparams;
945
946
0
  if (ipc_dispatch_rpc(rpc_raise_event, djob) < 0) {
947
0
    LM_ERR("could not dispatch raise event job!\n");
948
0
    goto error;
949
0
  }
950
951
0
  return 0;
952
0
error:
953
0
  if (sparams)
954
0
    evi_free_shm_params(sparams);
955
0
  return -1;
956
0
}
957
958
959
mi_response_t *w_mi_raise_event(const mi_params_t *params,
960
                struct mi_handler *async_hdl)
961
0
{
962
0
  int no;
963
0
  str event_s;
964
0
  str tparams;
965
0
  event_id_t id;
966
0
  mi_item_t *values;
967
0
  evi_params_p eparams = NULL;
968
969
0
  if (get_mi_string_param(params, "event", &event_s.s, &event_s.len) < 0)
970
0
    return init_mi_param_error();
971
972
0
  id = evi_get_id(&event_s);
973
0
  if (id == EVI_ERROR)
974
0
    return init_mi_error(404, MI_SSTR("Event not registered"));
975
976
  /* check if there are any subscribers */
977
0
  if (!evi_probe_event(id))
978
0
    return init_mi_error(480, MI_SSTR("Temporarily Unavailable"));
979
980
  /* check to see if we have an array params, or key-value one */
981
0
  switch (try_get_mi_array_param(params, "params", &values, &no)) {
982
0
    case -1:
983
0
    case -3:
984
      /* no params used */
985
0
      break;
986
0
    case -2:
987
      /* not an array - most likely it's a string */
988
0
      if (get_mi_string_param(params, "params", &tparams.s, &tparams.len) < 0)
989
0
        return init_mi_error(400, MI_SSTR("No Params"));
990
0
      eparams = mi_raise_event_json_params(&tparams);
991
0
      if (!eparams)
992
0
        return init_mi_error(400, MI_SSTR("Bad Params"));
993
0
      break;
994
0
    case 0:
995
      /* this is an array - push it like this */
996
0
      eparams = mi_raise_event_array_params(values, no);
997
0
      if (!eparams)
998
0
        return init_mi_error(400, MI_SSTR("Bad Params"));
999
0
      break;
1000
0
  }
1001
1002
0
  if (evi_dispatch_event( id, eparams)<0) {
1003
0
    LM_ERR("could not dispatch raise event job!\n");
1004
0
    goto error;
1005
0
  }
1006
1007
0
  return init_mi_result_ok();
1008
0
error:
1009
0
  if (eparams)
1010
0
    evi_free_params(eparams);
1011
0
  return init_mi_error(500, MI_SSTR("Cannot Raise Event"));
1012
0
}