Coverage Report

Created: 2025-07-09 06:29

/src/opensips/net/net_tcp.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2014-2015 OpenSIPS Project
3
 * Copyright (C) 2001-2003 FhG Fokus
4
 *
5
 * This file is part of opensips, a free SIP server.
6
 *
7
 * opensips is free software; you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 2 of the License, or
10
 * (at your option) any later version.
11
 *
12
 * opensips is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU General Public License
18
 * along with this program; if not, write to the Free Software
19
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20
 *
21
 *
22
 * history:
23
 * ---------
24
 *  2015-01-xx  created (razvanc)
25
 */
26
27
#include <sys/types.h>
28
#include <sys/socket.h>
29
#include <netinet/in.h>
30
#include <netinet/in_systm.h>
31
#include <netinet/ip.h>
32
#include <netinet/tcp.h>
33
#include <sys/uio.h>  /* writev*/
34
#include <netdb.h>
35
#include <stdlib.h> /*exit() */
36
#include <time.h>   /*time() */
37
#include <unistd.h>
38
#include <errno.h>
39
#include <string.h>
40
41
#include "../mem/mem.h"
42
#include "../mem/shm_mem.h"
43
#include "../globals.h"
44
#include "../locking.h"
45
#include "../socket_info.h"
46
#include "../ut.h"
47
#include "../pt.h"
48
#include "../pt_load.h"
49
#include "../daemonize.h"
50
#include "../status_report.h"
51
#include "../reactor.h"
52
#include "../timer.h"
53
#include "../ipc.h"
54
55
#include "tcp_passfd.h"
56
#include "net_tcp_proc.h"
57
#include "net_tcp_report.h"
58
#include "net_tcp.h"
59
#include "tcp_conn.h"
60
#include "tcp_conn_profile.h"
61
#include "trans.h"
62
#include "net_tcp_dbg.h"
63
64
struct struct_hist_list *con_hist;
65
66
enum tcp_worker_state { STATE_INACTIVE=0, STATE_ACTIVE, STATE_DRAINING};
67
68
/* definition of a TCP worker - the array of these TCP workers is
69
 * mainly intended to be used by the TCP main, to keep track of the
70
 * workers, about their load and so. Nevertheless, since the addition
71
 * of the process auto-scaling, other processes may need access to this
72
 * data, thus it's relocation in SHM (versus initial PKG). For example,
73
 * the attendant process is the one forking new TCP workers (scaling up),
74
 * so it must be able to set the ENABLE state for the TCP worker (and being
75
 * (seen by the TCP main proc). Similar, when a TCP worker shuts down, it has
76
 * to mark itself as DISABLED and the TCP main must see that.
77
 * Again, 99% this array is intended for TCP Main ops, it is not lock
78
 * protected, so be very careful with any ops from other procs.
79
 */
80
struct tcp_worker {
81
  pid_t pid;
82
  int unix_sock;    /*!< Main-Worker comm, worker end */
83
  int main_unix_sock; /*!< Main-Worker comm, TCP Main end */
84
  int pt_idx;     /*!< Index in the main Process Table */
85
  enum tcp_worker_state state;
86
  int n_reqs;   /*!< number of requests serviced so far */
87
};
88
89
/* definition of a TCP partition */
90
struct tcp_partition {
91
  /*! \brief connection hash table (after ip&port), includes also aliases */
92
  struct tcp_conn_alias** tcpconn_aliases_hash;
93
  /*! \brief connection hash table (after connection id) */
94
  struct tcp_connection** tcpconn_id_hash;
95
  gen_lock_t* tcpconn_lock;
96
};
97
98
99
/* array of TCP workers - to be used only by TCP MAIN */
100
struct tcp_worker *tcp_workers=0;
101
102
/* unique for each connection, used for
103
 * quickly finding the corresponding connection for a reply */
104
static unsigned int* connection_id=0;
105
106
/* array of TCP partitions */
107
static struct tcp_partition tcp_parts[TCP_PARTITION_SIZE];
108
109
/*!< tcp protocol number as returned by getprotobyname */
110
static int tcp_proto_no=-1;
111
112
/* communication socket from generic proc to TCP main */
113
int unix_tcp_sock = -1;
114
115
/*!< current number of open connections */
116
static int tcp_connections_no = 0;
117
118
/*!< by default don't accept aliases */
119
int tcp_accept_aliases=0;
120
int tcp_connect_timeout=DEFAULT_TCP_CONNECT_TIMEOUT;
121
int tcp_con_lifetime=DEFAULT_TCP_CONNECTION_LIFETIME;
122
int tcp_socket_backlog=DEFAULT_TCP_SOCKET_BACKLOG;
123
/*!< by default choose the best method */
124
enum poll_types tcp_poll_method=0;
125
int tcp_max_connections=DEFAULT_TCP_MAX_CONNECTIONS;
126
/* the configured/starting number of TCP workers */
127
int tcp_workers_no = UDP_WORKERS_NO;
128
/* the maximum numbers of TCP workers */
129
int tcp_workers_max_no;
130
/* the name of the auto-scaling profile (optional) */
131
char* tcp_auto_scaling_profile = NULL;
132
/* Max number of seconds that we except a full SIP message
133
 * to arrive in - anything above will lead to the connection to closed */
134
int tcp_max_msg_time = TCP_CHILD_MAX_MSG_TIME;
135
/* If the data reading may be performed across different workers (still
136
 * serial) or by a single worker (the TCP conns sticks to one worker) */
137
int tcp_parallel_read_on_workers = 0;
138
139
#ifdef HAVE_SO_KEEPALIVE
140
    int tcp_keepalive = 1;
141
#else
142
    int tcp_keepalive = 0;
143
#endif
144
int tcp_keepcount = 0;
145
int tcp_keepidle = 0;
146
int tcp_keepinterval = 0;
147
148
/*!< should we allow opening a new TCP conn when sending data 
149
 * over UAC branches? - branch flag to be set in the SIP messages */
150
int tcp_no_new_conn_bflag = 0;
151
/*!< should we allow opening a new TCP conn when sending data 
152
 * back to UAS (replies)? - msg flag to be set in the SIP messages */
153
int tcp_no_new_conn_rplflag = 0;
154
/*!< should a new TCP conn be open if needed? - variable used to used for
155
 * signalizing between SIP layer (branch flag) and TCP layer (tcp_send func)*/
156
int tcp_no_new_conn = 0;
157
158
/* if the TCP net layer is on or off (if no TCP based protos are loaded) */
159
static int tcp_disabled = 1;
160
161
/* is the process TCP MAIN ? */
162
int is_tcp_main = 0;
163
164
/* the ID of the TCP conn used for the last send operation in the
165
 * current process - attention, this is a really ugly HACK here */
166
unsigned int last_outgoing_tcp_id = 0;
167
168
static struct scaling_profile *s_profile = NULL;
169
170
/****************************** helper functions *****************************/
171
extern void handle_sigs(void);
172
173
static inline int init_sock_keepalive(int s, const struct tcp_conn_profile *prof)
174
0
{
175
0
  int ka;
176
0
#if defined(HAVE_TCP_KEEPINTVL) || defined(HAVE_TCP_KEEPIDLE) || defined(HAVE_TCP_KEEPCNT)
177
0
  int optval;
178
0
#endif
179
180
0
  if (prof->keepinterval || prof->keepidle || prof->keepcount)
181
0
    ka = 1; /* force on */
182
0
  else
183
0
    ka = prof->keepalive;
184
185
0
#ifdef HAVE_SO_KEEPALIVE
186
0
  if (setsockopt(s,SOL_SOCKET,SO_KEEPALIVE,&ka,sizeof(ka))<0){
187
0
    LM_WARN("setsockopt failed to enable SO_KEEPALIVE: %s\n",
188
0
      strerror(errno));
189
0
    return -1;
190
0
  }
191
0
  LM_DBG("TCP keepalive enabled on socket %d\n",s);
192
0
#endif
193
0
#ifdef HAVE_TCP_KEEPINTVL
194
0
  if ((optval = prof->keepinterval)) {
195
0
    if (setsockopt(s,IPPROTO_TCP,TCP_KEEPINTVL,&optval,sizeof(optval))<0){
196
0
      LM_WARN("setsockopt failed to set keepalive probes interval: %s\n",
197
0
        strerror(errno));
198
0
    }
199
0
  }
200
0
#endif
201
0
#ifdef HAVE_TCP_KEEPIDLE
202
0
  if ((optval = prof->keepidle)) {
203
0
    if (setsockopt(s,IPPROTO_TCP,TCP_KEEPIDLE,&optval,sizeof(optval))<0){
204
0
      LM_WARN("setsockopt failed to set keepalive idle interval: %s\n",
205
0
        strerror(errno));
206
0
    }
207
0
  }
208
0
#endif
209
0
#ifdef HAVE_TCP_KEEPCNT
210
0
  if ((optval = prof->keepcount)) {
211
0
    if (setsockopt(s,IPPROTO_TCP,TCP_KEEPCNT,&optval,sizeof(optval))<0){
212
0
      LM_WARN("setsockopt failed to set maximum keepalive count: %s\n",
213
0
        strerror(errno));
214
0
    }
215
0
  }
216
0
#endif
217
0
  return 0;
218
0
}
219
220
static inline void set_sock_reuseport(int s)
221
0
{
222
0
  int yes = 1;
223
224
0
  if (setsockopt(s,SOL_SOCKET,SO_REUSEPORT,&yes,sizeof(yes))<0){
225
0
    LM_WARN("setsockopt failed to set SO_REUSEPORT: %s\n",
226
0
      strerror(errno));
227
0
  }
228
0
  if (setsockopt(s,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))<0){
229
0
    LM_WARN("setsockopt failed to set SO_REUSEADDR: %s\n",
230
0
      strerror(errno));
231
0
  }
232
0
}
233
234
/*! \brief Set all socket/fd options:  disable nagle, tos lowdelay,
235
 * non-blocking
236
 * \return -1 on error */
237
int tcp_init_sock_opt(int s, const struct tcp_conn_profile *prof, enum si_flags socketflags, int sock_tos)
238
0
{
239
0
  int flags;
240
0
  int optval;
241
242
0
#ifdef DISABLE_NAGLE
243
0
  flags=1;
244
0
  if ( (tcp_proto_no!=-1) && (setsockopt(s, tcp_proto_no , TCP_NODELAY,
245
0
          &flags, sizeof(flags))<0) ){
246
0
    LM_WARN("could not disable Nagle: %s\n", strerror(errno));
247
0
  }
248
0
#endif
249
  /* tos*/
250
0
  optval = (sock_tos > 0) ? sock_tos : tos;
251
0
  if (optval > 0) {
252
0
    if (setsockopt(s, IPPROTO_IP, IP_TOS, (void*)&optval,sizeof(optval)) ==-1){
253
0
      LM_WARN("setsockopt tos: %s\n",  strerror(errno));
254
      /* continue since this is not critical */
255
0
    }
256
0
  }
257
258
0
  if (probe_max_sock_buff(s,1,MAX_SEND_BUFFER_SIZE,BUFFER_INCREMENT)) {
259
0
    LM_WARN("setsockopt tcp snd buff: %s\n", strerror(errno));
260
    /* continue since this is not critical */
261
0
  }
262
263
0
  init_sock_keepalive(s, prof);
264
0
  if (socketflags & SI_REUSEPORT)
265
0
    set_sock_reuseport(s);
266
267
  /* non-blocking */
268
0
  flags=fcntl(s, F_GETFL);
269
0
  if (flags==-1){
270
0
    LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno));
271
0
    goto error;
272
0
  }
273
0
  if (fcntl(s, F_SETFL, flags|O_NONBLOCK)==-1){
274
0
    LM_ERR("set non-blocking failed: (%d) %s\n", errno, strerror(errno));
275
0
    goto error;
276
0
  }
277
0
  return 0;
278
0
error:
279
0
  return -1;
