Coverage Report

Created: 2026-04-29 06:40

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/opensips/net/net_tcp.c
Line
Count
Source
1
/*
2
 * Copyright (C) 2014-2015 OpenSIPS Project
3
 * Copyright (C) 2001-2003 FhG Fokus
4
 *
5
 * This file is part of opensips, a free SIP server.
6
 *
7
 * opensips is free software; you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 2 of the License, or
10
 * (at your option) any later version.
11
 *
12
 * opensips is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU General Public License
18
 * along with this program; if not, write to the Free Software
19
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20
 *
21
 *
22
 * history:
23
 * ---------
24
 *  2015-01-xx  created (razvanc)
25
 */
26
27
#include <sys/types.h>
28
#include <sys/socket.h>
29
#include <netinet/in.h>
30
#include <netinet/in_systm.h>
31
#include <netinet/ip.h>
32
#include <netinet/tcp.h>
33
#include <sys/uio.h>  /* writev*/
34
#include <netdb.h>
35
#include <stdlib.h> /*exit() */
36
#include <time.h>   /*time() */
37
#include <unistd.h>
38
#include <errno.h>
39
#include <string.h>
40
#include <pthread.h>
41
#include <stdint.h>
42
43
#include "../mem/mem.h"
44
#include "../mem/shm_mem.h"
45
#include "../globals.h"
46
#include "../locking.h"
47
#include "../socket_info.h"
48
#include "../ut.h"
49
#include "../pt.h"
50
#include "../pt_load.h"
51
#include "../daemonize.h"
52
#include "../status_report.h"
53
#include "../reactor.h"
54
#include "../timer.h"
55
#include "../ipc.h"
56
#include "../receive.h"
57
#include "../lib/cond.h"
58
59
#include "tcp_passfd.h"
60
#include "net_tcp_proc.h"
61
#include "net_tcp_report.h"
62
#include "net_tcp.h"
63
#include "tcp_common.h"
64
#include "tcp_conn.h"
65
#include "tcp_conn_profile.h"
66
#include "trans.h"
67
#include "net_tcp_dbg.h"
68
69
struct struct_hist_list *con_hist;
70
71
enum tcp_worker_state { STATE_INACTIVE=0, STATE_ACTIVE, STATE_DRAINING};
72
73
static int tcpconn_prepare_write(struct tcp_connection *tcpconn);
74
75
/* definition of a TCP worker - the array of these TCP workers is
76
 * mainly intended to be used by the TCP main, to keep track of the
77
 * workers, about their load and so. Nevertheless, since the addition
78
 * of the process auto-scaling, other processes may need access to this
79
 * data, thus it's relocation in SHM (versus initial PKG). For example,
80
 * the attendant process is the one forking new TCP workers (scaling up),
81
 * so it must be able to set the ENABLE state for the TCP worker (and being
82
 * (seen by the TCP main proc). Similar, when a TCP worker shuts down, it has
83
 * to mark itself as DISABLED and the TCP main must see that.
84
 * Again, 99% this array is intended for TCP Main ops, it is not lock
85
 * protected, so be very careful with any ops from other procs.
86
 */
87
struct tcp_worker {
88
  pid_t pid;
89
  int pt_idx;     /*!< Index in the main Process Table */
90
  enum tcp_worker_state state;
91
};
92
93
/* definition of a TCP partition */
94
struct tcp_partition {
95
  /*! \brief connection hash table (after ip&port), includes also aliases */
96
  struct tcp_conn_alias** tcpconn_aliases_hash;
97
  /*! \brief connection hash table (after connection id) */
98
  struct tcp_connection** tcpconn_id_hash;
99
  gen_lock_t* tcpconn_lock;
100
};
101
102
103
/* array of TCP workers - to be used only by TCP MAIN */
104
struct tcp_worker *tcp_workers=0;
105
static int tcp_dispatch_sock[2] = { -1, -1 };
106
107
/* unique for each connection, used for
108
 * quickly finding the corresponding connection for a reply */
109
static unsigned int* connection_id=0;
110
static int *tcp_main_proc_no = 0;
111
112
/* array of TCP partitions */
113
static struct tcp_partition tcp_parts[TCP_PARTITION_SIZE];
114
115
/*!< current number of open connections */
116
static unsigned int *tcp_connections_no = 0;
117
static gen_lock_t *tcp_connections_lock = 0;
118
119
/*!< by default don't accept aliases */
120
int tcp_accept_aliases=0;
121
int tcp_connect_timeout=DEFAULT_TCP_CONNECT_TIMEOUT;
122
int tcp_con_lifetime=DEFAULT_TCP_CONNECTION_LIFETIME;
123
int tcp_socket_backlog=DEFAULT_TCP_SOCKET_BACKLOG;
124
/*!< by default choose the best method */
125
enum poll_types tcp_poll_method=0;
126
int tcp_max_connections=DEFAULT_TCP_MAX_CONNECTIONS;
127
/* the configured/starting number of TCP workers */
128
int tcp_workers_no = UDP_WORKERS_NO;
129
/* the maximum numbers of TCP workers */
130
int tcp_workers_max_no;
131
/* the name of the auto-scaling profile (optional) */
132
char* tcp_auto_scaling_profile = NULL;
133
/* Max number of seconds that we expect a full SIP message
134
 * to arrive in. Anything above will close the connection. */
135
int tcp_max_msg_time = TCP_CHILD_MAX_MSG_TIME;
136
#ifdef HAVE_SO_KEEPALIVE
137
    int tcp_keepalive = 1;
138
#else
139
    int tcp_keepalive = 0;
140
#endif
141
int tcp_keepcount = 0;
142
int tcp_keepidle = 0;
143
int tcp_keepinterval = 0;
144
145
/*!< should we allow opening a new TCP conn when sending data 
146
 * over UAC branches? - branch flag to be set in the SIP messages */
147
int tcp_no_new_conn_bflag = 0;
148
/*!< should we allow opening a new TCP conn when sending data 
149
 * back to UAS (replies)? - msg flag to be set in the SIP messages */
150
int tcp_no_new_conn_rplflag = 0;
151
/*!< should a new TCP conn be open if needed? - variable used for
152
 * signaling between SIP layer (branch flag) and TCP layer (tcp_send func)*/
153
int tcp_no_new_conn = 0;
154
int tcp_threads = 0;
155
156
/* if the TCP net layer is on or off (if no TCP based protos are loaded) */
157
static int tcp_disabled = 1;
158
159
/* is the process TCP MAIN ? */
160
int is_tcp_main = 0;
161
162
/* the ID of the TCP conn used for the last send operation in the
163
 * current process - attention, this is a really ugly HACK here */
164
unsigned int last_outgoing_tcp_id = 0;
165
166
static struct scaling_profile *s_profile = NULL;
167
168
/****************************** helper functions *****************************/
169
extern void handle_sigs(void);
170
171
static inline int init_sock_keepalive(int s, const struct tcp_conn_profile *prof)
172
0
{
173
0
  int ka;
174
0
#if defined(HAVE_TCP_KEEPINTVL) || defined(HAVE_TCP_KEEPIDLE) || defined(HAVE_TCP_KEEPCNT)
175
0
  int optval;
176
0
#endif
177
178
0
  if (prof->keepinterval || prof->keepidle || prof->keepcount)
179
0
    ka = 1; /* force on */
180
0
  else
181
0
    ka = prof->keepalive;
182
183
0
#ifdef HAVE_SO_KEEPALIVE
184
0
  if (setsockopt(s,SOL_SOCKET,SO_KEEPALIVE,&ka,sizeof(ka))<0){
185
0
    LM_WARN("setsockopt failed to enable SO_KEEPALIVE: %s\n",
186
0
      strerror(errno));
187
0
    return -1;
188
0
  }
189
0
  LM_DBG("TCP keepalive enabled on socket %d\n",s);
190
0
#endif
191
0
#ifdef HAVE_TCP_KEEPINTVL
192
0
  if ((optval = prof->keepinterval)) {
193
0
    if (setsockopt(s,IPPROTO_TCP,TCP_KEEPINTVL,&optval,sizeof(optval))<0){
194
0
      LM_WARN("setsockopt failed to set keepalive probes interval: %s\n",
195
0
        strerror(errno));
196
0
    }
197
0
  }
198
0
#endif
199
0
#ifdef HAVE_TCP_KEEPIDLE
200
0
  if ((optval = prof->keepidle)) {
201
0
    if (setsockopt(s,IPPROTO_TCP,TCP_KEEPIDLE,&optval,sizeof(optval))<0){
202
0
      LM_WARN("setsockopt failed to set keepalive idle interval: %s\n",
203
0
        strerror(errno));
204
0
    }
205
0
  }
206
0
#endif
207
0
#ifdef HAVE_TCP_KEEPCNT
208
0
  if ((optval = prof->keepcount)) {
209
0
    if (setsockopt(s,IPPROTO_TCP,TCP_KEEPCNT,&optval,sizeof(optval))<0){
210
0
      LM_WARN("setsockopt failed to set maximum keepalive count: %s\n",
211
0
        strerror(errno));
212
0
    }
213
0
  }
214
0
#endif
215
0
  return 0;
216
0
}
217
218
static inline void set_sock_reuseport(int s)
219
0
{
220
0
  int yes = 1;
221
222
0
  if (setsockopt(s,SOL_SOCKET,SO_REUSEPORT,&yes,sizeof(yes))<0){
223
0
    LM_WARN("setsockopt failed to set SO_REUSEPORT: %s\n",
224
0
      strerror(errno));
225
0
  }
226
0
  if (setsockopt(s,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))<0){
227
0
    LM_WARN("setsockopt failed to set SO_REUSEADDR: %s\n",
228
0
      strerror(errno));
229
0
  }
230
0
}
231
232
/*! \brief Set all socket/fd options:  disable nagle, tos lowdelay,
233
 * non-blocking
234
 * \return -1 on error */
235
int tcp_init_sock_opt(int s, const struct tcp_conn_profile *prof, enum si_flags socketflags, int sock_tos)
236
0
{
237
0
  int flags;
238
0
  int optval;
239
240
0
#ifdef DISABLE_NAGLE
241
0
  flags=1;
242
0
  if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags)) < 0){
243
0
    LM_WARN("could not disable Nagle: %s\n", strerror(errno));
244
0
  }
245
0
#endif
246
  /* tos*/
247
0
  optval = (sock_tos > 0) ? sock_tos : tos;
248
0
  if (optval > 0) {
249
0
    if (setsockopt(s, IPPROTO_IP, IP_TOS, (void*)&optval,sizeof(optval)) ==-1){
250
0
      LM_WARN("setsockopt tos: %s\n",  strerror(errno));
251
      /* continue since this is not critical */
252
0
    }
253
0
  }
254
255
0
  if (probe_max_sock_buff(s,1,MAX_SEND_BUFFER_SIZE,BUFFER_INCREMENT)) {
256
0
    LM_WARN("setsockopt tcp snd buff: %s\n", strerror(errno));
257
    /* continue since this is not critical */
258
0
  }
259
260
0
  init_sock_keepalive(s, prof);
261
0
  if (socketflags & SI_REUSEPORT)
262
0
    set_sock_reuseport(s);
263
264
  /* non-blocking */
265
0
  flags=fcntl(s, F_GETFL);
266
0
  if (flags==-1){
267
0
    LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno));
268
0
    goto error;
269
0
  }
270
0
  if (fcntl(s, F_SETFL, flags|O_NONBLOCK)==-1){
271
0
    LM_ERR("set non-blocking failed: (%d) %s\n", errno, strerror(errno));
272
0
    goto error;
273
0
  }
274
0
  return 0;
275
0
error:
276
0
  return -1;
277
0
}
278
279
struct tcp_ipc_payload {
280
  struct receive_info rcv;
281
  struct tcp_connection *conn;
282
  int msg_len;
283
  int data_len;
284
  char msg_buf[0];
285
};
286
287
int tcp_dispatch_msg(char *msg, int len,
288
    struct receive_info *rcv, const void *data, int data_len)
