Coverage Report

Created: 2026-02-26 07:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/opensips/net/net_tcp.c
Line
Count
Source
1
/*
2
 * Copyright (C) 2014-2015 OpenSIPS Project
3
 * Copyright (C) 2001-2003 FhG Fokus
4
 *
5
 * This file is part of opensips, a free SIP server.
6
 *
7
 * opensips is free software; you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 2 of the License, or
10
 * (at your option) any later version.
11
 *
12
 * opensips is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU General Public License
18
 * along with this program; if not, write to the Free Software
19
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20
 *
21
 *
22
 * history:
23
 * ---------
24
 *  2015-01-xx  created (razvanc)
25
 */
26
27
#include <sys/types.h>
28
#include <sys/socket.h>
29
#include <netinet/in.h>
30
#include <netinet/in_systm.h>
31
#include <netinet/ip.h>
32
#include <netinet/tcp.h>
33
#include <sys/uio.h>  /* writev*/
34
#include <netdb.h>
35
#include <stdlib.h> /*exit() */
36
#include <time.h>   /*time() */
37
#include <unistd.h>
38
#include <errno.h>
39
#include <string.h>
40
41
#include "../mem/mem.h"
42
#include "../mem/shm_mem.h"
43
#include "../globals.h"
44
#include "../locking.h"
45
#include "../socket_info.h"
46
#include "../ut.h"
47
#include "../pt.h"
48
#include "../pt_load.h"
49
#include "../daemonize.h"
50
#include "../status_report.h"
51
#include "../reactor.h"
52
#include "../timer.h"
53
#include "../ipc.h"
54
55
#include "tcp_passfd.h"
56
#include "net_tcp_proc.h"
57
#include "net_tcp_report.h"
58
#include "net_tcp.h"
59
#include "tcp_conn.h"
60
#include "tcp_conn_profile.h"
61
#include "trans.h"
62
#include "net_tcp_dbg.h"
63
64
struct struct_hist_list *con_hist;
65
66
enum tcp_worker_state { STATE_INACTIVE=0, STATE_ACTIVE, STATE_DRAINING};
67
68
/* definition of a TCP worker - the array of these TCP workers is
69
 * mainly intended to be used by the TCP main, to keep track of the
70
 * workers, about their load and so. Nevertheless, since the addition
71
 * of the process auto-scaling, other processes may need access to this
72
 * data, thus its relocation in SHM (versus initial PKG). For example,
73
 * the attendant process is the one forking new TCP workers (scaling up),
74
 * so it must be able to set the ENABLE state for the TCP worker (and be
75
 * seen by the TCP main proc). Similarly, when a TCP worker shuts down, it has
76
 * to mark itself as DISABLED and the TCP main must see that.
77
 * Again, 99% this array is intended for TCP Main ops, it is not lock
78
 * protected, so be very careful with any ops from other procs.
79
 */
80
struct tcp_worker {
81
  pid_t pid;
82
  int unix_sock;    /*!< Main-Worker comm, worker end */
83
  int main_unix_sock; /*!< Main-Worker comm, TCP Main end */
84
  int pt_idx;     /*!< Index in the main Process Table */
85
  enum tcp_worker_state state;
86
  int n_reqs;   /*!< number of requests serviced so far */
87
};
88
89
/* definition of a TCP partition */
90
struct tcp_partition {
91
  /*! \brief connection hash table (after ip&port), includes also aliases */
92
  struct tcp_conn_alias** tcpconn_aliases_hash;
93
  /*! \brief connection hash table (after connection id) */
94
  struct tcp_connection** tcpconn_id_hash;
95
  gen_lock_t* tcpconn_lock;
96
};
97
98
99
/* array of TCP workers - to be used only by TCP MAIN */
100
struct tcp_worker *tcp_workers=0;
101
102
/* unique for each connection, used for
103
 * quickly finding the corresponding connection for a reply */
104
static unsigned int* connection_id=0;
105
106
/* array of TCP partitions */
107
static struct tcp_partition tcp_parts[TCP_PARTITION_SIZE];
108
109
/*!< tcp protocol number as returned by getprotobyname */
110
static int tcp_proto_no=-1;
111
112
/* communication socket from generic proc to TCP main */
113
int unix_tcp_sock = -1;
114
115
/*!< current number of open connections */
116
static int tcp_connections_no = 0;
117
118
/*!< by default don't accept aliases */
119
int tcp_accept_aliases=0;
120
int tcp_connect_timeout=DEFAULT_TCP_CONNECT_TIMEOUT;
121
int tcp_con_lifetime=DEFAULT_TCP_CONNECTION_LIFETIME;
122
int tcp_socket_backlog=DEFAULT_TCP_SOCKET_BACKLOG;
123
/*!< by default choose the best method */
124
enum poll_types tcp_poll_method=0;
125
int tcp_max_connections=DEFAULT_TCP_MAX_CONNECTIONS;
126
/* the configured/starting number of TCP workers */
127
int tcp_workers_no = UDP_WORKERS_NO;
128
/* the maximum numbers of TCP workers */
129
int tcp_workers_max_no;
130
/* the name of the auto-scaling profile (optional) */
131
char* tcp_auto_scaling_profile = NULL;
132
/* Max number of seconds that we expect a full SIP message
133
 * to arrive in - anything above will lead to the connection being closed */
134
int tcp_max_msg_time = TCP_CHILD_MAX_MSG_TIME;
135
/* If the data reading may be performed across different workers (still
136
 * serial) or by a single worker (the TCP conns sticks to one worker) */
137
int tcp_parallel_read_on_workers = 0;
138
139
#ifdef HAVE_SO_KEEPALIVE
140
    int tcp_keepalive = 1;
141
#else
142
    int tcp_keepalive = 0;
143
#endif
144
int tcp_keepcount = 0;
145
int tcp_keepidle = 0;
146
int tcp_keepinterval = 0;
147
148
/*!< should we allow opening a new TCP conn when sending data 
149
 * over UAC branches? - branch flag to be set in the SIP messages */
150
int tcp_no_new_conn_bflag = 0;
151
/*!< should we allow opening a new TCP conn when sending data 
152
 * back to UAS (replies)? - msg flag to be set in the SIP messages */
153
int tcp_no_new_conn_rplflag = 0;
154
/*!< should a new TCP conn be opened if needed? - variable used for
155
 * signaling between SIP layer (branch flag) and TCP layer (tcp_send func)*/
156
int tcp_no_new_conn = 0;
157
158
/* if the TCP net layer is on or off (if no TCP based protos are loaded) */
159
static int tcp_disabled = 1;
160
161
/* is the process TCP MAIN ? */
162
int is_tcp_main = 0;
163
164
/* the ID of the TCP conn used for the last send operation in the
165
 * current process - attention, this is a really ugly HACK here */
166
unsigned int last_outgoing_tcp_id = 0;
167
168
static struct scaling_profile *s_profile = NULL;
169
170
/****************************** helper functions *****************************/
171
extern void handle_sigs(void);
172
173
static inline int init_sock_keepalive(int s, const struct tcp_conn_profile *prof)
174
0
{
175
0
  int ka;
176
0
#if defined(HAVE_TCP_KEEPINTVL) || defined(HAVE_TCP_KEEPIDLE) || defined(HAVE_TCP_KEEPCNT)
177
0
  int optval;
178
0
#endif
179
180
0
  if (prof->keepinterval || prof->keepidle || prof->keepcount)
181
0
    ka = 1; /* force on */
182
0
  else
183
0
    ka = prof->keepalive;
184
185
0
#ifdef HAVE_SO_KEEPALIVE
186
0
  if (setsockopt(s,SOL_SOCKET,SO_KEEPALIVE,&ka,sizeof(ka))<0){
187
0
    LM_WARN("setsockopt failed to enable SO_KEEPALIVE: %s\n",
188
0
      strerror(errno));
189
0
    return -1;
190
0
  }
191
0
  LM_DBG("TCP keepalive enabled on socket %d\n",s);
192
0
#endif
193
0
#ifdef HAVE_TCP_KEEPINTVL
194
0
  if ((optval = prof->keepinterval)) {
195
0
    if (setsockopt(s,IPPROTO_TCP,TCP_KEEPINTVL,&optval,sizeof(optval))<0){
196
0
      LM_WARN("setsockopt failed to set keepalive probes interval: %s\n",
197
0
        strerror(errno));
198
0
    }
199
0
  }
200
0
#endif
201
0
#ifdef HAVE_TCP_KEEPIDLE
202
0
  if ((optval = prof->keepidle)) {
203
0
    if (setsockopt(s,IPPROTO_TCP,TCP_KEEPIDLE,&optval,sizeof(optval))<0){
204
0
      LM_WARN("setsockopt failed to set keepalive idle interval: %s\n",
205
0
        strerror(errno));
206
0
    }
207
0
  }
208
0
#endif
209
0
#ifdef HAVE_TCP_KEEPCNT
210
0
  if ((optval = prof->keepcount)) {
211
0
    if (setsockopt(s,IPPROTO_TCP,TCP_KEEPCNT,&optval,sizeof(optval))<0){
212
0
      LM_WARN("setsockopt failed to set maximum keepalive count: %s\n",
213
0
        strerror(errno));
214
0
    }
215
0
  }
216
0
#endif
217
0
  return 0;
218
0
}
219
220
static inline void set_sock_reuseport(int s)
221
0
{
222
0
  int yes = 1;
223
224
0
  if (setsockopt(s,SOL_SOCKET,SO_REUSEPORT,&yes,sizeof(yes))<0){
225
0
    LM_WARN("setsockopt failed to set SO_REUSEPORT: %s\n",
226
0
      strerror(errno));
227
0
  }
228
0
  if (setsockopt(s,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))<0){
229
0
    LM_WARN("setsockopt failed to set SO_REUSEADDR: %s\n",
230
0
      strerror(errno));
231
0
  }
232
0
}
233
234
/*! \brief Set all socket/fd options:  disable nagle, tos lowdelay,
235
 * non-blocking
236
 * \return -1 on error */
237
int tcp_init_sock_opt(int s, const struct tcp_conn_profile *prof, enum si_flags socketflags, int sock_tos)
238
0
{
239
0
  int flags;
240
0
  int optval;
241
242
0
#ifdef DISABLE_NAGLE
243
0
  flags=1;
244
0
  if ( (tcp_proto_no!=-1) && (setsockopt(s, tcp_proto_no , TCP_NODELAY,
245
0
          &flags, sizeof(flags))<0) ){
246
0
    LM_WARN("could not disable Nagle: %s\n", strerror(errno));
247
0
  }
248
0
#endif
249
  /* tos*/
250
0
  optval = (sock_tos > 0) ? sock_tos : tos;
251
0
  if (optval > 0) {
252
0
    if (setsockopt(s, IPPROTO_IP, IP_TOS, (void*)&optval,sizeof(optval)) ==-1){
253
0
      LM_WARN("setsockopt tos: %s\n",  strerror(errno));
254
      /* continue since this is not critical */
255
0
    }
256
0
  }
257
258
0
  if (probe_max_sock_buff(s,1,MAX_SEND_BUFFER_SIZE,BUFFER_INCREMENT)) {
259
0
    LM_WARN("setsockopt tcp snd buff: %s\n", strerror(errno));
260
    /* continue since this is not critical */
261
0
  }
262
263
0
  init_sock_keepalive(s, prof);
264
0
  if (socketflags & SI_REUSEPORT)
265
0
    set_sock_reuseport(s);
266
267
  /* non-blocking */
268
0
  flags=fcntl(s, F_GETFL);
269
0
  if (flags==-1){
270
0
    LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno));
271
0
    goto error;
272
0
  }
273
0
  if (fcntl(s, F_SETFL, flags|O_NONBLOCK)==-1){
274
0
    LM_ERR("set non-blocking failed: (%d) %s\n", errno, strerror(errno));
275
0
    goto error;
276
0
  }
277
0
  return 0;
278
0
error:
279
0
  return -1;
