Coverage Report

Created: 2025-07-12 06:13

/src/opensips/net/net_udp.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2014-2015 OpenSIPS Foundation
3
 * Copyright (C) 2001-2003 FhG Fokus
4
 *
5
 * This file is part of opensips, a free SIP server.
6
 *
7
 * opensips is free software; you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 2 of the License, or
10
 * (at your option) any later version
11
 *
12
 * opensips is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU General Public License
18
 * along with this program; if not, write to the Free Software
19
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
20
 *
21
 *
22
 * History:
23
 * -------
24
 *  2015-02-09  first version (bogdan)
25
 */
26
27
28
#include <unistd.h>
29
30
#include "../ipc.h"
31
#include "../daemonize.h"
32
#include "../reactor.h"
33
#include "../timer.h"
34
#include "../pt_load.h"
35
#include "../cfg_reload.h"
36
#include "net_udp.h"
37
38
39
#define UDP_SELECT_TIMEOUT  1
40
41
/* if the UDP network layer is used or not by some protos */
42
static int udp_disabled = 1;
43
44
extern void handle_sigs(void);
45
46
/* initializes the UDP network layer */
47
int udp_init(void)
48
0
{
49
0
  unsigned int i;
50
51
  /* first we do auto-detection to see if there are any UDP based
52
   * protocols loaded */
53
0
  for ( i=PROTO_FIRST ; i<PROTO_LAST ; i++ )
54
0
    if (is_udp_based_proto(i)) {udp_disabled=0;break;}
55
56
0
  return 0;
57
0
}
58
59
/* Destroys the UDP network layer.
 *
 * Currently a no-op: this function performs no cleanup. */
void udp_destroy(void)
{
}
64
65
/* tells how many processes the UDP layer will create */
66
int udp_count_processes(unsigned int *extra)
67
0
{
68
0
  struct socket_info_full *sif;
69
0
  unsigned int n, e, i;
70
71
0
  if (udp_disabled) {
72
0
    if (extra) *extra = 0;
73
0
    return 0;
74
0
  }
75
76
0
  for( i=0,n=0,e=0 ; i<PROTO_LAST ; i++)
77
0
    if (protos[i].id!=PROTO_NONE && is_udp_based_proto(i))
78
0
      for( sif=protos[i].listeners ; sif; sif=sif->next) {
79
0
        const struct socket_info *si = &sif->socket_info;
80
0
        n+=si->workers;
81
0
        if (si->s_profile)
82
0
          if (si->s_profile->max_procs > si->workers)
83
0
            e+=si->s_profile->max_procs-si->workers;
84
0
      }
85
86
0
  if (extra) *extra = e;
87
0
  return n;
88
0
}
89
90
#ifdef USE_MCAST
91
/**
92
 * Setup a multicast receiver socket, supports IPv4 and IPv6.
93
 * \param sock socket
94
 * \param addr receiver address
95
 * \return zero on success, -1 otherwise
96
 */
97
static int setup_mcast_rcvr(int sock, union sockaddr_union* addr)
98
{
99
  struct ip_mreq mreq;
100
  struct ipv6_mreq mreq6;
101
102
  if (addr->s.sa_family==AF_INET){
103
    memcpy(&mreq.imr_multiaddr, &addr->sin.sin_addr,
104
           sizeof(struct in_addr));
105
    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
106
107
    if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,&mreq,
108
             sizeof(mreq))==-1){
109
      LM_ERR("setsockopt: %s\n", strerror(errno));
110
      return -1;
111
    }