289
0
{
290
0
  struct tcp_ipc_payload *payload;
291
0
  struct tcp_connection *conn = NULL;
292
0
  unsigned int alloc_len;
293
0
  int n;
294
0
  uintptr_t payload_ptr;
295
296
0
  if (len < 0) {
297
0
    LM_BUG("negative TCP message length: %d\n", len);
298
0
    return -1;
299
0
  }
300
0
  if (data_len < 0) {
301
0
    LM_BUG("negative TCP dispatch data length: %d\n", data_len);
302
0
    return -1;
303
0
  }
304
0
  if (data_len && !data) {
305
0
    LM_BUG("missing TCP dispatch data buffer for %d bytes\n", data_len);
306
0
    return -1;
307
0
  }
308
309
0
  alloc_len = sizeof(*payload) + len + 1 + data_len;
310
0
  payload = shm_malloc(alloc_len);
311
0
  if (!payload) {
312
0
    LM_ERR("oom while allocating TCP IPC payload (%u bytes)\n", alloc_len);
313
0
    return -1;
314
0
  }
315
316
0
  memcpy(&payload->rcv, rcv, sizeof(payload->rcv));
317
0
  payload->conn = NULL;
318
0
  payload->msg_len = len;
319
0
  payload->data_len = data_len;
320
0
  memcpy(payload->msg_buf, msg, len);
321
0
  payload->msg_buf[len] = '\0';
322
0
  if (data_len)
323
0
    memcpy(payload->msg_buf + len + 1, data, data_len);
324
325
0
  if (rcv->proto_reserved1 &&
326
0
      tcp_conn_get(rcv->proto_reserved1, NULL, 0, PROTO_NONE,
327
0
        NULL, &conn, NULL) > 0) {
328
0
    payload->conn = conn;
329
0
  }
330
331
0
  payload_ptr = (uintptr_t)payload;
332
0
  n = send(tcp_dispatch_sock[1], &payload_ptr, sizeof(payload_ptr), 0);
333
0
  if (n != (int)sizeof(payload_ptr)) {
334
0
    LM_ERR("failed to dispatch TCP message to worker socket: %s\n",
335
0
      (n < 0) ? strerror(errno) : "short write");
336
0
    if (payload->conn)
337
0
      tcpconn_put(payload->conn);
338
0
    shm_free(payload);
339
0
    return -1;
340
0
  }
341
342
0
  return 0;
343
0
}
344
345
enum tcp_job_op {
346
  TCP_READ_JOB = 1,
347
  TCP_WRITE_JOB = 2,
348
  TCP_RUN_JOB = 3,
349
};
350
351
struct tcp_job {
352
  struct tcp_connection *conn;
353
  int op;
354
  tcp_thread_job_f run;
355
  void *data;
356
  long resp;
357
  int ret;
358
  struct tcp_job *next;
359
};
360
361
static struct tcp_pool {
362
  pthread_t *threads;
363
  int threads_no;
364
  int stop;
365
  struct tcp_job *task_head;
366
  struct tcp_job *task_tail;
367
368
  pthread_mutex_t done_lock;
369
  struct tcp_job *done_head;
370
  struct tcp_job *done_tail;
371
372
  int notify_pipe[2];
373
} tcp_pool = {
374
  .threads = NULL,
375
  .threads_no = 0,
376
  .stop = 0,
377
  .task_head = NULL,
378
  .task_tail = NULL,
379
  .done_lock = PTHREAD_MUTEX_INITIALIZER,
380
  .done_head = NULL,
381
  .done_tail = NULL,
382
  .notify_pipe = {-1, -1},
383
};
384
385
struct tcp_shared_write_queue {
386
  gen_cond_t cond;
387
  struct tcp_connection *head;
388
  struct tcp_connection *tail;
389
};
390
391
static struct tcp_shared_write_queue *tcp_write_queue = NULL;
392
393
static inline int tcp_threads_active(void)
394
0
{
395
0
  return tcp_pool.threads_no > 0;
396
0
}
397
398
int tcp_write_in_main(void)
399
0
{
400
  /* This is a process-independent policy: TCP writes are handled by the
401
   * dedicated TCP main process, regardless of the caller process. */
402
0
  return !tcp_disabled;
403
0
}
404
405
406
407
/********************** TCP conn management functions ************************/
408
409
/* initializes an already defined TCP listener */
410
int tcp_init_listener(struct socket_info *si)
411
0
{
412
0
  union sockaddr_union* addr = &si->su;
413
414
0
  if (init_su(addr, &si->address, si->port_no)<0){
415
0
    LM_ERR("could no init sockaddr_union\n");
416
0
    return -1;
417
0
  }
418
419
0
  return 0;
420
0
}
421
422
/* binding a defined TCP listener */
423
int tcp_bind_listener(struct socket_info *si)
424
0
{
425
0
  union sockaddr_union* addr;
426
0
  int optval;
427
0
#ifdef DISABLE_NAGLE
428
0
  int flag;
429
0
#endif
430
431
0
  addr = &si->su;
432
0
  si->socket = socket(AF2PF(addr->s.sa_family), SOCK_STREAM, 0);
433
0
  if (si->socket==-1){
434
0
    LM_ERR("socket failed with [%s]\n", strerror(errno));
435
0
    goto error;
436
0
  }
437
0
#ifdef DISABLE_NAGLE
438
0
  flag=1;
439
0
  if (setsockopt(si->socket, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) < 0){
440
0
    LM_ERR("could not disable Nagle: %s\n",strerror(errno));
441
0
  }
442
0
#endif
443
444
0
#if  !defined(TCP_DONT_REUSEADDR)
445
  /* Stevens, "Network Programming", Section 7.5, "Generic Socket
446
   * Options": "...server started,..a child continues..on existing
447
   * connection..listening server is restarted...call to bind fails
448
   * ... ALL TCP servers should specify the SO_REUSEADDRE option
449
   * to allow the server to be restarted in this situation
450
   */
451
0
  optval=1;
452
0
  if (setsockopt(si->socket, SOL_SOCKET, SO_REUSEADDR,
453
0
  (void*)&optval, sizeof(optval))==-1) {
454
0
    LM_ERR("setsockopt failed with [%s]\n", strerror(errno));
455
0
    goto error;
456
0
  }
457
0
#endif
458
  /* tos */
459
0
  optval = (si->tos > 0) ? si->tos : tos;
460
0
  if (optval > 0) {
461
0
    if (setsockopt(si->socket, IPPROTO_IP, IP_TOS, (void*)&optval,
462
0
    sizeof(optval)) ==-1){
463
0
      LM_WARN("setsockopt tos: %s\n", strerror(errno));
464
      /* continue since this is not critical */
465
0
    }
466
0
  }
467
468
0
  if (probe_max_sock_buff(si->socket,1,MAX_SEND_BUFFER_SIZE,
469
0
  BUFFER_INCREMENT)) {
470
0
    LM_WARN("setsockopt tcp snd buff: %s\n",strerror(errno));
471
    /* continue since this is not critical */
472
0
  }
473
474
0
  init_sock_keepalive(si->socket, &tcp_con_df_profile);
475
0
  if (si->flags & SI_REUSEPORT)
476
0
    set_sock_reuseport(si->socket);
477
0
  if (bind(si->socket, &addr->s, sockaddru_len(*addr))==-1){
478
0
    LM_ERR("bind(%x, %p, %d) on %s:%d : %s\n",
479
0
        si->socket, &addr->s,
480
0
        (unsigned)sockaddru_len(*addr),
481
0
        si->address_str.s,
482
0
        si->port_no,
483
0
        strerror(errno));
484
0
    goto error;
485
0
  }
486
0
  if (listen(si->socket, tcp_socket_backlog)==-1){
487
0
    LM_ERR("listen(%x, %p, %d) on %s: %s\n",
488
0
        si->socket, &addr->s,
489
0
        (unsigned)sockaddru_len(*addr),
490
0
        si->address_str.s,
491
0
        strerror(errno));
492
0
    goto error;
493
0
  }
494
495
0
  return 0;
496
0
error:
497
0
  if (si->socket!=-1){
498
0
    close(si->socket);
499
0
    si->socket=-1;
500
0
  }
501
0
  return -1;
502
0
}
503
504
505
/*! \brief finds a connection, if id=0 return NULL
506
 * \note WARNING: unprotected (locks) use tcpconn_get unless you really
507
 * know what you are doing */
508
static struct tcp_connection* _tcpconn_find(unsigned int id)
509
0
{
510
0
  struct tcp_connection *c;
511
0
  unsigned hash;
512
513
0
  if (id){
514
0
    hash=tcp_id_hash(id);
515
0
    for (c=TCP_PART(id).tcpconn_id_hash[hash]; c; c=c->id_next){
516
#ifdef EXTRA_DEBUG
517
      LM_DBG("c=%p, c->id=%u, port=%d\n",c, c->id, c->rcv.src_port);
518
      print_ip("ip=", &c->rcv.src_ip, "\n");
519
#endif
520
0
      if ((id==c->id) && c->state!=S_CONN_BAD &&
521
0
          !(c->flags & F_CONN_FORCE_CLOSED))
522
0
        return c;
523
0
    }
524
0
  }
525
0
  return 0;
526
0
}
527
528
529
/* returns the correlation ID of a TCP connection */
530
int tcp_get_correlation_id( unsigned int id, unsigned long long *cid)
531
0
{
532
0
  struct tcp_connection* c;
533
534
0
  TCPCONN_LOCK(id);
535
0
  if ( (c=_tcpconn_find(id))!=NULL ) {
536
0
    *cid = c->cid;
537
0
    TCPCONN_UNLOCK(id);
538
0
    return 0;
539
0
  }
540
0
  *cid = 0;
541
0
  TCPCONN_UNLOCK(id);
542
0
  return -1;
543
0
}
544
545
/* returns the correlation ID of a TCP connection */
546
int tcp_get_rcv( unsigned int id, struct receive_info *ri)
547
0
{
548
0
  struct tcp_connection* c;
549
550
0
  TCPCONN_LOCK(id);
551
0
  if ( (c=_tcpconn_find(id))!=NULL ) {
552
0
    memcpy(ri, &c->rcv, sizeof *ri);
553
0
    TCPCONN_UNLOCK(id);
554
0
    return 0;
555
0
  }
556
0
  TCPCONN_UNLOCK(id);
557
0
  return -1;
558
0
}
559
560
int tcp_get_main_proc_no(void)
561
0
{
562
0
  return tcp_main_proc_no ? *tcp_main_proc_no : -1;
563
0
}
564
565
566
/*! \brief _tcpconn_find with locks and acquire a shared connection reference */
567
int tcp_conn_get(unsigned int id, struct ip_addr* ip, int port,
568
    enum sip_protos proto, void *proto_extra_id,
569
    struct tcp_connection** conn, const struct socket_info* send_sock)
570
0
{
571
0
  struct tcp_connection* c;
572
0
  struct tcp_conn_alias* a;
573
0
  unsigned hash;
574
0
  unsigned int part;
575
576
0
  if (id) {
577
0
    part = id;
578
0
    TCPCONN_LOCK(part);
579
0
    if ( (c=_tcpconn_find(part))!=NULL )
580
0
      goto found;
581
0
    TCPCONN_UNLOCK(part);
582
0
  }
583
584
  /* continue search based on IP address + port + transport */
585
#ifdef EXTRA_DEBUG
586
  LM_DBG("%d  port %u\n",id, port);
587
  if (ip) print_ip("tcpconn_find: ip ", ip, "\n");
588
#endif
589
0
  if (ip){
590
0
    hash=tcp_addr_hash(ip, port);
591
0
    for( part=0 ; part<TCP_PARTITION_SIZE ; part++ ) {
592
0
      TCPCONN_LOCK(part);
593
0
      for (a=TCP_PART(part).tcpconn_aliases_hash[hash]; a; a=a->next) {
594
#ifdef EXTRA_DEBUG
595
        LM_DBG("a=%p, c=%p, c->id=%u, alias port= %d port=%d\n",
596
          a, a->parent, a->parent->id, a->port,
597
          a->parent->rcv.src_port);
598
        print_ip("ip=",&a->parent->rcv.src_ip,"\n");
599
        if (send_sock && a->parent->rcv.bind_address) {
600
          print_ip("requested send_sock ip=", &send_sock->address,"\n");
601
          print_ip("found send_sock ip=", &a->parent->rcv.bind_address->address,"\n");
602
        }
603
#endif
604
0
        c = a->parent;
605
0
        if (c->state != S_CONN_BAD &&
606
0
            !(c->flags & F_CONN_FORCE_CLOSED) &&
607
0
            ((c->flags & F_CONN_INIT) ||
608
0
             (c->state == S_CONN_CONNECTING && c->fd == -1)) &&
609
0
            (send_sock==NULL || send_sock == a->parent->rcv.bind_address) &&
610
0
            port == a->port &&
611
0
            proto == c->type &&
612
0
            ip_addr_cmp(ip, &c->rcv.src_ip) &&
613
0
            (proto_extra_id == NULL ||
614
0
             ((c->flags & F_CONN_INIT) &&
615
0
              (protos[proto].net.stream.conn.match == NULL ||
616
0
               protos[proto].net.stream.conn.match(c, proto_extra_id)))) )
617
0
            goto found;
618
0
        }
619
0
      TCPCONN_UNLOCK(part);
620
0
    }
621
0
  }
622
623
  /* not found */
624
0
  *conn = NULL;
625
0
  return 0;
626
627
0
found:
628
0
  c->refcnt++;
629
0
  TCPCONN_UNLOCK(part);
630
0
  sh_log(c->hist, TCP_REF, "tcp_conn_get, (%d)", c->refcnt);
631
632
0
  LM_DBG("con found in state %d\n",c->state);
633
634
0
  *conn = c;
635
0
  return 1;
636
0
}
637
638
639
/* used to tune the tcp_connection attributes - not to be used inside the
640
   network layer, but only from the above layer (otherwise we may end up
641
   in strange deadlocks!) */