280
0
}
281
282
static int send2worker(struct tcp_connection* tcpconn,int rw)
283
0
{
284
0
  int i;
285
0
  int min_load;
286
0
  int idx;
287
0
  long response[2];
288
0
  unsigned int load;
289
290
0
  min_load=100; /* it is a percentage */
291
0
  idx=0;
292
0
  for (i=0; i<tcp_workers_max_no; i++){
293
0
    if (tcp_workers[i].state==STATE_ACTIVE) {
294
0
      load = pt_get_1m_proc_load( tcp_workers[i].pt_idx );
295
#ifdef EXTRA_DEBUG
296
      LM_DBG("checking TCP worker %d (proc %d), with load %u,"
297
        "min_load so far %u\n", i, tcp_workers[i].pt_idx, load,
298
        min_load);
299
#endif
300
0
      if (min_load>load) {
301
0
        min_load = load;
302
0
        idx = i;
303
0
      }
304
0
    }
305
0
  }
306
307
0
  tcp_workers[idx].n_reqs++;
308
0
  LM_DBG("to tcp worker %d (%d/%d) load %u, %p/%d rw %d\n", idx,
309
0
    tcp_workers[idx].pid, tcp_workers[idx].pt_idx, min_load,
310
0
    tcpconn, tcpconn->s, rw);
311
0
  response[0]=(long)tcpconn;
312
0
  response[1]=rw;
313
0
  if (send_fd(tcp_workers[idx].unix_sock, response, sizeof(response),
314
0
      tcpconn->s)<=0){
315
0
    LM_ERR("send_fd failed\n");
316
0
    return -1;
317
0
  }
318
319
0
  return 0;
320
0
}
321
322
323
324
/********************** TCP conn management functions ************************/
325
326
/* initializes an already defined TCP listener */
327
int tcp_init_listener(struct socket_info *si)
328
0
{
329
0
  union sockaddr_union* addr = &si->su;
330
0
  if (init_su(addr, &si->address, si->port_no)<0){
331
0
    LM_ERR("could no init sockaddr_union\n");
332
0
    return -1;
333
0
  }
334
0
  return 0;
335
0
}
336
337
/* binding a defined TCP listener */
338
int tcp_bind_listener(struct socket_info *si)
339
0
{
340
0
  union sockaddr_union* addr;
341
0
  int optval;
342
0
#ifdef DISABLE_NAGLE
343
0
  int flag;
344
0
  struct protoent* pe;
345
346
0
  if (tcp_proto_no==-1){ /* if not already set */
347
0
    pe=getprotobyname("tcp");
348
0
    if (pe==0){
349
0
      LM_ERR("could not get TCP protocol number\n");
350
0
      tcp_proto_no=-1;
351
0
    }else{
352
0
      tcp_proto_no=pe->p_proto;
353
0
    }
354
0
  }
355
0
#endif
356
357
0
  addr = &si->su;
358
0
  si->socket = socket(AF2PF(addr->s.sa_family), SOCK_STREAM, 0);
359
0
  if (si->socket==-1){
360
0
    LM_ERR("socket failed with [%s]\n", strerror(errno));
361
0
    goto error;
362
0
  }
363
0
#ifdef DISABLE_NAGLE
364
0
  flag=1;
365
0
  if ( (tcp_proto_no!=-1) &&
366
0
     (setsockopt(si->socket, tcp_proto_no , TCP_NODELAY,
367
0
           &flag, sizeof(flag))<0) ){
368
0
    LM_ERR("could not disable Nagle: %s\n",strerror(errno));
369
0
  }
370
0
#endif
371
372
0
#if  !defined(TCP_DONT_REUSEADDR)
373
  /* Stevens, "Network Programming", Section 7.5, "Generic Socket
374
   * Options": "...server started,..a child continues..on existing
375
   * connection..listening server is restarted...call to bind fails
376
   * ... ALL TCP servers should specify the SO_REUSEADDR option
377
   * to allow the server to be restarted in this situation
378
   */
379
0
  optval=1;
380
0
  if (setsockopt(si->socket, SOL_SOCKET, SO_REUSEADDR,
381
0
  (void*)&optval, sizeof(optval))==-1) {
382
0
    LM_ERR("setsockopt failed with [%s]\n", strerror(errno));
383
0
    goto error;
384
0
  }
385
0
#endif
386
  /* tos */
387
0
  optval = (si->tos > 0) ? si->tos : tos;
388
0
  if (optval > 0) {
389
0
    if (setsockopt(si->socket, IPPROTO_IP, IP_TOS, (void*)&optval,
390
0
    sizeof(optval)) ==-1){
391
0
      LM_WARN("setsockopt tos: %s\n", strerror(errno));
392
      /* continue since this is not critical */
393
0
    }
394
0
  }
395
396
0
  if (probe_max_sock_buff(si->socket,1,MAX_SEND_BUFFER_SIZE,
397
0
  BUFFER_INCREMENT)) {
398
0
    LM_WARN("setsockopt tcp snd buff: %s\n",strerror(errno));
399
    /* continue since this is not critical */
400
0
  }
401
402
0
  init_sock_keepalive(si->socket, &tcp_con_df_profile);
403
0
  if (si->flags & SI_REUSEPORT)
404
0
    set_sock_reuseport(si->socket);
405
0
  if (bind(si->socket, &addr->s, sockaddru_len(*addr))==-1){
406
0
    LM_ERR("bind(%x, %p, %d) on %s:%d : %s\n",
407
0
        si->socket, &addr->s,
408
0
        (unsigned)sockaddru_len(*addr),
409
0
        si->address_str.s,
410
0
        si->port_no,
411
0
        strerror(errno));
412
0
    goto error;
413
0
  }
414
0
  if (listen(si->socket, tcp_socket_backlog)==-1){
415
0
    LM_ERR("listen(%x, %p, %d) on %s: %s\n",
416
0
        si->socket, &addr->s,
417
0
        (unsigned)sockaddru_len(*addr),
418
0
        si->address_str.s,
419
0
        strerror(errno));
420
0
    goto error;
421
0
  }
422
423
0
  return 0;
424
0
error:
425
0
  if (si->socket!=-1){
426
0
    close(si->socket);
427
0
    si->socket=-1;
428
0
  }
429
0
  return -1;
430
0
}
431
432
433
/*! \brief finds a connection, if id=0 return NULL
434
 * \note WARNING: unprotected (locks) use tcpconn_get unless you really
435
 * know what you are doing */
436
static struct tcp_connection* _tcpconn_find(unsigned int id)
437
0
{
438
0
  struct tcp_connection *c;
439
0
  unsigned hash;
440
441
0
  if (id){
442
0
    hash=tcp_id_hash(id);
443
0
    for (c=TCP_PART(id).tcpconn_id_hash[hash]; c; c=c->id_next){
444
#ifdef EXTRA_DEBUG
445
      LM_DBG("c=%p, c->id=%u, port=%d\n",c, c->id, c->rcv.src_port);
446
      print_ip("ip=", &c->rcv.src_ip, "\n");
447
#endif
448
0
      if ((id==c->id)&&(c->state!=S_CONN_BAD)) return c;
449
0
    }
450
0
  }
451
0
  return 0;
452
0
}
453
454
455
/* returns the correlation ID of a TCP connection */
456
int tcp_get_correlation_id( unsigned int id, unsigned long long *cid)
457
0
{
458
0
  struct tcp_connection* c;
459
460
0
  TCPCONN_LOCK(id);
461
0
  if ( (c=_tcpconn_find(id))!=NULL ) {
462
0
    *cid = c->cid;
463
0
    TCPCONN_UNLOCK(id);
464
0
    return 0;
465
0
  }
466
0
  *cid = 0;
467
0
  TCPCONN_UNLOCK(id);
468
0
  return -1;
469
0
}
470
471
/* returns the receive info of a TCP connection */
472
int tcp_get_rcv( unsigned int id, struct receive_info *ri)
473
0
{
474
0
  struct tcp_connection* c;
475
476
0
  TCPCONN_LOCK(id);
477
0
  if ( (c=_tcpconn_find(id))!=NULL ) {
478
0
    memcpy(ri, &c->rcv, sizeof *ri);
479
0
    TCPCONN_UNLOCK(id);
480
0
    return 0;
481
0
  }
482
0
  TCPCONN_UNLOCK(id);
483
0
  return -1;
484
0
}
485
486
487
/*! \brief _tcpconn_find with locks and acquire fd */
488
int tcp_conn_get(unsigned int id, struct ip_addr* ip, int port,
489
    enum sip_protos proto, void *proto_extra_id,
490
    struct tcp_connection** conn, int* conn_fd,
491
    const struct socket_info* send_sock)
492
0
{
493
0
  struct tcp_connection* c;
494
0
  struct tcp_connection* tmp;
495
0
  struct tcp_conn_alias* a;
496
0
  unsigned hash;
497
0
  long response[2];
498
0
  unsigned int part;
499
0
  int n;
500
0
  int fd;
501
502
0
  if (id) {
503
0
    part = id;
504
0
    TCPCONN_LOCK(part);
505
0
    if ( (c=_tcpconn_find(part))!=NULL )
506
0
      goto found;
507
0
    TCPCONN_UNLOCK(part);
508
0
  }
509
510
  /* continue search based on IP address + port + transport */
511
#ifdef EXTRA_DEBUG
512
  LM_DBG("%d  port %u\n",id, port);
513
  if (ip) print_ip("tcpconn_find: ip ", ip, "\n");
514
#endif
515
0
  if (ip){
516
0
    hash=tcp_addr_hash(ip, port);
517
0
    for( part=0 ; part<TCP_PARTITION_SIZE ; part++ ) {
518
0
      TCPCONN_LOCK(part);
519
0
      for (a=TCP_PART(part).tcpconn_aliases_hash[hash]; a; a=a->next) {
520
#ifdef EXTRA_DEBUG
521
        LM_DBG("a=%p, c=%p, c->id=%u, alias port= %d port=%d\n",
522
          a, a->parent, a->parent->id, a->port,
523
          a->parent->rcv.src_port);
524
        print_ip("ip=",&a->parent->rcv.src_ip,"\n");
525
        if (send_sock && a->parent->rcv.bind_address) {
526
          print_ip("requested send_sock ip=", &send_sock->address,"\n");
527
          print_ip("found send_sock ip=", &a->parent->rcv.bind_address->address,"\n");
528
        }
529
#endif
530
0
        c = a->parent;
531
0
        if (c->state != S_CONN_BAD &&
532
0
            c->flags&F_CONN_INIT &&
533
0
            (send_sock==NULL || send_sock == a->parent->rcv.bind_address) &&
534
0
            port == a->port &&
535
0
            proto == c->type &&
536
0
            ip_addr_cmp(ip, &c->rcv.src_ip) &&
537
0
            (proto_extra_id==NULL ||
538
0
            protos[proto].net.stream.conn.match==NULL ||
539
0
            protos[proto].net.stream.conn.match( c, proto_extra_id)) )
540
0
          goto found;
541
0
      }
542
0
      TCPCONN_UNLOCK(part);
543
0
    }
544
0
  }
545
546
  /* not found */
547
0
  *conn = NULL;
548
0
  if (conn_fd) *conn_fd = -1;
549
0
  return 0;
550
551
0
found:
552
0
  c->refcnt++;
553
0
  TCPCONN_UNLOCK(part);
554
0
  sh_log(c->hist, TCP_REF, "tcp_conn_get, (%d)", c->refcnt);
555
556
0
  LM_DBG("con found in state %d\n",c->state);
557
558
0
  if (c->state!=S_CONN_OK || conn_fd==NULL) {
559
    /* no need to acquire it, just return the conn with an invalid fd */
560
0
    *conn = c;
561
0
    if (conn_fd) *conn_fd = -1;
562
0
    return 1;
563
0
  }
564
565
0
  if (c->proc_id == process_no) {
566
0
    LM_DBG("tcp connection found (%p) already in this process ( %d ) ,"
567
0
      " fd = %d\n", c, c->proc_id, c->fd);
568
    /* we already have the connection in this worker's reactor, */
569
    /* no need to acquire FD */
570
0
    *conn = c;
571
0
    *conn_fd = c->fd;
572
0
    return 1;
573
0
  }
574
575
  /* acquire the fd for this connection too */
576
0
  LM_DBG("tcp connection found (%p), acquiring fd\n", c);
577
  /* get the fd */
578
0
  response[0]=(long)c;
579
0
  response[1]=CONN_GET_FD;
580
0
  n=send_all(unix_tcp_sock, response, sizeof(response));
581
0
  if (n<=0){
582
0
    LM_ERR("failed to get fd(write):%s (%d)\n",
583
0
        strerror(errno), errno);
584
0
    n=-1;
585
0
    goto error;
586
0
  }
587
0
  LM_DBG("c= %p, n=%d, Usock=%d\n", c, n, unix_tcp_sock);
588
0
  tmp = c;
589
0
  n=receive_fd(unix_tcp_sock, &c, sizeof(c), &fd, MSG_WAITALL);
590
0
  if (n<=0){
591
0
    LM_ERR("failed to get fd(receive_fd):"
592
0
      " %s (%d)\n", strerror(errno), errno);
593
0
    n=-1;
594
0
    goto error;
595
0
  }
596
0
  if (c!=tmp){
597
0
    LM_CRIT("got different connection:"
598
0
      "  %p (id= %u, refcnt=%d state=%d != "
599
0
      "  %p (id= %u, refcnt=%d state=%d (n=%d)\n",
600
0
        c,   c->id,   c->refcnt,   c->state,
601
0
        tmp, tmp->id, tmp->refcnt, tmp->state, n
602
0
       );
603
0
    n=-1; /* fail */
604
0
    close(fd);
605
0
    goto error;
606
0
  }
607
0
  LM_DBG("after receive_fd: c= %p n=%d fd=%d\n",c, n, fd);
608
609
0
  *conn = c;
610
0
  *conn_fd = fd;
611
612
0
  return 1;
613
0
error:
614
0
  tcpconn_put(c);
615
0
  sh_log(c->hist, TCP_UNREF, "tcp_conn_get, (%d)", c->refcnt);
616
0
  *conn = NULL;
617
0
  *conn_fd = -1;
618
0
  return -1;
619
0
}
620
621
622
/* used to tune the tcp_connection attributes - not to be used inside the
623
   network layer, but only from the above layer (otherwise we may end up
624
   in strange deadlocks!) */