280
0
}
281
282
static int send2worker(struct tcp_connection* tcpconn,int rw)
283
0
{
284
0
  int i;
285
0
  int min_load;
286
0
  int idx;
287
0
  long response[2];
288
0
  unsigned int load;
289
290
0
  min_load=100; /* it is a percentage */
291
0
  idx=0;
292
0
  for (i=0; i<tcp_workers_max_no; i++){
293
0
    if (tcp_workers[i].state==STATE_ACTIVE) {
294
0
      load = pt_get_1m_proc_load( tcp_workers[i].pt_idx );
295
#ifdef EXTRA_DEBUG
296
      LM_DBG("checking TCP worker %d (proc %d), with load %u,"
297
        "min_load so far %u\n", i, tcp_workers[i].pt_idx, load,
298
        min_load);
299
#endif
300
0
      if (min_load>load) {
301
0
        min_load = load;
302
0
        idx = i;
303
0
      }
304
0
    }
305
0
  }
306
307
0
  tcp_workers[idx].n_reqs++;
308
0
  LM_DBG("to tcp worker %d (%d/%d) load %u, %p/%d rw %d\n", idx,
309
0
    tcp_workers[idx].pid, tcp_workers[idx].pt_idx, min_load,
310
0
    tcpconn, tcpconn->s, rw);
311
0
  response[0]=(long)tcpconn;
312
0
  response[1]=rw;
313
0
  if (send_fd(tcp_workers[idx].unix_sock, response, sizeof(response),
314
0
      tcpconn->s)<=0){
315
0
    LM_ERR("send_fd failed\n");
316
0
    return -1;
317
0
  }
318
319
0
  return 0;
320
0
}
321
322
323
324
/********************** TCP conn management functions ************************/
325
326
/* initializes an already defined TCP listener */
327
int tcp_init_listener(struct socket_info *si)
328
0
{
329
0
  union sockaddr_union* addr;
330
0
  int optval;
331
0
#ifdef DISABLE_NAGLE
332
0
  int flag;
333
0
  struct protoent* pe;
334
335
0
  if (tcp_proto_no==-1){ /* if not already set */
336
0
    pe=getprotobyname("tcp");
337
0
    if (pe==0){
338
0
      LM_ERR("could not get TCP protocol number\n");
339
0
      tcp_proto_no=-1;
340
0
    }else{
341
0
      tcp_proto_no=pe->p_proto;
342
0
    }
343
0
  }
344
0
#endif
345
346
0
  addr = &si->su;
347
0
  if (init_su(addr, &si->address, si->port_no)<0){
348
0
    LM_ERR("could no init sockaddr_union\n");
349
0
    goto error;
350
0
  }
351
0
  si->socket = socket(AF2PF(addr->s.sa_family), SOCK_STREAM, 0);
352
0
  if (si->socket==-1){
353
0
    LM_ERR("socket failed with [%s]\n", strerror(errno));
354
0
    goto error;
355
0
  }
356
0
#ifdef DISABLE_NAGLE
357
0
  flag=1;
358
0
  if ( (tcp_proto_no!=-1) &&
359
0
     (setsockopt(si->socket, tcp_proto_no , TCP_NODELAY,
360
0
           &flag, sizeof(flag))<0) ){
361
0
    LM_ERR("could not disable Nagle: %s\n",strerror(errno));
362
0
  }
363
0
#endif
364
365
0
#if  !defined(TCP_DONT_REUSEADDR)
366
  /* Stevens, "Network Programming", Section 7.5, "Generic Socket
367
   * Options": "...server started,..a child continues..on existing
368
   * connection..listening server is restarted...call to bind fails
369
   * ... ALL TCP servers should specify the SO_REUSEADDRE option
370
   * to allow the server to be restarted in this situation
371
   */
372
0
  optval=1;
373
0
  if (setsockopt(si->socket, SOL_SOCKET, SO_REUSEADDR,
374
0
  (void*)&optval, sizeof(optval))==-1) {
375
0
    LM_ERR("setsockopt failed with [%s]\n", strerror(errno));
376
0
    goto error;
377
0
  }
378
0
#endif
379
  /* tos */
380
0
  optval = (si->tos > 0) ? si->tos : tos;
381
0
  if (optval > 0) {
382
0
    if (setsockopt(si->socket, IPPROTO_IP, IP_TOS, (void*)&optval,
383
0
    sizeof(optval)) ==-1){
384
0
      LM_WARN("setsockopt tos: %s\n", strerror(errno));
385
      /* continue since this is not critical */
386
0
    }
387
0
  }
388
389
0
  if (probe_max_sock_buff(si->socket,1,MAX_SEND_BUFFER_SIZE,
390
0
  BUFFER_INCREMENT)) {
391
0
    LM_WARN("setsockopt tcp snd buff: %s\n",strerror(errno));
392
    /* continue since this is not critical */
393
0
  }
394
395
0
  init_sock_keepalive(si->socket, &tcp_con_df_profile);
396
0
  if (si->flags & SI_REUSEPORT)
397
0
    set_sock_reuseport(si->socket);
398
0
  if (bind(si->socket, &addr->s, sockaddru_len(*addr))==-1){
399
0
    LM_ERR("bind(%x, %p, %d) on %s:%d : %s\n",
400
0
        si->socket, &addr->s,
401
0
        (unsigned)sockaddru_len(*addr),
402
0
        si->address_str.s,
403
0
        si->port_no,
404
0
        strerror(errno));
405
0
    goto error;
406
0
  }
407
0
  if (listen(si->socket, tcp_socket_backlog)==-1){
408
0
    LM_ERR("listen(%x, %p, %d) on %s: %s\n",
409
0
        si->socket, &addr->s,
410
0
        (unsigned)sockaddru_len(*addr),
411
0
        si->address_str.s,
412
0
        strerror(errno));
413
0
    goto error;
414
0
  }
415
416
0
  return 0;
417
0
error:
418
0
  if (si->socket!=-1){
419
0
    close(si->socket);
420
0
    si->socket=-1;
421
0
  }
422
0
  return -1;
423
0
}
424
425
426
/*! \brief finds a connection, if id=0 return NULL
427
 * \note WARNING: unprotected (locks) use tcpconn_get unless you really
428
 * know what you are doing */
429
static struct tcp_connection* _tcpconn_find(unsigned int id)
430
0
{
431
0
  struct tcp_connection *c;
432
0
  unsigned hash;
433
434
0
  if (id){
435
0
    hash=tcp_id_hash(id);
436
0
    for (c=TCP_PART(id).tcpconn_id_hash[hash]; c; c=c->id_next){
437
#ifdef EXTRA_DEBUG
438
      LM_DBG("c=%p, c->id=%u, port=%d\n",c, c->id, c->rcv.src_port);
439
      print_ip("ip=", &c->rcv.src_ip, "\n");
440
#endif
441
0
      if ((id==c->id)&&(c->state!=S_CONN_BAD)) return c;
442
0
    }
443
0
  }
444
0
  return 0;
445
0
}
446
447
448
/* returns the correlation ID of a TCP connection */
449
int tcp_get_correlation_id( unsigned int id, unsigned long long *cid)
450
0
{
451
0
  struct tcp_connection* c;
452
453
0
  TCPCONN_LOCK(id);
454
0
  if ( (c=_tcpconn_find(id))!=NULL ) {
455
0
    *cid = c->cid;
456
0
    TCPCONN_UNLOCK(id);
457
0
    return 0;
458
0
  }
459
0
  *cid = 0;
460
0
  TCPCONN_UNLOCK(id);
461
0
  return -1;
462
0
}
463
464
465
/*! \brief _tcpconn_find with locks and acquire fd */
466
int tcp_conn_get(unsigned int id, struct ip_addr* ip, int port,
467
    enum sip_protos proto, void *proto_extra_id,
468
    struct tcp_connection** conn, int* conn_fd,
469
    const struct socket_info* send_sock)
470
0
{
471
0
  struct tcp_connection* c;
472
0
  struct tcp_connection* tmp;
473
0
  struct tcp_conn_alias* a;
474
0
  unsigned hash;
475
0
  long response[2];
476
0
  unsigned int part;
477
0
  int n;
478
0
  int fd;
479
480
0
  if (id) {
481
0
    part = id;
482
0
    TCPCONN_LOCK(part);
483
0
    if ( (c=_tcpconn_find(part))!=NULL )
484
0
      goto found;
485
0
    TCPCONN_UNLOCK(part);
486
0
  }
487
488
  /* continue search based on IP address + port + transport */
489
#ifdef EXTRA_DEBUG
490
  LM_DBG("%d  port %u\n",id, port);
491
  if (ip) print_ip("tcpconn_find: ip ", ip, "\n");
492
#endif
493
0
  if (ip){
494
0
    hash=tcp_addr_hash(ip, port);
495
0
    for( part=0 ; part<TCP_PARTITION_SIZE ; part++ ) {
496
0
      TCPCONN_LOCK(part);
497
0
      for (a=TCP_PART(part).tcpconn_aliases_hash[hash]; a; a=a->next) {
498
#ifdef EXTRA_DEBUG
499
        LM_DBG("a=%p, c=%p, c->id=%u, alias port= %d port=%d\n",
500
          a, a->parent, a->parent->id, a->port,
501
          a->parent->rcv.src_port);
502
        print_ip("ip=",&a->parent->rcv.src_ip,"\n");
503
        if (send_sock && a->parent->rcv.bind_address) {
504
          print_ip("requested send_sock ip=", &send_sock->address,"\n");
505
          print_ip("found send_sock ip=", &a->parent->rcv.bind_address->address,"\n");
506
        }
507
#endif
508
0
        c = a->parent;
509
0
        if (c->state != S_CONN_BAD &&
510
0
            c->flags&F_CONN_INIT &&
511
0
            (send_sock==NULL || send_sock == a->parent->rcv.bind_address) &&
512
0
            port == a->port &&
513
0
            proto == c->type &&
514
0
            ip_addr_cmp(ip, &c->rcv.src_ip) &&
515
0
            (proto_extra_id==NULL ||
516
0
            protos[proto].net.stream.conn.match==NULL ||
517
0
            protos[proto].net.stream.conn.match( c, proto_extra_id)) )
518
0
          goto found;
519
0
      }
520
0
      TCPCONN_UNLOCK(part);
521
0
    }
522
0
  }
523
524
  /* not found */
525
0
  *conn = NULL;
526
0
  if (conn_fd) *conn_fd = -1;
527
0
  return 0;
528
529
0
found:
530
0
  c->refcnt++;
531
0
  TCPCONN_UNLOCK(part);
532
0
  sh_log(c->hist, TCP_REF, "tcp_conn_get, (%d)", c->refcnt);
533
534
0
  LM_DBG("con found in state %d\n",c->state);
535
536
0
  if (c->state!=S_CONN_OK || conn_fd==NULL) {
537
    /* no need to acquired, just return the conn with an invalid fd */
538
0
    *conn = c;
539
0
    if (conn_fd) *conn_fd = -1;
540
0
    return 1;
541
0
  }
542
543
0
  if (c->proc_id == process_no) {
544
0
    LM_DBG("tcp connection found (%p) already in this process ( %d ) ,"
545
0
      " fd = %d\n", c, c->proc_id, c->fd);
546
    /* we already have the connection in this worker's reactor, */
547
    /* no need to acquire FD */
548
0
    *conn = c;
549
0
    *conn_fd = c->fd;
550
0
    return 1;
551
0
  }
552
553
  /* acquire the fd for this connection too */
554
0
  LM_DBG("tcp connection found (%p), acquiring fd\n", c);
555
  /* get the fd */
556
0
  response[0]=(long)c;
557
0
  response[1]=CONN_GET_FD;
558
0
  n=send_all(unix_tcp_sock, response, sizeof(response));
559
0
  if (n<=0){
560
0
    LM_ERR("failed to get fd(write):%s (%d)\n",
561
0
        strerror(errno), errno);
562
0
    n=-1;
563
0
    goto error;
564
0
  }
565
0
  LM_DBG("c= %p, n=%d, Usock=%d\n", c, n, unix_tcp_sock);
566
0
  tmp = c;
567
0
  n=receive_fd(unix_tcp_sock, &c, sizeof(c), &fd, MSG_WAITALL);
568
0
  if (n<=0){
569
0
    LM_ERR("failed to get fd(receive_fd):"
570
0
      " %s (%d)\n", strerror(errno), errno);
571
0
    n=-1;
572
0
    goto error;
573
0
  }
574
0
  if (c!=tmp){
575
0
    LM_CRIT("got different connection:"
576
0
      "  %p (id= %u, refcnt=%d state=%d != "
577
0
      "  %p (id= %u, refcnt=%d state=%d (n=%d)\n",
578
0
        c,   c->id,   c->refcnt,   c->state,
579
0
        tmp, tmp->id, tmp->refcnt, tmp->state, n
580
0
       );
581
0
    n=-1; /* fail */
582
0
    close(fd);
583
0
    goto error;
584
0
  }
585
0
  LM_DBG("after receive_fd: c= %p n=%d fd=%d\n",c, n, fd);
586
587
0
  *conn = c;
588
0
  *conn_fd = fd;
589
590
0
  return 1;
591
0
error:
592
0
  tcpconn_put(c);
593
0
  sh_log(c->hist, TCP_UNREF, "tcp_conn_get, (%d)", c->refcnt);
594
0
  *conn = NULL;
595
0
  *conn_fd = -1;
596
0
  return -1;
597
0
}
598
599
600
/* used to tune the tcp_connection attributes - not to be used inside the
601
   network layer, but onlu from the above layer (otherwise we may end up
602
   in strange deadlocks!) */