642
int tcp_conn_fcntl(struct receive_info *rcv, int attr, void *value)
643
0
{
644
0
  struct tcp_connection *con;
645
646
0
  switch (attr) {
647
0
  case DST_FCNTL_SET_LIFETIME:
648
    /* set connection timeout */
649
0
    TCPCONN_LOCK(rcv->proto_reserved1);
650
0
    con =_tcpconn_find(rcv->proto_reserved1);
651
0
    if (!con) {
652
0
      LM_ERR("Strange, tcp conn not found (id=%u)\n",
653
0
        rcv->proto_reserved1);
654
0
    } else {
655
0
      tcp_conn_set_lifetime( con, (int)(long)(value));
656
0
    }
657
0
    TCPCONN_UNLOCK(rcv->proto_reserved1);
658
0
    return 0;
659
0
  default:
660
0
    LM_ERR("unsupported operation %d on conn\n",attr);
661
0
    return -1;
662
0
  }
663
0
  return -1;
664
0
}
665
666
667
static struct tcp_connection* tcpconn_add(struct tcp_connection *c)
668
0
{
669
0
  unsigned hash;
670
671
0
  if (c){
672
0
    TCPCONN_LOCK(c->id);
673
    /* add it at the beginning of the list*/
674
0
    hash=tcp_id_hash(c->id);
675
0
    c->id_hash=hash;
676
0
    tcpconn_listadd(TCP_PART(c->id).tcpconn_id_hash[hash], c, id_next,
677
0
      id_prev);
678
679
0
    hash=tcp_addr_hash(&c->rcv.src_ip, c->rcv.src_port);
680
    /* set the first alias */
681
0
    c->con_aliases[0].port=c->rcv.src_port;
682
0
    c->con_aliases[0].hash=hash;
683
0
    c->con_aliases[0].parent=c;
684
0
    tcpconn_listadd(TCP_PART(c->id).tcpconn_aliases_hash[hash],
685
0
      &c->con_aliases[0], next, prev);
686
0
    c->aliases++;
687
0
    c->flags |= F_CONN_HASHED;
688
0
    TCPCONN_UNLOCK(c->id);
689
0
    LM_DBG("hashes: %d, %d\n", hash, c->id_hash);
690
0
    return c;
691
0
  }else{
692
0
    LM_CRIT("null connection pointer\n");
693
0
    return 0;
694
0
  }
695
0
}
696
697
static str e_tcp_src_ip = str_init("src_ip");
698
static str e_tcp_src_port = str_init("src_port");
699
static str e_tcp_dst_ip = str_init("dst_ip");
700
static str e_tcp_dst_port = str_init("dst_port");
701
static str e_tcp_c_proto = str_init("proto");
702
703
static void tcp_disconnect_event_raise(struct tcp_connection* c)
704
0
{
705
0
  evi_params_p list = 0;
706
0
  str src_ip,dst_ip, proto;
707
0
  int src_port,dst_port;
708
0
  char src_ip_buf[IP_ADDR_MAX_STR_SIZE],dst_ip_buf[IP_ADDR_MAX_STR_SIZE];
709
710
  // event has to be triggered - check for subscribers
711
0
  if (!evi_probe_event(EVI_TCP_DISCONNECT)) {
712
0
    goto end;
713
0
  }
714
715
0
  if (!(list = evi_get_params()))
716
0
    goto end;
717
718
0
  src_ip.s = ip_addr2a( &c->rcv.src_ip );
719
0
  memcpy(src_ip_buf,src_ip.s,IP_ADDR_MAX_STR_SIZE);
720
0
  src_ip.s = src_ip_buf;
721
0
  src_ip.len = strlen(src_ip.s);
722
723
0
  if (evi_param_add_str(list, &e_tcp_src_ip, &src_ip)) {
724
0
    LM_ERR("unable to add parameter\n");
725
0
    goto end;
726
0
  }
727
728
0
  src_port = c->rcv.src_port;
729
730
0
  if (evi_param_add_int(list, &e_tcp_src_port, &src_port)) {
731
0
    LM_ERR("unable to add parameter\n");
732
0
    goto end;
733
0
  }
734
735
0
  dst_ip.s = ip_addr2a( &c->rcv.dst_ip );
736
0
  memcpy(dst_ip_buf,dst_ip.s,IP_ADDR_MAX_STR_SIZE);
737
0
  dst_ip.s = dst_ip_buf;
738
0
  dst_ip.len = strlen(dst_ip.s);
739
740
0
  if (evi_param_add_str(list, &e_tcp_dst_ip, &dst_ip)) {
741
0
    LM_ERR("unable to add parameter\n");
742
0
    goto end;
743
0
  }
744
745
0
  dst_port = c->rcv.dst_port;
746
747
0
  if (evi_param_add_int(list, &e_tcp_dst_port, &dst_port)) {
748
0
    LM_ERR("unable to add parameter\n");
749
0
    goto end;
750
0
  }
751
752
0
  proto.s = protos[c->rcv.proto].name;
753
0
  proto.len = strlen(proto.s);
754
755
0
  if (evi_param_add_str(list, &e_tcp_c_proto, &proto)) {
756
0
    LM_ERR("unable to add parameter\n");
757
0
    goto end;
758
0
  }
759
760
0
  if (is_tcp_main) {
761
0
    if (evi_dispatch_event(EVI_TCP_DISCONNECT, list)) {
762
0
      LM_ERR("unable to dispatch tcp disconnect event\n");
763
0
    }
764
0
  } else {
765
0
    if (evi_raise_event(EVI_TCP_DISCONNECT, list)) {
766
0
      LM_ERR("unable to send tcp disconnect event\n");
767
0
    }
768
0
  }
769
0
  list = 0;
770
771
0
end:
772
0
  if (list)
773
0
    evi_free_params(list);
774
0
}
775
776
/* convenience macro to aid in shm_free() debugging */
777
#define _tcpconn_rm(c, ne) \
778
0
  do {\
779
0
    __tcpconn_rm(c, ne);\
780
0
    shm_free(c);\
781
0
  } while (0)
782
783
struct tcp_req *tcp_conn_get_req(struct tcp_connection *c)
784
0
{
785
0
  if (!c)
786
0
    return NULL;
787
788
0
  if (c->con_req)
789
0
    return c->con_req;
790
791
0
  c->con_req = thread_malloc(sizeof(*c->con_req));
792
0
  if (!c->con_req) {
793
0
    LM_ERR("failed to allocate TCP request buffer for connection %u\n",
794
0
      c->id);
795
0
    return NULL;
796
0
  }
797
0
  memset(c->con_req, 0, sizeof(*c->con_req));
798
799
0
  return c->con_req;
800
0
}
801
802
void tcp_conn_destroy_req(struct tcp_connection *c)
803
0
{
804
0
  if (!c || !c->con_req)
805
0
    return;
806
807
0
  thread_free(c->con_req);
808
0
  c->con_req = NULL;
809
0
}
810
811
/*! \brief unsafe tcpconn_rm version (nolocks) */
812
static void __tcpconn_rm(struct tcp_connection* c, int no_event)
813
0
{
814
0
  int r;
815
816
0
  if (c->flags & F_CONN_HASHED) {
817
0
    tcpconn_listrm(TCP_PART(c->id).tcpconn_id_hash[c->id_hash], c,
818
0
      id_next, id_prev);
819
    /* remove all the aliases */
820
0
    for (r=0; r<c->aliases; r++)
821
0
      tcpconn_listrm(TCP_PART(c->id).tcpconn_aliases_hash[
822
0
        c->con_aliases[r].hash], &c->con_aliases[r], next, prev);
823
0
    c->flags &= ~F_CONN_HASHED;
824
0
  }
825
0
  lock_destroy(&c->write_lock);
826
827
0
  if (c->async) {
828
0
    for (r = 0; r<c->async->pending; r++)
829
0
      shm_free(c->async->chunks[r]);
830
0
    shm_free(c->async);
831
0
    c->async = NULL;
832
0
  }
833
834
0
  lock_get(tcp_connections_lock);
835
0
  (*tcp_connections_no)--;
836
0
  lock_release(tcp_connections_lock);
837
838
0
  if (c->proto_req)
839
0
    thread_free(c->proto_req);
840
0
  c->proto_req = NULL;
841
0
  tcp_conn_destroy_req(c);
842
843
0
  if (protos[c->type].net.stream.conn.clean)
844
0
    protos[c->type].net.stream.conn.clean(c);
845
846
0
  if (!no_event) tcp_disconnect_event_raise(c);
847
848
#ifdef DBG_TCPCON
849
  sh_log(c->hist, TCP_DESTROY, "type=%d", c->type);
850
  sh_unref(c->hist);
851
  c->hist = NULL;
852
#endif
853
854
  /* shm_free(c); -- freed by _tcpconn_rm() */
855
0
}
856
857
/*! \brief add port as an alias for the "id" connection
858
 * \return 0 on success,-1 on failure */
859
int tcpconn_add_alias(struct sip_msg *msg, unsigned int id, int port, int proto)
860
0
{
861
0
  struct tcp_connection* c;
862
0
  unsigned hash;
863
0
  struct tcp_conn_alias* a;
864
865
0
  a=0;
866
  /* fix the port */
867
0
  port=port ? port : protos[proto].default_port ;
868
0
  TCPCONN_LOCK(id);
869
  /* check if alias already exists */
870
0
  c=_tcpconn_find(id);
871
0
  if (c) {
872
0
    if (msg && !(c->profile.alias_mode == TCP_ALIAS_ALWAYS
873
0
                   || (c->profile.alias_mode == TCP_ALIAS_RFC_5923
874
0
                       && msg->via1->alias))) {
875
0
      LM_DBG("refusing to add alias (alias_mode: %u, via 'alias': %u)\n",
876
0
              c->profile.alias_mode, !!msg->via1->alias);
877
0
      TCPCONN_UNLOCK(id);
878
0
      return 0;
879
0
    }
880
881
0
    hash=tcp_addr_hash(&c->rcv.src_ip, port);
882
    /* search the aliases for an already existing one */
883
0
    for (a=TCP_PART(id).tcpconn_aliases_hash[hash]; a; a=a->next) {
884
0
      if (a->parent->state != S_CONN_BAD &&
885
0
          port == a->port &&
886
0
          proto == a->parent->type &&
887
0
          ip_addr_cmp(&c->rcv.src_ip, &a->parent->rcv.src_ip)) {
888
        /* found */
889
0
        if (a->parent!=c) goto error_sec;
890
0
        else goto ok;
891
0
      }
892
0
    }
893
0
    if (c->aliases>=TCP_CON_MAX_ALIASES) goto error_aliases;
894
0
    c->con_aliases[c->aliases].parent=c;
895
0
    c->con_aliases[c->aliases].port=port;
896
0
    c->con_aliases[c->aliases].hash=hash;
897
0
    tcpconn_listadd(TCP_PART(id).tcpconn_aliases_hash[hash],
898
0
                &c->con_aliases[c->aliases], next, prev);
899
0
    c->aliases++;
900
0
  }else goto error_not_found;
901
0
ok:
902
0
  TCPCONN_UNLOCK(id);
903
#ifdef EXTRA_DEBUG
904
  if (a) LM_DBG("alias already present\n");
905
  else   LM_DBG("alias port %d for hash %d, id %u\n", port, hash, id);
906
#endif
907
0
  return 0;
908
0
error_aliases:
909
0
  TCPCONN_UNLOCK(id);
910
0
  LM_ERR("too many aliases for connection %p (%u)\n", c, id);
911
0
  return -1;
912
0
error_not_found:
913
0
  TCPCONN_UNLOCK(id);
914
0
  LM_ERR("no connection found for id %u\n",id);
915
0
  return -1;
916
0
error_sec:
917
0
  LM_WARN("possible port hijack attempt\n");
918
0
  LM_WARN("alias already present and points to another connection "
919
0
      "(%d : %d and %u : %d)\n", a->parent->id,  port, id, port);
920
0
  TCPCONN_UNLOCK(id);
921
0
  return -1;
922
0
}
923
924
925
void tcpconn_put(struct tcp_connection* c)
926
0
{
927
0
  int destroy = 0;
928
929
0
  TCPCONN_LOCK(c->id);
930
0
  c->refcnt--;
931
0
  if (c->refcnt == 0 && (c->flags & F_CONN_HASHED) == 0)
932
0
    destroy = 1;
933
0
  TCPCONN_UNLOCK(c->id);
934
935
0
  if (destroy)
936
0
    _tcpconn_rm(c, 1);
937
0
}
938
939
940
static inline void tcpconn_ref(struct tcp_connection* c)
941
0
{
942
0
  TCPCONN_LOCK(c->id);
943
0
  c->refcnt++;
944
0
  TCPCONN_UNLOCK(c->id);
945
0
}
946
947
948
static struct tcp_connection* tcpconn_new(int sock, const union sockaddr_union* su,
949
                    const struct socket_info* si, const struct tcp_conn_profile *prof,
950
                    int state, int flags)
951
0
{
952
0
  struct tcp_connection *c;
953
0
  union sockaddr_union local_su;
954
0
  unsigned int su_size;
955
0
  int counted = 0;
956
957
0
  lock_get(tcp_connections_lock);
958
0
  if (*tcp_connections_no >= (unsigned int)tcp_max_connections) {
959
0
    lock_release(tcp_connections_lock);
960
0
    LM_ERR("maximum number of connections exceeded: %u/%d\n",
961
0
      *tcp_connections_no, tcp_max_connections);
962
0
    return 0;
963
0
  }
964
0
  (*tcp_connections_no)++;
965
0
  lock_release(tcp_connections_lock);
966
0
  counted = 1;
967
968
0
  c=(struct tcp_connection*)shm_malloc(sizeof(struct tcp_connection));
969
0
  if (c==0){
970
0
    LM_ERR("shared memory allocation failure\n");
971
0
    goto error_count;
972
0
  }
973
0
  memset(c, 0, sizeof(struct tcp_connection)); /* zero init */
974
0
  c->fd=sock;
975
0
  if (lock_init(&c->write_lock)==0){
976
0
    LM_ERR("init lock failed\n");
977
0
    goto error0;
978
0
  }
979
980
0
  c->rcv.src_su=*su;
981
982
0
  c->refcnt=0;
983
0
  su2ip_addr(&c->rcv.src_ip, su);
984
0
  c->rcv.src_port=su_getport(su);
985
0
  c->rcv.bind_address = si;
986
0
  c->rcv.dst_ip = si->address;
987
0
  if (sock >= 0) {
988
0
    su_size = sockaddru_len(*su);
989
0
    if (getsockname(sock, (struct sockaddr *)&local_su, &su_size)<0) {
990
0
      LM_ERR("failed to get info on received interface/IP %d/%s\n",
991
0
        errno, strerror(errno));
992
0
      goto error;
993
0
    }
994
0
    c->rcv.dst_port = su_getport(&local_su);
995
0
  } else {
996
0
    c->rcv.dst_port = (si->flags & SI_REUSEPORT) ? su_getport(&si->su) : 0;
997
0
  }
998
0
  print_ip("tcpconn_new: new tcp connection to: ", &c->rcv.src_ip, "\n");
999
0
  LM_DBG("on port %d, proto %d\n", c->rcv.src_port, si->proto);
1000
0
  c->id=(*connection_id)++;
1001
0
  c->cid = (unsigned long long)c->id
1002
0
        | ( (unsigned long long)(startup_time&0xFFFFFF) << 32 )
1003
0
          | ( (unsigned long long)(rand()&0xFF) << 56 );
1004
1005
0
  c->rcv.proto_reserved1=0; /* this will be filled before receive_message*/
1006
0
  c->rcv.proto_reserved2=0;
1007
0
  c->state=state;
1008
0
  c->extra_data=0;
1009
0
  c->type = si->proto;
1010
0
  c->rcv.proto = si->proto;
1011
  /* start with the default conn lifetime */
1012
0
  c->lifetime = get_ticks() + prof->con_lifetime;
1013
0
  c->timeout = c->lifetime;
1014
0
  c->profile = *prof;
1015
0
  c->flags|=F_CONN_REMOVED|flags;
1016
#ifdef DBG_TCPCON
1017
  c->hist = sh_push(c, con_hist);
1018
#endif
1019
1020
0
  if (protos[si->proto].net.stream.async_chunks) {
1021
0
    c->async = shm_malloc(sizeof(struct tcp_async_data) +
1022
0
        protos[si->proto].net.stream.async_chunks *
1023
0
        sizeof(struct tcp_async_chunk));
1024
0
    if (c->async) {
1025
0
      c->async->allocated = protos[si->proto].net.stream.async_chunks;
1026
0
      c->async->oldest = 0;
1027
0
      c->async->pending = 0;
1028
0
    } else {
1029
0
      LM_ERR("could not allocate async data for con!\n");
1030
0
      goto error;
1031
0
    }
1032
0
  }
1033
0
  if (sock >= 0 && protos[si->proto].net.stream.conn.init) {
1034
0
    if (protos[si->proto].net.stream.conn.init(c) < 0) {
1035
0
      LM_ERR("failed to do proto %d specific init for conn %p\n",
1036
0
          c->type, c);
1037
0
      goto error;
1038
0
    }
1039
0
    c->flags |= F_CONN_INIT;
1040
0
  }
1041
0
  return c;
1042
1043
0
error:
1044
0
  lock_destroy(&c->write_lock);
1045
0
error0:
1046
0
  shm_free(c);
1047
0
error_count:
1048
0
  if (counted) {
1049
0
    lock_get(tcp_connections_lock);
1050
0
    (*tcp_connections_no)--;
1051
0
    lock_release(tcp_connections_lock);
1052
0
  }
1053
0
  return 0;
1054
0
}
1055
1056
1057
/* creates a new tcp connection structure
1058
 * for an outgoing connection request; local private state is initialized later
1059
 * a +1 ref is set for the new conn !
1060
 * IMPORTANT - the function assumes you want to create a new TCP conn as
1061
 * a result of a connect operation - the conn will be set as connect !!
1062
 * Accepted connection are triggered internally only */
