Coverage Report

Created: 2025-07-09 06:29

/src/opensips/net/net_tcp_proc.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2013-2014 OpenSIPS Solutions
3
 * Copyright (C) 2001-2003 FhG Fokus
4
 *
5
 * This file is part of opensips, a free SIP server.
6
 *
7
 * opensips is free software; you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 2 of the License, or
10
 * (at your option) any later version
11
 *
12
 * opensips is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU General Public License
18
 * along with this program; if not, write to the Free Software
19
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
20
 *
21
 * History:
22
 * --------
23
 * 2015-02-05 imported from former tpc_read.c (bogdan)
24
 */
25
26
27
#include "../pt_load.h"
28
#include "../ipc.h"
29
#include "../timer.h"
30
#include "../reactor.h"
31
#include "../async.h"
32
#include "../cfg_reload.h"
33
34
#include "tcp_conn.h"
35
#include "tcp_passfd.h"
36
#include "net_tcp_report.h"
37
#include "trans.h"
38
#include "net_tcp_dbg.h"
39
40
/*!< the FD currently used by the process to communicate with TCP MAIN*/
41
static int _my_fd_to_tcp_main = -1;
42
43
/*!< list of tcp connections handled by this process */
44
static struct tcp_connection* tcp_conn_lst=0;
45
46
static int _tcp_done_reading_marker = 0;
47
48
static int tcpmain_sock=-1;
49
extern int unix_tcp_sock;
50
51
extern struct struct_hist_list *con_hist;
52
53
/* Hands a failed connection back to TCP main with CONN_ERROR_TCPW state,
 * after first reporting the close _reason via tcp_trigger_report().
 * _writer is forwarded to tcpconn_release() (non-zero when invoked from a
 * writing context, so the conn's proc_id is left untouched). */
#define tcpconn_release_error(_conn, _writer, _reason) \
	do { \
		tcp_trigger_report( _conn, TCP_REPORT_CLOSE, _reason);\
		tcpconn_release( _conn, CONN_ERROR_TCPW, _writer, 1/*as TCP worker*/);\
	}while(0)
58
59
60
61
62
static void tcpconn_release(struct tcp_connection* c, long state, int writer,
63
                              int as_tcp_worker)
64
0
{
65
0
  long response[2];
66
67
0
  LM_DBG(" releasing con %p, state %ld, fd=%d, id=%d\n",
68
0
      c, state, c->fd, c->id);
69
0
  LM_DBG(" extra_data %p\n", c->extra_data);
70
71
  /* release req & signal the parent */
72
0
  if (!writer)
73
0
    c->proc_id = -1;
74
75
  /* errno==EINTR, EWOULDBLOCK a.s.o todo */
76
0
  response[0]=(long)c;
77
0
  response[1]=state;
78
79
0
  if (send_all( as_tcp_worker?tcpmain_sock:unix_tcp_sock, response,
80
0
  sizeof(response))<=0)
81
0
    LM_ERR("send_all failed state=%ld con=%p\n", state, c);
82
0
}
83
84
85
/* wrapper around internal tcpconn_release() - to be called by functions which
86
 * used tcp_conn_get(), in order to release the connection;
87
 * It does the unref and pushes back (if needed) some update to TCP main;
88
 * right now, it used only from the xxx_send() functions
89
 */