603
int tcp_conn_fcntl(struct receive_info *rcv, int attr, void *value)
604
0
{
605
0
  struct tcp_connection *con;
606
607
0
  switch (attr) {
608
0
  case DST_FCNTL_SET_LIFETIME:
609
    /* set connection timeout */
610
0
    TCPCONN_LOCK(rcv->proto_reserved1);
611
0
    con =_tcpconn_find(rcv->proto_reserved1);
612
0
    if (!con) {
613
0
      LM_ERR("Strange, tcp conn not found (id=%u)\n",
614
0
        rcv->proto_reserved1);
615
0
    } else {
616
0
      tcp_conn_set_lifetime( con, (int)(long)(value));
617
0
    }
618
0
    TCPCONN_UNLOCK(rcv->proto_reserved1);
619
0
    return 0;
620
0
  default:
621
0
    LM_ERR("unsupported operation %d on conn\n",attr);
622
0
    return -1;
623
0
  }
624
0
  return -1;
625
0
}
626
627
628
static struct tcp_connection* tcpconn_add(struct tcp_connection *c)
629
0
{
630
0
  unsigned hash;
631
632
0
  if (c){
633
0
    TCPCONN_LOCK(c->id);
634
    /* add it at the beginning of the list*/
635
0
    hash=tcp_id_hash(c->id);
636
0
    c->id_hash=hash;
637
0
    tcpconn_listadd(TCP_PART(c->id).tcpconn_id_hash[hash], c, id_next,
638
0
      id_prev);
639
640
0
    hash=tcp_addr_hash(&c->rcv.src_ip, c->rcv.src_port);
641
    /* set the first alias */
642
0
    c->con_aliases[0].port=c->rcv.src_port;
643
0
    c->con_aliases[0].hash=hash;
644
0
    c->con_aliases[0].parent=c;
645
0
    tcpconn_listadd(TCP_PART(c->id).tcpconn_aliases_hash[hash],
646
0
      &c->con_aliases[0], next, prev);
647
0
    c->aliases++;
648
0
    TCPCONN_UNLOCK(c->id);
649
0
    LM_DBG("hashes: %d, %d\n", hash, c->id_hash);
650
0
    return c;
651
0
  }else{
652
0
    LM_CRIT("null connection pointer\n");
653
0
    return 0;
654
0
  }
655
0
}
656
657
static str e_tcp_src_ip = str_init("src_ip");
658
static str e_tcp_src_port = str_init("src_port");
659
static str e_tcp_dst_ip = str_init("dst_ip");
660
static str e_tcp_dst_port = str_init("dst_port");
661
static str e_tcp_c_proto = str_init("proto");
662
663
static void tcp_disconnect_event_raise(struct tcp_connection* c)
664
0
{
665
0
  evi_params_p list = 0;
666
0
  str src_ip,dst_ip, proto;
667
0
  int src_port,dst_port;
668
0
  char src_ip_buf[IP_ADDR_MAX_STR_SIZE],dst_ip_buf[IP_ADDR_MAX_STR_SIZE];
669
670
  // event has to be triggered - check for subscribers
671
0
  if (!evi_probe_event(EVI_TCP_DISCONNECT)) {
672
0
    goto end;
673
0
  }
674
675
0
  if (!(list = evi_get_params()))
676
0
    goto end;
677
678
0
  src_ip.s = ip_addr2a( &c->rcv.src_ip );
679
0
  memcpy(src_ip_buf,src_ip.s,IP_ADDR_MAX_STR_SIZE);
680
0
  src_ip.s = src_ip_buf;
681
0
  src_ip.len = strlen(src_ip.s);
682
683
0
  if (evi_param_add_str(list, &e_tcp_src_ip, &src_ip)) {
684
0
    LM_ERR("unable to add parameter\n");
685
0
    goto end;
686
0
  }
687
688
0
  src_port = c->rcv.src_port;
689
690
0
  if (evi_param_add_int(list, &e_tcp_src_port, &src_port)) {
691
0
    LM_ERR("unable to add parameter\n");
692
0
    goto end;
693
0
  }
694
695
0
  dst_ip.s = ip_addr2a( &c->rcv.dst_ip );
696
0
  memcpy(dst_ip_buf,dst_ip.s,IP_ADDR_MAX_STR_SIZE);
697
0
  dst_ip.s = dst_ip_buf;
698
0
  dst_ip.len = strlen(dst_ip.s);
699
700
0
  if (evi_param_add_str(list, &e_tcp_dst_ip, &dst_ip)) {
701
0
    LM_ERR("unable to add parameter\n");
702
0
    goto end;
703
0
  }
704
705
0
  dst_port = c->rcv.dst_port;
706
707
0
  if (evi_param_add_int(list, &e_tcp_dst_port, &dst_port)) {
708
0
    LM_ERR("unable to add parameter\n");
709
0
    goto end;
710
0
  }
711
712
0
  proto.s = protos[c->rcv.proto].name;
713
0
  proto.len = strlen(proto.s);
714
715
0
  if (evi_param_add_str(list, &e_tcp_c_proto, &proto)) {
716
0
    LM_ERR("unable to add parameter\n");
717
0
    goto end;
718
0
  }
719
720
0
  if (is_tcp_main) {
721
0
    if (evi_dispatch_event(EVI_TCP_DISCONNECT, list)) {
722
0
      LM_ERR("unable to dispatch tcp disconnect event\n");
723
0
    }
724
0
  } else {
725
0
    if (evi_raise_event(EVI_TCP_DISCONNECT, list)) {
726
0
      LM_ERR("unable to send tcp disconnect event\n");
727
0
    }
728
0
  }
729
0
  list = 0;
730
731
0
end:
732
0
  if (list)
733
0
    evi_free_params(list);
734
0
}
735
736
/* convenience macro to aid in shm_free() debugging */
737
#define _tcpconn_rm(c, ne) \
738
0
  do {\
739
0
    __tcpconn_rm(c, ne);\
740
0
    shm_free(c);\
741
0
  } while (0)
742
743
/*! \brief unsafe tcpconn_rm version (nolocks) */
744
static void __tcpconn_rm(struct tcp_connection* c, int no_event)
745
0
{
746
0
  int r;
747
748
0
  tcpconn_listrm(TCP_PART(c->id).tcpconn_id_hash[c->id_hash], c,
749
0
    id_next, id_prev);
750
  /* remove all the aliases */
751
0
  for (r=0; r<c->aliases; r++)
752
0
    tcpconn_listrm(TCP_PART(c->id).tcpconn_aliases_hash[c->con_aliases[r].hash],
753
0
      &c->con_aliases[r], next, prev);
754
0
  lock_destroy(&c->write_lock);
755
756
0
  if (c->async) {
757
0
    for (r = 0; r<c->async->pending; r++)
758
0
      shm_free(c->async->chunks[r]);
759
0
    shm_free(c->async);
760
0
    c->async = NULL;
761
0
  }
762
763
0
  if (c->con_req)
764
0
    shm_free(c->con_req);
765
766
0
  if (protos[c->type].net.stream.conn.clean)
767
0
    protos[c->type].net.stream.conn.clean(c);
768
769
0
  if (!no_event) tcp_disconnect_event_raise(c);
770
771
#ifdef DBG_TCPCON
772
  sh_log(c->hist, TCP_DESTROY, "type=%d", c->type);
773
  sh_unref(c->hist);
774
  c->hist = NULL;
775
#endif
776
777
  /* shm_free(c); -- freed by _tcpconn_rm() */
778
0
}
779
780
781
782
#if 0
783
static void tcpconn_rm(struct tcp_connection* c)
784
{
785
  int r;
786
787
  TCPCONN_LOCK(c->id);
788
  tcpconn_listrm(TCP_PART(c->id).tcpconn_id_hash[c->id_hash], c,
789
    id_next, id_prev);
790
  /* remove all the aliases */
791
  for (r=0; r<c->aliases; r++)
792
    tcpconn_listrm(TCP_PART(c->id).tcpconn_aliases_hash
793
      [c->con_aliases[r].hash],
794
      &c->con_aliases[r], next, prev);
795
  TCPCONN_UNLOCK(c->id);
796
  lock_destroy(&c->write_lock);
797
798
  if (protos[c->type].net.stream.conn.clean)
799
    protos[c->type].net.stream.conn.clean(c);
800
801
  shm_free(c);
802
}
803
#endif
804
805
806
/*! \brief add port as an alias for the "id" connection
807
 * \return 0 on success,-1 on failure */
808
int tcpconn_add_alias(struct sip_msg *msg, unsigned int id, int port, int proto)
809
0
{
810
0
  struct tcp_connection* c;
811
0
  unsigned hash;
812
0
  struct tcp_conn_alias* a;
813
814
0
  a=0;
815
  /* fix the port */
816
0
  port=port ? port : protos[proto].default_port ;
817
0
  TCPCONN_LOCK(id);
818
  /* check if alias already exists */
819
0
  c=_tcpconn_find(id);
820
0
  if (c) {
821
0
    if (msg && !(c->profile.alias_mode == TCP_ALIAS_ALWAYS
822
0
                   || (c->profile.alias_mode == TCP_ALIAS_RFC_5923
823
0
                       && msg->via1->alias))) {
824
0
      LM_DBG("refusing to add alias (alias_mode: %u, via 'alias': %u)\n",
825
0
              c->profile.alias_mode, !!msg->via1->alias);
826
0
      TCPCONN_UNLOCK(id);
827
0
      return 0;
828
0
    }
829
830
0
    hash=tcp_addr_hash(&c->rcv.src_ip, port);
831
    /* search the aliases for an already existing one */
832
0
    for (a=TCP_PART(id).tcpconn_aliases_hash[hash]; a; a=a->next) {
833
0
      if (a->parent->state != S_CONN_BAD &&
834
0
          port == a->port &&
835
0
          proto == a->parent->type &&
836
0
          ip_addr_cmp(&c->rcv.src_ip, &a->parent->rcv.src_ip)) {
837
        /* found */
838
0
        if (a->parent!=c) goto error_sec;
839
0
        else goto ok;
840
0
      }
841
0
    }
842
0
    if (c->aliases>=TCP_CON_MAX_ALIASES) goto error_aliases;
843
0
    c->con_aliases[c->aliases].parent=c;
844
0
    c->con_aliases[c->aliases].port=port;
845
0
    c->con_aliases[c->aliases].hash=hash;
846
0
    tcpconn_listadd(TCP_PART(id).tcpconn_aliases_hash[hash],
847
0
                &c->con_aliases[c->aliases], next, prev);
848
0
    c->aliases++;
849
0
  }else goto error_not_found;
850
0
ok:
851
0
  TCPCONN_UNLOCK(id);
852
#ifdef EXTRA_DEBUG
853
  if (a) LM_DBG("alias already present\n");
854
  else   LM_DBG("alias port %d for hash %d, id %u\n", port, hash, id);
855
#endif
856
0
  return 0;
857
0
error_aliases:
858
0
  TCPCONN_UNLOCK(id);
859
0
  LM_ERR("too many aliases for connection %p (%u)\n", c, id);
860
0
  return -1;
861
0
error_not_found:
862
0
  TCPCONN_UNLOCK(id);
863
0
  LM_ERR("no connection found for id %u\n",id);
864
0
  return -1;
865
0
error_sec:
866
0
  LM_WARN("possible port hijack attempt\n");
867
0
  LM_WARN("alias already present and points to another connection "
868
0
      "(%d : %d and %u : %d)\n", a->parent->id,  port, id, port);
869
0
  TCPCONN_UNLOCK(id);
870
0
  return -1;
871
0
}
872
873
874
void tcpconn_put(struct tcp_connection* c)
875
0
{
876
0
  TCPCONN_LOCK(c->id);
877
0
  c->refcnt--;
878
0
  TCPCONN_UNLOCK(c->id);
879
0
}
880
881
882
static inline void tcpconn_ref(struct tcp_connection* c)
883
0
{
884
0
  TCPCONN_LOCK(c->id);
885
0
  c->refcnt++;
886
0
  TCPCONN_UNLOCK(c->id);
887
0
}
888
889
890
static struct tcp_connection* tcpconn_new(int sock, const union sockaddr_union* su,
891
                    const struct socket_info* si, const struct tcp_conn_profile *prof,
892
                    int state, int flags, int in_main_proc)