1063
struct tcp_connection* tcp_conn_create(const union sockaddr_union* su,
1064
    const struct socket_info* si, struct tcp_conn_profile *prof,
1065
    int state)
1066
0
{
1067
0
  struct tcp_connection *c;
1068
1069
0
  if (!prof)
1070
0
    tcp_con_get_profile(su, &si->su, si->proto, prof);
1071
1072
  /* create the connection structure */
1073
0
  c = tcpconn_new(-1, su, si, prof, state, 0);
1074
0
  if (c==NULL) {
1075
0
    LM_ERR("tcpconn_new failed\n");
1076
0
    return NULL;
1077
0
  }
1078
1079
0
  c->refcnt++; /* safe to do it w/o locking, it's not yet
1080
          available to the rest of the world */
1081
0
  sh_log(c->hist, TCP_REF, "connect, (%d)", c->refcnt);
1082
0
  return c;
1083
0
}
1084
1085
1086
/* Drops one reference from @tcpconn; when this was the last reference,
 * removes the connection from the hash and closes its fd.  Otherwise it
 * only marks the connection for immediate expiry (lifetime/timeout = 0,
 * state = S_CONN_BAD) so the lifetime sweeper reaps it once the
 * remaining holders release it.  Runs under the per-id TCPCONN lock. */
static inline void tcpconn_destroy(struct tcp_connection* tcpconn)
{
	int fd;
	int unsigned id = tcpconn->id;
	int hashed;

	TCPCONN_LOCK(id); /*avoid races w/ tcp_send*/
	tcpconn->refcnt--;
	if (tcpconn->refcnt==0){
		LM_DBG("destroying connection %p, flags %04x\n",
				tcpconn, tcpconn->flags);
		fd=tcpconn->fd;
		/* no reporting here - the tcpconn_destroy() function is called
		 * from the TCP_MAIN reactor when handling connections received
		 * from a worker; and we generate the CLOSE reports from WORKERs */
		hashed = (tcpconn->flags & F_CONN_HASHED);
		/* NOTE(review): a hashed conn passes 0 and an unhashed one 1 as
		 * _tcpconn_rm()'s second arg - presumably "skip hash unlink";
		 * confirm against _tcpconn_rm()'s definition */
		_tcpconn_rm(tcpconn, hashed ? 0 : 1);
		if (fd >= 0)
			close(fd);
	}else{
		/* force timeout */
		tcpconn->lifetime=0;
		tcpconn->timeout=0;
		tcpconn->state=S_CONN_BAD;
		sh_log(tcpconn->hist, TCP_DEL_DELAY, "tcpconn_destroy delayed, (%d)",
			tcpconn->refcnt);
		LM_DBG("delaying (%p, flags %04x) ref = %d ...\n",
				tcpconn, tcpconn->flags, tcpconn->refcnt);

	}
	TCPCONN_UNLOCK(id);
}
1118
1119
/* wrapper to the internally used function */
1120
void tcp_conn_destroy(struct tcp_connection* tcpconn)
1121
0
{
1122
0
  tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE,
1123
0
        "Closed by Proto layer");
1124
0
  sh_log(tcpconn->hist, TCP_UNREF, "tcp_conn_destroy, (%d)", tcpconn->refcnt);
1125
0
  return tcpconn_destroy(tcpconn);
1126
0
}
1127
1128
/* Switches @fd to non-blocking mode via fcntl(F_GETFL/F_SETFL).
 * Returns 0 on success, -1 on failure (error already logged). */
static inline int tcp_set_nonblock(int fd)
{
	int fl;

	if ((fl = fcntl(fd, F_GETFL)) == -1) {
		LM_ERR("fcntl(F_GETFL) failed for %d: %s\n", fd, strerror(errno));
		return -1;
	}

	if (fcntl(fd, F_SETFL, fl | O_NONBLOCK) == -1) {
		LM_ERR("fcntl(F_SETFL) failed for %d: %s\n", fd, strerror(errno));
		return -1;
	}

	return 0;
}
1145
1146
static void tcp_push_done_job(struct tcp_job *job)
1147
0
{
1148
0
  pthread_mutex_lock(&tcp_pool.done_lock);
1149
0
  if (tcp_pool.done_tail)
1150
0
    tcp_pool.done_tail->next = job;
1151
0
  else
1152
0
    tcp_pool.done_head = job;
1153
0
  tcp_pool.done_tail = job;
1154
0
  pthread_mutex_unlock(&tcp_pool.done_lock);
1155
0
}
1156
1157
static struct tcp_job *tcp_pop_done_job(void)
1158
0
{
1159
0
  struct tcp_job *job;
1160
1161
0
  pthread_mutex_lock(&tcp_pool.done_lock);
1162
0
  job = tcp_pool.done_head;
1163
0
  if (job) {
1164
0
    tcp_pool.done_head = job->next;
1165
0
    if (tcp_pool.done_head == NULL)
1166
0
      tcp_pool.done_tail = NULL;
1167
0
  }
1168
0
  pthread_mutex_unlock(&tcp_pool.done_lock);
1169
1170
0
  return job;
1171
0
}
1172
1173
static inline struct tcp_connection *tcp_pop_shared_write_conn_locked(void)
1174
0
{
1175
0
  struct tcp_connection *conn;
1176
1177
0
  conn = tcp_write_queue->head;
1178
0
  if (conn) {
1179
0
    tcp_write_queue->head = conn->wq_next;
1180
0
    if (tcp_write_queue->head == NULL)
1181
0
      tcp_write_queue->tail = NULL;
1182
0
    conn->wq_next = NULL;
1183
0
  }
1184
1185
0
  return conn;
1186
0
}
1187
1188
/* Worker loop of the TCP IO thread pool.
 * Blocks on tcp_write_queue->cond until either an explicit job is queued
 * (tcp_pool.task_head), a connection waits on the shared write queue
 * (tcp_write_queue->head), or shutdown is requested.  Finished read/write
 * jobs are handed back to TCP main via the done list + notify pipe;
 * TCP_RUN_JOBs are fire-and-forget. */
static void *tcp_thread_routine(void *arg)
{
	struct tcp_job *job;
	struct tcp_connection *conn;
	char wake = 'x';  /* payload is irrelevant; only the wake-up matters */
	int rc;

	(void)arg;

	/* Reactor operations stay in TCP main; IO threads only run read/write
	 * callbacks and notify completion back to the main thread. */
	while (1) {
		cond_lock(&tcp_write_queue->cond);
		/* sleep until there is work or we are asked to stop */
		while (!tcp_pool.stop && tcp_pool.task_head == NULL &&
				tcp_write_queue->head == NULL)
			cond_wait(&tcp_write_queue->cond);

		/* on shutdown, keep draining queued work; exit only once both
		 * queues are empty */
		if (tcp_pool.stop && tcp_pool.task_head == NULL &&
				tcp_write_queue->head == NULL) {
			cond_unlock(&tcp_write_queue->cond);
			break;
		}

		/* explicit jobs take priority over the shared write queue; the
		 * wait condition above guarantees one of the two is non-empty,
		 * so 'job' is always set past this point (except the OOM path,
		 * which continues) */
		job = tcp_pool.task_head;
		if (job) {
			tcp_pool.task_head = job->next;
			if (tcp_pool.task_head == NULL)
				tcp_pool.task_tail = NULL;
		} else if ((conn = tcp_pop_shared_write_conn_locked()) != NULL) {
			/* wrap the queued connection into a synthetic write job */
			job = thread_malloc(sizeof(*job));
			if (!job) {
				LM_ERR("oom while building shared TCP write job\n");
				conn->flags &= ~F_CONN_WRITE_QUEUED;
				tcpconn_put(conn);
				cond_unlock(&tcp_write_queue->cond);
				continue;
			}
			job->conn = conn;
			job->op = TCP_WRITE_JOB;
			job->run = NULL;
			job->data = NULL;
			job->resp = 0;
			job->ret = 0;
			job->next = NULL;
		}
		cond_unlock(&tcp_write_queue->cond);

		conn = job->conn;
		if (job->op == TCP_READ_JOB) {
			/* give up early if the conn already exceeded its per-message
			 * read window */
			if (conn->msg_attempts && get_ticks() > conn->timeout) {
				job->ret = -1;
				job->resp = -2;  /* -2 = read timeout, see tcp_complete_read() */
			} else if (protos[conn->type].net.stream.read) {
				job->resp = protos[conn->type].net.stream.read(conn, &job->ret);
			} else {
				LM_ERR("missing stream.read callback for proto %d\n", conn->type);
				job->ret = -1;
				job->resp = -1;
			}
		} else if (job->op == TCP_WRITE_JOB) {
			if (protos[conn->type].net.stream.write) {
				if (tcpconn_prepare_write(conn) < 0) {
					job->ret = -1;
					job->resp = -1;
					goto done_job;
				}
				/* serialize with other writers on this connection */
				lock_get(&conn->write_lock);
				job->resp = protos[conn->type].net.stream.write(conn, conn->fd);
				lock_release(&conn->write_lock);
				job->ret = (job->resp < 0) ? -1 : 0;
			} else {
				LM_ERR("missing stream.write callback for proto %d\n", conn->type);
				job->ret = -1;
				job->resp = -1;
			}
		} else if (job->op == TCP_RUN_JOB) {
			/* fire-and-forget callback; no completion is reported back,
			 * so the job is freed right here */
			job->ret = job->run ? job->run(job->data) : -1;
			thread_free(job);
			continue;
		} else {
			LM_ERR("unknown TCP job op %d\n", job->op);
			job->ret = -1;
			job->resp = -1;
		}

done_job:
		/* hand the finished job to TCP main and poke the notify pipe */
		job->next = NULL;
		tcp_push_done_job(job);

		/* EAGAIN is fine: a full pipe means TCP main already has pending
		 * wake-ups to process */
		rc = write(tcp_pool.notify_pipe[1], &wake, 1);
		if (rc < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
			LM_ERR("failed to notify TCP IO completion: %s\n", strerror(errno));
	}

	return NULL;
}
1284
1285
/* Initializes the TCP IO thread pool: picks the thread count (explicit
 * tcp_threads setting, else online CPU count, else tcp_workers_no, else
 * 1), creates the non-blocking completion notification pipe, registers
 * its read end with this process' reactor, and spawns the worker threads
 * running tcp_thread_routine().
 * Returns 0 on success; on failure, unwinds everything set up so far
 * and returns -1. */
static int tcp_pool_init(void)
{
	int i;
	int started = 0;  /* threads actually created, for error unwind */
	int threads_no;
	long cpu_no;

	if (tcp_threads > 0)
		threads_no = tcp_threads;
	else {
		cpu_no = sysconf(_SC_NPROCESSORS_ONLN);
		if (cpu_no > 0)
			threads_no = (int)cpu_no;
		else if (tcp_workers_no > 0)
			threads_no = tcp_workers_no;
		else
			threads_no = 1;
	}

	/* NOTE(review): the error path below tests notify_pipe[x] >= 0, which
	 * assumes the fds were pre-set to -1 before this call; a plain
	 * zero-initialized global would wrongly match fd 0 when pipe() fails
	 * here - confirm tcp_pool's initialization */
	if (pipe(tcp_pool.notify_pipe) < 0) {
		LM_ERR("failed to create TCP IO notification pipe: %s\n", strerror(errno));
		goto error;
	}

	if (tcp_set_nonblock(tcp_pool.notify_pipe[0]) < 0 ||
			tcp_set_nonblock(tcp_pool.notify_pipe[1]) < 0)
		goto error;

	if (reactor_add_reader(tcp_pool.notify_pipe[0],
		F_TCP_NOTIFY, RCT_PRIO_PROC, NULL) < 0) {
		LM_ERR("failed to add TCP IO notify pipe to reactor\n");
		goto error;
	}

	tcp_pool.threads = pkg_malloc(sizeof(*tcp_pool.threads) * threads_no);
	if (!tcp_pool.threads) {
		LM_ERR("oom while allocating TCP IO threads array\n");
		goto error;
	}

	tcp_pool.stop = 0;
	tcp_pool.threads_no = threads_no;

	for (i = 0; i < threads_no; i++) {
		if (pthread_create(&tcp_pool.threads[i], NULL,
				tcp_thread_routine, NULL) != 0) {
			LM_ERR("failed to start TCP IO thread %d/%d\n", i + 1, threads_no);
			goto error;
		}
		started++;
	}

	LM_NOTICE("TCP single IO mode started with %d threads\n", threads_no);
	return 0;

error:
	/* wake any threads already running so they observe 'stop' and exit */
	cond_lock(&tcp_write_queue->cond);
	tcp_pool.stop = 1;
	cond_broadcast(&tcp_write_queue->cond);
	cond_unlock(&tcp_write_queue->cond);

	if (tcp_pool.threads) {
		/* only 'started' threads exist; join just those */
		for (i = 0; i < started; i++)
			pthread_join(tcp_pool.threads[i], NULL);
		pkg_free(tcp_pool.threads);
		tcp_pool.threads = NULL;
	}
	tcp_pool.threads_no = 0;

	if (tcp_pool.notify_pipe[0] >= 0) {
		/* NOTE(review): reactor_del_reader() is also reached when the fd
		 * was never added to the reactor (pipe/nonblock failures) -
		 * presumably harmless; verify the reactor API tolerates it */
		reactor_del_reader(tcp_pool.notify_pipe[0], -1, 0);
		close(tcp_pool.notify_pipe[0]);
		tcp_pool.notify_pipe[0] = -1;
	}
	if (tcp_pool.notify_pipe[1] >= 0) {
		close(tcp_pool.notify_pipe[1]);
		tcp_pool.notify_pipe[1] = -1;
	}

	return -1;
}
1366
1367
/* Tears down the TCP IO thread pool: signals stop, joins all threads,
 * removes/closes the notification pipe, then drains both work queues
 * (pending tasks and shared write conns) and the done list, releasing
 * every connection reference still held by queued jobs.
 * No-op if the pool was never initialized. */
static void tcp_pool_destroy(void)
{
	int i;
	struct tcp_job *job;
	struct tcp_job *next;
	struct tcp_connection *conn;

	/* nothing to do if neither threads nor the pipe ever came up */
	if (!tcp_threads_active() && tcp_pool.notify_pipe[0] < 0)
		return;

	/* ask all IO threads to finish; they drain their queues first */
	cond_lock(&tcp_write_queue->cond);
	tcp_pool.stop = 1;
	cond_broadcast(&tcp_write_queue->cond);
	cond_unlock(&tcp_write_queue->cond);

	for (i = 0; i < tcp_pool.threads_no; i++)
		pthread_join(tcp_pool.threads[i], NULL);

	if (tcp_pool.threads) {
		pkg_free(tcp_pool.threads);
		tcp_pool.threads = NULL;
	}
	tcp_pool.threads_no = 0;

	if (tcp_pool.notify_pipe[0] >= 0) {
		reactor_del_reader(tcp_pool.notify_pipe[0], -1, 0);
		close(tcp_pool.notify_pipe[0]);
		tcp_pool.notify_pipe[0] = -1;
	}
	if (tcp_pool.notify_pipe[1] >= 0) {
		close(tcp_pool.notify_pipe[1]);
		tcp_pool.notify_pipe[1] = -1;
	}

	/* drop any tasks / queued write conns the threads never picked up,
	 * releasing the reference each queued item holds */
	cond_lock(&tcp_write_queue->cond);
	for (job = tcp_pool.task_head; job; job = next) {
		next = job->next;
		if (job->conn)
			tcpconn_put(job->conn);
		thread_free(job);
	}
	tcp_pool.task_head = tcp_pool.task_tail = NULL;
	while ((conn = tcp_pop_shared_write_conn_locked()) != NULL) {
		conn->flags &= ~F_CONN_WRITE_QUEUED;
		tcpconn_put(conn);
	}
	cond_unlock(&tcp_write_queue->cond);

	/* completed jobs that TCP main never processed */
	pthread_mutex_lock(&tcp_pool.done_lock);
	for (job = tcp_pool.done_head; job; job = next) {
		next = job->next;
		if (job->conn)
			tcpconn_put(job->conn);
		thread_free(job);
	}
	tcp_pool.done_head = tcp_pool.done_tail = NULL;
	pthread_mutex_unlock(&tcp_pool.done_lock);
}
1425
1426
static int tcp_queue_job(struct tcp_connection *tcpconn, int op)
1427
0
{
1428
0
  struct tcp_job *job;
1429
1430
0
  if (!tcp_threads_active())
1431
0
    return -1;
1432
1433
0
  job = thread_malloc(sizeof(*job));
1434
0
  if (!job) {
1435
0
    LM_ERR("oom while queuing TCP IO job\n");
1436
0
    return -1;
1437
0
  }
1438
1439
0
  job->conn = tcpconn;
1440
0
  job->op = op;
1441
0
  job->run = NULL;
1442
0
  job->data = NULL;
1443
0
  job->resp = 0;
1444
0
  job->ret = 0;
1445
0
  job->next = NULL;
1446
1447
0
  cond_lock(&tcp_write_queue->cond);
1448
0
  if (tcp_pool.task_tail)
1449
0
    tcp_pool.task_tail->next = job;
1450
0
  else
1451
0
    tcp_pool.task_head = job;
1452
0
  tcp_pool.task_tail = job;
1453
0
  cond_signal(&tcp_write_queue->cond);
1454
0
  cond_unlock(&tcp_write_queue->cond);
1455
1456
0
  return 0;
1457
0
}
1458
1459
/* Enqueues @tcpconn on the shared write queue so an IO thread flushes
 * its pending async data.  Idempotent: a conn already flagged
 * F_CONN_WRITE_QUEUED is not queued twice.  An unhashed conn is first
 * added to the connection hash.  Returns 0 on success (or already
 * queued), -1 if the write queue was never set up. */
int tcp_async_write_job(struct tcp_connection *tcpconn)
{
	if (!tcp_write_queue)
		return -1;
	if ((tcpconn->flags & F_CONN_HASHED) == 0)
		tcpconn_add(tcpconn);

	cond_lock(&tcp_write_queue->cond);
	/* already queued - nothing to do */
	if (tcpconn->flags & F_CONN_WRITE_QUEUED) {
		cond_unlock(&tcp_write_queue->cond);
		return 0;
	}

	/* append to the tail and wake one IO thread */
	tcpconn->flags |= F_CONN_WRITE_QUEUED;
	tcpconn->wq_next = NULL;
	if (tcp_write_queue->tail)
		tcp_write_queue->tail->wq_next = tcpconn;
	else
		tcp_write_queue->head = tcpconn;
	tcp_write_queue->tail = tcpconn;
	cond_signal(&tcp_write_queue->cond);
	cond_unlock(&tcp_write_queue->cond);
	return 0;
}
1483
1484
/* Schedules @run(@data) on a pool IO thread (fire-and-forget).  When
 * the pool is not active, the callback is executed inline instead.
 * Returns 0 if the callback ran or was queued, -1 on NULL @run or OOM. */
int tcp_run_task(tcp_thread_job_f run, void *data)
{
	struct tcp_job *task;

	if (run == NULL)
		return -1;

	if (!tcp_threads_active()) {
		/* no IO threads - execute synchronously in this process */
		run(data);
		return 0;
	}

	task = thread_malloc(sizeof *task);
	if (task == NULL) {
		LM_ERR("oom while queuing TCP run job\n");
		return -1;
	}

	task->conn = NULL;
	task->op   = TCP_RUN_JOB;
	task->run  = run;
	task->data = data;
	task->resp = 0;
	task->ret  = -1;
	task->next = NULL;

	cond_lock(&tcp_write_queue->cond);
	if (tcp_pool.task_tail == NULL)
		tcp_pool.task_head = task;
	else
		tcp_pool.task_tail->next = task;
	tcp_pool.task_tail = task;
	cond_signal(&tcp_write_queue->cond);
	cond_unlock(&tcp_write_queue->cond);

	return 0;
}
1521
1522
static inline int tcp_queue_write_job(struct tcp_connection *tcpconn)
1523
0
{
1524
0
  if (!(tcpconn->flags & F_CONN_REMOVED_READ) && tcpconn->fd != -1) {
1525
0
    if (reactor_del_reader(tcpconn->fd, -1, 0) == -1)
1526
0
      return -1;
1527
0
    tcpconn->flags |= F_CONN_REMOVED_READ;
1528
0
  }
1529
1530
0
  if (tcp_async_write_job(tcpconn) < 0)
1531
0
    return -1;
1532
1533
0
  return 0;
1534
0
}
1535
1536
static inline void tcp_fail_conn(struct tcp_connection *tcpconn,
1537
    const char *reason, int report)
1538
0
{
1539
0
  if ((tcpconn->flags & F_CONN_REMOVED) != F_CONN_REMOVED &&
1540
0
      tcpconn->fd != -1) {
1541
0
    reactor_del_all(tcpconn->fd, -1, IO_FD_CLOSING);
1542
0
    tcpconn->flags |= F_CONN_REMOVED;
1543
0
  }
1544
1545
0
  if (report)
1546
0
    tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE, (void *)reason);
1547
1548
0
  tcpconn_destroy(tcpconn);
1549
0
}
1550
1551
/* Completion handler (runs in TCP main) for a finished read job.
 * Tears the connection down on timeout (resp == -2), read error, bad
 * state or EOF; otherwise re-arms read-watching if it was suspended and
 * releases the job's reference. */