625
int tcp_conn_fcntl(struct receive_info *rcv, int attr, void *value)
626
0
{
627
0
  struct tcp_connection *con;
628
629
0
  switch (attr) {
630
0
  case DST_FCNTL_SET_LIFETIME:
631
    /* set connection timeout */
632
0
    TCPCONN_LOCK(rcv->proto_reserved1);
633
0
    con =_tcpconn_find(rcv->proto_reserved1);
634
0
    if (!con) {
635
0
      LM_ERR("Strange, tcp conn not found (id=%u)\n",
636
0
        rcv->proto_reserved1);
637
0
    } else {
638
0
      tcp_conn_set_lifetime( con, (int)(long)(value));
639
0
    }
640
0
    TCPCONN_UNLOCK(rcv->proto_reserved1);
641
0
    return 0;
642
0
  default:
643
0
    LM_ERR("unsupported operation %d on conn\n",attr);
644
0
    return -1;
645
0
  }
646
0
  return -1;
647
0
}
648
649
650
static struct tcp_connection* tcpconn_add(struct tcp_connection *c)
651
0
{
652
0
  unsigned hash;
653
654
0
  if (c){
655
0
    TCPCONN_LOCK(c->id);
656
    /* add it at the beginning of the list*/
657
0
    hash=tcp_id_hash(c->id);
658
0
    c->id_hash=hash;
659
0
    tcpconn_listadd(TCP_PART(c->id).tcpconn_id_hash[hash], c, id_next,
660
0
      id_prev);
661
662
0
    hash=tcp_addr_hash(&c->rcv.src_ip, c->rcv.src_port);
663
    /* set the first alias */
664
0
    c->con_aliases[0].port=c->rcv.src_port;
665
0
    c->con_aliases[0].hash=hash;
666
0
    c->con_aliases[0].parent=c;
667
0
    tcpconn_listadd(TCP_PART(c->id).tcpconn_aliases_hash[hash],
668
0
      &c->con_aliases[0], next, prev);
669
0
    c->aliases++;
670
0
    TCPCONN_UNLOCK(c->id);
671
0
    LM_DBG("hashes: %d, %d\n", hash, c->id_hash);
672
0
    return c;
673
0
  }else{
674
0
    LM_CRIT("null connection pointer\n");
675
0
    return 0;
676
0
  }
677
0
}
678
679
static str e_tcp_src_ip = str_init("src_ip");
680
static str e_tcp_src_port = str_init("src_port");
681
static str e_tcp_dst_ip = str_init("dst_ip");
682
static str e_tcp_dst_port = str_init("dst_port");
683
static str e_tcp_c_proto = str_init("proto");
684
685
static void tcp_disconnect_event_raise(struct tcp_connection* c)
686
0
{
687
0
  evi_params_p list = 0;
688
0
  str src_ip,dst_ip, proto;
689
0
  int src_port,dst_port;
690
0
  char src_ip_buf[IP_ADDR_MAX_STR_SIZE],dst_ip_buf[IP_ADDR_MAX_STR_SIZE];
691
692
  // event has to be triggered - check for subscribers
693
0
  if (!evi_probe_event(EVI_TCP_DISCONNECT)) {
694
0
    goto end;
695
0
  }
696
697
0
  if (!(list = evi_get_params()))
698
0
    goto end;
699
700
0
  src_ip.s = ip_addr2a( &c->rcv.src_ip );
701
0
  memcpy(src_ip_buf,src_ip.s,IP_ADDR_MAX_STR_SIZE);
702
0
  src_ip.s = src_ip_buf;
703
0
  src_ip.len = strlen(src_ip.s);
704
705
0
  if (evi_param_add_str(list, &e_tcp_src_ip, &src_ip)) {
706
0
    LM_ERR("unable to add parameter\n");
707
0
    goto end;
708
0
  }
709
710
0
  src_port = c->rcv.src_port;
711
712
0
  if (evi_param_add_int(list, &e_tcp_src_port, &src_port)) {
713
0
    LM_ERR("unable to add parameter\n");
714
0
    goto end;
715
0
  }
716
717
0
  dst_ip.s = ip_addr2a( &c->rcv.dst_ip );
718
0
  memcpy(dst_ip_buf,dst_ip.s,IP_ADDR_MAX_STR_SIZE);
719
0
  dst_ip.s = dst_ip_buf;
720
0
  dst_ip.len = strlen(dst_ip.s);
721
722
0
  if (evi_param_add_str(list, &e_tcp_dst_ip, &dst_ip)) {
723
0
    LM_ERR("unable to add parameter\n");
724
0
    goto end;
725
0
  }
726
727
0
  dst_port = c->rcv.dst_port;
728
729
0
  if (evi_param_add_int(list, &e_tcp_dst_port, &dst_port)) {
730
0
    LM_ERR("unable to add parameter\n");
731
0
    goto end;
732
0
  }
733
734
0
  proto.s = protos[c->rcv.proto].name;
735
0
  proto.len = strlen(proto.s);
736
737
0
  if (evi_param_add_str(list, &e_tcp_c_proto, &proto)) {
738
0
    LM_ERR("unable to add parameter\n");
739
0
    goto end;
740
0
  }
741
742
0
  if (is_tcp_main) {
743
0
    if (evi_dispatch_event(EVI_TCP_DISCONNECT, list)) {
744
0
      LM_ERR("unable to dispatch tcp disconnect event\n");
745
0
    }
746
0
  } else {
747
0
    if (evi_raise_event(EVI_TCP_DISCONNECT, list)) {
748
0
      LM_ERR("unable to send tcp disconnect event\n");
749
0
    }
750
0
  }
751
0
  list = 0;
752
753
0
end:
754
0
  if (list)
755
0
    evi_free_params(list);
756
0
}
757
758
/* convenience macro to aid in shm_free() debugging */
759
#define _tcpconn_rm(c, ne) \
760
0
  do {\
761
0
    __tcpconn_rm(c, ne);\
762
0
    shm_free(c);\
763
0
  } while (0)
764
765
/*! \brief unsafe tcpconn_rm version (nolocks) */
766
static void __tcpconn_rm(struct tcp_connection* c, int no_event)
767
0
{
768
0
  int r;
769
770
0
  tcpconn_listrm(TCP_PART(c->id).tcpconn_id_hash[c->id_hash], c,
771
0
    id_next, id_prev);
772
  /* remove all the aliases */
773
0
  for (r=0; r<c->aliases; r++)
774
0
    tcpconn_listrm(TCP_PART(c->id).tcpconn_aliases_hash[c->con_aliases[r].hash],
775
0
      &c->con_aliases[r], next, prev);
776
0
  lock_destroy(&c->write_lock);
777
778
0
  if (c->async) {
779
0
    for (r = 0; r<c->async->pending; r++)
780
0
      shm_free(c->async->chunks[r]);
781
0
    shm_free(c->async);
782
0
    c->async = NULL;
783
0
  }
784
785
0
  if (c->con_req)
786
0
    shm_free(c->con_req);
787
788
0
  if (protos[c->type].net.stream.conn.clean)
789
0
    protos[c->type].net.stream.conn.clean(c);
790
791
0
  if (!no_event) tcp_disconnect_event_raise(c);
792
793
#ifdef DBG_TCPCON
794
  sh_log(c->hist, TCP_DESTROY, "type=%d", c->type);
795
  sh_unref(c->hist);
796
  c->hist = NULL;
797
#endif
798
799
  /* shm_free(c); -- freed by _tcpconn_rm() */
800
0
}
801
802
803
804
#if 0
805
static void tcpconn_rm(struct tcp_connection* c)
806
{
807
  int r;
808
809
  TCPCONN_LOCK(c->id);
810
  tcpconn_listrm(TCP_PART(c->id).tcpconn_id_hash[c->id_hash], c,
811
    id_next, id_prev);
812
  /* remove all the aliases */
813
  for (r=0; r<c->aliases; r++)
814
    tcpconn_listrm(TCP_PART(c->id).tcpconn_aliases_hash
815
      [c->con_aliases[r].hash],
816
      &c->con_aliases[r], next, prev);
817
  TCPCONN_UNLOCK(c->id);
818
  lock_destroy(&c->write_lock);
819
820
  if (protos[c->type].net.stream.conn.clean)
821
    protos[c->type].net.stream.conn.clean(c);
822
823
  shm_free(c);
824
}
825
#endif
826
827
828
/*! \brief add port as an alias for the "id" connection
829
 * \return 0 on success,-1 on failure */
830
int tcpconn_add_alias(struct sip_msg *msg, unsigned int id, int port, int proto)
831
0
{
832
0
  struct tcp_connection* c;
833
0
  unsigned hash;
834
0
  struct tcp_conn_alias* a;
835
836
0
  a=0;
837
  /* fix the port */
838
0
  port=port ? port : protos[proto].default_port ;
839
0
  TCPCONN_LOCK(id);
840
  /* check if alias already exists */
841
0
  c=_tcpconn_find(id);
842
0
  if (c) {
843
0
    if (msg && !(c->profile.alias_mode == TCP_ALIAS_ALWAYS
844
0
                   || (c->profile.alias_mode == TCP_ALIAS_RFC_5923
845
0
                       && msg->via1->alias))) {
846
0
      LM_DBG("refusing to add alias (alias_mode: %u, via 'alias': %u)\n",
847
0
              c->profile.alias_mode, !!msg->via1->alias);
848
0
      TCPCONN_UNLOCK(id);
849
0
      return 0;
850
0
    }
851
852
0
    hash=tcp_addr_hash(&c->rcv.src_ip, port);
853
    /* search the aliases for an already existing one */
854
0
    for (a=TCP_PART(id).tcpconn_aliases_hash[hash]; a; a=a->next) {
855
0
      if (a->parent->state != S_CONN_BAD &&
856
0
          port == a->port &&
857
0
          proto == a->parent->type &&
858
0
          ip_addr_cmp(&c->rcv.src_ip, &a->parent->rcv.src_ip)) {
859
        /* found */
860
0
        if (a->parent!=c) goto error_sec;
861
0
        else goto ok;
862
0
      }
863
0
    }
864
0
    if (c->aliases>=TCP_CON_MAX_ALIASES) goto error_aliases;
865
0
    c->con_aliases[c->aliases].parent=c;
866
0
    c->con_aliases[c->aliases].port=port;
867
0
    c->con_aliases[c->aliases].hash=hash;
868
0
    tcpconn_listadd(TCP_PART(id).tcpconn_aliases_hash[hash],
869
0
                &c->con_aliases[c->aliases], next, prev);
870
0
    c->aliases++;
871
0
  }else goto error_not_found;
872
0
ok:
873
0
  TCPCONN_UNLOCK(id);
874
#ifdef EXTRA_DEBUG
875
  if (a) LM_DBG("alias already present\n");
876
  else   LM_DBG("alias port %d for hash %d, id %u\n", port, hash, id);
877
#endif
878
0
  return 0;
879
0
error_aliases:
880
0
  TCPCONN_UNLOCK(id);
881
0
  LM_ERR("too many aliases for connection %p (%u)\n", c, id);
882
0
  return -1;
883
0
error_not_found:
884
0
  TCPCONN_UNLOCK(id);
885
0
  LM_ERR("no connection found for id %u\n",id);
886
0
  return -1;
887
0
error_sec:
888
0
  LM_WARN("possible port hijack attempt\n");
889
0
  LM_WARN("alias already present and points to another connection "
890
0
      "(%d : %d and %u : %d)\n", a->parent->id,  port, id, port);
891
0
  TCPCONN_UNLOCK(id);
892
0
  return -1;
893
0
}
894
895
896
void tcpconn_put(struct tcp_connection* c)
897
0
{
898
0
  TCPCONN_LOCK(c->id);
899
0
  c->refcnt--;
900
0
  TCPCONN_UNLOCK(c->id);
901
0
}
902
903
904
static inline void tcpconn_ref(struct tcp_connection* c)
905
0
{
906
0
  TCPCONN_LOCK(c->id);
907
0
  c->refcnt++;
908
0
  TCPCONN_UNLOCK(c->id);
909
0
}
910
911
912
/* Allocates (in shared memory) and initializes a tcp_connection structure
 * for the already-created socket <sock>.
 *  - su           : peer (source) address of the connection
 *  - si           : local listener/socket the connection belongs to
 *  - prof         : connection profile (lifetime etc.); copied into the conn
 *  - state        : initial connection state (e.g. S_CONN_OK / S_CONN_CONNECTING)
 *  - flags        : extra F_CONN_* flags to OR into the new conn
 *  - in_main_proc : non-zero when called from TCP MAIN; only then is the
 *                   global tcp_connections_no counter bumped here (otherwise
 *                   TCP MAIN accounts for it when it learns of the conn)
 * Returns the new connection (refcnt 0, fd not yet attached) or NULL on error.
 * On error the socket <sock> is NOT closed - that is the caller's job. */
