/src/opensips/net/proto_tcp/proto_tcp.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (C) 2015 - OpenSIPS Foundation |
3 | | * Copyright (C) 2001-2003 FhG Fokus |
4 | | * |
5 | | * This file is part of opensips, a free SIP server. |
6 | | * |
7 | | * opensips is free software; you can redistribute it and/or modify |
8 | | * it under the terms of the GNU General Public License as published by |
9 | | * the Free Software Foundation; either version 2 of the License, or |
10 | | * (at your option) any later version |
11 | | * |
12 | | * opensips is distributed in the hope that it will be useful, |
13 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
14 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
15 | | * GNU General Public License for more details. |
16 | | * |
17 | | * You should have received a copy of the GNU General Public License |
18 | | * along with this program; if not, write to the Free Software |
19 | | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
20 | | * |
21 | | * |
22 | | * History: |
23 | | * ------- |
24 | | * 2015-01-09 first version (razvanc) |
25 | | */ |
26 | | |
27 | | #include <errno.h> |
28 | | #include <unistd.h> |
29 | | #include <netinet/tcp.h> |
30 | | #include <poll.h> |
31 | | |
32 | | #include "../../timer.h" |
33 | | #include "../../sr_module.h" |
34 | | #include "../../net/api_proto.h" |
35 | | #include "../../net/api_proto_net.h" |
36 | | #include "../../net/net_tcp.h" |
37 | | #include "../../net/net_tcp_report.h" |
38 | | #include "../../net/trans_trace.h" |
39 | | #include "../../net/tcp_common.h" |
40 | | #include "../../socket_info.h" |
41 | | #include "../../tsend.h" |
42 | | #include "../../trace_api.h" |
43 | | #include "../../net/net_tcp_dbg.h" |
44 | | |
45 | | #include "tcp_common_defs.h" |
46 | | #include "proto_tcp_handler.h" |
47 | | |
/* protocol-level flag, stored in tcp_connection->proto_flags by this file,
 * telling that the connect/accept event of the connection was already
 * handled by the tracing code */
#define F_TCP_CONN_TRACED ( 1 << 0 )

/* Tracing is performed only if a trace destination is set, the shared
 * on/off switch is enabled and the given connection flags do not carry
 * F_CONN_TRACE_DROPPED.
 * The argument is parenthesized so that compound expressions (for example
 * "a | b") are tested as a whole instead of binding to the '&' operator. */
#define TRACE_ON(flags) (t_dst && (*trace_is_on) && \
		!((flags) & F_CONN_TRACE_DROPPED))