static inline void tcp_complete_read(struct tcp_job *job)
{
	struct tcp_connection *tcpconn;

	tcpconn = job->conn;

	/* -2 is set by the IO thread when the per-message read window expired */
	if (job->resp == -2) {
		tcp_fail_conn(tcpconn, "Timeout waiting for a complete message", 1);
		return;
	}

	if (job->resp < 0 || tcpconn->state == S_CONN_BAD) {
		tcp_fail_conn(tcpconn, "Read error", 1);
		return;
	}

	if (tcpconn->state == S_CONN_EOF) {
		tcp_fail_conn(tcpconn, "EOF received", 1);
		return;
	}

	/* resume read-watching if it was suspended while the job ran */
	if (tcpconn->flags & F_CONN_REMOVED_READ) {
		if (reactor_add_reader(tcpconn->fd, F_TCPCONN, RCT_PRIO_NET, tcpconn) < 0) {
			LM_ERR("failed to re-add TCP conn %p for read events\n", tcpconn);
			tcp_fail_conn(tcpconn, "Failed to re-arm read", 0);
			return;
		}
		tcpconn->flags &= ~F_CONN_REMOVED_READ;
	}

	/* drop the reference taken when the job was queued */
	tcpconn_put(tcpconn);
}
1583
1584
/* Completion handler (runs in TCP main) for a finished write job:
 *  - resp < 0 or conn marked bad -> tear the connection down;
 *  - resp == 1                   -> re-watch the fd for write events;
 *  - more async chunks pending   -> re-queue another write job;
 *  - otherwise, release the job's reference. */
static inline void tcp_complete_write(struct tcp_job *job)
{
	struct tcp_connection *tcpconn;
	int pending_chunks;

	tcpconn = job->conn;

	if (job->resp < 0 || tcpconn->state == S_CONN_BAD) {
		tcp_fail_conn(tcpconn, "Write error", 1);
		return;
	}

	/* snapshot the async backlog under the write lock */
	lock_get(&tcpconn->write_lock);
	pending_chunks = (tcpconn->async && tcpconn->async->pending);
	lock_release(&tcpconn->write_lock);

	/* resume read-watching if it was suspended for this write */
	if ((tcpconn->flags & F_CONN_REMOVED_READ) && tcpconn->fd != -1) {
		if (reactor_add_reader(tcpconn->fd, F_TCPCONN, RCT_PRIO_NET,
				tcpconn) < 0) {
			LM_ERR("failed to add TCP conn %p for read events\n", tcpconn);
			tcp_fail_conn(tcpconn, "Failed to arm read", 0);
			return;
		}
		tcpconn->flags &= ~F_CONN_REMOVED_READ;
	}

	/* NOTE(review): resp == 1 presumably means the proto-level write was
	 * short / would block - confirm against the stream.write contract */
	if (job->resp == 1) {
		if (reactor_add_writer(tcpconn->fd, F_TCPCONN, RCT_PRIO_NET, tcpconn) < 0) {
			LM_ERR("failed to re-add TCP conn %p for write events\n", tcpconn);
			tcp_fail_conn(tcpconn, "Failed to re-arm write", 0);
			return;
		}
		tcpconn->flags &= ~F_CONN_REMOVED_WRITE;
		tcpconn->flags &= ~F_CONN_WRITE_QUEUED;
		tcpconn_put(tcpconn);
		return;
	}

	if (pending_chunks) {
		/* more buffered data - push the conn back on the write queue;
		 * the QUEUED flag must be cleared first so the re-queue is not
		 * treated as a duplicate */
		tcpconn->flags &= ~F_CONN_WRITE_QUEUED;
		if (tcp_async_write_job(tcpconn) < 0) {
			LM_ERR("failed queuing follow-up TCP write job\n");
			tcpconn->flags &= ~F_CONN_WRITE_QUEUED;
			/* fall back to reactor-driven writes */
			if (reactor_add_writer(tcpconn->fd, F_TCPCONN, RCT_PRIO_NET, tcpconn) < 0) {
				tcp_fail_conn(tcpconn, "Failed queueing follow-up write", 0);
				return;
			}
			tcpconn->flags &= ~F_CONN_REMOVED_WRITE;
			tcpconn_put(tcpconn);
		}
		return;
	}

	/* nothing left to write - drop the job's reference */
	tcpconn->flags &= ~F_CONN_WRITE_QUEUED;
	tcpconn_put(tcpconn);
}
1640
1641
static inline int handle_tcp_notify(int fd)
1642
0
{
1643
0
  char buf[64];
1644
0
  int n;
1645
0
  struct tcp_job *job;
1646
1647
0
  while ((n = read(fd, buf, sizeof(buf))) > 0)
1648
0
    ;
1649
0
  if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
1650
0
    LM_ERR("failed to read TCP IO notify fd: %s\n", strerror(errno));
1651
1652
0
  while ((job = tcp_pop_done_job()) != NULL) {
1653
0
    if (job->op == TCP_READ_JOB)
1654
0
      tcp_complete_read(job);
1655
0
    else
1656
0
      tcp_complete_write(job);
1657
0
    thread_free(job);
1658
0
  }
1659
1660
0
  return 0;
1661
0
}
1662
1663
1664
/************************ TCP MAIN process functions ************************/
1665
1666
/*! \brief
1667
 * handles a new connection, called internally by tcp_main_loop/handle_io.
1668
 * \param si - pointer to one of the tcp socket_info structures on which
1669
 *              an io event was detected (connection attempt)
1670
 * \return  handle_* return convention: -1 on error, 0 on EAGAIN (no more
1671
 *           io events queued), >0 on success. success/error refer only to
1672
 *           the accept.
1673
 */