90
void tcp_conn_release(struct tcp_connection* c, int pending_data)
91
0
{
92
0
  if (c->state==S_CONN_BAD) {
93
0
    c->lifetime=0;
94
    /* CONN_ERROR will auto-dec refcnt => we must not call tcpconn_put !!*/
95
0
    tcpconn_release(c, CONN_ERROR_GENW, 1, 0 /*not TCP, but GEN worker*/);
96
0
    return;
97
0
  }
98
0
  if (pending_data) {
99
0
    tcpconn_release(c, ASYNC_WRITE_GENW, 1, 0 /*not TCP, but GEN worker*/);
100
0
    return;
101
0
  }
102
0
  tcpconn_put(c);
103
0
}
104
105
106
int tcp_done_reading(struct tcp_connection* con)
107
0
{
108
0
  if (_tcp_done_reading_marker==0) {
109
0
    reactor_del_all( con->fd, -1, IO_FD_CLOSING );
110
0
    tcpconn_check_del(con);
111
0
    tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
112
0
    if (con->fd!=-1) { close(con->fd); con->fd = -1; }
113
0
    sh_log(con->hist, TCP_SEND2MAIN,
114
0
      "parallel read OK - releasing, ref: %d", con->refcnt);
115
0
    tcpconn_release(con, CONN_RELEASE, 0, 1 /*as TCP proc*/);
116
117
0
    _tcp_done_reading_marker = 1;
118
0
  }
119
120
0
  return 0;
121
0
}
122
123
124
/*! \brief  releases expired connections and cleans up bad ones (state<0) */
125
static void tcp_receive_timeout(void)
126
0
{
127
0
  struct tcp_connection* con;
128
0
  struct tcp_connection* next;
129
0
  unsigned int ticks;
130
131
0
  ticks=get_ticks();
132
0
  for (con=tcp_conn_lst; con; con=next) {
133
0
    next=con->c_next; /* safe for removing */
134
0
    if (con->state<0){   /* kill bad connections */
135
      /* S_CONN_BAD or S_CONN_ERROR, remove it */
136
      /* fd will be closed in tcpconn_release */
137
138
0
      reactor_del_reader(con->fd, -1/*idx*/, IO_FD_CLOSING/*io_flags*/ );
139
0
      tcpconn_check_del(con);
140
0
      tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
141
0
      con->proc_id = -1;
142
0
      con->state=S_CONN_BAD;
143
0
      if (con->fd!=-1) { close(con->fd); con->fd = -1; }
144
0
      sh_log(con->hist, TCP_SEND2MAIN, "state: %d, att: %d",
145
0
             con->state, con->msg_attempts);
146
0
      tcpconn_release_error(con, 0, "Unknown reason");
147
0
      continue;
148
0
    }
149
    /* pass back to Main connections that are inactive (expired) or
150
     * if we are in termination mode (this worker is doing graceful 
151
     * shutdown) and there is no pending data on the conn. */
152
0
    if (con->timeout<=ticks ||
153
0
    (_termination_in_progress && !con->msg_attempts) ){
154
0
      LM_DBG("%p expired - (%d, %d) lt=%d\n",
155
0
          con, con->timeout, ticks,con->lifetime);
156
      /* fd will be closed in tcpconn_release */
157
0
      reactor_del_reader(con->fd, -1/*idx*/, IO_FD_CLOSING/*io_flags*/ );
158
0
      tcpconn_check_del(con);
159
0
      tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
160
161
      /* connection is going to main */
162
0
      con->proc_id = -1;
163
0
      if (con->fd!=-1) { close(con->fd); con->fd = -1; }
164
165
0
      sh_log(con->hist, TCP_SEND2MAIN, "timeout: %d, att: %d",
166
0
             con->timeout, con->msg_attempts);
167
0
      if (con->msg_attempts)
168
0
        tcpconn_release_error(con, 0, "Read timeout with"
169
0
          "incomplete SIP message");
170
0
      else
171
0
        tcpconn_release(con, CONN_RELEASE, 0,  1 /*as TCP proc*/);
172
0
    }
173
0
  }
174
0
}
175
176
177
/*! \brief
178
 *  handle io routine, based on the fd_map type
179
 * (it will be called from reactor_main_loop )
180
 * params:  fm  - pointer to a fd hash entry
181
 *          idx - index in the fd_array (or -1 if not known)
182
 * return: -1 on error, or when we are not interested any more on reads
183
 *            from this fd (e.g.: we are closing it )
184
 *          0 on EAGAIN or when by some other way it is known that no more
185
 *            io events are queued on the fd (the receive buffer is empty).
186
 *            Useful to detect when there are no more io events queued for
187
 *            sigio_rt, epoll_et, kqueue.
188
 *         >0 on successful read from the fd (when there might be more io
189
 *            queued -- the receive buffer might still be non-empty)
190
 */