893
0
{
894
0
  struct tcp_connection *c;
895
0
  union sockaddr_union local_su;
896
0
  unsigned int su_size;
897
898
0
  c=(struct tcp_connection*)shm_malloc(sizeof(struct tcp_connection));
899
0
  if (c==0){
900
0
    LM_ERR("shared memory allocation failure\n");
901
0
    return 0;
902
0
  }
903
0
  memset(c, 0, sizeof(struct tcp_connection)); /* zero init */
904
0
  c->s=sock;
905
0
  c->fd=-1; /* not initialized */
906
0
  if (lock_init(&c->write_lock)==0){
907
0
    LM_ERR("init lock failed\n");
908
0
    goto error0;
909
0
  }
910
911
0
  c->rcv.src_su=*su;
912
913
0
  c->refcnt=0;
914
0
  su2ip_addr(&c->rcv.src_ip, su);
915
0
  c->rcv.src_port=su_getport(su);
916
0
  c->rcv.bind_address = si;
917
0
  c->rcv.dst_ip = si->address;
918
0
  su_size = sockaddru_len(*su);
919
0
  if (getsockname(sock, (struct sockaddr *)&local_su, &su_size)<0) {
920
0
    LM_ERR("failed to get info on received interface/IP %d/%s\n",
921
0
      errno, strerror(errno));
922
0
    goto error;
923
0
  }
924
0
  c->rcv.dst_port = su_getport(&local_su);
925
0
  print_ip("tcpconn_new: new tcp connection to: ", &c->rcv.src_ip, "\n");
926
0
  LM_DBG("on port %d, proto %d\n", c->rcv.src_port, si->proto);
927
0
  c->id=(*connection_id)++;
928
0
  c->cid = (unsigned long long)c->id
929
0
        | ( (unsigned long long)(startup_time&0xFFFFFF) << 32 )
930
0
          | ( (unsigned long long)(rand()&0xFF) << 56 );
931
932
0
  c->rcv.proto_reserved1=0; /* this will be filled before receive_message*/
933
0
  c->rcv.proto_reserved2=0;
934
0
  c->state=state;
935
0
  c->extra_data=0;
936
0
  c->type = si->proto;
937
0
  c->rcv.proto = si->proto;
938
  /* start with the default conn lifetime */
939
0
  c->lifetime = get_ticks() + prof->con_lifetime;
940
0
  c->profile = *prof;
941
0
  c->flags|=F_CONN_REMOVED|flags;
942
#ifdef DBG_TCPCON
943
  c->hist = sh_push(c, con_hist);
944
#endif
945
946
0
  if (protos[si->proto].net.stream.async_chunks) {
947
0
    c->async = shm_malloc(sizeof(struct tcp_async_data) +
948
0
        protos[si->proto].net.stream.async_chunks *
949
0
        sizeof(struct tcp_async_chunk));
950
0
    if (c->async) {
951
0
      c->async->allocated = protos[si->proto].net.stream.async_chunks;
952
0
      c->async->oldest = 0;
953
0
      c->async->pending = 0;
954
0
    } else {
955
0
      LM_ERR("could not allocate async data for con!\n");
956
0
      goto error;
957
0
    }
958
0
  }
959
0
  if(in_main_proc)
960
0
    tcp_connections_no++;
961
0
  return c;
962
963
0
error:
964
0
  lock_destroy(&c->write_lock);
965
0
error0:
966
0
  shm_free(c);
967
0
  return 0;
968
0
}
969
970
971
/* creates a new tcp connection structure
972
 * if send2main is 1, the function informs the TCP Main about the new conn
973
 * a +1 ref is set for the new conn !
974
 * IMPORTANT - the function assumes you want to create a new TCP conn as
975
 * a result of a connect operation - the conn will be set as connect !!
976
 * Accepted connection are triggered internally only */
977
struct tcp_connection* tcp_conn_create(int sock, const union sockaddr_union* su,
978
    const struct socket_info* si, struct tcp_conn_profile *prof,
979
    int state, int send2main)
980
0
{
981
0
  struct tcp_connection *c;
982
983
0
  if (!prof)
984
0
    tcp_con_get_profile(su, &si->su, si->proto, prof);
985
986
  /* create the connection structure */
987
0
  c = tcpconn_new(sock, su, si, prof, state, 0, !send2main);
988
0
  if (c==NULL) {
989
0
    LM_ERR("tcpconn_new failed\n");
990
0
    return NULL;
991
0
  }
992
993
0
  if (protos[c->type].net.stream.conn.init &&
994
0
      protos[c->type].net.stream.conn.init(c) < 0) {
995
0
    LM_ERR("failed to do proto %d specific init for conn %p\n",
996
0
        c->type, c);
997
0
    tcp_conn_destroy(c);
998
0
    return NULL;
999
0
  }
1000
0
  c->flags |= F_CONN_INIT;
1001
1002
0
  c->refcnt++; /* safe to do it w/o locking, it's not yet
1003
          available to the rest of the world */
1004
0
  sh_log(c->hist, TCP_REF, "connect, (%d)", c->refcnt);
1005
0
  if (!send2main)
1006
0
    return c;
1007
1008
0
  return (tcp_conn_send(c) == 0 ? c : NULL);
1009
0
}
1010
1011
/* sends a new connection from a worker to main */
1012
int tcp_conn_send(struct tcp_connection *c)
1013
0
{
1014
0
  long response[2];
1015
0
  int n, fd;
1016
1017
  /* inform TCP main about this new connection */
1018
0
  if (c->state==S_CONN_CONNECTING) {
1019
    /* store the local fd now, before TCP main overwrites it */
1020
0
    fd = c->s;
1021
0
    response[0]=(long)c;
1022
0
    response[1]=ASYNC_CONNECT;
1023
0
    n=send_fd(unix_tcp_sock, response, sizeof(response), fd);
1024
0
    if (n<=0) {
1025
0
      LM_ERR("Failed to send the socket to main for async connection\n");
1026
0
      goto error;
1027
0
    }
1028
0
    close(fd);
1029
0
  } else {
1030
0
    response[0]=(long)c;
1031
0
    response[1]=CONN_NEW;
1032
0
    n=send_fd(unix_tcp_sock, response, sizeof(response), c->s);
1033
0
    if (n<=0){
1034
0
      LM_ERR("failed send_fd: %s (%d)\n", strerror(errno), errno);
1035
0
      goto error;
1036
0
    }
1037
0
  }
1038
1039
0
  return 0;
1040
0
error:
1041
  /* no reporting as closed, as PROTO layer did not reporte it as
1042
   * OPEN yet */
1043
0
  _tcpconn_rm(c,1);
1044
0
  tcp_connections_no--;
1045
0
  return -1;
1046
0
}
1047
1048
1049
static inline void tcpconn_destroy(struct tcp_connection* tcpconn)
1050
0
{
1051
0
  int fd;
1052
0
  int unsigned id = tcpconn->id;
1053
1054
0
  TCPCONN_LOCK(id); /*avoid races w/ tcp_send*/
1055
0
  tcpconn->refcnt--;
1056
0
  if (tcpconn->refcnt==0){
1057
0
    LM_DBG("destroying connection %p, flags %04x\n",
1058
0
        tcpconn, tcpconn->flags);
1059
0
    fd=tcpconn->s;
1060
    /* no reporting here - the tcpconn_destroy() function is called
1061
     * from the TCP_MAIN reactor when handling connectioned received
1062
     * from a worker; and we generate the CLOSE reports from WORKERs */
1063
0
    _tcpconn_rm(tcpconn,0);
1064
0
    if (fd >= 0)
1065
0
      close(fd);
1066
0
    tcp_connections_no--;
1067
0
  }else{
1068
    /* force timeout */
1069
0
    tcpconn->lifetime=0;
1070
0
    tcpconn->state=S_CONN_BAD;
1071
0
    LM_DBG("delaying (%p, flags %04x) ref = %d ...\n",
1072
0
        tcpconn, tcpconn->flags, tcpconn->refcnt);
1073
1074
0
  }
1075
0
  TCPCONN_UNLOCK(id);
1076
0
}
1077
1078
/* wrapper to the internally used function */
1079
void tcp_conn_destroy(struct tcp_connection* tcpconn)
1080
0
{
1081
0
  tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE,
1082
0
        "Closed by Proto layer");
1083
0
  sh_log(tcpconn->hist, TCP_UNREF, "tcp_conn_destroy, (%d)", tcpconn->refcnt);
1084
0
  return tcpconn_destroy(tcpconn);
1085
0
}
1086
1087
1088
/************************ TCP MAIN process functions ************************/
1089
1090
/*! \brief
1091
 * handles a new connection, called internally by tcp_main_loop/handle_io.
1092
 * \param si - pointer to one of the tcp socket_info structures on which
1093
 *              an io event was detected (connection attempt)
1094
 * \return  handle_* return convention: -1 on error, 0 on EAGAIN (no more
1095
 *           io events queued), >0 on success. success/error refer only to
1096
 *           the accept.
1097
 */
1098
static inline int handle_new_connect(const struct socket_info* si)
1099
0
{
1100
0
  union sockaddr_union su;
1101
0
  struct tcp_connection* tcpconn;
1102
0
  struct tcp_conn_profile prof;
1103
0
  socklen_t su_len = sizeof(su);
1104
0
  int new_sock;
1105
0
  unsigned int id;
1106
1107
  /* coverity[overrun-buffer-arg: FALSE] - union has 28 bytes, CID #200070 */
1108
0
  new_sock=accept(si->socket, &(su.s), &su_len);
1109
0
  if (new_sock==-1){
1110
0
    if ((errno==EAGAIN)||(errno==EWOULDBLOCK))
1111
0
      return 0;
1112
0
    LM_ERR("failed to accept connection(%d): %s\n", errno, strerror(errno));
1113
0
    return -1;
1114
0
  }
1115
0
  if (tcp_connections_no>=tcp_max_connections){
1116
0
    LM_ERR("maximum number of connections exceeded: %d/%d\n",
1117
0
          tcp_connections_no, tcp_max_connections);
1118
0
    close(new_sock);
1119
0
    return 1; /* success, because the accept was successful */
1120
0
  }
1121
1122
0
  tcp_con_get_profile(&su, &si->su, si->proto, &prof);
1123
0
  if (tcp_init_sock_opt(new_sock, &prof, si->flags, si->tos)<0){
1124
0
    LM_ERR("tcp_init_sock_opt failed\n");
1125
0
    close(new_sock);
1126
0
    return 1; /* success, because the accept was successful */
1127
0
  }
1128
1129
  /* add socket to list */
1130
0
  tcpconn=tcpconn_new(new_sock, &su, si, &prof, S_CONN_OK, F_CONN_ACCEPTED, 1);
1131
0
  if (tcpconn){
1132
0
    tcpconn->refcnt++; /* safe, not yet available to the
1133
                outside world */
1134
0
    sh_log(tcpconn->hist, TCP_REF, "accept, (%d)", tcpconn->refcnt);
1135
0
    tcpconn_add(tcpconn);
1136
0
    LM_DBG("new connection: %p %d flags: %04x\n",
1137
0
        tcpconn, tcpconn->s, tcpconn->flags);
1138
    /* pass it to a workerr */
1139
0
    sh_log(tcpconn->hist, TCP_SEND2CHILD, "accept");
1140
0
    if(send2worker(tcpconn,IO_WATCH_READ)<0){
1141
0
      LM_ERR("no TCP workers available\n");
1142
0
      id = tcpconn->id;
1143
0
      sh_log(tcpconn->hist, TCP_UNREF, "accept, (%d)", tcpconn->refcnt);
1144
0
      TCPCONN_LOCK(id);
1145
0
      tcpconn->refcnt--;
1146
0
      if (tcpconn->refcnt==0){
1147
        /* no close to report here as the connection was not yet
1148
         * reported as OPEN by the proto layer...this sucks a bit */
1149
0
        _tcpconn_rm(tcpconn,1);
1150
0
        close(new_sock/*same as tcpconn->s*/);
1151
0
      }else tcpconn->lifetime=0; /* force expire */
1152
0
      TCPCONN_UNLOCK(id);
1153
0
    }
1154
0
  }else{ /*tcpconn==0 */
1155
0
    LM_ERR("tcpconn_new failed, closing socket\n");
1156
0
    close(new_sock);
1157
0
  }
1158
0
  return 1; /* accept() was successful */
1159
0
}
1160
1161
1162
/*! \brief
1163
 * handles an io event on one of the watched tcp connections
1164
 *
1165
 * \param    tcpconn - pointer to the tcp_connection for which we have an io ev.
1166
 * \param    fd_i    - index in the fd_array table (needed for delete)
1167
 * \return   handle_* return convention, but on success it always returns 0
1168
 *           (because it's one-shot, after a successful execution the fd is
1169
 *            removed from tcp_main's watch fd list and passed to a worker =>
1170
 *            tcp_main is not interested in further io events that might be
1171
 *            queued for this fd)
1172
 */
1173
inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, int fd_i,
1174
                                int event_type)
