Coverage Report

Created: 2026-02-26 07:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/opensips/net/net_tcp_proc.c
Line
Count
Source
1
/*
2
 * Copyright (C) 2013-2014 OpenSIPS Solutions
3
 * Copyright (C) 2001-2003 FhG Fokus
4
 *
5
 * This file is part of opensips, a free SIP server.
6
 *
7
 * opensips is free software; you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 2 of the License, or
10
 * (at your option) any later version
11
 *
12
 * opensips is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU General Public License
18
 * along with this program; if not, write to the Free Software
19
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
20
 *
21
 * History:
22
 * --------
23
 * 2015-02-05 imported from former tpc_read.c (bogdan)
24
 */
25
26
27
#include "../pt_load.h"
28
#include "../ipc.h"
29
#include "../timer.h"
30
#include "../reactor.h"
31
#include "../async.h"
32
#include "../cfg_reload.h"
33
34
#include "tcp_conn.h"
35
#include "tcp_passfd.h"
36
#include "net_tcp_report.h"
37
#include "trans.h"
38
#include "net_tcp_dbg.h"
39
40
/*! FD used by this process to talk to TCP main; recorded once the worker
 * reactor has been initialized */
static int _my_fd_to_tcp_main = -1;

/*! head of the list of TCP connections currently handled by this process */
static struct tcp_connection *tcp_conn_lst = NULL;

/*! set by tcp_done_reading() once the connection being read was handed
 * back to TCP main; reset by handle_io() before every read */
static int _tcp_done_reading_marker = 0;

/*! unix socket towards TCP main, as given to tcp_worker_proc_reactor_init() */
static int tcpmain_sock = -1;
/*! socket towards TCP main used when releasing as a non-TCP (generic)
 * worker; defined elsewhere */
extern int unix_tcp_sock;

extern struct struct_hist_list *con_hist;
52
53
/* Reports the closing of a connection (with the given reason) and then
 * hands it back to TCP main in the CONN_ERROR_TCPW state, acting as a TCP
 * worker. Note: _conn is evaluated twice. */
#define tcpconn_release_error(_conn, _writer, _reason) \
	do { \
		tcp_trigger_report((_conn), TCP_REPORT_CLOSE, (_reason)); \
		tcpconn_release((_conn), CONN_ERROR_TCPW, (_writer), \
			1 /* as TCP worker */); \
	} while (0)
58
59
60
61
62
static void tcpconn_release(struct tcp_connection* c, long state, int writer,
63
                              int as_tcp_worker)
64
0
{
65
0
  long response[2];
66
67
0
  LM_DBG(" releasing con %p, state %ld, fd=%d, id=%d\n",
68
0
      c, state, c->fd, c->id);
69
0
  LM_DBG(" extra_data %p\n", c->extra_data);
70
71
  /* release req & signal the parent */
72
0
  if (!writer)
73
0
    c->proc_id = -1;
74
75
  /* errno==EINTR, EWOULDBLOCK a.s.o todo */
76
0
  response[0]=(long)c;
77
0
  response[1]=state;
78
79
0
  if (send_all( as_tcp_worker?tcpmain_sock:unix_tcp_sock, response,
80
0
  sizeof(response))<=0)
81
0
    LM_ERR("send_all failed state=%ld con=%p\n", state, c);
82
0
}
83
84
85
/* wrapper around internal tcpconn_release() - to be called by functions which
86
 * used tcp_conn_get(), in order to release the connection;
87
 * It does the unref and pushes back (if needed) some update to TCP main;
88
 * right now, it used only from the xxx_send() functions
89
 */
