Coverage Report

Created: 2025-07-18 06:32

/src/opensips/net/tcp_common.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2019 - OpenSIPS Solutions
3
 *
4
 * This file is part of opensips, a free SIP server.
5
 *
6
 * opensips is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 2 of the License, or
9
 * (at your option) any later version
10
 *
11
 * opensips is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software
18
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19
 *
20
 */
21
22
#include "../reactor_defs.h"
23
#include "net_tcp.h"
24
#include "tcp_common.h"
25
#include "tcp_conn_profile.h"
26
#include "../tsend.h"
27
28
/*! \brief blocking connect on a non-blocking fd; it gives up once
 * timeout_ms milliseconds have passed since the call started
 *
 * If BLOCKING_USE_SELECT and HAVE_SELECT are defined it will internally
 * use select() instead of poll() (bad if fd > FD_SETSIZE, poll is preferred).
 *
 * \param fd         socket on which connect() is issued
 * \param servaddr   destination address
 * \param addrlen    size of the address pointed to by servaddr
 * \param timeout_ms overall time budget, in milliseconds
 * \return 0 if the connection got established, -1 on error or timeout
 */
int tcp_connect_blocking_timeout(int fd, const struct sockaddr *servaddr,
                      socklen_t addrlen, int timeout_ms)
{
  int n;
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
  fd_set sel_set;
  fd_set orig_set;
  struct timeval timeout;
#else
  struct pollfd pf;
#endif
  int elapsed;
  int to;
  int remaining;
  int err;
  struct timeval begin;
  socklen_t err_len;
  int poll_err;
  char *ip;
  unsigned short port;

  poll_err=0;
  /* the whole budget, in microseconds; it is never modified afterwards */
  to = timeout_ms * 1000;

  if (gettimeofday(&(begin), NULL)) {
    LM_ERR("Failed to get TCP connect start time\n");
    goto error;
  }

again:
  n=connect(fd, servaddr, addrlen);
  if (n==-1){
    if (errno==EINTR){
      elapsed=get_time_diff(&begin);
      if (elapsed<to) goto again;
      else goto error_timeout;
    }
    if (errno!=EINPROGRESS && errno!=EALREADY){
      get_su_info( servaddr, ip, port);
      LM_ERR("[server=%s:%d] (%d) %s\n",ip, port, errno, strerror(errno));
      goto error;
    }
  }else goto end;

  /* poll/select loop */
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
  FD_ZERO(&orig_set);
  FD_SET(fd, &orig_set);
#else
  pf.fd=fd;
  pf.events=POLLOUT;
#endif
  while(1){
    /* NOTE(review): assumes get_time_diff() reports the microseconds
     * elapsed since 'begin' and leaves 'begin' untouched - confirm.
     * As 'elapsed' is measured from the very start, the remaining wait
     * is derived from the full budget on every pass; subtracting it
     * from a running counter would count the same time more than once
     * and make the connect time out too early. */
    elapsed = get_time_diff(&begin);
    if (elapsed>=to)
      goto error_timeout;
    remaining = to - elapsed;
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
    sel_set=orig_set;
    timeout.tv_sec = remaining/1000000;
    timeout.tv_usec = remaining%1000000;
    n=select(fd+1, 0, &sel_set, 0, &timeout);
#else
    /* round up to whole milliseconds, so that a sub-millisecond
     * remainder does not degrade into a 0 ms busy poll */
    n=poll(&pf, 1, remaining/1000 + (remaining%1000 != 0));
#endif
    if (n<0){
      if (errno==EINTR) continue;
      get_su_info( servaddr, ip, port);
      LM_ERR("poll/select failed:[server=%s:%d] (%d) %s\n",
        ip, port, errno, strerror(errno));
      goto error;
    }else if (n==0) /* timeout */ continue;
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
    if (FD_ISSET(fd, &sel_set))
#else
    if (pf.revents&(POLLERR|POLLHUP|POLLNVAL)){
      LM_ERR("poll error: flags %d - %d %d %d %d \n", pf.revents,
           POLLOUT,POLLERR,POLLHUP,POLLNVAL);
      poll_err=1;
    }
#endif
    {
      /* the outcome of a non-blocking connect is read via SO_ERROR */
      err_len=sizeof(err);
      if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &err_len) != 0) {
        get_su_info( servaddr, ip, port);
        LM_WARN("getsockopt error: fd=%d [server=%s:%d]: (%d) %s\n", fd,
            ip, port, errno, strerror(errno));
        goto error;
      }
      if ((err==0) && (poll_err==0)) goto end;
      /* anything but "still in progress" is a final failure */
      if (err!=EINPROGRESS && err!=EALREADY){
        get_su_info( servaddr, ip, port);
        LM_ERR("failed to retrieve SO_ERROR [server=%s:%d] (%d) %s\n",
          ip, port, err, strerror(err));
        goto error;
      }
    }
  }