1674
static inline int handle_new_connect(const struct socket_info* si)
{
	union sockaddr_union su;
	struct tcp_connection* tcpconn;
	struct tcp_conn_profile prof;
	socklen_t su_len = sizeof(su);
	int new_sock;
	unsigned int id;

	/* coverity[overrun-buffer-arg: FALSE] - union has 28 bytes, CID #200070 */
	new_sock = accept(si->socket, &(su.s), &su_len);
	if (new_sock == -1) {
		/* listener is non-blocking: EAGAIN means no pending connection */
		if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
			return 0;
		LM_ERR("failed to accept connection(%d): %s\n", errno, strerror(errno));
		return -1;
	}

	/* apply the per-destination TCP profile / socket options */
	tcp_con_get_profile(&su, &si->su, si->proto, &prof);
	if (tcp_init_sock_opt(new_sock, &prof, si->flags, si->tos) < 0) {
		LM_ERR("tcp_init_sock_opt failed\n");
		close(new_sock);
		return 1; /* success, because the accept was successful */
	}

	/* add socket to list */
	tcpconn = tcpconn_new(new_sock, &su, si, &prof, S_CONN_OK,
		F_CONN_ACCEPTED);
	if (tcpconn) {
		/* Safe: the connection is not yet visible outside TCP main. */
		tcpconn->refcnt++;
		sh_log(tcpconn->hist, TCP_REF, "accept, (%d)", tcpconn->refcnt);
		tcpconn_add(tcpconn);
		LM_DBG("new connection: %p %d flags: %04x\n",
			tcpconn, tcpconn->fd, tcpconn->flags);
		if (reactor_add_reader(tcpconn->fd, F_TCPCONN, RCT_PRIO_NET,
				tcpconn) < 0) {
			LM_ERR("failed to add accepted TCP conn to reactor\n");
			/* roll back the ref taken above; by now the conn is hashed,
			 * so other holders may exist - only close when we were last */
			id = tcpconn->id;
			TCPCONN_LOCK(id);
			tcpconn->refcnt--;
			if (tcpconn->refcnt == 0) {
				_tcpconn_rm(tcpconn, 1);
				close(new_sock);
			} else {
				/* someone else holds it - flag for immediate expiry */
				tcpconn->lifetime = 0;
				tcpconn->timeout = 0;
			}
			TCPCONN_UNLOCK(id);
		} else {
			tcpconn->flags &= ~F_CONN_REMOVED_READ;
			tcpconn_put(tcpconn);
		}
	} else {
		LM_ERR("tcpconn_new failed, closing socket\n");
		close(new_sock);
	}
	return 1; /* accept() was successful */
}
1733
1734
1735
/*! \brief
1736
 * handles an io event on one of the watched tcp connections
1737
 *
1738
 * \param    tcpconn - pointer to the tcp_connection for which we have an io ev.
1739
 * \param    fd_i    - index in the fd_array table (needed for delete)
1740
 * \return   handle_* return convention, but on success it always returns 0
1741
 */
1742
inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, int fd_i,
		int event_type)
{
	int err;
	unsigned int err_len;

	if (event_type == IO_WATCH_READ) {
		LM_DBG("data available on %p %d\n", tcpconn, tcpconn->fd);
		/* suspend read-watching while an IO thread owns the conn */
		if (reactor_del_reader(tcpconn->fd, fd_i, 0) == -1)
			return -1;
		tcpconn->flags |= F_CONN_REMOVED_READ;
		tcpconn_ref(tcpconn); /* refcnt ++ */
		sh_log(tcpconn->hist, TCP_REF, "tcp-main read queued, (%d)",
			tcpconn->refcnt);
		if (tcp_queue_job(tcpconn, TCP_READ_JOB) < 0) {
			LM_ERR("failed queuing TCP read job\n");
			/* roll back: resume read-watching; on failure tear down */
			if (reactor_add_reader(tcpconn->fd, F_TCPCONN, RCT_PRIO_NET,
					tcpconn) < 0) {
				tcp_fail_conn(tcpconn, "Failed queueing read", 0);
				return 0;
			}
			tcpconn->flags &= ~F_CONN_REMOVED_READ;
			tcpconn_put(tcpconn);
		}
		return 0;
	} else {
		LM_DBG("connection %p fd %d is now writable\n", tcpconn, tcpconn->fd);
		/* we received a write event */
		if (tcpconn->state == S_CONN_CONNECTING) {
			/* we're coming from an async connect & write
			 * let's see if we connected successfully */
			err_len = sizeof(err);
			if (getsockopt(tcpconn->fd, SOL_SOCKET, SO_ERROR, &err, &err_len) < 0 ||
					err != 0) {
				LM_DBG("Failed connection attempt\n");
				tcpconn_ref(tcpconn);
				sh_log(tcpconn->hist, TCP_REF, "tcpconn connect, (%d)", tcpconn->refcnt);
				reactor_del_all(tcpconn->fd, fd_i, IO_FD_CLOSING);
				tcpconn->flags|=F_CONN_REMOVED;
				tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE,
					"Async connect failed");
				sh_log(tcpconn->hist, TCP_UNREF, "tcpconn connect, (%d)", tcpconn->refcnt);
				tcpconn_destroy(tcpconn);
				return 0;
			}

			/* we successfully connected - further treat this case as if we
			 * were coming from an async write */
			tcpconn->state = S_CONN_OK;
			LM_DBG("Successfully completed previous async connect\n");

			/* now that we completed the async connection, we also need to
			 * listen for READ events, otherwise these will get lost */
			if (tcpconn->flags & F_CONN_REMOVED_READ) {
				reactor_add_reader(tcpconn->fd, F_TCPCONN, RCT_PRIO_NET,
					tcpconn);
				tcpconn->flags &= ~F_CONN_REMOVED_READ;
			}

			/* jump into the generic writable-fd handling below */
			goto async_write;
		} else {
async_write:
			/* no more write events for now */
			if (reactor_del_writer(tcpconn->fd, fd_i, 0) == -1)
				return -1;
			tcpconn->flags |= F_CONN_REMOVED_WRITE;
			tcpconn_ref(tcpconn); /* refcnt ++ */
			sh_log(tcpconn->hist, TCP_REF, "tcpconn write, (%d)",
				tcpconn->refcnt);
			if (tcp_queue_write_job(tcpconn) < 0) {
				LM_ERR("failed queuing TCP write job\n");
				/* roll back: resume write-watching; on failure tear down */
				if (reactor_add_writer(tcpconn->fd, F_TCPCONN, RCT_PRIO_NET,
						tcpconn) < 0) {
					tcp_fail_conn(tcpconn, "Failed queueing write", 0);
					return 0;
				}
				tcpconn->flags &= ~F_CONN_REMOVED_WRITE;
				tcpconn_put(tcpconn);
			}
			return 0;
		}
	}
}
1825
1826
1827
/*! \brief generic handle io routine, it will call the appropiate
1828
 *  handle_xxx() based on the fd_map type
1829
 *
1830
 * \param  fm  - pointer to a fd hash entry
1831
 * \param  idx - index in the fd_array (or -1 if not known)
1832
 * \return -1 on error
1833
 *          0 on EAGAIN or when by some other way it is known that no more
1834
 *            io events are queued on the fd (the receive buffer is empty).
1835
 *            Usefull to detect when there are no more io events queued for
1836
 *            sigio_rt, epoll_et, kqueue.
1837
 *         >0 on successful read from the fd (when there might be more io
1838
 *            queued -- the receive buffer might still be non-empty)
1839
 */
1840
inline static int handle_io(struct fd_map* fm, int idx,int event_type)
1841
0
{
1842
0
  int ret = 0;
1843
1844
0
  pt_become_active();
1845
  /* for now we do not do any profiling here as all ops here are
1846
     only internals related to TCP passing beetween processing, no real
1847
     processing
1848
   */
1849
0
  switch(fm->type){
1850
0
    case F_TCP_LISTENER:
1851
0
      ret = handle_new_connect((const struct socket_info*)fm->data);
1852
0
      break;
1853
0
    case F_TCPCONN:
1854
0
      ret = handle_tcpconn_ev((struct tcp_connection*)fm->data, idx,
1855
0
        event_type);
1856
0
      break;
1857
0
    case F_TCP_NOTIFY:
1858
0
      ret = handle_tcp_notify(fm->fd);
1859
0
      break;
1860
0
    case F_IPC:
1861
0
      ipc_handle_job(fm->fd);
1862
0
      break;
1863
0
    case F_NONE:
1864
0
      LM_CRIT("empty fd map\n");
1865
0
      goto error;
1866
0
    default:
1867
0
      LM_CRIT("unknown fd type %d\n", fm->type);
1868
0
      goto error;
1869
0
  }
1870
0
  pt_become_idle();
1871
0
  return ret;
1872
0
error:
1873
0
  pt_become_idle();
1874
0
  return -1;
1875
0
}
1876
1877
1878
/*
 * iterates through all TCP connections and closes expired ones.
 * Note: the full __tcpconn_lifetime(0) sweep runs at most once per
 * tick - @last_sec must be an assignable variable that caches the
 * last tick the sweep ran at.
 */
#define tcpconn_lifetime(last_sec) \
	do { \
		int now; \
		now = get_ticks(); \
		if (last_sec != now) { \
			last_sec = now; \
			__tcpconn_lifetime(0); \
		} \
	} while (0)
1891
1892
1893
/*! \brief very inefficient for now - FIXME
1894
 * keep in sync with tcpconn_destroy, the "delete" part should be
1895
 * the same except for io_watch_del..
1896
 * \todo FIXME (very inefficient for now)
1897
 */
1898
/* Sweeps every connection partition/hash bucket: on @shutdown != 0 it
 * removes every connection unconditionally (without locking and without
 * CLOSE reports); otherwise it reaps only unreferenced connections past
 * their lifetime or past their per-message read timeout. */
static inline void __tcpconn_lifetime(int shutdown)
{
	struct tcp_connection *c, *next;
	unsigned int ticks,part;
	unsigned h;
	int fd;
	void *reason;

	if (have_ticks())
		ticks=get_ticks();
	else
		ticks=0;

	for( part=0 ; part<TCP_PARTITION_SIZE ; part++ ) {
		if (!shutdown) TCPCONN_LOCK(part); /* fixme: we can lock only on delete IMO */
		for(h=0; h<TCP_ID_HASH_SIZE; h++){
			c=TCP_PART(part).tcpconn_id_hash[h];
			while(c){
				next=c->id_next;
				/* reap: everything on shutdown, otherwise only idle
				 * (refcnt==0) conns past lifetime or message timeout */
				if (shutdown || ((c->refcnt == 0) &&
				((ticks > c->lifetime) ||
				(c->msg_attempts && ticks > c->timeout)))) {
					if (!shutdown)
						LM_DBG("timeout for hash=%d - %p"
								" (%d > %d)\n", h, c, ticks, c->lifetime);
					fd=c->fd;
					/* report the closing of the connection.  Note that
					 * some connections use a forced expire (lifetime 0)
					 * as a way to be deleted - we are not interested in
					 * those; also, no reporting on shutdown */
					if (c->lifetime>0 && !shutdown) {
						reason = (c->msg_attempts && ticks > c->timeout) ?
							"Timeout waiting for a complete message" :
							"Timeout on no traffic";
						tcp_trigger_report(c, TCP_REPORT_CLOSE, reason);
					}
					if ((!shutdown)&&(fd>0)&&(c->refcnt==0)) {
						/* if any of read or write are set, we need to remove
						 * the fd from the reactor */
						if ((c->flags & F_CONN_REMOVED) != F_CONN_REMOVED){
							reactor_del_all( fd, -1, IO_FD_CLOSING);
							c->flags|=F_CONN_REMOVED;
						}
						close(fd);
						c->fd = -1;
					}
					_tcpconn_rm(c, shutdown?1:0);
				}
				c=next;
			}
		}
		if (!shutdown) TCPCONN_UNLOCK(part);
	}
}
1953
1954
1955
static void tcp_main_server(void)
1956
0
{
1957
0
  static unsigned int last_sec = 0;
1958
0
  struct socket_info_full* sif;
1959
0
  struct sr_module *m;
1960
0
  int n;
1961
1962
  /* instruct tls_mgm to initialize all TLS domains */
1963
0
  for (m=modules; m; m = m->next) {
1964
0
    if (strcmp(m->exports->name, "tls_mgm") == 0)
1965
0
      if (init_child(PROC_TCP_MAIN) < 0) {
1966
0
        LM_ERR("error in init_child for PROC_TCP_MAIN\n");
1967
0
        goto error;
1968
0
      }
1969
0
  }
1970
1971
  /* we run in a separate, dedicated process, with its own reactor
1972
   * (reactors are per process) */
1973
0
  if (init_worker_reactor("TCP_main", RCT_PRIO_MAX)<0)
1974
0
    goto error;
1975
1976
  /* now start watching all the fds */
1977
1978
  /* add all the sockets we listens on for connections */
1979
0
  for (n = PROTO_FIRST; n < PROTO_LAST; n++)
1980
0
    if (is_tcp_based_proto(n))
1981
0
      for (sif = protos[n].listeners; sif; sif = sif->next) {
1982
0
        struct socket_info* si = &sif->socket_info;
1983
0
        if (protos[n].tran.bind_listener &&
1984
0
            protos[n].tran.bind_listener(si) < 0) {
1985
0
          LM_ERR("failed to bind listener [%.*s], proto %s\n",
1986
0
            si->name.len, si->name.s, protos[n].name);
1987
0
          goto error;
1988
0
        }
1989
0
        if (si->socket != -1 &&
1990
0
            reactor_add_reader(si->socket, F_TCP_LISTENER,
1991
0
              RCT_PRIO_NET, si) < 0) {
1992
0
          LM_ERR("failed to add listen socket to reactor\n");
1993
0
          goto error;
1994
0
        }
1995
0
      }
1996
  /* init: start watching for the IPC jobs */
1997
0
  if (reactor_add_reader(IPC_FD_READ_SELF, F_IPC, RCT_PRIO_ASYNC, NULL)<0){
1998
0
    LM_CRIT("failed to add IPC pipe to reactor\n");
1999
0
    goto error;
2000
0
  }
2001
2002
0
  if (tcp_pool_init() < 0)
2003
0
    goto error;
2004
2005
0
  is_tcp_main = 1;
2006
2007
  /* main loop (requires "handle_io()" implementation) */
2008
0
  reactor_main_loop( TCP_MAIN_SELECT_TIMEOUT, error,
2009
0
      tcpconn_lifetime(last_sec) );
2010
2011
0
error:
2012
0
  tcp_pool_destroy();
2013
0
  destroy_worker_reactor();
2014
0
  LM_CRIT("exiting...");
2015
0
  exit(-1);
2016
0
}
2017
2018
2019
2020
/**************************** Control functions ******************************/
2021
2022
/* initializes the TCP network level in terms of data structures */
2023
/* Initializes the TCP network layer: shared counters, the shared write
 * queue, the connection-counter lock and the per-partition locks plus
 * alias/id hash tables.
 * Returns 0 on success (including when TCP turns out to be disabled),
 * or -1 on error after undoing partial allocations via tcp_destroy(). */