1175
0
{
1176
0
  int fd;
1177
0
  int err;
1178
0
  unsigned int id;
1179
0
  unsigned int err_len;
1180
1181
0
  if (event_type == IO_WATCH_READ) {
1182
    /* pass it to worker, so remove it from the io watch list */
1183
0
    LM_DBG("data available on %p %d\n", tcpconn, tcpconn->s);
1184
0
    if (reactor_del_reader(tcpconn->s, fd_i, 0)==-1)
1185
0
      return -1;
1186
0
    tcpconn->flags|=F_CONN_REMOVED_READ;
1187
0
    tcpconn_ref(tcpconn); /* refcnt ++ */
1188
0
    sh_log(tcpconn->hist, TCP_REF, "tcpconn read, (%d)", tcpconn->refcnt);
1189
0
    sh_log(tcpconn->hist, TCP_SEND2CHILD, "read");
1190
0
    if (send2worker(tcpconn,IO_WATCH_READ)<0){
1191
0
      LM_ERR("no TCP workers available\n");
1192
0
      id = tcpconn->id;
1193
0
      TCPCONN_LOCK(id);
1194
0
      tcpconn->refcnt--;
1195
0
      sh_log(tcpconn->hist, TCP_UNREF, "tcpconn read, (%d)", tcpconn->refcnt);
1196
0
      if (tcpconn->refcnt==0){
1197
0
        fd=tcpconn->s;
1198
0
        tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE,
1199
0
          "No worker for read");
1200
0
        _tcpconn_rm(tcpconn,0);
1201
0
        close(fd);
1202
0
      }else tcpconn->lifetime=0; /* force expire*/
1203
0
      TCPCONN_UNLOCK(id);
1204
0
    }
1205
0
    return 0; /* we are not interested in possibly queued io events,
1206
           the fd was either passed to a worker, or closed */
1207
0
  } else {
1208
0
    LM_DBG("connection %p fd %d is now writable\n", tcpconn, tcpconn->s);
1209
    /* we received a write event */
1210
0
    if (tcpconn->state==S_CONN_CONNECTING) {
1211
      /* we're coming from an async connect & write
1212
       * let's see if we connected successfully */
1213
0
      err_len=sizeof(err);
1214
0
      if (getsockopt(tcpconn->s, SOL_SOCKET, SO_ERROR, &err, &err_len) < 0 || \
1215
0
          err != 0) {
1216
0
        LM_DBG("Failed connection attempt\n");
1217
0
        tcpconn_ref(tcpconn);
1218
0
        sh_log(tcpconn->hist, TCP_REF, "tcpconn connect, (%d)", tcpconn->refcnt);
1219
0
        reactor_del_all(tcpconn->s, fd_i, IO_FD_CLOSING);
1220
0
        tcpconn->flags|=F_CONN_REMOVED;
1221
0
        tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE,
1222
0
          "Async connect failed");
1223
0
        sh_log(tcpconn->hist, TCP_UNREF, "tcpconn connect, (%d)", tcpconn->refcnt);
1224
0
        tcpconn_destroy(tcpconn);
1225
0
        return 0;
1226
0
      }
1227
1228
      /* we successfully connected - further treat this case as if we
1229
       * were coming from an async write */
1230
0
      tcpconn->state = S_CONN_OK;
1231
0
      LM_DBG("Successfully completed previous async connect\n");
1232
1233
      /* now that we completed the async connection, we also need to
1234
       * listen for READ events, otherwise these will get lost */
1235
0
      if (tcpconn->flags & F_CONN_REMOVED_READ) {
1236
0
        reactor_add_reader( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn);
1237
0
        tcpconn->flags&=~F_CONN_REMOVED_READ;
1238
0
      }
1239
1240
0
      goto async_write;
1241
0
    } else {
1242
      /* we're coming from an async write -
1243
       * just pass to worker and have it write
1244
       * our TCP chunks */
1245
0
async_write:
1246
      /* no more write events for now */
1247
0
      if (reactor_del_writer( tcpconn->s, fd_i, 0)==-1)
1248
0
        return -1;
1249
0
      tcpconn->flags|=F_CONN_REMOVED_WRITE;
1250
0
      tcpconn_ref(tcpconn); /* refcnt ++ */
1251
0
      sh_log(tcpconn->hist, TCP_REF, "tcpconn write, (%d)",
1252
0
        tcpconn->refcnt);
1253
0
      sh_log(tcpconn->hist, TCP_SEND2CHILD, "write");
1254
0
      if (send2worker(tcpconn,IO_WATCH_WRITE)<0){
1255
0
        LM_ERR("no TCP worker available\n");
1256
0
        id = tcpconn->id;
1257
0
        TCPCONN_LOCK(id);
1258
0
        tcpconn->refcnt--;
1259
0
        sh_log(tcpconn->hist, TCP_UNREF, "tcpconn write, (%d)",
1260
0
          tcpconn->refcnt);
1261
0
        if (tcpconn->refcnt==0){
1262
0
          fd=tcpconn->s;
1263
0
          tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE,
1264
0
            "No worker for write");
1265
0
          _tcpconn_rm(tcpconn,0);
1266
0
          close(fd);
1267
0
        }else tcpconn->lifetime=0; /* force expire*/
1268
0
        TCPCONN_UNLOCK(id);
1269
0
      }
1270
0
      return 0;
1271
0
    }
1272
0
  }
1273
0
}
1274
1275
1276
/*! \brief handles io from a tcp worker process
1277
 * \param  tcp_c - pointer in the tcp_workers array, to the entry for
1278
 *                 which an io event was detected
1279
 * \param  fd_i  - fd index in the fd_array (useful for optimizing
1280
 *                 io_watch_deletes)
1281
 * \return handle_* return convention: -1 on error, 0 on EAGAIN (no more
1282
 *           io events queued), >0 on success. success/error refer only to
1283
 *           the reads from the fd.
1284
 */
1285
inline static int handle_tcp_worker(struct tcp_worker* tcp_c, int fd_i)
1286
0
{
1287
0
  struct tcp_connection* tcpconn;
1288
0
  long response[2];
1289
0
  int cmd;
1290
0
  int bytes;
1291
1292
0
  if (tcp_c->unix_sock<=0){
1293
    /* (we can't have a fd==0, 0 is never closed )*/
1294
0
    LM_CRIT("fd %d for %d (pid %d)\n", tcp_c->unix_sock,
1295
0
        (int)(tcp_c-&tcp_workers[0]), tcp_c->pid);
1296
0
    goto error;
1297
0
  }
1298
  /* read until sizeof(response)
1299
   * (this is a SOCK_STREAM so read is not atomic) */
1300
0
  bytes=recv_all(tcp_c->unix_sock, response, sizeof(response), MSG_DONTWAIT);
1301
0
  if (bytes<(int)sizeof(response)){
1302
0
    if (bytes==0){
1303
      /* EOF -> bad, worker has died */
1304
0
      if (sr_get_core_status()!=STATE_TERMINATING)
1305
0
        LM_CRIT("dead tcp worker %d (EOF received), pid %d\n",
1306
0
          (int)(tcp_c-&tcp_workers[0]), tcp_c->pid );
1307
      /* don't listen on it any more */
1308
0
      reactor_del_reader( tcp_c->unix_sock, fd_i, 0/*flags*/);
1309
      /* eof. so no more io here, it's ok to return error */
1310
0
      goto error;
1311
0
    }else if (bytes<0){
1312
      /* EAGAIN is ok if we try to empty the buffer
1313
       * e.g.: SIGIO_RT overflow mode or EPOLL ET */
1314
0
      if ((errno!=EAGAIN) && (errno!=EWOULDBLOCK)){
1315
0
        LM_CRIT("read from tcp worker %ld (pid %d) %s [%d]\n",
1316
0
            (long)(tcp_c-&tcp_workers[0]), tcp_c->pid,
1317
0
            strerror(errno), errno );
1318
0
      }else{
1319
0
        bytes=0;
1320
0
      }
1321
      /* try to ignore ? */
1322
0
      goto end;
1323
0
    }else{
1324
      /* should never happen */
1325
0
      LM_CRIT("too few bytes received (%d)\n", bytes );
1326
0
      bytes=0; /* something was read so there is no error; otoh if
1327
            receive_fd returned less then requested => the receive
1328
            buffer is empty => no more io queued on this fd */
1329
0
      goto end;
1330
0
    }
1331
0
  }
1332
1333
0
  LM_DBG("response= %lx, %ld from tcp worker %d (%d)\n",
1334
0
    response[0], response[1], tcp_c->pid, (int)(tcp_c-&tcp_workers[0]));
1335
1336
0
  cmd=response[1];
1337
0
  tcpconn=(struct tcp_connection*)response[0];
1338
0
  if (tcpconn==0){
1339
    /* should never happen */
1340
0
    LM_CRIT("null tcpconn pointer received from tcp worker %d (pid %d):"
1341
0
      "%lx, %lx\n", (int)(tcp_c-&tcp_workers[0]), tcp_c->pid,
1342
0
      response[0], response[1]) ;
1343
0
    goto end;
1344
0
  }
1345
0
  switch(cmd){
1346
0
    case CONN_RELEASE:
1347
0
      if (tcpconn->state==S_CONN_BAD){
1348
0
        sh_log(tcpconn->hist, TCP_UNREF, "tcpworker release bad, (%d)", tcpconn->refcnt);
1349
0
        tcpconn_destroy(tcpconn);
1350
0
        break;
1351
0
      }
1352
0
      sh_log(tcpconn->hist, TCP_UNREF, "tcpworker release, (%d)", tcpconn->refcnt);
1353
0
      tcpconn_put(tcpconn);
1354
      /* must be after the de-ref*/
1355
0
      reactor_add_reader( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn);
1356
0
      tcpconn->flags&=~F_CONN_REMOVED_READ;
1357
0
      break;
1358
0
    case CONN_RELEASE_WRITE:
1359
0
      if (tcpconn->state==S_CONN_BAD){
1360
0
        sh_log(tcpconn->hist, TCP_UNREF, "tcpworker release write bad, (%d)", tcpconn->refcnt);
1361
0
        tcpconn_destroy(tcpconn);
1362
0
        break;
1363
0
      }
1364
0
      sh_log(tcpconn->hist, TCP_UNREF, "tcpworker release write, (%d)", tcpconn->refcnt);
1365
0
      tcpconn_put(tcpconn);
1366
0
      break;
1367
0
    case ASYNC_WRITE_TCPW:
1368
0
      if (tcpconn->state==S_CONN_BAD){
1369
0
        sh_log(tcpconn->hist, TCP_UNREF, "tcpworker async write bad, (%d)", tcpconn->refcnt);
1370
0
        tcpconn_destroy(tcpconn);
1371
0
        break;
1372
0
      }
1373
0
      sh_log(tcpconn->hist, TCP_UNREF, "tcpworker async write, (%d)", tcpconn->refcnt);
1374
0
      tcpconn_put(tcpconn);
1375
      /* must be after the de-ref*/
1376
0
      reactor_add_writer( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn);
1377
0
      tcpconn->flags&=~F_CONN_REMOVED_WRITE;
1378
0
      break;
1379
0
    case CONN_ERROR_TCPW:
1380
0
    case CONN_DESTROY:
1381
0
    case CONN_EOF:
1382
      /* WARNING: this will auto-dec. refcnt! */
1383
0
      if ((tcpconn->flags & F_CONN_REMOVED) != F_CONN_REMOVED &&
1384
0
        (tcpconn->s!=-1)){
1385
0
        reactor_del_all( tcpconn->s, -1, IO_FD_CLOSING);
1386
0
        tcpconn->flags|=F_CONN_REMOVED;
1387
0
      }
1388
0
      sh_log(tcpconn->hist, TCP_UNREF, "tcpworker destroy, (%d)", tcpconn->refcnt);
1389
0
      tcpconn_destroy(tcpconn); /* closes also the fd */
1390
0
      break;
1391
0
    default:
1392
0
      LM_CRIT("unknown cmd %d from tcp worker %d (%d)\n",
1393
0
        cmd, tcp_c->pid, (int)(tcp_c-&tcp_workers[0]));
1394
0
  }
1395
0
end:
1396
0
  return bytes;
1397
0
error:
1398
0
  return -1;
1399
0
}
1400
1401
1402
/*! \brief handles io from a "generic" ser process (get fd or new_fd from a tcp_send)
1403
 *
1404
 * \param p     - pointer in the ser processes array (pt[]), to the entry for
1405
 *                 which an io event was detected
1406
 * \param fd_i  - fd index in the fd_array (useful for optimizing
1407
 *                 io_watch_deletes)
1408
 * \return  handle_* return convention:
1409
 *          - -1 on error reading from the fd,
1410
 *          -  0 on EAGAIN  or when no  more io events are queued
1411
 *             (receive buffer empty),
1412
 *          -  >0 on successful reads from the fd (the receive buffer might
1413
 *             be non-empty).
1414
 */