static struct tcp_connection* tcpconn_new(int sock, const union sockaddr_union* su,
					const struct socket_info* si, const struct tcp_conn_profile *prof,
					int state, int flags, int in_main_proc)
{
	struct tcp_connection *c;
	union sockaddr_union local_su;
	unsigned int su_size;

	c=(struct tcp_connection*)shm_malloc(sizeof(struct tcp_connection));
	if (c==0){
		LM_ERR("shared memory allocation failure\n");
		return 0;
	}
	memset(c, 0, sizeof(struct tcp_connection)); /* zero init */
	c->s=sock;
	c->fd=-1; /* not initialized */
	if (lock_init(&c->write_lock)==0){
		LM_ERR("init lock failed\n");
		goto error0;
	}

	c->rcv.src_su=*su;

	c->refcnt=0;
	su2ip_addr(&c->rcv.src_ip, su);
	c->rcv.src_port=su_getport(su);
	c->rcv.bind_address = si;
	c->rcv.dst_ip = si->address;
	/* the local port may differ from the listener's (e.g. for outbound
	 * connections), so fetch it from the kernel via getsockname() */
	su_size = sockaddru_len(*su);
	if (getsockname(sock, (struct sockaddr *)&local_su, &su_size)<0) {
		LM_ERR("failed to get info on received interface/IP %d/%s\n",
			errno, strerror(errno));
		goto error;
	}
	c->rcv.dst_port = su_getport(&local_su);
	print_ip("tcpconn_new: new tcp connection to: ", &c->rcv.src_ip, "\n");
	LM_DBG("on port %d, proto %d\n", c->rcv.src_port, si->proto);
	/* per-run unique id (shared counter) ... */
	c->id=(*connection_id)++;
	/* ... and a "cluster-wide" id: low 32 bits = id, then 24 bits of the
	 * startup time, then 8 random bits - to tell apart conns with the same
	 * id across restarts */
	c->cid = (unsigned long long)c->id
				| ( (unsigned long long)(startup_time&0xFFFFFF) << 32 )
					| ( (unsigned long long)(rand()&0xFF) << 56 );

	c->rcv.proto_reserved1=0; /* this will be filled before receive_message*/
	c->rcv.proto_reserved2=0;
	c->state=state;
	c->extra_data=0;
	c->type = si->proto;
	c->rcv.proto = si->proto;
	/* start with the default conn lifetime */
	c->lifetime = get_ticks() + prof->con_lifetime;
	c->profile = *prof;
	c->flags|=F_CONN_REMOVED|flags;
#ifdef DBG_TCPCON
	c->hist = sh_push(c, con_hist);
#endif

	/* pre-allocate the async-write chunk array if the proto does async */
	if (protos[si->proto].net.stream.async_chunks) {
		c->async = shm_malloc(sizeof(struct tcp_async_data) +
				protos[si->proto].net.stream.async_chunks *
				sizeof(struct tcp_async_chunk));
		if (c->async) {
			c->async->allocated = protos[si->proto].net.stream.async_chunks;
			c->async->oldest = 0;
			c->async->pending = 0;
		} else {
			LM_ERR("could not allocate async data for con!\n");
			goto error;
		}
	}
	if(in_main_proc)
		tcp_connections_no++;
	return c;

error:
	lock_destroy(&c->write_lock);
error0:
	shm_free(c);
	return 0;
}
991
992
993
/* creates a new tcp connection structure
994
 * if send2main is 1, the function informs the TCP Main about the new conn
995
 * a +1 ref is set for the new conn !
996
 * IMPORTANT - the function assumes you want to create a new TCP conn as
997
 * a result of a connect operation - the conn will be set as connect !!
998
 * Accepted connection are triggered internally only */
999
struct tcp_connection* tcp_conn_create(int sock, const union sockaddr_union* su,
1000
    const struct socket_info* si, struct tcp_conn_profile *prof,
1001
    int state, int send2main)
1002
0
{
1003
0
  struct tcp_connection *c;
1004
1005
0
  if (!prof)
1006
0
    tcp_con_get_profile(su, &si->su, si->proto, prof);
1007
1008
  /* create the connection structure */
1009
0
  c = tcpconn_new(sock, su, si, prof, state, 0, !send2main);
1010
0
  if (c==NULL) {
1011
0
    LM_ERR("tcpconn_new failed\n");
1012
0
    return NULL;
1013
0
  }
1014
1015
0
  if (protos[c->type].net.stream.conn.init &&
1016
0
      protos[c->type].net.stream.conn.init(c) < 0) {
1017
0
    LM_ERR("failed to do proto %d specific init for conn %p\n",
1018
0
        c->type, c);
1019
0
    tcp_conn_destroy(c);
1020
0
    return NULL;
1021
0
  }
1022
0
  c->flags |= F_CONN_INIT;
1023
1024
0
  c->refcnt++; /* safe to do it w/o locking, it's not yet
1025
          available to the rest of the world */
1026
0
  sh_log(c->hist, TCP_REF, "connect, (%d)", c->refcnt);
1027
0
  if (!send2main)
1028
0
    return c;
1029
1030
0
  return (tcp_conn_send(c) == 0 ? c : NULL);
1031
0
}
1032
1033
/* sends a new connection from a worker to main */
1034
/* Sends a new connection <c> from a worker to TCP MAIN over the worker's
 * unix socket, together with the connection's fd (via SCM_RIGHTS fd passing).
 * An async (still-connecting) conn is announced as ASYNC_CONNECT, a fully
 * established one as CONN_NEW. Returns 0 on success, -1 on error (in which
 * case the conn is freed here). */
int tcp_conn_send(struct tcp_connection *c)
{
	long response[2];
	int n, fd;

	/* inform TCP main about this new connection */
	if (c->state==S_CONN_CONNECTING) {
		/* store the local fd now, before TCP main overwrites it */
		fd = c->s;
		response[0]=(long)c;
		response[1]=ASYNC_CONNECT;
		n=send_fd(unix_tcp_sock, response, sizeof(response), fd);
		if (n<=0) {
			LM_ERR("Failed to send the socket to main for async connection\n");
			goto error;
		}
		/* our dup of the fd is no longer needed - TCP MAIN received
		 * its own copy via the fd-passing */
		close(fd);
	} else {
		response[0]=(long)c;
		response[1]=CONN_NEW;
		n=send_fd(unix_tcp_sock, response, sizeof(response), c->s);
		if (n<=0){
			LM_ERR("failed send_fd: %s (%d)\n", strerror(errno), errno);
			goto error;
		}
	}

	return 0;
error:
	/* no reporting as closed, as PROTO layer did not report it as
	 * OPEN yet */
	_tcpconn_rm(c,1);
	/* NOTE(review): the counter is decremented here although tcpconn_new()
	 * only incremented it for in_main_proc conns - presumably balanced by
	 * TCP MAIN's accounting; verify against handle_worker()'s CONN_NEW */
	tcp_connections_no--;
	return -1;
}
1069
1070
1071
/* Drops one reference from <tcpconn>; if it was the last one, the conn is
 * unhashed, its socket closed and the structure freed. If other refs are
 * still held, the conn is only marked bad/expired so the lifetime handler
 * reaps it once released. Locks the conn's hash partition to avoid races
 * with tcp_send. */
static inline void tcpconn_destroy(struct tcp_connection* tcpconn)
{
	int fd;
	/* snapshot the id - the lock key must stay valid even after the
	 * structure is freed below */
	int unsigned id = tcpconn->id;

	TCPCONN_LOCK(id); /*avoid races w/ tcp_send*/
	tcpconn->refcnt--;
	if (tcpconn->refcnt==0){
		LM_DBG("destroying connection %p, flags %04x\n",
				tcpconn, tcpconn->flags);
		fd=tcpconn->s;
		/* no reporting here - the tcpconn_destroy() function is called
		 * from the TCP_MAIN reactor when handling connections received
		 * from a worker; and we generate the CLOSE reports from WORKERs */
		_tcpconn_rm(tcpconn,0);
		if (fd >= 0)
			close(fd);
		tcp_connections_no--;
	}else{
		/* force timeout */
		tcpconn->lifetime=0;
		tcpconn->state=S_CONN_BAD;
		sh_log(tcpconn->hist, TCP_DEL_DELAY, "tcpconn_destroy delayed, (%d)",
			tcpconn->refcnt);
		LM_DBG("delaying (%p, flags %04x) ref = %d ...\n",
				tcpconn, tcpconn->flags, tcpconn->refcnt);

	}
	TCPCONN_UNLOCK(id);
}
1101
1102
/* wrapper to the internally used function */
1103
void tcp_conn_destroy(struct tcp_connection* tcpconn)
1104
0
{
1105
0
  tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE,
1106
0
        "Closed by Proto layer");
1107
0
  sh_log(tcpconn->hist, TCP_UNREF, "tcp_conn_destroy, (%d)", tcpconn->refcnt);
1108
0
  return tcpconn_destroy(tcpconn);
1109
0
}
1110
1111
1112
/************************ TCP MAIN process functions ************************/
1113
1114
/*! \brief
1115
 * handles a new connection, called internally by tcp_main_loop/handle_io.
1116
 * \param si - pointer to one of the tcp socket_info structures on which
1117
 *              an io event was detected (connection attempt)
1118
 * \return  handle_* return convention: -1 on error, 0 on EAGAIN (no more
1119
 *           io events queued), >0 on success. success/error refer only to
1120
 *           the accept.
1121
 */
1122
static inline int handle_new_connect(const struct socket_info* si)
{
	union sockaddr_union su;
	struct tcp_connection* tcpconn;
	struct tcp_conn_profile prof;
	socklen_t su_len = sizeof(su);
	int new_sock;
	unsigned int id;

	/* coverity[overrun-buffer-arg: FALSE] - union has 28 bytes, CID #200070 */
	new_sock=accept(si->socket, &(su.s), &su_len);
	if (new_sock==-1){
		/* listener is non-blocking: no pending conn is not an error */
		if ((errno==EAGAIN)||(errno==EWOULDBLOCK))
			return 0;
		LM_ERR("failed to accept connection(%d): %s\n", errno, strerror(errno));
		return -1;
	}
	/* enforce the global connection cap - drop the new conn, but report
	 * success to the reactor since the accept() itself worked */
	if (tcp_connections_no>=tcp_max_connections){
		LM_ERR("maximum number of connections exceeded: %d/%d\n",
					tcp_connections_no, tcp_max_connections);
		close(new_sock);
		return 1; /* success, because the accept was successful */
	}

	tcp_con_get_profile(&su, &si->su, si->proto, &prof);
	if (tcp_init_sock_opt(new_sock, &prof, si->flags, si->tos)<0){
		LM_ERR("tcp_init_sock_opt failed\n");
		close(new_sock);
		return 1; /* success, because the accept was successful */
	}

	/* add socket to list */
	tcpconn=tcpconn_new(new_sock, &su, si, &prof, S_CONN_OK, F_CONN_ACCEPTED, 1);
	if (tcpconn){
		tcpconn->refcnt++; /* safe, not yet available to the
								outside world */
		sh_log(tcpconn->hist, TCP_REF, "accept, (%d)", tcpconn->refcnt);
		tcpconn_add(tcpconn);
		LM_DBG("new connection: %p %d flags: %04x\n",
				tcpconn, tcpconn->s, tcpconn->flags);
		/* pass it to a worker */
		sh_log(tcpconn->hist, TCP_SEND2CHILD, "accept");
		if(send2worker(tcpconn,IO_WATCH_READ)<0){
			LM_ERR("no TCP workers available\n");
			/* undo the ref taken above; the conn is already hashed, so
			 * this must run under the partition lock */
			id = tcpconn->id;
			sh_log(tcpconn->hist, TCP_UNREF, "accept, (%d)", tcpconn->refcnt);
			TCPCONN_LOCK(id);
			tcpconn->refcnt--;
			if (tcpconn->refcnt==0){
				/* no close to report here as the connection was not yet
				 * reported as OPEN by the proto layer...this sucks a bit */
				_tcpconn_rm(tcpconn,1);
				close(new_sock/*same as tcpconn->s*/);
			}else tcpconn->lifetime=0; /* force expire */
			TCPCONN_UNLOCK(id);
		}
	}else{ /*tcpconn==0 */
		LM_ERR("tcpconn_new failed, closing socket\n");
		close(new_sock);
	}
	return 1; /* accept() was successful */
}
1184
1185
1186
/*! \brief
1187
 * handles an io event on one of the watched tcp connections
1188
 *
1189
 * \param    tcpconn - pointer to the tcp_connection for which we have an io ev.
1190
 * \param    fd_i    - index in the fd_array table (needed for delete)
1191
 * \return   handle_* return convention, but on success it always returns 0
1192
 *           (because it's one-shot, after a successful execution the fd is
1193
 *            removed from tcp_main's watch fd list and passed to a worker =>
1194
 *            tcp_main is not interested in further io events that might be
1195
 *            queued for this fd)
1196
 */