51 | | |
52 | | static int mod_init(void); |
53 | | static int proto_tcp_init(struct proto_info *pi); |
54 | | static int proto_tcp_init_listener(struct socket_info *si); |
55 | | static int proto_tcp_send(const struct socket_info* send_sock, |
56 | | char* buf, unsigned int len, const union sockaddr_union* to, |
57 | | unsigned int id); |
58 | | inline static int _tcp_write_on_socket(struct tcp_connection *c, int fd, |
59 | | char *buf, int len); |
60 | | |
61 | | /* buffer to be used for reading all TCP SIP messages |
62 | | detached from the actual con - in order to improve |
63 | | paralelism ( process the SIP message while the con |
64 | | can be sent back to main to do more stuff */ |
65 | | static struct tcp_req tcp_current_req; |
66 | | |
67 | 0 | #define _tcp_common_write _tcp_write_on_socket |
68 | 0 | #define _tcp_common_current_req tcp_current_req |
69 | | #include "tcp_common.h" |
70 | | |
71 | | static int tcp_read_req(struct tcp_connection* con, int* bytes_read); |
72 | | static void tcp_report(int type, unsigned long long conn_id, int conn_flags, |
73 | | void *extra); |
74 | | static mi_response_t *w_tcp_trace_mi(const mi_params_t *params, |
75 | | struct mi_handler *async_hdl); |
76 | | static mi_response_t *w_tcp_trace_mi_1(const mi_params_t *params, |
77 | | struct mi_handler *async_hdl); |
78 | | |
79 | 0 | #define TRACE_PROTO "proto_hep" |
80 | | |
81 | | static str trace_destination_name = {NULL, 0}; |
82 | | trace_dest t_dst; |
83 | | trace_proto_t tprot; |
84 | | |
85 | | /* module tracing parameters */ |
86 | | static int trace_is_on_tmp=0, *trace_is_on; |
87 | | static char* trace_filter_route; |
88 | | static struct script_route_ref* trace_filter_route_ref = NULL; |
89 | | /**/ |
90 | | |
91 | | extern int unix_tcp_sock; |
92 | | |
93 | | /* default port for TCP protocol */ |
94 | | static int tcp_port = SIP_PORT; |
95 | | |
96 | | /* in milliseconds */ |
97 | | static int tcp_send_timeout = 100; |
98 | | |
99 | | /* 1 if TCP connect & write should be async */ |
100 | | static int tcp_async = 1; |
101 | | |
102 | | /* Number of milliseconds that a worker will block waiting for a local |
103 | | * connect - if connect op exceeds this, it will get passed to TCP main*/ |
104 | | static int tcp_async_local_connect_timeout = 100; |
105 | | |
106 | | /* Number of milliseconds that a worker will block waiting for a local |
107 | | * write - if write op exceeds this, it will get passed to TCP main*/ |
108 | | static int tcp_async_local_write_timeout = 10; |
109 | | |
110 | | /* maximum number of write chunks that will be queued per TCP connection - |
111 | | if we exceed this number, we just drop the connection */ |
112 | | static int tcp_async_max_postponed_chunks = 32; |
113 | | |
114 | | static int tcp_max_msg_chunks = TCP_CHILD_MAX_MSG_CHUNK; |
115 | | |
116 | | /* 0: send CRLF pong to incoming CRLFCRLF ping */ |
117 | | static int tcp_crlf_pingpong = 1; |
118 | | |
119 | | /* 0: do not drop single CRLF messages */ |
120 | | static int tcp_crlf_drop = 0; |
121 | | |
122 | | /* if the handling/processing (NOT READING) of the SIP messages should |
123 | | * be done in parallel (after one SIP msg is read, while processing it, |
124 | | * another READ op may be performed) */ |
125 | | static int tcp_parallel_handling = 0; |
126 | | |
127 | | |
128 | | static const cmd_export_t cmds[] = { |
129 | | {"proto_init", (cmd_function)proto_tcp_init, {{0, 0, 0}}, 0}, |
130 | | {0,0,{{0,0,0}},0} |
131 | | }; |
132 | | |
133 | | |
134 | | static const param_export_t params[] = { |
135 | | { "tcp_port", INT_PARAM, &tcp_port }, |
136 | | { "tcp_send_timeout", INT_PARAM, &tcp_send_timeout }, |
137 | | { "tcp_max_msg_chunks", INT_PARAM, &tcp_max_msg_chunks }, |
138 | | { "tcp_crlf_pingpong", INT_PARAM, &tcp_crlf_pingpong }, |
139 | | { "tcp_crlf_drop", INT_PARAM, &tcp_crlf_drop }, |
140 | | { "tcp_async", INT_PARAM, &tcp_async }, |
141 | | { "tcp_async_max_postponed_chunks", INT_PARAM, |
142 | | &tcp_async_max_postponed_chunks }, |
143 | | { "tcp_async_local_connect_timeout", INT_PARAM, |
144 | | &tcp_async_local_connect_timeout}, |
145 | | { "tcp_async_local_write_timeout", INT_PARAM, |
146 | | &tcp_async_local_write_timeout }, |
147 | | { "tcp_parallel_handling", INT_PARAM, |
148 | | &tcp_parallel_handling }, |
149 | | { "trace_destination", STR_PARAM, &trace_destination_name.s}, |
150 | | { "trace_on", INT_PARAM, &trace_is_on_tmp }, |
151 | | { "trace_filter_route", STR_PARAM, &trace_filter_route }, |
152 | | {0, 0, 0} |
153 | | }; |
154 | | |
155 | | static const mi_export_t mi_cmds[] = { |
156 | | { "tcp_trace", 0, 0, 0, { |
157 | | {w_tcp_trace_mi, {0}}, |
158 | | {w_tcp_trace_mi_1, {"trace_mode", 0}}, |
159 | | {EMPTY_MI_RECIPE} |
160 | | } |
161 | | }, |
162 | | {EMPTY_MI_EXPORT} |
163 | | }; |
164 | | |
165 | | /* module dependencies */ |
166 | | static const dep_export_t deps = { |
167 | | { /* OpenSIPS module dependencies */ |
168 | | { MOD_TYPE_DEFAULT, "proto_hep", DEP_SILENT }, |
169 | | { MOD_TYPE_NULL, NULL, 0 } |
170 | | }, |
171 | | { /* modparam dependencies */ |
172 | | { NULL, NULL} |
173 | | } |
174 | | }; |
175 | | |
176 | | struct module_exports proto_tcp_exports = { |
177 | | PROTO_PREFIX "tcp", /* module name*/ |
178 | | MOD_TYPE_DEFAULT,/* class of this module */ |
179 | | MODULE_VERSION, |
180 | | DEFAULT_DLFLAGS, /* dlopen flags */ |
181 | | 0, /* load function */ |
182 | | &deps, /* OpenSIPS module dependencies */ |
183 | | cmds, /* exported functions */ |
184 | | 0, /* exported async functions */ |
185 | | params, /* module parameters */ |
186 | | 0, /* exported statistics */ |
187 | | mi_cmds, /* exported MI functions */ |
188 | | 0, /* exported pseudo-variables */ |
189 | | 0, /* exported transformations */ |
190 | | 0, /* extra processes */ |
191 | | 0, /* module pre-initialization function */ |
192 | | mod_init, /* module initialization function */ |
193 | | 0, /* response function */ |
194 | | 0, /* destroy function */ |
195 | | 0, /* per-child init function */ |
196 | | 0 /* reload confirm function */ |
197 | | }; |
198 | | |
199 | | static int proto_tcp_init(struct proto_info *pi) |
200 | 0 | { |
201 | 0 | pi->id = PROTO_TCP; |
202 | 0 | pi->name = "tcp"; |
203 | 0 | pi->default_port = tcp_port; |
204 | |
|
205 | 0 | pi->tran.init_listener = proto_tcp_init_listener; |
206 | 0 | pi->tran.bind_listener = tcp_bind_listener; |
207 | 0 | pi->tran.send = proto_tcp_send; |
208 | 0 | pi->tran.dst_attr = tcp_conn_fcntl; |
209 | |
|
210 | 0 | pi->net.flags = PROTO_NET_USE_TCP; |
211 | 0 | pi->net.stream.read = tcp_read_req; |
212 | 0 | pi->net.stream.write = tcp_async_write; |
213 | 0 | pi->net.report = tcp_report; |
214 | |
|
215 | 0 | if (tcp_async && !tcp_has_async_write()) { |
216 | 0 | LM_WARN("TCP network layer does not have support for ASYNC write, " |
217 | 0 | "disabling it for TCP plain\n"); |
218 | 0 | tcp_async = 0; |
219 | 0 | } |
220 | | |
221 | | /* without async support, there is nothing to init/clean per conn */ |
222 | 0 | if (tcp_async!=0) { |
223 | | /* be sure the settings are consistent, like having a minimum 2 value |
224 | | * if the tcp_async is enbled */ |
225 | 0 | if (tcp_async_max_postponed_chunks<=1) |
226 | 0 | tcp_async_max_postponed_chunks = 2; |
227 | 0 | pi->net.stream.async_chunks= tcp_async_max_postponed_chunks; |
228 | 0 | } |
229 | |
|
230 | 0 | return 0; |
231 | 0 | } |
232 | | |
233 | | |
234 | | static int mod_init(void) |
235 | 0 | { |
236 | 0 | LM_INFO("initializing TCP-plain protocol\n"); |
237 | 0 | if (trace_destination_name.s) { |
238 | 0 | if ( !net_trace_api ) { |
239 | 0 | if ( trace_prot_bind( TRACE_PROTO, &tprot) < 0 ) { |
240 | 0 | LM_ERR( "can't bind trace protocol <%s>\n", TRACE_PROTO ); |
241 | 0 | return -1; |
242 | 0 | } |
243 | | |
244 | 0 | net_trace_api = &tprot; |
245 | 0 | } else { |
246 | 0 | tprot = *net_trace_api; |
247 | 0 | } |
248 | | |
249 | 0 | trace_destination_name.len = strlen( trace_destination_name.s ); |
250 | |
|
251 | 0 | if ( net_trace_proto_id == -1 ) |
252 | 0 | net_trace_proto_id = tprot.get_message_id( TRANS_TRACE_PROTO_ID ); |
253 | |
|
254 | 0 | t_dst = tprot.get_trace_dest_by_name( &trace_destination_name ); |
255 | 0 | } |
256 | | |
257 | | /* fix route name */ |
258 | 0 | if ( !(trace_is_on = shm_malloc(sizeof(int))) ) { |
259 | 0 | LM_ERR("no more shared memory!\n"); |
260 | 0 | return -1; |
261 | 0 | } |
262 | | |
263 | 0 | *trace_is_on = trace_is_on_tmp; |
264 | 0 | if ( trace_filter_route ) { |
265 | 0 | trace_filter_route_ref = |
266 | 0 | ref_script_route_by_name( trace_filter_route, |
267 | 0 | sroutes->request, RT_NO, REQUEST_ROUTE, 0 ); |
268 | 0 | } |
269 | |
|
270 | 0 | return 0; |
271 | 0 | } |
272 | | |
273 | | |
/*
 * Listener init hook. Plain TCP needs nothing of its own here, so the work
 * is delegated to the generic listener init of the net TCP layer and its
 * result is returned unchanged.
 */