1415
inline static int handle_worker(struct process_table* p, int fd_i)
1416
0
{
1417
0
  struct tcp_connection* tcpconn;
1418
0
  long response[2];
1419
0
  int cmd;
1420
0
  int bytes;
1421
0
  int ret;
1422
0
  int fd;
1423
1424
0
  ret=-1;
1425
0
  if (p->unix_sock<=0){
1426
    /* (we can't have a fd==0, 0 is never closed )*/
1427
0
    LM_CRIT("fd %d for %d (pid %d)\n",
1428
0
        p->unix_sock, (int)(p-&pt[0]), p->pid);
1429
0
    goto error;
1430
0
  }
1431
1432
  /* get all bytes and the fd (if transmitted)
1433
   * (this is a SOCK_STREAM so read is not atomic) */
1434
0
  bytes=receive_fd(p->unix_sock, response, sizeof(response), &fd,
1435
0
            MSG_DONTWAIT);
1436
0
  if (bytes<(int)sizeof(response)){
1437
    /* too few bytes read */
1438
0
    if (bytes==0){
1439
      /* EOF -> bad, worker has died */
1440
0
      if (sr_get_core_status()!=STATE_TERMINATING)
1441
0
        LM_CRIT("dead tcp worker %d (EOF received), pid %d\n",
1442
0
          (int)(p-&pt[0]), p->pid);
1443
      /* don't listen on it any more */
1444
0
      reactor_del_reader( p->unix_sock, fd_i, 0/*flags*/);
1445
0
      goto error; /* worker dead => no further io events from it */
1446
0
    }else if (bytes<0){
1447
      /* EAGAIN is ok if we try to empty the buffer
1448
       * e.g: SIGIO_RT overflow mode or EPOLL ET */
1449
0
      if ((errno!=EAGAIN) && (errno!=EWOULDBLOCK)){
1450
0
        LM_CRIT("read from worker %d (pid %d):  %s [%d]\n",
1451
0
            (int)(p-&pt[0]), p->pid, strerror(errno), errno);
1452
0
        ret=-1;
1453
0
      }else{
1454
0
        ret=0;
1455
0
      }
1456
      /* try to ignore ? */
1457
0
      goto end;
1458
0
    }else{
1459
      /* should never happen */
1460
0
      LM_CRIT("too few bytes received (%d)\n", bytes );
1461
0
      ret=0; /* something was read so there is no error; otoh if
1462
            receive_fd returned less then requested => the receive
1463
            buffer is empty => no more io queued on this fd */
1464
0
      goto end;
1465
0
    }
1466
0
  }
1467
0
  ret=1; /* something was received, there might be more queued */
1468
0
  LM_DBG("read response= %lx, %ld, fd %d from %d (%d)\n",
1469
0
          response[0], response[1], fd, (int)(p-&pt[0]), p->pid);
1470
0
  cmd=response[1];
1471
0
  tcpconn=(struct tcp_connection*)response[0];
1472
0
  if (tcpconn==0){
1473
0
    LM_CRIT("null tcpconn pointer received from worker %d (pid %d)"
1474
0
      "%lx, %lx\n", (int)(p-&pt[0]), p->pid, response[0], response[1]) ;
1475
0
    goto end;
1476
0
  }
1477
0
  switch(cmd){
1478
0
    case CONN_ERROR_GENW:
1479
      /* remove from reactor only if the fd exists, and it wasn't
1480
       * removed before */
1481
0
      if ((tcpconn->flags & F_CONN_REMOVED) != F_CONN_REMOVED &&
1482
0
          (tcpconn->s!=-1)){
1483
0
        reactor_del_all( tcpconn->s, -1, IO_FD_CLOSING);
1484
0
        tcpconn->flags|=F_CONN_REMOVED;
1485
0
      }
1486
0
      sh_log(tcpconn->hist, TCP_UNREF, "worker error, (%d)", tcpconn->refcnt);
1487
0
      tcpconn_destroy(tcpconn); /* will close also the fd */
1488
0
      break;
1489
0
    case CONN_GET_FD:
1490
      /* send the requested FD  */
1491
      /* WARNING: take care of setting refcnt properly to
1492
       * avoid race condition */
1493
0
      if (send_fd(p->unix_sock, &tcpconn, sizeof(tcpconn),
1494
0
              tcpconn->s)<=0){
1495
0
        LM_ERR("send_fd failed\n");
1496
0
      }
1497
0
      break;
1498
0
    case CONN_NEW:
1499
      /* update the fd in the requested tcpconn*/
1500
      /* WARNING: take care of setting refcnt properly to
1501
       * avoid race condition */
1502
0
      if (fd==-1){
1503
0
        LM_CRIT(" cmd CONN_NEW: no fd received\n");
1504
0
        break;
1505
0
      }
1506
0
      tcpconn->s=fd;
1507
      /* add tcpconn to the list*/
1508
0
      tcpconn_add(tcpconn);
1509
0
      tcp_connections_no++;
1510
0
      reactor_add_reader( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn);
1511
0
      tcpconn->flags&=~F_CONN_REMOVED_READ;
1512
0
      break;
1513
0
    case ASYNC_CONNECT:
1514
      /* connection is not yet linked to hash = not yet
1515
       * available to the outside world */
1516
0
      if (fd==-1){
1517
0
        LM_CRIT(" cmd CONN_NEW: no fd received\n");
1518
0
        break;
1519
0
      }
1520
0
      tcpconn->s=fd;
1521
      /* add tcpconn to the list*/
1522
0
      tcpconn_add(tcpconn);
1523
0
      tcp_connections_no++;
1524
      /* FIXME - now we have lifetime==default_lifetime - should we
1525
       * set a shorter one when waiting for a connect ??? */
1526
      /* only maintain the socket in the IO_WATCH_WRITE watcher
1527
       * while we have stuff to write - otherwise we're going to get
1528
       * useless events */
1529
0
      reactor_add_writer( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn);
1530
0
      tcpconn->flags&=~F_CONN_REMOVED_WRITE;
1531
0
      break;
1532
0
    case ASYNC_WRITE_GENW:
1533
0
      if (tcpconn->state==S_CONN_BAD){
1534
0
        tcpconn->lifetime=0;
1535
0
        break;
1536
0
      }
1537
0
      tcpconn_put(tcpconn);
1538
      /* must be after the de-ref*/
1539
0
      reactor_add_writer( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn);
1540
0
      tcpconn->flags&=~F_CONN_REMOVED_WRITE;
1541
0
      break;
1542
0
    default:
1543
0
      LM_CRIT("unknown cmd %d from worker %d (pid %d)\n", cmd,
1544
0
        (int)(p-&pt[0]), p->pid);
1545
0
  }
1546
0
end:
1547
0
  return ret;
1548
0
error:
1549
0
  return -1;
1550
0
}
1551
1552
1553
/*! \brief generic handle io routine, it will call the appropiate
1554
 *  handle_xxx() based on the fd_map type
1555
 *
1556
 * \param  fm  - pointer to a fd hash entry
1557
 * \param  idx - index in the fd_array (or -1 if not known)
1558
 * \return -1 on error
1559
 *          0 on EAGAIN or when by some other way it is known that no more
1560
 *            io events are queued on the fd (the receive buffer is empty).
1561
 *            Usefull to detect when there are no more io events queued for
1562
 *            sigio_rt, epoll_et, kqueue.
1563
 *         >0 on successful read from the fd (when there might be more io
1564
 *            queued -- the receive buffer might still be non-empty)
1565
 */
1566
inline static int handle_io(struct fd_map* fm, int idx,int event_type)
1567
0
{
1568
0
  int ret = 0;
1569
1570
0
  pt_become_active();
1571
0
  switch(fm->type){
1572
0
    case F_TCP_LISTENER:
1573
0
      ret = handle_new_connect((const struct socket_info*)fm->data);
1574
0
      break;
1575
0
    case F_TCPCONN:
1576
0
      ret = handle_tcpconn_ev((struct tcp_connection*)fm->data, idx,
1577
0
        event_type);
1578
0
      break;
1579
0
    case F_TCP_TCPWORKER:
1580
0
      ret = handle_tcp_worker((struct tcp_worker*)fm->data, idx);
1581
0
      break;
1582
0
    case F_TCP_WORKER:
1583
0
      ret = handle_worker((struct process_table*)fm->data, idx);
1584
0
      break;
1585
0
    case F_IPC:
1586
0
      ipc_handle_job(fm->fd);
1587
0
      break;
1588
0
    case F_NONE:
1589
0
      LM_CRIT("empty fd map\n");
1590
0
      goto error;
1591
0
    default:
1592
0
      LM_CRIT("unknown fd type %d\n", fm->type);
1593
0
      goto error;
1594
0
  }
1595
0
  pt_become_idle();
1596
0
  return ret;
1597
0
error:
1598
0
  pt_become_idle();
1599
0
  return -1;
1600
0
}
1601
1602
1603
/*
1604
 * iterates through all TCP connections and closes expired ones
1605
 * Note: runs once per second at most
1606
 */
1607
#define tcpconn_lifetime(last_sec) \
1608
  do { \
1609
    int now; \
1610
    now = get_ticks(); \
1611
    if (last_sec != now) { \
1612
      last_sec = now; \
1613
      __tcpconn_lifetime(0); \
1614
    } \
1615
  } while (0)
1616
1617
1618
/*! \brief very inefficient for now - FIXME
1619
 * keep in sync with tcpconn_destroy, the "delete" part should be
1620
 * the same except for io_watch_del..
1621
 * \todo FIXME (very inefficient for now)
1622
 */
1623
static inline void __tcpconn_lifetime(int shutdown)
1624
0
{
1625
0
  struct tcp_connection *c, *next;
1626
0
  unsigned int ticks,part;
1627
0
  unsigned h;
1628
0
  int fd;
1629
1630
0
  if (have_ticks())
1631
0
    ticks=get_ticks();
1632
0
  else
1633
0
    ticks=0;
1634
1635
0
  for( part=0 ; part<TCP_PARTITION_SIZE ; part++ ) {
1636
0
    if (!shutdown) TCPCONN_LOCK(part); /* fixme: we can lock only on delete IMO */
1637
0
    for(h=0; h<TCP_ID_HASH_SIZE; h++){
1638
0
      c=TCP_PART(part).tcpconn_id_hash[h];
1639
0
      while(c){
1640
0
        next=c->id_next;
1641
0
        if (shutdown ||((c->refcnt==0) && (ticks>c->lifetime))) {
1642
0
          if (!shutdown)
1643
0
            LM_DBG("timeout for hash=%d - %p"
1644
0
                " (%d > %d)\n", h, c, ticks, c->lifetime);
1645
0
          fd=c->s;
1646
          /* report the closing of the connection . Note that
1647
           * there are connectioned that use an foced expire to 0
1648
           * as a way to be deleted - we are not interested in */
1649
          /* Also, do not trigger reporting when shutdown
1650
           * is done */
1651
0
          if (c->lifetime>0 && !shutdown)
1652
0
            tcp_trigger_report(c, TCP_REPORT_CLOSE,
1653
0
              "Timeout on no traffic");
1654
0
          if ((!shutdown)&&(fd>0)&&(c->refcnt==0)) {
1655
            /* if any of read or write are set, we need to remove
1656
             * the fd from the reactor */
1657
0
            if ((c->flags & F_CONN_REMOVED) != F_CONN_REMOVED){
1658
0
              reactor_del_all( fd, -1, IO_FD_CLOSING);
1659
0
              c->flags|=F_CONN_REMOVED;
1660
0
            }
1661
0
            close(fd);
1662
0
            c->s = -1;
1663
0
          }
1664
0
          _tcpconn_rm(c, shutdown?1:0);
1665
0
          tcp_connections_no--;
1666
0
        }
1667
0
        c=next;
1668
0
      }
1669
0
    }
1670
0
    if (!shutdown) TCPCONN_UNLOCK(part);
1671
0
  }
1672
0
}
1673
1674
1675
static void tcp_main_server(void)
1676
0
{
1677
0
  static unsigned int last_sec = 0;
1678
0
  int flags;
1679
0
  struct socket_info_full* sif;
1680
0
  int n;
1681
1682
  /* we run in a separate, dedicated process, with its own reactor
1683
   * (reactors are per process) */
1684
0
  if (init_worker_reactor("TCP_main", RCT_PRIO_MAX)<0)
1685
0
    goto error;
1686
1687
  /* now start watching all the fds */
1688
1689
  /* add all the sockets we listens on for connections */
1690
0
  for( n=PROTO_FIRST ; n<PROTO_LAST ; n++ )
1691
0
    if ( is_tcp_based_proto(n) )
1692
0
      for( sif=protos[n].listeners ; sif ; sif=sif->next ) {
1693
0
        struct socket_info* si = &sif->socket_info;
1694
0
        if (protos[n].tran.init_listener(si)<0) {
1695
0
          LM_ERR("failed to init listener [%.*s], proto %s\n",
1696
0
            si->name.len, si->name.s,
1697
0
            protos[n].name );
1698
0
          goto error;
1699
0
        }
1700
0
        if (protos[n].tran.bind_listener && protos[n].tran.bind_listener(si)<0) {
1701
0
          LM_ERR("failed to bind listener [%.*s], proto %s\n",
1702
0
            si->name.len, si->name.s,
1703
0
            protos[n].name );
1704
0
          goto error;
1705
0
        }
1706
0
        if(reactor_add_reader( si->socket, F_TCP_LISTENER,
1707
0
        RCT_PRIO_NET, si)<0 ) {
1708
0
          LM_ERR("failed to add listen socket to reactor\n");
1709
0
          goto error;
1710
0
        }
1711
0
      }
1712
  /* add all the unix sockets used for communcation with other opensips
1713
   * processes (get fd, new connection a.s.o)
1714
   * NOTE: we add even the socks for the inactive/unfork processes - the
1715
   *       socks are already created, but the triggering is from proc to
1716
   *       main, having them into reactor is harmless - they will never
1717
   *       trigger as there is no proc on the other end to write us */
1718
0
  for (n=1; n<counted_max_processes; n++) {
1719
    /* skip myslef (as process) and -1 socks (disabled)
1720
       (we can't have 0, we never close it!) */
1721
0
    if (n!=process_no && pt[n].tcp_socks_holder[0]>0)
1722
0
      if (reactor_add_reader( pt[n].tcp_socks_holder[0], F_TCP_WORKER,
1723
0
      RCT_PRIO_PROC, &pt[n])<0){
1724
0
        LM_ERR("failed to add process %d (%s) unix socket "
1725
0
          "to the fd list\n", n, pt[n].desc);
1726
0
        goto error;
1727
0
      }
1728
0
  }
1729
  /* add all the unix sokets used for communication with the tcp workers */
1730
0
  for (n=0; n<tcp_workers_max_no; n++) {
1731
    /*we can't have 0, we never close it!*/
1732
0
    if (tcp_workers[n].unix_sock>0) {
1733
      /* make socket non-blocking */
1734
0
      flags=fcntl(tcp_workers[n].unix_sock, F_GETFL);
1735
0
      if (flags==-1){
1736
0
        LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno));
1737
0
        goto error;
1738
0
      }
1739
0
      if (fcntl(tcp_workers[n].unix_sock,F_SETFL,flags|O_NONBLOCK)==-1){
1740
0
        LM_ERR("set non-blocking failed: (%d) %s\n",
1741
0
          errno, strerror(errno));
1742
0
        goto error;
1743
0
      }
1744
      /* add socket for listening */
1745
0
      if (reactor_add_reader( tcp_workers[n].unix_sock,
1746
0
      F_TCP_TCPWORKER, RCT_PRIO_PROC, &tcp_workers[n])<0) {
1747
0
        LM_ERR("failed to add tcp worker %d unix socket to "
1748
0
            "the fd list\n", n);
1749
0
        goto error;
1750
0
      }
1751
0
    }