int tcp_init(void)
{
	unsigned int i;

	/* first we do auto-detection to see if there are any TCP based
	 * protocols loaded */
	for ( i=PROTO_FIRST ; i<PROTO_LAST ; i++ )
		if (is_tcp_based_proto(i) && proto_has_listeners(i)) {
			tcp_disabled=0;
			break;
		}

	tcp_init_con_profiles();

	/* nothing TCP-based is listening -> nothing else to set up */
	if (tcp_disabled)
		return 0;

#ifdef DBG_TCPCON
	con_hist = shl_init("TCP con", 10000, 0);
	if (!con_hist) {
		LM_ERR("oom con hist\n");
		goto error;
	}
#endif

	/* an auto-scaling profile is only validated here; it is then
	 * discarded with a warning (not usable in single-IO mode) */
	if (tcp_auto_scaling_profile) {
		s_profile = get_scaling_profile(tcp_auto_scaling_profile);
		if (s_profile==NULL) {
			LM_WARN("TCP scaling profile <%s> not defined "
				"-> ignoring it...\n", tcp_auto_scaling_profile);
		} else {
			LM_WARN("ignoring TCP auto-scaling profile in single IO mode\n");
			s_profile = NULL;
		}
	}

	/* size the worker array for the largest possible worker count
	 * (note: s_profile is always NULL at this point, see above) */
	tcp_workers_max_no = (s_profile && (tcp_workers_no<s_profile->max_procs)) ?
		s_profile->max_procs : tcp_workers_no ;

	/* init tcp workers array */
	tcp_workers = (struct tcp_worker*)shm_malloc
		( tcp_workers_max_no*sizeof(struct tcp_worker) );
	if (tcp_workers==0) {
		LM_CRIT("could not alloc tcp_workers array in shm memory\n");
		goto error;
	}
	memset( tcp_workers, 0, tcp_workers_max_no*sizeof(struct tcp_worker));
	/* init globals */
	connection_id=(unsigned int*)shm_malloc(sizeof(unsigned int));
	if (connection_id==0){
		LM_CRIT("could not alloc globals in shm memory\n");
		goto error;
	}
	// The  rand()  function returns a pseudo-random integer in the range 0 to
	// RAND_MAX inclusive (i.e., the mathematical range [0, RAND_MAX]).
	*connection_id=(unsigned int)rand();
	/* shared (cross-process) count of currently open TCP connections */
	tcp_connections_no = (unsigned int *)shm_malloc(sizeof(*tcp_connections_no));
	if (tcp_connections_no == 0) {
		LM_CRIT("could not alloc tcp connection counter in shm memory\n");
		goto error;
	}
	*tcp_connections_no = 0;
	/* shared slot holding the TCP main process number; -1 = not started */
	tcp_main_proc_no = (int *)shm_malloc(sizeof(*tcp_main_proc_no));
	if (tcp_main_proc_no == 0) {
		LM_CRIT("could not alloc tcp main proc slot in shm memory\n");
		goto error;
	}
	*tcp_main_proc_no = -1;
	tcp_write_queue = shm_malloc(sizeof(*tcp_write_queue));
	if (tcp_write_queue == 0) {
		LM_CRIT("could not alloc tcp shared write queue in shm memory\n");
		goto error;
	}
	memset(tcp_write_queue, 0, sizeof(*tcp_write_queue));
	if (cond_init(&tcp_write_queue->cond) != 0) {
		LM_CRIT("could not init tcp shared write queue cond\n");
		/* free here (not in tcp_destroy) so the half-initialized cond
		 * is never touched again */
		shm_free(tcp_write_queue);
		tcp_write_queue = 0;
		goto error;
	}
	tcp_connections_lock = lock_alloc();
	if (tcp_connections_lock == 0) {
		LM_CRIT("could not alloc tcp connection counter lock\n");
		goto error;
	}
	if (lock_init(tcp_connections_lock) == 0) {
		LM_CRIT("could not init tcp connection counter lock\n");
		lock_dealloc((void *)tcp_connections_lock);
		tcp_connections_lock = 0;
		goto error;
	}
	memset( &tcp_parts, 0, TCP_PARTITION_SIZE*sizeof(struct tcp_partition));
	/* init partitions */
	for( i=0 ; i<TCP_PARTITION_SIZE ; i++ ) {
		/* init lock */
		tcp_parts[i].tcpconn_lock=lock_alloc();
		if (tcp_parts[i].tcpconn_lock==0){
			LM_CRIT("could not alloc lock\n");
			goto error;
		}
		if (lock_init(tcp_parts[i].tcpconn_lock)==0){
			LM_CRIT("could not init lock\n");
			lock_dealloc((void*)tcp_parts[i].tcpconn_lock);
			tcp_parts[i].tcpconn_lock=0;
			goto error;
		}
		/* alloc hashtables*/
		tcp_parts[i].tcpconn_aliases_hash=(struct tcp_conn_alias**)
			shm_malloc(TCP_ALIAS_HASH_SIZE* sizeof(struct tcp_conn_alias*));
		if (tcp_parts[i].tcpconn_aliases_hash==0){
			LM_CRIT("could not alloc address hashtable in shm memory\n");
			goto error;
		}
		tcp_parts[i].tcpconn_id_hash=(struct tcp_connection**)
			shm_malloc(TCP_ID_HASH_SIZE*sizeof(struct tcp_connection*));
		if (tcp_parts[i].tcpconn_id_hash==0){
			LM_CRIT("could not alloc id hashtable in shm memory\n");
			goto error;
		}
		/* init hashtables*/
		memset((void*)tcp_parts[i].tcpconn_aliases_hash, 0,
			TCP_ALIAS_HASH_SIZE * sizeof(struct tcp_conn_alias*));
		memset((void*)tcp_parts[i].tcpconn_id_hash, 0,
			TCP_ID_HASH_SIZE * sizeof(struct tcp_connection*));
	}

	return 0;
error:
	/* clean-up: tcp_destroy() tolerates partially-initialized state */
	tcp_destroy();
	return -1;
}
2155
2156
2157
/* destroys the TCP data */
2158
void tcp_destroy(void)
2159
0
{
2160
0
  int part;
2161
2162
0
  if (tcp_parts[0].tcpconn_id_hash)
2163
      /* force close/expire for all active tcpconns*/
2164
0
      __tcpconn_lifetime(1);
2165
2166
0
  if (connection_id){
2167
0
    shm_free(connection_id);
2168
0
    connection_id=0;
2169
0
  }
2170
2171
0
  if (tcp_connections_no) {
2172
0
    shm_free(tcp_connections_no);
2173
0
    tcp_connections_no = 0;
2174
0
  }
2175
2176
0
  if (tcp_main_proc_no) {
2177
0
    shm_free(tcp_main_proc_no);
2178
0
    tcp_main_proc_no = 0;
2179
0
  }
2180
2181
0
  if (tcp_dispatch_sock[0] >= 0) {
2182
0
    close(tcp_dispatch_sock[0]);
2183
0
    tcp_dispatch_sock[0] = -1;
2184
0
  }
2185
0
  if (tcp_dispatch_sock[1] >= 0) {
2186
0
    close(tcp_dispatch_sock[1]);
2187
0
    tcp_dispatch_sock[1] = -1;
2188
0
  }
2189
2190
0
  if (tcp_write_queue) {
2191
    /* Skip cond teardown during attendant shutdown. */
2192
    /* cond_destroy(&tcp_write_queue->cond); */
2193
0
    shm_free(tcp_write_queue);
2194
0
    tcp_write_queue = 0;
2195
0
  }
2196
2197
0
  if (tcp_connections_lock) {
2198
0
    lock_destroy(tcp_connections_lock);
2199
0
    lock_dealloc((void *)tcp_connections_lock);
2200
0
    tcp_connections_lock = 0;
2201
0
  }
2202
2203
0
  for ( part=0 ; part<TCP_PARTITION_SIZE ; part++ ) {
2204
0
    if (tcp_parts[part].tcpconn_id_hash){
2205
0
      shm_free(tcp_parts[part].tcpconn_id_hash);
2206
0
      tcp_parts[part].tcpconn_id_hash=0;
2207
0
    }
2208
0
    if (tcp_parts[part].tcpconn_aliases_hash){
2209
0
      shm_free(tcp_parts[part].tcpconn_aliases_hash);
2210
0
      tcp_parts[part].tcpconn_aliases_hash=0;
2211
0
    }
2212
0
    if (tcp_parts[part].tcpconn_lock){
2213
0
      lock_destroy(tcp_parts[part].tcpconn_lock);
2214
0
      lock_dealloc((void*)tcp_parts[part].tcpconn_lock);
2215
0
      tcp_parts[part].tcpconn_lock=0;
2216
0
    }
2217
0
  }
2218
0
}
2219
2220
2221
static int _get_own_tcp_worker_id(void)
2222
0
{
2223
0
  pid_t pid;
2224
0
  int i;
2225
2226
0
  pid = getpid();
2227
0
  for( i=0 ; i<tcp_workers_max_no ; i++)
2228
0
    if(tcp_workers[i].pid==pid)
2229
0
      return i;
2230
2231
0
  return -1;
2232
0
}
2233
2234
2235
void tcp_reset_worker_slot(void)
2236
0
{
2237
0
  int i;
2238
2239
0
  if ((i=_get_own_tcp_worker_id())>=0) {
2240
0
    tcp_workers[i].state=STATE_INACTIVE;
2241
0
    tcp_workers[i].pid=0;
2242
0
    tcp_workers[i].pt_idx=0;
2243
0
  }
2244
0
}
2245
2246
2247
/* counts the number of TCP processes to start with; this number may
2248
 * change during runtime due auto-scaling */
2249
int tcp_count_processes(unsigned int *extra)
2250
0
{
2251
0
  if (extra) *extra = 0;
2252
2253
0
  if (tcp_disabled)
2254
0
    return 0;
2255
2256
0
  return 1 /* tcp main / IO process */ + tcp_workers_no /* dispatch workers */;
2257
0
}
2258
2259
2260
int tcp_start_processes(int *chd_rank, int *startup_done)
2261
0
{
2262
0
  int r, p_id, flags;
2263
0
  const struct internal_fork_params ifp_sr_tcp = {
2264
0
    .proc_desc = "SIP receiver TCP",
2265
0
    .flags = OSS_PROC_NEEDS_SCRIPT,
2266
0
    .type = TYPE_TCP,
2267
0
  };
2268
2269
0
  if (tcp_disabled)
2270
0
    return 0;
2271
2272
  /* create the shared dispatch socket from TCP main to TCP workers */
2273
0
  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, tcp_dispatch_sock) < 0) {
2274
0
    LM_ERR("socketpair failed for TCP worker dispatch: %s\n",
2275
0
      strerror(errno));
2276
0
    goto error;
2277
0
  }
2278
0
  flags = fcntl(tcp_dispatch_sock[0], F_GETFL);
2279
0
  if (flags == -1) {
2280
0
    LM_ERR("fcntl failed for TCP worker dispatch socket: %s\n",
2281
0
      strerror(errno));
2282
0
    goto error;
2283
0
  }
2284
0
  if (fcntl(tcp_dispatch_sock[0], F_SETFL, flags | O_NONBLOCK) == -1) {
2285
0
    LM_ERR("failed to set non-blocking on TCP worker dispatch socket: %s\n",
2286
0
      strerror(errno));
2287
0
    goto error;
2288
0
  }
2289
0
  flags = fcntl(tcp_dispatch_sock[1], F_GETFL);
2290
0
  if (flags == -1) {
2291
0
    LM_ERR("fcntl failed for TCP worker dispatch socket: %s\n",
2292
0
      strerror(errno));
2293
0
    goto error;
2294
0
  }
2295
0
  if (fcntl(tcp_dispatch_sock[1], F_SETFL, flags | O_NONBLOCK) == -1) {
2296
0
    LM_ERR("failed to set non-blocking on TCP worker dispatch socket: %s\n",
2297
0
      strerror(errno));
2298
0
    goto error;
2299
0
  }
2300
2301
0
  for (r = 0; r < tcp_workers_no; r++) {
2302
0
    (*chd_rank)++;
2303
0
    p_id = internal_fork(&ifp_sr_tcp);
2304
0
    if (p_id < 0) {
2305
0
      LM_ERR("cannot fork TCP worker process %d\n", r);
2306
0
      goto error;
2307
0
    } else if (p_id > 0) {
2308
      /* parent */
2309
0
      tcp_workers[r].state = STATE_ACTIVE;
2310
0
      tcp_workers[r].pt_idx = p_id;
2311
0
      continue;
2312
0
    }
2313
2314
    /* child */
2315
0
    if (tcp_dispatch_sock[1] >= 0)
2316
0
      close(tcp_dispatch_sock[1]);
2317
0
    tcp_dispatch_sock[1] = -1;
2318
2319
0
    set_proc_attrs("TCP receiver");
2320
0
    tcp_workers[r].pid = getpid();
2321
2322
0
    if (tcp_worker_proc_reactor_init(tcp_dispatch_sock[0]) < 0 ||
2323
0
        init_child(*chd_rank) < 0) {
2324
0
      LM_ERR("init_child failed for TCP worker %d\n", r);
2325
0
      report_failure_status();
2326
0
      if (startup_done)
2327
0
        *startup_done = -1;
2328
0
      exit(-1);
2329
0
    }
2330
2331
    /* first TCP worker runs startup_route if not already run */
2332
0
    if (startup_done && *startup_done == 0 && r == 0) {
2333
0
      LM_DBG("running startup route for first TCP worker\n");
2334
0
      if (run_startup_route() < 0) {
2335
0
        LM_ERR("startup route processing failed in TCP worker\n");
2336
0
        report_failure_status();
2337
0
        *startup_done = -1;
2338
0
        exit(-1);
2339
0
      }
2340
0
      *startup_done = 1;
2341
0
    }
2342
2343
0
    report_conditional_status((!no_daemon_mode), 0);
2344
0
    tcp_worker_proc_loop();
2345
0
  }
2346
2347
0
  if (tcp_dispatch_sock[0] >= 0) {
2348
0
    close(tcp_dispatch_sock[0]);
2349
0
    tcp_dispatch_sock[0] = -1;
2350
0
  }
2351
2352
0
  if (startup_done && tcp_workers_no > 0)
2353
0
    while (!(*startup_done)) {
2354
0
      usleep(5);
2355
0
      handle_sigs();
2356
0
    }
2357
2358
0
  return 0;
2359
0
error:
2360
0
  if (tcp_dispatch_sock[0] >= 0) {
2361
0
    close(tcp_dispatch_sock[0]);
2362
0
    tcp_dispatch_sock[0] = -1;
2363
0
  }
2364
0
  if (tcp_dispatch_sock[1] >= 0) {
2365
0
    close(tcp_dispatch_sock[1]);
2366
0
    tcp_dispatch_sock[1] = -1;
2367
0
  }
2368
0
  return -1;
