/src/pjsip/pjlib/src/pj/ioqueue_select.c
Line | Count | Source |
1 | | /* |
2 | | * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) |
3 | | * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> |
4 | | * |
5 | | * This program is free software; you can redistribute it and/or modify |
6 | | * it under the terms of the GNU General Public License as published by |
7 | | * the Free Software Foundation; either version 2 of the License, or |
8 | | * (at your option) any later version. |
9 | | * |
10 | | * This program is distributed in the hope that it will be useful, |
11 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | * GNU General Public License for more details. |
14 | | * |
15 | | * You should have received a copy of the GNU General Public License |
16 | | * along with this program; if not, write to the Free Software |
17 | | * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
18 | | */ |
19 | | |
20 | | /* |
21 | | * sock_select.c |
22 | | * |
23 | | * This is the implementation of IOQueue using pj_sock_select(). |
24 | | * It runs anywhere pj_sock_select() is available (currently |
25 | | * Win32, Linux, Linux kernel, etc.). |
26 | | */ |
27 | | |
28 | | #include <pj/ioqueue.h> |
29 | | #include <pj/os.h> |
30 | | #include <pj/lock.h> |
31 | | #include <pj/log.h> |
32 | | #include <pj/list.h> |
33 | | #include <pj/pool.h> |
34 | | #include <pj/string.h> |
35 | | #include <pj/assert.h> |
36 | | #include <pj/sock.h> |
37 | | #include <pj/compat/socket.h> |
38 | | #include <pj/sock_select.h> |
39 | | #include <pj/sock_qos.h> |
40 | | #include <pj/errno.h> |
41 | | #include <pj/rand.h> |
42 | | |
43 | | |
44 | | /* Only build when the backend is using select(). */ |
45 | | #if PJ_IOQUEUE_IMP == PJ_IOQUEUE_IMP_SELECT |
46 | | |
47 | | |
48 | | /* Now that we have access to the OS's <sys/select.h>, let's check again |
49 | | * that PJ_IOQUEUE_MAX_HANDLES is not greater than FD_SETSIZE |
50 | | */ |
51 | | #if PJ_IOQUEUE_MAX_HANDLES > FD_SETSIZE |
52 | | # error "PJ_IOQUEUE_MAX_HANDLES cannot be greater than FD_SETSIZE" |
53 | | #endif |
54 | | |
55 | | |
56 | | /* |
57 | | * Include declaration from common abstraction. |
58 | | */ |
59 | | #include "ioqueue_common_abs.h" |
60 | | |
61 | | /* |
62 | | * ISSUES with ioqueue_select() |
63 | | * |
64 | | * EAGAIN/EWOULDBLOCK error in recv(): |
65 | | * - when multiple threads are working with the ioqueue, application |
66 | | * may receive EAGAIN or EWOULDBLOCK in the receive callback. |
67 | | * This error happens because more than one thread is watching for |
68 | | * the same descriptor set, so when all of them call recv() or recvfrom() |
69 | | * simultaneously, only one will succeed and the rest will get the error. |
70 | | * |
71 | | */ |
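For illustration, a minimal sketch of how an application callback can tolerate
this race (the callback name my_on_read_complete is hypothetical, error
handling is elided, and on some platforms the errno is EWOULDBLOCK rather than
EAGAIN):

    #include <pj/ioqueue.h>
    #include <pj/errno.h>
    #include <errno.h>

    /* Treat EAGAIN as a spurious wakeup caused by another polling
     * thread winning the recv() race, not as a fatal error.
     */
    static void my_on_read_complete(pj_ioqueue_key_t *key,
                                    pj_ioqueue_op_key_t *op_key,
                                    pj_ssize_t bytes_read)
    {
        if (bytes_read < 0 &&
            (pj_status_t)-bytes_read == PJ_STATUS_FROM_OS(EAGAIN))
        {
            /* Another thread consumed the data first; just re-post
             * the read operation and wait for the next event.
             */
            return;
        }
        /* ... process the data, then re-post pj_ioqueue_recv() ... */
    }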
72 | | #define THIS_FILE "ioq_select" |
73 | | |
74 | | /* |
75 | | * The select ioqueue relies on socket functions (pj_sock_xxx()) to return |
76 | | * the correct error code. |
77 | | */ |
78 | | #if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) |
79 | | # error "Error reporting must be enabled for this function to work!" |
80 | | #endif |
81 | | |
82 | | /* |
83 | | * During debugging build, VALIDATE_FD_SET is set. |
84 | | * This will check the validity of the fd_sets. |
85 | | */ |
86 | | /* |
87 | | #if defined(PJ_DEBUG) && PJ_DEBUG != 0 |
88 | | # define VALIDATE_FD_SET 1 |
89 | | #else |
90 | | # define VALIDATE_FD_SET 0 |
91 | | #endif |
92 | | */ |
93 | | #define VALIDATE_FD_SET 0 |
94 | | |
95 | | #if 0 |
96 | | # define TRACE__(args) PJ_LOG(3,args) |
97 | | #else |
98 | | # define TRACE__(args) |
99 | | #endif |
100 | | |
101 | | /* |
102 | | * This describes each key. |
103 | | */ |
104 | | struct pj_ioqueue_key_t |
105 | | { |
106 | | DECLARE_COMMON_KEY |
107 | | }; |
108 | | |
109 | | /* |
110 | | * This describes the I/O queue itself. |
111 | | */ |
112 | | struct pj_ioqueue_t |
113 | | { |
114 | | DECLARE_COMMON_IOQUEUE |
115 | | |
116 | | unsigned max, count; /* Max and current key count */ |
117 | | int nfds; /* The largest fd value (for select)*/ |
118 | | pj_ioqueue_key_t active_list; /* List of active keys. */ |
119 | | pj_fd_set_t rfdset; |
120 | | pj_fd_set_t wfdset; |
121 | | #if PJ_HAS_TCP |
122 | | pj_fd_set_t xfdset; |
123 | | #endif |
124 | | |
125 | | #if PJ_IOQUEUE_HAS_SAFE_UNREG |
126 | | pj_mutex_t *ref_cnt_mutex; |
127 | | pj_ioqueue_key_t closing_list; |
128 | | pj_ioqueue_key_t free_list; |
129 | | #endif |
130 | | }; |
131 | | |
132 | | /* Proto */ |
133 | | #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
134 | | PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
135 | | static pj_status_t replace_udp_sock(pj_ioqueue_key_t *h); |
136 | | #endif |
137 | | |
138 | | #if defined(PJ_HAS_SSL_SOCK) && PJ_HAS_SSL_SOCK != 0 && \ |
139 | | (PJ_SSL_SOCK_IMP == PJ_SSL_SOCK_IMP_APPLE) |
140 | | /* Call SSL Network framework poll */ |
141 | | pj_status_t ssl_network_event_poll(); |
142 | | #endif |
143 | | |
144 | | /* Include implementation for common abstraction after we declare |
145 | | * pj_ioqueue_key_t and pj_ioqueue_t. |
146 | | */ |
147 | | #include "ioqueue_common_abs.c" |
148 | | |
149 | | #if PJ_IOQUEUE_HAS_SAFE_UNREG |
150 | | /* Scan closing keys to be put back to the free list */ |
151 | | static void scan_closing_keys(pj_ioqueue_t *ioqueue); |
152 | | #endif |
153 | | |
154 | | /* |
155 | | * pj_ioqueue_name() |
156 | | */ |
157 | | PJ_DEF(const char*) pj_ioqueue_name(void) |
158 | 0 | { |
159 | 0 | return "select"; |
160 | 0 | } |
161 | | |
162 | | /* |
163 | | * Scan the socket descriptor sets for the largest descriptor. |
164 | | * This value is needed by select(). |
165 | | */ |
166 | | #if defined(PJ_SELECT_NEEDS_NFDS) && PJ_SELECT_NEEDS_NFDS!=0 |
167 | | static void rescan_fdset(pj_ioqueue_t *ioqueue) |
168 | | { |
169 | | pj_ioqueue_key_t *key = ioqueue->active_list.next; |
170 | | int max = 0; |
171 | | |
172 | | while (key != &ioqueue->active_list) { |
173 | | if (key->fd > max) |
174 | | max = key->fd; |
175 | | key = key->next; |
176 | | } |
177 | | |
178 | | ioqueue->nfds = max; |
179 | | } |
180 | | #else |
181 | | static void rescan_fdset(pj_ioqueue_t *ioqueue) |
182 | 0 | { |
183 | 0 | ioqueue->nfds = FD_SETSIZE-1; |
184 | 0 | } |
185 | | #endif |
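For context, the value computed here feeds the first argument of select() in
pj_ioqueue_poll() further below: select() only examines descriptors numbered
below that argument, hence the call

    count = pj_sock_select(nfds+1, &rfdset, &wfdset, &xfdset, timeout);

which is why the largest registered descriptor must be tracked (or, when
PJ_SELECT_NEEDS_NFDS is disabled, conservatively assumed to be FD_SETSIZE-1).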
186 | | |
187 | | |
188 | | /* |
189 | | * pj_ioqueue_create() |
190 | | * |
191 | | * Create select ioqueue. |
192 | | */ |
193 | | PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, |
194 | | pj_size_t max_fd, |
195 | | pj_ioqueue_t **p_ioqueue) |
196 | 0 | { |
197 | 0 | return pj_ioqueue_create2(pool, max_fd, NULL, p_ioqueue); |
198 | 0 | } |
199 | | |
200 | | |
201 | | /* |
202 | | * pj_ioqueue_create2() |
203 | | * |
204 | | * Create select ioqueue. |
205 | | */ |
206 | | PJ_DEF(pj_status_t) pj_ioqueue_create2(pj_pool_t *pool, |
207 | | pj_size_t max_fd, |
208 | | const pj_ioqueue_cfg *cfg, |
209 | | pj_ioqueue_t **p_ioqueue) |
210 | 0 | { |
211 | 0 | pj_ioqueue_t *ioqueue; |
212 | 0 | pj_lock_t *lock; |
213 | 0 | pj_size_t i; |
214 | 0 | pj_status_t rc; |
215 | | |
216 | | /* Check that arguments are valid. */ |
217 | 0 | PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && |
218 | 0 | max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, |
219 | 0 | PJ_EINVAL); |
220 | | |
221 | | /* Check that size of pj_ioqueue_op_key_t is sufficient */ |
222 | 0 | PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= |
223 | 0 | sizeof(union operation_key), PJ_EBUG); |
224 | | |
225 | | /* Create and init common ioqueue stuffs */ |
226 | 0 | ioqueue = PJ_POOL_ALLOC_T(pool, pj_ioqueue_t); |
227 | 0 | ioqueue_init(ioqueue); |
228 | |
229 | 0 | if (cfg) |
230 | 0 | pj_memcpy(&ioqueue->cfg, cfg, sizeof(*cfg)); |
231 | 0 | else |
232 | 0 | pj_ioqueue_cfg_default(&ioqueue->cfg); |
233 | 0 | ioqueue->max = (unsigned)max_fd; |
234 | 0 | ioqueue->count = 0; |
235 | 0 | PJ_FD_ZERO(&ioqueue->rfdset); |
236 | 0 | PJ_FD_ZERO(&ioqueue->wfdset); |
237 | 0 | #if PJ_HAS_TCP |
238 | 0 | PJ_FD_ZERO(&ioqueue->xfdset); |
239 | 0 | #endif |
240 | 0 | pj_list_init(&ioqueue->active_list); |
241 | |
242 | 0 | rescan_fdset(ioqueue); |
243 | |
244 | 0 | #if PJ_IOQUEUE_HAS_SAFE_UNREG |
245 | | /* When safe unregistration is used (the default), we pre-create |
246 | | * all keys and put them in the free list. |
247 | | */ |
248 | | |
249 | | /* Mutex to protect key's reference counter |
250 | | * We don't want to use key's mutex or ioqueue's mutex because |
251 | | * that would create a deadlock situation in some cases. |
252 | | */ |
253 | 0 | rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex); |
254 | 0 | if (rc != PJ_SUCCESS) |
255 | 0 | return rc; |
256 | | |
257 | | |
258 | | /* Init key list */ |
259 | 0 | pj_list_init(&ioqueue->free_list); |
260 | 0 | pj_list_init(&ioqueue->closing_list); |
261 | | |
262 | | |
263 | | /* Pre-create all keys according to max_fd */ |
264 | 0 | for (i=0; i<max_fd; ++i) { |
265 | 0 | pj_ioqueue_key_t *key; |
266 | |
267 | 0 | key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t); |
268 | 0 | key->ref_count = 0; |
269 | 0 | rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock); |
270 | 0 | if (rc != PJ_SUCCESS) { |
271 | 0 | key = ioqueue->free_list.next; |
272 | 0 | while (key != &ioqueue->free_list) { |
273 | 0 | pj_lock_destroy(key->lock); |
274 | 0 | key = key->next; |
275 | 0 | } |
276 | 0 | pj_mutex_destroy(ioqueue->ref_cnt_mutex); |
277 | 0 | return rc; |
278 | 0 | } |
279 | | |
280 | 0 | pj_list_push_back(&ioqueue->free_list, key); |
281 | 0 | } |
282 | 0 | #endif |
283 | | |
284 | | /* Create and init ioqueue mutex */ |
285 | 0 | rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); |
286 | 0 | if (rc != PJ_SUCCESS) |
287 | 0 | return rc; |
288 | | |
289 | 0 | rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); |
290 | 0 | if (rc != PJ_SUCCESS) |
291 | 0 | return rc; |
292 | | |
293 | 0 | PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); |
294 | |
295 | 0 | *p_ioqueue = ioqueue; |
296 | 0 | return PJ_SUCCESS; |
297 | 0 | } |
298 | | |
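A minimal usage sketch (the pool name and sizes are arbitrary, and error
handling is elided):

    #include <pjlib.h>

    pj_caching_pool cp;
    pj_pool_t *pool;
    pj_ioqueue_t *ioq;

    pj_init();
    pj_caching_pool_init(&cp, NULL, 0);
    pool = pj_pool_create(&cp.factory, "app", 1000, 1000, NULL);

    /* Create the ioqueue with room for the configured maximum. */
    pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioq);

    /* ... register sockets and poll ... */

    pj_ioqueue_destroy(ioq);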
299 | | /* |
300 | | * pj_ioqueue_destroy() |
301 | | * |
302 | | * Destroy ioqueue. |
303 | | */ |
304 | | PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) |
305 | 0 | { |
306 | 0 | pj_ioqueue_key_t *key; |
307 | |
308 | 0 | PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); |
309 | | |
310 | 0 | pj_lock_acquire(ioqueue->lock); |
311 | |
312 | 0 | #if PJ_IOQUEUE_HAS_SAFE_UNREG |
313 | | /* Destroy reference counters */ |
314 | 0 | key = ioqueue->active_list.next; |
315 | 0 | while (key != &ioqueue->active_list) { |
316 | 0 | pj_lock_destroy(key->lock); |
317 | 0 | key = key->next; |
318 | 0 | } |
319 | |
320 | 0 | key = ioqueue->closing_list.next; |
321 | 0 | while (key != &ioqueue->closing_list) { |
322 | 0 | pj_lock_destroy(key->lock); |
323 | 0 | key = key->next; |
324 | 0 | } |
325 | |
326 | 0 | key = ioqueue->free_list.next; |
327 | 0 | while (key != &ioqueue->free_list) { |
328 | 0 | pj_lock_destroy(key->lock); |
329 | 0 | key = key->next; |
330 | 0 | } |
331 | |
332 | 0 | pj_mutex_destroy(ioqueue->ref_cnt_mutex); |
333 | 0 | #endif |
334 | |
335 | 0 | return ioqueue_destroy(ioqueue); |
336 | 0 | } |
337 | | |
338 | | |
339 | | /* |
340 | | * pj_ioqueue_register_sock() |
341 | | * |
342 | | * Register socket handle to ioqueue. |
343 | | */ |
344 | | PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool, |
345 | | pj_ioqueue_t *ioqueue, |
346 | | pj_sock_t sock, |
347 | | pj_grp_lock_t *grp_lock, |
348 | | void *user_data, |
349 | | const pj_ioqueue_callback *cb, |
350 | | pj_ioqueue_key_t **p_key) |
351 | 0 | { |
352 | 0 | pj_ioqueue_key_t *key = NULL; |
353 | | #if defined(PJ_WIN32) && PJ_WIN32!=0 || \ |
354 | | defined(PJ_WIN64) && PJ_WIN64 != 0 || \ |
355 | | defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0 |
356 | | u_long value; |
357 | | #else |
358 | 0 | pj_uint32_t value; |
359 | 0 | #endif |
360 | 0 | pj_status_t rc = PJ_SUCCESS; |
361 | | |
362 | 0 | PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && |
363 | 0 | cb && p_key, PJ_EINVAL); |
364 | | |
365 | | /* On platforms where fd_set contains an fd bitmap (such as the *nix family), |
366 | | * avoid potential memory corruption caused by select() when given |
367 | | * an fd that is higher than FD_SETSIZE. |
368 | | */ |
369 | 0 | if (sizeof(fd_set) < FD_SETSIZE && sock >= FD_SETSIZE) { |
370 | 0 | PJ_LOG(4, ("pjlib", "Failed to register socket to ioqueue because " |
371 | 0 | "socket fd is too big (fd=%ld/FD_SETSIZE=%d)", |
372 | 0 | sock, FD_SETSIZE)); |
373 | 0 | return PJ_ETOOBIG; |
374 | 0 | } |
375 | | |
376 | 0 | pj_lock_acquire(ioqueue->lock); |
377 | |
378 | 0 | if (ioqueue->count >= ioqueue->max) { |
379 | 0 | rc = PJ_ETOOMANY; |
380 | 0 | goto on_return; |
381 | 0 | } |
382 | | |
383 | | /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get |
384 | | * the key from the free list. Otherwise allocate a new one. |
385 | | */ |
386 | 0 | #if PJ_IOQUEUE_HAS_SAFE_UNREG |
387 | | |
388 | | /* Scan closing_keys first to let them come back to free_list */ |
389 | 0 | scan_closing_keys(ioqueue); |
390 | |
391 | 0 | pj_assert(!pj_list_empty(&ioqueue->free_list)); |
392 | 0 | if (pj_list_empty(&ioqueue->free_list)) { |
393 | 0 | rc = PJ_ETOOMANY; |
394 | 0 | goto on_return; |
395 | 0 | } |
396 | | |
397 | 0 | key = ioqueue->free_list.next; |
398 | 0 | pj_list_erase(key); |
399 | | #else |
400 | | key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); |
401 | | #endif |
402 | |
403 | 0 | rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb); |
404 | 0 | if (rc != PJ_SUCCESS) { |
405 | 0 | key = NULL; |
406 | 0 | goto on_return; |
407 | 0 | } |
408 | | |
409 | | /* Set socket to nonblocking. */ |
410 | 0 | value = 1; |
411 | | #if defined(PJ_WIN32) && PJ_WIN32!=0 || \ |
412 | | defined(PJ_WIN64) && PJ_WIN64 != 0 || \ |
413 | | defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0 |
414 | | if (ioctlsocket(sock, FIONBIO, &value)) { |
415 | | #else |
416 | 0 | if (ioctl(sock, FIONBIO, &value)) { |
417 | 0 | #endif |
418 | 0 | rc = pj_get_netos_error(); |
419 | 0 | goto on_return; |
420 | 0 | } |
421 | | |
422 | | |
423 | | /* Put in active list. */ |
424 | 0 | pj_list_insert_before(&ioqueue->active_list, key); |
425 | 0 | ++ioqueue->count; |
426 | | |
427 | | /* Rescan fdset to get max descriptor */ |
428 | 0 | rescan_fdset(ioqueue); |
429 | |
430 | 0 | on_return: |
431 | | /* On error, socket may be left in non-blocking mode. */ |
432 | 0 | if (rc != PJ_SUCCESS) { |
433 | 0 | if (key && key->grp_lock) |
434 | 0 | pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0); |
435 | 0 | } |
436 | 0 | *p_key = key; |
437 | 0 | pj_lock_release(ioqueue->lock); |
438 | | |
439 | 0 | return rc; |
440 | 0 | } |
441 | | |
442 | | PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, |
443 | | pj_ioqueue_t *ioqueue, |
444 | | pj_sock_t sock, |
445 | | void *user_data, |
446 | | const pj_ioqueue_callback *cb, |
447 | | pj_ioqueue_key_t **p_key) |
448 | 0 | { |
449 | 0 | return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data, |
450 | 0 | cb, p_key); |
451 | 0 | } |
452 | | |
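A hedged sketch of registering a UDP socket with an ioqueue (the callback is
the hypothetical my_on_read_complete shown earlier; error handling is elided):

    pj_ioqueue_callback cb;
    pj_ioqueue_key_t *key;
    pj_sock_t sock;

    pj_bzero(&cb, sizeof(cb));
    cb.on_read_complete = &my_on_read_complete;

    pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &sock);
    pj_ioqueue_register_sock(pool, ioq, sock, NULL /*user_data*/,
                             &cb, &key);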
453 | | #if PJ_IOQUEUE_HAS_SAFE_UNREG |
454 | | /* Increment key's reference counter */ |
455 | | static void increment_counter(pj_ioqueue_key_t *key) |
456 | 0 | { |
457 | 0 | pj_mutex_lock(key->ioqueue->ref_cnt_mutex); |
458 | 0 | ++key->ref_count; |
459 | 0 | pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); |
460 | 0 | } |
461 | | |
462 | | /* Decrement the key's reference counter, and when the counter reach zero, |
463 | | * destroy the key. |
464 | | * |
465 | | * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK. |
466 | | */ |
467 | | static void decrement_counter(pj_ioqueue_key_t *key) |
468 | 0 | { |
469 | 0 | pj_lock_acquire(key->ioqueue->lock); |
470 | 0 | pj_mutex_lock(key->ioqueue->ref_cnt_mutex); |
471 | 0 | --key->ref_count; |
472 | 0 | if (key->ref_count == 0) { |
473 | |
474 | 0 | pj_assert(key->closing == 1); |
475 | 0 | pj_gettickcount(&key->free_time); |
476 | 0 | key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; |
477 | 0 | pj_time_val_normalize(&key->free_time); |
478 | |
479 | 0 | pj_list_erase(key); |
480 | 0 | pj_list_push_back(&key->ioqueue->closing_list, key); |
481 | | /* Rescan fdset to get max descriptor */ |
482 | 0 | rescan_fdset(key->ioqueue); |
483 | 0 | } |
484 | 0 | pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); |
485 | 0 | pj_lock_release(key->ioqueue->lock); |
486 | 0 | } |
487 | | #endif |
488 | | |
489 | | |
490 | | /* |
491 | | * pj_ioqueue_unregister() |
492 | | * |
493 | | * Unregister handle from ioqueue. |
494 | | */ |
495 | | PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) |
496 | 0 | { |
497 | 0 | pj_ioqueue_t *ioqueue; |
498 | |
499 | 0 | PJ_ASSERT_RETURN(key, PJ_EINVAL); |
500 | | |
501 | 0 | ioqueue = key->ioqueue; |
502 | | |
503 | | /* Lock the key to make sure no callback is simultaneously modifying |
504 | | * the key. We need to lock the key before ioqueue here to prevent |
505 | | * deadlock. |
506 | | */ |
507 | 0 | pj_ioqueue_lock_key(key); |
508 | | |
509 | | /* Best effort to avoid double key-unregistration */ |
510 | 0 | if (IS_CLOSING(key)) { |
511 | 0 | pj_ioqueue_unlock_key(key); |
512 | 0 | return PJ_SUCCESS; |
513 | 0 | } |
514 | | |
515 | | /* Also lock ioqueue */ |
516 | 0 | pj_lock_acquire(ioqueue->lock); |
517 | | |
518 | | /* Avoid "negative" ioqueue count */ |
519 | 0 | if (ioqueue->count > 0) { |
520 | 0 | --ioqueue->count; |
521 | 0 | } else { |
522 | | /* If this happens, very likely there is double unregistration |
523 | | * of a key. |
524 | | */ |
525 | 0 | pj_assert(!"Bad ioqueue count in key unregistration!"); |
526 | 0 | PJ_LOG(1,(THIS_FILE, "Bad ioqueue count in key unregistration!")); |
527 | 0 | } |
528 | |
529 | | #if !PJ_IOQUEUE_HAS_SAFE_UNREG |
530 | | /* Ticket #520, key will be erased more than once */ |
531 | | pj_list_erase(key); |
532 | | #endif |
533 | | |
534 | | /* Remove socket from sets and close socket. */ |
535 | 0 | if (key->fd != PJ_INVALID_SOCKET) { |
536 | 0 | PJ_FD_CLR(key->fd, &ioqueue->rfdset); |
537 | 0 | PJ_FD_CLR(key->fd, &ioqueue->wfdset); |
538 | 0 | #if PJ_HAS_TCP |
539 | 0 | PJ_FD_CLR(key->fd, &ioqueue->xfdset); |
540 | 0 | #endif |
541 | |
542 | 0 | pj_sock_close(key->fd); |
543 | 0 | key->fd = PJ_INVALID_SOCKET; |
544 | 0 | } |
545 | | |
546 | | /* Clear callback */ |
547 | 0 | key->cb.on_accept_complete = NULL; |
548 | 0 | key->cb.on_connect_complete = NULL; |
549 | 0 | key->cb.on_read_complete = NULL; |
550 | 0 | key->cb.on_write_complete = NULL; |
551 | | |
552 | | /* Must release ioqueue lock first before decrementing counter, to |
553 | | * prevent deadlock. |
554 | | */ |
555 | 0 | pj_lock_release(ioqueue->lock); |
556 | | |
557 | | /* Mark key as closing. */ |
558 | 0 | key->closing = 1; |
559 | |
560 | 0 | pj_ioqueue_unlock_key(key); |
561 | |
562 | 0 | #if PJ_IOQUEUE_HAS_SAFE_UNREG |
563 | | /* Decrement counter. */ |
564 | 0 | decrement_counter(key); |
565 | | #else |
566 | | /* Destroy the key lock */ |
567 | | pj_lock_destroy(key->lock); |
568 | | #endif |
569 | | |
570 | | /* Done. */ |
571 | 0 | if (key->grp_lock) { |
572 | 0 | pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0); |
573 | 0 | } |
574 | |
575 | 0 | return PJ_SUCCESS; |
576 | 0 | } |
577 | | |
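Because unregistration closes the socket immediately while the key itself is
only recycled after PJ_IOQUEUE_KEY_FREE_DELAY, applications that poll and
unregister from different threads typically register with a group lock so
that the key's lifetime is reference-counted. A sketch, reusing the names
from the previous examples:

    pj_grp_lock_t *grp_lock;

    pj_grp_lock_create(pool, NULL, &grp_lock);
    pj_ioqueue_register_sock2(pool, ioq, sock, grp_lock,
                              NULL /*user_data*/, &cb, &key);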
578 | | |
579 | | /* This is supposed to check whether the fd_set values are consistent |
580 | | * with the operation currently set in each key. |
581 | | */ |
582 | | #if VALIDATE_FD_SET |
583 | | static void validate_sets(const pj_ioqueue_t *ioqueue, |
584 | | const pj_fd_set_t *rfdset, |
585 | | const pj_fd_set_t *wfdset, |
586 | | const pj_fd_set_t *xfdset) |
587 | | { |
588 | | pj_ioqueue_key_t *key; |
589 | | |
590 | | /* |
591 | | * This basically would not work anymore. |
592 | | * We need to lock the key before performing the check, but we can't |
593 | | * do so because we're holding the ioqueue mutex. If we acquire the |
594 | | * key's mutex now, that will cause a deadlock. |
595 | | */ |
596 | | pj_assert(0); |
597 | | |
598 | | key = ioqueue->active_list.next; |
599 | | while (key != &ioqueue->active_list) { |
600 | | if (!pj_list_empty(&key->read_list) |
601 | | #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 |
602 | | || !pj_list_empty(&key->accept_list) |
603 | | #endif |
604 | | ) |
605 | | { |
606 | | pj_assert(PJ_FD_ISSET(key->fd, rfdset)); |
607 | | } |
608 | | else { |
609 | | pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0); |
610 | | } |
611 | | if (!pj_list_empty(&key->write_list) |
612 | | #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 |
613 | | || key->connecting |
614 | | #endif |
615 | | ) |
616 | | { |
617 | | pj_assert(PJ_FD_ISSET(key->fd, wfdset)); |
618 | | } |
619 | | else { |
620 | | pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0); |
621 | | } |
622 | | #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 |
623 | | if (key->connecting) |
624 | | { |
625 | | pj_assert(PJ_FD_ISSET(key->fd, xfdset)); |
626 | | } |
627 | | else { |
628 | | pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0); |
629 | | } |
630 | | #endif /* PJ_HAS_TCP */ |
631 | | |
632 | | key = key->next; |
633 | | } |
634 | | } |
635 | | #endif /* VALIDATE_FD_SET */ |
636 | | |
637 | | |
638 | | /* ioqueue_remove_from_set() |
639 | | * This function is called from ioqueue_dispatch_event() to instruct |
640 | | * the ioqueue to remove the specified descriptor from ioqueue's descriptor |
641 | | * set for the specified event. |
642 | | */ |
643 | | static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, |
644 | | pj_ioqueue_key_t *key, |
645 | | enum ioqueue_event_type event_type ) |
646 | 0 | { |
647 | 0 | ioqueue_remove_from_set2(ioqueue, key, event_type); |
648 | 0 | } |
649 | | |
650 | | static void ioqueue_remove_from_set2(pj_ioqueue_t *ioqueue, |
651 | | pj_ioqueue_key_t *key, |
652 | | unsigned event_types) |
653 | 0 | { |
654 | 0 | pj_lock_acquire(ioqueue->lock); |
655 | |
656 | 0 | if (event_types & READABLE_EVENT) |
657 | 0 | PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset); |
658 | 0 | if (event_types & WRITEABLE_EVENT) |
659 | 0 | PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset); |
660 | 0 | #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 |
661 | 0 | if (event_types & EXCEPTION_EVENT) |
662 | 0 | PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset); |
663 | 0 | #endif |
664 | |
665 | 0 | pj_lock_release(ioqueue->lock); |
666 | 0 | } |
667 | | |
668 | | /* |
669 | | * ioqueue_add_to_set() |
670 | | * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc |
671 | | * to instruct the ioqueue to add the specified handle to ioqueue's descriptor |
672 | | * set for the specified event. |
673 | | */ |
674 | | static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, |
675 | | pj_ioqueue_key_t *key, |
676 | | enum ioqueue_event_type event_type ) |
677 | 0 | { |
678 | 0 | ioqueue_add_to_set2(ioqueue, key, event_type); |
679 | 0 | } |
680 | | |
681 | | static void ioqueue_add_to_set2(pj_ioqueue_t *ioqueue, |
682 | | pj_ioqueue_key_t *key, |
683 | | unsigned event_types ) |
684 | 0 | { |
685 | 0 | pj_lock_acquire(ioqueue->lock); |
686 | |
687 | 0 | if (event_types & READABLE_EVENT) |
688 | 0 | PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset); |
689 | 0 | if (event_types & WRITEABLE_EVENT) |
690 | 0 | PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset); |
691 | 0 | #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 |
692 | 0 | if (event_types & EXCEPTION_EVENT) |
693 | 0 | PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset); |
694 | 0 | #endif |
695 | |
696 | 0 | pj_lock_release(ioqueue->lock); |
697 | 0 | } |
698 | | |
699 | | #if PJ_IOQUEUE_HAS_SAFE_UNREG |
700 | | /* Scan closing keys to be put back to the free list */ |
701 | | static void scan_closing_keys(pj_ioqueue_t *ioqueue) |
702 | 0 | { |
703 | 0 | pj_time_val now; |
704 | 0 | pj_ioqueue_key_t *h; |
705 | |
706 | 0 | pj_gettickcount(&now); |
707 | 0 | h = ioqueue->closing_list.next; |
708 | 0 | while (h != &ioqueue->closing_list) { |
709 | 0 | pj_ioqueue_key_t *next = h->next; |
710 | |
711 | 0 | pj_assert(h->closing != 0); |
712 | |
713 | 0 | if (PJ_TIME_VAL_GTE(now, h->free_time)) { |
714 | 0 | pj_list_erase(h); |
715 | | // Don't set grp_lock to NULL, otherwise the other thread |
716 | | // will crash. Just leave it as a dangling pointer; this |
717 | | // should be safe. |
718 | | //h->grp_lock = NULL; |
719 | 0 | pj_list_push_back(&ioqueue->free_list, h); |
720 | 0 | } |
721 | 0 | h = next; |
722 | 0 | } |
723 | 0 | } |
724 | | #endif |
725 | | |
726 | | #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ |
727 | | PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 |
728 | | static pj_status_t replace_udp_sock(pj_ioqueue_key_t *h) |
729 | | { |
730 | | enum flags { |
731 | | HAS_PEER_ADDR = 1, |
732 | | HAS_QOS = 2 |
733 | | }; |
734 | | pj_sock_t old_sock, new_sock = PJ_INVALID_SOCKET; |
735 | | pj_sockaddr local_addr, rem_addr; |
736 | | int val, addr_len; |
737 | | pj_fd_set_t *fds[3]; |
738 | | unsigned i, fds_cnt, flags=0; |
739 | | pj_qos_params qos_params; |
740 | | unsigned msec, msec2; |
741 | | pj_status_t status = PJ_EUNKNOWN; |
742 | | |
743 | | pj_lock_acquire(h->ioqueue->lock); |
744 | | |
745 | | old_sock = h->fd; |
746 | | |
747 | | fds_cnt = 0; |
748 | | fds[fds_cnt++] = &h->ioqueue->rfdset; |
749 | | fds[fds_cnt++] = &h->ioqueue->wfdset; |
750 | | #if PJ_HAS_TCP |
751 | | fds[fds_cnt++] = &h->ioqueue->xfdset; |
752 | | #endif |
753 | | |
754 | | /* Can only replace UDP socket */ |
755 | | pj_assert(h->fd_type == pj_SOCK_DGRAM()); |
756 | | |
757 | | PJ_LOG(4,(THIS_FILE, "Attempting to replace UDP socket %ld", old_sock)); |
758 | | |
759 | | for (msec=20; (msec<1000 && status != PJ_SUCCESS); msec=msec*2) |
760 | | { |
761 | | if (msec > 20) { |
762 | | PJ_LOG(4,(THIS_FILE, "Retry to replace UDP socket %ld", h->fd)); |
763 | | pj_thread_sleep(msec); |
764 | | } |
765 | | |
766 | | if (old_sock != PJ_INVALID_SOCKET) { |
767 | | /* Investigate the old socket */ |
768 | | addr_len = sizeof(local_addr); |
769 | | status = pj_sock_getsockname(old_sock, &local_addr, &addr_len); |
770 | | if (status != PJ_SUCCESS) { |
771 | | PJ_PERROR(5,(THIS_FILE, status, "Error get socket name")); |
772 | | continue; |
773 | | } |
774 | | |
775 | | addr_len = sizeof(rem_addr); |
776 | | status = pj_sock_getpeername(old_sock, &rem_addr, &addr_len); |
777 | | if (status != PJ_SUCCESS) { |
778 | | PJ_PERROR(5,(THIS_FILE, status, "Error get peer name")); |
779 | | } else { |
780 | | flags |= HAS_PEER_ADDR; |
781 | | } |
782 | | |
783 | | status = pj_sock_get_qos_params(old_sock, &qos_params); |
784 | | if (status == PJ_STATUS_FROM_OS(EBADF) || |
785 | | status == PJ_STATUS_FROM_OS(EINVAL)) |
786 | | { |
787 | | PJ_PERROR(5,(THIS_FILE, status, "Error get qos param")); |
788 | | continue; |
789 | | } |
790 | | |
791 | | if (status != PJ_SUCCESS) { |
792 | | PJ_PERROR(5,(THIS_FILE, status, "Error get qos param")); |
793 | | } else { |
794 | | flags |= HAS_QOS; |
795 | | } |
796 | | |
797 | | /* We're done with the old socket, close it otherwise we'll get |
798 | | * an error in bind() |
799 | | */ |
800 | | status = pj_sock_close(old_sock); |
801 | | if (status != PJ_SUCCESS) { |
802 | | PJ_PERROR(5,(THIS_FILE, status, "Error closing socket")); |
803 | | } |
804 | | |
805 | | old_sock = PJ_INVALID_SOCKET; |
806 | | } |
807 | | |
808 | | /* Prepare the new socket */ |
809 | | status = pj_sock_socket(local_addr.addr.sa_family, PJ_SOCK_DGRAM, 0, |
810 | | &new_sock); |
811 | | if (status != PJ_SUCCESS) { |
812 | | PJ_PERROR(5,(THIS_FILE, status, "Error create socket")); |
813 | | continue; |
814 | | } |
815 | | |
816 | | /* Even after the socket is closed, we'll still get "Address in use" |
817 | | * errors, so force it with SO_REUSEADDR |
818 | | */ |
819 | | val = 1; |
820 | | status = pj_sock_setsockopt(new_sock, SOL_SOCKET, SO_REUSEADDR, |
821 | | &val, sizeof(val)); |
822 | | if (status == PJ_STATUS_FROM_OS(EBADF) || |
823 | | status == PJ_STATUS_FROM_OS(EINVAL)) |
824 | | { |
825 | | PJ_PERROR(5,(THIS_FILE, status, "Error set socket option")); |
826 | | continue; |
827 | | } |
828 | | |
829 | | /* The loop is silly, but what else can we do? */ |
830 | | addr_len = pj_sockaddr_get_len(&local_addr); |
831 | | for (msec2=20; msec2<1000 ; msec2=msec2*2) { |
832 | | status = pj_sock_bind(new_sock, &local_addr, addr_len); |
833 | | if (status != PJ_STATUS_FROM_OS(EADDRINUSE)) |
834 | | break; |
835 | | PJ_LOG(4,(THIS_FILE, "Address is still in use, retrying..")); |
836 | | pj_thread_sleep(msec2); |
837 | | } |
838 | | |
839 | | if (status != PJ_SUCCESS) |
840 | | continue; |
841 | | |
842 | | if (flags & HAS_QOS) { |
843 | | status = pj_sock_set_qos_params(new_sock, &qos_params); |
844 | | if (status == PJ_STATUS_FROM_OS(EINVAL)) { |
845 | | PJ_PERROR(5,(THIS_FILE, status, "Error set qos param")); |
846 | | continue; |
847 | | } |
848 | | } |
849 | | |
850 | | if (flags & HAS_PEER_ADDR) { |
851 | | status = pj_sock_connect(new_sock, &rem_addr, addr_len); |
852 | | if (status != PJ_SUCCESS) { |
853 | | PJ_PERROR(5,(THIS_FILE, status, "Error connect socket")); |
854 | | continue; |
855 | | } |
856 | | } |
857 | | } |
858 | | |
859 | | if (status != PJ_SUCCESS) |
860 | | goto on_error; |
861 | | |
862 | | /* Set socket to nonblocking. */ |
863 | | val = 1; |
864 | | #if defined(PJ_WIN32) && PJ_WIN32!=0 || \ |
865 | | defined(PJ_WIN64) && PJ_WIN64 != 0 || \ |
866 | | defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0 |
867 | | if (ioctlsocket(new_sock, FIONBIO, &val)) { |
868 | | #else |
869 | | if (ioctl(new_sock, FIONBIO, &val)) { |
870 | | #endif |
871 | | status = pj_get_netos_error(); |
872 | | goto on_error; |
873 | | } |
874 | | |
875 | | /* Replace the occurrence of old socket with new socket in the |
876 | | * fd sets. |
877 | | */ |
878 | | for (i=0; i<fds_cnt; ++i) { |
879 | | if (PJ_FD_ISSET(h->fd, fds[i])) { |
880 | | PJ_FD_CLR(h->fd, fds[i]); |
881 | | PJ_FD_SET(new_sock, fds[i]); |
882 | | } |
883 | | } |
884 | | |
885 | | /* And finally replace the fd in the key */ |
886 | | h->fd = new_sock; |
887 | | |
888 | | PJ_LOG(4,(THIS_FILE, "UDP has been replaced successfully!")); |
889 | | |
890 | | pj_lock_release(h->ioqueue->lock); |
891 | | |
892 | | return PJ_SUCCESS; |
893 | | |
894 | | on_error: |
895 | | if (new_sock != PJ_INVALID_SOCKET) |
896 | | pj_sock_close(new_sock); |
897 | | if (old_sock != PJ_INVALID_SOCKET) |
898 | | pj_sock_close(old_sock); |
899 | | |
900 | | /* Clear the occurrence of old socket in the fd sets. */ |
901 | | for (i=0; i<fds_cnt; ++i) { |
902 | | if (PJ_FD_ISSET(h->fd, fds[i])) { |
903 | | PJ_FD_CLR(h->fd, fds[i]); |
904 | | } |
905 | | } |
906 | | |
907 | | h->fd = PJ_INVALID_SOCKET; |
908 | | PJ_PERROR(1,(THIS_FILE, status, "Error replacing socket %ld", old_sock)); |
909 | | pj_lock_release(h->ioqueue->lock); |
910 | | return PJ_ESOCKETSTOP; |
911 | | } |
912 | | #endif |
913 | | |
914 | | |
915 | | /* |
916 | | * pj_ioqueue_poll() |
917 | | * |
918 | | * A few things worth noting: |
919 | | * |
920 | | * - we used to invoke only one callback per poll, but that didn't work |
921 | | * well. The reason is that in some situations the write callback |
922 | | * gets called all the time and never gives the read callback a |
923 | | * chance to run. This happens, for example, when the user submits |
924 | | * a write operation inside the write callback. |
925 | | * As a result, we changed the behaviour so that multiple callbacks |
926 | | * are now called in a single poll. It should be fast too; we just |
927 | | * need to be careful with the ioqueue data structures. |
928 | | * |
929 | | * - to guarantee preemptiveness etc., the poll function must work |
930 | | * strictly on copies of the ioqueue's fd_sets (not the originals). |
931 | | */ |
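In practice the application drives this from its event loop; a minimal sketch
(the 10 ms timeout and quit_flag are illustrative):

    pj_time_val timeout = {0, 10};   /* 0 sec, 10 msec */

    while (!quit_flag) {
        int n = pj_ioqueue_poll(ioq, &timeout);
        if (n < 0) {
            /* n is a negated pj_status_t error from select() */
        }
    }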
932 | | PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) |
933 | 0 | { |
934 | 0 | pj_fd_set_t rfdset, wfdset, xfdset; |
935 | 0 | int nfds; |
936 | 0 | int i, count, event_cnt, processed_cnt; |
937 | 0 | pj_ioqueue_key_t *h; |
938 | 0 | enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS }; |
939 | 0 | struct event |
940 | 0 | { |
941 | 0 | pj_ioqueue_key_t *key; |
942 | 0 | enum ioqueue_event_type event_type; |
943 | 0 | } event[MAX_EVENTS]; |
944 | |
945 | 0 | PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); |
946 | | |
947 | | #if defined(PJ_HAS_SSL_SOCK) && PJ_HAS_SSL_SOCK != 0 && \ |
948 | | (PJ_SSL_SOCK_IMP == PJ_SSL_SOCK_IMP_APPLE) |
949 | | /* Call SSL Network framework event poll */ |
950 | | ssl_network_event_poll(); |
951 | | #endif |
952 | | |
953 | | /* Lock ioqueue before making fd_set copies */ |
954 | 0 | pj_lock_acquire(ioqueue->lock); |
955 | | |
956 | | /* We will only do select() when there are sockets to be polled. |
957 | | * Otherwise select() will return an error. |
958 | | */ |
959 | 0 | if (PJ_FD_COUNT(&ioqueue->rfdset)==0 && |
960 | 0 | PJ_FD_COUNT(&ioqueue->wfdset)==0 |
961 | 0 | #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 |
962 | 0 | && PJ_FD_COUNT(&ioqueue->xfdset)==0 |
963 | 0 | #endif |
964 | 0 | ) |
965 | 0 | { |
966 | 0 | #if PJ_IOQUEUE_HAS_SAFE_UNREG |
967 | 0 | scan_closing_keys(ioqueue); |
968 | 0 | #endif |
969 | 0 | pj_lock_release(ioqueue->lock); |
970 | 0 | TRACE__((THIS_FILE, " poll: no fd is set")); |
971 | 0 | if (timeout) |
972 | 0 | pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); |
973 | 0 | return 0; |
974 | 0 | } |
975 | | |
976 | | /* Copy ioqueue's pj_fd_set_t to local variables. */ |
977 | 0 | pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t)); |
978 | 0 | pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t)); |
979 | 0 | #if PJ_HAS_TCP |
980 | 0 | pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t)); |
981 | | #else |
982 | | PJ_FD_ZERO(&xfdset); |
983 | | #endif |
984 | |
985 | | #if VALIDATE_FD_SET |
986 | | validate_sets(ioqueue, &rfdset, &wfdset, &xfdset); |
987 | | #endif |
988 | |
989 | 0 | nfds = ioqueue->nfds; |
990 | | |
991 | | /* Unlock ioqueue before select(). */ |
992 | 0 | pj_lock_release(ioqueue->lock); |
993 | |
994 | | #if defined(PJ_WIN32_WINPHONE8) && PJ_WIN32_WINPHONE8 |
995 | | count = 0; |
996 | | __try { |
997 | | #endif |
998 | |
999 | 0 | count = pj_sock_select(nfds+1, &rfdset, &wfdset, &xfdset, |
1000 | 0 | timeout); |
1001 | |
1002 | | #if defined(PJ_WIN32_WINPHONE8) && PJ_WIN32_WINPHONE8 |
1003 | | /* Ignore Invalid Handle Exception raised by select().*/ |
1004 | | } |
1005 | | __except (GetExceptionCode() == STATUS_INVALID_HANDLE ? |
1006 | | EXCEPTION_CONTINUE_EXECUTION : EXCEPTION_CONTINUE_SEARCH) { |
1007 | | } |
1008 | | #endif |
1009 | | |
1010 | 0 | if (count == 0) |
1011 | 0 | return 0; |
1012 | 0 | else if (count < 0) |
1013 | 0 | return -pj_get_netos_error(); |
1014 | | |
1015 | | /* Scan the descriptor sets for events and add them to the event |
1016 | | * array, to be processed later in this function. We do this so that |
1017 | | * events can be processed in parallel without holding the ioqueue lock. |
1018 | | */ |
1019 | 0 | pj_lock_acquire(ioqueue->lock); |
1020 | |
1021 | 0 | event_cnt = 0; |
1022 | | |
1023 | | /* Scan for writable sockets first to handle piggy-back data |
1024 | | * coming with accept(). |
1025 | | */ |
1026 | 0 | for (h = ioqueue->active_list.next; |
1027 | 0 | h != &ioqueue->active_list && event_cnt < MAX_EVENTS; |
1028 | 0 | h = h->next) |
1029 | 0 | { |
1030 | 0 | if (h->fd == PJ_INVALID_SOCKET) |
1031 | 0 | continue; |
1032 | | |
1033 | 0 | if ( (key_has_pending_write(h) || key_has_pending_connect(h)) |
1034 | 0 | && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h)) |
1035 | 0 | { |
1036 | 0 | #if PJ_IOQUEUE_HAS_SAFE_UNREG |
1037 | 0 | increment_counter(h); |
1038 | 0 | #endif |
1039 | 0 | event[event_cnt].key = h; |
1040 | 0 | event[event_cnt].event_type = WRITEABLE_EVENT; |
1041 | 0 | ++event_cnt; |
1042 | 0 | } |
1043 | | |
1044 | | /* Scan for readable socket. */ |
1045 | 0 | if ((key_has_pending_read(h) || key_has_pending_accept(h)) |
1046 | 0 | && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h) && |
1047 | 0 | event_cnt < MAX_EVENTS) |
1048 | 0 | { |
1049 | 0 | #if PJ_IOQUEUE_HAS_SAFE_UNREG |
1050 | 0 | increment_counter(h); |
1051 | 0 | #endif |
1052 | 0 | event[event_cnt].key = h; |
1053 | 0 | event[event_cnt].event_type = READABLE_EVENT; |
1054 | 0 | ++event_cnt; |
1055 | 0 | } |
1056 | |
1057 | 0 | #if PJ_HAS_TCP |
1058 | 0 | if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) && |
1059 | 0 | !IS_CLOSING(h) && event_cnt < MAX_EVENTS) |
1060 | 0 | { |
1061 | 0 | #if PJ_IOQUEUE_HAS_SAFE_UNREG |
1062 | 0 | increment_counter(h); |
1063 | 0 | #endif |
1064 | 0 | event[event_cnt].key = h; |
1065 | 0 | event[event_cnt].event_type = EXCEPTION_EVENT; |
1066 | 0 | ++event_cnt; |
1067 | 0 | } |
1068 | 0 | #endif |
1069 | 0 | } |
1070 | |
1071 | 0 | for (i=0; i<event_cnt; ++i) { |
1072 | 0 | if (event[i].key->grp_lock) |
1073 | 0 | pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0); |
1074 | 0 | } |
1075 | |
1076 | 0 | PJ_RACE_ME(5); |
1077 | |
1078 | 0 | pj_lock_release(ioqueue->lock); |
1079 | |
1080 | 0 | PJ_RACE_ME(5); |
1081 | |
1082 | 0 | processed_cnt = 0; |
1083 | | |
1084 | | /* Now process all events. The dispatch functions will take care |
1085 | | * of locking for each key. |
1086 | | */ |
1087 | 0 | for (i=0; i<event_cnt; ++i) { |
1088 | | |
1089 | | /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */ |
1090 | 0 | if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) { |
1091 | 0 | switch (event[i].event_type) { |
1092 | 0 | case READABLE_EVENT: |
1093 | 0 | if (ioqueue_dispatch_read_event(ioqueue, event[i].key)) |
1094 | 0 | ++processed_cnt; |
1095 | 0 | break; |
1096 | 0 | case WRITEABLE_EVENT: |
1097 | 0 | if (ioqueue_dispatch_write_event(ioqueue, event[i].key)) |
1098 | 0 | ++processed_cnt; |
1099 | 0 | break; |
1100 | 0 | case EXCEPTION_EVENT: |
1101 | 0 | if (ioqueue_dispatch_exception_event(ioqueue, event[i].key)) |
1102 | 0 | ++processed_cnt; |
1103 | 0 | break; |
1104 | 0 | case NO_EVENT: |
1105 | 0 | pj_assert(!"Invalid event!"); |
1106 | 0 | break; |
1107 | 0 | } |
1108 | 0 | } |
1109 | | |
1110 | 0 | #if PJ_IOQUEUE_HAS_SAFE_UNREG |
1111 | 0 | decrement_counter(event[i].key); |
1112 | 0 | #endif |
1113 | |
1114 | 0 | if (event[i].key->grp_lock) |
1115 | 0 | pj_grp_lock_dec_ref_dbg(event[i].key->grp_lock, |
1116 | 0 | "ioqueue", 0); |
1117 | 0 | } |
1118 | | |
1119 | 0 | TRACE__((THIS_FILE, " poll: count=%d events=%d processed=%d", |
1120 | 0 | count, event_cnt, processed_cnt)); |
1121 | |
1122 | 0 | return processed_cnt; |
1123 | 0 | } |
1124 | | |
1125 | | PJ_DEF(pj_oshandle_t) pj_ioqueue_get_os_handle( pj_ioqueue_t *ioqueue ) |
1126 | 0 | { |
1127 | 0 | PJ_UNUSED_ARG(ioqueue); |
1128 | | return NULL; |
1129 | 0 | } |
1130 | | |
1131 | | #endif /* PJ_IOQUEUE_IMP == PJ_IOQUEUE_IMP_SELECT */ |