Coverage Report

Created: 2024-09-08 06:06

/src/opensips/evi/event_interface.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2011 OpenSIPS Solutions
3
 *
4
 * This file is part of opensips, a free SIP server.
5
 *
6
 * opensips is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 2 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * opensips is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software
18
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19
 *
20
 *
21
 * history:
22
 * ---------
23
 *  2011-05-xx  created (razvancrainea)
24
 */
25
26
#include "event_interface.h"
27
#include "evi_modules.h"
28
#include "../mem/shm_mem.h"
29
#include "../mi/mi.h"
30
#include "../pvar.h"
31
#include "../timer.h"
32
#include "../ut.h"
33
#include "../ipc.h"
34
35
36
int events_no = 0;
37
int max_alloc_events = 10;
38
static int events_rec_level = MAX_REC_LEV;
39
40
/* holds all exported events */
41
evi_event_t *events = NULL;
42
43
event_id_t evi_publish_event(str event_name)
44
0
{
45
0
  int idx;
46
47
0
  if (event_name.len > MAX_EVENT_NAME) {
48
0
    LM_ERR("event name too long [%d>%d]\n", event_name.len, MAX_EVENT_NAME);
49
0
    return EVI_ERROR;
50
0
  }
51
52
0
  idx = evi_get_id(&event_name);
53
0
  if (idx != EVI_ERROR) {
54
0
    LM_WARN("Event \"%.*s\" was previously published\n",
55
0
        event_name.len, event_name.s);
56
0
    return idx;
57
0
  }
58
59
  /* check if the event was already registered */
60
0
  if (!events) {
61
    /* first event */
62
0
    events = shm_malloc(max_alloc_events * sizeof(evi_event_t));
63
0
    if (!events) {
64
0
      LM_ERR("no more shm memory to hold %d events\n", max_alloc_events);
65
0
      return EVI_ERROR;
66
0
    }
67
0
  } else if (events_no == max_alloc_events) {
68
0
    max_alloc_events *= 2;
69
0
    events = shm_realloc(events, max_alloc_events * sizeof(evi_event_t));
70
0
    if (!events) {
71
0
      LM_ERR("no more shm memory to hold %d events\n", max_alloc_events);
72
0
      return EVI_ERROR;
73
0
    }
74
0
  }
75
76
0
  events[events_no].lock = lock_alloc();
77
0
  if (!events[events_no].lock) {
78
0
    LM_ERR("Failed to allocate subscribers lock\n");
79
0
    return EVI_ERROR;
80
0
  }
81
0
  events[events_no].lock = lock_init(events[events_no].lock);
82
0
  if (!events[events_no].lock) {
83
0
    LM_ERR("Failed to create subscribers lock\n");
84
0
    return EVI_ERROR;
85
0
  }
86
87
0
  events[events_no].id = events_no;
88
0
  events[events_no].name.s = event_name.s;
89
0
  events[events_no].name.len = event_name.len;
90
0
  events[events_no].subscribers = NULL;
91
0
  LM_INFO("Registered event <%.*s(%d)>\n", event_name.len, event_name.s, events_no);
92
93
0
  return events_no++;
94
0
}
95
96
int evi_raise_event(event_id_t id, evi_params_t* params)
97
0
{
98
0
  int status;
99
0
  struct sip_msg* req= NULL;
100
0
  struct usr_avp *event_avps = 0;
101
0
  struct usr_avp **bak_avps = 0;
102
103
  /*
104
   * because these might be nested, a different message has
105
   * to be generated each time
106
   */
107
0
  req = get_dummy_sip_msg();
108
0
  if(req == NULL)
109
0
  {
110
0
    LM_ERR("No more memory\n");
111
0
    return -1;
112
0
  }
113
114
0
  bak_avps = set_avp_list(&event_avps);
115
116
0
  status = evi_raise_event_msg(req, id, params);
117
118
  /* clean whatever extra structures were added by script functions */
119
0
  release_dummy_sip_msg(req);
120
121
  /* remove all avps added */
122
0
  destroy_avp_list(&event_avps);
123
0
  set_avp_list(bak_avps);
124
125
0
  return status;
126
0
}
127
/* this function checks the subscribers of an event and remove them if 
128
they are past their expiry - dont want to print expired events as well */
129
0
void evi_remove_expired_subs(event_id_t id) {
130
0
  evi_subs_p subs, prev;
131
0
  long now;
132
133
0
  lock_get(events[id].lock);
134
0
  now = time(0);
135
0
  subs = events[id].subscribers;
136
0
  prev = NULL;
137
0
  while (subs) {
138
0
    if (!subs->reply_sock) {
139
0
      LM_ERR("unknown destination\n");
140
0
      continue;
141
0
    }
142
    /* check expire */
143
0
    if (!(subs->reply_sock->flags & EVI_PENDING) &&
144
0
      subs->reply_sock->flags & EVI_EXPIRE &&
145
0
      subs->reply_sock->subscription_time +
146
0
      subs->reply_sock->expire < now) {
147
0
      LM_DBG("removing expired subscriber %.*s:%.*s:%d for event %.*s\n",
148
0
        subs->trans_mod->proto.len,subs->trans_mod->proto.s,
149
0
        subs->reply_sock->address.len, subs->reply_sock->address.s,
150
0
        subs->reply_sock->port,events[id].name.len,events[id].name.s);
151
0
      if (subs->trans_mod && subs->trans_mod->free)
152
0
        subs->trans_mod->free(subs->reply_sock);
153
0
      else
154
0
        shm_free(subs->reply_sock);
155
0
      if (!prev) {
156
0
        events[id].subscribers = subs->next;
157
0
        shm_free(subs);
158
0
        subs = events[id].subscribers;
159
0
      } else {
160
0
        prev->next = subs->next;
161
0
        shm_free(subs);
162
0
        subs = prev->next;
163
0
      }
164
0
      continue;
165
0
    }
166
0
    subs = subs->next;
167
0
  }
168
0
  lock_release(events[id].lock);
169
0
}
170
/* XXX: this function should release its parameters before exiting */
171
int evi_raise_event_msg(struct sip_msg *msg, event_id_t id, evi_params_t* params)
172
0
{
173
0
  evi_subs_p subs, prev;
174
0
  evi_async_ctx_t async_status = {NULL, NULL};
175
0
  long now;
176
0
  int flags, pflags = 0;
177
0
  int ret = 0;
178
179
0
  if (id < 0 || id >= events_no) {
180
0
    LM_ERR("invalid event %d\n", id);
181
0
    goto free;
182
0
  }
183
184
0
  if (events_rec_level == 0) {
185
0
    LM_ERR("Too many nested events %d\n", MAX_REC_LEV);
186
0
    goto free;
187
0
  }
188
0
  events_rec_level--;
189
0
  if (params)
190
0
    pflags = params->flags;
191
192
0
  lock_get(events[id].lock);
193
0
  now = time(0);
194
0
  subs = events[id].subscribers;
195
0
  prev = NULL;
196
0
  while (subs) {
197
0
    if (!subs->reply_sock) {
198
0
      LM_ERR("unknown destination\n");
199
0
      continue;
200
0
    }
201
    /* check expire */
202
0
    if (!(subs->reply_sock->flags & EVI_PENDING) &&
203
0
        subs->reply_sock->flags & EVI_EXPIRE &&
204
0
        subs->reply_sock->subscription_time +
205
0
        subs->reply_sock->expire < now) {
206
0
      if (subs->trans_mod && subs->trans_mod->free)
207
0
        subs->trans_mod->free(subs->reply_sock);
208
0
      else
209
0
        shm_free(subs->reply_sock);
210
0
      if (!prev) {
211
0
        events[id].subscribers = subs->next;
212
0
        shm_free(subs);
213
0
        subs = events[id].subscribers;
214
0
      } else {
215
0
        prev->next = subs->next;
216
0
        shm_free(subs);
217
0
        subs = prev->next;
218
0
      }
219
0
      continue;
220
0
    }
221
222
0
    if (!subs->trans_mod) {
223
0
      LM_ERR("unknown transfer protocol\n");
224
0
      goto next;
225
0
    }
226
227
0
    LM_DBG("found subscriber %.*s\n",
228
0
        subs->reply_sock->address.len, subs->reply_sock->address.s);
229
0
    if (!subs->trans_mod->raise) {
230
0
      LM_ERR("\"%.*s\" protocol cannot raise events\n",
231
0
          subs->trans_mod->proto.len, subs->trans_mod->proto.s);
232
0
      goto next;
233
0
    }
234
    /* we use this var to make sure nested calls don't reset the flag */
235
0
    flags = subs->reply_sock->flags;
236
0
    subs->reply_sock->flags |= EVI_PENDING;
237
    /* make sure nested events don't deadlock */
238
0
    lock_release(events[id].lock);
239
240
0
    ret += (subs->trans_mod->raise)(msg, &events[id].name,
241
0
          subs->reply_sock, params, &async_status);
242
243
0
    lock_get(events[id].lock);
244
0
    subs->reply_sock->flags = flags;
245
0
next:
246
0
    prev = subs;
247
0
    subs = subs->next;
248
0
  }
249
0
  lock_release(events[id].lock);
250
251
0
  events_rec_level++;
252
0
  if (params)
253
0
    params->flags = pflags;
254
0
free:
255
  /* done sending events - free parameters */
256
0
  if (params) {
257
    /* make sure no one is messing with our flags */
258
0
    if (params->flags & EVI_FREE_LIST)
259
0
      evi_free_params(params);
260
0
  }
261
0
  return ret;
262
263
0
}
264
265
int evi_probe_event(event_id_t id)
266
0
{
267
0
  if (id < 0 || id >= events_no) {
268
0
    LM_ERR("invalid event %d\n", id);
269
0
    return -1;
270
0
  }
271
272
  /* check for subscribers */
273
0
  if (!events[id].subscribers)
274
0
    return 0;
275
276
  /* returns the number of transport module loaded */
277
0
  return get_trans_mod_no();
278
0
}
279
280
281
/* returns the id of an event */
282
event_id_t evi_get_id(str *name)
283
0
{
284
0
  int i;
285
0
  for (i = 0; i < events_no; i++)
286
0
    if (events[i].name.len == name->len &&
287
0
        !memcmp(events[i].name.s, name->s, name->len))
288
0
      return i;
289
0
  return EVI_ERROR;
290
0
}
291
292
293
/* returns an event id */
294
evi_event_p evi_get_event(str *name)
295
0
{
296
0
  event_id_t id = evi_get_id(name);
297
0
  return id == EVI_ERROR ? NULL : &events[id];
298
0
}
299
300
/*
301
 * Subscribes an event
302
 * Returns:
303
 *  1 - success
304
 *  0 - internal error
305
 * -1 - param error
306
 */