1197
inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, int fd_i,
																int event_type)
{
	int fd;
	int err;
	unsigned int id;
	unsigned int err_len;

	if (event_type == IO_WATCH_READ) {
		/* pass it to worker, so remove it from the io watch list */
		LM_DBG("data available on %p %d\n", tcpconn, tcpconn->s);
		if (reactor_del_reader(tcpconn->s, fd_i, 0)==-1)
			return -1;
		tcpconn->flags|=F_CONN_REMOVED_READ;
		tcpconn_ref(tcpconn); /* refcnt ++ */
		sh_log(tcpconn->hist, TCP_REF, "tcpconn read, (%d)", tcpconn->refcnt);
		sh_log(tcpconn->hist, TCP_SEND2CHILD, "read");
		if (send2worker(tcpconn,IO_WATCH_READ)<0){
			/* dispatch failed - roll back the ref under the partition lock;
			 * if we were the last holder, tear the conn down here */
			LM_ERR("no TCP workers available\n");
			id = tcpconn->id;
			TCPCONN_LOCK(id);
			tcpconn->refcnt--;
			sh_log(tcpconn->hist, TCP_UNREF, "tcpconn read, (%d)", tcpconn->refcnt);
			if (tcpconn->refcnt==0){
				fd=tcpconn->s;
				tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE,
					"No worker for read");
				_tcpconn_rm(tcpconn,0);
				close(fd);
			}else tcpconn->lifetime=0; /* force expire*/
			TCPCONN_UNLOCK(id);
		}
		return 0; /* we are not interested in possibly queued io events,
				   the fd was either passed to a worker, or closed */
	} else {
		LM_DBG("connection %p fd %d is now writable\n", tcpconn, tcpconn->s);
		/* we received a write event */
		if (tcpconn->state==S_CONN_CONNECTING) {
			/* we're coming from an async connect & write
			 * let's see if we connected successfully */
			err_len=sizeof(err);
			/* SO_ERROR fetches (and clears) the pending error of the
			 * non-blocking connect */
			if (getsockopt(tcpconn->s, SOL_SOCKET, SO_ERROR, &err, &err_len) < 0 || \
					err != 0) {
				LM_DBG("Failed connection attempt\n");
				/* take a ref so tcpconn_destroy()'s unref below releases
				 * the conn exactly once */
				tcpconn_ref(tcpconn);
				sh_log(tcpconn->hist, TCP_REF, "tcpconn connect, (%d)", tcpconn->refcnt);
				reactor_del_all(tcpconn->s, fd_i, IO_FD_CLOSING);
				tcpconn->flags|=F_CONN_REMOVED;
				tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE,
					"Async connect failed");
				sh_log(tcpconn->hist, TCP_UNREF, "tcpconn connect, (%d)", tcpconn->refcnt);
				tcpconn_destroy(tcpconn);
				return 0;
			}

			/* we successfully connected - further treat this case as if we
			 * were coming from an async write */
			tcpconn->state = S_CONN_OK;
			LM_DBG("Successfully completed previous async connect\n");

			/* now that we completed the async connection, we also need to
			 * listen for READ events, otherwise these will get lost */
			if (tcpconn->flags & F_CONN_REMOVED_READ) {
				reactor_add_reader( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn);
				tcpconn->flags&=~F_CONN_REMOVED_READ;
			}

			goto async_write;
		} else {
			/* we're coming from an async write -
			 * just pass to worker and have it write
			 * our TCP chunks */
async_write:
			/* no more write events for now */
			if (reactor_del_writer( tcpconn->s, fd_i, 0)==-1)
				return -1;
			tcpconn->flags|=F_CONN_REMOVED_WRITE;
			tcpconn_ref(tcpconn); /* refcnt ++ */
			sh_log(tcpconn->hist, TCP_REF, "tcpconn write, (%d)",
				tcpconn->refcnt);
			sh_log(tcpconn->hist, TCP_SEND2CHILD, "write");
			if (send2worker(tcpconn,IO_WATCH_WRITE)<0){
				/* same roll-back dance as on the read path above */
				LM_ERR("no TCP worker available\n");
				id = tcpconn->id;
				TCPCONN_LOCK(id);
				tcpconn->refcnt--;
				sh_log(tcpconn->hist, TCP_UNREF, "tcpconn write, (%d)",
					tcpconn->refcnt);
				if (tcpconn->refcnt==0){
					fd=tcpconn->s;
					tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE,
						"No worker for write");
					_tcpconn_rm(tcpconn,0);
					close(fd);
				}else tcpconn->lifetime=0; /* force expire*/
				TCPCONN_UNLOCK(id);
			}
			return 0;
		}
	}
}
1298
1299
1300
/*! \brief handles io from a tcp worker process
1301
 * \param  tcp_c - pointer in the tcp_workers array, to the entry for
1302
 *                 which an io event was detected
1303
 * \param  fd_i  - fd index in the fd_array (useful for optimizing
1304
 *                 io_watch_deletes)
1305
 * \return handle_* return convention: -1 on error, 0 on EAGAIN (no more
1306
 *           io events queued), >0 on success. success/error refer only to
1307
 *           the reads from the fd.
1308
 */
1309
inline static int handle_tcp_worker(struct tcp_worker* tcp_c, int fd_i)
{
	struct tcp_connection* tcpconn;
	long response[2];
	int cmd;
	int bytes;

	if (tcp_c->unix_sock<=0){
		/* (we can't have a fd==0, 0 is never closed )*/
		LM_CRIT("fd %d for %d (pid %d)\n", tcp_c->unix_sock,
				(int)(tcp_c-&tcp_workers[0]), tcp_c->pid);
		goto error;
	}
	/* read until sizeof(response)
	 * (this is a SOCK_STREAM so read is not atomic) */
	bytes=recv_all(tcp_c->unix_sock, response, sizeof(response), MSG_DONTWAIT);
	if (bytes<(int)sizeof(response)){
		if (bytes==0){
			/* EOF -> bad, worker has died */
			if (sr_get_core_status()!=STATE_TERMINATING)
				LM_CRIT("dead tcp worker %d (EOF received), pid %d\n",
					(int)(tcp_c-&tcp_workers[0]), tcp_c->pid );
			/* don't listen on it any more */
			reactor_del_reader( tcp_c->unix_sock, fd_i, 0/*flags*/);
			/* eof. so no more io here, it's ok to return error */
			goto error;
		}else if (bytes<0){
			/* EAGAIN is ok if we try to empty the buffer
			 * e.g.: SIGIO_RT overflow mode or EPOLL ET */
			if ((errno!=EAGAIN) && (errno!=EWOULDBLOCK)){
				LM_CRIT("read from tcp worker %ld (pid %d) %s [%d]\n",
						(long)(tcp_c-&tcp_workers[0]), tcp_c->pid,
						strerror(errno), errno );
			}else{
				bytes=0;
			}
			/* try to ignore ? */
			goto end;
		}else{
			/* should never happen */
			LM_CRIT("too few bytes received (%d)\n", bytes );
			bytes=0; /* something was read so there is no error; otoh if
						receive_fd returned less then requested => the receive
						buffer is empty => no more io queued on this fd */
			goto end;
		}
	}

	LM_DBG("response= %lx, %ld from tcp worker %d (%d)\n",
		response[0], response[1], tcp_c->pid, (int)(tcp_c-&tcp_workers[0]));

	/* the message is a (conn pointer, command) pair */
	cmd=response[1];
	tcpconn=(struct tcp_connection*)response[0];
	if (tcpconn==0){
		/* should never happen */
		LM_CRIT("null tcpconn pointer received from tcp worker %d (pid %d):"
			"%lx, %lx\n", (int)(tcp_c-&tcp_workers[0]), tcp_c->pid,
			response[0], response[1]) ;
		goto end;
	}
	switch(cmd){
		case CONN_RELEASE:
			/* worker is done reading - hand the conn back to the reactor */
			if (tcpconn->state==S_CONN_BAD){
				sh_log(tcpconn->hist, TCP_UNREF, "tcpworker release bad, (%d)", tcpconn->refcnt);
				tcpconn_destroy(tcpconn);
				break;
			}
			sh_log(tcpconn->hist, TCP_UNREF, "tcpworker release, (%d)", tcpconn->refcnt);
			tcpconn_put(tcpconn);
			/* must be after the de-ref*/
			reactor_add_reader( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn);
			tcpconn->flags&=~F_CONN_REMOVED_READ;
			break;
		case CONN_RELEASE_WRITE:
			/* worker finished a write; nothing left to watch for */
			if (tcpconn->state==S_CONN_BAD){
				sh_log(tcpconn->hist, TCP_UNREF, "tcpworker release write bad, (%d)", tcpconn->refcnt);
				tcpconn_destroy(tcpconn);
				break;
			}
			sh_log(tcpconn->hist, TCP_UNREF, "tcpworker release write, (%d)", tcpconn->refcnt);
			tcpconn_put(tcpconn);
			break;
		case ASYNC_WRITE_TCPW:
			/* worker has pending async chunks - watch for writability */
			if (tcpconn->state==S_CONN_BAD){
				sh_log(tcpconn->hist, TCP_UNREF, "tcpworker async write bad, (%d)", tcpconn->refcnt);
				tcpconn_destroy(tcpconn);
				break;
			}
			sh_log(tcpconn->hist, TCP_UNREF, "tcpworker async write, (%d)", tcpconn->refcnt);
			tcpconn_put(tcpconn);
			/* must be after the de-ref*/
			reactor_add_writer( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn);
			tcpconn->flags&=~F_CONN_REMOVED_WRITE;
			break;
		case CONN_ERROR_TCPW:
			LM_ERR("TCP_DBG - main: received conn %p / %u as faulty "
				"(state %d, rfcnt=%d)\n", tcpconn, tcpconn->id,
				tcpconn->state, tcpconn->refcnt);
			/* fall through - an errored conn is torn down like a dead one */
		case CONN_DESTROY:
		case CONN_EOF:
			/* WARNING: this will auto-dec. refcnt! */
			if ((tcpconn->flags & F_CONN_REMOVED) != F_CONN_REMOVED &&
				(tcpconn->s!=-1)){
				reactor_del_all( tcpconn->s, -1, IO_FD_CLOSING);
				tcpconn->flags|=F_CONN_REMOVED;
			}
			sh_log(tcpconn->hist, TCP_UNREF, "tcpworker destroy, (%d)", tcpconn->refcnt);
			tcpconn_destroy(tcpconn); /* closes also the fd */
			break;
		default:
			LM_CRIT("unknown cmd %d from tcp worker %d (%d)\n",
				cmd, tcp_c->pid, (int)(tcp_c-&tcp_workers[0]));
	}
end:
	return bytes;
error:
	return -1;
}
1427
1428
1429
/*! \brief handles io from a "generic" ser process (get fd or new_fd from a tcp_send)
1430
 *
1431
 * \param p     - pointer in the ser processes array (pt[]), to the entry for
1432
 *                 which an io event was detected
1433
 * \param fd_i  - fd index in the fd_array (useful for optimizing
1434
 *                 io_watch_deletes)
1435
 * \return  handle_* return convention:
1436
 *          - -1 on error reading from the fd,
1437
 *          -  0 on EAGAIN  or when no  more io events are queued
1438
 *             (receive buffer empty),
1439
 *          -  >0 on successful reads from the fd (the receive buffer might
1440
 *             be non-empty).
1441
 */
