Coverage Report

Created: 2026-04-12 06:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/opensips/net/tcp_common.c
Line
Count
Source
1
/*
2
 * Copyright (C) 2019 - OpenSIPS Solutions
3
 *
4
 * This file is part of opensips, a free SIP server.
5
 *
6
 * opensips is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 2 of the License, or
9
 * (at your option) any later version
10
 *
11
 * opensips is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software
18
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19
 *
20
 */
21
22
#include "../reactor_defs.h"
23
#include "net_tcp.h"
24
#include "tcp_common.h"
25
#include "tcp_conn_profile.h"
26
#include "proxy_protocol.h"
27
#include "trans.h"
28
#include "../tsend.h"
29
#include "proto_tcp/tcp_common_defs.h"
30
31
0
#define TCP_DEFAULT_ASYNC_CHUNKS 32
32
33
static int tcp_async_init_data(struct tcp_connection *con)
34
0
{
35
0
  int chunks;
36
37
0
  if (con->async)
38
0
    return 0;
39
40
0
  chunks = protos[con->type].net.stream.async_chunks;
41
0
  if (chunks <= 0)
42
0
    chunks = TCP_DEFAULT_ASYNC_CHUNKS;
43
44
0
  con->async = shm_malloc(sizeof(struct tcp_async_data) +
45
0
      chunks * sizeof(struct tcp_async_chunk *));
46
0
  if (!con->async) {
47
0
    LM_ERR("No more SHM for async queue on conn %p / %u\n", con, con->id);
48
0
    return -1;
49
0
  }
50
51
0
  con->async->allocated = chunks;
52
0
  con->async->oldest = 0;
53
0
  con->async->pending = 0;
54
0
  return 0;
55
0
}
56
57
/*! \brief blocking connect on a non-blocking fd; it will timeout after
58
 * tcp_connect_timeout
59
 * if BLOCKING_USE_SELECT and HAVE_SELECT are defined it will internally
60
 * use select() instead of poll (bad if fd > FD_SET_SIZE, poll is preferred)
61
 */
62
int tcp_connect_blocking_timeout(int fd, const struct sockaddr *servaddr,
                      socklen_t addrlen, int timeout_ms)
{
  int n;
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
  fd_set sel_set;
  fd_set orig_set;
  struct timeval timeout;
#else
  struct pollfd pf;
#endif
  int elapsed;          /* microseconds spent since "begin" */
  int to;               /* remaining time budget, in microseconds */
  int err;              /* SO_ERROR value read back from the socket */
  struct timeval begin; /* wall-clock start of the connect attempt */
  unsigned int err_len;
  int poll_err;         /* set when poll() reports an error revent */
  char *ip;
  unsigned short port;

  poll_err=0;
  /* convert ms argument into the us budget that get_time_diff() uses */
  to = timeout_ms * 1000;

  if (gettimeofday(&(begin), NULL)) {
    LM_ERR("Failed to get TCP connect start time\n");
    goto error;
  }

again:
  n=connect(fd, servaddr, addrlen);
  if (n==-1){
    if (errno==EINTR){
      /* interrupted by a signal: retry while we still have budget */
      elapsed=get_time_diff(&begin);
      if (elapsed<to) goto again;
      else goto error_timeout;
    }
    /* on a non-blocking fd, EINPROGRESS/EALREADY just mean "still
     * connecting" - anything else is a hard failure */
    if (errno!=EINPROGRESS && errno!=EALREADY){
      get_su_info( servaddr, ip, port);
      LM_ERR("[server=%s:%d] (%d) %s\n",ip, port, errno, strerror(errno));
      goto error;
    }
  }else goto end;  /* connect() succeeded immediately */

  /* poll/select loop */
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
    FD_ZERO(&orig_set);
    FD_SET(fd, &orig_set);
#else
    pf.fd=fd;
    pf.events=POLLOUT;  /* writability signals connect completion */
#endif
  while(1){
    /* shrink the remaining budget by what was already consumed */
    elapsed = get_time_diff(&begin);
    if (elapsed<to)
      to-=elapsed;
    else
      goto error_timeout;
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
    sel_set=orig_set;  /* select() mutates the set - use a fresh copy */
    timeout.tv_sec = to/1000000;
    timeout.tv_usec = to%1000000;
    n=select(fd+1, 0, &sel_set, 0, &timeout);
#else
    n=poll(&pf, 1, to/1000);  /* poll() wants milliseconds */
#endif
    if (n<0){
      if (errno==EINTR) continue;
      get_su_info( servaddr, ip, port);
      LM_ERR("poll/select failed:[server=%s:%d] (%d) %s\n",
        ip, port, errno, strerror(errno));
      goto error;
    }else if (n==0) /* timeout */ continue;
#if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
    if (FD_ISSET(fd, &sel_set))
#else
    /* remember error-class revents, but still fall through to the
     * SO_ERROR check below for the authoritative failure reason */
    if (pf.revents&(POLLERR|POLLHUP|POLLNVAL)){
      LM_ERR("poll error: flags %d - %d %d %d %d \n", pf.revents,
           POLLOUT,POLLERR,POLLHUP,POLLNVAL);
      poll_err=1;
    }
#endif
    {
      /* the real connect() outcome is carried by SO_ERROR */
      err_len=sizeof(err);
      if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &err_len) != 0) {
        get_su_info( servaddr, ip, port);
        LM_WARN("getsockopt error: fd=%d [server=%s:%d]: (%d) %s\n", fd,
            ip, port, errno, strerror(errno));
        goto error;
      }
      if ((err==0) && (poll_err==0)) goto end;  /* connected */
      /* EINPROGRESS/EALREADY: keep waiting; anything else is fatal */
      if (err!=EINPROGRESS && err!=EALREADY){
        get_su_info( servaddr, ip, port);
        LM_ERR("failed to retrieve SO_ERROR [server=%s:%d] (%d) %s\n",
          ip, port, err, strerror(err));
        goto error;
      }
    }
  }