307
int evi_event_subscribe(str event_name,
308
    str sock_str, unsigned expire, unsigned unsubscribe)
309
0
{
310
0
  evi_subs_t *subscriber = NULL;
311
0
  evi_event_p event;
312
0
  const evi_export_t *trans_mod = NULL;
313
0
  evi_reply_sock *sock;
314
315
0
  event = evi_get_event(&event_name);
316
0
  if (!event) {
317
0
    LM_ERR("invalid event name <%.*s>\n",
318
0
        event_name.len, event_name.s);
319
0
    goto bad_param;
320
0
  }
321
322
  /* transport module name */
323
0
  trans_mod = get_trans_mod(&sock_str);
324
0
  if (!trans_mod) {
325
0
    LM_ERR("couldn't find a protocol to support %.*s\n",
326
0
        sock_str.len, sock_str.s);
327
0
    goto bad_param;
328
0
  }
329
0
  sock_str.s += trans_mod->proto.len + 1;
330
0
  sock_str.len -= (trans_mod->proto.len + 1);
331
332
  /* parse reply socket */
333
0
  sock = trans_mod->parse(sock_str);
334
0
  if (!sock)
335
0
    goto bad_param;
336
  /* reset unrequired flags */
337
0
  if (!expire && !unsubscribe)
338
0
    sock->flags &= ~EVI_EXPIRE;
339
340
  /* tries to match other socket */
341
0
  if (trans_mod->match) {
342
0
    lock_get(event->lock);
343
0
    for (subscriber = event->subscribers; subscriber;
344
0
        subscriber = subscriber->next) {
345
0
      if (subscriber->trans_mod != trans_mod)
346
0
        continue;
347
0
      if (trans_mod->match(sock, subscriber->reply_sock)) {
348
        /* update subscription time */
349
0
        subscriber->reply_sock->subscription_time = time(0);
350
        /* update expire if required */
351
0
        if (EVI_EXPIRE & sock->flags) {
352
0
          subscriber->reply_sock->expire = expire;
353
0
          subscriber->reply_sock->flags = sock->flags;
354
0
        }
355
0
        if (trans_mod->free)
356
0
          trans_mod->free(sock);
357
0
        else
358
0
          shm_free(sock);
359
0
        break;
360
0
      }
361
0
    }
362
0
    lock_release(event->lock);
363
0
  }
364
365
  /* if no socket matches - create a new one */
366
0
  if (!subscriber) {
367
0
    subscriber = shm_malloc(sizeof(evi_subs_t));
368
0
    if (!subscriber) {
369
0
      LM_ERR("no more shm memory\n");
370
0
      if (trans_mod && sock) {
371
        /* if the module has it's own free function */
372
0
        if (trans_mod->free)
373
0
          trans_mod->free(sock);
374
0
        else
375
0
          shm_free(sock);
376
0
      }
377
0
      return 0;
378
0
    }
379
380
0
    sock->subscription_time = time(0);
381
0
    subscriber->trans_mod = trans_mod;
382
0
    subscriber->reply_sock = sock;
383
384
0
    if (EVI_EXPIRE & sock->flags)
385
0
      subscriber->reply_sock->expire = expire;
386
0
    subscriber->reply_sock->flags |= trans_mod->flags;
387
388
    /* guard subscribers list */
389
0
    lock_get(event->lock);
390
0
    subscriber->next = event->subscribers;
391
0
    event->subscribers = subscriber;
392
0
    lock_release(event->lock);
393
0
    LM_DBG("added new subscriber for event %d\n", event->id);
394
0
  }
395
396
0
  return 1;
397
0
bad_param:
398
0
  return -1;
399
0
}
400
401
int evi_raise_script_event(struct sip_msg *msg, event_id_t id, void * _a, void * _v)
402
0
{
403
0
  pv_spec_p vals = (pv_spec_p)_v;
404
0
  pv_spec_p attrs = (pv_spec_p)_a;
405
0
  struct usr_avp *v_avp = NULL;
406
0
  struct usr_avp *a_avp = NULL;
407
0
  int err = evi_probe_event(id);
408
0
  int_str val, attr;
409
0
  str *at;
410
0
  evi_params_p params = NULL;
411
412
0
  if (err < 0)
413
0
    return err;
414
0
  else if (!err)
415
0
    return 1;
416
417
0
  if (!vals)
418
0
    goto raise;
419
0
  if (!(params = evi_get_params())) {
420
0
    LM_ERR("cannot create parameters list\n");
421
0
    goto raise;
422
0
  }
423
424
  /* handle parameters */
425
0
  while ((v_avp = search_first_avp(vals->pvp.pvn.u.isname.type,
426
0
          vals->pvp.pvn.u.isname.name.n, &val, v_avp))) {
427
0
    at = NULL;
428
    /* check attribute */
429
0
    if (attrs) {
430
0
      err = -1;
431
0
      a_avp = search_first_avp(attrs->pvp.pvn.u.isname.type,
432
0
          attrs->pvp.pvn.u.isname.name.n, &attr, a_avp);
433
0
      if (!a_avp) {
434
0
        LM_ERR("missing attribute\n");
435
0
        goto error;
436
0
      }
437
0
      if (!(a_avp->flags & AVP_VAL_STR)) {
438
0
        LM_ERR("invalid attribute name - must be string\n");
439
0
        goto error;
440
0
      }
441
0
      at = &attr.s;
442
0
    }
443
444
0
    if (v_avp->flags & AVP_VAL_STR)
445
0
      err = evi_param_add_str(params, at, &val.s);
446
0
    else
447
0
      err = evi_param_add_int(params, at, &val.n);
448
0
    if (err) {
449
0
      LM_ERR("error while adding parameter\n");
450
0
      goto error;
451
0
    }
452
0
  }
453
454
  /* check if there were too many attribute names */
455
0
  if (attrs && a_avp && search_first_avp(attrs->pvp.pvn.u.isname.type,
456
0
        attrs->pvp.pvn.u.isname.name.n, &attr, a_avp)) {
457
    /* only signal error - continue */
458
0
    LM_ERR("too many attribute names\n");
459
0
  }
460
461
0
raise:
462
0
  err = evi_raise_event_msg(msg, id, params);
463
0
  return err ? err : 1;
464
0
error:
465
0
  evi_free_params(params);
466
0
  return -1;
467
0
}
468
469
470
static mi_response_t *mi_event_subscribe(const mi_params_t *params, unsigned int expire)
471
0
{
472
0
  int ret;
473
0
  str event_name, transport_sock;
474
475
0
  if (get_mi_string_param(params, "event", &event_name.s,
476
0
    &event_name.len) < 0 || !event_name.s || !event_name.len)
477
0
    return init_mi_param_error();
478
479
0
  if (get_mi_string_param(params, "socket", &transport_sock.s,
480
0
    &transport_sock.len) < 0 || !transport_sock.s || !transport_sock.len)
481
0
    return init_mi_param_error();
482
483
0
  ret = evi_event_subscribe(event_name, transport_sock, expire, 1);
484
0
  if (ret < 0)
485
0
    return init_mi_error(400, MI_SSTR("Bad parameter value"));
486
487
0
  return ret ? init_mi_result_ok() : 0;
488
0
}
489
490
mi_response_t *w_mi_event_subscribe(const mi_params_t *params,
491
                struct mi_handler *async_hdl)
