/src/pjsip/pjlib/src/pj/activesock.c
Line | Count | Source |
1 | | /* |
2 | | * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) |
3 | | * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> |
4 | | * |
5 | | * This program is free software; you can redistribute it and/or modify |
6 | | * it under the terms of the GNU General Public License as published by |
7 | | * the Free Software Foundation; either version 2 of the License, or |
8 | | * (at your option) any later version. |
9 | | * |
10 | | * This program is distributed in the hope that it will be useful, |
11 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | * GNU General Public License for more details. |
14 | | * |
15 | | * You should have received a copy of the GNU General Public License |
16 | | * along with this program; if not, write to the Free Software |
17 | | * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
18 | | */ |
19 | | #include <pj/activesock.h> |
20 | | #include <pj/compat/socket.h> |
21 | | #include <pj/assert.h> |
22 | | #include <pj/errno.h> |
23 | | #include <pj/log.h> |
24 | | #include <pj/pool.h> |
25 | | #include <pj/sock.h> |
26 | | #include <pj/string.h> |
27 | | |
28 | | #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
29 | | PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
30 | | # include <CFNetwork/CFNetwork.h> |
31 | | |
32 | | static pj_bool_t ios_bg_support = PJ_TRUE; |
33 | | #endif |
34 | | |
35 | 0 | #define PJ_ACTIVESOCK_MAX_LOOP 50 |
36 | | |
37 | | |
38 | | enum read_type |
39 | | { |
40 | | TYPE_NONE, |
41 | | TYPE_RECV, |
42 | | TYPE_RECV_FROM |
43 | | }; |
44 | | |
45 | | enum shutdown_dir |
46 | | { |
47 | | SHUT_NONE = 0, |
48 | | SHUT_RX = 1, |
49 | | SHUT_TX = 2 |
50 | | }; |
51 | | |
52 | | struct read_op |
53 | | { |
54 | | pj_ioqueue_op_key_t op_key; |
55 | | pj_uint8_t *pkt; |
56 | | unsigned max_size; |
57 | | pj_size_t size; |
58 | | pj_sockaddr src_addr; |
59 | | int src_addr_len; |
60 | | }; |
61 | | |
62 | | struct accept_op |
63 | | { |
64 | | pj_ioqueue_op_key_t op_key; |
65 | | pj_sock_t new_sock; |
66 | | pj_sockaddr rem_addr; |
67 | | int rem_addr_len; |
68 | | }; |
69 | | |
70 | | struct send_data |
71 | | { |
72 | | pj_uint8_t *data; |
73 | | pj_ssize_t len; |
74 | | pj_ssize_t sent; |
75 | | unsigned flags; |
76 | | }; |
77 | | |
78 | | struct pj_activesock_t |
79 | | { |
80 | | pj_ioqueue_key_t *key; |
81 | | pj_bool_t stream_oriented; |
82 | | pj_bool_t whole_data; |
83 | | pj_ioqueue_t *ioqueue; |
84 | | void *user_data; |
85 | | unsigned async_count; |
86 | | unsigned shutdown; |
87 | | unsigned max_loop; |
88 | | pj_activesock_cb cb; |
89 | | #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
90 | | PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
91 | | int bg_setting; |
92 | | pj_sock_t sock; |
93 | | CFReadStreamRef readStream; |
94 | | #endif |
95 | | |
96 | | unsigned err_counter; |
97 | | pj_status_t last_err; |
98 | | |
99 | | struct send_data send_data; |
100 | | |
101 | | struct read_op *read_op; |
102 | | pj_uint32_t read_flags; |
103 | | enum read_type read_type; |
104 | | |
105 | | struct accept_op *accept_op; |
106 | | }; |
107 | | |
108 | | |
109 | | static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, |
110 | | pj_ioqueue_op_key_t *op_key, |
111 | | pj_ssize_t bytes_read); |
112 | | static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, |
113 | | pj_ioqueue_op_key_t *op_key, |
114 | | pj_ssize_t bytes_sent); |
115 | | #if PJ_HAS_TCP |
116 | | static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, |
117 | | pj_ioqueue_op_key_t *op_key, |
118 | | pj_sock_t sock, |
119 | | pj_status_t status); |
120 | | static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, |
121 | | pj_status_t status); |
122 | | #endif |
123 | | |
124 | | PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg) |
125 | 0 | { |
126 | 0 | pj_bzero(cfg, sizeof(*cfg)); |
127 | 0 | cfg->async_cnt = 1; |
128 | 0 | cfg->concurrency = -1; |
129 | 0 | cfg->whole_data = PJ_TRUE; |
130 | 0 | cfg->sock_cloexec = PJ_TRUE; |
131 | 0 | } |
132 | | |
133 | | #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
134 | | PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
135 | | static void activesock_destroy_iphone_os_stream(pj_activesock_t *asock) |
136 | | { |
137 | | if (asock->readStream) { |
138 | | CFReadStreamClose(asock->readStream); |
139 | | CFRelease(asock->readStream); |
140 | | asock->readStream = NULL; |
141 | | } |
142 | | } |
143 | | |
144 | | static void activesock_create_iphone_os_stream(pj_activesock_t *asock) |
145 | | { |
146 | | #if (defined(__IPHONE_OS_VERSION_MIN_REQUIRED) && \ |
147 | | __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0) |
148 | | |
149 | | if (ios_bg_support && asock->bg_setting && asock->stream_oriented) { |
150 | | activesock_destroy_iphone_os_stream(asock); |
151 | | |
152 | | CFStreamCreatePairWithSocket(kCFAllocatorDefault, asock->sock, |
153 | | &asock->readStream, NULL); |
154 | | |
155 | | if (!asock->readStream || |
156 | | CFReadStreamSetProperty(asock->readStream, |
157 | | kCFStreamNetworkServiceType, |
158 | | kCFStreamNetworkServiceTypeVoIP) |
159 | | != TRUE || |
160 | | CFReadStreamOpen(asock->readStream) != TRUE) |
161 | | { |
162 | | PJ_LOG(2,("", "Failed to configure TCP transport for VoIP " |
163 | | "usage. Usage of THIS particular TCP transport in " |
164 | | "background mode will not be supported.")); |
165 | | |
166 | | |
167 | | activesock_destroy_iphone_os_stream(asock); |
168 | | } |
169 | | } |
170 | | |
171 | | #endif |
172 | | } |
173 | | |
174 | | |
175 | | PJ_DEF(void) pj_activesock_set_iphone_os_bg(pj_activesock_t *asock, |
176 | | int val) |
177 | | { |
178 | | asock->bg_setting = val; |
179 | | if (asock->bg_setting) |
180 | | activesock_create_iphone_os_stream(asock); |
181 | | else |
182 | | activesock_destroy_iphone_os_stream(asock); |
183 | | } |
184 | | |
185 | | PJ_DEF(void) pj_activesock_enable_iphone_os_bg(pj_bool_t val) |
186 | | { |
187 | | ios_bg_support = val; |
188 | | } |
189 | | #endif |
190 | | |
191 | | PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool, |
192 | | pj_sock_t sock, |
193 | | int sock_type, |
194 | | const pj_activesock_cfg *opt, |
195 | | pj_ioqueue_t *ioqueue, |
196 | | const pj_activesock_cb *cb, |
197 | | void *user_data, |
198 | | pj_activesock_t **p_asock) |
199 | 0 | { |
200 | 0 | pj_activesock_t *asock; |
201 | 0 | pj_ioqueue_callback ioq_cb; |
202 | 0 | pj_status_t status; |
203 | |
|
204 | 0 | PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL); |
205 | 0 | PJ_ASSERT_RETURN(sock>=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL); |
206 | 0 | PJ_ASSERT_RETURN((sock_type & 0xF)==pj_SOCK_STREAM() || |
207 | 0 | (sock_type & 0xF)==pj_SOCK_DGRAM(), PJ_EINVAL); |
208 | 0 | PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL); |
209 | | |
210 | 0 | asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t); |
211 | 0 | asock->ioqueue = ioqueue; |
212 | 0 | asock->stream_oriented = ((sock_type & 0xF) == pj_SOCK_STREAM()); |
213 | 0 | asock->async_count = (opt? opt->async_cnt : 1); |
214 | 0 | asock->whole_data = (opt? opt->whole_data : 1); |
215 | 0 | asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP; |
216 | 0 | asock->user_data = user_data; |
217 | 0 | pj_memcpy(&asock->cb, cb, sizeof(*cb)); |
218 | |
|
219 | 0 | pj_bzero(&ioq_cb, sizeof(ioq_cb)); |
220 | 0 | ioq_cb.on_read_complete = &ioqueue_on_read_complete; |
221 | 0 | ioq_cb.on_write_complete = &ioqueue_on_write_complete; |
222 | 0 | #if PJ_HAS_TCP |
223 | 0 | ioq_cb.on_connect_complete = &ioqueue_on_connect_complete; |
224 | 0 | ioq_cb.on_accept_complete = &ioqueue_on_accept_complete; |
225 | 0 | #endif |
226 | |
|
227 | 0 | status = pj_ioqueue_register_sock2(pool, ioqueue, sock, |
228 | 0 | (opt? opt->grp_lock : NULL), |
229 | 0 | asock, &ioq_cb, &asock->key); |
230 | 0 | if (status != PJ_SUCCESS) { |
231 | 0 | pj_activesock_close(asock); |
232 | 0 | return status; |
233 | 0 | } |
234 | | |
235 | 0 | if (asock->whole_data) { |
236 | | /* Must disable concurrency otherwise there is a race condition */ |
237 | 0 | pj_ioqueue_set_concurrency(asock->key, 0); |
238 | 0 | } else if (opt && opt->concurrency >= 0) { |
239 | 0 | pj_ioqueue_set_concurrency(asock->key, opt->concurrency); |
240 | 0 | } |
241 | |
|
242 | | #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
243 | | PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
244 | | asock->sock = sock; |
245 | | asock->bg_setting = PJ_ACTIVESOCK_TCP_IPHONE_OS_BG; |
246 | | #endif |
247 | |
|
248 | 0 | *p_asock = asock; |
249 | 0 | return PJ_SUCCESS; |
250 | 0 | } |
251 | | |
252 | | |
253 | | PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool, |
254 | | const pj_sockaddr *addr, |
255 | | const pj_activesock_cfg *opt, |
256 | | pj_ioqueue_t *ioqueue, |
257 | | const pj_activesock_cb *cb, |
258 | | void *user_data, |
259 | | pj_activesock_t **p_asock, |
260 | | pj_sockaddr *bound_addr) |
261 | 0 | { |
262 | 0 | pj_sock_t sock_fd; |
263 | 0 | pj_sockaddr default_addr; |
264 | 0 | pj_status_t status; |
265 | 0 | int sock_type = pj_SOCK_DGRAM(); |
266 | |
|
267 | 0 | if (opt && opt->sock_cloexec) |
268 | 0 | sock_type |= pj_SOCK_CLOEXEC(); |
269 | |
|
270 | 0 | if (addr == NULL) { |
271 | 0 | pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0); |
272 | 0 | addr = &default_addr; |
273 | 0 | } |
274 | |
|
275 | 0 | status = pj_sock_socket(addr->addr.sa_family, sock_type, 0, |
276 | 0 | &sock_fd); |
277 | 0 | if (status != PJ_SUCCESS) { |
278 | 0 | return status; |
279 | 0 | } |
280 | | |
281 | 0 | status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr)); |
282 | 0 | if (status != PJ_SUCCESS) { |
283 | 0 | pj_sock_close(sock_fd); |
284 | 0 | return status; |
285 | 0 | } |
286 | | |
287 | 0 | status = pj_activesock_create(pool, sock_fd, sock_type, opt, |
288 | 0 | ioqueue, cb, user_data, p_asock); |
289 | 0 | if (status != PJ_SUCCESS) { |
290 | 0 | pj_sock_close(sock_fd); |
291 | 0 | return status; |
292 | 0 | } |
293 | | |
294 | 0 | if (bound_addr) { |
295 | 0 | int addr_len = sizeof(*bound_addr); |
296 | 0 | status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len); |
297 | 0 | if (status != PJ_SUCCESS) { |
298 | 0 | pj_activesock_close(*p_asock); |
299 | 0 | return status; |
300 | 0 | } |
301 | 0 | } |
302 | | |
303 | 0 | return PJ_SUCCESS; |
304 | 0 | } |
305 | | |
306 | | PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock) |
307 | 0 | { |
308 | 0 | pj_ioqueue_key_t *key; |
309 | 0 | pj_bool_t unregister = PJ_FALSE; |
310 | |
|
311 | 0 | PJ_ASSERT_RETURN(asock, PJ_EINVAL); |
312 | 0 | asock->shutdown = SHUT_RX | SHUT_TX; |
313 | | |
314 | | /* Avoid double unregistration on the key */ |
315 | 0 | key = asock->key; |
316 | 0 | if (key) { |
317 | 0 | pj_ioqueue_lock_key(key); |
318 | 0 | unregister = (asock->key != NULL); |
319 | 0 | asock->key = NULL; |
320 | 0 | pj_ioqueue_unlock_key(key); |
321 | 0 | } |
322 | |
|
323 | 0 | if (unregister) { |
324 | 0 | pj_ioqueue_unregister(key); |
325 | |
|
326 | | #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
327 | | PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
328 | | activesock_destroy_iphone_os_stream(asock); |
329 | | #endif |
330 | 0 | } |
331 | 0 | return PJ_SUCCESS; |
332 | 0 | } |
333 | | |
334 | | |
335 | | PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock, |
336 | | void *user_data) |
337 | 0 | { |
338 | 0 | PJ_ASSERT_RETURN(asock, PJ_EINVAL); |
339 | 0 | asock->user_data = user_data; |
340 | 0 | return PJ_SUCCESS; |
341 | 0 | } |
342 | | |
343 | | |
344 | | PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock) |
345 | 0 | { |
346 | 0 | PJ_ASSERT_RETURN(asock, NULL); |
347 | 0 | return asock->user_data; |
348 | 0 | } |
349 | | |
350 | | |
351 | | PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock, |
352 | | pj_pool_t *pool, |
353 | | unsigned buff_size, |
354 | | pj_uint32_t flags) |
355 | 0 | { |
356 | 0 | void **readbuf; |
357 | 0 | unsigned i; |
358 | |
|
359 | 0 | PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL); |
360 | | |
361 | 0 | readbuf = (void**) pj_pool_calloc(pool, asock->async_count, |
362 | 0 | sizeof(void*)); |
363 | |
|
364 | 0 | for (i=0; i<asock->async_count; ++i) { |
365 | 0 | readbuf[i] = pj_pool_alloc(pool, buff_size); |
366 | 0 | } |
367 | |
|
368 | 0 | return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags); |
369 | 0 | } |
370 | | |
371 | | |
372 | | PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock, |
373 | | pj_pool_t *pool, |
374 | | unsigned buff_size, |
375 | | void *readbuf[], |
376 | | pj_uint32_t flags) |
377 | 0 | { |
378 | 0 | unsigned i; |
379 | 0 | pj_status_t status; |
380 | |
|
381 | 0 | PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL); |
382 | 0 | PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP); |
383 | 0 | PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP); |
384 | | |
385 | 0 | asock->read_op = (struct read_op*) |
386 | 0 | pj_pool_calloc(pool, asock->async_count, |
387 | 0 | sizeof(struct read_op)); |
388 | 0 | asock->read_type = TYPE_RECV; |
389 | 0 | asock->read_flags = flags; |
390 | |
|
391 | 0 | for (i=0; i<asock->async_count; ++i) { |
392 | 0 | struct read_op *r = &asock->read_op[i]; |
393 | 0 | pj_ssize_t size_to_read; |
394 | |
|
395 | 0 | r->pkt = (pj_uint8_t*)readbuf[i]; |
396 | 0 | size_to_read = r->max_size = buff_size; |
397 | |
|
398 | 0 | status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read, |
399 | 0 | PJ_IOQUEUE_ALWAYS_ASYNC | flags); |
400 | 0 | PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG); |
401 | | |
402 | 0 | if (status != PJ_EPENDING) |
403 | 0 | return status; |
404 | 0 | } |
405 | | |
406 | 0 | return PJ_SUCCESS; |
407 | 0 | } |
408 | | |
409 | | |
410 | | PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock, |
411 | | pj_pool_t *pool, |
412 | | unsigned buff_size, |
413 | | pj_uint32_t flags) |
414 | 0 | { |
415 | 0 | void **readbuf; |
416 | 0 | unsigned i; |
417 | |
|
418 | 0 | PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL); |
419 | | |
420 | 0 | readbuf = (void**) pj_pool_calloc(pool, asock->async_count, |
421 | 0 | sizeof(void*)); |
422 | |
|
423 | 0 | for (i=0; i<asock->async_count; ++i) { |
424 | 0 | readbuf[i] = pj_pool_alloc(pool, buff_size); |
425 | 0 | } |
426 | |
|
427 | 0 | return pj_activesock_start_recvfrom2(asock, pool, buff_size, |
428 | 0 | readbuf, flags); |
429 | 0 | } |
430 | | |
431 | | |
432 | | PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock, |
433 | | pj_pool_t *pool, |
434 | | unsigned buff_size, |
435 | | void *readbuf[], |
436 | | pj_uint32_t flags) |
437 | 0 | { |
438 | 0 | unsigned i; |
439 | 0 | pj_status_t status; |
440 | |
|
441 | 0 | PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL); |
442 | 0 | PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP); |
443 | | |
444 | 0 | asock->read_op = (struct read_op*) |
445 | 0 | pj_pool_calloc(pool, asock->async_count, |
446 | 0 | sizeof(struct read_op)); |
447 | 0 | asock->read_type = TYPE_RECV_FROM; |
448 | 0 | asock->read_flags = flags; |
449 | |
|
450 | 0 | for (i=0; i<asock->async_count; ++i) { |
451 | 0 | struct read_op *r = &asock->read_op[i]; |
452 | 0 | pj_ssize_t size_to_read; |
453 | |
|
454 | 0 | r->pkt = (pj_uint8_t*) readbuf[i]; |
455 | 0 | size_to_read = r->max_size = buff_size; |
456 | 0 | r->src_addr_len = sizeof(r->src_addr); |
457 | |
|
458 | 0 | status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt, |
459 | 0 | &size_to_read, |
460 | 0 | PJ_IOQUEUE_ALWAYS_ASYNC | flags, |
461 | 0 | &r->src_addr, &r->src_addr_len); |
462 | 0 | PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG); |
463 | | |
464 | 0 | if (status != PJ_EPENDING) |
465 | 0 | return status; |
466 | 0 | } |
467 | | |
468 | 0 | return PJ_SUCCESS; |
469 | 0 | } |
470 | | |
471 | | |
472 | | static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, |
473 | | pj_ioqueue_op_key_t *op_key, |
474 | | pj_ssize_t bytes_read) |
475 | 0 | { |
476 | 0 | pj_activesock_t *asock; |
477 | 0 | struct read_op *r = (struct read_op*)op_key; |
478 | 0 | unsigned loop = 0; |
479 | 0 | pj_status_t status; |
480 | |
|
481 | 0 | asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); |
482 | | |
483 | | /* Ignore if we've been shutdown */ |
484 | 0 | if (asock->shutdown & SHUT_RX) |
485 | 0 | return; |
486 | | |
487 | 0 | do { |
488 | 0 | unsigned flags; |
489 | |
|
490 | 0 | if (bytes_read > 0) { |
491 | | /* |
492 | | * We've got new data. |
493 | | */ |
494 | 0 | pj_size_t remainder; |
495 | 0 | pj_bool_t ret; |
496 | | |
497 | | /* Append this new data to existing data. If socket is stream |
498 | | * oriented, user might have left some data in the buffer. |
499 | | * Otherwise if socket is datagram there will be nothing in |
500 | | * existing packet hence the packet will contain only the new |
501 | | * packet. |
502 | | */ |
503 | 0 | r->size += bytes_read; |
504 | | |
505 | | /* Set default remainder to zero */ |
506 | 0 | remainder = 0; |
507 | | |
508 | | /* And return value to TRUE */ |
509 | 0 | ret = PJ_TRUE; |
510 | | |
511 | | /* Notify callback */ |
512 | 0 | if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) { |
513 | 0 | ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size, |
514 | 0 | PJ_SUCCESS, &remainder); |
515 | 0 | PJ_ASSERT_ON_FAIL( |
516 | 0 | !ret || !asock->stream_oriented || remainder <= r->size, |
517 | 0 | { |
518 | 0 | PJ_LOG(2, ("", |
519 | 0 | "App bug! Invalid remainder length from " |
520 | 0 | "activesock on_data_read().")); |
521 | 0 | remainder = 0; |
522 | 0 | }); |
523 | 0 | } else if (asock->read_type == TYPE_RECV_FROM && |
524 | 0 | asock->cb.on_data_recvfrom) |
525 | 0 | { |
526 | 0 | ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size, |
527 | 0 | &r->src_addr, |
528 | 0 | r->src_addr_len, |
529 | 0 | PJ_SUCCESS); |
530 | 0 | } |
531 | | |
532 | | /* If callback returns false, we have been destroyed! */ |
533 | 0 | if (!ret) |
534 | 0 | return; |
535 | | |
536 | | /* Only stream oriented socket may leave data in the packet */ |
537 | 0 | if (asock->stream_oriented) { |
538 | 0 | r->size = remainder; |
539 | 0 | } else { |
540 | 0 | r->size = 0; |
541 | 0 | } |
542 | |
|
543 | 0 | } else if (bytes_read <= 0 && |
544 | 0 | -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && |
545 | 0 | -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && |
546 | 0 | (asock->stream_oriented || |
547 | 0 | -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))) |
548 | 0 | { |
549 | 0 | pj_size_t remainder; |
550 | 0 | pj_bool_t ret; |
551 | |
|
552 | 0 | if (bytes_read == 0) { |
553 | | /* For stream/connection oriented socket, this means the |
554 | | * connection has been closed. For datagram sockets, it means |
555 | | * we've received datagram with zero length. |
556 | | */ |
557 | 0 | if (asock->stream_oriented) |
558 | 0 | status = PJ_EEOF; |
559 | 0 | else |
560 | 0 | status = PJ_SUCCESS; |
561 | 0 | } else { |
562 | | /* This means we've got an error. If this is stream/connection |
563 | | * oriented, it means connection has been closed. For datagram |
564 | | * sockets, it means we've got some error (e.g. EWOULDBLOCK). |
565 | | */ |
566 | 0 | status = (pj_status_t)-bytes_read; |
567 | 0 | } |
568 | | |
569 | | /* Set default remainder to zero */ |
570 | 0 | remainder = 0; |
571 | | |
572 | | /* And return value to TRUE */ |
573 | 0 | ret = PJ_TRUE; |
574 | | |
575 | | /* Notify callback */ |
576 | 0 | if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) { |
577 | | /* For connection oriented socket, we still need to report |
578 | | * the remainder data (if any) to the user to let user do |
579 | | * processing with the remainder data before it closes the |
580 | | * connection. |
581 | | * If there is no remainder data, set the packet to NULL. |
582 | | */ |
583 | | |
584 | | /* Shouldn't set the packet to NULL, as there may be active |
585 | | * socket user, such as SSL socket, that needs to have access |
586 | | * to the read buffer packet. |
587 | | */ |
588 | | //ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL), |
589 | | // r->size, status, &remainder); |
590 | 0 | ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size, |
591 | 0 | status, &remainder); |
592 | 0 | PJ_ASSERT_ON_FAIL( |
593 | 0 | !ret || !asock->stream_oriented || remainder <= r->size, |
594 | 0 | { |
595 | 0 | PJ_LOG(2, ("", |
596 | 0 | "App bug! Invalid remainder length from " |
597 | 0 | "activesock on_data_read().")); |
598 | 0 | remainder = 0; |
599 | 0 | }); |
600 | |
|
601 | 0 | } else if (asock->read_type == TYPE_RECV_FROM && |
602 | 0 | asock->cb.on_data_recvfrom) |
603 | 0 | { |
604 | | /* This would always be datagram oriented hence there's |
605 | | * nothing in the packet. We can't be sure if there will be |
606 | | * anything useful in the source_addr, so just put NULL |
607 | | * there too. |
608 | | */ |
609 | | /* In some scenarios, status may be PJ_SUCCESS. The upper |
610 | | * layer application may not expect the callback to be called |
611 | | * with successful status and NULL data, so lets not call the |
612 | | * callback if the status is PJ_SUCCESS. |
613 | | */ |
614 | 0 | if (status != PJ_SUCCESS ) { |
615 | 0 | ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0, |
616 | 0 | NULL, 0, status); |
617 | 0 | } |
618 | 0 | } |
619 | | |
620 | | /* If callback returns false, we have been destroyed! */ |
621 | 0 | if (!ret) |
622 | 0 | return; |
623 | | |
624 | | /* Also stop further read if we've been shutdown */ |
625 | 0 | if (asock->shutdown & SHUT_RX) |
626 | 0 | return; |
627 | | |
628 | | /* Only stream oriented socket may leave data in the packet */ |
629 | 0 | if (asock->stream_oriented) { |
630 | 0 | r->size = remainder; |
631 | 0 | } else { |
632 | 0 | r->size = 0; |
633 | 0 | } |
634 | 0 | } |
635 | | |
636 | | /* Read next data. We limit ourselves to processing max_loop immediate |
637 | | * data, so when the loop counter has exceeded this value, force the |
638 | | * read()/recvfrom() to return pending operation to allow the program |
639 | | * to do other jobs. |
640 | | */ |
641 | 0 | bytes_read = r->max_size - r->size; |
642 | 0 | flags = asock->read_flags; |
643 | 0 | if (++loop >= asock->max_loop) |
644 | 0 | flags |= PJ_IOQUEUE_ALWAYS_ASYNC; |
645 | |
|
646 | 0 | if (asock->read_type == TYPE_RECV) { |
647 | 0 | status = pj_ioqueue_recv(key, op_key, r->pkt + r->size, |
648 | 0 | &bytes_read, flags); |
649 | 0 | } else { |
650 | 0 | r->src_addr_len = sizeof(r->src_addr); |
651 | 0 | status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size, |
652 | 0 | &bytes_read, flags, |
653 | 0 | &r->src_addr, &r->src_addr_len); |
654 | 0 | } |
655 | |
|
656 | 0 | if (status == PJ_SUCCESS) { |
657 | | /* Immediate data */ |
658 | 0 | ; |
659 | 0 | } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) { |
660 | | /* Error */ |
661 | 0 | bytes_read = -status; |
662 | 0 | } else { |
663 | 0 | break; |
664 | 0 | } |
665 | 0 | } while (1); |
666 | |
|
667 | 0 | } |
668 | | |
669 | | |
670 | | static pj_status_t send_remaining(pj_activesock_t *asock, |
671 | | pj_ioqueue_op_key_t *send_key) |
672 | 0 | { |
673 | 0 | struct send_data *sd = (struct send_data*)send_key->activesock_data; |
674 | 0 | pj_status_t status; |
675 | |
|
676 | 0 | do { |
677 | 0 | pj_ssize_t size; |
678 | |
|
679 | 0 | size = sd->len - sd->sent; |
680 | 0 | status = pj_ioqueue_send(asock->key, send_key, |
681 | 0 | sd->data+sd->sent, &size, sd->flags); |
682 | 0 | if (status != PJ_SUCCESS) { |
683 | | /* Pending or error */ |
684 | 0 | break; |
685 | 0 | } |
686 | | |
687 | 0 | sd->sent += size; |
688 | 0 | if (sd->sent == sd->len) { |
689 | | /* The whole data has been sent. */ |
690 | 0 | return PJ_SUCCESS; |
691 | 0 | } |
692 | |
|
693 | 0 | } while (sd->sent < sd->len); |
694 | | |
695 | 0 | return status; |
696 | 0 | } |
697 | | |
698 | | |
699 | | PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock, |
700 | | pj_ioqueue_op_key_t *send_key, |
701 | | const void *data, |
702 | | pj_ssize_t *size, |
703 | | unsigned flags) |
704 | 0 | { |
705 | 0 | PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL); |
706 | | |
707 | 0 | if (asock->shutdown & SHUT_TX) |
708 | 0 | return PJ_EINVALIDOP; |
709 | | |
710 | 0 | send_key->activesock_data = NULL; |
711 | |
|
712 | 0 | if (asock->whole_data) { |
713 | 0 | pj_ssize_t whole; |
714 | 0 | pj_status_t status; |
715 | |
|
716 | 0 | whole = *size; |
717 | |
|
718 | 0 | status = pj_ioqueue_send(asock->key, send_key, data, size, flags); |
719 | 0 | if (status != PJ_SUCCESS) { |
720 | | /* Pending or error */ |
721 | 0 | return status; |
722 | 0 | } |
723 | | |
724 | 0 | if (*size == whole) { |
725 | | /* The whole data has been sent. */ |
726 | 0 | return PJ_SUCCESS; |
727 | 0 | } |
728 | | |
729 | | /* Data was partially sent */ |
730 | 0 | asock->send_data.data = (pj_uint8_t*)data; |
731 | 0 | asock->send_data.len = whole; |
732 | 0 | asock->send_data.sent = *size; |
733 | 0 | asock->send_data.flags = flags; |
734 | 0 | send_key->activesock_data = &asock->send_data; |
735 | | |
736 | | /* Try again */ |
737 | 0 | status = send_remaining(asock, send_key); |
738 | 0 | if (status == PJ_SUCCESS) { |
739 | 0 | *size = whole; |
740 | 0 | } |
741 | 0 | return status; |
742 | |
|
743 | 0 | } else { |
744 | 0 | return pj_ioqueue_send(asock->key, send_key, data, size, flags); |
745 | 0 | } |
746 | 0 | } |
747 | | |
748 | | |
749 | | PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock, |
750 | | pj_ioqueue_op_key_t *send_key, |
751 | | const void *data, |
752 | | pj_ssize_t *size, |
753 | | unsigned flags, |
754 | | const pj_sockaddr_t *addr, |
755 | | int addr_len) |
756 | 0 | { |
757 | 0 | PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len, |
758 | 0 | PJ_EINVAL); |
759 | | |
760 | 0 | if (asock->shutdown & SHUT_TX) |
761 | 0 | return PJ_EINVALIDOP; |
762 | | |
763 | 0 | return pj_ioqueue_sendto(asock->key, send_key, data, size, flags, |
764 | 0 | addr, addr_len); |
765 | 0 | } |
766 | | |
767 | | |
768 | | static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, |
769 | | pj_ioqueue_op_key_t *op_key, |
770 | | pj_ssize_t bytes_sent) |
771 | 0 | { |
772 | 0 | pj_activesock_t *asock; |
773 | |
|
774 | 0 | asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); |
775 | | |
776 | | /* Ignore if we've been shutdown. This may cause data to be partially |
777 | | * sent even when 'wholedata' was requested if the OS only sent partial |
778 | | * buffer. |
779 | | */ |
780 | 0 | if (asock->shutdown & SHUT_TX) |
781 | 0 | return; |
782 | | |
783 | 0 | if (bytes_sent > 0 && op_key->activesock_data) { |
784 | | /* whole_data is requested. Make sure we send all the data */ |
785 | 0 | struct send_data *sd = (struct send_data*)op_key->activesock_data; |
786 | |
|
787 | 0 | sd->sent += bytes_sent; |
788 | 0 | if (sd->sent == sd->len) { |
789 | | /* all has been sent */ |
790 | 0 | bytes_sent = sd->sent; |
791 | 0 | op_key->activesock_data = NULL; |
792 | 0 | } else { |
793 | | /* send remaining data */ |
794 | 0 | pj_status_t status; |
795 | |
|
796 | 0 | status = send_remaining(asock, op_key); |
797 | 0 | if (status == PJ_EPENDING) |
798 | 0 | return; |
799 | 0 | else if (status == PJ_SUCCESS) |
800 | 0 | bytes_sent = sd->sent; |
801 | 0 | else |
802 | 0 | bytes_sent = -status; |
803 | | |
804 | 0 | op_key->activesock_data = NULL; |
805 | 0 | } |
806 | 0 | } |
807 | | |
808 | 0 | if (asock->cb.on_data_sent) { |
809 | 0 | pj_bool_t ret; |
810 | |
|
811 | 0 | ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent); |
812 | | |
813 | | /* If callback returns false, we have been destroyed! */ |
814 | 0 | if (!ret) |
815 | 0 | return; |
816 | 0 | } |
817 | 0 | } |
818 | | |
819 | | #if PJ_HAS_TCP |
820 | | PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock, |
821 | | pj_pool_t *pool) |
822 | 0 | { |
823 | 0 | unsigned i; |
824 | |
|
825 | 0 | PJ_ASSERT_RETURN(asock, PJ_EINVAL); |
826 | 0 | PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP); |
827 | | |
828 | | /* Ignore if we've been shutdown */ |
829 | 0 | if (asock->shutdown) |
830 | 0 | return PJ_EINVALIDOP; |
831 | | |
832 | 0 | asock->accept_op = (struct accept_op*) |
833 | 0 | pj_pool_calloc(pool, asock->async_count, |
834 | 0 | sizeof(struct accept_op)); |
835 | 0 | for (i=0; i<asock->async_count; ++i) { |
836 | 0 | struct accept_op *a = &asock->accept_op[i]; |
837 | 0 | pj_status_t status; |
838 | |
|
839 | 0 | do { |
840 | 0 | a->new_sock = PJ_INVALID_SOCKET; |
841 | 0 | a->rem_addr_len = sizeof(a->rem_addr); |
842 | |
|
843 | 0 | status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock, |
844 | 0 | NULL, &a->rem_addr, &a->rem_addr_len); |
845 | 0 | if (status == PJ_SUCCESS) { |
846 | | /* We've got immediate connection. Not sure if it's a good |
847 | | * idea to call the callback now (probably application will |
848 | | * not be prepared to process it), so lets just silently |
849 | | * close the socket. |
850 | | */ |
851 | 0 | pj_sock_close(a->new_sock); |
852 | 0 | } |
853 | 0 | } while (status == PJ_SUCCESS); |
854 | |
|
855 | 0 | if (status != PJ_EPENDING) { |
856 | 0 | return status; |
857 | 0 | } |
858 | 0 | } |
859 | | |
860 | 0 | return PJ_SUCCESS; |
861 | 0 | } |
862 | | |
863 | | |
864 | | static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, |
865 | | pj_ioqueue_op_key_t *op_key, |
866 | | pj_sock_t new_sock, |
867 | | pj_status_t status) |
868 | 0 | { |
869 | 0 | pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); |
870 | 0 | struct accept_op *accept_op = (struct accept_op*) op_key; |
871 | |
|
872 | 0 | PJ_UNUSED_ARG(new_sock); |
873 | | |
874 | | /* Ignore if we've been shutdown */ |
875 | 0 | if (asock->shutdown) |
876 | 0 | return; |
877 | | |
878 | 0 | do { |
879 | 0 | if (status == asock->last_err && status != PJ_SUCCESS) { |
880 | 0 | asock->err_counter++; |
881 | 0 | if (asock->err_counter >= PJ_ACTIVESOCK_MAX_CONSECUTIVE_ACCEPT_ERROR) { |
882 | 0 | PJ_LOG(3, ("", "Received %d consecutive errors: %d for the accept()" |
883 | 0 | " operation, stopping further ioqueue accepts.", |
884 | 0 | asock->err_counter, asock->last_err)); |
885 | | |
886 | 0 | if ((status == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) && |
887 | 0 | (asock->cb.on_accept_complete2)) |
888 | 0 | { |
889 | 0 | (*asock->cb.on_accept_complete2)(asock, |
890 | 0 | accept_op->new_sock, |
891 | 0 | &accept_op->rem_addr, |
892 | 0 | accept_op->rem_addr_len, |
893 | 0 | PJ_ESOCKETSTOP); |
894 | 0 | } |
895 | 0 | return; |
896 | 0 | } |
897 | 0 | } else { |
898 | 0 | asock->err_counter = 0; |
899 | 0 | asock->last_err = status; |
900 | 0 | } |
901 | | |
902 | 0 | if (status==PJ_SUCCESS && (asock->cb.on_accept_complete2 || |
903 | 0 | asock->cb.on_accept_complete)) { |
904 | 0 | pj_bool_t ret; |
905 | | |
906 | | /* Notify callback */ |
907 | 0 | if (asock->cb.on_accept_complete2) { |
908 | 0 | ret = (*asock->cb.on_accept_complete2)(asock, |
909 | 0 | accept_op->new_sock, |
910 | 0 | &accept_op->rem_addr, |
911 | 0 | accept_op->rem_addr_len, |
912 | 0 | status); |
913 | 0 | } else { |
914 | 0 | ret = (*asock->cb.on_accept_complete)(asock, |
915 | 0 | accept_op->new_sock, |
916 | 0 | &accept_op->rem_addr, |
917 | 0 | accept_op->rem_addr_len); |
918 | 0 | } |
919 | | |
920 | | /* If callback returns false, we have been destroyed! */ |
921 | 0 | if (!ret) |
922 | 0 | return; |
923 | |
|
924 | | #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
925 | | PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
926 | | activesock_create_iphone_os_stream(asock); |
927 | | #endif |
928 | 0 | } else if (status==PJ_SUCCESS) { |
929 | | /* Application doesn't handle the new socket, we need to |
930 | | * close it to avoid resource leak. |
931 | | */ |
932 | 0 | pj_sock_close(accept_op->new_sock); |
933 | 0 | } |
934 | | |
935 | | /* Don't start another accept() if we've been shutdown */ |
936 | 0 | if (asock->shutdown) |
937 | 0 | return; |
938 | | |
939 | | /* Prepare next accept() */ |
940 | 0 | accept_op->new_sock = PJ_INVALID_SOCKET; |
941 | 0 | accept_op->rem_addr_len = sizeof(accept_op->rem_addr); |
942 | |
|
943 | 0 | status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock, |
944 | 0 | NULL, &accept_op->rem_addr, |
945 | 0 | &accept_op->rem_addr_len); |
946 | |
|
947 | 0 | } while (status != PJ_EPENDING && status != PJ_ECANCELLED); |
948 | 0 | } |
949 | | |
950 | | |
951 | | PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock, |
952 | | pj_pool_t *pool, |
953 | | const pj_sockaddr_t *remaddr, |
954 | | int addr_len) |
955 | 0 | { |
956 | 0 | PJ_UNUSED_ARG(pool); |
957 | |
|
958 | 0 | if (asock->shutdown) |
959 | 0 | return PJ_EINVALIDOP; |
960 | | |
961 | 0 | return pj_ioqueue_connect(asock->key, remaddr, addr_len); |
962 | 0 | } |
963 | | |
964 | | static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, |
965 | | pj_status_t status) |
966 | 0 | { |
967 | 0 | pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); |
968 | | |
969 | | /* Ignore if we've been shutdown */ |
970 | 0 | if (asock->shutdown) |
971 | 0 | return; |
972 | | |
973 | 0 | if (asock->cb.on_connect_complete) { |
974 | 0 | pj_bool_t ret; |
975 | |
|
976 | 0 | ret = (*asock->cb.on_connect_complete)(asock, status); |
977 | |
|
978 | 0 | if (!ret) { |
979 | | /* We've been destroyed */ |
980 | 0 | return; |
981 | 0 | } |
982 | | |
983 | | #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
984 | | PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
985 | | activesock_create_iphone_os_stream(asock); |
986 | | #endif |
987 | | |
988 | 0 | } |
989 | 0 | } |
990 | | #endif /* PJ_HAS_TCP */ |
991 | | |