112
  } else if (addr->s.sa_family==AF_INET6){
113
    memcpy(&mreq6.ipv6mr_multiaddr, &addr->sin6.sin6_addr,
114
           sizeof(struct in6_addr));
115
    mreq6.ipv6mr_interface = 0;
116
#ifdef __OS_linux
117
    if (setsockopt(sock, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6,
118
#else
119
    if (setsockopt(sock, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq6,
120
#endif
121
             sizeof(mreq6))==-1){
122
      LM_ERR("setsockopt:%s\n",  strerror(errno));
123
      return -1;
124
    }
125
  } else {
126
    LM_ERR("unsupported protocol family\n");
127
    return -1;
128
  }
129
  return 0;
130
}
131
132
#endif /* USE_MCAST */
133
134
135
/**
136
 * Initialize a UDP socket, supports multicast, IPv4 and IPv6.
137
 * \param si socket that should be bind
138
 * \return zero on success, -1 otherwise
139
 *
140
 * @status_flags - extra status flags to be set for the socket fd
141
 */
142
int udp_init_listener(struct socket_info *si, int status_flags)
143
0
{
144
0
  union sockaddr_union* addr;
145
0
  int optval;
146
#ifdef USE_MCAST
147
  unsigned char m_optval;
148
#endif
149
150
0
  addr=&si->su;
151
0
  if (init_su(addr, &si->address, si->port_no)<0){
152
0
    LM_ERR("could not init sockaddr_union\n");
153
0
    goto error;
154
0
  }
155
156
0
  si->socket = socket(AF2PF(addr->s.sa_family), SOCK_DGRAM, 0);
157
0
  if (si->socket==-1){
158
0
    LM_ERR("socket: %s\n", strerror(errno));
159
0
    goto error;
160
0
  }
161
162
  /* make socket non-blocking */
163
0
  if (status_flags) {
164
0
    optval=fcntl(si->socket, F_GETFL);
165
0
    if (optval==-1){
166
0
      LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno));
167
0
      goto error;
168
0
    }
169
0
    if (fcntl(si->socket,F_SETFL,optval|status_flags)==-1){
170
0
      LM_ERR("set non-blocking failed: (%d) %s\n",
171
0
        errno, strerror(errno));
172
0
      goto error;
173
0
    }
174
0
  }
175
176
  /* set sock opts? */
177
0
  optval=1;
178
0
  if (setsockopt(si->socket, SOL_SOCKET, SO_REUSEADDR ,
179
0
          (void*)&optval, sizeof(optval)) ==-1){
180
0
    LM_ERR("setsockopt: %s\n", strerror(errno));
181
0
    goto error;
182
0
  }
183
184
0
  if (si->flags & SI_REUSEPORT) {
185
0
    optval=1;
186
0
    if (setsockopt(si->socket, SOL_SOCKET, SO_REUSEPORT ,
187
0
            (void*)&optval, sizeof(optval)) ==-1){
188
0
      LM_ERR("setsockopt: %s\n", strerror(errno));
189
0
      goto error;
190
0
    }
191
0
  }
192
193
0
  if (si->flags & SI_FRAG) {
194
    /* no DF */
195
0
#if defined(IP_MTU_DISCOVER)
196
0
    optval = IP_PMTUDISC_DONT;
197
0
    setsockopt(si->socket, IPPROTO_IP, IP_MTU_DISCOVER, (void*)&optval, sizeof(optval));
198
#else
199
#if defined(IP_DONTFRAG)
200
    optval = 1;
201
    setsockopt(si->socket, IPPROTO_IP, IP_DONTFRAG, (void*)&optval, sizeof(optval));
202
#else
203
    LM_ERR("DF flag is not supported by your system\n");
204
    goto error;
205
#endif
206
#endif
207
0
  }
208
209
  /* tos */
210
0
  optval = (si->tos > 0) ? si->tos : tos;
211
0
  if (optval > 0) {
212
0
    if (addr->s.sa_family==AF_INET6){
213
0
      if (setsockopt(si->socket,  IPPROTO_IPV6, IPV6_TCLASS, (void*)&optval, sizeof(optval)) ==-1){
214
0
        LM_WARN("setsockopt tos for IPV6: %s\n", strerror(errno));
215
        /* continue since this is not critical */
216
0
      }
217
0
    } else {
218
0
      if (setsockopt(si->socket, IPPROTO_IP, IP_TOS, (void*)&optval, sizeof(optval)) ==-1){
219
0
        LM_WARN("setsockopt tos: %s\n", strerror(errno));
220
        /* continue since this is not critical */
221
0
      }
222
0
    }
223
0
  }
224
#if defined (__linux__) && defined(UDP_ERRORS)
225
  optval=1;
226
  /* enable error receiving on unconnected sockets */
227
  if(setsockopt(si->socket, SOL_IP, IP_RECVERR,
228
          (void*)&optval, sizeof(optval)) ==-1){
229
    LM_ERR("setsockopt: %s\n", strerror(errno));
230
    goto error;
231
  }
232
#endif
233
234
#ifdef USE_MCAST
235
  if ((si->flags & SI_IS_MCAST)
236
      && (setup_mcast_rcvr(si->socket, addr)<0)){
237
      goto error;
238
  }
239
  /* set the multicast options */