error_timeout:
  /* timeout */
  LM_ERR("connect timed out, %d us elapsed out of %d us\n", elapsed,
    timeout_ms * 1000);
error:
  return -1;
end:
  return 0;
}
140
141
int tcp_connect_blocking(int fd, const struct sockaddr *servaddr,
142
                              socklen_t addrlen)
143
0
{
144
0
  return tcp_connect_blocking_timeout(fd, servaddr, addrlen,
145
0
      tcp_connect_timeout);
146
0
}
147
148
int tcp_sync_connect_fd(const union sockaddr_union* src, const union sockaddr_union* dst,
149
                 enum sip_protos proto, const struct tcp_conn_profile *prof, enum si_flags flags, int sock_tos)
150
0
{
151
0
  int s;
152
0
  union sockaddr_union my_name;
153
0
  socklen_t my_name_len;
154
155
0
  s=socket(AF2PF(dst->s.sa_family), SOCK_STREAM, 0);
156
0
  if (s==-1){
157
0
    LM_ERR("socket: (%d) %s\n", errno, strerror(errno));
158
0
    goto error;
159
0
  }
160
161
0
  if (tcp_init_sock_opt(s, prof, flags, sock_tos)<0){
162
0
    LM_ERR("tcp_init_sock_opt failed\n");
163
0
    goto error;
164
0
  }
165
166
0
  if (src) {
167
0
    my_name_len = sockaddru_len(*src);
168
0
    memcpy( &my_name, src, my_name_len);
169
0
    if (!(flags & SI_REUSEPORT))
170
0
      su_setport( &my_name, 0);
171
0
    if (bind(s, &my_name.s, my_name_len )!=0) {
172
0
      LM_ERR("bind failed (%d) %s\n", errno,strerror(errno));
173
0
      goto error;
174
0
    }
175
0
  }
176
177
0
  if (tcp_connect_blocking_timeout(s, &dst->s, sockaddru_len(*dst),
178
0
                  prof->connect_timeout)<0){
179
0
    LM_ERR("tcp_blocking_connect failed\n");
180
0
    goto error;
181
0
  }
182
0
  return s;
183
0
error:
184
  /* close the opened socket */
185
0
  if (s!=-1) close(s);
186
0
  return -1;
187
0
}
188
189
struct tcp_connection* tcp_sync_connect(const struct socket_info* send_sock,
190
               const union sockaddr_union* server, struct tcp_conn_profile *prof,
191
               int *fd, int send2main)
192
0
{
193
0
  struct tcp_connection* con;
194
0
  int s;
195
196
0
  s = tcp_sync_connect_fd(&send_sock->su, server, send_sock->proto, prof, send_sock->flags, send_sock->tos);
197
0
  if (s < 0)
198
0
    return NULL;
199
200
0
  con=tcp_conn_create(s, server, send_sock, prof, S_CONN_OK, send2main);
201
0
  if (con==NULL){
202
0
    LM_ERR("tcp_conn_create failed, closing the socket\n");
203
0
    close(s);
204
0
    return 0;
205
0
  }
206
0
  *fd = s;
207
0
  return con;
208
0
}
209
210
int tcp_async_connect(const struct socket_info* send_sock,
211
            const union sockaddr_union* server, struct tcp_conn_profile *prof,
212
            int timeout, struct tcp_connection** c, int *ret_fd, int send2main)