90
void tcp_conn_release(struct tcp_connection* c, int pending_data)
91
0
{
92
0
  if (c->state==S_CONN_BAD) {
93
    /* do more or less nothing, let the TCP READER owning the conn
94
     * to trash it based on the S_CONN_BAD marker */
95
0
    c->lifetime=0;
96
    /* but be sure we unref the conn */
97
0
    tcpconn_put(c);
98
0
    return;
99
0
  }
100
0
  if (pending_data) {
101
0
    tcpconn_release(c, ASYNC_WRITE_GENW, 1, 0 /*not TCP, but GEN worker*/);
102
0
    return;
103
0
  }
104
0
  tcpconn_put(c);
105
0
}
106
107
108
int tcp_done_reading(struct tcp_connection* con)
109
0
{
110
0
  if (_tcp_done_reading_marker==0) {
111
0
    reactor_del_all( con->fd, -1, IO_FD_CLOSING );
112
0
    tcpconn_check_del(con);
113
0
    tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
114
0
    if (con->fd!=-1) { close(con->fd); con->fd = -1; }
115
0
    sh_log(con->hist, TCP_SEND2MAIN,
116
0
      "parallel read OK - releasing, ref: %d", con->refcnt);
117
0
    tcpconn_release(con, CONN_RELEASE, 0, 1 /*as TCP proc*/);
118
119
0
    _tcp_done_reading_marker = 1;
120
0
  }
121
122
0
  return 0;
123
0
}
124
125
126
/*! \brief  releases expired connections and cleans up bad ones (state<0) */
127
static void tcp_receive_timeout(void)
128
0
{
129
0
  struct tcp_connection* con;
130
0
  struct tcp_connection* next;
131
0
  unsigned int ticks;
132
133
0
  ticks=get_ticks();
134
0
  for (con=tcp_conn_lst; con; con=next) {
135
0
    next=con->c_next; /* safe for removing */
136
0
    if (con->state<0){   /* kill bad connections */
137
      /* S_CONN_BAD or S_CONN_ERROR, remove it */
138
      /* fd will be closed in tcpconn_release */
139
0
      LM_ERR("TCP_DBG - conn %p / %u found as bad, relasing back "
140
0
        "to main\n", con, con->id);
141
142
0
      reactor_del_reader(con->fd, -1/*idx*/, IO_FD_CLOSING/*io_flags*/ );
143
0
      tcpconn_check_del(con);
144
0
      tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
145
0
      con->proc_id = -1;
146
0
      con->state=S_CONN_BAD;
147
0
      if (con->fd!=-1) { close(con->fd); con->fd = -1; }
148
0
      sh_log(con->hist, TCP_SEND2MAIN, "state: %d, att: %d, ref: %d",
149
0
             con->state, con->msg_attempts, con->refcnt);
150
0
      tcpconn_release_error(con, 0, "Unknown reason");
151
0
      continue;
152
0
    }
153
    /* pass back to Main connections that are inactive (expired) or
154
     * if we are in termination mode (this worker is doing graceful 
155
     * shutdown) and there is no pending data on the conn. */
156
0
    if (con->timeout<=ticks ||
157
0
    (_termination_in_progress && !con->msg_attempts) ){
158
0
      LM_DBG("%p expired - (%d, %d) lt=%d\n",
159
0
          con, con->timeout, ticks,con->lifetime);
160
      /* fd will be closed in tcpconn_release */
161
0
      reactor_del_reader(con->fd, -1/*idx*/, IO_FD_CLOSING/*io_flags*/ );
162
0
      tcpconn_check_del(con);
163
0
      tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
164
165
      /* connection is going to main */
166
0
      con->proc_id = -1;
167
0
      if (con->fd!=-1) { close(con->fd); con->fd = -1; }
168
169
0
      sh_log(con->hist, TCP_SEND2MAIN, "timeout: %d, att: %d",
170
0
             con->timeout, con->msg_attempts);
171
0
      if (con->msg_attempts)
172
0
        tcpconn_release_error(con, 0, "Read timeout with"
173
0
          "incomplete SIP message");
174
0
      else
175
0
        tcpconn_release(con, CONN_RELEASE, 0,  1 /*as TCP proc*/);
176
0
    }
177
0
  }
178
0
}
179
180
181
/*! \brief
182
 *  handle io routine, based on the fd_map type
183
 * (it will be called from reactor_main_loop )
184
 * params:  fm  - pointer to a fd hash entry
185
 *          idx - index in the fd_array (or -1 if not known)
186
 * return: -1 on error, or when we are not interested any more on reads
187
 *            from this fd (e.g.: we are closing it )
188
 *          0 on EAGAIN or when by some other way it is known that no more
189
 *            io events are queued on the fd (the receive buffer is empty).
190
 *            Usefull to detect when there are no more io events queued for
191
 *            sigio_rt, epoll_et, kqueue.
192
 *         >0 on successful read from the fd (when there might be more io
193
 *            queued -- the receive buffer might still be non-empty)
194
 */