1752
0
  }
1753
1754
  /* init: start watching for the IPC jobs */
1755
0
  if (reactor_add_reader(IPC_FD_READ_SELF, F_IPC, RCT_PRIO_ASYNC, NULL)<0){
1756
0
    LM_CRIT("failed to add IPC pipe to reactor\n");
1757
0
    goto error;
1758
0
  }
1759
1760
0
  is_tcp_main = 1;
1761
1762
  /* main loop (requires "handle_io()" implementation) */
1763
0
  reactor_main_loop( TCP_MAIN_SELECT_TIMEOUT, error,
1764
0
      tcpconn_lifetime(last_sec) );
1765
1766
0
error:
1767
0
  destroy_worker_reactor();
1768
0
  LM_CRIT("exiting...");
1769
0
  exit(-1);
1770
0
}
1771
1772
1773
1774
/**************************** Control functions ******************************/
1775
1776
/* initializes the TCP network level in terms of data structures */
1777
int tcp_init(void)
1778
0
{
1779
0
  unsigned int i;
1780
1781
  /* first we do auto-detection to see if there are any TCP based
1782
   * protocols loaded */
1783
0
  for ( i=PROTO_FIRST ; i<PROTO_LAST ; i++ )
1784
0
    if (is_tcp_based_proto(i) && proto_has_listeners(i)) {
1785
0
      tcp_disabled=0;
1786
0
      break;
1787
0
    }
1788
1789
0
  tcp_init_con_profiles();
1790
1791
0
  if (tcp_disabled)
1792
0
    return 0;
1793
1794
#ifdef DBG_TCPCON
1795
  con_hist = shl_init("TCP con", 10000, 0);
1796
  if (!con_hist) {
1797
    LM_ERR("oom con hist\n");
1798
    goto error;
1799
  }
1800
#endif
1801
1802
0
  if (tcp_auto_scaling_profile) {
1803
0
    s_profile = get_scaling_profile(tcp_auto_scaling_profile);
1804
0
    if (s_profile==NULL) {
1805
0
      LM_WARN("TCP scaling profile <%s> not defined "
1806
0
        "-> ignoring it...\n", tcp_auto_scaling_profile);
1807
0
    } else {
1808
0
      auto_scaling_enabled = 1;
1809
0
    }
1810
0
  }
1811
1812
0
  tcp_workers_max_no = (s_profile && (tcp_workers_no<s_profile->max_procs)) ?
1813
0
    s_profile->max_procs : tcp_workers_no ;
1814
1815
  /* init tcp workers array */
1816
0
  tcp_workers = (struct tcp_worker*)shm_malloc
1817
0
    ( tcp_workers_max_no*sizeof(struct tcp_worker) );
1818
0
  if (tcp_workers==0) {
1819
0
    LM_CRIT("could not alloc tcp_workers array in shm memory\n");
1820
0
    goto error;
1821
0
  }
1822
0
  memset( tcp_workers, 0, tcp_workers_max_no*sizeof(struct tcp_worker));
1823
  /* init globals */
1824
0
  connection_id=(unsigned int*)shm_malloc(sizeof(unsigned int));
1825
0
  if (connection_id==0){
1826
0
    LM_CRIT("could not alloc globals in shm memory\n");
1827
0
    goto error;
1828
0
  }
1829
  // The  rand()  function returns a pseudo-random integer in the range 0 to
1830
  // RAND_MAX inclusive (i.e., the mathematical range [0, RAND_MAX]).
1831
0
  *connection_id=(unsigned int)rand();
1832
0
  memset( &tcp_parts, 0, TCP_PARTITION_SIZE*sizeof(struct tcp_partition));
1833
  /* init partitions */
1834
0
  for( i=0 ; i<TCP_PARTITION_SIZE ; i++ ) {
1835
    /* init lock */
1836
0
    tcp_parts[i].tcpconn_lock=lock_alloc();
1837
0
    if (tcp_parts[i].tcpconn_lock==0){
1838
0
      LM_CRIT("could not alloc lock\n");
1839
0
      goto error;
1840
0
    }
1841
0
    if (lock_init(tcp_parts[i].tcpconn_lock)==0){
1842
0
      LM_CRIT("could not init lock\n");
1843
0
      lock_dealloc((void*)tcp_parts[i].tcpconn_lock);
1844
0
      tcp_parts[i].tcpconn_lock=0;
1845
0
      goto error;
1846
0
    }
1847
    /* alloc hashtables*/
1848
0
    tcp_parts[i].tcpconn_aliases_hash=(struct tcp_conn_alias**)
1849
0
      shm_malloc(TCP_ALIAS_HASH_SIZE* sizeof(struct tcp_conn_alias*));
1850
0
    if (tcp_parts[i].tcpconn_aliases_hash==0){
1851
0
      LM_CRIT("could not alloc address hashtable in shm memory\n");
1852
0
      goto error;
1853
0
    }
1854
0
    tcp_parts[i].tcpconn_id_hash=(struct tcp_connection**)
1855
0
      shm_malloc(TCP_ID_HASH_SIZE*sizeof(struct tcp_connection*));
1856
0
    if (tcp_parts[i].tcpconn_id_hash==0){
1857
0
      LM_CRIT("could not alloc id hashtable in shm memory\n");
1858
0
      goto error;
1859
0
    }
1860
    /* init hashtables*/
1861
0
    memset((void*)tcp_parts[i].tcpconn_aliases_hash, 0,
1862
0
      TCP_ALIAS_HASH_SIZE * sizeof(struct tcp_conn_alias*));
1863
0
    memset((void*)tcp_parts[i].tcpconn_id_hash, 0,
1864
0
      TCP_ID_HASH_SIZE * sizeof(struct tcp_connection*));
1865
0
  }
1866
1867
0
  return 0;
1868
0
error:
1869
  /* clean-up */
1870
0
  tcp_destroy();
1871
0
  return -1;
1872
0
}
1873
1874
1875
/* destroys the TCP data */
1876
void tcp_destroy(void)
1877
0
{
1878
0
  int part;
1879
1880
0
  if (tcp_parts[0].tcpconn_id_hash)
1881
      /* force close/expire for all active tcpconns*/
1882
0
      __tcpconn_lifetime(1);
1883
1884
0
  if (connection_id){
1885
0
    shm_free(connection_id);
1886
0
    connection_id=0;
1887
0
  }
1888
1889
0
  for ( part=0 ; part<TCP_PARTITION_SIZE ; part++ ) {
1890
0
    if (tcp_parts[part].tcpconn_id_hash){
1891
0
      shm_free(tcp_parts[part].tcpconn_id_hash);
1892
0
      tcp_parts[part].tcpconn_id_hash=0;
1893
0
    }
1894
0
    if (tcp_parts[part].tcpconn_aliases_hash){
1895
0
      shm_free(tcp_parts[part].tcpconn_aliases_hash);
1896
0
      tcp_parts[part].tcpconn_aliases_hash=0;
1897
0
    }
1898
0
    if (tcp_parts[part].tcpconn_lock){
1899
0
      lock_destroy(tcp_parts[part].tcpconn_lock);
1900
0
      lock_dealloc((void*)tcp_parts[part].tcpconn_lock);
1901
0
      tcp_parts[part].tcpconn_lock=0;
1902
0
    }
1903
0
  }
1904
0
}
1905
1906
1907
int tcp_create_comm_proc_socks( int proc_no)
1908
0
{
1909
0
  int i;
1910
1911
0
  if (tcp_disabled)
1912
0
    return 0;
1913
1914
0
  for( i=0 ; i<proc_no ; i++ ) {
1915
0
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, pt[i].tcp_socks_holder)<0){
1916
0
      LM_ERR("socketpair failed for process %d: %d/%s\n",
1917
0
        i, errno, strerror(errno));
1918
0
      return -1;
1919
0
    }
1920
0
  }
1921
1922
0
  return 0;
1923
0
}
1924
1925
1926
int tcp_activate_comm_proc_socks( int proc_no)
1927
0
{
1928
0
  if (tcp_disabled)
1929
0
    return 0;
1930
1931
0
  unix_tcp_sock = pt[proc_no].tcp_socks_holder[1];
1932
0
  pt[proc_no].unix_sock = pt[proc_no].tcp_socks_holder[0];
1933
1934
0
  return 0;
1935
0
}
1936
1937
1938
void tcp_connect_proc_to_tcp_main( int proc_no, int worker )
1939
0
{
1940
0
  if (tcp_disabled)
1941
0
    return;
1942
1943
0
  if (worker) {
1944
0
    close( pt[proc_no].unix_sock );
1945
0
  } else {
1946
0
    unix_tcp_sock = -1;
1947
0
  }
1948
0
}
1949
1950
1951
int _get_own_tcp_worker_id(void)
1952
0
{
1953
0
  pid_t pid;
1954
0
  int i;
1955
1956
0
  pid = getpid();
1957
0
  for( i=0 ; i<tcp_workers_max_no ; i++)
1958
0
    if(tcp_workers[i].pid==pid)
1959
0
      return i;
1960
1961
0
  return -1;
1962
0
}
1963
1964
1965
void tcp_reset_worker_slot(void)
1966
0
{
1967
0
  int i;
1968
1969
0
  if ((i=_get_own_tcp_worker_id())>=0) {
1970
0
    tcp_workers[i].state=STATE_INACTIVE;
1971
0
    tcp_workers[i].pid=0;
1972
0
    tcp_workers[i].pt_idx=0;
1973
0
  }
1974
0
}
1975
1976
1977
static int fork_dynamic_tcp_process(void *foo)
1978
0
{
1979
0
  int p_id;
1980
0
  int r;
1981
0
  const struct internal_fork_params ifp_sr_tcp = {
1982
0
    .proc_desc = "SIP receiver TCP",
1983
0
    .flags = OSS_PROC_DYNAMIC|OSS_PROC_NEEDS_SCRIPT,
1984
0
    .type = TYPE_TCP,
1985
0
  };
1986
1987
  /* search for free slot in the TCP workers table */
1988
0
  for( r=0 ; r<tcp_workers_max_no ; r++ )
1989
0
    if (tcp_workers[r].state==STATE_INACTIVE)
1990
0
      break;
1991
1992
0
  if (r==tcp_workers_max_no) {
1993
0
    LM_BUG("trying to fork one more TCP worker but no free slots in "
1994
0
      "the TCP table (size=%d)\n",tcp_workers_max_no);
1995
0
    return -1;
1996
0
  }
1997
1998
0
  if((p_id=internal_fork(&ifp_sr_tcp))<0){
1999
0
    LM_ERR("cannot fork dynamic TCP worker process\n");
2000
0
    return(-1);
2001
0
  }else if (p_id==0){
2002
    /* new TCP process */
2003
0
    set_proc_attrs("TCP receiver");
2004
0
    tcp_workers[r].pid = getpid();
2005
2006
0
    if (tcp_worker_proc_reactor_init(tcp_workers[r].main_unix_sock)<0||
2007
0
    init_child(20000) < 0) {
2008
0
      goto error;
2009
0
    }
2010
2011
0
    report_conditional_status( 1, 0);/*report success*/
2012
    /* the child proc is done read&write) dealing with the status pipe */
2013
0
    clean_read_pipeend();
2014
2015
0
    tcp_worker_proc_loop();
2016
0
    destroy_worker_reactor();
2017
2018
0
error:
2019
0
    report_failure_status();
2020
0
    LM_ERR("Initializing new process failed, exiting with error \n");
2021
0
    pt[process_no].flags |= OSS_PROC_SELFEXIT;
2022
0
    exit( -1);
2023
0
  } else {
2024
    /*parent/main*/
2025
0
    tcp_workers[r].state=STATE_ACTIVE;
2026
0
    tcp_workers[r].n_reqs=0;
2027
0
    tcp_workers[r].pt_idx=p_id;
2028
0
    return p_id;
2029
0
  }
2030
2031
0
  return 0;
2032
0
}
2033
2034
2035
static void tcp_process_graceful_terminate(int sender, void *param)
2036
0
{
2037
0
  int i;
2038
2039
  /* we accept this only from the main proccess */
2040
0
  if (sender!=0) {
2041
0
    LM_BUG("graceful terminate received from a non-main process!!\n");
2042
0
    return;
2043
0
  }
2044
0
  LM_NOTICE("process %d received RPC to terminate from Main\n",process_no);
2045
2046
  /* going into "draining" state will avoid:
2047
   *  - getting jobs from TCP MAIN (active state required for that)
2048
   *  - having othe worker slot re-used (inactive state required for that) */
2049
0
  if ((i=_get_own_tcp_worker_id())>=0)
2050
0
    tcp_workers[i].state=STATE_DRAINING;
2051
2052
0
  tcp_terminate_worker();
2053
2054
0
  return;
2055
0
}
2056
2057
2058
/* counts the number of TCP processes to start with; this number may
2059
 * change during runtime due auto-scaling */