492
0
{
493
0
  return mi_event_subscribe(params, DEFAULT_EXPIRE);
494
0
}
495
496
mi_response_t *w_mi_event_subscribe_1(const mi_params_t *params,
497
                struct mi_handler *async_hdl)
498
0
{
499
0
  int expire;
500
501
0
  if (get_mi_int_param(params, "expire", &expire) < 0)
502
0
    return init_mi_param_error();
503
504
0
  if (expire < 0)
505
0
    return init_mi_error_extra(JSONRPC_INVAL_PARAMS_CODE,
506
0
      MI_SSTR(JSONRPC_INVAL_PARAMS_MSG),
507
0
      MI_SSTR("Negative expire value"));
508
509
0
  return mi_event_subscribe(params, expire);
510
0
}
511
512
513
/* used to list all the registered events */
514
mi_response_t *mi_events_list(const mi_params_t *params,
515
                struct mi_handler *async_hdl)
516
0
{
517
0
  mi_response_t *resp;
518
0
  mi_item_t *resp_obj;
519
0
  mi_item_t *events_arr, *event_item;
520
0
  unsigned i;
521
522
0
  resp = init_mi_result_object(&resp_obj);
523
0
  if (!resp)
524
0
    return 0;
525
526
0
  events_arr = add_mi_array(resp_obj, MI_SSTR("Events"));
527
0
  if (!events_arr) {
528
0
    free_mi_response(resp);
529
0
    return 0;
530
0
  }
531
532
0
  for (i = 0; i < events_no; i++) {
533
0
    event_item = add_mi_object(events_arr, NULL, 0);
534
0
    if (!event_item)
535
0
      goto error;
536
537
0
    if (add_mi_string(event_item, MI_SSTR("name"),
538
0
      events[i].name.s, events[i].name.len) < 0)
539
0
      goto error;
540
541
0
    if (add_mi_number(event_item, MI_SSTR("id"), events[i].id) < 0)
542
0
      goto error;
543
0
  }
544
545
0
  return resp;
546
547
0
error:
548
0
  free_mi_response(resp);
549
0
  return 0;
550
0
}
551
552
static int evi_print_subscriber(mi_item_t *subs_obj, evi_subs_p subs)
553
0
{
554
0
  evi_reply_sock *sock;
555
0
  str socket;
556
0
  long now;
557
0
  int expiry_countdown;
558
559
0
  if (!subs || !subs->trans_mod || !subs->trans_mod->print) {
560
0
    LM_ERR("subscriber does not have a print method exported\n");
561
0
    return -1;
562
0
  }
563
564
0
  sock = subs->reply_sock;
565
0
  if (!sock) {
566
0
    LM_DBG("no socket specified\n");
567
0
    if (add_mi_string(subs_obj, MI_SSTR("protocol"),
568
0
      subs->trans_mod->proto.s, subs->trans_mod->proto.len) < 0)
569
0
      return -1;
570
0
    return 0;
571
0
  }
572
573
0
  socket = subs->trans_mod->print(sock);
574
0
  LM_DBG("print subscriber socket <%.*s> %d\n",
575
0
      socket.len, socket.s, socket.len);
576
0
  if (add_mi_string_fmt(subs_obj, MI_SSTR("socket"), "%.*s:%.*s",
577
0
      subs->trans_mod->proto.len, subs->trans_mod->proto.s,
578
0
      socket.len, socket.s) < 0)
579
0
    return -1;
580
0
  if (sock->flags & EVI_EXPIRE) {
581
    /* indicate the expiration time used for the event*/
582
0
    if (add_mi_number(subs_obj, MI_SSTR("expire"), sock->expire) < 0)
583
0
        return -1;
584
0
    now = time(0);
585
    /* calculate the remaining time for the subscriber to be expired */
586
0
    expiry_countdown = sock->expire - (now - subs->reply_sock->subscription_time);
587
0
    if (expiry_countdown > 0) {
588
0
      if (add_mi_number(subs_obj, MI_SSTR("ttl"), expiry_countdown) < 0)
589
0
        return -1;
590
0
    }else{
591
      /* Mark event as expired if time-to-expire is reached*/
592
0
      if (add_mi_string(subs_obj, MI_SSTR("ttl"), MI_SSTR("expired")) < 0)
593
0
        return -1;
594
0
    }
595
0
  } else {
596
0
    if (add_mi_string(subs_obj, MI_SSTR("expire"), MI_SSTR("never")) < 0)
597
0
      return -1;
598
0
  }
599
  /* XXX - does subscription time make sense? */
600
601
0
  return 0;
602
0
}
603
604
static int evi_print_event(mi_item_t *ev_obj, evi_event_t *ev, evi_subs_p subs)
605
0
{
606
0
  mi_item_t *subs_arr, *subs_item;
607
608
  /* add event only if there are subscribers */
609
0
  if (!subs && !ev->subscribers)
610
0
    return 0;
611
612
0
  if (add_mi_string(ev_obj, MI_SSTR("name"), ev->name.s, ev->name.len) < 0)
613
0
    goto error;
614
615
0
  if (add_mi_number(ev_obj, MI_SSTR("id"), ev->id) < 0)
616
0
    goto error;
617
618
0
  if (subs) {
619
0
    subs_item = add_mi_object(ev_obj, MI_SSTR("Subscriber"));
620
0
    if (!subs_item)
621
0
      goto error;
622
623
0
    if (evi_print_subscriber(subs_item, subs) < 0) {
624
0
      LM_ERR("cannot print subscriber info\n");
625
0
      goto error;
626
0
    }
627
0
  } else {
628
0
    subs_arr = add_mi_array(ev_obj, MI_SSTR("Subscribers"));
629
0
    if (!subs_arr)
630
0
      goto error;
631
632
0
    for (subs = ev->subscribers; subs; subs = subs->next) {
633
0
      subs_item = add_mi_object(subs_arr, NULL, 0);
634
0
      if (!subs_item)
635
0
        goto error;
636
637
0
      if (evi_print_subscriber(subs_item, subs) < 0) {
638
0
        LM_ERR("cannot print subscriber info\n");
639
0
        goto error;
640
0
      }
641
0
    }
642
0
  }
643
0
  return 0;
644
645
0
error:
646
0
  return -1;
647
0
}
648
649
static evi_subs_p evi_get_subscriber(evi_event_p event, str sock_str)
650
0
{
651
0
  const evi_export_t * trans_mod;
652
0
  evi_subs_p subscriber = NULL;
653
0
  evi_reply_sock * sock;
654
655
  /* transport module name */
656
0
  trans_mod = get_trans_mod(&sock_str);
657
0
  if (!trans_mod) {
658
0
    LM_DBG("couldn't find a protocol to support %.*s\n",
659
0
        sock_str.len, sock_str.s);
660
0
    return NULL;
661
0
  }
662
0
  sock_str.s += trans_mod->proto.len + 1;
663
0
  sock_str.len -= (trans_mod->proto.len + 1);
664
665
  /* parse reply socket */
666
0
  sock = trans_mod->parse(sock_str);
667
0
  if (!sock)
668
0
    return NULL;
669
670
  /* tries to match other socket */
671
0
  if (trans_mod->match) {
672
0
    lock_get(event->lock);
673
0
    for (subscriber = event->subscribers; subscriber;
674
0
        subscriber = subscriber->next) {
675
0
      if (subscriber->trans_mod != trans_mod)
676
0
        continue;
677
0
      if (trans_mod->match(sock, subscriber->reply_sock)) {
678
0
        if (trans_mod->free)
679
0
          trans_mod->free(sock);
680
0
        else
681
0
          shm_free(sock);
682
0
        break;
683
0
      }
684
0
    }
685
0
    lock_release(event->lock);
686
0
  }
687
0
  return subscriber;
688
0
}
689
690
static mi_response_t *mi_subscribers_list(evi_event_p event, evi_subs_p subs)
691
0
{
692
0
  mi_response_t *resp;
693
0
  mi_item_t *resp_obj;
694
0
  mi_item_t *event_obj;
695
696
0
  resp = init_mi_result_object(&resp_obj);
697
0
  if (!resp)
698
0
    return 0;
699
700
0
  event_obj = add_mi_object(resp_obj, MI_SSTR("Event"));
701
0
  if (!event_obj)
702
0
    goto error;
703
704
0
  if (evi_print_event(event_obj, event, subs) < 0) {
705
0
    LM_ERR("cannot print event %.*s info\n",
706
0
      event->name.len, event->name.s);
707
0
    goto error;
708
0
  }
709
710
0
  return resp;
711
712
0
error:
713
0
  free_mi_response(resp);
714
0
  return NULL;
715
0
}
716
717
/* used to list all subscribers */
718
mi_response_t *w_mi_subscribers_list(const mi_params_t *params,
719
                struct mi_handler *async_hdl)