1442
inline static int handle_worker(struct process_table* p, int fd_i)
1443
0
{
1444
0
  struct tcp_connection* tcpconn;
1445
0
  long response[2];
1446
0
  int cmd;
1447
0
  int bytes;
1448
0
  int ret;
1449
0
  int fd;
1450
1451
0
  ret=-1;
1452
0
  if (p->unix_sock<=0){
1453
    /* (we can't have a fd==0, 0 is never closed )*/
1454
0
    LM_CRIT("fd %d for %d (pid %d)\n",
1455
0
        p->unix_sock, (int)(p-&pt[0]), p->pid);
1456
0
    goto error;
1457
0
  }
1458
1459
  /* get all bytes and the fd (if transmitted)
1460
   * (this is a SOCK_STREAM so read is not atomic) */
1461
0
  bytes=receive_fd(p->unix_sock, response, sizeof(response), &fd,
1462
0
            MSG_DONTWAIT);
1463
0
  if (bytes<(int)sizeof(response)){
1464
    /* too few bytes read */
1465
0
    if (bytes==0){
1466
      /* EOF -> bad, worker has died */
1467
0
      if (sr_get_core_status()!=STATE_TERMINATING)
1468
0
        LM_CRIT("dead tcp worker %d (EOF received), pid %d\n",
1469
0
          (int)(p-&pt[0]), p->pid);
1470
      /* don't listen on it any more */
1471
0
      reactor_del_reader( p->unix_sock, fd_i, 0/*flags*/);
1472
0
      goto error; /* worker dead => no further io events from it */
1473
0
    }else if (bytes<0){
1474
      /* EAGAIN is ok if we try to empty the buffer
1475
       * e.g: SIGIO_RT overflow mode or EPOLL ET */
1476
0
      if ((errno!=EAGAIN) && (errno!=EWOULDBLOCK)){
1477
0
        LM_CRIT("read from worker %d (pid %d):  %s [%d]\n",
1478
0
            (int)(p-&pt[0]), p->pid, strerror(errno), errno);
1479
0
        ret=-1;
1480
0
      }else{
1481
0
        ret=0;
1482
0
      }
1483
      /* try to ignore ? */
1484
0
      goto end;
1485
0
    }else{
1486
      /* should never happen */
1487
0
      LM_CRIT("too few bytes received (%d)\n", bytes );
1488
0
      ret=0; /* something was read so there is no error; otoh if
1489
            receive_fd returned less then requested => the receive
1490
            buffer is empty => no more io queued on this fd */
1491
0
      goto end;
1492
0
    }
1493
0
  }
1494
0
  ret=1; /* something was received, there might be more queued */
1495
0
  LM_DBG("read response= %lx, %ld, fd %d from %d (%d)\n",
1496
0
          response[0], response[1], fd, (int)(p-&pt[0]), p->pid);
1497
0
  cmd=response[1];
1498
0
  tcpconn=(struct tcp_connection*)response[0];
1499
0
  if (tcpconn==0){
1500
0
    LM_CRIT("null tcpconn pointer received from worker %d (pid %d)"
1501
0
      "%lx, %lx\n", (int)(p-&pt[0]), p->pid, response[0], response[1]) ;
1502
0
    goto end;
1503
0
  }
1504
0
  switch(cmd){
1505
0
    case CONN_ERROR_GENW:
1506
      /* remove from reactor only if the fd exists, and it wasn't
1507
       * removed before */
1508
0
      if ((tcpconn->flags & F_CONN_REMOVED) != F_CONN_REMOVED &&
1509
0
          (tcpconn->s!=-1)){
1510
0
        reactor_del_all( tcpconn->s, -1, IO_FD_CLOSING);
1511
0
        tcpconn->flags|=F_CONN_REMOVED;
1512
0
      }
1513
0
      sh_log(tcpconn->hist, TCP_UNREF, "worker error, (%d)", tcpconn->refcnt);
1514
0
      tcpconn_destroy(tcpconn); /* will close also the fd */
1515
0
      break;
1516
0
    case CONN_GET_FD:
1517
      /* send the requested FD  */
1518
      /* WARNING: take care of setting refcnt properly to
1519
       * avoid race condition */
1520
0
      if (send_fd(p->unix_sock, &tcpconn, sizeof(tcpconn),
1521
0
              tcpconn->s)<=0){
1522
0
        LM_ERR("send_fd failed\n");
1523
0
      }
1524
0
      break;
1525
0
    case CONN_NEW:
1526
      /* update the fd in the requested tcpconn*/
1527
      /* WARNING: take care of setting refcnt properly to
1528
       * avoid race condition */
1529
0
      if (fd==-1){
1530
0
        LM_CRIT(" cmd CONN_NEW: no fd received\n");
1531
0
        break;
1532
0
      }
1533
0
      tcpconn->s=fd;
1534
      /* add tcpconn to the list*/
1535
0
      tcpconn_add(tcpconn);
1536
0
      tcp_connections_no++;
1537
0
      reactor_add_reader( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn);
1538
0
      tcpconn->flags&=~F_CONN_REMOVED_READ;
1539
0
      break;
1540
0
    case ASYNC_CONNECT:
1541
      /* connection is not yet linked to hash = not yet
1542
       * available to the outside world */
1543
0
      if (fd==-1){
1544
0
        LM_CRIT(" cmd CONN_NEW: no fd received\n");
1545
0
        break;
1546
0
      }
1547
0
      tcpconn->s=fd;
1548
      /* add tcpconn to the list*/
1549
0
      tcpconn_add(tcpconn);
1550
0
      tcp_connections_no++;
1551
      /* FIXME - now we have lifetime==default_lifetime - should we
1552
       * set a shorter one when waiting for a connect ??? */
1553
      /* only maintain the socket in the IO_WATCH_WRITE watcher
1554
       * while we have stuff to write - otherwise we're going to get
1555
       * useless events */
1556
0
      reactor_add_writer( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn);
1557
0
      tcpconn->flags&=~F_CONN_REMOVED_WRITE;
1558
0
      break;
1559
0
    case ASYNC_WRITE_GENW:
1560
0
      sh_log(tcpconn->hist,TCP_UNREF,"ASYNC_WRITE_GENW, (%d)",
1561
0
        tcpconn->refcnt);
1562
0
      if (tcpconn->state==S_CONN_BAD){
1563
0
        tcpconn->lifetime=0;
1564
0
        tcpconn_put(tcpconn);
1565
0
        break;
1566
0
      }
1567
0
      tcpconn_put(tcpconn);
1568
      /* must be after the de-ref*/
1569
0
      reactor_add_writer( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn);
1570
0
      tcpconn->flags&=~F_CONN_REMOVED_WRITE;
1571
0
      break;
1572
0
    default:
1573
0
      LM_CRIT("unknown cmd %d from worker %d (pid %d)\n", cmd,
1574
0
        (int)(p-&pt[0]), p->pid);
1575
0
  }
1576
0
end:
1577
0
  return ret;
1578
0
error:
1579
0
  return -1;
1580
0
}
1581
1582
1583
/*! \brief generic handle io routine, it will call the appropiate
1584
 *  handle_xxx() based on the fd_map type
1585
 *
1586
 * \param  fm  - pointer to a fd hash entry
1587
 * \param  idx - index in the fd_array (or -1 if not known)
1588
 * \return -1 on error
1589
 *          0 on EAGAIN or when by some other way it is known that no more
1590
 *            io events are queued on the fd (the receive buffer is empty).
1591
 *            Usefull to detect when there are no more io events queued for
1592
 *            sigio_rt, epoll_et, kqueue.
1593
 *         >0 on successful read from the fd (when there might be more io
1594
 *            queued -- the receive buffer might still be non-empty)
1595
 */
1596
inline static int handle_io(struct fd_map* fm, int idx,int event_type)
1597
0
{
1598
0
  int ret = 0;
1599
1600
0
  pt_become_active();
1601
0
  switch(fm->type){
1602
0
    case F_TCP_LISTENER:
1603
0
      ret = handle_new_connect((const struct socket_info*)fm->data);
1604
0
      break;
1605
0
    case F_TCPCONN:
1606
0
      ret = handle_tcpconn_ev((struct tcp_connection*)fm->data, idx,
1607
0
        event_type);
1608
0
      break;
1609
0
    case F_TCP_TCPWORKER:
1610
0
      ret = handle_tcp_worker((struct tcp_worker*)fm->data, idx);
1611
0
      break;
1612
0
    case F_TCP_WORKER:
1613
0
      ret = handle_worker((struct process_table*)fm->data, idx);
1614
0
      break;
1615
0
    case F_IPC:
1616
0
      ipc_handle_job(fm->fd);
1617
0
      break;
1618
0
    case F_NONE:
1619
0
      LM_CRIT("empty fd map\n");
1620
0
      goto error;
1621
0
    default:
1622
0
      LM_CRIT("unknown fd type %d\n", fm->type);
1623
0
      goto error;
1624
0
  }
1625
0
  pt_become_idle();
1626
0
  return ret;
1627
0
error:
1628
0
  pt_become_idle();
1629
0
  return -1;
1630
0
}
1631
1632
1633
/*
1634
 * iterates through all TCP connections and closes expired ones
1635
 * Note: runs once per second at most
1636
 */
1637
/* Rate-limited wrapper around __tcpconn_lifetime(): runs the expiry sweep
 * at most once per ticks-second, using <last_sec> (a caller-owned static)
 * to remember when it last ran. */
#define tcpconn_lifetime(last_sec) \
	do { \
		int now; \
		now = get_ticks(); \
		if (last_sec != now) { \
			last_sec = now; \
			__tcpconn_lifetime(0); \
		} \
	} while (0)
1646
1647
1648
/*! \brief very inefficient for now - FIXME
1649
 * keep in sync with tcpconn_destroy, the "delete" part should be
1650
 * the same except for io_watch_del..
1651
 * \todo FIXME (very inefficient for now)
1652
 */
1653
/* Walks every connection in every hash partition and removes the ones that
 * are unreferenced and past their lifetime. With <shutdown> non-zero, ALL
 * connections are removed unconditionally, without locking (single-process
 * teardown) and without closing fds or reporting. */
static inline void __tcpconn_lifetime(int shutdown)
{
	struct tcp_connection *c, *next;
	unsigned int ticks,part;
	unsigned h;
	int fd;

	/* before the timer subsystem is up, treat "now" as tick 0 */
	if (have_ticks())
		ticks=get_ticks();
	else
		ticks=0;

	for( part=0 ; part<TCP_PARTITION_SIZE ; part++ ) {
		if (!shutdown) TCPCONN_LOCK(part); /* fixme: we can lock only on delete IMO */
		for(h=0; h<TCP_ID_HASH_SIZE; h++){
			c=TCP_PART(part).tcpconn_id_hash[h];
			while(c){
				/* save the link before a potential _tcpconn_rm(c) below */
				next=c->id_next;
				if (shutdown ||((c->refcnt==0) && (ticks>c->lifetime))) {
					if (!shutdown)
						LM_DBG("timeout for hash=%d - %p"
								" (%d > %d)\n", h, c, ticks, c->lifetime);
					fd=c->s;
					/* report the closing of the connection . Note that
					 * there are connections that use a forced expire to 0
					 * as a way to be deleted - we are not interested in */
					/* Also, do not trigger reporting when shutdown
					 * is done */
					if (c->lifetime>0 && !shutdown)
						tcp_trigger_report(c, TCP_REPORT_CLOSE,
							"Timeout on no traffic");
					if ((!shutdown)&&(fd>0)&&(c->refcnt==0)) {
						/* if any of read or write are set, we need to remove
						 * the fd from the reactor */
						if ((c->flags & F_CONN_REMOVED) != F_CONN_REMOVED){
							reactor_del_all( fd, -1, IO_FD_CLOSING);
							c->flags|=F_CONN_REMOVED;
						}
						close(fd);
						c->s = -1;
					}
					_tcpconn_rm(c, shutdown?1:0);
					tcp_connections_no--;
				}
				c=next;
			}
		}
		if (!shutdown) TCPCONN_UNLOCK(part);
	}
}
1703
1704
1705
/* The main loop of the dedicated TCP MAIN process: sets up its own reactor,
 * registers all TCP(-based) listeners, the unix sockets toward all other
 * OpenSIPS processes and toward the TCP workers, plus the IPC pipe, then
 * loops forever dispatching events via handle_io(). Never returns - on any
 * setup error (or loop exit) the process exits. */