2060
int tcp_count_processes(unsigned int *extra)
2061
0
{
2062
0
  if (extra) *extra = 0;
2063
2064
0
  if (tcp_disabled)
2065
0
    return 0;
2066
2067
2068
0
  if (s_profile && extra) {
2069
    /* how many can be forked over the number of procs to start with ?*/
2070
0
    if (s_profile->max_procs > tcp_workers_no)
2071
0
      *extra = s_profile->max_procs - tcp_workers_no;
2072
0
  }
2073
2074
0
  return 1/* tcp main */ + tcp_workers_no /*workers to start with*/;
2075
0
}
2076
2077
2078
int tcp_start_processes(int *chd_rank, int *startup_done)
2079
0
{
2080
0
  int r, n, p_id;
2081
0
  int reader_fd[2]; /* for comm. with the tcp workers read  */
2082
0
  struct socket_info_full *sif;
2083
0
  const struct internal_fork_params ifp_sr_tcp = {
2084
0
    .proc_desc = "SIP receiver TCP",
2085
0
    .flags = OSS_PROC_NEEDS_SCRIPT,
2086
0
    .type = TYPE_TCP,
2087
0
  };
2088
2089
0
  if (tcp_disabled)
2090
0
    return 0;
2091
2092
  /* estimate max fd. no:
2093
   * 1 tcp send unix socket/all_proc,
2094
   *  + 1 udp sock/udp proc + 1 tcp_worker sock/tcp worker*
2095
   *  + no_listen_tcp */
2096
0
  for( r=0,n=PROTO_FIRST ; n<PROTO_LAST ; n++ )
2097
0
    if ( is_tcp_based_proto(n) )
2098
0
      for(sif=protos[n].listeners; sif ; sif=sif->next,r++ );
2099
2100
  /* create the socket pairs for ALL potential processes */
2101
0
  for(r=0; r<tcp_workers_max_no; r++){
2102
    /* create sock to communicate from TCP main to worker */
2103
0
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, reader_fd)<0){
2104
0
      LM_ERR("socketpair failed: %s\n", strerror(errno));
2105
0
      goto error;
2106
0
    }
2107
0
    tcp_workers[r].unix_sock = reader_fd[0]; /* worker's end */
2108
0
    tcp_workers[r].main_unix_sock = reader_fd[1]; /* main's end */
2109
0
  }
2110
2111
0
  if ( auto_scaling_enabled && s_profile &&
2112
0
  create_process_group( TYPE_TCP, NULL, s_profile,
2113
0
  fork_dynamic_tcp_process, tcp_process_graceful_terminate)!=0)
2114
0
    LM_ERR("failed to create group of TCP processes for, "
2115
0
      "auto forking will not be possible\n");
2116
2117
  /* start the TCP workers */
2118
0
  for(r=0; r<tcp_workers_no; r++){
2119
0
    (*chd_rank)++;
2120
0
    p_id=internal_fork(&ifp_sr_tcp);
2121
0
    if (p_id<0){
2122
0
      LM_ERR("fork failed\n");
2123
0
      goto error;
2124
0
    }else if (p_id>0){
2125
      /* parent */
2126
0
      tcp_workers[r].state=STATE_ACTIVE;
2127
0
      tcp_workers[r].n_reqs=0;
2128
0
      tcp_workers[r].pt_idx=p_id;
2129
0
    }else{
2130
      /* child */
2131
0
      set_proc_attrs("TCP receiver");
2132
0
      tcp_workers[r].pid = getpid();
2133
0
      if (tcp_worker_proc_reactor_init(tcp_workers[r].main_unix_sock)<0||
2134
0
          init_child(*chd_rank) < 0) {
2135
0
        LM_ERR("init_children failed\n");
2136
0
        report_failure_status();
2137
0
        if (startup_done)
2138
0
          *startup_done = -1;
2139
0
        exit(-1);
2140
0
      }
2141
2142
      /* was startup route executed so far ? */
2143
0
      if (startup_done!=NULL && *startup_done==0 && r==0) {
2144
0
        LM_DBG("running startup for first TCP\n");
2145
0
        if(run_startup_route()< 0) {
2146
0
          LM_ERR("Startup route processing failed\n");
2147
0
          report_failure_status();
2148
0
          *startup_done = -1;
2149
0
          exit(-1);
2150
0
        }
2151
0
        *startup_done = 1;
2152
0
      }
2153
2154
0
      report_conditional_status( (!no_daemon_mode), 0);
2155
2156
0
      tcp_worker_proc_loop();
2157
0
    }
2158
0
  }
2159
2160
  /* wait for the startup route to be executed */
2161
0
  if (startup_done)
2162
0
    while (!(*startup_done)) {
2163
0
      usleep(5);
2164
0
      handle_sigs();
2165
0
    }
2166
2167
0
  return 0;
2168
0
error:
2169
0
  return -1;
2170
0
}
2171
2172
2173
int tcp_start_listener(void)
2174
0
{
2175
0
  int p_id;
2176
0
  const struct internal_fork_params ifp_tcp_main = {
2177
0
    .proc_desc = "TCP main",
2178
0
    .flags = 0,
2179
0
    .type = TYPE_NONE,
2180
0
  };
2181
2182
0
  if (tcp_disabled)
2183
0
    return 0;
2184
2185
  /* start the TCP manager process */
2186
0
  if ( (p_id=internal_fork(&ifp_tcp_main))<0 ) {
2187
0
    LM_CRIT("cannot fork tcp main process\n");
2188
0
    goto error;
2189
0
  }else if (p_id==0){
2190
      /* child */
2191
    /* close the TCP inter-process sockets */
2192
0
    close(unix_tcp_sock);
2193
0
    unix_tcp_sock = -1;
2194
0
    close(pt[process_no].unix_sock);
2195
0
    pt[process_no].unix_sock = -1;
2196
2197
0
    report_conditional_status( (!no_daemon_mode), 0);
2198
2199
0
    tcp_main_server();
2200
0
    exit(-1);
2201
0
  }
2202
2203
0
  return 0;
2204
0
error:
2205
0
  return -1;
2206
0
}
2207
2208
int tcp_has_async_write(void)
2209
0
{
2210
0
  return reactor_has_async();
2211
0
}
2212
2213
2214
/***************************** MI functions **********************************/
2215
2216
mi_response_t *mi_tcp_list_conns(const mi_params_t *params,
2217
            struct mi_handler *async_hdl)
2218
0
{
2219
0
  mi_response_t *resp;
2220
0
  mi_item_t *resp_obj;
2221
0
  mi_item_t *conns_arr, *conn_item;
2222
0
  struct tcp_connection *conn;
2223
0
  time_t _ts;
2224
0
  char date_buf[MI_DATE_BUF_LEN];
2225
0
  int date_buf_len;
2226
0
  unsigned int i,j,part;
2227
0
  char proto[PROTO_NAME_MAX_SIZE];
2228
0
  struct tm ltime;
2229
0
  char *p;
2230
2231
0
  if (tcp_disabled)
2232
0
    return init_mi_result_null();
2233
2234
0
  resp = init_mi_result_object(&resp_obj);
2235
0
  if (!resp)
2236
0
    return 0;
2237
2238
0
  conns_arr = add_mi_array(resp_obj, MI_SSTR("Connections"));
2239
0
  if (!conns_arr) {
2240
0
    free_mi_response(resp);
2241
0
    return 0;
2242
0
  }
2243
2244
0
  for( part=0 ; part<TCP_PARTITION_SIZE ; part++) {
2245
0
    TCPCONN_LOCK(part);
2246
0
    for( i=0; i<TCP_ID_HASH_SIZE ; i++ ) {
2247
0
      for(conn=TCP_PART(part).tcpconn_id_hash[i];conn;conn=conn->id_next){
2248
        /* add one object fo each conn */
2249
0
        conn_item = add_mi_object(conns_arr, 0, 0);
2250
0
        if (!conn_item)
2251
0
          goto error;
2252
2253
        /* add ID */
2254
0
        if (add_mi_number(conn_item, MI_SSTR("ID"), conn->id) < 0)
2255
0
          goto error;
2256
2257
        /* add type/proto */
2258
0
        p = proto2str(conn->type, proto);
2259
0
        if (add_mi_string(conn_item, MI_SSTR("Type"), proto,
2260
0
          (int)(long)(p-proto)) > 0)
2261
0
          goto error;
2262
2263
        /* add state */
2264
0
        if (add_mi_number(conn_item, MI_SSTR("State"), conn->state) < 0)
2265
0
          goto error;
2266
2267
        /* add Remote IP:Port */
2268
0
        if (add_mi_string_fmt(conn_item, MI_SSTR("Remote"), "%s:%d",
2269
0
          ip_addr2a(&conn->rcv.src_ip), conn->rcv.src_port) < 0)
2270
0
          goto error;
2271
2272
        /* add Local IP:Port */
2273
0
        if (add_mi_string_fmt(conn_item, MI_SSTR("Local"), "%s:%d",
2274
0
          ip_addr2a(&conn->rcv.dst_ip), conn->rcv.dst_port) < 0)
2275
0
          goto error;
2276
2277
        /* add lifetime */
2278
0
        _ts = (time_t)conn->lifetime + startup_time;
2279
0
        localtime_r(&_ts, &ltime);
2280
0
        date_buf_len = strftime(date_buf, MI_DATE_BUF_LEN - 1,
2281
0
                    "%Y-%m-%d %H:%M:%S", &ltime);
2282
0
        if (date_buf_len != 0) {
2283
0
          if (add_mi_string(conn_item, MI_SSTR("Lifetime"),
2284
0
            date_buf, date_buf_len) < 0)
2285
0
            goto error;
2286
0
        } else {
2287
0
          if (add_mi_number(conn_item, MI_SSTR("Lifetime"), _ts) < 0)
2288
0
            goto error;
2289
0
        }
2290
2291
        /* add the port-aliases */
2292
0
        for( j=0 ; j<conn->aliases ; j++ )
2293
          /* add one node for each conn */
2294
0
          add_mi_number( conn_item, MI_SSTR("Alias port"),
2295
0
            conn->con_aliases[j].port );
2296
0
      }
2297
0
    }
2298
2299
0
    TCPCONN_UNLOCK(part);
2300
0
  }
2301
2302
0
  return resp;
2303
2304
0
error:
2305
0
  TCPCONN_UNLOCK(part);
2306
0
  LM_ERR("failed to add MI item\n");
2307
0
  free_mi_response(resp);
2308
0
  return 0;
2309
0
}