720
0
{
721
0
  mi_response_t *resp;
722
0
  mi_item_t *resp_obj;
723
0
  mi_item_t *events_arr, *event_item;
724
0
  int i;
725
726
0
  resp = init_mi_result_object(&resp_obj);
727
0
  if (!resp)
728
0
    return 0;
729
730
0
  events_arr = add_mi_array(resp_obj, MI_SSTR("Events"));
731
0
  if (!events_arr)
732
0
    goto error;
733
734
0
  for (i = 0; i < events_no; i++) {
735
    /* before printing the subs list check for any expired subscribers and remove them*/
736
0
    evi_remove_expired_subs(events[i].id);
737
0
    if (!events[i].subscribers)
738
0
      continue;
739
740
0
    event_item = add_mi_object(events_arr, NULL, 0);
741
0
    if (!event_item)
742
0
      goto error;
743
744
0
    if (evi_print_event(event_item, &events[i], NULL) < 0) {
745
0
      LM_ERR("cannot print event %.*s info\n",
746
0
        events[i].name.len, events[i].name.s);
747
0
      goto error;
748
0
    }
749
0
  }
750
751
0
  return resp;
752
753
0
error:
754
0
  free_mi_response(resp);
755
0
  return 0;
756
0
}
757
758
mi_response_t *w_mi_subscribers_list_1(const mi_params_t *params,
759
                struct mi_handler *async_hdl)