213
0
{
214
0
  int fd, n;
215
0
  union sockaddr_union my_name;
216
0
  socklen_t my_name_len;
217
0
  struct tcp_connection* con;
218
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
219
  fd_set sel_set;
220
  fd_set orig_set;
221
  struct timeval timeout_val;
222
#else
223
0
  struct pollfd pf;
224
0
#endif
225
0
  unsigned int elapsed,to;
226
0
  int err;
227
0
  unsigned int err_len;
228
0
  int poll_err;
229
0
  char *ip;
230
0
  unsigned short port;
231
0
  struct timeval begin;
232
233
  /* create the socket */
234
0
  fd=socket(AF2PF(server->s.sa_family), SOCK_STREAM, 0);
235
0
  if (fd==-1){
236
0
    LM_ERR("socket: (%d) %s\n", errno, strerror(errno));
237
0
    return -1;
238
0
  }
239
240
0
  if (tcp_init_sock_opt(fd, prof, send_sock->flags, send_sock->tos)<0){
241
0
    LM_ERR("tcp_init_sock_opt failed\n");
242
0
    goto error;
243
0
  }
244
245
0
  my_name_len = sockaddru_len(send_sock->su);
246
0
  memcpy( &my_name, &send_sock->su, my_name_len);
247
0
  if (!(send_sock->flags & SI_REUSEPORT))
248
0
    su_setport( &my_name, 0);
249
0
  if (bind(fd, &my_name.s, my_name_len )!=0) {
250
0
    LM_ERR("bind failed (%d) %s\n", errno,strerror(errno));
251
0
    goto error;
252
0
  }
253
254
  /* attempt to do connect and see if we do block or not */
255
0
  poll_err=0;
256
0
  elapsed = 0;
257
0
  to = timeout*1000;
258
259
0
  if (gettimeofday(&(begin), NULL)) {
260
0
    LM_ERR("Failed to get TCP connect start time\n");
261
0
    goto error;
262
0
  }
263
264
0
again:
265
0
  n=connect(fd, &server->s, sockaddru_len(*server));
266
0
  if (n==-1) {
267
0
    if (errno==EINTR){
268
0
      elapsed=get_time_diff(&begin);
269
0
      if (elapsed<to) goto again;
270
0
      else {
271
0
        LM_DBG("Local connect attempt failed \n");
272
0
        goto async_connect;
273
0
      }
274
0
    }
275
0
    if (errno!=EINPROGRESS && errno!=EALREADY){
276
0
      get_su_info(&server->s, ip, port);
277
0
      LM_ERR("[server=%s:%d] (%d) %s\n",ip, port, errno,strerror(errno));
278
0
      goto error;
279
0
    }
280
0
  } else goto local_connect;
281
282
  /* let's poll for a little */
283
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
284
  FD_ZERO(&orig_set);
285
  FD_SET(fd, &orig_set);
286
#else
287
0
  pf.fd=fd;
288
0
  pf.events=POLLOUT;
289
0
#endif
290
291
0
  while(1){
292
0
    elapsed=get_time_diff(&begin);
293
0
    if (elapsed<to)
294
0
      to-=elapsed;
295
0
    else {
296
0
      LM_DBG("Polling is overdue \n");
297
0
      goto async_connect;
298
0
    }
299
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
300
    sel_set=orig_set;
301
    timeout_val.tv_sec=to/1000000;
302
    timeout_val.tv_usec=to%1000000;
303
    n=select(fd+1, 0, &sel_set, 0, &timeout_val);
304
#else
305
0
    n=poll(&pf, 1, to/1000);
306
0
#endif
307
0
    if (n<0){
308
0
      if (errno==EINTR) continue;
309
0
      get_su_info(&server->s, ip, port);
310
0
      LM_ERR("poll/select failed:[server=%s:%d] (%d) %s\n",
311
0
        ip, port, errno, strerror(errno));
312
0
      goto error;
313
0
    }else if (n==0) /* timeout */ continue;
314
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
315
    if (FD_ISSET(fd, &sel_set))
316
#else
317
0
    if (pf.revents&(POLLERR|POLLHUP|POLLNVAL)){
318
0
      LM_ERR("poll error: flags %x\n", pf.revents);
319
0
      poll_err=1;
320
0
    }
321
0
#endif
322
0
    {
323
0
      err_len=sizeof(err);
324
0
      if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &err_len) != 0) {
325
0
        get_su_info(&server->s, ip, port);
326
0
        LM_WARN("getsockopt error: fd=%d [server=%s:%d]: (%d) %s\n", fd,
327
0
            ip, port, errno, strerror(errno));
328
0
        goto error;
329
0
      }
330
0
      if ((err==0) && (poll_err==0)) goto local_connect;
331
0
      if (err!=EINPROGRESS && err!=EALREADY){
332
0
        get_su_info(&server->s, ip, port);
333
0
        LM_ERR("failed to retrieve SO_ERROR [server=%s:%d] (%d) %s\n",
334
0
          ip, port, err, strerror(err));
335
0
        goto error;
336
0
      }
337
0
    }
338
0
  }
339
340
0
async_connect:
341
0
  LM_DBG("Create connection for async connect\n");