error_timeout:
  /* timeout */
  LM_ERR("connect timed out, %d us elapsed out of %d us\n", elapsed,
    timeout_ms * 1000);
error:
  return -1;
end:
  return 0;
}
169
170
int tcp_connect_blocking(int fd, const struct sockaddr *servaddr,
171
                              socklen_t addrlen)
172
0
{
173
0
  return tcp_connect_blocking_timeout(fd, servaddr, addrlen,
174
0
      tcp_connect_timeout);
175
0
}
176
177
int tcp_sync_connect_fd(const union sockaddr_union* src, const union sockaddr_union* dst,
178
                 enum sip_protos proto, const struct tcp_conn_profile *prof, enum si_flags flags, int sock_tos)
179
0
{
180
0
  int s;
181
0
  union sockaddr_union my_name;
182
0
  socklen_t my_name_len;
183
184
0
  s=socket(AF2PF(dst->s.sa_family), SOCK_STREAM, 0);
185
0
  if (s==-1){
186
0
    LM_ERR("socket: (%d) %s\n", errno, strerror(errno));
187
0
    goto error;
188
0
  }
189
190
0
  if (tcp_init_sock_opt(s, prof, flags, sock_tos)<0){
191
0
    LM_ERR("tcp_init_sock_opt failed\n");
192
0
    goto error;
193
0
  }
194
195
0
  if (src) {
196
0
    my_name_len = sockaddru_len(*src);
197
0
    memcpy( &my_name, src, my_name_len);
198
0
    if (!(flags & SI_REUSEPORT))
199
0
      su_setport( &my_name, 0);
200
0
    if (bind(s, &my_name.s, my_name_len )!=0) {
201
0
      LM_ERR("bind failed (%d) %s\n", errno,strerror(errno));
202
0
      goto error;
203
0
    }
204
0
  }
205
206
0
  if (tcp_connect_blocking_timeout(s, &dst->s, sockaddru_len(*dst),
207
0
                  prof->connect_timeout)<0){
208
0
    LM_ERR("tcp_blocking_connect failed\n");
209
0
    goto error;
210
0
  }
211
0
  return s;
212
0
error:
213
  /* close the opened socket */
214
0
  if (s!=-1) close(s);
215
0
  return -1;
216
0
}
217
218
struct tcp_connection* tcp_sync_connect(const struct socket_info* send_sock,
219
               const union sockaddr_union* server, struct tcp_conn_profile *prof,
220
               int *fd)
221
0
{
222
0
  struct tcp_connection* con;
223
224
0
  *fd = -1;
225
0
  con = tcp_conn_create(server, send_sock, prof, S_CONN_CONNECTING);
226
0
  if (!con)
227
0
    LM_ERR("tcp_conn_create failed\n");
228
0
  return con;
229
0
}
230
231
int tcp_async_connect(const struct socket_info* send_sock,
232
            const union sockaddr_union* server, struct tcp_conn_profile *prof,
233
            struct tcp_connection** c, int *ret_fd)
234
0
{
235
0
  struct tcp_connection* con;
236
237
0
  *ret_fd = -1;
238
0
  con = tcp_conn_create(server, send_sock, prof, S_CONN_CONNECTING);
239
0
  if (con == NULL) {
240
0
    LM_ERR("tcp_conn_create failed\n");
241
0
    return -1;
242
0
  }
243
0
  *c = con;
244
0
  return 0;
245
0
}
246
247
/* Flush the pending async data of @con to @fd.
 * Returns 0 when everything was written, 1 when data is still pending
 * (caller should keep polling for writability), -1 on a broken conn. */