static int proto_tcp_init_listener(struct socket_info *si)
{
	int rc;

	rc = tcp_init_listener(si);
	return rc;
}
280 | | |
281 | | |
282 | | /*! \brief reads next available bytes |
283 | | * \return number of bytes read, 0 on EOF or -1 on error, |
284 | | * on EOF it also sets c->state to S_CONN_EOF |
285 | | * (to distinguish from reads that would block which could return 0) |
286 | | * sets also r->error |
287 | | */ |
288 | | int proto_tcp_read(struct tcp_connection *c,struct tcp_req *r) |
289 | 0 | { |
290 | 0 | int bytes_free, bytes_read; |
291 | 0 | int fd; |
292 | |
|
293 | 0 | fd=c->fd; |
294 | 0 | bytes_free=TCP_BUF_SIZE- (int)(r->pos - r->buf); |
295 | |
|
296 | 0 | if (bytes_free==0){ |
297 | 0 | LM_ERR("buffer overrun, dropping\n"); |
298 | 0 | r->error=TCP_REQ_OVERRUN; |
299 | 0 | return -1; |
300 | 0 | } |
301 | 0 | again: |
302 | 0 | bytes_read=read(fd, r->pos, bytes_free); |
303 | |
|
304 | 0 | if(bytes_read==-1){ |
305 | 0 | if (errno == EWOULDBLOCK || errno == EAGAIN){ |
306 | 0 | return 0; /* nothing has been read */ |
307 | 0 | } else if (errno == EINTR) { |
308 | 0 | goto again; |
309 | 0 | } else if (errno == ECONNRESET) { |
310 | 0 | c->state=S_CONN_EOF; |
311 | 0 | LM_DBG("CONN RESET on %p, FD %d\n", c, fd); |
312 | 0 | bytes_read = 0; |
313 | 0 | } else { |
314 | 0 | LM_ERR("error reading: %s\n",strerror(errno)); |
315 | 0 | r->error=TCP_READ_ERROR; |
316 | 0 | return -1; |
317 | 0 | } |
318 | 0 | }else if (bytes_read==0){ |
319 | 0 | c->state=S_CONN_EOF; |
320 | 0 | LM_DBG("EOF on %p, FD %d\n", c, fd); |
321 | 0 | } |
322 | | #ifdef EXTRA_DEBUG |
323 | | LM_DBG("read %d bytes:\n%.*s\n", bytes_read, bytes_read, r->pos); |
324 | | #endif |
325 | 0 | r->pos+=bytes_read; |
326 | 0 | return bytes_read; |
327 | 0 | } |
328 | | |
329 | | |
330 | | static void tcp_report(int type, unsigned long long conn_id, int conn_flags, |
331 | | void *extra) |
332 | 0 | { |
333 | 0 | str s; |
334 | |
|
335 | 0 | if (type==TCP_REPORT_CLOSE) { |
336 | | /* grab reason text */ |
337 | 0 | if (extra) { |
338 | 0 | s.s = (char*)extra; |
339 | 0 | s.len = strlen (s.s); |
340 | 0 | } |
341 | |
|
342 | 0 | if ( TRACE_ON( conn_flags ) ) { |
343 | 0 | trace_message_atonce( PROTO_TCP, conn_id, NULL/*src*/, NULL/*dst*/, |
344 | 0 | TRANS_TRACE_CLOSED, TRANS_TRACE_SUCCESS, extra?&s:NULL, t_dst ); |
345 | 0 | } |
346 | 0 | } |
347 | |
|
348 | 0 | return; |
349 | 0 | } |
350 | | |
351 | | |
352 | | /************** WRITE related functions ***************/ |
353 | | /* This is just a wrapper around the writing function, so we can use them |
354 | | * internally, but also export them to the "tcp_common" funcs */ |
355 | | inline static int _tcp_write_on_socket(struct tcp_connection *c, int fd, |
356 | | char *buf, int len) |
357 | 0 | { |
358 | 0 | return tcp_write_on_socket(c, fd, buf, len, |
359 | 0 | tcp_send_timeout, tcp_async_local_write_timeout); |
360 | 0 | } |
361 | | |
362 | | |
363 | | /*! \brief Finds a tcpconn & sends on it */ |
364 | | static int proto_tcp_send(const struct socket_info* send_sock, |
365 | | char* buf, unsigned int len, |
366 | | const union sockaddr_union* to, unsigned int id) |
367 | 0 | { |
368 | 0 | struct tcp_connection *c; |
369 | 0 | struct tcp_conn_profile prof; |
370 | 0 | struct ip_addr ip; |
371 | 0 | struct timeval get,snd; |
372 | 0 | union sockaddr_union src_su, dst_su; |
373 | 0 | int port = 0, fd, n, matched; |
374 | |
|
375 | 0 | matched = tcp_con_get_profile(to, &send_sock->su, send_sock->proto, &prof); |
376 | |
|
377 | 0 | reset_tcp_vars(prof.send_threshold); |
378 | 0 | start_expire_timer(get,prof.send_threshold); |
379 | |
|
380 | 0 | if (to){ |
381 | 0 | su2ip_addr(&ip, to); |
382 | 0 | port=su_getport(to); |
383 | 0 | n = tcp_conn_get(id, &ip, port, PROTO_TCP, NULL, &c, &fd, send_sock); |
384 | 0 | }else if (id){ |
385 | 0 | n = tcp_conn_get(id, 0, 0, PROTO_NONE, NULL, &c, &fd, NULL); |
386 | 0 | }else{ |
387 | 0 | LM_CRIT("tcp_send called with null id & to\n"); |
388 | 0 | get_time_difference(get,prof.send_threshold,tcp_timeout_con_get); |
389 | 0 | return -1; |
390 | 0 | } |
391 | | |
392 | 0 | if (n<0) { |
393 | | /* error during conn get, return with error too */ |
394 | 0 | LM_ERR("failed to acquire connection\n"); |
395 | 0 | get_time_difference(get,prof.send_threshold,tcp_timeout_con_get); |
396 | 0 | return -1; |
397 | 0 | } |
398 | | |
399 | | /* was connection found ?? */ |
400 | 0 | if (c==0) { |
401 | 0 | if ((matched && prof.no_new_conn) || (!matched && tcp_no_new_conn)) |
402 | 0 | return -1; |
403 | | |
404 | 0 | if (!to) { |
405 | 0 | LM_ERR("Unknown destination - cannot open new tcp connection\n"); |
406 | 0 | return -1; |
407 | 0 | } |
408 | 0 | LM_DBG("no open tcp connection found, opening new one, async = %d\n", |
409 | 0 | tcp_async); |
410 | | /* create tcp connection */ |
411 | 0 | if (tcp_async) { |
412 | 0 | n = tcp_async_connect(send_sock, to, &prof, |
413 | 0 | tcp_async_local_connect_timeout, &c, &fd, 1); |
414 | 0 | if ( n<0 ) { |
415 | 0 | LM_ERR("async TCP connect failed\n"); |
416 | 0 | get_time_difference(get,prof.send_threshold,tcp_timeout_con_get); |
417 | 0 | return -1; |
418 | 0 | } |
419 | | /* connect succeeded, we have a connection */ |
420 | 0 | LM_DBG( "Successfully connected from interface %s:%d to %s:%d!\n", |
421 | 0 | ip_addr2a( &c->rcv.src_ip ), c->rcv.src_port, |
422 | 0 | ip_addr2a( &c->rcv.dst_ip ), c->rcv.dst_port ); |
423 | |
|
424 | 0 | if (n==0) { |
425 | | /* attach the write buffer to it */ |
426 | 0 | if (tcp_async_add_chunk(c, buf, len, 1) < 0) { |
427 | 0 | LM_ERR("Failed to add the initial write chunk\n"); |
428 | 0 | len = -1; /* report an error - let the caller decide what to do */ |
429 | 0 | } |
430 | | |
431 | | /* trace the message */ |
432 | 0 | if ( TRACE_ON( c->flags ) && |
433 | 0 | check_trace_route( trace_filter_route_ref, c) ) { |
434 | 0 | if ( tcpconn2su( c, &src_su, &dst_su) < 0 ) { |
435 | 0 | LM_ERR("can't create su structures for tracing!\n"); |
436 | 0 | } else { |
437 | 0 | trace_message_atonce( PROTO_TCP, c->cid, |
438 | 0 | &src_su, &dst_su, |
439 | 0 | TRANS_TRACE_CONNECT_START, TRANS_TRACE_SUCCESS, |
440 | 0 | &AS_CONNECT_INIT, t_dst ); |
441 | 0 | } |
442 | 0 | } |
443 | | |
444 | | /* mark the ID of the used connection (tracing purposes) */ |
445 | 0 | last_outgoing_tcp_id = c->id; |
446 | 0 | send_sock->last_real_ports->local = c->rcv.dst_port; |
447 | 0 | send_sock->last_real_ports->remote = c->rcv.src_port; |
448 | | |
449 | | /* connect is still in progress, break the sending |
450 | | * flow now (the actual write will be done when |
451 | | * connect will be completed */ |
452 | 0 | LM_DBG("Successfully started async connection \n"); |
453 | 0 | sh_log(c->hist, TCP_SEND2MAIN, "send 1, (%d)", c->refcnt); |
454 | 0 | tcp_conn_release(c, 0); |
455 | 0 | return len; |
456 | 0 | } |
457 | | |
458 | 0 | LM_DBG("First connect attempt succeeded in less than %d ms, " |
459 | 0 | "proceed to writing \n",tcp_async_local_connect_timeout); |
460 | | /* our first connect attempt succeeded - go ahead as normal */ |
461 | | /* trace the attempt */ |
462 | 0 | if ( TRACE_ON( c->flags ) && |
463 | 0 | check_trace_route( trace_filter_route_ref, c) ) { |
464 | 0 | c->proto_flags |= F_TCP_CONN_TRACED; |
465 | 0 | if ( tcpconn2su( c, &src_su, &dst_su) < 0 ) { |
466 | 0 | LM_ERR("can't create su structures for tracing!\n"); |
467 | 0 | } else { |
468 | 0 | trace_message_atonce( PROTO_TCP, c->cid, &src_su, &dst_su, |
469 | 0 | TRANS_TRACE_CONNECTED, TRANS_TRACE_SUCCESS, |
470 | 0 | &ASYNC_CONNECT_OK, t_dst ); |
471 | 0 | } |
472 | 0 | } |
473 | 0 | } else { |
474 | 0 | if ((c=tcp_sync_connect(send_sock, to, &prof, &fd, 1))==0) { |
475 | 0 | LM_ERR("connect failed\n"); |
476 | 0 | get_time_difference(get,prof.send_threshold,tcp_timeout_con_get); |
477 | 0 | return -1; |
478 | 0 | } |
479 | | |
480 | 0 | if ( TRACE_ON( c->flags ) && |
481 | 0 | check_trace_route( trace_filter_route_ref, c) ) { |
482 | 0 | c->proto_flags |= F_TCP_CONN_TRACED; |
483 | 0 | if ( tcpconn2su( c, &src_su, &dst_su) < 0 ) { |
484 | 0 | LM_ERR("can't create su structures for tracing!\n"); |
485 | 0 | } else { |
486 | 0 | trace_message_atonce( PROTO_TCP, c->cid, &src_su, &dst_su, |
487 | 0 | TRANS_TRACE_CONNECTED, TRANS_TRACE_SUCCESS, |
488 | 0 | &CONNECT_OK, t_dst ); |
489 | 0 | } |
490 | 0 | } |
491 | |
|
492 | 0 | LM_DBG( "Successfully connected from interface %s:%d to %s:%d!\n", |
493 | 0 | ip_addr2a( &c->rcv.src_ip ), c->rcv.src_port, |
494 | 0 | ip_addr2a( &c->rcv.dst_ip ), c->rcv.dst_port ); |
495 | 0 | } |
496 | | |
497 | 0 | goto send_it; |
498 | 0 | } |
499 | | |
500 | 0 | if ( !(c->proto_flags & F_TCP_CONN_TRACED) ) { |
501 | | /* most probably it's an async connect */ |
502 | 0 | if ( TRACE_ON( c->flags ) ) { |
503 | 0 | trace_message_atonce( PROTO_TCP, c->cid, 0, 0, |
504 | 0 | TRANS_TRACE_CONNECTED, TRANS_TRACE_SUCCESS, |
505 | 0 | &CONNECT_OK, t_dst ); |
506 | 0 | } |
507 | |
|
508 | 0 | c->proto_flags |= F_TCP_CONN_TRACED; |
509 | 0 | } |
510 | |
|
511 | 0 | get_time_difference(get,prof.send_threshold,tcp_timeout_con_get); |
512 | | |
513 | | /* now we have a connection, let's see what we can do with it */ |
514 | | /* BE CAREFUL now as we need to release the conn before exiting !!! */ |
515 | 0 | if (fd==-1) { |
516 | | /* connection is not writable because of its state - can we append |
517 | | * data to it for later writting (async writting)? */ |
518 | 0 | if (c->state==S_CONN_CONNECTING) { |
519 | | /* the connection is currently in the process of getting |
520 | | * connected - let's append our send chunk as well - just in |
521 | | * case we ever manage to get through */ |
522 | 0 | LM_DBG("We have acquired a TCP connection which is still " |
523 | 0 | "pending to connect - delaying write \n"); |
524 | 0 | n = tcp_async_add_chunk(c,buf,len,1); |
525 | 0 | if (n < 0) { |
526 | 0 | LM_ERR("Failed to add another write chunk to %p\n",c); |
527 | | /* we failed due to internal errors - put the |
528 | | * connection back */ |
529 | 0 | sh_log(c->hist, TCP_SEND2MAIN, "send 2, (%d)", c->refcnt); |
530 | 0 | tcp_conn_release(c, 0); |
531 | 0 | return -1; |
532 | 0 | } |
533 | | |
534 | | /* mark the ID of the used connection (tracing purposes) */ |
535 | 0 | last_outgoing_tcp_id = c->id; |
536 | 0 | send_sock->last_real_ports->local = c->rcv.dst_port; |
537 | 0 | send_sock->last_real_ports->remote = c->rcv.src_port; |
538 | | |
539 | | /* we successfully added our write chunk - success */ |
540 | 0 | sh_log(c->hist, TCP_SEND2MAIN, "send 3, (%d)", c->refcnt); |
541 | 0 | tcp_conn_release(c, 0); |
542 | 0 | return len; |
543 | 0 | } else { |
544 | | /* return error, nothing to do about it */ |
545 | 0 | sh_log(c->hist, TCP_SEND2MAIN, "send 4, (%d)", c->refcnt); |
546 | 0 | tcp_conn_release(c, 0); |
547 | 0 | return -1; |
548 | 0 | } |
549 | 0 | } |
550 | | |
551 | | |
552 | 0 | send_it: |
553 | 0 | LM_DBG("sending via fd %d...\n",fd); |
554 | |
|
555 | 0 | start_expire_timer(snd,prof.send_threshold); |
556 | |
|
557 | 0 | n = tcp_write_on_socket(c, fd, buf, len, |
558 | 0 | tcp_send_timeout, tcp_async_local_write_timeout); |
559 | |
|
560 | 0 | get_time_difference(snd,prof.send_threshold,tcp_timeout_send); |
561 | 0 | stop_expire_timer(get,prof.send_threshold,"tcp ops",buf,(int)len,1); |
562 | |
|
563 | 0 | tcp_conn_reset_lifetime(c); |
564 | |
|
565 | 0 | LM_DBG("after write: c= %p n/len=%d/%d fd=%d\n",c, n, len, fd); |
566 | | /* LM_DBG("buf=\n%.*s\n", (int)len, buf); */ |
567 | 0 | if (n<0){ |
568 | 0 | LM_ERR("failed to send\n"); |
569 | 0 | c->state=S_CONN_BAD; |
570 | 0 | if (c->proc_id != process_no) |
571 | 0 | close(fd); |
572 | |
|
573 | 0 | sh_log(c->hist, TCP_SEND2MAIN, "send 5, (%d)", c->refcnt); |
574 | 0 | tcp_conn_release(c, 0); |
575 | 0 | return -1; |
576 | 0 | } |
577 | | |
578 | | /* only close the FD if not already in the context of our process |
579 | | either we just connected, or main sent us the FD */ |
580 | 0 | if (c->proc_id != process_no) |
581 | 0 | close(fd); |
582 | | |
583 | | /* mark the ID of the used connection (tracing purposes) */ |
584 | 0 | last_outgoing_tcp_id = c->id; |
585 | 0 | send_sock->last_real_ports->local = c->rcv.dst_port; |
586 | 0 | send_sock->last_real_ports->remote = c->rcv.src_port; |
587 | |
|
588 | 0 | sh_log(c->hist, TCP_SEND2MAIN, "send 6, (%d, async: %d)", c->refcnt, n < len); |
589 | 0 | tcp_conn_release(c, (n<len)?1:0/*pending data in async mode?*/ ); |
590 | 0 | return n; |
591 | 0 | } |
592 | | |
593 | | |
594 | | |
595 | | /************** READ related functions ***************/ |
596 | | |
597 | | /*! \brief reads next available bytes |
598 | | * \return number of bytes read, 0 on EOF or -1 on error, |
599 | | * on EOF it also sets c->state to S_CONN_EOF |
600 | | * (to distinguish from reads that would block which could return 0) |
601 | | * sets also r->error |
602 | | */ |
603 | | int tcp_read(struct tcp_connection *c,struct tcp_req *r) |
604 | 0 | { |
605 | 0 | int bytes_free, bytes_read; |
606 | 0 | int fd; |
607 | |
|
608 | 0 | fd=c->fd; |
609 | 0 | bytes_free=TCP_BUF_SIZE- (int)(r->pos - r->buf); |
610 | |
|
611 | 0 | if (bytes_free==0){ |
612 | 0 | LM_ERR("buffer overrun, dropping\n"); |
613 | 0 | r->error=TCP_REQ_OVERRUN; |
614 | 0 | return -1; |
615 | 0 | } |
616 | 0 | again: |
617 | 0 | bytes_read=read(fd, r->pos, bytes_free); |
618 | |
|
619 | 0 | if(bytes_read==-1){ |
620 | 0 | if (errno == EWOULDBLOCK || errno == EAGAIN){ |
621 | 0 | return 0; /* nothing has been read */ |
622 | 0 | } else if (errno == EINTR) { |
623 | 0 | goto again; |
624 | 0 | } else if (errno == ECONNRESET) { |
625 | 0 | c->state=S_CONN_EOF; |
626 | 0 | LM_DBG("CONN RESET on %p, FD %d\n", c, fd); |
627 | 0 | bytes_read = 0; |
628 | 0 | } else { |
629 | 0 | LM_ERR("error reading: %s\n",strerror(errno)); |
630 | 0 | r->error=TCP_READ_ERROR; |
631 | 0 | return -1; |
632 | 0 | } |
633 | 0 | }else if (bytes_read==0){ |
634 | 0 | c->state=S_CONN_EOF; |
635 | 0 | LM_DBG("EOF on %p, FD %d\n", c, fd); |
636 | 0 | } |
637 | | #ifdef EXTRA_DEBUG |
638 | | LM_DBG("read %d bytes:\n%.*s\n", bytes_read, bytes_read, r->pos); |
639 | | #endif |
640 | 0 | r->pos+=bytes_read; |
641 | 0 | return bytes_read; |
642 | 0 | } |
643 | | |
644 | | |
645 | | /* Responsible for reading the request |
646 | | * * if returns >= 0 : the connection will be released |
647 | | * * if returns < 0 : the connection will be released as BAD / broken |
648 | | */ |
649 | | static int tcp_read_req(struct tcp_connection* con, int* bytes_read) |
650 | 0 | { |
651 | 0 | int bytes, rc; |
652 | 0 | int total_bytes; |
653 | 0 | struct tcp_req* req; |
654 | |
|
655 | 0 | union sockaddr_union src_su, dst_su; |
656 | |
|
657 | 0 | if ( !(con->proto_flags & F_TCP_CONN_TRACED)) { |
658 | 0 | con->proto_flags |= F_TCP_CONN_TRACED; |
659 | |
|
660 | 0 | LM_DBG("Accepted connection from %s:%d on interface %s:%d!\n", |
661 | 0 | ip_addr2a( &con->rcv.src_ip ), con->rcv.src_port, |
662 | 0 | ip_addr2a( &con->rcv.dst_ip ), con->rcv.dst_port ); |
663 | |
|
664 | 0 | if ( TRACE_ON( con->flags ) && |
665 | 0 | check_trace_route( trace_filter_route_ref, con) ) { |
666 | 0 | if ( tcpconn2su( con, &src_su, &dst_su) < 0 ) { |
667 | 0 | LM_ERR("can't create su structures for tracing!\n"); |
668 | 0 | } else { |
669 | 0 | trace_message_atonce( PROTO_TCP, con->cid, &src_su, &dst_su, |
670 | 0 | TRANS_TRACE_ACCEPTED, TRANS_TRACE_SUCCESS, |
671 | 0 | &ACCEPT_OK, t_dst ); |
672 | 0 | } |
673 | 0 | } |
674 | 0 | } |
675 | |
|
676 | 0 | bytes=-1; |
677 | 0 | total_bytes=0; |
678 | |
|
679 | 0 | if (con->con_req) { |
680 | 0 | req=con->con_req; |
681 | 0 | LM_DBG("Using the per connection buff for conn %p\n",con); |
682 | 0 | } else { |
683 | 0 | LM_DBG("Using the global ( per process ) buff for conn %p\n",con); |
684 | 0 | init_tcp_req(&tcp_current_req, 0); |
685 | 0 | req=&tcp_current_req; |
686 | 0 | } |
687 | |
|
688 | 0 | again: |
689 | 0 | if(req->error==TCP_REQ_OK){ |
690 | | /* if we still have some unparsed part, parse it first, |
691 | | * don't do the read*/ |
692 | 0 | if (req->parsed<req->pos){ |
693 | 0 | bytes=0; |
694 | 0 | }else{ |
695 | 0 | bytes=tcp_read(con,req); |
696 | 0 | if (bytes<0) { |
697 | 0 | LM_ERR("failed to read \n"); |
698 | 0 | goto error; |
699 | 0 | } |
700 | 0 | } |
701 | | |
702 | 0 | tcp_parse_headers(req, tcp_crlf_pingpong, tcp_crlf_drop); |
703 | | #ifdef EXTRA_DEBUG |
704 | | /* if timeout state=0; goto end__req; */ |
705 | | LM_DBG("read= %d bytes, parsed=%d, state=%d, error=%d\n", |
706 | | bytes, (int)(req->parsed-req->start), req->state, |
707 | | req->error ); |
708 | | LM_DBG("last char=0x%02X, parsed msg=\n%.*s\n", |
709 | | *(req->parsed-1), (int)(req->parsed-req->start), |
710 | | req->start); |
711 | | #endif |
712 | 0 | total_bytes+=bytes; |
713 | | /* eof check: |
714 | | * is EOF if eof on fd and req. not complete yet, |
715 | | * if req. is complete we might have a second unparsed |
716 | | * request after it, so postpone release_with_eof |
717 | | */ |
718 | 0 | if ((con->state==S_CONN_EOF) && (req->complete==0)) { |
719 | 0 | LM_DBG("EOF received\n"); |
720 | 0 | goto done; |
721 | 0 | } |
722 | 0 | } |
723 | | |
724 | 0 | if (req->error!=TCP_REQ_OK){ |
725 | 0 | LM_ERR("bad request, state=%d, error=%d " |
726 | 0 | "buf:\n%.*s\nparsed:\n%.*s\n", req->state, req->error, |
727 | 0 | (int)(req->pos-req->buf), req->buf, |
728 | 0 | (int)(req->parsed-req->start), req->start); |
729 | 0 | LM_DBG("- received from: port %d\n", con->rcv.src_port); |
730 | 0 | print_ip("- received from: ip ",&con->rcv.src_ip, "\n"); |
731 | 0 | goto error; |
732 | 0 | } |
733 | | |
734 | 0 | int parallel_handling = tcp_is_default_profile(con->profile) ? |
735 | 0 | tcp_parallel_handling : (con->profile.parallel_read == 2); |
736 | 0 | int max_chunks = tcp_attr_isset(con, TCP_ATTR_MAX_MSG_CHUNKS) ? |
737 | 0 | con->profile.attrs[TCP_ATTR_MAX_MSG_CHUNKS] : tcp_max_msg_chunks; |
738 | |
|
739 | 0 | switch ((rc = tcp_handle_req(req,con,max_chunks,parallel_handling))){ |
740 | 0 | case 1: |
741 | 0 | goto again; |
742 | 0 | case -1: |
743 | 0 | goto error; |
744 | 0 | } |
745 | | |
746 | 0 | LM_DBG("tcp_read_req end for conn %p, req is %p\n",con,con->con_req); |
747 | 0 | done: |
748 | 0 | if (bytes_read) *bytes_read=total_bytes; |
749 | |
|
750 | 0 | return rc == 2 ? 1 /* connection is already released! */ |
751 | 0 | /* 0,1? */: 0; /* connection will be released */ |
752 | 0 | error: |
753 | | /* connection will be released as ERROR */ |
754 | 0 | return -1; |
755 | 0 | } |
756 | | |
757 | | static mi_response_t *w_tcp_trace_mi(const mi_params_t *mi_params, |
758 | | struct mi_handler *async_hdl) |
759 | 0 | { |
760 | 0 | mi_response_t *resp; |
761 | 0 | mi_item_t *resp_obj; |
762 | |
|
763 | 0 | resp = init_mi_result_object(&resp_obj); |
764 | 0 | if (!resp) |
765 | 0 | return 0; |
766 | | |
767 | 0 | if (add_mi_string_fmt(resp_obj, MI_SSTR("TCP tracing"), "%s", |
768 | 0 | *trace_is_on ? "on" : "off") < 0) { |
769 | 0 | free_mi_response(resp); |
770 | 0 | return 0; |
771 | 0 | } |
772 | | |
773 | 0 | return resp; |
774 | 0 | } |
775 | | |
776 | | static mi_response_t *w_tcp_trace_mi_1(const mi_params_t *mi_params, |
777 | | struct mi_handler *async_hdl) |
778 | 0 | { |
779 | 0 | str new_mode; |
780 | |
|
781 | 0 | if (get_mi_string_param(mi_params, "trace_mode", &new_mode.s, &new_mode.len) < 0) |
782 | 0 | return init_mi_param_error(); |
783 | | |
784 | 0 | if ((new_mode.s[0] | 0x20) == 'o' && |
785 | 0 | (new_mode.s[1] | 0x20) == 'n' ) { |
786 | 0 | *trace_is_on = 1; |
787 | 0 | return init_mi_result_ok(); |
788 | 0 | } else if ((new_mode.s[0] | 0x20) == 'o' && |
789 | 0 | (new_mode.s[1] | 0x20) == 'f' && |
790 | 0 | (new_mode.s[2] | 0x20) == 'f') { |
791 | 0 | *trace_is_on = 0; |
792 | 0 | return init_mi_result_ok(); |
793 | 0 | } else { |
794 | 0 | return init_mi_error_extra(JSONRPC_INVAL_PARAMS_CODE, |
795 | 0 | MI_SSTR(JSONRPC_INVAL_PARAMS_MSG), |
796 | 0 | MI_SSTR("trace_mode should be 'on' or 'off'")); |
797 | 0 | } |
798 | 0 | } |