342
  /* create a new dummy connection */
343
0
  con=tcp_conn_create(fd, server, send_sock,
344
0
                      prof, S_CONN_CONNECTING, send2main);
345
0
  if (con==NULL) {
346
0
    LM_ERR("tcp_conn_create failed\n");
347
0
    goto error;
348
0
  }
349
  /* report an async, in progress connect */
350
0
  *c = con;
351
0
  return 0;
352
353
0
local_connect:
354
0
  con=tcp_conn_create(fd, server, send_sock, prof, S_CONN_OK, send2main);
355
0
  if (con==NULL) {
356
0
    LM_ERR("tcp_conn_create failed, closing the socket\n");
357
0
    goto error;
358
0
  }
359
0
  *c = con;
360
0
  *ret_fd = fd;
361
  /* report a local connect */
362
0
  return 1;
363
364
0
error:
365
0
  close(fd);
366
0
  *c = NULL;
367
0
  return -1;
368
0
}
369
370
int tcp_async_write(struct tcp_connection* con,int fd)
371
0
{
372
0
  int n;
373
0
  struct tcp_async_chunk *chunk;
374
375
0
  while ((chunk = tcp_async_get_chunk(con)) != NULL) {
376
0
    LM_DBG("Trying to send %d bytes from chunk %p in conn %p - %d %d \n",
377
0
        chunk->len, chunk, con, chunk->ticks, get_ticks());
378
0
    n=send(fd, chunk->buf, chunk->len,
379
0
#ifdef HAVE_MSG_NOSIGNAL
380
0
        MSG_NOSIGNAL
381
#else
382
        0
383
#endif
384
0
        );
385
386
0
    if (n<0) {
387
0
      if (errno==EINTR)
388
0
        continue;
389
0
      else if (errno==EAGAIN || errno==EWOULDBLOCK) {
390
0
        LM_DBG("Can't finish to write chunk %p on conn %p\n",
391
0
            chunk,con);
392
        /* report back we have more writting to be done */
393
0
        return 1;
394
0
      } else {
395
0
        LM_ERR("Error occurred while sending async chunk %d (%s)\n",
396
0
            errno,strerror(errno));
397
        /* report the conn as broken */
398
0
        return -1;
399
0
      }
400
0
    }
401
0
    tcp_async_update_write(con, n);
402
0
  }
403
0
  return 0;
404
0
}
405
406
/**
 * Tries to write 'len' bytes from 'buf' on the socket; whenever the socket
 * is not writable, it waits via poll() for up to 'timeout' and, if that
 * expires, queues the unsent remainder as an async chunk on the connection.
 *
 * called under the TCP connection write lock, timeout is in milliseconds
 *
 * @return:
 *   -1  - send/poll error, unexpected poll event, or the remainder could
 *         not be queued
 *   0   - poll timed out and the unsent remainder was queued as a chunk
 *   len - the whole buffer was written on the socket (the original length
 *         is reported, even if several send() calls were needed)
 */
static int tsend_stream_async(struct tcp_connection *c,
    int fd, char* buf, unsigned int len, int timeout)
{
  /* 'len' shrinks on partial writes, so remember what was asked for */
  const unsigned int total = len;
  int n;
  struct pollfd pf;

  pf.fd=fd;
  pf.events=POLLOUT;

again:
  /* same SIGPIPE protection as used by tcp_async_write() */
  n=send(fd, buf, len,
#ifdef HAVE_MSG_NOSIGNAL
      MSG_NOSIGNAL
#else
      0
#endif
      );
  if (n<0){
    if (errno==EINTR) goto again;
    else if (errno!=EAGAIN && errno!=EWOULDBLOCK) {
      LM_ERR("Failed first TCP async send : (%d) %s\n",
          errno, strerror(errno));
      return -1;
    } else
      goto poll_loop;
  }

  if ((unsigned int)n < len) {
    /* partial write */
    buf += n;
    len -= n;
  } else {
    /* everything is out; report the full length, as a smaller value
     * would wrongly signal that some bytes were left to be chunked */
    LM_DBG("Async successful write from first try on %p\n",c);
    return (int)total;
  }

poll_loop:
  n = poll(&pf,1,timeout);
  if (n<0) {
    if (errno==EINTR)
      goto poll_loop;
    LM_ERR("Polling while trying to async send failed %s [%d]\n",
        strerror(errno), errno);
    return -1;
  } else if (n == 0) {
    LM_DBG("timeout -> do an async write (add it to conn)\n");
    /* timeout - let's just pass to main */
    if (tcp_async_add_chunk(c,buf,len,0) < 0) {
      LM_ERR("Failed to add write chunk to connection \n");
      return -1;
    } else {
      /* we have successfully added async write chunk
       * tell MAIN to poll out for us */
      LM_DBG("Data still pending for write on conn %p\n",c);
      return 0;
    }
  }

  if (pf.revents&POLLOUT)
    goto again;

  /* some other events triggered by poll - treat as errors */
  return -1;
}
470
471
int tcp_write_on_socket(struct tcp_connection* c, int fd,
472
    char *buf, int len, int write_timeout, int async_write_timeout)