191
inline static int handle_io(struct fd_map* fm, int idx,int event_type)
{
	int ret=0;
	int n;
	struct tcp_connection* con;
	int s,rw;      /* s = fd received from TCP main; rw = requested I/O dir */
	long resp;
	long response[2];  /* (conn pointer, rw flags) pair sent by TCP main */

	/* mark this process as busy for the load statistics */
	pt_become_active();

	pre_run_handle_script_reload(fm->app_flags);

	/* dispatch on the type of the fd that fired */
	switch(fm->type){
		case F_TIMER_JOB:
			handle_timer_job();
			break;
		case F_SCRIPT_ASYNC:
			async_script_resume_f( fm->fd, fm->data,
				(event_type==IO_WATCH_TIMEOUT)?1:0 );
			break;
		case F_FD_ASYNC:
			async_fd_resume( fm->fd, fm->data);
			break;
		case F_LAUNCH_ASYNC:
			async_launch_resume( fm->fd, fm->data);
			break;
		case F_IPC:
			ipc_handle_job(fm->fd);
			break;
		case F_TCPMAIN:
			/* TCP main is passing us a connection (+ its fd via SCM_RIGHTS) */
again:
			ret=n=receive_fd(fm->fd, response, sizeof(response), &s, 0);
			if (n<0){
				if (errno == EWOULDBLOCK || errno == EAGAIN){
					/* nothing queued after all - not an error */
					ret=0;
					break;
				}else if (errno == EINTR) goto again;
				else{
					LM_CRIT("read_fd: %s \n", strerror(errno));
						abort(); /* big error*/
				}
			}
			if (n==0){
				/* TCP main closed its end of the socket */
				LM_WARN("0 bytes read\n");
				break;
			}
			con = (struct tcp_connection *)response[0];
			rw = (int)response[1];

			if (con==0){
					LM_CRIT("null pointer\n");
					break;
			}
			if (s==-1) {
				LM_BUG("read_fd:no fd read for conn %p, rw %d\n", con, rw);
				/* FIXME? */
				goto error;
			}

			/* first time this conn reaches a worker -> run the
			 * protocol-specific init (e.g. TLS handshake setup) */
			if (!(con->flags & F_CONN_INIT)) {
				if (protos[con->type].net.stream.conn.init &&
						protos[con->type].net.stream.conn.init(con) < 0) {
					LM_ERR("failed to do proto %d specific init for conn %p\n",
							con->type, con);
					goto con_error;
				}
				con->flags |= F_CONN_INIT;
			}

			LM_DBG("We have received conn %p with rw %d on fd %d\n",con,rw,s);
			if (rw & IO_WATCH_READ) {
				/* conn delegated to us for reading: it must not already
				 * be in this worker's list */
				if (tcpconn_list_find(con, tcp_conn_lst)) {
					LM_CRIT("duplicate connection received: %p, id %d, fd %d, "
					        "refcnt %d state %d (n=%d)\n", con, con->id,
					        con->fd, con->refcnt, con->state, n);
					tcpconn_release_error(con, 0, "Internal duplicate");
					break; /* try to recover */
				}

				/* 0 attempts so far for this SIP MSG */
				con->msg_attempts = 0;

				/* must be before reactor_add, as the add might catch some
				 * already existing events => might call handle_io and
				 * handle_io might decide to del. the new connection =>
				 * must be in the list */
				tcpconn_check_add(con);
				tcpconn_listadd(tcp_conn_lst, con, c_next, c_prev);
				/* pending event on a connection -> prevent premature expiry */
				tcp_conn_reset_lifetime(con);
				con->timeout = con->lifetime;
				if (reactor_add_reader( s, F_TCPCONN, RCT_PRIO_NET, con )<0) {
					LM_CRIT("failed to add new socket to the fd list\n");
					tcpconn_check_del(con);
					tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
					goto con_error;
				}

				sh_log(con->hist, TCP_ADD_READER, "add reader fd %d, ref: %d",
				       s, con->refcnt);

				/* mark that the connection is currently in our process
				future writes to this con won't have to acquire FD */
				con->proc_id = process_no;
				/* save FD which is valid in context of this TCP worker */
				con->fd=s;
			} else if (rw & IO_WATCH_WRITE) {
				/* conn delegated to us only to flush pending async writes */
				LM_DBG("Received con for async write %p ref = %d\n",
					con, con->refcnt);
				lock_get(&con->write_lock);
				resp = protos[con->type].net.stream.write( con, s );
				lock_release(&con->write_lock);
				if (resp<0) {
					ret=-1; /* some error occurred */
					con->state=S_CONN_BAD;
					sh_log(con->hist, TCP_SEND2MAIN,
						"handle write, err, state: %d, att: %d",
						con->state, con->msg_attempts);
					tcpconn_release_error(con, 1,"Write error");
					close(s); /* we always close the socket received for writing */
					break;
				} else if (resp==1) {
					/* partial write -> main must keep polling for write */
					sh_log(con->hist, TCP_SEND2MAIN,
						"handle write, async, state: %d, att: %d",
						con->state, con->msg_attempts);
					tcpconn_release(con, ASYNC_WRITE_TCPW, 1,
						1 /*as TCP proc*/);
				} else {
					/* all pending data flushed */
					sh_log(con->hist, TCP_SEND2MAIN,
						"handle write, ok, state: %d, att: %d",
						con->state, con->msg_attempts);
					tcpconn_release(con, CONN_RELEASE_WRITE, 1,
						1/*as TCP proc*/);
				}
				ret = 0;
				/* we always close the socket received for writing */
				close(s);
			}
			break;
		case F_TCPCONN:
			/* data available on a connection this worker is reading */
			if (event_type & IO_WATCH_READ) {
				con=(struct tcp_connection*)fm->data;
				/* re-arm the once-per-event marker used by
				 * tcp_done_reading() */
				_tcp_done_reading_marker = 0;
				resp = protos[con->type].net.stream.read( con, &ret );
				if (resp<0) {
					ret=-1; /* some error occurred */
					con->state=S_CONN_BAD;
					reactor_del_all( con->fd, idx, IO_FD_CLOSING );
					tcpconn_check_del(con);
					tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
					con->proc_id = -1;
					if (con->fd!=-1) { close(con->fd); con->fd = -1; }
					sh_log(con->hist, TCP_SEND2MAIN,
						"handle read, err, resp: %d, att: %d",
						resp, con->msg_attempts);
					tcpconn_release_error(con, 0, "Read error");
				} else if (resp == 1) {
					/* the connection is already released */
					break;
				} else if (con->state==S_CONN_EOF) {
					/* peer closed: detach and report EOF to TCP main */
					reactor_del_all( con->fd, idx, IO_FD_CLOSING );
					tcpconn_check_del(con);
					tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
					con->proc_id = -1;
					if (con->fd!=-1) { close(con->fd); con->fd = -1; }
					tcp_trigger_report( con, TCP_REPORT_CLOSE,
						"EOF received");
					sh_log(con->hist, TCP_SEND2MAIN,
						"handle read, EOF, resp: %d, att: %d",
						resp, con->msg_attempts);
					tcpconn_release(con, CONN_EOF, 0, 1 /*as TCP proc*/);
				} else {
					if (con->profile.parallel_read)
						/* return the connection if not already */
						tcp_done_reading( con );
					break;
				}
			}
			break;
		case F_NONE:
			LM_CRIT("empty fd map %p: "
						"{%d, %d, %p}\n", fm,
						fm->fd, fm->type, fm->data);
			goto error;
		default:
			LM_CRIT("unknown fd type %d\n", fm->type);
			goto error;
	}

	if (_termination_in_progress==1) {
		/* force (again) passing back all the active conns */
		tcp_receive_timeout();
		/* check if anything is still left */
		if (reactor_is_empty()) {
			LM_WARN("reactor got empty while termination in progress\n");
			ipc_handle_all_pending_jobs(IPC_FD_READ_SELF);
			if (reactor_is_empty())
				dynamic_process_final_exit();
		}
	}

	post_run_handle_script_reload();

	pt_become_idle();
	return ret;
con_error:
	/* error on a conn we still own: mark it bad and report to main */
	con->state=S_CONN_BAD;
	tcpconn_release_error(con, 0, "Internal error");
	pt_become_idle();
	return ret;
error:
	pt_become_idle();
	return -1;
}
406
407
408
409
int tcp_worker_proc_reactor_init( int unix_sock)
410
0
{
411
  /* init reactor for TCP worker */
412
0
  tcpmain_sock=unix_sock; /* init com. socket */
413
0
  if ( init_worker_reactor( "TCP_worker", RCT_PRIO_MAX)<0 ) {
414
0
    goto error;
415
0
  }
416
417
  /* start watching for the timer jobs */
418
0
  if (reactor_add_reader( timer_fd_out, F_TIMER_JOB, RCT_PRIO_TIMER,NULL)<0){
419
0
    LM_CRIT("failed to add timer pipe_out to reactor\n");
420
0
    goto error;
421
0
  }
422
423
  /* init: start watching for the IPC jobs */
424
0
  if (reactor_add_reader(IPC_FD_READ_SELF, F_IPC, RCT_PRIO_ASYNC, NULL)<0){
425
0
    LM_CRIT("failed to add IPC pipe to reactor\n");
426
0
    goto error;
427
0
  }
428
429
  /* init: start watching for IPC "dispatched" jobs */
430
0
  if (reactor_add_reader(IPC_FD_READ_SHARED, F_IPC, RCT_PRIO_ASYNC, NULL)<0){
431
0
    LM_CRIT("failed to add IPC shared pipe to reactor\n");
432
0
    return -1;
433
0
  }
434
435
  /* add the unix socket */
436
0
  if (reactor_add_reader( tcpmain_sock, F_TCPMAIN, RCT_PRIO_PROC, NULL)<0) {
437
0
    LM_CRIT("failed to add socket to the fd list\n");
438
0
    goto error;
439
0
  }
440
0
  _my_fd_to_tcp_main = tcpmain_sock;
441
442
0
  return 0;
443
0
error:
444
0
  destroy_worker_reactor();
445
0
  return -1;
446
0
}
447
448
/*! \brief Main loop of a TCP worker process. Runs the reactor forever,
 * invoking tcp_receive_timeout() on each select timeout; only returns
 * via the `error` label (jumped to by the reactor_main_loop macro on a
 * fatal reactor failure). */