195
inline static int handle_io(struct fd_map* fm, int idx,int event_type)
196
0
{
197
0
  int ret=0;
198
0
  int n;
199
0
  struct tcp_connection* con;
200
0
  int s,rw;
201
0
  long resp;
202
0
  long response[2];
203
204
0
  pt_become_active();
205
206
0
  pre_run_handle_script_reload(fm->app_flags);
207
208
0
  switch(fm->type){
209
0
    case F_TIMER_JOB:
210
0
      handle_timer_job();
211
0
      break;
212
0
    case F_SCRIPT_ASYNC:
213
0
      async_script_resume_f( fm->fd, fm->data,
214
0
        (event_type==IO_WATCH_TIMEOUT)?1:0 );
215
0
      break;
216
0
    case F_FD_ASYNC:
217
0
      async_fd_resume( fm->fd, fm->data);
218
0
      break;
219
0
    case F_LAUNCH_ASYNC:
220
0
      async_launch_resume( fm->fd, fm->data);
221
0
      break;
222
0
    case F_IPC:
223
0
      ipc_handle_job(fm->fd);
224
0
      break;
225
0
    case F_TCPMAIN:
226
0
again:
227
0
      ret=n=receive_fd(fm->fd, response, sizeof(response), &s, 0);
228
0
      if (n<0){
229
0
        if (errno == EWOULDBLOCK || errno == EAGAIN){
230
0
          ret=0;
231
0
          break;
232
0
        }else if (errno == EINTR) goto again;
233
0
        else{
234
0
          LM_CRIT("read_fd: %s \n", strerror(errno));
235
0
            abort(); /* big error*/
236
0
        }
237
0
      }
238
0
      if (n==0){
239
0
        LM_WARN("0 bytes read\n");
240
0
        break;
241
0
      }
242
0
      con = (struct tcp_connection *)response[0];
243
0
      rw = (int)response[1];
244
245
0
      if (con==0){
246
0
          LM_CRIT("null pointer\n");
247
0
          break;
248
0
      }
249
0
      if (s==-1) {
250
0
        LM_BUG("read_fd:no fd read for conn %p, rw %d\n", con, rw);
251
        /* FIXME? */
252
0
        goto error;
253
0
      }
254
255
0
      if (!(con->flags & F_CONN_INIT)) {
256
0
        if (protos[con->type].net.stream.conn.init &&
257
0
            protos[con->type].net.stream.conn.init(con) < 0) {
258
0
          LM_ERR("failed to do proto %d specific init for conn %p\n",
259
0
              con->type, con);
260
0
          goto con_error;
261
0
        }
262
0
        con->flags |= F_CONN_INIT;
263
0
      }
264
265
0
      LM_DBG("We have received conn %p with rw %d on fd %d\n",con,rw,s);
266
0
      if (rw & IO_WATCH_READ) {
267
0
        if (tcpconn_list_find(con, tcp_conn_lst)) {
268
0
          LM_CRIT("duplicate connection received: %p, id %d, fd %d, "
269
0
                  "refcnt %d state %d (n=%d)\n", con, con->id,
270
0
                  con->fd, con->refcnt, con->state, n);
271
0
          tcpconn_release_error(con, 0, "Internal duplicate");
272
0
          break; /* try to recover */
273
0
        }
274
275
        /* 0 attempts so far for this SIP MSG */
276
0
        con->msg_attempts = 0;
277
278
        /* must be before reactor_add, as the add might catch some
279
         * already existing events => might call handle_io and
280
         * handle_io might decide to del. the new connection =>
281
         * must be in the list */
282
0
        tcpconn_check_add(con);
283
0
        tcpconn_listadd(tcp_conn_lst, con, c_next, c_prev);
284
        /* pending event on a connection -> prevent premature expiry */
285
0
        tcp_conn_reset_lifetime(con);
286
0
        con->timeout = con->lifetime;
287
0
        if (reactor_add_reader( s, F_TCPCONN, RCT_PRIO_NET, con )<0) {
288
0
          LM_CRIT("failed to add new socket to the fd list\n");
289
0
          tcpconn_check_del(con);
290
0
          tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
291
0
          goto con_error;
292
0
        }
293
294
0
        sh_log(con->hist, TCP_ADD_READER, "add reader fd %d, ref: %d",
295
0
               s, con->refcnt);
296
297
        /* mark that the connection is currently in our process
298
        future writes to this con won't have to acquire FD */
299
0
        con->proc_id = process_no;
300
        /* save FD which is valid in context of this TCP worker */
301
0
        con->fd=s;
302
0
      } else if (rw & IO_WATCH_WRITE) {
303
0
        LM_DBG("Received con for async write %p ref = %d\n",
304
0
          con, con->refcnt);
305
0
        lock_get(&con->write_lock);
306
0
        resp = protos[con->type].net.stream.write( con, s );
307
0
        lock_release(&con->write_lock);
308
0
        if (resp<0) {
309
0
          ret=-1; /* some error occurred */
310
0
          con->state=S_CONN_BAD;
311
0
          sh_log(con->hist, TCP_SEND2MAIN,
312
0
            "handle write, err, state: %d, att: %d",
313
0
            con->state, con->msg_attempts);
314
0
          tcpconn_release_error(con, 1,"Write error");
315
0
          close(s); /* we always close the socket received for writing */
316
0
          break;
317
0
        } else if (resp==1) {
318
0
          sh_log(con->hist, TCP_SEND2MAIN,
319
0
            "handle write, async, state: %d, att: %d",
320
0
            con->state, con->msg_attempts);
321
0
          tcpconn_release(con, ASYNC_WRITE_TCPW, 1,
322
0
            1 /*as TCP proc*/);
323
0
        } else {
324
0
          sh_log(con->hist, TCP_SEND2MAIN,
325
0
            "handle write, ok, state: %d, att: %d",
326
0
            con->state, con->msg_attempts);
327
0
          tcpconn_release(con, CONN_RELEASE_WRITE, 1,
328
0
            1/*as TCP proc*/);
329
0
        }
330
0
        ret = 0;
331
        /* we always close the socket received for writing */
332
0
        close(s);
333
0
      }
334
0
      break;
335
0
    case F_TCPCONN:
336
0
      if (event_type & IO_WATCH_READ) {
337
0
        con=(struct tcp_connection*)fm->data;
338
0
        _tcp_done_reading_marker = 0;
339
0
        resp = protos[con->type].net.stream.read( con, &ret );
340
0
        if (resp<0) {
341
0
          ret=-1; /* some error occurred */
342
0
          con->state=S_CONN_BAD;
343
0
          reactor_del_all( con->fd, idx, IO_FD_CLOSING );
344
0
          tcpconn_check_del(con);
345
0
          tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
346
0
          con->proc_id = -1;
347
0
          if (con->fd!=-1) { close(con->fd); con->fd = -1; }
348
0
          sh_log(con->hist, TCP_SEND2MAIN,
349
0
            "handle read, err, resp: %d, att: %d",
350
0
            resp, con->msg_attempts);
351
0
          tcpconn_release_error(con, 0, "Read error");
352
0
        } else if (resp == 1) {
353
          /* the connection is already released */
354
0
          break;
355
0
        } else if (con->state==S_CONN_EOF) {
356
0
          reactor_del_all( con->fd, idx, IO_FD_CLOSING );
357
0
          tcpconn_check_del(con);
358
0
          tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
359
0
          con->proc_id = -1;
360
0
          if (con->fd!=-1) { close(con->fd); con->fd = -1; }
361
0
          tcp_trigger_report( con, TCP_REPORT_CLOSE,
362
0
            "EOF received");
363
0
          sh_log(con->hist, TCP_SEND2MAIN,
364
0
            "handle read, EOF, resp: %d, att: %d",
365
0
            resp, con->msg_attempts);
366
0
          tcpconn_release(con, CONN_EOF, 0, 1 /*as TCP proc*/);
367
0
        } else {
368
0
          if (con->profile.parallel_read)
369
            /* return the connection if not already */
370
0
            tcp_done_reading( con );
371
0
          break;
372
0
        }
373
0
      }
374
0
      break;
375
0
    case F_NONE:
376
0
      LM_CRIT("empty fd map %p: "
377
0
            "{%d, %d, %p}\n", fm,
378
0
            fm->fd, fm->type, fm->data);
379
0
      goto error;
380
0
    default:
381
0
      LM_CRIT("unknown fd type %d\n", fm->type);
382
0
      goto error;
383
0
  }
384
385
0
  if (_termination_in_progress==1) {
386
    /* force (again) passing back all the active conns */
387
0
    tcp_receive_timeout();
388
    /* check if anything is still left */
389
0
    if (reactor_is_empty()) {
390
0
      LM_WARN("reactor got empty while termination in progress\n");
391
0
      ipc_handle_all_pending_jobs(IPC_FD_READ_SELF);
392
0
      if (reactor_is_empty())
393
0
        dynamic_process_final_exit();
394
0
    }
395
0
  }
396
397
0
  post_run_handle_script_reload();
398
399
0
  pt_become_idle();
400
0
  return ret;
401
0
con_error:
402
0
  con->state=S_CONN_BAD;
403
0
  tcpconn_release_error(con, 0, "Internal error");
404
0
  pt_become_idle();
405
0
  return ret;
406
0
error:
407
0
  pt_become_idle();
408
0
  return -1;
409
0
}
410
411
412
413
int tcp_worker_proc_reactor_init( int unix_sock)
414
0
{
415
  /* init reactor for TCP worker */
416
0
  tcpmain_sock=unix_sock; /* init com. socket */
417
0
  if ( init_worker_reactor( "TCP_worker", RCT_PRIO_MAX)<0 ) {
418
0
    goto error;
419
0
  }
420
421
  /* start watching for the timer jobs */
422
0
  if (reactor_add_reader( timer_fd_out, F_TIMER_JOB, RCT_PRIO_TIMER,NULL)<0){
423
0
    LM_CRIT("failed to add timer pipe_out to reactor\n");
424
0
    goto error;
425
0
  }
426
427
  /* init: start watching for the IPC jobs */
428
0
  if (reactor_add_reader(IPC_FD_READ_SELF, F_IPC, RCT_PRIO_ASYNC, NULL)<0){
429
0
    LM_CRIT("failed to add IPC pipe to reactor\n");
430
0
    goto error;
431
0
  }
432
433
  /* init: start watching for IPC "dispatched" jobs */
434
0
  if (reactor_add_reader(IPC_FD_READ_SHARED, F_IPC, RCT_PRIO_ASYNC, NULL)<0){
435
0
    LM_CRIT("failed to add IPC shared pipe to reactor\n");
436
0
    return -1;
437
0
  }
438
439
  /* add the unix socket */
440
0
  if (reactor_add_reader( tcpmain_sock, F_TCPMAIN, RCT_PRIO_PROC, NULL)<0) {
441
0
    LM_CRIT("failed to add socket to the fd list\n");
442
0
    goto error;
443
0
  }
444
0
  _my_fd_to_tcp_main = tcpmain_sock;
445
446
0
  return 0;
447
0
error:
448
0
  destroy_worker_reactor();
449
0
  return -1;
450
0
}
451
452
void tcp_worker_proc_loop(void)
453
0
{
454
  /* main loop */
455
0
  reactor_main_loop( TCP_CHILD_SELECT_TIMEOUT, error, tcp_receive_timeout());
456
0
  LM_CRIT("exiting...");
457
0
  exit(-1);
458
0
error:
459
0
  destroy_worker_reactor();
460
0
}
461
462
463
void tcp_terminate_worker(void)
464
0
{
465
  /*remove from reactor all the shared fds, so we stop reading from them */
466
467
  /*remove timer jobs pipe */
468
0
  reactor_del_reader( timer_fd_out, -1, 0);
469
470
  /*remove IPC dispatcher pipe */
471
0
  reactor_del_reader( IPC_FD_READ_SHARED, -1, 0);
472
473
  /*remove private IPC pipe */
474
0
  reactor_del_reader( IPC_FD_READ_SELF, -1, 0);
475
476
  /*remove unix sock to TCP main */
477
0
  reactor_del_reader( _my_fd_to_tcp_main, -1, 0);
478
479
0
  _termination_in_progress = 1;
480
481
  /* let's drain the private IPC */
482
0
  ipc_handle_all_pending_jobs(IPC_FD_READ_SELF);
483
484
  /* force passing back all the active conns */
485
0
  tcp_receive_timeout();
486
487
  /* what is left now is the reactor are async fd's, so we need to 
488
   * wait to complete all of them */
489
0
  if (reactor_is_empty())
490
0
    dynamic_process_final_exit();
491
492
  /* the exit will be triggered by the reactor, when empty */
493
0
  LM_INFO("reactor not empty, waiting for pending async/conns\n");
494
0
}
495