473
0
{
474
0
  int n;
475
476
0
  lock_get(&c->write_lock);
477
0
  if (c->async) {
478
    /*
479
     * if there is any data pending to write, we have to wait for those chunks
480
     * to be sent, otherwise we will completely break the messages' order
481
     */
482
0
    if (c->async->pending)
483
0
      n = tcp_async_add_chunk(c, buf, len, 0);
484
0
    else
485
0
      n = tsend_stream_async(c,fd,buf,len, async_write_timeout);
486
0
  } else {
487
0
    n = tsend_stream(fd, buf, len, write_timeout);
488
0
  }
489
0
  lock_release(&c->write_lock);
490
491
0
  return n;
492
0
}
493
494
/* returns :
495
 * 0  - in case of success
496
 * -1 - in case there was an internal error
497
 * -2 - in case our chunks buffer is full
498
 *    and we need to let the connection go
499
 */
500
int tcp_async_add_chunk(struct tcp_connection *con, char *buf,
501
    int len, int lock)
502
0
{
503
0
  struct tcp_async_chunk *c;
504
505
0
  c = shm_malloc(sizeof(struct tcp_async_chunk) + len);
506
0
  if (!c) {
507
0
    LM_ERR("No more SHM\n");
508
0
    return -1;
509
0
  }
510
511
0
  c->len = len;
512
0
  c->ticks = get_ticks();
513
0
  c->buf = (char *)(c+1);
514
0
  memcpy(c->buf,buf,len);
515
516
0
  if (lock)
517
0
    lock_get(&con->write_lock);
518
519
0
  if (con->async->allocated == con->async->pending) {
520
0
    LM_ERR("We have reached the limit of max async postponed chunks %d\n",
521
0
        con->async->pending);
522
0
    if (lock)
523
0
      lock_release(&con->write_lock);
524
0
    shm_free(c);
525
0
    return -2;
526
0
  }
527
528
0
  con->async->chunks[con->async->pending++] = c;
529
0
  if (con->async->pending == 1)
530
0
    con->async->oldest = c->ticks;
531
532
0
  if (lock)
533
0
    lock_release(&con->write_lock);
534
535
0
  return 0;
536
0
}
537
538
539
struct tcp_async_chunk *tcp_async_get_chunk(struct tcp_connection *con)
540
0
{
541
0
  if (con->async->pending == 0)
542
0
    return NULL;
543
0
  return con->async->chunks[0];
544
0
}
545
546
void tcp_async_update_write(struct tcp_connection *con, int len)
547
0
{
548
0
  int i = 0, c;
549
0
  struct tcp_async_chunk *chunk;
550
551
0
  while (len > 0) {
552
0
    chunk = con->async->chunks[i];
553
0
    if (len < chunk->len) {
554
      /* partial write */
555
0
      chunk->len -= len;
556
0
      memmove(chunk->buf, chunk->buf + len, chunk->len);
557
0
      return;
558
0
    } else {
559
      /* written the entire chunk */
560
0
      i++;
561
0
      len -= chunk->len;
562
0
    }
563
0
  }
564
0
  con->async->pending -= i;
565
0
  for (c = 0; c < i; c++)
566
0
    shm_free(con->async->chunks[c]);
567
0
  if (con->async->pending) {
568
0
    LM_DBG("We still have %d chunks pending on %p\n",
569
0
        con->async->pending, con);
570
0
    memmove(con->async->chunks, con->async->chunks + i,
571
0
        con->async->pending * sizeof(struct tcp_async_chunk *));
572
0
    con->async->oldest = con->async->chunks[0]->ticks;
573
0
  } else {
574
0
    LM_DBG("We have finished writing all our async chunks in %p\n", con);
575
0
    con->async->oldest = 0;
576
0
  }
577
0
}