void tcp_worker_proc_loop(void)
{
	/* main loop */
	reactor_main_loop( TCP_CHILD_SELECT_TIMEOUT, error, tcp_receive_timeout());
	/* not expected to ever get here - the loop above is infinite */
	LM_CRIT("exiting...");
	exit(-1);
error:
	/* reactor failed fatally: tear it down before the process dies */
	destroy_worker_reactor();
}
457
458
459
void tcp_terminate_worker(void)
460
0
{
461
  /*remove from reactor all the shared fds, so we stop reading from them */
462
463
  /*remove timer jobs pipe */
464
0
  reactor_del_reader( timer_fd_out, -1, 0);
465
466
  /*remove IPC dispatcher pipe */
467
0
  reactor_del_reader( IPC_FD_READ_SHARED, -1, 0);
468
469
  /*remove private IPC pipe */
470
0
  reactor_del_reader( IPC_FD_READ_SELF, -1, 0);
471
472
  /*remove unix sock to TCP main */
473
0
  reactor_del_reader( _my_fd_to_tcp_main, -1, 0);
474
475
0
  _termination_in_progress = 1;
476
477
  /* let's drain the private IPC */
478
0
  ipc_handle_all_pending_jobs(IPC_FD_READ_SELF);
479
480
  /* force passing back all the active conns */
481
0
  tcp_receive_timeout();
482
483
  /* what is left now is the reactor are async fd's, so we need to 
484
   * wait to complete all of them */
485
0
  if (reactor_is_empty())
486
0
    dynamic_process_final_exit();
487
488
  /* the exit will be triggered by the reactor, when empty */
489
0
  LM_INFO("reactor not empty, waiting for pending async/conns\n");
490
0
}
491