static void tcp_main_server(void)
{
	static unsigned int last_sec = 0;
	int flags;
	struct socket_info_full* sif;
	int n;

	/* we run in a separate, dedicated process, with its own reactor
	 * (reactors are per process) */
	if (init_worker_reactor("TCP_main", RCT_PRIO_MAX)<0)
		goto error;

	/* now start watching all the fds */

	/* add all the sockets we listen on for connections */
	for( n=PROTO_FIRST ; n<PROTO_LAST ; n++ )
		if ( is_tcp_based_proto(n) )
			for( sif=protos[n].listeners ; sif ; sif=sif->next ) {
				struct socket_info* si = &sif->socket_info;
				if (protos[n].tran.bind_listener && protos[n].tran.bind_listener(si)<0) {
					LM_ERR("failed to bind listener [%.*s], proto %s\n",
							si->name.len, si->name.s,
							protos[n].name );
					goto error;
				}
				if ( (si->socket!=-1) &&
				reactor_add_reader( si->socket, F_TCP_LISTENER,
				RCT_PRIO_NET, si)<0 ) {
					LM_ERR("failed to add listen socket to reactor\n");
					goto error;
				}
			}
	/* add all the unix sockets used for communication with other opensips
	 * processes (get fd, new connection a.s.o)
	 * NOTE: we add even the socks for the inactive/unfork processes - the
	 *       socks are already created, but the triggering is from proc to
	 *       main, having them into reactor is harmless - they will never
	 *       trigger as there is no proc on the other end to write us */
	for (n=1; n<counted_max_processes; n++) {
		/* skip myself (as process) and -1 socks (disabled)
		   (we can't have 0, we never close it!) */
		if (n!=process_no && pt[n].tcp_socks_holder[0]>0)
			if (reactor_add_reader( pt[n].tcp_socks_holder[0], F_TCP_WORKER,
			RCT_PRIO_PROC, &pt[n])<0){
				LM_ERR("failed to add process %d (%s) unix socket "
					"to the fd list\n", n, pt[n].desc);
				goto error;
			}
	}
	/* add all the unix sockets used for communication with the tcp workers */
	for (n=0; n<tcp_workers_max_no; n++) {
		/*we can't have 0, we never close it!*/
		if (tcp_workers[n].unix_sock>0) {
			/* make socket non-blocking */
			flags=fcntl(tcp_workers[n].unix_sock, F_GETFL);
			if (flags==-1){
				LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno));
				goto error;
			}
			if (fcntl(tcp_workers[n].unix_sock,F_SETFL,flags|O_NONBLOCK)==-1){
				LM_ERR("set non-blocking failed: (%d) %s\n",
					errno, strerror(errno));
				goto error;
			}
			/* add socket for listening */
			if (reactor_add_reader( tcp_workers[n].unix_sock,
			F_TCP_TCPWORKER, RCT_PRIO_PROC, &tcp_workers[n])<0) {
				LM_ERR("failed to add tcp worker %d unix socket to "
						"the fd list\n", n);
				goto error;
			}
		}
	}

	/* init: start watching for the IPC jobs */
	if (reactor_add_reader(IPC_FD_READ_SELF, F_IPC, RCT_PRIO_ASYNC, NULL)<0){
		LM_CRIT("failed to add IPC pipe to reactor\n");
		goto error;
	}

	is_tcp_main = 1;

	/* main loop (requires "handle_io()" implementation); the third arg runs
	 * on every loop iteration and expires idle conns once per second */
	reactor_main_loop( TCP_MAIN_SELECT_TIMEOUT, error,
			tcpconn_lifetime(last_sec) );

error:
	destroy_worker_reactor();
	LM_CRIT("exiting...");
	exit(-1);
}
1796
1797
1798
1799
/**************************** Control functions ******************************/
1800
1801
/* initializes the TCP network level in terms of data structures */
1802
int tcp_init(void)
{
	unsigned int i;

	/* auto-detection: the TCP layer stays disabled unless at least one
	 * TCP-based protocol actually has listeners configured */
	for ( i=PROTO_FIRST ; i<PROTO_LAST ; i++ )
		if (is_tcp_based_proto(i) && proto_has_listeners(i)) {
			tcp_disabled=0;
			break;
		}

	tcp_init_con_profiles();

	if (tcp_disabled)
		return 0;

#ifdef DBG_TCPCON
	/* debug-only build: history list for TCP connection events */
	con_hist = shl_init("TCP con", 10000, 0);
	if (!con_hist) {
		LM_ERR("oom con hist\n");
		goto error;
	}
#endif

	/* resolve the auto-scaling profile (if one was configured by name);
	 * an unknown profile name is only warned about, not fatal */
	if (tcp_auto_scaling_profile) {
		s_profile = get_scaling_profile(tcp_auto_scaling_profile);
		if (s_profile==NULL) {
			LM_WARN("TCP scaling profile <%s> not defined "
				"-> ignoring it...\n", tcp_auto_scaling_profile);
		} else {
			auto_scaling_enabled = 1;
		}
	}

	/* the workers table must be sized for the scaling maximum, not just
	 * for the number of workers started at boot */
	tcp_workers_max_no = (s_profile && (tcp_workers_no<s_profile->max_procs)) ?
		s_profile->max_procs : tcp_workers_no ;

	/* init tcp workers array - shm, as it is updated from multiple procs */
	tcp_workers = (struct tcp_worker*)shm_malloc
		( tcp_workers_max_no*sizeof(struct tcp_worker) );
	if (tcp_workers==0) {
		LM_CRIT("could not alloc tcp_workers array in shm memory\n");
		goto error;
	}
	memset( tcp_workers, 0, tcp_workers_max_no*sizeof(struct tcp_worker));
	/* init globals - the shared connection-id counter, randomly seeded */
	connection_id=(unsigned int*)shm_malloc(sizeof(unsigned int));
	if (connection_id==0){
		LM_CRIT("could not alloc globals in shm memory\n");
		goto error;
	}
	// The  rand()  function returns a pseudo-random integer in the range 0 to
	// RAND_MAX inclusive (i.e., the mathematical range [0, RAND_MAX]).
	*connection_id=(unsigned int)rand();
	memset( &tcp_parts, 0, TCP_PARTITION_SIZE*sizeof(struct tcp_partition));
	/* init partitions: each gets its own lock plus two hashtables
	 * (by-alias and by-id); on any failure, fall through to the common
	 * cleanup which frees everything allocated so far */
	for( i=0 ; i<TCP_PARTITION_SIZE ; i++ ) {
		/* init lock */
		tcp_parts[i].tcpconn_lock=lock_alloc();
		if (tcp_parts[i].tcpconn_lock==0){
			LM_CRIT("could not alloc lock\n");
			goto error;
		}
		if (lock_init(tcp_parts[i].tcpconn_lock)==0){
			LM_CRIT("could not init lock\n");
			lock_dealloc((void*)tcp_parts[i].tcpconn_lock);
			tcp_parts[i].tcpconn_lock=0;
			goto error;
		}
		/* alloc hashtables*/
		tcp_parts[i].tcpconn_aliases_hash=(struct tcp_conn_alias**)
			shm_malloc(TCP_ALIAS_HASH_SIZE* sizeof(struct tcp_conn_alias*));
		if (tcp_parts[i].tcpconn_aliases_hash==0){
			LM_CRIT("could not alloc address hashtable in shm memory\n");
			goto error;
		}
		tcp_parts[i].tcpconn_id_hash=(struct tcp_connection**)
			shm_malloc(TCP_ID_HASH_SIZE*sizeof(struct tcp_connection*));
		if (tcp_parts[i].tcpconn_id_hash==0){
			LM_CRIT("could not alloc id hashtable in shm memory\n");
			goto error;
		}
		/* init hashtables*/
		memset((void*)tcp_parts[i].tcpconn_aliases_hash, 0,
			TCP_ALIAS_HASH_SIZE * sizeof(struct tcp_conn_alias*));
		memset((void*)tcp_parts[i].tcpconn_id_hash, 0,
			TCP_ID_HASH_SIZE * sizeof(struct tcp_connection*));
	}

	return 0;
error:
	/* clean-up: tcp_destroy() tolerates partially-initialized state */
	tcp_destroy();
	return -1;
}
1898
1899
1900
/* destroys the TCP data */
1901
void tcp_destroy(void)
1902
0
{
1903
0
  int part;
1904
1905
0
  if (tcp_parts[0].tcpconn_id_hash)
1906
      /* force close/expire for all active tcpconns*/
1907
0
      __tcpconn_lifetime(1);
1908
1909
0
  if (connection_id){
1910
0
    shm_free(connection_id);
1911
0
    connection_id=0;
1912
0
  }
1913
1914
0
  for ( part=0 ; part<TCP_PARTITION_SIZE ; part++ ) {
1915
0
    if (tcp_parts[part].tcpconn_id_hash){
1916
0
      shm_free(tcp_parts[part].tcpconn_id_hash);
1917
0
      tcp_parts[part].tcpconn_id_hash=0;
1918
0
    }
1919
0
    if (tcp_parts[part].tcpconn_aliases_hash){
1920
0
      shm_free(tcp_parts[part].tcpconn_aliases_hash);
1921
0
      tcp_parts[part].tcpconn_aliases_hash=0;
1922
0
    }
1923
0
    if (tcp_parts[part].tcpconn_lock){
1924
0
      lock_destroy(tcp_parts[part].tcpconn_lock);
1925
0
      lock_dealloc((void*)tcp_parts[part].tcpconn_lock);
1926
0
      tcp_parts[part].tcpconn_lock=0;
1927
0
    }
1928
0
  }
1929
0
}
1930
1931
1932
int tcp_create_comm_proc_socks( int proc_no)
1933
0
{
1934
0
  int i;
1935
1936
0
  if (tcp_disabled)
1937
0
    return 0;
1938
1939
0
  for( i=0 ; i<proc_no ; i++ ) {
1940
0
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, pt[i].tcp_socks_holder)<0){
1941
0
      LM_ERR("socketpair failed for process %d: %d/%s\n",
1942
0
        i, errno, strerror(errno));
1943
0
      return -1;
1944
0
    }
1945
0
  }
1946
1947
0
  return 0;
1948
0
}
1949
1950
1951
int tcp_activate_comm_proc_socks( int proc_no)
1952
0
{
1953
0
  if (tcp_disabled)
1954
0
    return 0;
1955
1956
0
  unix_tcp_sock = pt[proc_no].tcp_socks_holder[1];
1957
0
  pt[proc_no].unix_sock = pt[proc_no].tcp_socks_holder[0];
1958
1959
0
  return 0;
1960
0
}
1961
1962
1963
void tcp_connect_proc_to_tcp_main( int proc_no, int worker )
1964
0
{
1965
0
  if (tcp_disabled)
1966
0
    return;
1967
1968
0
  if (worker) {
1969
0
    close( pt[proc_no].unix_sock );
1970
0
  } else {
1971
0
    unix_tcp_sock = -1;
1972
0
  }
1973
0
}
1974
1975
1976
int _get_own_tcp_worker_id(void)
1977
0
{
1978
0
  pid_t pid;
1979
0
  int i;
1980
1981
0
  pid = getpid();
1982
0
  for( i=0 ; i<tcp_workers_max_no ; i++)
1983
0
    if(tcp_workers[i].pid==pid)
1984
0
      return i;
1985
1986
0
  return -1;
1987
0
}
1988
1989
1990
void tcp_reset_worker_slot(void)
1991
0
{
1992
0
  int i;
1993
1994
0
  if ((i=_get_own_tcp_worker_id())>=0) {
1995
0
    tcp_workers[i].state=STATE_INACTIVE;
1996
0
    tcp_workers[i].pid=0;
1997
0
    tcp_workers[i].pt_idx=0;
1998
0
  }
1999
0
}
2000
2001
2002
/* auto-scaling hook: forks one extra TCP worker into a free slot of the
 * workers table; returns the new process' pt index on success, -1 on
 * failure (the child itself never returns - it loops or exits) */