int tcp_async_write(struct tcp_connection* con,int fd)
{
  int n;
  struct tcp_async_chunk *chunk;

  /* any PROXY-protocol v1 header queued for this conn goes out first;
   * >0 means it could not be fully sent yet, <0 means failure */
  n = send_stream_proxy_protocol_v1(con, fd, 0,
      0, NULL, "TCP");
  if (n < 0)
    return -1;
  if (n > 0)
    return 1;

  /* drain the queued chunks, oldest first */
  while ((chunk = tcp_async_get_chunk(con)) != NULL) {
    LM_DBG("Trying to send %d bytes from chunk %p in conn %p - %d %d \n",
        chunk->len, chunk, con, chunk->ticks, get_ticks());
    n=send(fd, chunk->buf, chunk->len,
#ifdef HAVE_MSG_NOSIGNAL
        MSG_NOSIGNAL
#else
        0
#endif
        );

    if (n<0) {
      if (errno==EINTR)
        continue;  /* interrupted - retry the same chunk */
      else if (errno==EAGAIN || errno==EWOULDBLOCK) {
        LM_DBG("Can't finish to write chunk %p on conn %p\n",
            chunk,con);
        /* report back we have more writting to be done */
        return 1;
      } else {
        LM_ERR("Error occurred while sending async chunk %d (%s)\n",
            errno,strerror(errno));
        /* report the conn as broken */
        return -1;
      }
    }
    /* consume the n written bytes from the queue (may be a partial
     * write of the current chunk) and refresh the conn's lifetime */
    tcp_async_update_write(con, n);
    tcp_conn_reset_lifetime(con);
  }
  return 0;
}
290
291
/**
292
 * called under the TCP connection write lock, timeout is in milliseconds
293
 *
294
 * @return: -1 or bytes written (if 0 < ret < len: the last bytes are chunked)
295
 */
296
static int tsend_stream_async(struct tcp_connection *c,
    int fd, char* buf, unsigned int len, int timeout)
{
  int n;
  struct pollfd pf;

  pf.fd=fd;
  pf.events=POLLOUT;

again:
  /* opportunistic first write - the fd is non-blocking */
  n=send(fd, buf, len,0);
  if (n<0){
    if (errno==EINTR) goto again;
    else if (errno!=EAGAIN && errno!=EWOULDBLOCK) {
      LM_ERR("Failed first TCP async send : (%d) %s\n",
          errno, strerror(errno));
      return -1;
    } else
      /* socket buffer full - wait for writability below */
      goto poll_loop;
  }

  if (n < len) {
    /* partial write */
    buf += n;
    len -= n;
  } else {
    /* successful write from the first try */
    LM_DBG("Async successful write from first try on %p\n",c);
    return len;
  }

poll_loop:
  /* wait up to @timeout ms for the socket to become writable again */
  n = poll(&pf,1,timeout);
  if (n<0) {
    if (errno==EINTR)
      goto poll_loop;
    LM_ERR("Polling while trying to async send failed %s [%d]\n",
        strerror(errno), errno);
    return -1;
  } else if (n == 0) {
    LM_DBG("timeout -> do an async write (add it to conn)\n");
    /* timeout - let's just pass to main */
    if (tcp_async_add_chunk(c,buf,len,0) < 0) {
      LM_ERR("Failed to add write chunk to connection \n");
      return -1;
    } else {
      /* we have successfully added async write chunk
       * tell MAIN to poll out for us */
      LM_DBG("Data still pending for write on conn %p\n",c);
      return 0;
    }
  }

  /* writable again - retry sending the remainder */
  if (pf.revents&POLLOUT)
    goto again;

  /* some other events triggered by poll - treat as errors */
  return -1;
}
355
356
/* Write @len bytes from @buf on connection @c, serialized under the
 * conn's write lock.  Three paths: no fd available -> queue the data
 * as an async chunk; async conn -> queue behind pending chunks or try
 * an async send; plain conn -> blocking tsend_stream().
 * Returns bytes written (or accepted for async delivery), or <0 on error. */