240
  if (addr->s.sa_family==AF_INET){
241
    m_optval = mcast_loopback;
242
    if (setsockopt(si->socket, IPPROTO_IP, IP_MULTICAST_LOOP,
243
            &m_optval, sizeof(m_optval))==-1){
244
      LM_WARN("setsockopt(IP_MULTICAST_LOOP): %s\n", strerror(errno));
245
      /* it's only a warning because we might get this error if the
246
        network interface doesn't support multicasting */
247
    }
248
    if (mcast_ttl>=0){
249
      m_optval = mcast_ttl;
250
      if (setsockopt(si->socket, IPPROTO_IP, IP_MULTICAST_TTL,
251
            &m_optval, sizeof(m_optval))==-1){
252
        LM_ERR("setsockopt (IP_MULTICAST_TTL): %s\n", strerror(errno));
253
        goto error;
254
      }
255
    }
256
  } else if (addr->s.sa_family==AF_INET6){
257
    if (setsockopt(si->socket, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
258
            &mcast_loopback, sizeof(mcast_loopback))==-1){
259
      LM_WARN("setsockopt (IPV6_MULTICAST_LOOP): %s\n", strerror(errno));
260
      /* it's only a warning because we might get this error if the
261
        network interface doesn't support multicasting */
262
    }
263
    if (mcast_ttl>=0){
264
      if (setsockopt(si->socket, IPPROTO_IP, IPV6_MULTICAST_HOPS,
265
            &mcast_ttl, sizeof(mcast_ttl))==-1){
266
        LM_ERR("setssckopt (IPV6_MULTICAST_HOPS): %s\n",
267
            strerror(errno));
268
        goto error;
269
      }
270
    }
271
  } else {
272
    LM_ERR("unsupported protocol family %d\n", addr->s.sa_family);
273
    goto error;
274
  }
275
#endif /* USE_MCAST */
276
277
0
  if (probe_max_sock_buff(si->socket,0,MAX_RECV_BUFFER_SIZE,
278
0
        BUFFER_INCREMENT)==-1) goto error;
279
280
0
  return 0;
281
282
0
error:
283
0
  return -1;