760
0
{
761
0
  str event_s;
762
0
  evi_event_p event;
763
0
  int evid;
764
0
  if (get_mi_string_param(params, "event", &event_s.s, &event_s.len) < 0)
765
0
    return init_mi_param_error();
766
767
0
  event = evi_get_event(&event_s);
768
0
  if (!event)
769
0
    return init_mi_error(404, MI_SSTR("Event not published"));
770
  /* get the event id & before printing the subs list check for any expired subscribers and remove them*/
771
0
  evid = evi_get_id(&event_s);
772
0
  evi_remove_expired_subs(evid);
773
774
0
  return mi_subscribers_list(event, NULL);
775
0
}
776
777
mi_response_t *w_mi_subscribers_list_2(const mi_params_t *params,
778
                struct mi_handler *async_hdl)
779
0
{
780
0
  str event_s;
781
0
  str subs_s;
782
0
  evi_event_p event;
783
0
  evi_subs_p subs;
784
0
  int evid;
785
0
  if (get_mi_string_param(params, "event", &event_s.s, &event_s.len) < 0)
786
0
    return init_mi_param_error();
787
788
0
  event = evi_get_event(&event_s);
789
0
  if (!event)
790
0
    return init_mi_error(404, MI_SSTR("Event not published"));
791
  /* get the event id & before printing the subs list check for any expired subscribers and remove them*/
792
0
  evid = evi_get_id(&event_s);
793
0
  evi_remove_expired_subs(evid);
794
795
0
  if (get_mi_string_param(params, "socket", &subs_s.s, &subs_s.len) < 0)
796
0
    return init_mi_param_error();
797
798
0
  subs = evi_get_subscriber(event, subs_s);
799
0
  if (!subs)
800
0
    return init_mi_error(404, MI_SSTR("Subscriber does not exist"));
801
802
0
  return mi_subscribers_list(event, subs);
803
0
}
804
805
evi_params_p mi_raise_event_json_params(str *params)
806
0
{
807
0
  int err;
808
0
  cJSON *param;
809
0
  cJSON *jparams;
810
0
  str name, jstring;
811
0
  evi_params_p eparams = NULL;
812
0
  char *tmp = pkg_malloc(params->len + 1);
813
0
  if (!tmp) {
814
0
    LM_ERR("could not create temporary buffer!\n");
815
0
    return NULL;
816
0
  }
817
0
  memcpy(tmp, params->s, params->len);
818
0
  tmp[params->len] = 0;
819
0
  jparams = cJSON_Parse(tmp);
820
0
  pkg_free(tmp);
821
0
  if (!jparams) {
822
0
    LM_DBG("could not parse json '%.*s'\n", params->len, params->s);
823
0
    return NULL;
824
0
  } else
825
0
    LM_DBG("treating params as json '%.*s'\n", params->len, params->s);
826
827
0
  if (!(jparams->type &cJSON_Object)) {
828
0
    LM_ERR("params json is not an object\n");
829
0
    return NULL;
830
0
  }
831
  /* parse params as json */
832
0
  if (!(eparams = evi_get_params())) {
833
0
    LM_ERR("cannot create parameters list\n");
834
0
    goto error;
835
0
  }
836
0
  for (param = jparams->child; param; param = param->next) {
837
0
    name.s = param->string;
838
0
    name.len = strlen(name.s);
839
0
    switch (param->type) {
840
0
      case cJSON_Number:
841
0
        err = evi_param_add_int(eparams, &name, &param->valueint);
842
0
        break;
843
0
      case cJSON_String:
844
0
        jstring.s = param->valuestring;
845
0
        jstring.len = strlen(jstring.s);
846
0
        err = evi_param_add_str(eparams, &name, &jstring);
847
0
        break;
848
0
      default:
849
0
        jstring.s = cJSON_PrintUnformatted(param);
850
0
        jstring.len = strlen(jstring.s);
851
0
        err = evi_param_add_str(eparams, &name, &jstring);
852
0
        cJSON_PurgeString(jstring.s);
853
0
        break;
854
0
    }
855
0
    if (err) {
856
0
      LM_ERR("could not add parameter %s\n", name.s);
857
0
      goto error_free;
858
0
    }
859
0
  }
860
0
  cJSON_Delete(jparams);
861
0
  return eparams;
862
0
error_free:
863
0
  evi_free_params(eparams);
864
0
error:
865
0
  cJSON_Delete(jparams);
866
0
  return NULL;
867
0
}
868
869
evi_params_p mi_raise_event_array_params(mi_item_t *array, int no)
870
0
{
871
0
  int i;
872
0
  str param;
873
0
  evi_params_p eparams = NULL;
874
875
0
  LM_DBG("treating params as array\n");
876
877
  /* parse params as json */
878
0
  if (!(eparams = evi_get_params())) {
879
0
    LM_ERR("cannot create parameters list\n");
880
0
    return NULL;
881
0
  }
882
883
0
  for (i = 0; i < no; i++) {
884
0
    if (get_mi_arr_param_string(array, i, &param.s, &param.len) < 0) {
885
0
      LM_ERR("cannot fetch array element %d\n", i);
886
0
      goto error;
887
0
    }
888
0
    if (evi_param_add_str(eparams, NULL, &param)) {
889
0
      LM_ERR("cannot add new params %d\n", i);
890
0
      goto error;
891
0
    }
892
0
  }
893
894
0
  return eparams;
895
0
error:
896
0
  evi_free_params(eparams);
897
0
  return NULL;
898
0
}
899
900
struct rpc_raise_event_dispatch {
901
  event_id_t id;
902
  evi_params_p params;
903
};
904
905
906
void rpc_raise_event(int sender, void *param)
907
0
{
908
0
  struct rpc_raise_event_dispatch *p = (struct rpc_raise_event_dispatch *)param;
909
0
  if (evi_raise_event(p->id, p->params))
910
0
    LM_ERR("cannot raise event RPC\n");
911
0
  evi_free_shm_params(p->params);
912
0
  shm_free(p);
913
0
}
914
915
916
int evi_dispatch_event( event_id_t id, evi_params_p params)
917
0
{
918
0
  evi_params_p sparams = NULL;
919
0
  struct rpc_raise_event_dispatch *djob;
920
921
0
  if (params) {
922
0
    sparams = evi_dup_shm_params(params);
923
0
    evi_free_params(params);
924
0
    if (!sparams) {
925
0
      LM_ERR("could not shm duplicate evi params!\n");
926
0
      goto error;
927
0
    }
928
0
  }
929
930
0
  djob = shm_malloc(sizeof (*djob));
931
0
  if (!djob) {
932
0
    LM_ERR("could not allocate new job!\n");
933
0
    goto error;
934
0
  }
935
0
  djob->id = id;
936
0
  djob->params = sparams;
937
938
0
  if (ipc_dispatch_rpc(rpc_raise_event, djob) < 0) {
939
0
    LM_ERR("could not dispatch raise event job!\n");
940
0
    goto error;
941
0
  }
942
943
0
  return 0;
944
0
error:
945
0
  if (sparams)
946
0
    evi_free_shm_params(sparams);
947
0
  return -1;
948
0
}
949
950
951
mi_response_t *w_mi_raise_event(const mi_params_t *params,
952
                struct mi_handler *async_hdl)