static int fork_dynamic_tcp_process(void *foo)
{
	int p_id;
	int r;
	const struct internal_fork_params ifp_sr_tcp = {
		.proc_desc = "SIP receiver TCP",
		.flags = OSS_PROC_DYNAMIC|OSS_PROC_NEEDS_SCRIPT,
		.type = TYPE_TCP,
	};

	/* search for free slot in the TCP workers table */
	for( r=0 ; r<tcp_workers_max_no ; r++ )
		if (tcp_workers[r].state==STATE_INACTIVE)
			break;

	if (r==tcp_workers_max_no) {
		/* the caller should never over-fork past the table size */
		LM_BUG("trying to fork one more TCP worker but no free slots in "
			"the TCP table (size=%d)\n",tcp_workers_max_no);
		return -1;
	}

	if((p_id=internal_fork(&ifp_sr_tcp))<0){
		LM_ERR("cannot fork dynamic TCP worker process\n");
		return(-1);
	}else if (p_id==0){
		/* new TCP process */
		set_proc_attrs("TCP receiver");
		tcp_workers[r].pid = getpid();

		if (tcp_worker_proc_reactor_init(tcp_workers[r].main_unix_sock)<0||
		init_child(20000) < 0) {
			goto error;
		}

		report_conditional_status( 1, 0);/*report success*/
		/* the child proc is done read&write) dealing with the status pipe */
		clean_read_pipeend();

		tcp_worker_proc_loop();
		/* only reached if the worker loop returns (shutdown path);
		 * note it falls through into the error label below, so the
		 * child always exits via exit(-1) */
		destroy_worker_reactor();

error:
		report_failure_status();
		LM_ERR("Initializing new process failed, exiting with error \n");
		pt[process_no].flags |= OSS_PROC_SELFEXIT;
		exit( -1);
	} else {
		/*parent/main*/
		tcp_workers[r].state=STATE_ACTIVE;
		tcp_workers[r].n_reqs=0;
		tcp_workers[r].pt_idx=p_id;
		return p_id;
	}

	/* unreachable: both fork branches return or exit above */
	return 0;
}
2058
2059
2060
static void tcp_process_graceful_terminate(int sender, void *param)
2061
0
{
2062
0
  int i;
2063
2064
  /* we accept this only from the main proccess */
2065
0
  if (sender!=0) {
2066
0
    LM_BUG("graceful terminate received from a non-main process!!\n");
2067
0
    return;
2068
0
  }
2069
0
  LM_NOTICE("process %d received RPC to terminate from Main\n",process_no);
2070
2071
  /* going into "draining" state will avoid:
2072
   *  - getting jobs from TCP MAIN (active state required for that)
2073
   *  - having othe worker slot re-used (inactive state required for that) */
2074
0
  if ((i=_get_own_tcp_worker_id())>=0)
2075
0
    tcp_workers[i].state=STATE_DRAINING;
2076
2077
0
  tcp_terminate_worker();
2078
2079
0
  return;
2080
0
}
2081
2082
2083
/* counts the number of TCP processes to start with; this number may
2084
 * change during runtime due auto-scaling */
2085
int tcp_count_processes(unsigned int *extra)
2086
0
{
2087
0
  if (extra) *extra = 0;
2088
2089
0
  if (tcp_disabled)
2090
0
    return 0;
2091
2092
2093
0
  if (s_profile && extra) {
2094
    /* how many can be forked over the number of procs to start with ?*/
2095
0
    if (s_profile->max_procs > tcp_workers_no)
2096
0
      *extra = s_profile->max_procs - tcp_workers_no;
2097
0
  }
2098
2099
0
  return 1/* tcp main */ + tcp_workers_no /*workers to start with*/;
2100
0
}
2101
2102
2103
/* forks the boot-time TCP worker processes; the parent returns 0 on
 * success / -1 on error, the children never return (they enter the
 * worker loop or exit on init failure) */
int tcp_start_processes(int *chd_rank, int *startup_done)
{
	int r, n, p_id;
	int reader_fd[2]; /* for comm. with the tcp workers read  */
	struct socket_info_full *sif;
	const struct internal_fork_params ifp_sr_tcp = {
		.proc_desc = "SIP receiver TCP",
		.flags = OSS_PROC_NEEDS_SCRIPT,
		.type = TYPE_TCP,
	};

	if (tcp_disabled)
		return 0;

	/* estimate max fd. no:
	 * 1 tcp send unix socket/all_proc,
	 *  + 1 udp sock/udp proc + 1 tcp_worker sock/tcp worker*
	 *  + no_listen_tcp
	 * NOTE(review): the listener count accumulated in 'r' here is never
	 * used afterwards ('r' is reset by the next loop) - looks like dead
	 * leftover code; confirm before removing */
	for( r=0,n=PROTO_FIRST ; n<PROTO_LAST ; n++ )
		if ( is_tcp_based_proto(n) )
			for(sif=protos[n].listeners; sif ; sif=sif->next,r++ );

	/* create the socket pairs for ALL potential processes (including
	 * slots reserved for workers forked later by auto-scaling) */
	for(r=0; r<tcp_workers_max_no; r++){
		/* create sock to communicate from TCP main to worker */
		if (socketpair(AF_UNIX, SOCK_STREAM, 0, reader_fd)<0){
			LM_ERR("socketpair failed: %s\n", strerror(errno));
			goto error;
		}
		tcp_workers[r].unix_sock = reader_fd[0]; /* worker's end */
		tcp_workers[r].main_unix_sock = reader_fd[1]; /* main's end */
	}

	/* register the auto-scaling group; failure here only disables auto
	 * forking, it is not fatal */
	if ( auto_scaling_enabled && s_profile &&
	create_process_group( TYPE_TCP, NULL, s_profile,
	fork_dynamic_tcp_process, tcp_process_graceful_terminate)!=0)
		LM_ERR("failed to create group of TCP processes for, "
			"auto forking will not be possible\n");

	/* start the TCP workers */
	for(r=0; r<tcp_workers_no; r++){
		(*chd_rank)++;
		p_id=internal_fork(&ifp_sr_tcp);
		if (p_id<0){
			LM_ERR("fork failed\n");
			goto error;
		}else if (p_id>0){
			/* parent: record the worker as active in its slot */
			tcp_workers[r].state=STATE_ACTIVE;
			tcp_workers[r].n_reqs=0;
			tcp_workers[r].pt_idx=p_id;
		}else{
			/* child: init and enter the worker loop; never returns */
			set_proc_attrs("TCP receiver");
			tcp_workers[r].pid = getpid();
			if (tcp_worker_proc_reactor_init(tcp_workers[r].main_unix_sock)<0||
					init_child(*chd_rank) < 0) {
				LM_ERR("init_children failed\n");
				report_failure_status();
				if (startup_done)
					*startup_done = -1;
				exit(-1);
			}

			/* was startup route executed so far ? only the first worker
			 * (r==0) runs it, and only if nobody ran it before */
			if (startup_done!=NULL && *startup_done==0 && r==0) {
				LM_DBG("running startup for first TCP\n");
				if(run_startup_route()< 0) {
					LM_ERR("Startup route processing failed\n");
					report_failure_status();
					*startup_done = -1;
					exit(-1);
				}
				*startup_done = 1;
			}

			report_conditional_status( (!no_daemon_mode), 0);

			tcp_worker_proc_loop();
		}
	}

	/* wait for the startup route to be executed; the child's write to
	 * *startup_done must be visible here (presumably shm-backed -
	 * TODO confirm against the caller's allocation) */
	if (startup_done)
		while (!(*startup_done)) {
			usleep(5);
			handle_sigs();
		}

	return 0;
error:
	return -1;
}
2196
2197
2198
int tcp_start_listener(void)
2199
0
{
2200
0
  int p_id;
2201
0
  const struct internal_fork_params ifp_tcp_main = {
2202
0
    .proc_desc = "TCP main",
2203
0
    .flags = 0,
2204
0
    .type = TYPE_NONE,
2205
0
  };
2206
2207
0
  if (tcp_disabled)
2208
0
    return 0;
2209
2210
  /* start the TCP manager process */
2211
0
  if ( (p_id=internal_fork(&ifp_tcp_main))<0 ) {
2212
0
    LM_CRIT("cannot fork tcp main process\n");
2213
0
    goto error;
2214
0
  }else if (p_id==0){
2215
      /* child */
2216
    /* close the TCP inter-process sockets */
2217
0
    close(unix_tcp_sock);
2218
0
    unix_tcp_sock = -1;
2219
0
    close(pt[process_no].unix_sock);
2220
0
    pt[process_no].unix_sock = -1;
2221
2222
0
    report_conditional_status( (!no_daemon_mode), 0);
2223
2224
0
    tcp_main_server();
2225
0
    exit(-1);
2226
0
  }
2227
2228
0
  return 0;
2229
0
error:
2230
0
  return -1;
2231
0
}
2232
2233
/* reports whether async (non-blocking) TCP write is available;
 * simply delegated to the reactor layer */
int tcp_has_async_write(void)
{
	int has_async = reactor_has_async();

	return has_async;
}
2237
2238
2239
/***************************** MI functions **********************************/
2240
2241
mi_response_t *mi_tcp_list_conns(const mi_params_t *params,
2242
            struct mi_handler *async_hdl)
2243
0
{
2244
0
  mi_response_t *resp;
2245
0
  mi_item_t *resp_obj;
2246
0
  mi_item_t *conns_arr, *conn_item;
2247
0
  struct tcp_connection *conn;
2248
0
  time_t _ts;
2249
0
  char date_buf[MI_DATE_BUF_LEN];
2250
0
  int date_buf_len;
2251
0
  unsigned int i,j,part;
2252
0
  char proto[PROTO_NAME_MAX_SIZE];
2253
0
  struct tm ltime;
2254
0
  char *p;
2255
2256
0
  if (tcp_disabled)
2257
0
    return init_mi_result_null();
2258
2259
0
  resp = init_mi_result_object(&resp_obj);
2260
0
  if (!resp)
2261
0
    return 0;
2262
2263
0
  conns_arr = add_mi_array(resp_obj, MI_SSTR("Connections"));
2264
0
  if (!conns_arr) {
2265
0
    free_mi_response(resp);
2266
0
    return 0;
2267
0
  }
2268
2269
0
  for( part=0 ; part<TCP_PARTITION_SIZE ; part++) {
2270
0
    TCPCONN_LOCK(part);
2271
0
    for( i=0; i<TCP_ID_HASH_SIZE ; i++ ) {
2272
0
      for(conn=TCP_PART(part).tcpconn_id_hash[i];conn;conn=conn->id_next){
2273
        /* add one object fo each conn */
2274
0
        conn_item = add_mi_object(conns_arr, 0, 0);
2275
0
        if (!conn_item)
2276
0
          goto error;
2277
2278
        /* add ID */
2279
0
        if (add_mi_number(conn_item, MI_SSTR("ID"), conn->id) < 0)
2280
0
          goto error;
2281
2282
        /* add type/proto */
2283
0
        p = proto2str(conn->type, proto);
2284
0
        if (add_mi_string(conn_item, MI_SSTR("Type"), proto,
2285
0
          (int)(long)(p-proto)) > 0)
2286
0
          goto error;
2287
2288
        /* add state */
2289
0
        if (add_mi_number(conn_item, MI_SSTR("State"), conn->state) < 0)
2290
0
          goto error;
2291
2292
        /* add Remote IP:Port */
2293
0
        if (add_mi_string_fmt(conn_item, MI_SSTR("Remote"), "%s:%d",
2294
0
          ip_addr2a(&conn->rcv.src_ip), conn->rcv.src_port) < 0)
2295
0
          goto error;
2296
2297
        /* add Local IP:Port */
2298
0
        if (add_mi_string_fmt(conn_item, MI_SSTR("Local"), "%s:%d",
2299
0
          ip_addr2a(&conn->rcv.dst_ip), conn->rcv.dst_port) < 0)
2300
0
          goto error;
2301
2302
        /* add lifetime */
2303
0
        _ts = (time_t)conn->lifetime + startup_time;
2304
0
        localtime_r(&_ts, &ltime);
2305
0
        date_buf_len = strftime(date_buf, MI_DATE_BUF_LEN - 1,
2306
0
                    "%Y-%m-%d %H:%M:%S", &ltime);
2307
0
        if (date_buf_len != 0) {
2308
0
          if (add_mi_string(conn_item, MI_SSTR("Lifetime"),
2309
0
            date_buf, date_buf_len) < 0)
2310
0
            goto error;
2311
0
        } else {
2312
0
          if (add_mi_number(conn_item, MI_SSTR("Lifetime"), _ts) < 0)
2313
0
            goto error;
2314
0
        }
2315
2316
        /* add the port-aliases */
2317
0
        for( j=0 ; j<conn->aliases ; j++ )
2318
          /* add one node for each conn */
2319
0
          add_mi_number( conn_item, MI_SSTR("Alias port"),
2320
0
            conn->con_aliases[j].port );
2321
2322
0
      }
2323
0
    }
2324
2325
0
    TCPCONN_UNLOCK(part);
2326
0
  }
2327
2328
0
  return resp;
2329
2330
0
error:
2331
0
  TCPCONN_UNLOCK(part);
2332
0
  LM_ERR("failed to add MI item\n");
2333
0
  free_mi_response(resp);
2334
0
  return 0;
2335
0
}