284
0
}
285
286
int udp_bind_listener(struct socket_info *si)
287
0
{
288
0
  union sockaddr_union* addr = &si->su;
289
0
  if (bind(si->socket,  &addr->s, sockaddru_len(*addr))==-1){
290
0
    LM_ERR("bind(%x, %p, %d) on %s: %s\n", si->socket, &addr->s,
291
0
        (unsigned)sockaddru_len(*addr),  si->address_str.s,
292
0
        strerror(errno));
293
0
    if (addr->s.sa_family==AF_INET6)
294
0
      LM_ERR("might be caused by using a link "
295
0
          " local address, try site local or global\n");
296
0
    return -1;
297
0
  }
298
0
  return 0;
299
0
}
300
301
302
inline static int handle_io(struct fd_map* fm, int idx,int event_type)
303
0
{
304
0
  int n = 0;
305
0
  int read;
306
307
0
  pt_become_active();
308
309
0
  pre_run_handle_script_reload(fm->app_flags);
310
311
0
  switch(fm->type){
312
0
    case F_UDP_READ:
313
0
      n = protos[((struct socket_info*)fm->data)->proto].net.
314
0
        dgram.read( fm->data /*si*/, &read);
315
0
      break;
316
0
    case F_TIMER_JOB:
317
0
      handle_timer_job();
318
0
      break;
319
0
    case F_SCRIPT_ASYNC:
320
0
      async_script_resume_f( fm->fd, fm->data,
321
0
        (event_type==IO_WATCH_TIMEOUT)?1:0 );
322
0
      break;
323
0
    case F_FD_ASYNC:
324
0
      async_fd_resume( fm->fd, fm->data);
325
0
      break;
326
0
    case F_LAUNCH_ASYNC:
327
0
      async_launch_resume( fm->fd, fm->data);
328
0
      break;
329
0
    case F_IPC:
330
0
      ipc_handle_job(fm->fd);
331
0
      break;
332
0
    default:
333
0
      LM_CRIT("unknown fd type %d in UDP worker\n", fm->type);
334
0
      n = -1;
335
0
      break;
336
0
  }
337
338
0
  if (reactor_is_empty() && _termination_in_progress==1) {
339
0
    LM_WARN("reactor got empty while termination in progress\n");
340
0
    ipc_handle_all_pending_jobs(IPC_FD_READ_SELF);
341
0
    if (reactor_is_empty())
342
0
      dynamic_process_final_exit();
343
0
  }
344
345
0
  post_run_handle_script_reload();
346
347
0
  pt_become_idle();
348
0
  return n;
349
0
}
350
351
352
int udp_proc_reactor_init( struct socket_info *si )
353
0
{
354
355
  /* create the reactor for UDP proc */
356
0
  if ( init_worker_reactor( "UDP_worker", RCT_PRIO_MAX)<0 ) {
357
0
    LM_ERR("failed to init reactor\n");
358
0
    goto error;
359
0
  }
360
361
  /* init: start watching for the timer jobs */
362
0
  if (reactor_add_reader( timer_fd_out, F_TIMER_JOB, RCT_PRIO_TIMER,NULL)<0){
363
0
    LM_CRIT("failed to add timer pipe_out to reactor\n");
364
0
    goto error;
365
0
  }
366
367
  /* init: start watching for the IPC jobs */
368
0
  if (reactor_add_reader(IPC_FD_READ_SELF, F_IPC, RCT_PRIO_ASYNC, NULL)<0){
369
0
    LM_CRIT("failed to add IPC pipe to reactor\n");
370
0
    goto error;
371
0
  }
372
373
  /* init: start watching for IPC "dispatched" jobs */
374
0
  if (reactor_add_reader(IPC_FD_READ_SHARED, F_IPC, RCT_PRIO_ASYNC, NULL)<0){
375
0
    LM_CRIT("failed to add IPC shared pipe to reactor\n");
376
0
    return -1;
377
0
  }
378
379
  /* init: start watching the SIP UDP fd */
380
0
  if (reactor_add_reader( si->socket, F_UDP_READ, RCT_PRIO_NET, si)<0) {
381
0
    LM_CRIT("failed to add UDP listen socket to reactor\n");
382
0
    goto error;
383
0
  }
384
385
0
  return 0;
386
0
error:
387
0
  destroy_worker_reactor();
388
0
  return -1;
389
0
}
390
391
392
static int fork_dynamic_udp_process(void *si_filter)
393
0
{
394
0
  struct socket_info *si = (struct socket_info*)si_filter;
395
0
  int p_id;
396
0
  const struct internal_fork_params ifp_udp_rcv = {
397
0
    .proc_desc = "UDP receiver",
398
0
    .flags = OSS_PROC_DYNAMIC|OSS_PROC_NEEDS_SCRIPT,
399
0
    .type = TYPE_UDP,
400
0
  };
401
402
0
  if ((p_id=internal_fork(&ifp_udp_rcv))<0) {
403
0
    LM_CRIT("cannot fork UDP process\n");
404
0
    return(-1);
405
0
  } else if (p_id==0) {
406
    /* new UDP process */
407
    /* set a more detailed description */
408
0
    set_proc_attrs("SIP receiver %.*s",
409
0
      si->sock_str.len, si->sock_str.s);
410
0
    pt[process_no].pg_filter = si;
411
0
    bind_address=si; /* shortcut */
412
    /* we first need to init the reactor to be able to add fd
413
     * into it in child_init routines */
414
0
    if (udp_proc_reactor_init(si) < 0 ||
415
0
    init_child(10000/*FIXME*/) < 0) {
416
0
      goto error;
417
0
    }
418
0
    report_conditional_status( 1, 0); /*report success*/
419
    /* the child proc is done read&write) dealing with the status pipe */
420
0
    clean_read_pipeend();
421
422
0
    reactor_main_loop(UDP_SELECT_TIMEOUT, error, );
423
0
    destroy_worker_reactor();
424
0
error:
425
0
    report_failure_status();
426
0
    LM_ERR("Initializing new process failed, exiting with error \n");
427
0
    pt[process_no].flags |= OSS_PROC_SELFEXIT;
428
0
    exit( -1);
429
0
  } else {
430
    /*parent/main*/
431
0
    return p_id;
432
0
  }
433
0
}
434
435
436
static void udp_process_graceful_terminate(int sender, void *param)
437
0
{
438
  /* we accept this only from the main proccess */
439
0
  if (sender!=0) {
440
0
    LM_BUG("graceful terminate received from a non-main process!!\n");
441
0
    return;
442
0
  }
443
0
  LM_NOTICE("process %d received RPC to terminate from Main\n",process_no);
444
445
  /*remove from reactor all the shared fds, so we stop reading from them */
446
447
  /*remove timer jobs pipe */
448
0
  reactor_del_reader( timer_fd_out, -1, 0);
449
450
  /*remove IPC dispatcher pipe */
451
0
  reactor_del_reader( IPC_FD_READ_SHARED, -1, 0);
452
453
  /*remove network interface */
454
0
  reactor_del_reader( bind_address->socket, -1, 0);
455
456
  /*remove private IPC pipe */
457
0
  reactor_del_reader( IPC_FD_READ_SELF, -1, 0);
458
459
  /* let's drain the private IPC */
460
0
  ipc_handle_all_pending_jobs(IPC_FD_READ_SELF);
461
462
  /* what is left now is the reactor are async fd's, so we need to 
463
   * wait to complete all of them */
464
0
  if (reactor_is_empty())
465
0
    dynamic_process_final_exit();
466
467
  /* the exit will be triggered by the reactor, when empty */
468
0
  _termination_in_progress = 1;
469
0
  LM_INFO("reactor not empty, waiting for pending async\n");
470
0
}
471
472
473
/* starts all UDP related processes */
474
int udp_start_processes(int *chd_rank, int *startup_done)
475
0
{
476
0
  struct socket_info_full *sif;
477
0
  int p_id;
478
0
  int i,p;
479
0
  const struct internal_fork_params ifp_udp_rcv = {
480
0
    .proc_desc = "UDP receiver",
481
0
    .flags = OSS_PROC_NEEDS_SCRIPT,
482
0
    .type = TYPE_UDP,
483
0
  };
484
485
0
  if (udp_disabled)
486
0
    return 0;
487
488
0
  for( p=PROTO_FIRST ; p<PROTO_LAST ; p++ ) {
489
0
    if ( !is_udp_based_proto(p) )
490
0
      continue;
491
492
0
    for( sif=protos[p].listeners; sif ; sif=sif->next ) {
493
0
      struct socket_info* si = &sif->socket_info;
494
495
0
      if ( auto_scaling_enabled && si->s_profile &&
496
0
      create_process_group( TYPE_UDP, si, si->s_profile,
497
0
      fork_dynamic_udp_process, udp_process_graceful_terminate)!=0)
498
0
        LM_ERR("failed to create group of UDP processes for <%.*s>, "
499
0
          "auto forking will not be possible\n",
500
0
          si->name.len, si->name.s);
501
502
0
      for (i=0;i<si->workers;i++) {
503
0
        (*chd_rank)++;
504
0
        if ( (p_id=internal_fork(&ifp_udp_rcv))<0 ) {
505
0
          LM_CRIT("cannot fork UDP process\n");
506
0
          goto error;
507
0
        } else if (p_id==0) {
508
          /* new UDP process */
509
          /* set a more detailed description */
510
0
          set_proc_attrs("SIP receiver %.*s",
511
0
            si->sock_str.len, si->sock_str.s);
512
0
          pt[process_no].pg_filter = si;
513
0
          bind_address=si; /* shortcut */
514
          /* we first need to init the reactor to be able to add fd
515
           * into it in child_init routines */
516
0
          if (udp_proc_reactor_init(si) < 0 ||
517
0
              init_child(*chd_rank) < 0) {
518
0
            report_failure_status();
519
0
            if (*chd_rank == 1 && startup_done)
520
0
              *startup_done = -1;
521
0
            exit(-1);
522
0
          }
523
524
          /* first UDP proc runs statup_route (if defined) */
525
0
          if(*chd_rank == 1 && startup_done!=NULL) {
526
0
            LM_DBG("running startup for first UDP\n");
527
0
            if(run_startup_route()< 0) {
528
0
              report_failure_status();
529
0
              *startup_done = -1;
530
0
              LM_ERR("Startup route processing failed\n");
531
0
              exit(-1);
532
0
            }
533
0
            *startup_done = 1;
534
0
          }
535
536
0
          report_conditional_status( (!no_daemon_mode), 0);
537
538
          /**
539
           * Main UDP receiver loop, processes data from the
540
           * network, does some error checking and save it in an
541
           * allocated buffer. This data is then forwarded to the
542
           * receive_msg function. If an dynamic buffer is used, the
543
           * buffer must be freed in later steps.
544
           * \see receive_msg
545
           * \see main_loop
546
           */
547
0
          reactor_main_loop(UDP_SELECT_TIMEOUT, error, );
548
0
          destroy_worker_reactor();
549
0
          exit(-1);
550
0
        } else {
551
          /*parent*/
552
          /* wait for first proc to finish the startup route */
553
0
          if (*chd_rank == 1 && startup_done)
554
0
            while(!(*startup_done)) {
555
0
              usleep(5);
556
0
              handle_sigs();
557
0
            }
558
0
        }
559
0
      } /* procs per listener */
560
0
    } /* looping through the listeners per proto */
561
0
  } /* looping through the available protos */
562
563
0
  return 0;
564
0
error:
565
0
  return -1;
566
0
}
567