953
0
{
954
0
  int no;
955
0
  str event_s;
956
0
  str tparams;
957
0
  event_id_t id;
958
0
  mi_item_t *values;
959
0
  evi_params_p eparams = NULL;
960
961
0
  if (get_mi_string_param(params, "event", &event_s.s, &event_s.len) < 0)
962
0
    return init_mi_param_error();
963
964
0
  id = evi_get_id(&event_s);
965
0
  if (id == EVI_ERROR)
966
0
    return init_mi_error(404, MI_SSTR("Event not registered"));
967
968
  /* check if there are any subscribers */
969
0
  if (!evi_probe_event(id))
970
0
    return init_mi_error(480, MI_SSTR("Temporarily Unavailable"));
971
972
  /* check to see if we have an array params, or key-value one */
973
0
  switch (try_get_mi_array_param(params, "params", &values, &no)) {
974
0
    case -1:
975
0
    case -3:
976
      /* no params used */
977
0
      break;
978
0
    case -2:
979
      /* not an array - most likely it's a string */
980
0
      if (get_mi_string_param(params, "params", &tparams.s, &tparams.len) < 0)
981
0
        return init_mi_error(400, MI_SSTR("No Params"));
982
0
      eparams = mi_raise_event_json_params(&tparams);
983
0
      if (!eparams)
984
0
        return init_mi_error(400, MI_SSTR("Bad Params"));
985
0
      break;
986
0
    case 0:
987
      /* this is an array - push it like this */
988
0
      eparams = mi_raise_event_array_params(values, no);
989
0
      if (!eparams)
990
0
        return init_mi_error(400, MI_SSTR("Bad Params"));
991
0
      break;
992
0
  }
993
994
0
  if (evi_dispatch_event( id, eparams)<0) {
995
0
    LM_ERR("could not dispatch raise event job!\n");
996
0
    goto error;
997
0
  }
998
999
0
  return init_mi_result_ok();
1000
0
error:
1001
0
  if (eparams)
1002
0
    evi_free_params(eparams);
1003
0
  return init_mi_error(500, MI_SSTR("Cannot Raise Event"));
1004
0
}