2369
0
}
2370
2371
static int tcpconn_update_local_port(struct tcp_connection *tcpconn)
2372
0
{
2373
0
  union sockaddr_union local_su;
2374
0
  socklen_t su_size;
2375
2376
0
  su_size = sockaddru_len(tcpconn->rcv.src_su);
2377
0
  if (getsockname(tcpconn->fd, (struct sockaddr *)&local_su, &su_size) < 0) {
2378
0
    LM_ERR("failed to get local socket info on conn %u: %s\n",
2379
0
      tcpconn->id, strerror(errno));
2380
0
    return -1;
2381
0
  }
2382
2383
0
  tcpconn->rcv.dst_port = su_getport(&local_su);
2384
0
  return 0;
2385
0
}
2386
2387
/* Prepares a tcp_connection for writing:
 *  - runs the protocol-level conn.init() hook once per connection
 *    (tracked via the F_CONN_INIT flag);
 *  - if the connection has no fd yet (outbound, not yet established),
 *    synchronously connects a new socket, records the local port and
 *    marks the connection S_CONN_OK;
 *  - for a freshly established socket, runs the protocol-level
 *    conn.connect() hook.
 * Returns 0 on success, -1 on any failure (a partially set-up fd is
 * closed and reset to -1). */
static int tcpconn_prepare_write(struct tcp_connection *tcpconn)
{
	int fd;
	int connected = 0;	/* set when a new socket was opened here */

	/* one-time protocol-specific init of this connection */
	if ((tcpconn->flags & F_CONN_INIT) == 0 &&
			protos[tcpconn->type].net.stream.conn.init) {
		if (protos[tcpconn->type].net.stream.conn.init(tcpconn) < 0) {
			LM_ERR("failed to init proto %d conn %p in TCP main\n",
				tcpconn->type, tcpconn);
			return -1;
		}
		tcpconn->flags |= F_CONN_INIT;
	}

	/* no fd yet -> an outbound connection that must be established now */
	if (tcpconn->fd < 0) {
		if (!tcpconn->rcv.bind_address) {
			LM_ERR("missing bind_address for outbound conn %u\n", tcpconn->id);
			return -1;
		}

		fd = tcp_sync_connect_fd(&tcpconn->rcv.bind_address->su,
				&tcpconn->rcv.src_su, tcpconn->type, &tcpconn->profile,
				tcpconn->rcv.bind_address->flags,
				tcpconn->rcv.bind_address->tos);
		if (fd < 0)
			return -1;

		tcpconn->fd = fd;
		/* learn the kernel-chosen local port; on failure, undo the
		 * freshly opened socket */
		if (tcpconn_update_local_port(tcpconn) < 0) {
			close(fd);
			tcpconn->fd = -1;
			return -1;
		}

		tcpconn->state = S_CONN_OK;
		connected = 1;
	}

	/* only for sockets opened above: let the protocol complete its
	 * connect-time processing */
	if (connected && protos[tcpconn->type].net.stream.conn.connect) {
		if (protos[tcpconn->type].net.stream.conn.connect(tcpconn) < 0) {
			LM_ERR("failed to finish proto %d connect on conn %u\n",
				tcpconn->type, tcpconn->id);
			return -1;
		}
	}

	return 0;
}
2436
2437
2438
int tcp_start_listener(void)
2439
0
{
2440
0
  int p_id;
2441
0
  const struct internal_fork_params ifp_tcp_main = {
2442
0
    .proc_desc = "TCP main",
2443
0
    .flags = 0,
2444
0
    .type = TYPE_NONE,
2445
0
  };
2446
2447
0
  if (tcp_disabled)
2448
0
    return 0;
2449
2450
  /* start the TCP manager process */
2451
0
  if ( (p_id=internal_fork(&ifp_tcp_main))<0 ) {
2452
0
    LM_CRIT("cannot fork tcp main process\n");
2453
0
    goto error;
2454
0
  }else if (p_id==0){
2455
      /* child */
2456
0
    report_conditional_status( (!no_daemon_mode), 0);
2457
2458
0
    tcp_main_server();
2459
0
    exit(-1);
2460
0
  }
2461
0
  *tcp_main_proc_no = p_id;
2462
2463
0
  return 0;
2464
0
error:
2465
0
  return -1;
2466
0
}
2467
2468
/* Thin wrapper: reports whether the current process' reactor has async
 * support (forwards reactor_has_async()'s return value unchanged). */
int tcp_has_async_write(void)
{
	return reactor_has_async();
}
2472
2473
static int tcp_close_conn_run(void *data)
2474
0
{
2475
0
  struct tcp_connection *conn = data;
2476
2477
0
  if (!conn)
2478
0
    return -1;
2479
2480
0
  tcp_conn_destroy(conn);
2481
0
  return 0;
2482
0
}
2483
2484
/* IPC RPC trampoline for tcp_close_conn_run(); the first argument
 * (sender process) is unused and the helper's result is discarded. */
static void tcp_close_conn_rpc(int _, void *param)
{
	(void)tcp_close_conn_run(param);
}
2488
2489
/* Looks up and force-closes a TCP connection.
 * @ipport: either a plain numeric connection id, or a
 *          "[proto:]host:port" spec (proto optional).
 * Returns 1 when a matching connection was found and its closing was
 * triggered, 0 when nothing matched (or TCP is disabled), -1 on error.
 * NOTE(review): on success the reference obtained via tcp_conn_get()
 * appears to be handed over to the close path (run locally in TCP main
 * or shipped there via IPC); only the failure path releases it here --
 * confirm against tcp_conn_destroy()/the RPC consumer. */
int tcp_close_connection(str *ipport)
{
	struct tcp_connection *conn = NULL;
	struct ip_addr *ip;
	str host;
	unsigned int id;
	int port, proto, rc, tcp_main_proc;
	int start_proto, end_proto;
	unsigned int p;
	char *sep;

	if (tcp_disabled)
		return 0;

	/* no ':' separator -> the whole parameter is a connection id */
	sep = q_memchr(ipport->s, ':', ipport->len);
	if (!sep) {
		if (str2int(ipport, &id) < 0) {
			LM_ERR("failed to parse tcp connection [%.*s]\n",
				ipport->len, ipport->s);
			return -1;
		}

		switch (tcp_conn_get(id, NULL, 0, PROTO_NONE, NULL, &conn, NULL)) {
		case 1:
			goto found;
		case -1:
			return -1;
		default:
			/* not found */
			return 0;
		}
	}

	/* "[proto:]host:port" form */
	if (parse_phostport(ipport->s, ipport->len, &host.s, &host.len,
	&port, &proto) != 0 || port <= 0) {
		LM_ERR("failed to parse tcp connection [%.*s]\n",
			ipport->len, ipport->s);
		return -1;
	}

	/* accept both IPv4 and IPv6 hosts */
	ip = str2ip(&host);
	if (!ip)
		ip = str2ip6(&host);
	if (!ip) {
		LM_ERR("invalid IP in tcp connection [%.*s]\n",
			ipport->len, ipport->s);
		return -1;
	}

	/* restrict the search to the given proto, or scan all TCP-based
	 * protos when none was specified */
	if (proto != PROTO_NONE) {
		if (!is_tcp_based_proto(proto)) {
			LM_ERR("protocol %d is not TCP based for [%.*s]\n",
				proto, ipport->len, ipport->s);
			return -1;
		}
		start_proto = proto;
		end_proto = proto + 1;
	} else {
		start_proto = 0;
		end_proto = PROTO_LAST;
	}

	for (p = start_proto; p < (unsigned int)end_proto; p++) {
		if (!is_tcp_based_proto(p))
			continue;

		switch (tcp_conn_get(0, ip, port, p, NULL, &conn, NULL)) {
		case 1:
			goto found;
		case -1:
			return -1;
		}
	}

	return 0;

found:
	/* mark the connection as force-closed under its partition lock */
	TCPCONN_LOCK(conn->id);
	conn->flags |= F_CONN_FORCE_CLOSED;
	TCPCONN_UNLOCK(conn->id);

	/* the actual destroy must run in the TCP main process: do it
	 * directly if we ARE TCP main, otherwise via an IPC RPC */
	tcp_main_proc = tcp_get_main_proc_no();
	if (tcp_main_proc < 0)
		rc = -1;
	else if (process_no == tcp_main_proc)
		rc = tcp_close_conn_run(conn);
	else
		rc = ipc_send_rpc(tcp_main_proc, tcp_close_conn_rpc, conn);

	if (rc < 0) {
		/* roll back the force-closed flag and drop our reference */
		TCPCONN_LOCK(conn->id);
		conn->flags &= ~F_CONN_FORCE_CLOSED;
		TCPCONN_UNLOCK(conn->id);
		tcp_conn_release(conn, 0);
		return -1;
	}

	return 1;
}
2587
2588
2589
/***************************** MI functions **********************************/
2590
2591
/* MI handler listing all TCP connections as a "Connections" array.
 * Optional MI param "proto" filters by protocol; the literal "any"
 * (case-insensitive) disables filtering.  For each connection the id,
 * proto, state, remote/local endpoints, a protocol-specific dump (if
 * provided), the lifetime (as a local-time string when formattable)
 * and any port aliases are reported.
 * Returns the MI response, an MI error for a bad "proto" value, NULL
 * on out-of-memory, or an MI null result when TCP is disabled. */
mi_response_t *mi_tcp_list_conns(const mi_params_t *params,
						struct mi_handler *async_hdl)
{
	mi_response_t *resp;
	mi_item_t *resp_obj;
	mi_item_t *conns_arr, *conn_item;
	struct tcp_connection *conn;
	time_t _ts;
	char date_buf[MI_DATE_BUF_LEN];
	int date_buf_len;
	unsigned int i,j,part;
	char proto_buf[PROTO_NAME_MAX_SIZE];
	char *proto_s;
	int proto_len;
	int proto_filter = PROTO_NONE;
	int filter_by_proto = 0;
	str proto_name;
	struct tm ltime;
	char *p;

	if (tcp_disabled)
		return init_mi_result_null();

	/* the "proto" parameter is optional (case -1 = absent) */
	switch (try_get_mi_string_param(params, "proto", &proto_s, &proto_len)) {
	case 0:
		proto_name.s = proto_s;
		proto_name.len = proto_len;

		/* "any" -> list everything, no filtering */
		if (proto_len == 3 && str_strcasecmp(&proto_name, _str("any")) == 0)
			;
		else if (parse_proto((unsigned char *)proto_s, proto_len,
				(int *)&proto_filter) < 0)
			return init_mi_error(400, MI_SSTR("Bad protocol"));
		else
			filter_by_proto = 1;
		break;
	case -1:
		break;
	default:
		return init_mi_param_error();
	}

	resp = init_mi_result_object(&resp_obj);
	if (!resp)
		return 0;

	conns_arr = add_mi_array(resp_obj, MI_SSTR("Connections"));
	if (!conns_arr) {
		free_mi_response(resp);
		return 0;
	}

	/* walk every partition's id hash, holding that partition's lock */
	for( part=0 ; part<TCP_PARTITION_SIZE ; part++) {
		TCPCONN_LOCK(part);
		for( i=0; i<TCP_ID_HASH_SIZE ; i++ ) {
			for(conn=TCP_PART(part).tcpconn_id_hash[i];conn;conn=conn->id_next){
				if (filter_by_proto && conn->type != proto_filter)
					continue;

				/* add one object for each conn */
				conn_item = add_mi_object(conns_arr, 0, 0);
				if (!conn_item)
					goto error;

				/* add ID */
				if (add_mi_number(conn_item, MI_SSTR("ID"), conn->id) < 0)
					goto error;

				/* add type/proto */
				p = proto2str(conn->type, proto_buf);
				if (add_mi_string(conn_item, MI_SSTR("Type"), proto_buf,
					(int)(long)(p-proto_buf)) < 0)
					goto error;

				/* add state */
				if (add_mi_number(conn_item, MI_SSTR("State"), conn->state) < 0)
					goto error;

				/* add Remote IP:Port */
				if (add_mi_string_fmt(conn_item, MI_SSTR("Remote"), "%s:%d",
					ip_addr2a(&conn->rcv.src_ip), conn->rcv.src_port) < 0)
					goto error;

				/* add Local IP:Port */
				if (add_mi_string_fmt(conn_item, MI_SSTR("Local"), "%s:%d",
					ip_addr2a(&conn->rcv.dst_ip), conn->rcv.dst_port) < 0)
					goto error;

				/* optional protocol-specific extra info */
				if (protos[conn->type].net.stream.conn.dump &&
						protos[conn->type].net.stream.conn.dump(conn,
							conn_item) < 0)
					goto error;

				/* add lifetime (conn->lifetime is relative to startup) */
				_ts = (time_t)conn->lifetime + startup_time;
				localtime_r(&_ts, &ltime);
				date_buf_len = strftime(date_buf, MI_DATE_BUF_LEN - 1,
										"%Y-%m-%d %H:%M:%S", &ltime);
				if (date_buf_len != 0) {
					if (add_mi_string(conn_item, MI_SSTR("Lifetime"),
						date_buf, date_buf_len) < 0)
						goto error;
				} else {
					/* strftime failed -> fall back to the raw number */
					if (add_mi_number(conn_item, MI_SSTR("Lifetime"), _ts) < 0)
						goto error;
				}

				/* add the port-aliases */
				for( j=0 ; j<conn->aliases ; j++ )
					/* add one node for each conn */
					add_mi_number( conn_item, MI_SSTR("Alias port"),
						conn->con_aliases[j].port );

			}
		}

		TCPCONN_UNLOCK(part);
	}

	return resp;

error:
	/* release the partition lock we were holding when the add failed */
	TCPCONN_UNLOCK(part);
	LM_ERR("failed to add MI item\n");
	free_mi_response(resp);
	return 0;
}
2718
2719
2720
mi_response_t *mi_tcp_close_conn(const mi_params_t *params,
2721
            struct mi_handler *_)
2722
0
{
2723
0
  str ipport;
2724
0
  int rc;
2725
2726
0
  if (get_mi_string_param(params, "ipport", &ipport.s, &ipport.len) < 0)
2727
0
    return init_mi_param_error();
2728
2729
0
  rc = tcp_close_connection(&ipport);
2730
0
  if (rc < 0)
2731
0
    return init_mi_error(400, MI_SSTR("Bad tcp connection"));
2732
0
  if (rc == 0)
2733
0
    return init_mi_result_null();
2734
2735
0
  return init_mi_result_ok();
2736
0
}