int tcp_write_on_socket(struct tcp_connection* c, int fd,
    char *buf, int len, int write_timeout, int async_write_timeout)
{
  int n;

  lock_get(&c->write_lock);
  if (fd < 0) {
    /* no usable fd - the data can only be queued for later delivery */
    n = tcp_async_add_chunk(c, buf, len, 0);
    if (n == 0)
      n = len;  /* report the whole buffer as accepted */
  } else if (c->async) {
    /*
     * if there is any data pending to write, we have to wait for those chunks
     * to be sent, otherwise we will completely break the messages' order
     */
    if (c->async->pending)
      n = tcp_async_add_chunk(c, buf, len, 0);
    else
      n = tsend_stream_async(c,fd,buf,len, async_write_timeout);
  } else {
    /* synchronous conn: block until written or timeout */
    n = tsend_stream(fd, buf, len, write_timeout);
  }
  lock_release(&c->write_lock);
  /* actual traffic went over the wire - push the conn's lifetime out */
  if (fd >= 0 && n > 0)
    tcp_conn_reset_lifetime(c);

  return n;
}
384
385
/* returns :
386
 * 0  - in case of success
387
 * -1 - in case there was an internal error
388
 * -2 - in case our chunks buffer is full
389
 *    and we need to let the connection go
390
 */
391
int tcp_async_add_chunk(struct tcp_connection *con, char *buf,
392
    int len, int lock)
393
0
{
394
0
  struct tcp_async_chunk *c;
395
396
0
  if (lock)
397
0
    lock_get(&con->write_lock);
398
399
0
  if (tcp_async_init_data(con) < 0) {
400
0
    if (lock)
401
0
      lock_release(&con->write_lock);
402
0
    return -1;
403
0
  }
404
405
0
  c = shm_malloc(sizeof(struct tcp_async_chunk) + len);
406
0
  if (!c) {
407
0
    if (lock)
408
0
      lock_release(&con->write_lock);
409
0
    LM_ERR("No more SHM\n");
410
0
    return -1;
411
0
  }
412
413
0
  c->len = len;
414
0
  c->ticks = get_ticks();
415
0
  c->buf = (char *)(c+1);
416
0
  memcpy(c->buf,buf,len);
417
418
0
  if (con->async->allocated == con->async->pending) {
419
0
    LM_ERR("We have reached the limit of max async postponed chunks %d "
420
0
      "on conn %p / %u\n", con->async->pending, con, con->id);
421
0
    if (lock)
422
0
      lock_release(&con->write_lock);
423
0
    shm_free(c);
424
0
    return -2;
425
0
  }
426
427
0
  con->async->chunks[con->async->pending++] = c;
428
0
  if (con->async->pending == 1)
429
0
    con->async->oldest = c->ticks;
430
431
0
  if (lock)
432
0
    lock_release(&con->write_lock);
433
434
0
  return 0;
435
0
}
436
437
int tcp_async_add_chunks(struct tcp_connection *con, const struct iovec *iov,
438
    int iovcnt, int lock)
439
0
{
440
0
  int i;
441
0
  int rc = 0;
442
443
0
  if (lock)
444
0
    lock_get(&con->write_lock);
445
446
0
  if (tcp_async_init_data(con) < 0) {
447
0
    rc = -1;
448
0
    goto out;
449
0
  }
450
451
0
  for (i = 0; i < iovcnt; i++) {
452
0
    if (iov[i].iov_len == 0)
453
0
      continue;
454
0
    rc = tcp_async_add_chunk(con, iov[i].iov_base, iov[i].iov_len, 0);
455
0
    if (rc < 0)
456
0
      break;
457
0
  }
458
459
0
out:
460
0
  if (lock)
461
0
    lock_release(&con->write_lock);
462
0
  return rc;
463
0
}
464
465
466
struct tcp_async_chunk *tcp_async_get_chunk(struct tcp_connection *con)
467
0
{
468
0
  if (con->async->pending == 0)
469
0
    return NULL;
470
0
  return con->async->chunks[0];
471
0
}
472
473
void tcp_async_update_write(struct tcp_connection *con, int len)
474
0
{
475
0
  int i = 0, c;
476
0
  struct tcp_async_chunk *chunk;
477
478
0
  while (len > 0) {
479
0
    chunk = con->async->chunks[i];
480
0
    if (len < chunk->len) {
481
      /* partial write */
482
0
      chunk->len -= len;
483
0
      memmove(chunk->buf, chunk->buf + len, chunk->len);
484
0
      return;
485
0
    } else {
486
      /* written the entire chunk */
487
0
      i++;
488
0
      len -= chunk->len;
489
0
    }
490
0
  }
491
0
  con->async->pending -= i;
492
0
  for (c = 0; c < i; c++)
493
0
    shm_free(con->async->chunks[c]);
494
0
  if (con->async->pending) {
495
0
    LM_DBG("We still have %d chunks pending on %p\n",
496
0
        con->async->pending, con);
497
0
    memmove(con->async->chunks, con->async->chunks + i,
498
0
        con->async->pending * sizeof(struct tcp_async_chunk *));
499
0
    con->async->oldest = con->async->chunks[0]->ticks;
500
0
  } else {
501
0
    LM_DBG("We have finished writing all our async chunks in %p\n", con);
502
0
    con->async->oldest = 0;
503
0
  }
504
0
}