/src/opensips/net/net_tcp.c
Line | Count | Source |
1 | | /* |
2 | | * Copyright (C) 2014-2015 OpenSIPS Project |
3 | | * Copyright (C) 2001-2003 FhG Fokus |
4 | | * |
5 | | * This file is part of opensips, a free SIP server. |
6 | | * |
7 | | * opensips is free software; you can redistribute it and/or modify |
8 | | * it under the terms of the GNU General Public License as published by |
9 | | * the Free Software Foundation; either version 2 of the License, or |
10 | | * (at your option) any later version. |
11 | | * |
12 | | * opensips is distributed in the hope that it will be useful, |
13 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
14 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
15 | | * GNU General Public License for more details. |
16 | | * |
17 | | * You should have received a copy of the GNU General Public License |
18 | | * along with this program; if not, write to the Free Software |
19 | | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
20 | | * |
21 | | * |
22 | | * history: |
23 | | * --------- |
24 | | * 2015-01-xx created (razvanc) |
25 | | */ |
26 | | |
27 | | #include <sys/types.h> |
28 | | #include <sys/socket.h> |
29 | | #include <netinet/in.h> |
30 | | #include <netinet/in_systm.h> |
31 | | #include <netinet/ip.h> |
32 | | #include <netinet/tcp.h> |
33 | | #include <sys/uio.h> /* writev*/ |
34 | | #include <netdb.h> |
35 | | #include <stdlib.h> /*exit() */ |
36 | | #include <time.h> /*time() */ |
37 | | #include <unistd.h> |
38 | | #include <errno.h> |
39 | | #include <string.h> |
40 | | |
41 | | #include "../mem/mem.h" |
42 | | #include "../mem/shm_mem.h" |
43 | | #include "../globals.h" |
44 | | #include "../locking.h" |
45 | | #include "../socket_info.h" |
46 | | #include "../ut.h" |
47 | | #include "../pt.h" |
48 | | #include "../pt_load.h" |
49 | | #include "../daemonize.h" |
50 | | #include "../status_report.h" |
51 | | #include "../reactor.h" |
52 | | #include "../timer.h" |
53 | | #include "../ipc.h" |
54 | | |
55 | | #include "tcp_passfd.h" |
56 | | #include "net_tcp_proc.h" |
57 | | #include "net_tcp_report.h" |
58 | | #include "net_tcp.h" |
59 | | #include "tcp_conn.h" |
60 | | #include "tcp_conn_profile.h" |
61 | | #include "trans.h" |
62 | | #include "net_tcp_dbg.h" |
63 | | |
64 | | struct struct_hist_list *con_hist; |
65 | | |
66 | | enum tcp_worker_state { STATE_INACTIVE=0, STATE_ACTIVE, STATE_DRAINING}; |
67 | | |
68 | | /* definition of a TCP worker - the array of these TCP workers is |
69 | | * mainly intended to be used by the TCP main, to keep track of the |
70 | | * workers, their load and so on. Nevertheless, since the addition |
71 | | * of process auto-scaling, other processes may need access to this |
72 | | * data, thus its relocation to SHM (versus the initial PKG). For example, |
73 | | * the attendant process is the one forking new TCP workers (scaling up), |
74 | | * so it must be able to set the ENABLE state for a TCP worker (and be |
75 | | * seen by the TCP main proc). Similarly, when a TCP worker shuts down, it |
76 | | * has to mark itself as DISABLED and the TCP main must see that. |
77 | | * Again, 99% of the time this array is meant for TCP Main ops; it is not |
78 | | * lock protected, so be very careful with any ops from other procs. |
79 | | */ |
80 | | struct tcp_worker { |
81 | | pid_t pid; |
82 | | int unix_sock; /*!< Main-Worker comm, worker end */ |
83 | | int main_unix_sock; /*!< Main-Worker comm, TCP Main end */ |
84 | | int pt_idx; /*!< Index in the main Process Table */ |
85 | | enum tcp_worker_state state; |
86 | | int n_reqs; /*!< number of requests serviced so far */ |
87 | | }; |
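| | /* A minimal illustrative sketch, based only on the comment above: a |
| | * non-TCP-main process - e.g. the attendant when scaling up, or a worker |
| | * shutting itself down - would publish its state change simply as: |
| | * |
| | *     tcp_workers[idx].state = STATE_ACTIVE;    // new worker forked |
| | *     ... |
| | *     tcp_workers[idx].state = STATE_INACTIVE;  // worker shutting down |
| | * |
| | * The array lives in SHM but is not lock protected, so only such simple, |
| | * single-writer updates are expected from other processes. */ |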
88 | | |
89 | | /* definition of a TCP partition */ |
90 | | struct tcp_partition { |
91 | | /*! \brief connection hash table (after ip&port), includes also aliases */ |
92 | | struct tcp_conn_alias** tcpconn_aliases_hash; |
93 | | /*! \brief connection hash table (after connection id) */ |
94 | | struct tcp_connection** tcpconn_id_hash; |
95 | | gen_lock_t* tcpconn_lock; |
96 | | }; |
97 | | |
98 | | |
99 | | /* array of TCP workers - to be used only by TCP MAIN */ |
100 | | struct tcp_worker *tcp_workers=0; |
101 | | |
102 | | /* unique for each connection, used for |
103 | | * quickly finding the corresponding connection for a reply */ |
104 | | static unsigned int* connection_id=0; |
105 | | |
106 | | /* array of TCP partitions */ |
107 | | static struct tcp_partition tcp_parts[TCP_PARTITION_SIZE]; |
108 | | |
109 | | /*!< tcp protocol number as returned by getprotobyname */ |
110 | | static int tcp_proto_no=-1; |
111 | | |
112 | | /* communication socket from generic proc to TCP main */ |
113 | | int unix_tcp_sock = -1; |
114 | | |
115 | | /*!< current number of open connections */ |
116 | | static int tcp_connections_no = 0; |
117 | | |
118 | | /*!< by default don't accept aliases */ |
119 | | int tcp_accept_aliases=0; |
120 | | int tcp_connect_timeout=DEFAULT_TCP_CONNECT_TIMEOUT; |
121 | | int tcp_con_lifetime=DEFAULT_TCP_CONNECTION_LIFETIME; |
122 | | int tcp_socket_backlog=DEFAULT_TCP_SOCKET_BACKLOG; |
123 | | /*!< by default choose the best method */ |
124 | | enum poll_types tcp_poll_method=0; |
125 | | int tcp_max_connections=DEFAULT_TCP_MAX_CONNECTIONS; |
126 | | /* the configured/starting number of TCP workers */ |
127 | | int tcp_workers_no = UDP_WORKERS_NO; |
128 | | /* the maximum numbers of TCP workers */ |
129 | | int tcp_workers_max_no; |
130 | | /* the name of the auto-scaling profile (optional) */ |
131 | | char* tcp_auto_scaling_profile = NULL; |
132 | | /* Max number of seconds that we expect a full SIP message |
133 | | * to arrive in - anything above will lead to the connection being closed */ |
134 | | int tcp_max_msg_time = TCP_CHILD_MAX_MSG_TIME; |
135 | | /* If the data reading may be performed across different workers (still |
136 | | * serial) or by a single worker (the TCP conn sticks to one worker) */ |
137 | | int tcp_parallel_read_on_workers = 0; |
138 | | |
139 | | #ifdef HAVE_SO_KEEPALIVE |
140 | | int tcp_keepalive = 1; |
141 | | #else |
142 | | int tcp_keepalive = 0; |
143 | | #endif |
144 | | int tcp_keepcount = 0; |
145 | | int tcp_keepidle = 0; |
146 | | int tcp_keepinterval = 0; |
147 | | |
148 | | /*!< should we allow opening a new TCP conn when sending data |
149 | | * over UAC branches? - branch flag to be set in the SIP messages */ |
150 | | int tcp_no_new_conn_bflag = 0; |
151 | | /*!< should we allow opening a new TCP conn when sending data |
152 | | * back to UAS (replies)? - msg flag to be set in the SIP messages */ |
153 | | int tcp_no_new_conn_rplflag = 0; |
154 | | /*!< should a new TCP conn be opened if needed? - variable used for |
155 | | * signaling between the SIP layer (branch flag) and the TCP layer (tcp_send func)*/ |
156 | | int tcp_no_new_conn = 0; |
157 | | |
158 | | /* if the TCP net layer is on or off (if no TCP based protos are loaded) */ |
159 | | static int tcp_disabled = 1; |
160 | | |
161 | | /* is the process TCP MAIN ? */ |
162 | | int is_tcp_main = 0; |
163 | | |
164 | | /* the ID of the TCP conn used for the last send operation in the |
165 | | * current process - attention, this is a really ugly HACK here */ |
166 | | unsigned int last_outgoing_tcp_id = 0; |
167 | | |
168 | | static struct scaling_profile *s_profile = NULL; |
169 | | |
170 | | /****************************** helper functions *****************************/ |
171 | | extern void handle_sigs(void); |
172 | | |
173 | | static inline int init_sock_keepalive(int s, const struct tcp_conn_profile *prof) |
174 | 0 | { |
175 | 0 | int ka; |
176 | 0 | #if defined(HAVE_TCP_KEEPINTVL) || defined(HAVE_TCP_KEEPIDLE) || defined(HAVE_TCP_KEEPCNT) |
177 | 0 | int optval; |
178 | 0 | #endif |
179 | |
|
180 | 0 | if (prof->keepinterval || prof->keepidle || prof->keepcount) |
181 | 0 | ka = 1; /* force on */ |
182 | 0 | else |
183 | 0 | ka = prof->keepalive; |
184 | |
|
185 | 0 | #ifdef HAVE_SO_KEEPALIVE |
186 | 0 | if (setsockopt(s,SOL_SOCKET,SO_KEEPALIVE,&ka,sizeof(ka))<0){ |
187 | 0 | LM_WARN("setsockopt failed to enable SO_KEEPALIVE: %s\n", |
188 | 0 | strerror(errno)); |
189 | 0 | return -1; |
190 | 0 | } |
191 | 0 | LM_DBG("TCP keepalive enabled on socket %d\n",s); |
192 | 0 | #endif |
193 | 0 | #ifdef HAVE_TCP_KEEPINTVL |
194 | 0 | if ((optval = prof->keepinterval)) { |
195 | 0 | if (setsockopt(s,IPPROTO_TCP,TCP_KEEPINTVL,&optval,sizeof(optval))<0){ |
196 | 0 | LM_WARN("setsockopt failed to set keepalive probes interval: %s\n", |
197 | 0 | strerror(errno)); |
198 | 0 | } |
199 | 0 | } |
200 | 0 | #endif |
201 | 0 | #ifdef HAVE_TCP_KEEPIDLE |
202 | 0 | if ((optval = prof->keepidle)) { |
203 | 0 | if (setsockopt(s,IPPROTO_TCP,TCP_KEEPIDLE,&optval,sizeof(optval))<0){ |
204 | 0 | LM_WARN("setsockopt failed to set keepalive idle interval: %s\n", |
205 | 0 | strerror(errno)); |
206 | 0 | } |
207 | 0 | } |
208 | 0 | #endif |
209 | 0 | #ifdef HAVE_TCP_KEEPCNT |
210 | 0 | if ((optval = prof->keepcount)) { |
211 | 0 | if (setsockopt(s,IPPROTO_TCP,TCP_KEEPCNT,&optval,sizeof(optval))<0){ |
212 | 0 | LM_WARN("setsockopt failed to set maximum keepalive count: %s\n", |
213 | 0 | strerror(errno)); |
214 | 0 | } |
215 | 0 | } |
216 | 0 | #endif |
217 | 0 | return 0; |
218 | 0 | } |
219 | | |
220 | | static inline void set_sock_reuseport(int s) |
221 | 0 | { |
222 | 0 | int yes = 1; |
223 | |
|
224 | 0 | if (setsockopt(s,SOL_SOCKET,SO_REUSEPORT,&yes,sizeof(yes))<0){ |
225 | 0 | LM_WARN("setsockopt failed to set SO_REUSEPORT: %s\n", |
226 | 0 | strerror(errno)); |
227 | 0 | } |
228 | 0 | if (setsockopt(s,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))<0){ |
229 | 0 | LM_WARN("setsockopt failed to set SO_REUSEADDR: %s\n", |
230 | 0 | strerror(errno)); |
231 | 0 | } |
232 | 0 | } |
233 | | |
234 | | /*! \brief Set all socket/fd options: disable nagle, tos lowdelay, |
235 | | * non-blocking |
236 | | * \return -1 on error */ |
237 | | int tcp_init_sock_opt(int s, const struct tcp_conn_profile *prof, enum si_flags socketflags, int sock_tos) |
238 | 0 | { |
239 | 0 | int flags; |
240 | 0 | int optval; |
241 | |
|
242 | 0 | #ifdef DISABLE_NAGLE |
243 | 0 | flags=1; |
244 | 0 | if ( (tcp_proto_no!=-1) && (setsockopt(s, tcp_proto_no , TCP_NODELAY, |
245 | 0 | &flags, sizeof(flags))<0) ){ |
246 | 0 | LM_WARN("could not disable Nagle: %s\n", strerror(errno)); |
247 | 0 | } |
248 | 0 | #endif |
249 | | /* tos*/ |
250 | 0 | optval = (sock_tos > 0) ? sock_tos : tos; |
251 | 0 | if (optval > 0) { |
252 | 0 | if (setsockopt(s, IPPROTO_IP, IP_TOS, (void*)&optval,sizeof(optval)) ==-1){ |
253 | 0 | LM_WARN("setsockopt tos: %s\n", strerror(errno)); |
254 | | /* continue since this is not critical */ |
255 | 0 | } |
256 | 0 | } |
257 | |
|
258 | 0 | if (probe_max_sock_buff(s,1,MAX_SEND_BUFFER_SIZE,BUFFER_INCREMENT)) { |
259 | 0 | LM_WARN("setsockopt tcp snd buff: %s\n", strerror(errno)); |
260 | | /* continue since this is not critical */ |
261 | 0 | } |
262 | |
|
263 | 0 | init_sock_keepalive(s, prof); |
264 | 0 | if (socketflags & SI_REUSEPORT) |
265 | 0 | set_sock_reuseport(s); |
266 | | |
267 | | /* non-blocking */ |
268 | 0 | flags=fcntl(s, F_GETFL); |
269 | 0 | if (flags==-1){ |
270 | 0 | LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno)); |
271 | 0 | goto error; |
272 | 0 | } |
273 | 0 | if (fcntl(s, F_SETFL, flags|O_NONBLOCK)==-1){ |
274 | 0 | LM_ERR("set non-blocking failed: (%d) %s\n", errno, strerror(errno)); |
275 | 0 | goto error; |
276 | 0 | } |
277 | 0 | return 0; |
278 | 0 | error: |
279 | 0 | return -1; |
280 | 0 | } |
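| | /* Illustrative usage sketch (hypothetical caller, mirroring what |
| | * handle_new_connect() does further below): the socket is tuned right after |
| | * accept()/connect(), before being wrapped into a tcp_connection: |
| | * |
| | *     struct tcp_conn_profile prof; |
| | *     tcp_con_get_profile(&su, &si->su, si->proto, &prof); |
| | *     if (tcp_init_sock_opt(new_sock, &prof, si->flags, si->tos) < 0) { |
| | *         LM_ERR("tcp_init_sock_opt failed\n"); |
| | *         close(new_sock); |
| | *     } |
| | */ |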
281 | | |
282 | | static int send2worker(struct tcp_connection* tcpconn,int rw) |
283 | 0 | { |
284 | 0 | int i; |
285 | 0 | int min_load; |
286 | 0 | int idx; |
287 | 0 | long response[2]; |
288 | 0 | unsigned int load; |
289 | |
|
290 | 0 | min_load=100; /* it is a percentage */ |
291 | 0 | idx=0; |
292 | 0 | for (i=0; i<tcp_workers_max_no; i++){ |
293 | 0 | if (tcp_workers[i].state==STATE_ACTIVE) { |
294 | 0 | load = pt_get_1m_proc_load( tcp_workers[i].pt_idx ); |
295 | | #ifdef EXTRA_DEBUG |
296 | | LM_DBG("checking TCP worker %d (proc %d), with load %u," |
297 | | "min_load so far %u\n", i, tcp_workers[i].pt_idx, load, |
298 | | min_load); |
299 | | #endif |
300 | 0 | if (min_load>load) { |
301 | 0 | min_load = load; |
302 | 0 | idx = i; |
303 | 0 | } |
304 | 0 | } |
305 | 0 | } |
306 | |
|
307 | 0 | tcp_workers[idx].n_reqs++; |
308 | 0 | LM_DBG("to tcp worker %d (%d/%d) load %u, %p/%d rw %d\n", idx, |
309 | 0 | tcp_workers[idx].pid, tcp_workers[idx].pt_idx, min_load, |
310 | 0 | tcpconn, tcpconn->s, rw); |
311 | 0 | response[0]=(long)tcpconn; |
312 | 0 | response[1]=rw; |
313 | 0 | if (send_fd(tcp_workers[idx].unix_sock, response, sizeof(response), |
314 | 0 | tcpconn->s)<=0){ |
315 | 0 | LM_ERR("send_fd failed\n"); |
316 | 0 | return -1; |
317 | 0 | } |
318 | | |
319 | 0 | return 0; |
320 | 0 | } |
321 | | |
322 | | |
323 | | |
324 | | /********************** TCP conn management functions ************************/ |
325 | | |
326 | | /* initializes an already defined TCP listener */ |
327 | | int tcp_init_listener(struct socket_info *si) |
328 | 0 | { |
329 | 0 | union sockaddr_union* addr; |
330 | 0 | int optval; |
331 | 0 | #ifdef DISABLE_NAGLE |
332 | 0 | int flag; |
333 | 0 | struct protoent* pe; |
334 | |
|
335 | 0 | if (tcp_proto_no==-1){ /* if not already set */ |
336 | 0 | pe=getprotobyname("tcp"); |
337 | 0 | if (pe==0){ |
338 | 0 | LM_ERR("could not get TCP protocol number\n"); |
339 | 0 | tcp_proto_no=-1; |
340 | 0 | }else{ |
341 | 0 | tcp_proto_no=pe->p_proto; |
342 | 0 | } |
343 | 0 | } |
344 | 0 | #endif |
345 | |
|
346 | 0 | addr = &si->su; |
347 | 0 | if (init_su(addr, &si->address, si->port_no)<0){ |
348 | 0 | LM_ERR("could no init sockaddr_union\n"); |
349 | 0 | goto error; |
350 | 0 | } |
351 | 0 | si->socket = socket(AF2PF(addr->s.sa_family), SOCK_STREAM, 0); |
352 | 0 | if (si->socket==-1){ |
353 | 0 | LM_ERR("socket failed with [%s]\n", strerror(errno)); |
354 | 0 | goto error; |
355 | 0 | } |
356 | 0 | #ifdef DISABLE_NAGLE |
357 | 0 | flag=1; |
358 | 0 | if ( (tcp_proto_no!=-1) && |
359 | 0 | (setsockopt(si->socket, tcp_proto_no , TCP_NODELAY, |
360 | 0 | &flag, sizeof(flag))<0) ){ |
361 | 0 | LM_ERR("could not disable Nagle: %s\n",strerror(errno)); |
362 | 0 | } |
363 | 0 | #endif |
364 | |
|
365 | 0 | #if !defined(TCP_DONT_REUSEADDR) |
366 | | /* Stevens, "Network Programming", Section 7.5, "Generic Socket |
367 | | * Options": "...server started,..a child continues..on existing |
368 | | * connection..listening server is restarted...call to bind fails |
369 | | * ... ALL TCP servers should specify the SO_REUSEADDR option |
370 | | * to allow the server to be restarted in this situation |
371 | | */ |
372 | 0 | optval=1; |
373 | 0 | if (setsockopt(si->socket, SOL_SOCKET, SO_REUSEADDR, |
374 | 0 | (void*)&optval, sizeof(optval))==-1) { |
375 | 0 | LM_ERR("setsockopt failed with [%s]\n", strerror(errno)); |
376 | 0 | goto error; |
377 | 0 | } |
378 | 0 | #endif |
379 | | /* tos */ |
380 | 0 | optval = (si->tos > 0) ? si->tos : tos; |
381 | 0 | if (optval > 0) { |
382 | 0 | if (setsockopt(si->socket, IPPROTO_IP, IP_TOS, (void*)&optval, |
383 | 0 | sizeof(optval)) ==-1){ |
384 | 0 | LM_WARN("setsockopt tos: %s\n", strerror(errno)); |
385 | | /* continue since this is not critical */ |
386 | 0 | } |
387 | 0 | } |
388 | |
|
389 | 0 | if (probe_max_sock_buff(si->socket,1,MAX_SEND_BUFFER_SIZE, |
390 | 0 | BUFFER_INCREMENT)) { |
391 | 0 | LM_WARN("setsockopt tcp snd buff: %s\n",strerror(errno)); |
392 | | /* continue since this is not critical */ |
393 | 0 | } |
394 | |
|
395 | 0 | init_sock_keepalive(si->socket, &tcp_con_df_profile); |
396 | 0 | if (si->flags & SI_REUSEPORT) |
397 | 0 | set_sock_reuseport(si->socket); |
398 | 0 | if (bind(si->socket, &addr->s, sockaddru_len(*addr))==-1){ |
399 | 0 | LM_ERR("bind(%x, %p, %d) on %s:%d : %s\n", |
400 | 0 | si->socket, &addr->s, |
401 | 0 | (unsigned)sockaddru_len(*addr), |
402 | 0 | si->address_str.s, |
403 | 0 | si->port_no, |
404 | 0 | strerror(errno)); |
405 | 0 | goto error; |
406 | 0 | } |
407 | 0 | if (listen(si->socket, tcp_socket_backlog)==-1){ |
408 | 0 | LM_ERR("listen(%x, %p, %d) on %s: %s\n", |
409 | 0 | si->socket, &addr->s, |
410 | 0 | (unsigned)sockaddru_len(*addr), |
411 | 0 | si->address_str.s, |
412 | 0 | strerror(errno)); |
413 | 0 | goto error; |
414 | 0 | } |
415 | | |
416 | 0 | return 0; |
417 | 0 | error: |
418 | 0 | if (si->socket!=-1){ |
419 | 0 | close(si->socket); |
420 | 0 | si->socket=-1; |
421 | 0 | } |
422 | 0 | return -1; |
423 | 0 | } |
424 | | |
425 | | |
426 | | /*! \brief finds a connection, if id=0 return NULL |
427 | | * \note WARNING: unprotected (no locking); use tcpconn_get unless you really |
428 | | * know what you are doing */ |
429 | | static struct tcp_connection* _tcpconn_find(unsigned int id) |
430 | 0 | { |
431 | 0 | struct tcp_connection *c; |
432 | 0 | unsigned hash; |
433 | |
|
434 | 0 | if (id){ |
435 | 0 | hash=tcp_id_hash(id); |
436 | 0 | for (c=TCP_PART(id).tcpconn_id_hash[hash]; c; c=c->id_next){ |
437 | | #ifdef EXTRA_DEBUG |
438 | | LM_DBG("c=%p, c->id=%u, port=%d\n",c, c->id, c->rcv.src_port); |
439 | | print_ip("ip=", &c->rcv.src_ip, "\n"); |
440 | | #endif |
441 | 0 | if ((id==c->id)&&(c->state!=S_CONN_BAD)) return c; |
442 | 0 | } |
443 | 0 | } |
444 | 0 | return 0; |
445 | 0 | } |
446 | | |
447 | | |
448 | | /* returns the correlation ID of a TCP connection */ |
449 | | int tcp_get_correlation_id( unsigned int id, unsigned long long *cid) |
450 | 0 | { |
451 | 0 | struct tcp_connection* c; |
452 | |
|
453 | 0 | TCPCONN_LOCK(id); |
454 | 0 | if ( (c=_tcpconn_find(id))!=NULL ) { |
455 | 0 | *cid = c->cid; |
456 | 0 | TCPCONN_UNLOCK(id); |
457 | 0 | return 0; |
458 | 0 | } |
459 | 0 | *cid = 0; |
460 | 0 | TCPCONN_UNLOCK(id); |
461 | 0 | return -1; |
462 | 0 | } |
463 | | |
464 | | |
465 | | /*! \brief _tcpconn_find with locks and acquire fd */ |
466 | | int tcp_conn_get(unsigned int id, struct ip_addr* ip, int port, |
467 | | enum sip_protos proto, void *proto_extra_id, |
468 | | struct tcp_connection** conn, int* conn_fd, |
469 | | const struct socket_info* send_sock) |
470 | 0 | { |
471 | 0 | struct tcp_connection* c; |
472 | 0 | struct tcp_connection* tmp; |
473 | 0 | struct tcp_conn_alias* a; |
474 | 0 | unsigned hash; |
475 | 0 | long response[2]; |
476 | 0 | unsigned int part; |
477 | 0 | int n; |
478 | 0 | int fd; |
479 | |
|
480 | 0 | if (id) { |
481 | 0 | part = id; |
482 | 0 | TCPCONN_LOCK(part); |
483 | 0 | if ( (c=_tcpconn_find(part))!=NULL ) |
484 | 0 | goto found; |
485 | 0 | TCPCONN_UNLOCK(part); |
486 | 0 | } |
487 | | |
488 | | /* continue search based on IP address + port + transport */ |
489 | | #ifdef EXTRA_DEBUG |
490 | | LM_DBG("%d port %u\n",id, port); |
491 | | if (ip) print_ip("tcpconn_find: ip ", ip, "\n"); |
492 | | #endif |
493 | 0 | if (ip){ |
494 | 0 | hash=tcp_addr_hash(ip, port); |
495 | 0 | for( part=0 ; part<TCP_PARTITION_SIZE ; part++ ) { |
496 | 0 | TCPCONN_LOCK(part); |
497 | 0 | for (a=TCP_PART(part).tcpconn_aliases_hash[hash]; a; a=a->next) { |
498 | | #ifdef EXTRA_DEBUG |
499 | | LM_DBG("a=%p, c=%p, c->id=%u, alias port= %d port=%d\n", |
500 | | a, a->parent, a->parent->id, a->port, |
501 | | a->parent->rcv.src_port); |
502 | | print_ip("ip=",&a->parent->rcv.src_ip,"\n"); |
503 | | if (send_sock && a->parent->rcv.bind_address) { |
504 | | print_ip("requested send_sock ip=", &send_sock->address,"\n"); |
505 | | print_ip("found send_sock ip=", &a->parent->rcv.bind_address->address,"\n"); |
506 | | } |
507 | | #endif |
508 | 0 | c = a->parent; |
509 | 0 | if (c->state != S_CONN_BAD && |
510 | 0 | c->flags&F_CONN_INIT && |
511 | 0 | (send_sock==NULL || send_sock == a->parent->rcv.bind_address) && |
512 | 0 | port == a->port && |
513 | 0 | proto == c->type && |
514 | 0 | ip_addr_cmp(ip, &c->rcv.src_ip) && |
515 | 0 | (proto_extra_id==NULL || |
516 | 0 | protos[proto].net.stream.conn.match==NULL || |
517 | 0 | protos[proto].net.stream.conn.match( c, proto_extra_id)) ) |
518 | 0 | goto found; |
519 | 0 | } |
520 | 0 | TCPCONN_UNLOCK(part); |
521 | 0 | } |
522 | 0 | } |
523 | | |
524 | | /* not found */ |
525 | 0 | *conn = NULL; |
526 | 0 | if (conn_fd) *conn_fd = -1; |
527 | 0 | return 0; |
528 | | |
529 | 0 | found: |
530 | 0 | c->refcnt++; |
531 | 0 | TCPCONN_UNLOCK(part); |
532 | 0 | sh_log(c->hist, TCP_REF, "tcp_conn_get, (%d)", c->refcnt); |
533 | |
|
534 | 0 | LM_DBG("con found in state %d\n",c->state); |
535 | |
|
536 | 0 | if (c->state!=S_CONN_OK || conn_fd==NULL) { |
537 | | /* no need to acquire the fd, just return the conn with an invalid fd */ |
538 | 0 | *conn = c; |
539 | 0 | if (conn_fd) *conn_fd = -1; |
540 | 0 | return 1; |
541 | 0 | } |
542 | | |
543 | 0 | if (c->proc_id == process_no) { |
544 | 0 | LM_DBG("tcp connection found (%p) already in this process ( %d ) ," |
545 | 0 | " fd = %d\n", c, c->proc_id, c->fd); |
546 | | /* we already have the connection in this worker's reactor, */ |
547 | | /* no need to acquire FD */ |
548 | 0 | *conn = c; |
549 | 0 | *conn_fd = c->fd; |
550 | 0 | return 1; |
551 | 0 | } |
552 | | |
553 | | /* acquire the fd for this connection too */ |
554 | 0 | LM_DBG("tcp connection found (%p), acquiring fd\n", c); |
555 | | /* get the fd */ |
556 | 0 | response[0]=(long)c; |
557 | 0 | response[1]=CONN_GET_FD; |
558 | 0 | n=send_all(unix_tcp_sock, response, sizeof(response)); |
559 | 0 | if (n<=0){ |
560 | 0 | LM_ERR("failed to get fd(write):%s (%d)\n", |
561 | 0 | strerror(errno), errno); |
562 | 0 | n=-1; |
563 | 0 | goto error; |
564 | 0 | } |
565 | 0 | LM_DBG("c= %p, n=%d, Usock=%d\n", c, n, unix_tcp_sock); |
566 | 0 | tmp = c; |
567 | 0 | n=receive_fd(unix_tcp_sock, &c, sizeof(c), &fd, MSG_WAITALL); |
568 | 0 | if (n<=0){ |
569 | 0 | LM_ERR("failed to get fd(receive_fd):" |
570 | 0 | " %s (%d)\n", strerror(errno), errno); |
571 | 0 | n=-1; |
572 | 0 | goto error; |
573 | 0 | } |
574 | 0 | if (c!=tmp){ |
575 | 0 | LM_CRIT("got different connection:" |
576 | 0 | " %p (id= %u, refcnt=%d state=%d != " |
577 | 0 | " %p (id= %u, refcnt=%d state=%d (n=%d)\n", |
578 | 0 | c, c->id, c->refcnt, c->state, |
579 | 0 | tmp, tmp->id, tmp->refcnt, tmp->state, n |
580 | 0 | ); |
581 | 0 | n=-1; /* fail */ |
582 | 0 | close(fd); |
583 | 0 | goto error; |
584 | 0 | } |
585 | 0 | LM_DBG("after receive_fd: c= %p n=%d fd=%d\n",c, n, fd); |
586 | |
|
587 | 0 | *conn = c; |
588 | 0 | *conn_fd = fd; |
589 | |
|
590 | 0 | return 1; |
591 | 0 | error: |
592 | 0 | tcpconn_put(c); |
593 | 0 | sh_log(c->hist, TCP_UNREF, "tcp_conn_get, (%d)", c->refcnt); |
594 | 0 | *conn = NULL; |
595 | 0 | *conn_fd = -1; |
596 | 0 | return -1; |
597 | 0 | } |
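| | /* Illustrative usage sketch (hypothetical upper-layer caller): look up an |
| | * existing connection by destination IP/port and also acquire its fd. |
| | * Return convention: 1 = found (+1 ref taken), 0 = not found, -1 = error. |
| | * |
| | *     struct tcp_connection *c; |
| | *     int fd; |
| | *     if (tcp_conn_get(0, &ip, port, PROTO_TCP, NULL, &c, &fd, NULL) > 0) { |
| | *         ... use c / fd for sending ... |
| | *         tcpconn_put(c);    // drop the reference taken by the lookup |
| | *     } |
| | */ |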
598 | | |
599 | | |
600 | | /* used to tune the tcp_connection attributes - not to be used inside the |
601 | | network layer, but only from the above layer (otherwise we may end up |
602 | | in strange deadlocks!) */ |
603 | | int tcp_conn_fcntl(struct receive_info *rcv, int attr, void *value) |
604 | 0 | { |
605 | 0 | struct tcp_connection *con; |
606 | |
|
607 | 0 | switch (attr) { |
608 | 0 | case DST_FCNTL_SET_LIFETIME: |
609 | | /* set connection timeout */ |
610 | 0 | TCPCONN_LOCK(rcv->proto_reserved1); |
611 | 0 | con =_tcpconn_find(rcv->proto_reserved1); |
612 | 0 | if (!con) { |
613 | 0 | LM_ERR("Strange, tcp conn not found (id=%u)\n", |
614 | 0 | rcv->proto_reserved1); |
615 | 0 | } else { |
616 | 0 | tcp_conn_set_lifetime( con, (int)(long)(value)); |
617 | 0 | } |
618 | 0 | TCPCONN_UNLOCK(rcv->proto_reserved1); |
619 | 0 | return 0; |
620 | 0 | default: |
621 | 0 | LM_ERR("unsupported operation %d on conn\n",attr); |
622 | 0 | return -1; |
623 | 0 | } |
624 | 0 | return -1; |
625 | 0 | } |
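| | /* Illustrative usage sketch (hypothetical caller above the network layer, |
| | * per the note before tcp_conn_fcntl): extend the lifetime of the connection |
| | * a request was received on to 120 seconds: |
| | * |
| | *     tcp_conn_fcntl(&msg->rcv, DST_FCNTL_SET_LIFETIME, (void *)(long)120); |
| | */ |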
626 | | |
627 | | |
628 | | static struct tcp_connection* tcpconn_add(struct tcp_connection *c) |
629 | 0 | { |
630 | 0 | unsigned hash; |
631 | |
|
632 | 0 | if (c){ |
633 | 0 | TCPCONN_LOCK(c->id); |
634 | | /* add it at the beginning of the list*/ |
635 | 0 | hash=tcp_id_hash(c->id); |
636 | 0 | c->id_hash=hash; |
637 | 0 | tcpconn_listadd(TCP_PART(c->id).tcpconn_id_hash[hash], c, id_next, |
638 | 0 | id_prev); |
639 | |
|
640 | 0 | hash=tcp_addr_hash(&c->rcv.src_ip, c->rcv.src_port); |
641 | | /* set the first alias */ |
642 | 0 | c->con_aliases[0].port=c->rcv.src_port; |
643 | 0 | c->con_aliases[0].hash=hash; |
644 | 0 | c->con_aliases[0].parent=c; |
645 | 0 | tcpconn_listadd(TCP_PART(c->id).tcpconn_aliases_hash[hash], |
646 | 0 | &c->con_aliases[0], next, prev); |
647 | 0 | c->aliases++; |
648 | 0 | TCPCONN_UNLOCK(c->id); |
649 | 0 | LM_DBG("hashes: %d, %d\n", hash, c->id_hash); |
650 | 0 | return c; |
651 | 0 | }else{ |
652 | 0 | LM_CRIT("null connection pointer\n"); |
653 | 0 | return 0; |
654 | 0 | } |
655 | 0 | } |
656 | | |
657 | | static str e_tcp_src_ip = str_init("src_ip"); |
658 | | static str e_tcp_src_port = str_init("src_port"); |
659 | | static str e_tcp_dst_ip = str_init("dst_ip"); |
660 | | static str e_tcp_dst_port = str_init("dst_port"); |
661 | | static str e_tcp_c_proto = str_init("proto"); |
662 | | |
663 | | static void tcp_disconnect_event_raise(struct tcp_connection* c) |
664 | 0 | { |
665 | 0 | evi_params_p list = 0; |
666 | 0 | str src_ip,dst_ip, proto; |
667 | 0 | int src_port,dst_port; |
668 | 0 | char src_ip_buf[IP_ADDR_MAX_STR_SIZE],dst_ip_buf[IP_ADDR_MAX_STR_SIZE]; |
669 | | |
670 | | // event has to be triggered - check for subscribers |
671 | 0 | if (!evi_probe_event(EVI_TCP_DISCONNECT)) { |
672 | 0 | goto end; |
673 | 0 | } |
674 | | |
675 | 0 | if (!(list = evi_get_params())) |
676 | 0 | goto end; |
677 | | |
678 | 0 | src_ip.s = ip_addr2a( &c->rcv.src_ip ); |
679 | 0 | memcpy(src_ip_buf,src_ip.s,IP_ADDR_MAX_STR_SIZE); |
680 | 0 | src_ip.s = src_ip_buf; |
681 | 0 | src_ip.len = strlen(src_ip.s); |
682 | |
|
683 | 0 | if (evi_param_add_str(list, &e_tcp_src_ip, &src_ip)) { |
684 | 0 | LM_ERR("unable to add parameter\n"); |
685 | 0 | goto end; |
686 | 0 | } |
687 | | |
688 | 0 | src_port = c->rcv.src_port; |
689 | |
|
690 | 0 | if (evi_param_add_int(list, &e_tcp_src_port, &src_port)) { |
691 | 0 | LM_ERR("unable to add parameter\n"); |
692 | 0 | goto end; |
693 | 0 | } |
694 | | |
695 | 0 | dst_ip.s = ip_addr2a( &c->rcv.dst_ip ); |
696 | 0 | memcpy(dst_ip_buf,dst_ip.s,IP_ADDR_MAX_STR_SIZE); |
697 | 0 | dst_ip.s = dst_ip_buf; |
698 | 0 | dst_ip.len = strlen(dst_ip.s); |
699 | |
|
700 | 0 | if (evi_param_add_str(list, &e_tcp_dst_ip, &dst_ip)) { |
701 | 0 | LM_ERR("unable to add parameter\n"); |
702 | 0 | goto end; |
703 | 0 | } |
704 | | |
705 | 0 | dst_port = c->rcv.dst_port; |
706 | |
|
707 | 0 | if (evi_param_add_int(list, &e_tcp_dst_port, &dst_port)) { |
708 | 0 | LM_ERR("unable to add parameter\n"); |
709 | 0 | goto end; |
710 | 0 | } |
711 | | |
712 | 0 | proto.s = protos[c->rcv.proto].name; |
713 | 0 | proto.len = strlen(proto.s); |
714 | |
|
715 | 0 | if (evi_param_add_str(list, &e_tcp_c_proto, &proto)) { |
716 | 0 | LM_ERR("unable to add parameter\n"); |
717 | 0 | goto end; |
718 | 0 | } |
719 | | |
720 | 0 | if (is_tcp_main) { |
721 | 0 | if (evi_dispatch_event(EVI_TCP_DISCONNECT, list)) { |
722 | 0 | LM_ERR("unable to dispatch tcp disconnect event\n"); |
723 | 0 | } |
724 | 0 | } else { |
725 | 0 | if (evi_raise_event(EVI_TCP_DISCONNECT, list)) { |
726 | 0 | LM_ERR("unable to send tcp disconnect event\n"); |
727 | 0 | } |
728 | 0 | } |
729 | 0 | list = 0; |
730 | |
|
731 | 0 | end: |
732 | 0 | if (list) |
733 | 0 | evi_free_params(list); |
734 | 0 | } |
735 | | |
736 | | /* convenience macro to aid in shm_free() debugging */ |
737 | | #define _tcpconn_rm(c, ne) \ |
738 | 0 | do {\ |
739 | 0 | __tcpconn_rm(c, ne);\ |
740 | 0 | shm_free(c);\ |
741 | 0 | } while (0) |
742 | | |
743 | | /*! \brief unsafe tcpconn_rm version (nolocks) */ |
744 | | static void __tcpconn_rm(struct tcp_connection* c, int no_event) |
745 | 0 | { |
746 | 0 | int r; |
747 | |
|
748 | 0 | tcpconn_listrm(TCP_PART(c->id).tcpconn_id_hash[c->id_hash], c, |
749 | 0 | id_next, id_prev); |
750 | | /* remove all the aliases */ |
751 | 0 | for (r=0; r<c->aliases; r++) |
752 | 0 | tcpconn_listrm(TCP_PART(c->id).tcpconn_aliases_hash[c->con_aliases[r].hash], |
753 | 0 | &c->con_aliases[r], next, prev); |
754 | 0 | lock_destroy(&c->write_lock); |
755 | |
|
756 | 0 | if (c->async) { |
757 | 0 | for (r = 0; r<c->async->pending; r++) |
758 | 0 | shm_free(c->async->chunks[r]); |
759 | 0 | shm_free(c->async); |
760 | 0 | c->async = NULL; |
761 | 0 | } |
762 | |
|
763 | 0 | if (c->con_req) |
764 | 0 | shm_free(c->con_req); |
765 | |
|
766 | 0 | if (protos[c->type].net.stream.conn.clean) |
767 | 0 | protos[c->type].net.stream.conn.clean(c); |
768 | |
|
769 | 0 | if (!no_event) tcp_disconnect_event_raise(c); |
770 | |
|
771 | | #ifdef DBG_TCPCON |
772 | | sh_log(c->hist, TCP_DESTROY, "type=%d", c->type); |
773 | | sh_unref(c->hist); |
774 | | c->hist = NULL; |
775 | | #endif |
776 | | |
777 | | /* shm_free(c); -- freed by _tcpconn_rm() */ |
778 | 0 | } |
779 | | |
780 | | |
781 | | |
782 | | #if 0 |
783 | | static void tcpconn_rm(struct tcp_connection* c) |
784 | | { |
785 | | int r; |
786 | | |
787 | | TCPCONN_LOCK(c->id); |
788 | | tcpconn_listrm(TCP_PART(c->id).tcpconn_id_hash[c->id_hash], c, |
789 | | id_next, id_prev); |
790 | | /* remove all the aliases */ |
791 | | for (r=0; r<c->aliases; r++) |
792 | | tcpconn_listrm(TCP_PART(c->id).tcpconn_aliases_hash |
793 | | [c->con_aliases[r].hash], |
794 | | &c->con_aliases[r], next, prev); |
795 | | TCPCONN_UNLOCK(c->id); |
796 | | lock_destroy(&c->write_lock); |
797 | | |
798 | | if (protos[c->type].net.stream.conn.clean) |
799 | | protos[c->type].net.stream.conn.clean(c); |
800 | | |
801 | | shm_free(c); |
802 | | } |
803 | | #endif |
804 | | |
805 | | |
806 | | /*! \brief add port as an alias for the "id" connection |
807 | | * \return 0 on success,-1 on failure */ |
808 | | int tcpconn_add_alias(struct sip_msg *msg, unsigned int id, int port, int proto) |
809 | 0 | { |
810 | 0 | struct tcp_connection* c; |
811 | 0 | unsigned hash; |
812 | 0 | struct tcp_conn_alias* a; |
813 | |
|
814 | 0 | a=0; |
815 | | /* fix the port */ |
816 | 0 | port=port ? port : protos[proto].default_port ; |
817 | 0 | TCPCONN_LOCK(id); |
818 | | /* check if alias already exists */ |
819 | 0 | c=_tcpconn_find(id); |
820 | 0 | if (c) { |
821 | 0 | if (msg && !(c->profile.alias_mode == TCP_ALIAS_ALWAYS |
822 | 0 | || (c->profile.alias_mode == TCP_ALIAS_RFC_5923 |
823 | 0 | && msg->via1->alias))) { |
824 | 0 | LM_DBG("refusing to add alias (alias_mode: %u, via 'alias': %u)\n", |
825 | 0 | c->profile.alias_mode, !!msg->via1->alias); |
826 | 0 | TCPCONN_UNLOCK(id); |
827 | 0 | return 0; |
828 | 0 | } |
829 | | |
830 | 0 | hash=tcp_addr_hash(&c->rcv.src_ip, port); |
831 | | /* search the aliases for an already existing one */ |
832 | 0 | for (a=TCP_PART(id).tcpconn_aliases_hash[hash]; a; a=a->next) { |
833 | 0 | if (a->parent->state != S_CONN_BAD && |
834 | 0 | port == a->port && |
835 | 0 | proto == a->parent->type && |
836 | 0 | ip_addr_cmp(&c->rcv.src_ip, &a->parent->rcv.src_ip)) { |
837 | | /* found */ |
838 | 0 | if (a->parent!=c) goto error_sec; |
839 | 0 | else goto ok; |
840 | 0 | } |
841 | 0 | } |
842 | 0 | if (c->aliases>=TCP_CON_MAX_ALIASES) goto error_aliases; |
843 | 0 | c->con_aliases[c->aliases].parent=c; |
844 | 0 | c->con_aliases[c->aliases].port=port; |
845 | 0 | c->con_aliases[c->aliases].hash=hash; |
846 | 0 | tcpconn_listadd(TCP_PART(id).tcpconn_aliases_hash[hash], |
847 | 0 | &c->con_aliases[c->aliases], next, prev); |
848 | 0 | c->aliases++; |
849 | 0 | }else goto error_not_found; |
850 | 0 | ok: |
851 | 0 | TCPCONN_UNLOCK(id); |
852 | | #ifdef EXTRA_DEBUG |
853 | | if (a) LM_DBG("alias already present\n"); |
854 | | else LM_DBG("alias port %d for hash %d, id %u\n", port, hash, id); |
855 | | #endif |
856 | 0 | return 0; |
857 | 0 | error_aliases: |
858 | 0 | TCPCONN_UNLOCK(id); |
859 | 0 | LM_ERR("too many aliases for connection %p (%u)\n", c, id); |
860 | 0 | return -1; |
861 | 0 | error_not_found: |
862 | 0 | TCPCONN_UNLOCK(id); |
863 | 0 | LM_ERR("no connection found for id %u\n",id); |
864 | 0 | return -1; |
865 | 0 | error_sec: |
866 | 0 | LM_WARN("possible port hijack attempt\n"); |
867 | 0 | LM_WARN("alias already present and points to another connection " |
868 | 0 | "(%d : %d and %u : %d)\n", a->parent->id, port, id, port); |
869 | 0 | TCPCONN_UNLOCK(id); |
870 | 0 | return -1; |
871 | 0 | } |
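| | /* Illustrative usage sketch (hypothetical upper-layer caller): register the |
| | * Via port as an extra alias for the connection a request arrived on, so |
| | * that replies and in-dialog requests can reuse it: |
| | * |
| | *     if (tcp_accept_aliases) |
| | *         tcpconn_add_alias(msg, msg->rcv.proto_reserved1, |
| | *                 msg->via1->port, msg->rcv.proto); |
| | */ |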
872 | | |
873 | | |
874 | | void tcpconn_put(struct tcp_connection* c) |
875 | 0 | { |
876 | 0 | TCPCONN_LOCK(c->id); |
877 | 0 | c->refcnt--; |
878 | 0 | TCPCONN_UNLOCK(c->id); |
879 | 0 | } |
880 | | |
881 | | |
882 | | static inline void tcpconn_ref(struct tcp_connection* c) |
883 | 0 | { |
884 | 0 | TCPCONN_LOCK(c->id); |
885 | 0 | c->refcnt++; |
886 | 0 | TCPCONN_UNLOCK(c->id); |
887 | 0 | } |
888 | | |
889 | | |
890 | | static struct tcp_connection* tcpconn_new(int sock, const union sockaddr_union* su, |
891 | | const struct socket_info* si, const struct tcp_conn_profile *prof, |
892 | | int state, int flags, int in_main_proc) |
893 | 0 | { |
894 | 0 | struct tcp_connection *c; |
895 | 0 | union sockaddr_union local_su; |
896 | 0 | unsigned int su_size; |
897 | |
|
898 | 0 | c=(struct tcp_connection*)shm_malloc(sizeof(struct tcp_connection)); |
899 | 0 | if (c==0){ |
900 | 0 | LM_ERR("shared memory allocation failure\n"); |
901 | 0 | return 0; |
902 | 0 | } |
903 | 0 | memset(c, 0, sizeof(struct tcp_connection)); /* zero init */ |
904 | 0 | c->s=sock; |
905 | 0 | c->fd=-1; /* not initialized */ |
906 | 0 | if (lock_init(&c->write_lock)==0){ |
907 | 0 | LM_ERR("init lock failed\n"); |
908 | 0 | goto error0; |
909 | 0 | } |
910 | | |
911 | 0 | c->rcv.src_su=*su; |
912 | |
|
913 | 0 | c->refcnt=0; |
914 | 0 | su2ip_addr(&c->rcv.src_ip, su); |
915 | 0 | c->rcv.src_port=su_getport(su); |
916 | 0 | c->rcv.bind_address = si; |
917 | 0 | c->rcv.dst_ip = si->address; |
918 | 0 | su_size = sockaddru_len(*su); |
919 | 0 | if (getsockname(sock, (struct sockaddr *)&local_su, &su_size)<0) { |
920 | 0 | LM_ERR("failed to get info on received interface/IP %d/%s\n", |
921 | 0 | errno, strerror(errno)); |
922 | 0 | goto error; |
923 | 0 | } |
924 | 0 | c->rcv.dst_port = su_getport(&local_su); |
925 | 0 | print_ip("tcpconn_new: new tcp connection to: ", &c->rcv.src_ip, "\n"); |
926 | 0 | LM_DBG("on port %d, proto %d\n", c->rcv.src_port, si->proto); |
927 | 0 | c->id=(*connection_id)++; |
928 | 0 | c->cid = (unsigned long long)c->id |
929 | 0 | | ( (unsigned long long)(startup_time&0xFFFFFF) << 32 ) |
930 | 0 | | ( (unsigned long long)(rand()&0xFF) << 56 ); |
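| | /* correlation ID layout, as built above: bits 0..31 carry the 32-bit |
| | * connection id, bits 32..55 the low 24 bits of startup_time and bits |
| | * 56..63 a random byte, so the value is unlikely to repeat across restarts */ |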
931 | |
|
932 | 0 | c->rcv.proto_reserved1=0; /* this will be filled before receive_message*/ |
933 | 0 | c->rcv.proto_reserved2=0; |
934 | 0 | c->state=state; |
935 | 0 | c->extra_data=0; |
936 | 0 | c->type = si->proto; |
937 | 0 | c->rcv.proto = si->proto; |
938 | | /* start with the default conn lifetime */ |
939 | 0 | c->lifetime = get_ticks() + prof->con_lifetime; |
940 | 0 | c->profile = *prof; |
941 | 0 | c->flags|=F_CONN_REMOVED|flags; |
942 | | #ifdef DBG_TCPCON |
943 | | c->hist = sh_push(c, con_hist); |
944 | | #endif |
945 | |
|
946 | 0 | if (protos[si->proto].net.stream.async_chunks) { |
947 | 0 | c->async = shm_malloc(sizeof(struct tcp_async_data) + |
948 | 0 | protos[si->proto].net.stream.async_chunks * |
949 | 0 | sizeof(struct tcp_async_chunk)); |
950 | 0 | if (c->async) { |
951 | 0 | c->async->allocated = protos[si->proto].net.stream.async_chunks; |
952 | 0 | c->async->oldest = 0; |
953 | 0 | c->async->pending = 0; |
954 | 0 | } else { |
955 | 0 | LM_ERR("could not allocate async data for con!\n"); |
956 | 0 | goto error; |
957 | 0 | } |
958 | 0 | } |
959 | 0 | if(in_main_proc) |
960 | 0 | tcp_connections_no++; |
961 | 0 | return c; |
962 | | |
963 | 0 | error: |
964 | 0 | lock_destroy(&c->write_lock); |
965 | 0 | error0: |
966 | 0 | shm_free(c); |
967 | 0 | return 0; |
968 | 0 | } |
969 | | |
970 | | |
971 | | /* creates a new tcp connection structure |
972 | | * if send2main is 1, the function informs the TCP Main about the new conn |
973 | | * a +1 ref is set for the new conn ! |
974 | | * IMPORTANT - the function assumes you want to create a new TCP conn as |
975 | | * a result of a connect operation - the conn will be set as connect !! |
976 | | * Accepted connections are triggered internally only */ |
977 | | struct tcp_connection* tcp_conn_create(int sock, const union sockaddr_union* su, |
978 | | const struct socket_info* si, struct tcp_conn_profile *prof, |
979 | | int state, int send2main) |
980 | 0 | { |
981 | 0 | struct tcp_connection *c; |
982 | |
|
983 | 0 | if (!prof) |
984 | 0 | tcp_con_get_profile(su, &si->su, si->proto, prof); |
985 | | |
986 | | /* create the connection structure */ |
987 | 0 | c = tcpconn_new(sock, su, si, prof, state, 0, !send2main); |
988 | 0 | if (c==NULL) { |
989 | 0 | LM_ERR("tcpconn_new failed\n"); |
990 | 0 | return NULL; |
991 | 0 | } |
992 | | |
993 | 0 | if (protos[c->type].net.stream.conn.init && |
994 | 0 | protos[c->type].net.stream.conn.init(c) < 0) { |
995 | 0 | LM_ERR("failed to do proto %d specific init for conn %p\n", |
996 | 0 | c->type, c); |
997 | 0 | tcp_conn_destroy(c); |
998 | 0 | return NULL; |
999 | 0 | } |
1000 | 0 | c->flags |= F_CONN_INIT; |
1001 | |
|
1002 | 0 | c->refcnt++; /* safe to do it w/o locking, it's not yet |
1003 | | available to the rest of the world */ |
1004 | 0 | sh_log(c->hist, TCP_REF, "connect, (%d)", c->refcnt); |
1005 | 0 | if (!send2main) |
1006 | 0 | return c; |
1007 | | |
1008 | 0 | return (tcp_conn_send(c) == 0 ? c : NULL); |
1009 | 0 | } |
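| | /* Illustrative sketch (hypothetical proto-module caller), assuming 's' holds |
| | * a socket on which a non-blocking connect() was just started; the conn is |
| | * created, given a +1 ref and handed over to TCP main (send2main=1): |
| | * |
| | *     struct tcp_connection *c = |
| | *         tcp_conn_create(s, &to_su, send_sock, NULL, S_CONN_CONNECTING, 1); |
| | *     if (!c) |
| | *         return -1;   // creation or hand-over to TCP main failed |
| | */ |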
1010 | | |
1011 | | /* sends a new connection from a worker to main */ |
1012 | | int tcp_conn_send(struct tcp_connection *c) |
1013 | 0 | { |
1014 | 0 | long response[2]; |
1015 | 0 | int n, fd; |
1016 | | |
1017 | | /* inform TCP main about this new connection */ |
1018 | 0 | if (c->state==S_CONN_CONNECTING) { |
1019 | | /* store the local fd now, before TCP main overwrites it */ |
1020 | 0 | fd = c->s; |
1021 | 0 | response[0]=(long)c; |
1022 | 0 | response[1]=ASYNC_CONNECT; |
1023 | 0 | n=send_fd(unix_tcp_sock, response, sizeof(response), fd); |
1024 | 0 | if (n<=0) { |
1025 | 0 | LM_ERR("Failed to send the socket to main for async connection\n"); |
1026 | 0 | goto error; |
1027 | 0 | } |
1028 | 0 | close(fd); |
1029 | 0 | } else { |
1030 | 0 | response[0]=(long)c; |
1031 | 0 | response[1]=CONN_NEW; |
1032 | 0 | n=send_fd(unix_tcp_sock, response, sizeof(response), c->s); |
1033 | 0 | if (n<=0){ |
1034 | 0 | LM_ERR("failed send_fd: %s (%d)\n", strerror(errno), errno); |
1035 | 0 | goto error; |
1036 | 0 | } |
1037 | 0 | } |
1038 | | |
1039 | 0 | return 0; |
1040 | 0 | error: |
1041 | | /* no reporting as closed, as PROTO layer did not report it as |
1042 | | * OPEN yet */ |
1043 | 0 | _tcpconn_rm(c,1); |
1044 | 0 | tcp_connections_no--; |
1045 | 0 | return -1; |
1046 | 0 | } |
1047 | | |
1048 | | |
1049 | | static inline void tcpconn_destroy(struct tcp_connection* tcpconn) |
1050 | 0 | { |
1051 | 0 | int fd; |
1052 | 0 | unsigned int id = tcpconn->id; |
1053 | |
|
1054 | 0 | TCPCONN_LOCK(id); /*avoid races w/ tcp_send*/ |
1055 | 0 | tcpconn->refcnt--; |
1056 | 0 | if (tcpconn->refcnt==0){ |
1057 | 0 | LM_DBG("destroying connection %p, flags %04x\n", |
1058 | 0 | tcpconn, tcpconn->flags); |
1059 | 0 | fd=tcpconn->s; |
1060 | | /* no reporting here - the tcpconn_destroy() function is called |
1061 | | * from the TCP_MAIN reactor when handling connections received |
1062 | | * from a worker; and we generate the CLOSE reports from WORKERs */ |
1063 | 0 | _tcpconn_rm(tcpconn,0); |
1064 | 0 | if (fd >= 0) |
1065 | 0 | close(fd); |
1066 | 0 | tcp_connections_no--; |
1067 | 0 | }else{ |
1068 | | /* force timeout */ |
1069 | 0 | tcpconn->lifetime=0; |
1070 | 0 | tcpconn->state=S_CONN_BAD; |
1071 | 0 | LM_DBG("delaying (%p, flags %04x) ref = %d ...\n", |
1072 | 0 | tcpconn, tcpconn->flags, tcpconn->refcnt); |
1073 | |
|
1074 | 0 | } |
1075 | 0 | TCPCONN_UNLOCK(id); |
1076 | 0 | } |
1077 | | |
1078 | | /* wrapper to the internally used function */ |
1079 | | void tcp_conn_destroy(struct tcp_connection* tcpconn) |
1080 | 0 | { |
1081 | 0 | tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE, |
1082 | 0 | "Closed by Proto layer"); |
1083 | 0 | sh_log(tcpconn->hist, TCP_UNREF, "tcp_conn_destroy, (%d)", tcpconn->refcnt); |
1084 | 0 | return tcpconn_destroy(tcpconn); |
1085 | 0 | } |
1086 | | |
1087 | | |
1088 | | /************************ TCP MAIN process functions ************************/ |
1089 | | |
1090 | | /*! \brief |
1091 | | * handles a new connection, called internally by tcp_main_loop/handle_io. |
1092 | | * \param si - pointer to one of the tcp socket_info structures on which |
1093 | | * an io event was detected (connection attempt) |
1094 | | * \return handle_* return convention: -1 on error, 0 on EAGAIN (no more |
1095 | | * io events queued), >0 on success. success/error refer only to |
1096 | | * the accept. |
1097 | | */ |
1098 | | static inline int handle_new_connect(const struct socket_info* si) |
1099 | 0 | { |
1100 | 0 | union sockaddr_union su; |
1101 | 0 | struct tcp_connection* tcpconn; |
1102 | 0 | struct tcp_conn_profile prof; |
1103 | 0 | socklen_t su_len = sizeof(su); |
1104 | 0 | int new_sock; |
1105 | 0 | unsigned int id; |
1106 | | |
1107 | | /* coverity[overrun-buffer-arg: FALSE] - union has 28 bytes, CID #200070 */ |
1108 | 0 | new_sock=accept(si->socket, &(su.s), &su_len); |
1109 | 0 | if (new_sock==-1){ |
1110 | 0 | if ((errno==EAGAIN)||(errno==EWOULDBLOCK)) |
1111 | 0 | return 0; |
1112 | 0 | LM_ERR("failed to accept connection(%d): %s\n", errno, strerror(errno)); |
1113 | 0 | return -1; |
1114 | 0 | } |
1115 | 0 | if (tcp_connections_no>=tcp_max_connections){ |
1116 | 0 | LM_ERR("maximum number of connections exceeded: %d/%d\n", |
1117 | 0 | tcp_connections_no, tcp_max_connections); |
1118 | 0 | close(new_sock); |
1119 | 0 | return 1; /* success, because the accept was successful */ |
1120 | 0 | } |
1121 | | |
1122 | 0 | tcp_con_get_profile(&su, &si->su, si->proto, &prof); |
1123 | 0 | if (tcp_init_sock_opt(new_sock, &prof, si->flags, si->tos)<0){ |
1124 | 0 | LM_ERR("tcp_init_sock_opt failed\n"); |
1125 | 0 | close(new_sock); |
1126 | 0 | return 1; /* success, because the accept was successful */ |
1127 | 0 | } |
1128 | | |
1129 | | /* add socket to list */ |
1130 | 0 | tcpconn=tcpconn_new(new_sock, &su, si, &prof, S_CONN_OK, F_CONN_ACCEPTED, 1); |
1131 | 0 | if (tcpconn){ |
1132 | 0 | tcpconn->refcnt++; /* safe, not yet available to the |
1133 | | outside world */ |
1134 | 0 | sh_log(tcpconn->hist, TCP_REF, "accept, (%d)", tcpconn->refcnt); |
1135 | 0 | tcpconn_add(tcpconn); |
1136 | 0 | LM_DBG("new connection: %p %d flags: %04x\n", |
1137 | 0 | tcpconn, tcpconn->s, tcpconn->flags); |
1138 | | /* pass it to a worker */ |
1139 | 0 | sh_log(tcpconn->hist, TCP_SEND2CHILD, "accept"); |
1140 | 0 | if(send2worker(tcpconn,IO_WATCH_READ)<0){ |
1141 | 0 | LM_ERR("no TCP workers available\n"); |
1142 | 0 | id = tcpconn->id; |
1143 | 0 | sh_log(tcpconn->hist, TCP_UNREF, "accept, (%d)", tcpconn->refcnt); |
1144 | 0 | TCPCONN_LOCK(id); |
1145 | 0 | tcpconn->refcnt--; |
1146 | 0 | if (tcpconn->refcnt==0){ |
1147 | | /* no close to report here as the connection was not yet |
1148 | | * reported as OPEN by the proto layer...this sucks a bit */ |
1149 | 0 | _tcpconn_rm(tcpconn,1); |
1150 | 0 | close(new_sock/*same as tcpconn->s*/); |
1151 | 0 | }else tcpconn->lifetime=0; /* force expire */ |
1152 | 0 | TCPCONN_UNLOCK(id); |
1153 | 0 | } |
1154 | 0 | }else{ /*tcpconn==0 */ |
1155 | 0 | LM_ERR("tcpconn_new failed, closing socket\n"); |
1156 | 0 | close(new_sock); |
1157 | 0 | } |
1158 | 0 | return 1; /* accept() was successful */ |
1159 | 0 | } |
1160 | | |
1161 | | |
1162 | | /*! \brief |
1163 | | * handles an io event on one of the watched tcp connections |
1164 | | * |
1165 | | * \param tcpconn - pointer to the tcp_connection for which we have an io ev. |
1166 | | * \param fd_i - index in the fd_array table (needed for delete) |
1167 | | * \return handle_* return convention, but on success it always returns 0 |
1168 | | * (because it's one-shot, after a successful execution the fd is |
1169 | | * removed from tcp_main's watch fd list and passed to a worker => |
1170 | | * tcp_main is not interested in further io events that might be |
1171 | | * queued for this fd) |
1172 | | */ |
1173 | | inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, int fd_i, |
1174 | | int event_type) |
1175 | 0 | { |
1176 | 0 | int fd; |
1177 | 0 | int err; |
1178 | 0 | unsigned int id; |
1179 | 0 | unsigned int err_len; |
1180 | |
|
1181 | 0 | if (event_type == IO_WATCH_READ) { |
1182 | | /* pass it to worker, so remove it from the io watch list */ |
1183 | 0 | LM_DBG("data available on %p %d\n", tcpconn, tcpconn->s); |
1184 | 0 | if (reactor_del_reader(tcpconn->s, fd_i, 0)==-1) |
1185 | 0 | return -1; |
1186 | 0 | tcpconn->flags|=F_CONN_REMOVED_READ; |
1187 | 0 | tcpconn_ref(tcpconn); /* refcnt ++ */ |
1188 | 0 | sh_log(tcpconn->hist, TCP_REF, "tcpconn read, (%d)", tcpconn->refcnt); |
1189 | 0 | sh_log(tcpconn->hist, TCP_SEND2CHILD, "read"); |
1190 | 0 | if (send2worker(tcpconn,IO_WATCH_READ)<0){ |
1191 | 0 | LM_ERR("no TCP workers available\n"); |
1192 | 0 | id = tcpconn->id; |
1193 | 0 | TCPCONN_LOCK(id); |
1194 | 0 | tcpconn->refcnt--; |
1195 | 0 | sh_log(tcpconn->hist, TCP_UNREF, "tcpconn read, (%d)", tcpconn->refcnt); |
1196 | 0 | if (tcpconn->refcnt==0){ |
1197 | 0 | fd=tcpconn->s; |
1198 | 0 | tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE, |
1199 | 0 | "No worker for read"); |
1200 | 0 | _tcpconn_rm(tcpconn,0); |
1201 | 0 | close(fd); |
1202 | 0 | }else tcpconn->lifetime=0; /* force expire*/ |
1203 | 0 | TCPCONN_UNLOCK(id); |
1204 | 0 | } |
1205 | 0 | return 0; /* we are not interested in possibly queued io events, |
1206 | | the fd was either passed to a worker, or closed */ |
1207 | 0 | } else { |
1208 | 0 | LM_DBG("connection %p fd %d is now writable\n", tcpconn, tcpconn->s); |
1209 | | /* we received a write event */ |
1210 | 0 | if (tcpconn->state==S_CONN_CONNECTING) { |
1211 | | /* we're coming from an async connect & write |
1212 | | * let's see if we connected successfully */ |
1213 | 0 | err_len=sizeof(err); |
1214 | 0 | if (getsockopt(tcpconn->s, SOL_SOCKET, SO_ERROR, &err, &err_len) < 0 || \ |
1215 | 0 | err != 0) { |
1216 | 0 | LM_DBG("Failed connection attempt\n"); |
1217 | 0 | tcpconn_ref(tcpconn); |
1218 | 0 | sh_log(tcpconn->hist, TCP_REF, "tcpconn connect, (%d)", tcpconn->refcnt); |
1219 | 0 | reactor_del_all(tcpconn->s, fd_i, IO_FD_CLOSING); |
1220 | 0 | tcpconn->flags|=F_CONN_REMOVED; |
1221 | 0 | tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE, |
1222 | 0 | "Async connect failed"); |
1223 | 0 | sh_log(tcpconn->hist, TCP_UNREF, "tcpconn connect, (%d)", tcpconn->refcnt); |
1224 | 0 | tcpconn_destroy(tcpconn); |
1225 | 0 | return 0; |
1226 | 0 | } |
1227 | | |
1228 | | /* we successfully connected - further treat this case as if we |
1229 | | * were coming from an async write */ |
1230 | 0 | tcpconn->state = S_CONN_OK; |
1231 | 0 | LM_DBG("Successfully completed previous async connect\n"); |
1232 | | |
1233 | | /* now that we completed the async connection, we also need to |
1234 | | * listen for READ events, otherwise these will get lost */ |
1235 | 0 | if (tcpconn->flags & F_CONN_REMOVED_READ) { |
1236 | 0 | reactor_add_reader( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn); |
1237 | 0 | tcpconn->flags&=~F_CONN_REMOVED_READ; |
1238 | 0 | } |
1239 | |
|
1240 | 0 | goto async_write; |
1241 | 0 | } else { |
1242 | | /* we're coming from an async write - |
1243 | | * just pass to worker and have it write |
1244 | | * our TCP chunks */ |
1245 | 0 | async_write: |
1246 | | /* no more write events for now */ |
1247 | 0 | if (reactor_del_writer( tcpconn->s, fd_i, 0)==-1) |
1248 | 0 | return -1; |
1249 | 0 | tcpconn->flags|=F_CONN_REMOVED_WRITE; |
1250 | 0 | tcpconn_ref(tcpconn); /* refcnt ++ */ |
1251 | 0 | sh_log(tcpconn->hist, TCP_REF, "tcpconn write, (%d)", |
1252 | 0 | tcpconn->refcnt); |
1253 | 0 | sh_log(tcpconn->hist, TCP_SEND2CHILD, "write"); |
1254 | 0 | if (send2worker(tcpconn,IO_WATCH_WRITE)<0){ |
1255 | 0 | LM_ERR("no TCP worker available\n"); |
1256 | 0 | id = tcpconn->id; |
1257 | 0 | TCPCONN_LOCK(id); |
1258 | 0 | tcpconn->refcnt--; |
1259 | 0 | sh_log(tcpconn->hist, TCP_UNREF, "tcpconn write, (%d)", |
1260 | 0 | tcpconn->refcnt); |
1261 | 0 | if (tcpconn->refcnt==0){ |
1262 | 0 | fd=tcpconn->s; |
1263 | 0 | tcp_trigger_report(tcpconn, TCP_REPORT_CLOSE, |
1264 | 0 | "No worker for write"); |
1265 | 0 | _tcpconn_rm(tcpconn,0); |
1266 | 0 | close(fd); |
1267 | 0 | }else tcpconn->lifetime=0; /* force expire*/ |
1268 | 0 | TCPCONN_UNLOCK(id); |
1269 | 0 | } |
1270 | 0 | return 0; |
1271 | 0 | } |
1272 | 0 | } |
1273 | 0 | } |
1274 | | |
1275 | | |
1276 | | /*! \brief handles io from a tcp worker process |
1277 | | * \param tcp_c - pointer in the tcp_workers array, to the entry for |
1278 | | * which an io event was detected |
1279 | | * \param fd_i - fd index in the fd_array (useful for optimizing |
1280 | | * io_watch_deletes) |
1281 | | * \return handle_* return convention: -1 on error, 0 on EAGAIN (no more |
1282 | | * io events queued), >0 on success. success/error refer only to |
1283 | | * the reads from the fd. |
1284 | | */ |
1285 | | inline static int handle_tcp_worker(struct tcp_worker* tcp_c, int fd_i) |
1286 | 0 | { |
1287 | 0 | struct tcp_connection* tcpconn; |
1288 | 0 | long response[2]; |
1289 | 0 | int cmd; |
1290 | 0 | int bytes; |
1291 | |
|
1292 | 0 | if (tcp_c->unix_sock<=0){ |
1293 | | /* (we can't have a fd==0, 0 is never closed )*/ |
1294 | 0 | LM_CRIT("fd %d for %d (pid %d)\n", tcp_c->unix_sock, |
1295 | 0 | (int)(tcp_c-&tcp_workers[0]), tcp_c->pid); |
1296 | 0 | goto error; |
1297 | 0 | } |
1298 | | /* read until sizeof(response) |
1299 | | * (this is a SOCK_STREAM so read is not atomic) */ |
1300 | 0 | bytes=recv_all(tcp_c->unix_sock, response, sizeof(response), MSG_DONTWAIT); |
1301 | 0 | if (bytes<(int)sizeof(response)){ |
1302 | 0 | if (bytes==0){ |
1303 | | /* EOF -> bad, worker has died */ |
1304 | 0 | if (sr_get_core_status()!=STATE_TERMINATING) |
1305 | 0 | LM_CRIT("dead tcp worker %d (EOF received), pid %d\n", |
1306 | 0 | (int)(tcp_c-&tcp_workers[0]), tcp_c->pid ); |
1307 | | /* don't listen on it any more */ |
1308 | 0 | reactor_del_reader( tcp_c->unix_sock, fd_i, 0/*flags*/); |
1309 | | /* eof. so no more io here, it's ok to return error */ |
1310 | 0 | goto error; |
1311 | 0 | }else if (bytes<0){ |
1312 | | /* EAGAIN is ok if we try to empty the buffer |
1313 | | * e.g.: SIGIO_RT overflow mode or EPOLL ET */ |
1314 | 0 | if ((errno!=EAGAIN) && (errno!=EWOULDBLOCK)){ |
1315 | 0 | LM_CRIT("read from tcp worker %ld (pid %d) %s [%d]\n", |
1316 | 0 | (long)(tcp_c-&tcp_workers[0]), tcp_c->pid, |
1317 | 0 | strerror(errno), errno ); |
1318 | 0 | }else{ |
1319 | 0 | bytes=0; |
1320 | 0 | } |
1321 | | /* try to ignore ? */ |
1322 | 0 | goto end; |
1323 | 0 | }else{ |
1324 | | /* should never happen */ |
1325 | 0 | LM_CRIT("too few bytes received (%d)\n", bytes ); |
1326 | 0 | bytes=0; /* something was read so there is no error; otoh if |
1327 | | receive_fd returned less than requested => the receive |
1328 | | buffer is empty => no more io queued on this fd */ |
1329 | 0 | goto end; |
1330 | 0 | } |
1331 | 0 | } |
1332 | | |
1333 | 0 | LM_DBG("response= %lx, %ld from tcp worker %d (%d)\n", |
1334 | 0 | response[0], response[1], tcp_c->pid, (int)(tcp_c-&tcp_workers[0])); |
1335 | |
|
1336 | 0 | cmd=response[1]; |
1337 | 0 | tcpconn=(struct tcp_connection*)response[0]; |
1338 | 0 | if (tcpconn==0){ |
1339 | | /* should never happen */ |
1340 | 0 | LM_CRIT("null tcpconn pointer received from tcp worker %d (pid %d):" |
1341 | 0 | "%lx, %lx\n", (int)(tcp_c-&tcp_workers[0]), tcp_c->pid, |
1342 | 0 | response[0], response[1]) ; |
1343 | 0 | goto end; |
1344 | 0 | } |
1345 | 0 | switch(cmd){ |
1346 | 0 | case CONN_RELEASE: |
1347 | 0 | if (tcpconn->state==S_CONN_BAD){ |
1348 | 0 | sh_log(tcpconn->hist, TCP_UNREF, "tcpworker release bad, (%d)", tcpconn->refcnt); |
1349 | 0 | tcpconn_destroy(tcpconn); |
1350 | 0 | break; |
1351 | 0 | } |
1352 | 0 | sh_log(tcpconn->hist, TCP_UNREF, "tcpworker release, (%d)", tcpconn->refcnt); |
1353 | 0 | tcpconn_put(tcpconn); |
1354 | | /* must be after the de-ref*/ |
1355 | 0 | reactor_add_reader( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn); |
1356 | 0 | tcpconn->flags&=~F_CONN_REMOVED_READ; |
1357 | 0 | break; |
1358 | 0 | case CONN_RELEASE_WRITE: |
1359 | 0 | if (tcpconn->state==S_CONN_BAD){ |
1360 | 0 | sh_log(tcpconn->hist, TCP_UNREF, "tcpworker release write bad, (%d)", tcpconn->refcnt); |
1361 | 0 | tcpconn_destroy(tcpconn); |
1362 | 0 | break; |
1363 | 0 | } |
1364 | 0 | sh_log(tcpconn->hist, TCP_UNREF, "tcpworker release write, (%d)", tcpconn->refcnt); |
1365 | 0 | tcpconn_put(tcpconn); |
1366 | 0 | break; |
1367 | 0 | case ASYNC_WRITE_TCPW: |
1368 | 0 | if (tcpconn->state==S_CONN_BAD){ |
1369 | 0 | sh_log(tcpconn->hist, TCP_UNREF, "tcpworker async write bad, (%d)", tcpconn->refcnt); |
1370 | 0 | tcpconn_destroy(tcpconn); |
1371 | 0 | break; |
1372 | 0 | } |
1373 | 0 | sh_log(tcpconn->hist, TCP_UNREF, "tcpworker async write, (%d)", tcpconn->refcnt); |
1374 | 0 | tcpconn_put(tcpconn); |
1375 | | /* must be after the de-ref*/ |
1376 | 0 | reactor_add_writer( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn); |
1377 | 0 | tcpconn->flags&=~F_CONN_REMOVED_WRITE; |
1378 | 0 | break; |
1379 | 0 | case CONN_ERROR_TCPW: |
1380 | 0 | case CONN_DESTROY: |
1381 | 0 | case CONN_EOF: |
1382 | | /* WARNING: this will auto-dec. refcnt! */ |
1383 | 0 | if ((tcpconn->flags & F_CONN_REMOVED) != F_CONN_REMOVED && |
1384 | 0 | (tcpconn->s!=-1)){ |
1385 | 0 | reactor_del_all( tcpconn->s, -1, IO_FD_CLOSING); |
1386 | 0 | tcpconn->flags|=F_CONN_REMOVED; |
1387 | 0 | } |
1388 | 0 | sh_log(tcpconn->hist, TCP_UNREF, "tcpworker destroy, (%d)", tcpconn->refcnt); |
1389 | 0 | tcpconn_destroy(tcpconn); /* closes also the fd */ |
1390 | 0 | break; |
1391 | 0 | default: |
1392 | 0 | LM_CRIT("unknown cmd %d from tcp worker %d (%d)\n", |
1393 | 0 | cmd, tcp_c->pid, (int)(tcp_c-&tcp_workers[0])); |
1394 | 0 | } |
1395 | 0 | end: |
1396 | 0 | return bytes; |
1397 | 0 | error: |
1398 | 0 | return -1; |
1399 | 0 | } |
1400 | | |
1401 | | |
1402 | | /*! \brief handles io from a "generic" ser process (get fd or new_fd from a tcp_send) |
1403 | | * |
1404 | | * \param p - pointer in the ser processes array (pt[]), to the entry for |
1405 | | * which an io event was detected |
1406 | | * \param fd_i - fd index in the fd_array (useful for optimizing |
1407 | | * io_watch_deletes) |
1408 | | * \return handle_* return convention: |
1409 | | * - -1 on error reading from the fd, |
1410 | | * - 0 on EAGAIN or when no more io events are queued |
1411 | | * (receive buffer empty), |
1412 | | * - >0 on successful reads from the fd (the receive buffer might |
1413 | | * be non-empty). |
1414 | | */ |
1415 | | inline static int handle_worker(struct process_table* p, int fd_i) |
1416 | 0 | { |
1417 | 0 | struct tcp_connection* tcpconn; |
1418 | 0 | long response[2]; |
1419 | 0 | int cmd; |
1420 | 0 | int bytes; |
1421 | 0 | int ret; |
1422 | 0 | int fd; |
1423 | |
|
1424 | 0 | ret=-1; |
1425 | 0 | if (p->unix_sock<=0){ |
1426 | | /* (we can't have a fd==0, 0 is never closed )*/ |
1427 | 0 | LM_CRIT("fd %d for %d (pid %d)\n", |
1428 | 0 | p->unix_sock, (int)(p-&pt[0]), p->pid); |
1429 | 0 | goto error; |
1430 | 0 | } |
1431 | | |
1432 | | /* get all bytes and the fd (if transmitted) |
1433 | | * (this is a SOCK_STREAM so read is not atomic) */ |
1434 | 0 | bytes=receive_fd(p->unix_sock, response, sizeof(response), &fd, |
1435 | 0 | MSG_DONTWAIT); |
1436 | 0 | if (bytes<(int)sizeof(response)){ |
1437 | | /* too few bytes read */ |
1438 | 0 | if (bytes==0){ |
1439 | | /* EOF -> bad, worker has died */ |
1440 | 0 | if (sr_get_core_status()!=STATE_TERMINATING) |
1441 | 0 | LM_CRIT("dead tcp worker %d (EOF received), pid %d\n", |
1442 | 0 | (int)(p-&pt[0]), p->pid); |
1443 | | /* don't listen on it any more */ |
1444 | 0 | reactor_del_reader( p->unix_sock, fd_i, 0/*flags*/); |
1445 | 0 | goto error; /* worker dead => no further io events from it */ |
1446 | 0 | }else if (bytes<0){ |
1447 | | /* EAGAIN is ok if we try to empty the buffer |
1448 | | * e.g: SIGIO_RT overflow mode or EPOLL ET */ |
1449 | 0 | if ((errno!=EAGAIN) && (errno!=EWOULDBLOCK)){ |
1450 | 0 | LM_CRIT("read from worker %d (pid %d): %s [%d]\n", |
1451 | 0 | (int)(p-&pt[0]), p->pid, strerror(errno), errno); |
1452 | 0 | ret=-1; |
1453 | 0 | }else{ |
1454 | 0 | ret=0; |
1455 | 0 | } |
1456 | | /* try to ignore ? */ |
1457 | 0 | goto end; |
1458 | 0 | }else{ |
1459 | | /* should never happen */ |
1460 | 0 | LM_CRIT("too few bytes received (%d)\n", bytes ); |
1461 | 0 | ret=0; /* something was read so there is no error; otoh if |
1462 | | receive_fd returned less than requested => the receive
1463 | | buffer is empty => no more io queued on this fd */ |
1464 | 0 | goto end; |
1465 | 0 | } |
1466 | 0 | } |
1467 | 0 | ret=1; /* something was received, there might be more queued */ |
1468 | 0 | LM_DBG("read response= %lx, %ld, fd %d from %d (%d)\n", |
1469 | 0 | response[0], response[1], fd, (int)(p-&pt[0]), p->pid); |
1470 | 0 | cmd=response[1]; |
1471 | 0 | tcpconn=(struct tcp_connection*)response[0]; |
1472 | 0 | if (tcpconn==0){ |
1473 | 0 | LM_CRIT("null tcpconn pointer received from worker %d (pid %d)"
1474 | 0 | ": %lx, %lx\n", (int)(p-&pt[0]), p->pid, response[0], response[1]);
1475 | 0 | goto end; |
1476 | 0 | } |
1477 | 0 | switch(cmd){ |
1478 | 0 | case CONN_ERROR_GENW: |
1479 | | /* remove from reactor only if the fd exists, and it wasn't |
1480 | | * removed before */ |
1481 | 0 | if ((tcpconn->flags & F_CONN_REMOVED) != F_CONN_REMOVED && |
1482 | 0 | (tcpconn->s!=-1)){ |
1483 | 0 | reactor_del_all( tcpconn->s, -1, IO_FD_CLOSING); |
1484 | 0 | tcpconn->flags|=F_CONN_REMOVED; |
1485 | 0 | } |
1486 | 0 | sh_log(tcpconn->hist, TCP_UNREF, "worker error, (%d)", tcpconn->refcnt); |
1487 | 0 | tcpconn_destroy(tcpconn); /* will close also the fd */ |
1488 | 0 | break; |
1489 | 0 | case CONN_GET_FD: |
1490 | | /* send the requested FD */ |
1491 | | /* WARNING: take care of setting refcnt properly to |
1492 | | * avoid race condition */ |
1493 | 0 | if (send_fd(p->unix_sock, &tcpconn, sizeof(tcpconn), |
1494 | 0 | tcpconn->s)<=0){ |
1495 | 0 | LM_ERR("send_fd failed\n"); |
1496 | 0 | } |
1497 | 0 | break; |
1498 | 0 | case CONN_NEW: |
1499 | | /* update the fd in the requested tcpconn*/ |
1500 | | /* WARNING: take care of setting refcnt properly to |
1501 | | * avoid race condition */ |
1502 | 0 | if (fd==-1){ |
1503 | 0 | LM_CRIT(" cmd CONN_NEW: no fd received\n"); |
1504 | 0 | break; |
1505 | 0 | } |
1506 | 0 | tcpconn->s=fd; |
1507 | | /* add tcpconn to the list*/ |
1508 | 0 | tcpconn_add(tcpconn); |
1509 | 0 | tcp_connections_no++; |
1510 | 0 | reactor_add_reader( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn); |
1511 | 0 | tcpconn->flags&=~F_CONN_REMOVED_READ; |
1512 | 0 | break; |
1513 | 0 | case ASYNC_CONNECT: |
1514 | | /* connection is not yet linked to hash = not yet |
1515 | | * available to the outside world */ |
1516 | 0 | if (fd==-1){ |
1517 | 0 | LM_CRIT(" cmd ASYNC_CONNECT: no fd received\n");
1518 | 0 | break; |
1519 | 0 | } |
1520 | 0 | tcpconn->s=fd; |
1521 | | /* add tcpconn to the list*/ |
1522 | 0 | tcpconn_add(tcpconn); |
1523 | 0 | tcp_connections_no++; |
1524 | | /* FIXME - now we have lifetime==default_lifetime - should we |
1525 | | * set a shorter one when waiting for a connect ??? */ |
1526 | | /* only maintain the socket in the IO_WATCH_WRITE watcher |
1527 | | * while we have stuff to write - otherwise we're going to get |
1528 | | * useless events */ |
1529 | 0 | reactor_add_writer( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn); |
1530 | 0 | tcpconn->flags&=~F_CONN_REMOVED_WRITE; |
1531 | 0 | break; |
1532 | 0 | case ASYNC_WRITE_GENW: |
1533 | 0 | if (tcpconn->state==S_CONN_BAD){ |
1534 | 0 | tcpconn->lifetime=0; |
1535 | 0 | break; |
1536 | 0 | } |
1537 | 0 | tcpconn_put(tcpconn); |
1538 | | /* must be after the de-ref*/ |
1539 | 0 | reactor_add_writer( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn); |
1540 | 0 | tcpconn->flags&=~F_CONN_REMOVED_WRITE; |
1541 | 0 | break; |
1542 | 0 | default: |
1543 | 0 | LM_CRIT("unknown cmd %d from worker %d (pid %d)\n", cmd, |
1544 | 0 | (int)(p-&pt[0]), p->pid); |
1545 | 0 | } |
1546 | 0 | end: |
1547 | 0 | return ret; |
1548 | 0 | error: |
1549 | 0 | return -1; |
1550 | 0 | } |
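
For orientation, the exchange decoded above is just a two-long array: response[0] carries the tcp_connection pointer and response[1] the command code, optionally accompanied by a file descriptor. A minimal, hypothetical sketch of the sending side (illustration only, not the actual worker code; "conn" and "sock" are placeholders):

    long request[2];
    request[0] = (long)conn;   /* the tcp_connection the command refers to */
    request[1] = CONN_NEW;     /* command code, matched in the switch above */
    /* ship the two longs plus the freshly opened fd over the unix socket;
     * handle_worker() reads them back with receive_fd() */
    if (send_fd(sock, request, sizeof(request), conn->s) <= 0)
        LM_ERR("failed to send request to TCP main\n");
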
1551 | | |
1552 | | |
1553 | | /*! \brief generic handle io routine, it will call the appropriate
1554 | | * handle_xxx() based on the fd_map type |
1555 | | * |
1556 | | * \param fm - pointer to a fd hash entry |
1557 | | * \param idx - index in the fd_array (or -1 if not known) |
1558 | | * \return -1 on error |
1559 | | * 0 on EAGAIN or when by some other way it is known that no more |
1560 | | * io events are queued on the fd (the receive buffer is empty). |
1561 | | * Useful to detect when there are no more io events queued for
1562 | | * sigio_rt, epoll_et, kqueue. |
1563 | | * >0 on successful read from the fd (when there might be more io |
1564 | | * queued -- the receive buffer might still be non-empty) |
1565 | | */ |
1566 | | inline static int handle_io(struct fd_map* fm, int idx,int event_type) |
1567 | 0 | { |
1568 | 0 | int ret = 0; |
1569 | |
|
1570 | 0 | pt_become_active(); |
1571 | 0 | switch(fm->type){ |
1572 | 0 | case F_TCP_LISTENER: |
1573 | 0 | ret = handle_new_connect((const struct socket_info*)fm->data); |
1574 | 0 | break; |
1575 | 0 | case F_TCPCONN: |
1576 | 0 | ret = handle_tcpconn_ev((struct tcp_connection*)fm->data, idx, |
1577 | 0 | event_type); |
1578 | 0 | break; |
1579 | 0 | case F_TCP_TCPWORKER: |
1580 | 0 | ret = handle_tcp_worker((struct tcp_worker*)fm->data, idx); |
1581 | 0 | break; |
1582 | 0 | case F_TCP_WORKER: |
1583 | 0 | ret = handle_worker((struct process_table*)fm->data, idx); |
1584 | 0 | break; |
1585 | 0 | case F_IPC: |
1586 | 0 | ipc_handle_job(fm->fd); |
1587 | 0 | break; |
1588 | 0 | case F_NONE: |
1589 | 0 | LM_CRIT("empty fd map\n"); |
1590 | 0 | goto error; |
1591 | 0 | default: |
1592 | 0 | LM_CRIT("unknown fd type %d\n", fm->type); |
1593 | 0 | goto error; |
1594 | 0 | } |
1595 | 0 | pt_become_idle(); |
1596 | 0 | return ret; |
1597 | 0 | error: |
1598 | 0 | pt_become_idle(); |
1599 | 0 | return -1; |
1600 | 0 | } |
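
The dispatch above is driven entirely by the type recorded when the fd was added to the reactor. As a small illustration (mirroring a call made later in tcp_main_server(), not new functionality), registering the IPC pipe with F_IPC is what routes its events into the ipc_handle_job() branch:

    if (reactor_add_reader(IPC_FD_READ_SELF, F_IPC, RCT_PRIO_ASYNC, NULL) < 0)
        LM_CRIT("failed to add IPC pipe to reactor\n");
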
1601 | | |
1602 | | |
1603 | | /* |
1604 | | * iterates through all TCP connections and closes expired ones |
1605 | | * Note: runs once per second at most |
1606 | | */ |
1607 | | #define tcpconn_lifetime(last_sec) \ |
1608 | | do { \ |
1609 | | int now; \ |
1610 | | now = get_ticks(); \ |
1611 | | if (last_sec != now) { \ |
1612 | | last_sec = now; \ |
1613 | | __tcpconn_lifetime(0); \ |
1614 | | } \ |
1615 | | } while (0) |
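
For context, the macro is meant to be dropped into a polling loop and rate-limits itself via the caller-provided last_sec counter; a hedged sketch of the intended usage (hypothetical loop shown instead of the real reactor plumbing):

    static unsigned int last_sec = 0;

    for (;;) {
        /* ... wait for and dispatch I/O events ... */
        tcpconn_lifetime(last_sec);  /* runs __tcpconn_lifetime(0) at most
                                      * once per second */
    }
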
1616 | | |
1617 | | |
1618 | | /*! \brief very inefficient for now - FIXME |
1619 | | * keep in sync with tcpconn_destroy, the "delete" part should be |
1620 | | * the same except for io_watch_del.. |
1621 | | * \todo FIXME (very inefficient for now) |
1622 | | */ |
1623 | | static inline void __tcpconn_lifetime(int shutdown) |
1624 | 0 | { |
1625 | 0 | struct tcp_connection *c, *next; |
1626 | 0 | unsigned int ticks,part; |
1627 | 0 | unsigned h; |
1628 | 0 | int fd; |
1629 | |
|
1630 | 0 | if (have_ticks()) |
1631 | 0 | ticks=get_ticks(); |
1632 | 0 | else |
1633 | 0 | ticks=0; |
1634 | |
|
1635 | 0 | for( part=0 ; part<TCP_PARTITION_SIZE ; part++ ) { |
1636 | 0 | if (!shutdown) TCPCONN_LOCK(part); /* fixme: we can lock only on delete IMO */ |
1637 | 0 | for(h=0; h<TCP_ID_HASH_SIZE; h++){ |
1638 | 0 | c=TCP_PART(part).tcpconn_id_hash[h]; |
1639 | 0 | while(c){ |
1640 | 0 | next=c->id_next; |
1641 | 0 | if (shutdown ||((c->refcnt==0) && (ticks>c->lifetime))) { |
1642 | 0 | if (!shutdown) |
1643 | 0 | LM_DBG("timeout for hash=%d - %p" |
1644 | 0 | " (%d > %d)\n", h, c, ticks, c->lifetime); |
1645 | 0 | fd=c->s; |
1646 | | /* report the closing of the connection. Note that
1647 | | * there are connections that use a forced expire of 0
1648 | | * as a way to be deleted - we are not interested in those */
1649 | | /* Also, do not trigger reporting when shutdown |
1650 | | * is done */ |
1651 | 0 | if (c->lifetime>0 && !shutdown) |
1652 | 0 | tcp_trigger_report(c, TCP_REPORT_CLOSE, |
1653 | 0 | "Timeout on no traffic"); |
1654 | 0 | if ((!shutdown)&&(fd>0)&&(c->refcnt==0)) { |
1655 | | /* if any of read or write are set, we need to remove |
1656 | | * the fd from the reactor */ |
1657 | 0 | if ((c->flags & F_CONN_REMOVED) != F_CONN_REMOVED){ |
1658 | 0 | reactor_del_all( fd, -1, IO_FD_CLOSING); |
1659 | 0 | c->flags|=F_CONN_REMOVED; |
1660 | 0 | } |
1661 | 0 | close(fd); |
1662 | 0 | c->s = -1; |
1663 | 0 | } |
1664 | 0 | _tcpconn_rm(c, shutdown?1:0); |
1665 | 0 | tcp_connections_no--; |
1666 | 0 | } |
1667 | 0 | c=next; |
1668 | 0 | } |
1669 | 0 | } |
1670 | 0 | if (!shutdown) TCPCONN_UNLOCK(part); |
1671 | 0 | } |
1672 | 0 | } |
1673 | | |
1674 | | |
1675 | | static void tcp_main_server(void) |
1676 | 0 | { |
1677 | 0 | static unsigned int last_sec = 0; |
1678 | 0 | int flags; |
1679 | 0 | struct socket_info_full* sif; |
1680 | 0 | int n; |
1681 | | |
1682 | | /* we run in a separate, dedicated process, with its own reactor |
1683 | | * (reactors are per process) */ |
1684 | 0 | if (init_worker_reactor("TCP_main", RCT_PRIO_MAX)<0) |
1685 | 0 | goto error; |
1686 | | |
1687 | | /* now start watching all the fds */ |
1688 | | |
1689 | | /* add all the sockets we listen on for connections */
1690 | 0 | for( n=PROTO_FIRST ; n<PROTO_LAST ; n++ ) |
1691 | 0 | if ( is_tcp_based_proto(n) ) |
1692 | 0 | for( sif=protos[n].listeners ; sif ; sif=sif->next ) { |
1693 | 0 | struct socket_info* si = &sif->socket_info; |
1694 | 0 | if (protos[n].tran.init_listener(si)<0) { |
1695 | 0 | LM_ERR("failed to init listener [%.*s], proto %s\n", |
1696 | 0 | si->name.len, si->name.s, |
1697 | 0 | protos[n].name ); |
1698 | 0 | goto error; |
1699 | 0 | } |
1700 | 0 | if (protos[n].tran.bind_listener && protos[n].tran.bind_listener(si)<0) { |
1701 | 0 | LM_ERR("failed to bind listener [%.*s], proto %s\n", |
1702 | 0 | si->name.len, si->name.s, |
1703 | 0 | protos[n].name ); |
1704 | 0 | goto error; |
1705 | 0 | } |
1706 | 0 | if(reactor_add_reader( si->socket, F_TCP_LISTENER, |
1707 | 0 | RCT_PRIO_NET, si)<0 ) { |
1708 | 0 | LM_ERR("failed to add listen socket to reactor\n"); |
1709 | 0 | goto error; |
1710 | 0 | } |
1711 | 0 | } |
1712 | | /* add all the unix sockets used for communication with other opensips
1713 | | * processes (get fd, new connection a.s.o.)
1714 | | * NOTE: we add even the socks for the inactive/unforked processes - the
1715 | | * socks are already created, but the triggering is from proc to
1716 | | * main, so having them in the reactor is harmless - they will never
1717 | | * trigger as there is no proc on the other end to write to us */
1718 | 0 | for (n=1; n<counted_max_processes; n++) { |
1719 | | /* skip myself (as process) and -1 socks (disabled)
1720 | | (we can't have 0, we never close it!) */ |
1721 | 0 | if (n!=process_no && pt[n].tcp_socks_holder[0]>0) |
1722 | 0 | if (reactor_add_reader( pt[n].tcp_socks_holder[0], F_TCP_WORKER, |
1723 | 0 | RCT_PRIO_PROC, &pt[n])<0){ |
1724 | 0 | LM_ERR("failed to add process %d (%s) unix socket " |
1725 | 0 | "to the fd list\n", n, pt[n].desc); |
1726 | 0 | goto error; |
1727 | 0 | } |
1728 | 0 | } |
1729 | | /* add all the unix sockets used for communication with the tcp workers */
1730 | 0 | for (n=0; n<tcp_workers_max_no; n++) { |
1731 | | /*we can't have 0, we never close it!*/ |
1732 | 0 | if (tcp_workers[n].unix_sock>0) { |
1733 | | /* make socket non-blocking */ |
1734 | 0 | flags=fcntl(tcp_workers[n].unix_sock, F_GETFL); |
1735 | 0 | if (flags==-1){ |
1736 | 0 | LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno)); |
1737 | 0 | goto error; |
1738 | 0 | } |
1739 | 0 | if (fcntl(tcp_workers[n].unix_sock,F_SETFL,flags|O_NONBLOCK)==-1){ |
1740 | 0 | LM_ERR("set non-blocking failed: (%d) %s\n", |
1741 | 0 | errno, strerror(errno)); |
1742 | 0 | goto error; |
1743 | 0 | } |
1744 | | /* add socket for listening */ |
1745 | 0 | if (reactor_add_reader( tcp_workers[n].unix_sock, |
1746 | 0 | F_TCP_TCPWORKER, RCT_PRIO_PROC, &tcp_workers[n])<0) { |
1747 | 0 | LM_ERR("failed to add tcp worker %d unix socket to " |
1748 | 0 | "the fd list\n", n); |
1749 | 0 | goto error; |
1750 | 0 | } |
1751 | 0 | } |
1752 | 0 | } |
1753 | | |
1754 | | /* init: start watching for the IPC jobs */ |
1755 | 0 | if (reactor_add_reader(IPC_FD_READ_SELF, F_IPC, RCT_PRIO_ASYNC, NULL)<0){ |
1756 | 0 | LM_CRIT("failed to add IPC pipe to reactor\n"); |
1757 | 0 | goto error; |
1758 | 0 | } |
1759 | | |
1760 | 0 | is_tcp_main = 1; |
1761 | | |
1762 | | /* main loop (requires "handle_io()" implementation) */ |
1763 | 0 | reactor_main_loop( TCP_MAIN_SELECT_TIMEOUT, error, |
1764 | 0 | tcpconn_lifetime(last_sec) ); |
1765 | |
|
1766 | 0 | error: |
1767 | 0 | destroy_worker_reactor(); |
1768 | 0 | LM_CRIT("exiting...\n");
1769 | 0 | exit(-1); |
1770 | 0 | } |
1771 | | |
1772 | | |
1773 | | |
1774 | | /**************************** Control functions ******************************/ |
1775 | | |
1776 | | /* initializes the TCP network level in terms of data structures */ |
1777 | | int tcp_init(void) |
1778 | 0 | { |
1779 | 0 | unsigned int i; |
1780 | | |
1781 | | /* first we do auto-detection to see if there are any TCP based |
1782 | | * protocols loaded */ |
1783 | 0 | for ( i=PROTO_FIRST ; i<PROTO_LAST ; i++ ) |
1784 | 0 | if (is_tcp_based_proto(i) && proto_has_listeners(i)) { |
1785 | 0 | tcp_disabled=0; |
1786 | 0 | break; |
1787 | 0 | } |
1788 | |
|
1789 | 0 | tcp_init_con_profiles(); |
1790 | |
|
1791 | 0 | if (tcp_disabled) |
1792 | 0 | return 0; |
1793 | | |
1794 | | #ifdef DBG_TCPCON |
1795 | | con_hist = shl_init("TCP con", 10000, 0); |
1796 | | if (!con_hist) { |
1797 | | LM_ERR("oom con hist\n"); |
1798 | | goto error; |
1799 | | } |
1800 | | #endif |
1801 | | |
1802 | 0 | if (tcp_auto_scaling_profile) { |
1803 | 0 | s_profile = get_scaling_profile(tcp_auto_scaling_profile); |
1804 | 0 | if (s_profile==NULL) { |
1805 | 0 | LM_WARN("TCP scaling profile <%s> not defined " |
1806 | 0 | "-> ignoring it...\n", tcp_auto_scaling_profile); |
1807 | 0 | } else { |
1808 | 0 | auto_scaling_enabled = 1; |
1809 | 0 | } |
1810 | 0 | } |
1811 | |
|
1812 | 0 | tcp_workers_max_no = (s_profile && (tcp_workers_no<s_profile->max_procs)) ? |
1813 | 0 | s_profile->max_procs : tcp_workers_no ; |
1814 | | |
1815 | | /* init tcp workers array */ |
1816 | 0 | tcp_workers = (struct tcp_worker*)shm_malloc |
1817 | 0 | ( tcp_workers_max_no*sizeof(struct tcp_worker) ); |
1818 | 0 | if (tcp_workers==0) { |
1819 | 0 | LM_CRIT("could not alloc tcp_workers array in shm memory\n"); |
1820 | 0 | goto error; |
1821 | 0 | } |
1822 | 0 | memset( tcp_workers, 0, tcp_workers_max_no*sizeof(struct tcp_worker)); |
1823 | | /* init globals */ |
1824 | 0 | connection_id=(unsigned int*)shm_malloc(sizeof(unsigned int)); |
1825 | 0 | if (connection_id==0){ |
1826 | 0 | LM_CRIT("could not alloc globals in shm memory\n"); |
1827 | 0 | goto error; |
1828 | 0 | } |
1829 | | /* rand() returns a pseudo-random integer in the range [0, RAND_MAX];
1830 | | * use it as the starting value for the global connection ID counter */
1831 | 0 | *connection_id=(unsigned int)rand(); |
1832 | 0 | memset( &tcp_parts, 0, TCP_PARTITION_SIZE*sizeof(struct tcp_partition)); |
1833 | | /* init partitions */ |
1834 | 0 | for( i=0 ; i<TCP_PARTITION_SIZE ; i++ ) { |
1835 | | /* init lock */ |
1836 | 0 | tcp_parts[i].tcpconn_lock=lock_alloc(); |
1837 | 0 | if (tcp_parts[i].tcpconn_lock==0){ |
1838 | 0 | LM_CRIT("could not alloc lock\n"); |
1839 | 0 | goto error; |
1840 | 0 | } |
1841 | 0 | if (lock_init(tcp_parts[i].tcpconn_lock)==0){ |
1842 | 0 | LM_CRIT("could not init lock\n"); |
1843 | 0 | lock_dealloc((void*)tcp_parts[i].tcpconn_lock); |
1844 | 0 | tcp_parts[i].tcpconn_lock=0; |
1845 | 0 | goto error; |
1846 | 0 | } |
1847 | | /* alloc hashtables*/ |
1848 | 0 | tcp_parts[i].tcpconn_aliases_hash=(struct tcp_conn_alias**) |
1849 | 0 | shm_malloc(TCP_ALIAS_HASH_SIZE* sizeof(struct tcp_conn_alias*)); |
1850 | 0 | if (tcp_parts[i].tcpconn_aliases_hash==0){ |
1851 | 0 | LM_CRIT("could not alloc address hashtable in shm memory\n"); |
1852 | 0 | goto error; |
1853 | 0 | } |
1854 | 0 | tcp_parts[i].tcpconn_id_hash=(struct tcp_connection**) |
1855 | 0 | shm_malloc(TCP_ID_HASH_SIZE*sizeof(struct tcp_connection*)); |
1856 | 0 | if (tcp_parts[i].tcpconn_id_hash==0){ |
1857 | 0 | LM_CRIT("could not alloc id hashtable in shm memory\n"); |
1858 | 0 | goto error; |
1859 | 0 | } |
1860 | | /* init hashtables*/ |
1861 | 0 | memset((void*)tcp_parts[i].tcpconn_aliases_hash, 0, |
1862 | 0 | TCP_ALIAS_HASH_SIZE * sizeof(struct tcp_conn_alias*)); |
1863 | 0 | memset((void*)tcp_parts[i].tcpconn_id_hash, 0, |
1864 | 0 | TCP_ID_HASH_SIZE * sizeof(struct tcp_connection*)); |
1865 | 0 | } |
1866 | | |
1867 | 0 | return 0; |
1868 | 0 | error: |
1869 | | /* clean-up */ |
1870 | 0 | tcp_destroy(); |
1871 | 0 | return -1; |
1872 | 0 | } |
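
The partitions set up above are always traversed under their own lock; as a condensed illustration of the access pattern used elsewhere in this file (e.g. __tcpconn_lifetime() and mi_tcp_list_conns()), not new functionality:

    struct tcp_connection *c;
    unsigned int part, h;

    for (part = 0; part < TCP_PARTITION_SIZE; part++) {
        TCPCONN_LOCK(part);                       /* per-partition lock */
        for (h = 0; h < TCP_ID_HASH_SIZE; h++)
            for (c = TCP_PART(part).tcpconn_id_hash[h]; c; c = c->id_next) {
                /* ... inspect connection "c" here ... */
            }
        TCPCONN_UNLOCK(part);
    }
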
1873 | | |
1874 | | |
1875 | | /* destroys the TCP data */ |
1876 | | void tcp_destroy(void) |
1877 | 0 | { |
1878 | 0 | int part; |
1879 | |
|
1880 | 0 | if (tcp_parts[0].tcpconn_id_hash) |
1881 | | /* force close/expire for all active tcpconns*/ |
1882 | 0 | __tcpconn_lifetime(1); |
1883 | |
|
1884 | 0 | if (connection_id){ |
1885 | 0 | shm_free(connection_id); |
1886 | 0 | connection_id=0; |
1887 | 0 | } |
1888 | |
|
1889 | 0 | for ( part=0 ; part<TCP_PARTITION_SIZE ; part++ ) { |
1890 | 0 | if (tcp_parts[part].tcpconn_id_hash){ |
1891 | 0 | shm_free(tcp_parts[part].tcpconn_id_hash); |
1892 | 0 | tcp_parts[part].tcpconn_id_hash=0; |
1893 | 0 | } |
1894 | 0 | if (tcp_parts[part].tcpconn_aliases_hash){ |
1895 | 0 | shm_free(tcp_parts[part].tcpconn_aliases_hash); |
1896 | 0 | tcp_parts[part].tcpconn_aliases_hash=0; |
1897 | 0 | } |
1898 | 0 | if (tcp_parts[part].tcpconn_lock){ |
1899 | 0 | lock_destroy(tcp_parts[part].tcpconn_lock); |
1900 | 0 | lock_dealloc((void*)tcp_parts[part].tcpconn_lock); |
1901 | 0 | tcp_parts[part].tcpconn_lock=0; |
1902 | 0 | } |
1903 | 0 | } |
1904 | 0 | } |
1905 | | |
1906 | | |
1907 | | int tcp_create_comm_proc_socks( int proc_no) |
1908 | 0 | { |
1909 | 0 | int i; |
1910 | |
|
1911 | 0 | if (tcp_disabled) |
1912 | 0 | return 0; |
1913 | | |
1914 | 0 | for( i=0 ; i<proc_no ; i++ ) { |
1915 | 0 | if (socketpair(AF_UNIX, SOCK_STREAM, 0, pt[i].tcp_socks_holder)<0){ |
1916 | 0 | LM_ERR("socketpair failed for process %d: %d/%s\n", |
1917 | 0 | i, errno, strerror(errno)); |
1918 | 0 | return -1; |
1919 | 0 | } |
1920 | 0 | } |
1921 | | |
1922 | 0 | return 0; |
1923 | 0 | } |
1924 | | |
1925 | | |
1926 | | int tcp_activate_comm_proc_socks( int proc_no) |
1927 | 0 | { |
1928 | 0 | if (tcp_disabled) |
1929 | 0 | return 0; |
1930 | | |
1931 | 0 | unix_tcp_sock = pt[proc_no].tcp_socks_holder[1]; |
1932 | 0 | pt[proc_no].unix_sock = pt[proc_no].tcp_socks_holder[0]; |
1933 | |
|
1934 | 0 | return 0; |
1935 | 0 | } |
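
Taken together, the two helpers above fix the per-process socket-pair convention that the rest of the file relies on (a summary of what the code already does, not new behavior):

    /* tcp_socks_holder[0] -> pt[proc_no].unix_sock : the end TCP main watches
     *                        in its reactor as F_TCP_WORKER (tcp_main_server)
     * tcp_socks_holder[1] -> unix_tcp_sock         : the end the owning process
     *                        uses to send its requests (e.g. from tcp_send)   */
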
1936 | | |
1937 | | |
1938 | | void tcp_connect_proc_to_tcp_main( int proc_no, int worker ) |
1939 | 0 | { |
1940 | 0 | if (tcp_disabled) |
1941 | 0 | return; |
1942 | | |
1943 | 0 | if (worker) { |
1944 | 0 | close( pt[proc_no].unix_sock ); |
1945 | 0 | } else { |
1946 | 0 | unix_tcp_sock = -1; |
1947 | 0 | } |
1948 | 0 | } |
1949 | | |
1950 | | |
1951 | | int _get_own_tcp_worker_id(void) |
1952 | 0 | { |
1953 | 0 | pid_t pid; |
1954 | 0 | int i; |
1955 | |
|
1956 | 0 | pid = getpid(); |
1957 | 0 | for( i=0 ; i<tcp_workers_max_no ; i++) |
1958 | 0 | if(tcp_workers[i].pid==pid) |
1959 | 0 | return i; |
1960 | | |
1961 | 0 | return -1; |
1962 | 0 | } |
1963 | | |
1964 | | |
1965 | | void tcp_reset_worker_slot(void) |
1966 | 0 | { |
1967 | 0 | int i; |
1968 | |
|
1969 | 0 | if ((i=_get_own_tcp_worker_id())>=0) { |
1970 | 0 | tcp_workers[i].state=STATE_INACTIVE; |
1971 | 0 | tcp_workers[i].pid=0; |
1972 | 0 | tcp_workers[i].pt_idx=0; |
1973 | 0 | } |
1974 | 0 | } |
1975 | | |
1976 | | |
1977 | | static int fork_dynamic_tcp_process(void *foo) |
1978 | 0 | { |
1979 | 0 | int p_id; |
1980 | 0 | int r; |
1981 | 0 | const struct internal_fork_params ifp_sr_tcp = { |
1982 | 0 | .proc_desc = "SIP receiver TCP", |
1983 | 0 | .flags = OSS_PROC_DYNAMIC|OSS_PROC_NEEDS_SCRIPT, |
1984 | 0 | .type = TYPE_TCP, |
1985 | 0 | }; |
1986 | | |
1987 | | /* search for free slot in the TCP workers table */ |
1988 | 0 | for( r=0 ; r<tcp_workers_max_no ; r++ ) |
1989 | 0 | if (tcp_workers[r].state==STATE_INACTIVE) |
1990 | 0 | break; |
1991 | |
|
1992 | 0 | if (r==tcp_workers_max_no) { |
1993 | 0 | LM_BUG("trying to fork one more TCP worker but no free slots in " |
1994 | 0 | "the TCP table (size=%d)\n",tcp_workers_max_no); |
1995 | 0 | return -1; |
1996 | 0 | } |
1997 | | |
1998 | 0 | if((p_id=internal_fork(&ifp_sr_tcp))<0){ |
1999 | 0 | LM_ERR("cannot fork dynamic TCP worker process\n"); |
2000 | 0 | return(-1); |
2001 | 0 | }else if (p_id==0){ |
2002 | | /* new TCP process */ |
2003 | 0 | set_proc_attrs("TCP receiver"); |
2004 | 0 | tcp_workers[r].pid = getpid(); |
2005 | |
|
2006 | 0 | if (tcp_worker_proc_reactor_init(tcp_workers[r].main_unix_sock)<0|| |
2007 | 0 | init_child(20000) < 0) { |
2008 | 0 | goto error; |
2009 | 0 | } |
2010 | | |
2011 | 0 | report_conditional_status( 1, 0);/*report success*/ |
2012 | | /* the child proc is done (read & write) dealing with the status pipe */
2013 | 0 | clean_read_pipeend(); |
2014 | |
|
2015 | 0 | tcp_worker_proc_loop(); |
2016 | 0 | destroy_worker_reactor(); |
2017 | |
|
2018 | 0 | error: |
2019 | 0 | report_failure_status(); |
2020 | 0 | LM_ERR("Initializing new process failed, exiting with error \n"); |
2021 | 0 | pt[process_no].flags |= OSS_PROC_SELFEXIT; |
2022 | 0 | exit( -1); |
2023 | 0 | } else { |
2024 | | /*parent/main*/ |
2025 | 0 | tcp_workers[r].state=STATE_ACTIVE; |
2026 | 0 | tcp_workers[r].n_reqs=0; |
2027 | 0 | tcp_workers[r].pt_idx=p_id; |
2028 | 0 | return p_id; |
2029 | 0 | } |
2030 | | |
2031 | 0 | return 0; |
2032 | 0 | } |
2033 | | |
2034 | | |
2035 | | static void tcp_process_graceful_terminate(int sender, void *param) |
2036 | 0 | { |
2037 | 0 | int i; |
2038 | | |
2039 | | /* we accept this only from the main process */
2040 | 0 | if (sender!=0) { |
2041 | 0 | LM_BUG("graceful terminate received from a non-main process!!\n"); |
2042 | 0 | return; |
2043 | 0 | } |
2044 | 0 | LM_NOTICE("process %d received RPC to terminate from Main\n",process_no); |
2045 | | |
2046 | | /* going into "draining" state will avoid: |
2047 | | * - getting jobs from TCP MAIN (active state required for that) |
2048 | | * - having the worker slot re-used (inactive state required for that) */
2049 | 0 | if ((i=_get_own_tcp_worker_id())>=0) |
2050 | 0 | tcp_workers[i].state=STATE_DRAINING; |
2051 | |
|
2052 | 0 | tcp_terminate_worker(); |
2053 | |
|
2054 | 0 | return; |
2055 | 0 | } |
2056 | | |
2057 | | |
2058 | | /* counts the number of TCP processes to start with; this number may |
2059 | | * change during runtime due to auto-scaling */
2060 | | int tcp_count_processes(unsigned int *extra) |
2061 | 0 | { |
2062 | 0 | if (extra) *extra = 0; |
2063 | |
|
2064 | 0 | if (tcp_disabled) |
2065 | 0 | return 0; |
2066 | | |
2067 | | |
2068 | 0 | if (s_profile && extra) { |
2069 | | /* how many can be forked over the number of procs to start with ?*/ |
2070 | 0 | if (s_profile->max_procs > tcp_workers_no) |
2071 | 0 | *extra = s_profile->max_procs - tcp_workers_no; |
2072 | 0 | } |
2073 | |
|
2074 | 0 | return 1/* tcp main */ + tcp_workers_no /*workers to start with*/; |
2075 | 0 | } |
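
As a concrete (hypothetical) example: with tcp_workers_no = 8 and a scaling profile whose max_procs = 16, the function returns 1 + 8 = 9 processes to start with and, via *extra, reports 16 - 8 = 8 additional slots reservable for auto-scaling.
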
2076 | | |
2077 | | |
2078 | | int tcp_start_processes(int *chd_rank, int *startup_done) |
2079 | 0 | { |
2080 | 0 | int r, n, p_id; |
2081 | 0 | int reader_fd[2]; /* for comm. with the tcp workers read */ |
2082 | 0 | struct socket_info_full *sif; |
2083 | 0 | const struct internal_fork_params ifp_sr_tcp = { |
2084 | 0 | .proc_desc = "SIP receiver TCP", |
2085 | 0 | .flags = OSS_PROC_NEEDS_SCRIPT, |
2086 | 0 | .type = TYPE_TCP, |
2087 | 0 | }; |
2088 | |
|
2089 | 0 | if (tcp_disabled) |
2090 | 0 | return 0; |
2091 | | |
2092 | | /* estimate max fd. no: |
2093 | | * 1 tcp send unix socket/all_proc, |
2094 | | * + 1 udp sock/udp proc + 1 tcp_worker sock/tcp worker* |
2095 | | * + no_listen_tcp */ |
2096 | 0 | for( r=0,n=PROTO_FIRST ; n<PROTO_LAST ; n++ ) |
2097 | 0 | if ( is_tcp_based_proto(n) ) |
2098 | 0 | for(sif=protos[n].listeners; sif ; sif=sif->next,r++ ); |
2099 | | |
2100 | | /* create the socket pairs for ALL potential processes */ |
2101 | 0 | for(r=0; r<tcp_workers_max_no; r++){ |
2102 | | /* create sock to communicate from TCP main to worker */ |
2103 | 0 | if (socketpair(AF_UNIX, SOCK_STREAM, 0, reader_fd)<0){ |
2104 | 0 | LM_ERR("socketpair failed: %s\n", strerror(errno)); |
2105 | 0 | goto error; |
2106 | 0 | } |
2107 | 0 | tcp_workers[r].unix_sock = reader_fd[0]; /* worker's end */ |
2108 | 0 | tcp_workers[r].main_unix_sock = reader_fd[1]; /* main's end */ |
2109 | 0 | } |
2110 | | |
2111 | 0 | if ( auto_scaling_enabled && s_profile && |
2112 | 0 | create_process_group( TYPE_TCP, NULL, s_profile, |
2113 | 0 | fork_dynamic_tcp_process, tcp_process_graceful_terminate)!=0) |
2114 | 0 | LM_ERR("failed to create group of TCP processes, "
2115 | 0 | "auto forking will not be possible\n"); |
2116 | | |
2117 | | /* start the TCP workers */ |
2118 | 0 | for(r=0; r<tcp_workers_no; r++){ |
2119 | 0 | (*chd_rank)++; |
2120 | 0 | p_id=internal_fork(&ifp_sr_tcp); |
2121 | 0 | if (p_id<0){ |
2122 | 0 | LM_ERR("fork failed\n"); |
2123 | 0 | goto error; |
2124 | 0 | }else if (p_id>0){ |
2125 | | /* parent */ |
2126 | 0 | tcp_workers[r].state=STATE_ACTIVE; |
2127 | 0 | tcp_workers[r].n_reqs=0; |
2128 | 0 | tcp_workers[r].pt_idx=p_id; |
2129 | 0 | }else{ |
2130 | | /* child */ |
2131 | 0 | set_proc_attrs("TCP receiver"); |
2132 | 0 | tcp_workers[r].pid = getpid(); |
2133 | 0 | if (tcp_worker_proc_reactor_init(tcp_workers[r].main_unix_sock)<0|| |
2134 | 0 | init_child(*chd_rank) < 0) { |
2135 | 0 | LM_ERR("init_children failed\n"); |
2136 | 0 | report_failure_status(); |
2137 | 0 | if (startup_done) |
2138 | 0 | *startup_done = -1; |
2139 | 0 | exit(-1); |
2140 | 0 | } |
2141 | | |
2142 | | /* was startup route executed so far ? */ |
2143 | 0 | if (startup_done!=NULL && *startup_done==0 && r==0) { |
2144 | 0 | LM_DBG("running startup for first TCP\n"); |
2145 | 0 | if(run_startup_route()< 0) { |
2146 | 0 | LM_ERR("Startup route processing failed\n"); |
2147 | 0 | report_failure_status(); |
2148 | 0 | *startup_done = -1; |
2149 | 0 | exit(-1); |
2150 | 0 | } |
2151 | 0 | *startup_done = 1; |
2152 | 0 | } |
2153 | | |
2154 | 0 | report_conditional_status( (!no_daemon_mode), 0); |
2155 | |
|
2156 | 0 | tcp_worker_proc_loop(); |
2157 | 0 | } |
2158 | 0 | } |
2159 | | |
2160 | | /* wait for the startup route to be executed */ |
2161 | 0 | if (startup_done) |
2162 | 0 | while (!(*startup_done)) { |
2163 | 0 | usleep(5); |
2164 | 0 | handle_sigs(); |
2165 | 0 | } |
2166 | |
|
2167 | 0 | return 0; |
2168 | 0 | error: |
2169 | 0 | return -1; |
2170 | 0 | } |
2171 | | |
2172 | | |
2173 | | int tcp_start_listener(void) |
2174 | 0 | { |
2175 | 0 | int p_id; |
2176 | 0 | const struct internal_fork_params ifp_tcp_main = { |
2177 | 0 | .proc_desc = "TCP main", |
2178 | 0 | .flags = 0, |
2179 | 0 | .type = TYPE_NONE, |
2180 | 0 | }; |
2181 | |
|
2182 | 0 | if (tcp_disabled) |
2183 | 0 | return 0; |
2184 | | |
2185 | | /* start the TCP manager process */ |
2186 | 0 | if ( (p_id=internal_fork(&ifp_tcp_main))<0 ) { |
2187 | 0 | LM_CRIT("cannot fork tcp main process\n"); |
2188 | 0 | goto error; |
2189 | 0 | }else if (p_id==0){ |
2190 | | /* child */ |
2191 | | /* close the TCP inter-process sockets */ |
2192 | 0 | close(unix_tcp_sock); |
2193 | 0 | unix_tcp_sock = -1; |
2194 | 0 | close(pt[process_no].unix_sock); |
2195 | 0 | pt[process_no].unix_sock = -1; |
2196 | |
|
2197 | 0 | report_conditional_status( (!no_daemon_mode), 0); |
2198 | |
|
2199 | 0 | tcp_main_server(); |
2200 | 0 | exit(-1); |
2201 | 0 | } |
2202 | | |
2203 | 0 | return 0; |
2204 | 0 | error: |
2205 | 0 | return -1; |
2206 | 0 | } |
2207 | | |
2208 | | int tcp_has_async_write(void) |
2209 | 0 | { |
2210 | 0 | return reactor_has_async(); |
2211 | 0 | } |
2212 | | |
2213 | | |
2214 | | /***************************** MI functions **********************************/ |
2215 | | |
2216 | | mi_response_t *mi_tcp_list_conns(const mi_params_t *params, |
2217 | | struct mi_handler *async_hdl) |
2218 | 0 | { |
2219 | 0 | mi_response_t *resp; |
2220 | 0 | mi_item_t *resp_obj; |
2221 | 0 | mi_item_t *conns_arr, *conn_item; |
2222 | 0 | struct tcp_connection *conn; |
2223 | 0 | time_t _ts; |
2224 | 0 | char date_buf[MI_DATE_BUF_LEN]; |
2225 | 0 | int date_buf_len; |
2226 | 0 | unsigned int i,j,part; |
2227 | 0 | char proto[PROTO_NAME_MAX_SIZE]; |
2228 | 0 | struct tm ltime; |
2229 | 0 | char *p; |
2230 | |
|
2231 | 0 | if (tcp_disabled) |
2232 | 0 | return init_mi_result_null(); |
2233 | | |
2234 | 0 | resp = init_mi_result_object(&resp_obj); |
2235 | 0 | if (!resp) |
2236 | 0 | return 0; |
2237 | | |
2238 | 0 | conns_arr = add_mi_array(resp_obj, MI_SSTR("Connections")); |
2239 | 0 | if (!conns_arr) { |
2240 | 0 | free_mi_response(resp); |
2241 | 0 | return 0; |
2242 | 0 | } |
2243 | | |
2244 | 0 | for( part=0 ; part<TCP_PARTITION_SIZE ; part++) { |
2245 | 0 | TCPCONN_LOCK(part); |
2246 | 0 | for( i=0; i<TCP_ID_HASH_SIZE ; i++ ) { |
2247 | 0 | for(conn=TCP_PART(part).tcpconn_id_hash[i];conn;conn=conn->id_next){ |
2248 | | /* add one object for each conn */
2249 | 0 | conn_item = add_mi_object(conns_arr, 0, 0); |
2250 | 0 | if (!conn_item) |
2251 | 0 | goto error; |
2252 | | |
2253 | | /* add ID */ |
2254 | 0 | if (add_mi_number(conn_item, MI_SSTR("ID"), conn->id) < 0) |
2255 | 0 | goto error; |
2256 | | |
2257 | | /* add type/proto */ |
2258 | 0 | p = proto2str(conn->type, proto); |
2259 | 0 | if (add_mi_string(conn_item, MI_SSTR("Type"), proto, |
2260 | 0 | (int)(long)(p-proto)) < 0)
2261 | 0 | goto error; |
2262 | | |
2263 | | /* add state */ |
2264 | 0 | if (add_mi_number(conn_item, MI_SSTR("State"), conn->state) < 0) |
2265 | 0 | goto error; |
2266 | | |
2267 | | /* add Remote IP:Port */ |
2268 | 0 | if (add_mi_string_fmt(conn_item, MI_SSTR("Remote"), "%s:%d", |
2269 | 0 | ip_addr2a(&conn->rcv.src_ip), conn->rcv.src_port) < 0) |
2270 | 0 | goto error; |
2271 | | |
2272 | | /* add Local IP:Port */ |
2273 | 0 | if (add_mi_string_fmt(conn_item, MI_SSTR("Local"), "%s:%d", |
2274 | 0 | ip_addr2a(&conn->rcv.dst_ip), conn->rcv.dst_port) < 0) |
2275 | 0 | goto error; |
2276 | | |
2277 | | /* add lifetime */ |
2278 | 0 | _ts = (time_t)conn->lifetime + startup_time; |
2279 | 0 | localtime_r(&_ts, <ime); |
2280 | 0 | date_buf_len = strftime(date_buf, MI_DATE_BUF_LEN - 1, |
2281 | 0 | "%Y-%m-%d %H:%M:%S", <ime); |
2282 | 0 | if (date_buf_len != 0) { |
2283 | 0 | if (add_mi_string(conn_item, MI_SSTR("Lifetime"), |
2284 | 0 | date_buf, date_buf_len) < 0) |
2285 | 0 | goto error; |
2286 | 0 | } else { |
2287 | 0 | if (add_mi_number(conn_item, MI_SSTR("Lifetime"), _ts) < 0) |
2288 | 0 | goto error; |
2289 | 0 | } |
2290 | | |
2291 | | /* add the port-aliases */ |
2292 | 0 | for( j=0 ; j<conn->aliases ; j++ ) |
2293 | | /* add one node for each alias */
2294 | 0 | add_mi_number( conn_item, MI_SSTR("Alias port"), |
2295 | 0 | conn->con_aliases[j].port ); |
2296 | 0 | } |
2297 | 0 | } |
2298 | | |
2299 | 0 | TCPCONN_UNLOCK(part); |
2300 | 0 | } |
2301 | | |
2302 | 0 | return resp; |
2303 | | |
2304 | 0 | error: |
2305 | 0 | TCPCONN_UNLOCK(part); |
2306 | 0 | LM_ERR("failed to add MI item\n"); |
2307 | 0 | free_mi_response(resp); |
2308 | 0 | return 0; |
2309 | 0 | } |