Coverage Report

Created: 2025-10-10 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/pjsip/pjlib/src/pj/ioqueue_select.c
Line
Count
Source
1
/* 
2
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
3
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License
16
 * along with this program; if not, write to the Free Software
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
18
 */
19
20
/*
21
 * sock_select.c
22
 *
23
 * This is the implementation of IOQueue using pj_sock_select().
24
 * It runs anywhere where pj_sock_select() is available (currently
25
 * Win32, Linux, Linux kernel, etc.).
26
 */
27
28
#include <pj/ioqueue.h>
29
#include <pj/os.h>
30
#include <pj/lock.h>
31
#include <pj/log.h>
32
#include <pj/list.h>
33
#include <pj/pool.h>
34
#include <pj/string.h>
35
#include <pj/assert.h>
36
#include <pj/sock.h>
37
#include <pj/compat/socket.h>
38
#include <pj/sock_select.h>
39
#include <pj/sock_qos.h>
40
#include <pj/errno.h>
41
#include <pj/rand.h>
42
43
44
/* Only build when the backend is using select(). */
45
#if PJ_IOQUEUE_IMP == PJ_IOQUEUE_IMP_SELECT
46
47
48
/* Now that we have access to the OS's <sys/select>, let's check again that
49
 * PJ_IOQUEUE_MAX_HANDLES is not greater than FD_SETSIZE
50
 */
51
#if PJ_IOQUEUE_MAX_HANDLES > FD_SETSIZE
52
#   error "PJ_IOQUEUE_MAX_HANDLES cannot be greater than FD_SETSIZE"
53
#endif
54
55
56
/*
57
 * Include declaration from common abstraction.
58
 */
59
#include "ioqueue_common_abs.h"
60
61
/*
62
 * ISSUES with ioqueue_select()
63
 *
64
 * EAGAIN/EWOULDBLOCK error in recv():
65
 *  - when multiple threads are working with the ioqueue, application
66
 *    may receive EAGAIN or EWOULDBLOCK in the receive callback.
67
 *    This error happens because more than one thread is watching for
68
 *    the same descriptor set, so when all of them call recv() or recvfrom()
69
 *    simultaneously, only one will succeed and the rest will get the error.
70
 *
71
 */
72
#define THIS_FILE   "ioq_select"
73
74
/*
75
 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
76
 * the correct error code.
77
 */
78
#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
79
#   error "Error reporting must be enabled for this function to work!"
80
#endif
81
82
/*
83
 * VALIDATE_FD_SET was meant to be set in debug builds (it is forced to 0 below).
84
 * This will check the validity of the fd_sets.
85
 */
86
/*
87
#if defined(PJ_DEBUG) && PJ_DEBUG != 0
88
#  define VALIDATE_FD_SET               1
89
#else
90
#  define VALIDATE_FD_SET               0
91
#endif
92
*/
93
#define VALIDATE_FD_SET     0
94
95
#if 0
96
#  define TRACE__(args) PJ_LOG(3,args)
97
#else
98
#  define TRACE__(args)
99
#endif
100
101
/*
102
 * This describes each key.
103
 */
104
struct pj_ioqueue_key_t
105
{
106
    DECLARE_COMMON_KEY
107
};
108
109
/*
110
 * This describes the I/O queue itself.
111
 */
112
struct pj_ioqueue_t
113
{
114
    DECLARE_COMMON_IOQUEUE
115
116
    unsigned            max, count;     /* Max and current key count        */
117
    int                 nfds;           /* The largest fd value (for select)*/
118
    pj_ioqueue_key_t    active_list;    /* List of active keys.             */
119
    pj_fd_set_t         rfdset;
120
    pj_fd_set_t         wfdset;
121
#if PJ_HAS_TCP
122
    pj_fd_set_t         xfdset;
123
#endif
124
125
#if PJ_IOQUEUE_HAS_SAFE_UNREG
126
    pj_mutex_t         *ref_cnt_mutex;
127
    pj_ioqueue_key_t    closing_list;
128
    pj_ioqueue_key_t    free_list;
129
#endif
130
};
131
132
/* Proto */
133
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
134
            PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
135
static pj_status_t replace_udp_sock(pj_ioqueue_key_t *h);
136
#endif
137
138
#if defined(PJ_HAS_SSL_SOCK) && PJ_HAS_SSL_SOCK != 0 && \
    (PJ_SSL_SOCK_IMP == PJ_SSL_SOCK_IMP_APPLE)
/* SSL Network framework event poll, implemented outside this file.
 * Declared with an explicit (void) parameter list: in C an empty "()"
 * is not a prototype, so the compiler would not check call arguments.
 */
pj_status_t ssl_network_event_poll(void);
#endif
143
144
/* Include implementation for common abstraction after we declare
145
 * pj_ioqueue_key_t and pj_ioqueue_t.
146
 */
147
#include "ioqueue_common_abs.c"
148
149
#if PJ_IOQUEUE_HAS_SAFE_UNREG
150
/* Scan closing keys to be put to free list again */
151
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
152
#endif
153
154
/*
155
 * pj_ioqueue_name()
156
 */
157
PJ_DEF(const char*) pj_ioqueue_name(void)
158
0
{
159
0
    return "select";
160
0
}
161
162
/* 
163
 * Scan the socket descriptor sets for the largest descriptor.
164
 * This value is needed by select().
165
 */
166
#if defined(PJ_SELECT_NEEDS_NFDS) && PJ_SELECT_NEEDS_NFDS!=0
167
static void rescan_fdset(pj_ioqueue_t *ioqueue)
168
{
169
    pj_ioqueue_key_t *key = ioqueue->active_list.next;
170
    int max = 0;
171
172
    while (key != &ioqueue->active_list) {
173
        if (key->fd > max)
174
            max = key->fd;
175
        key = key->next;
176
    }
177
178
    ioqueue->nfds = max;
179
}
180
#else
181
static void rescan_fdset(pj_ioqueue_t *ioqueue)
182
0
{
183
0
    ioqueue->nfds = FD_SETSIZE-1;
184
0
}
185
#endif
186
187
188
/*
189
 * pj_ioqueue_create()
190
 *
191
 * Create select ioqueue.
192
 */
193
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
194
                                       pj_size_t max_fd,
195
                                       pj_ioqueue_t **p_ioqueue)
196
0
{
197
0
    return pj_ioqueue_create2(pool, max_fd, NULL, p_ioqueue);
198
0
}
199
200
201
/*
202
 * pj_ioqueue_create2()
203
 *
204
 * Create select ioqueue.
205
 */
206
PJ_DEF(pj_status_t) pj_ioqueue_create2(pj_pool_t *pool,
207
                                       pj_size_t max_fd,
208
                                       const pj_ioqueue_cfg *cfg,
209
                                       pj_ioqueue_t **p_ioqueue)
210
0
{
211
0
    pj_ioqueue_t *ioqueue;
212
0
    pj_lock_t *lock;
213
0
    pj_size_t i;
214
0
    pj_status_t rc;
215
216
    /* Check that arguments are valid. */
217
0
    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && 
218
0
                     max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, 
219
0
                     PJ_EINVAL);
220
221
    /* Check that size of pj_ioqueue_op_key_t is sufficient */
222
0
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
223
0
                     sizeof(union operation_key), PJ_EBUG);
224
225
    /* Create and init common ioqueue stuffs */
226
0
    ioqueue = PJ_POOL_ALLOC_T(pool, pj_ioqueue_t);
227
0
    ioqueue_init(ioqueue);
228
229
0
    if (cfg)
230
0
        pj_memcpy(&ioqueue->cfg, cfg, sizeof(*cfg));
231
0
    else
232
0
        pj_ioqueue_cfg_default(&ioqueue->cfg);
233
0
    ioqueue->max = (unsigned)max_fd;
234
0
    ioqueue->count = 0;
235
0
    PJ_FD_ZERO(&ioqueue->rfdset);
236
0
    PJ_FD_ZERO(&ioqueue->wfdset);
237
0
#if PJ_HAS_TCP
238
0
    PJ_FD_ZERO(&ioqueue->xfdset);
239
0
#endif
240
0
    pj_list_init(&ioqueue->active_list);
241
242
0
    rescan_fdset(ioqueue);
243
244
0
#if PJ_IOQUEUE_HAS_SAFE_UNREG
245
    /* When safe unregistration is used (the default), we pre-create
246
     * all keys and put them in the free list.
247
     */
248
249
    /* Mutex to protect key's reference counter 
250
     * We don't want to use key's mutex or ioqueue's mutex because
251
     * that would create deadlock situation in some cases.
252
     */
253
0
    rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
254
0
    if (rc != PJ_SUCCESS)
255
0
        return rc;
256
257
258
    /* Init key list */
259
0
    pj_list_init(&ioqueue->free_list);
260
0
    pj_list_init(&ioqueue->closing_list);
261
262
263
    /* Pre-create all keys according to max_fd */
264
0
    for (i=0; i<max_fd; ++i) {
265
0
        pj_ioqueue_key_t *key;
266
267
0
        key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
268
0
        key->ref_count = 0;
269
0
        rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock);
270
0
        if (rc != PJ_SUCCESS) {
271
0
            key = ioqueue->free_list.next;
272
0
            while (key != &ioqueue->free_list) {
273
0
                pj_lock_destroy(key->lock);
274
0
                key = key->next;
275
0
            }
276
0
            pj_mutex_destroy(ioqueue->ref_cnt_mutex);
277
0
            return rc;
278
0
        }
279
280
0
        pj_list_push_back(&ioqueue->free_list, key);
281
0
    }
282
0
#endif
283
284
    /* Create and init ioqueue mutex */
285
0
    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
286
0
    if (rc != PJ_SUCCESS)
287
0
        return rc;
288
289
0
    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
290
0
    if (rc != PJ_SUCCESS)
291
0
        return rc;
292
293
0
    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
294
295
0
    *p_ioqueue = ioqueue;
296
0
    return PJ_SUCCESS;
297
0
}
298
299
/*
300
 * pj_ioqueue_destroy()
301
 *
302
 * Destroy ioqueue.
303
 */
304
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
305
0
{
306
0
    pj_ioqueue_key_t *key;
307
308
0
    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
309
310
0
    pj_lock_acquire(ioqueue->lock);
311
312
0
#if PJ_IOQUEUE_HAS_SAFE_UNREG
313
    /* Destroy reference counters */
314
0
    key = ioqueue->active_list.next;
315
0
    while (key != &ioqueue->active_list) {
316
0
        pj_lock_destroy(key->lock);
317
0
        key = key->next;
318
0
    }
319
320
0
    key = ioqueue->closing_list.next;
321
0
    while (key != &ioqueue->closing_list) {
322
0
        pj_lock_destroy(key->lock);
323
0
        key = key->next;
324
0
    }
325
326
0
    key = ioqueue->free_list.next;
327
0
    while (key != &ioqueue->free_list) {
328
0
        pj_lock_destroy(key->lock);
329
0
        key = key->next;
330
0
    }
331
332
0
    pj_mutex_destroy(ioqueue->ref_cnt_mutex);
333
0
#endif
334
335
0
    return ioqueue_destroy(ioqueue);
336
0
}
337
338
339
/*
340
 * pj_ioqueue_register_sock()
341
 *
342
 * Register socket handle to ioqueue.
343
 */
344
PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
345
                                              pj_ioqueue_t *ioqueue,
346
                                              pj_sock_t sock,
347
                                              pj_grp_lock_t *grp_lock,
348
                                              void *user_data,
349
                                              const pj_ioqueue_callback *cb,
350
                                              pj_ioqueue_key_t **p_key)
351
0
{
352
0
    pj_ioqueue_key_t *key = NULL;
353
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
354
    defined(PJ_WIN64) && PJ_WIN64 != 0 || \
355
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
356
    u_long value;
357
#else
358
0
    pj_uint32_t value;
359
0
#endif
360
0
    pj_status_t rc = PJ_SUCCESS;
361
    
362
0
    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
363
0
                     cb && p_key, PJ_EINVAL);
364
365
    /* On platforms with fd_set containing fd bitmap such as *nix family,
366
     * avoid potential memory corruption caused by select() when given
367
     * an fd that is higher than FD_SETSIZE.
368
     */
369
0
    if (sizeof(fd_set) < FD_SETSIZE && sock >= FD_SETSIZE) {
370
0
        PJ_LOG(4, ("pjlib", "Failed to register socket to ioqueue because "
371
0
                            "socket fd is too big (fd=%ld/FD_SETSIZE=%d)",
372
0
                            sock, FD_SETSIZE));
373
0
        return PJ_ETOOBIG;
374
0
    }
375
376
0
    pj_lock_acquire(ioqueue->lock);
377
378
0
    if (ioqueue->count >= ioqueue->max) {
379
0
        rc = PJ_ETOOMANY;
380
0
        goto on_return;
381
0
    }
382
383
    /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
384
     * the key from the free list. Otherwise allocate a new one. 
385
     */
386
0
#if PJ_IOQUEUE_HAS_SAFE_UNREG
387
388
    /* Scan closing_keys first to let them come back to free_list */
389
0
    scan_closing_keys(ioqueue);
390
391
0
    pj_assert(!pj_list_empty(&ioqueue->free_list));
392
0
    if (pj_list_empty(&ioqueue->free_list)) {
393
0
        rc = PJ_ETOOMANY;
394
0
        goto on_return;
395
0
    }
396
397
0
    key = ioqueue->free_list.next;
398
0
    pj_list_erase(key);
399
#else
400
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
401
#endif
402
403
0
    rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb);
404
0
    if (rc != PJ_SUCCESS) {
405
0
        key = NULL;
406
0
        goto on_return;
407
0
    }
408
409
    /* Set socket to nonblocking. */
410
0
    value = 1;
411
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
412
    defined(PJ_WIN64) && PJ_WIN64 != 0 || \
413
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
414
    if (ioctlsocket(sock, FIONBIO, &value)) {
415
#else
416
0
    if (ioctl(sock, FIONBIO, &value)) {
417
0
#endif
418
0
        rc = pj_get_netos_error();
419
0
        goto on_return;
420
0
    }
421
422
423
    /* Put in active list. */
424
0
    pj_list_insert_before(&ioqueue->active_list, key);
425
0
    ++ioqueue->count;
426
427
    /* Rescan fdset to get max descriptor */
428
0
    rescan_fdset(ioqueue);
429
430
0
on_return:
431
    /* On error, socket may be left in non-blocking mode. */
432
0
    if (rc != PJ_SUCCESS) {
433
0
        if (key && key->grp_lock)
434
0
            pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0);
435
0
    }
436
0
    *p_key = key;
437
0
    pj_lock_release(ioqueue->lock);
438
    
439
0
    return rc;
440
0
}
441
442
/*
 * pj_ioqueue_register_sock()
 *
 * Register socket handle to ioqueue without a group lock. This simply
 * forwards to pj_ioqueue_register_sock2() with a NULL grp_lock.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
                                              pj_ioqueue_t *ioqueue,
                                              pj_sock_t sock,
                                              void *user_data,
                                              const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    pj_grp_lock_t *no_grp_lock = NULL;

    return pj_ioqueue_register_sock2(pool, ioqueue, sock, no_grp_lock,
                                     user_data, cb, p_key);
}
452
453
#if PJ_IOQUEUE_HAS_SAFE_UNREG
454
/* Increment key's reference counter */
455
static void increment_counter(pj_ioqueue_key_t *key)
456
0
{
457
0
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
458
0
    ++key->ref_count;
459
0
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
460
0
}
461
462
/* Decrement the key's reference counter, and when the counter reach zero,
463
 * destroy the key.
464
 *
465
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
466
 */
467
static void decrement_counter(pj_ioqueue_key_t *key)
468
0
{
469
0
    pj_lock_acquire(key->ioqueue->lock);
470
0
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
471
0
    --key->ref_count;
472
0
    if (key->ref_count == 0) {
473
474
0
        pj_assert(key->closing == 1);
475
0
        pj_gettickcount(&key->free_time);
476
0
        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
477
0
        pj_time_val_normalize(&key->free_time);
478
479
0
        pj_list_erase(key);
480
0
        pj_list_push_back(&key->ioqueue->closing_list, key);
481
        /* Rescan fdset to get max descriptor */
482
0
        rescan_fdset(key->ioqueue);
483
0
    }
484
0
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
485
0
    pj_lock_release(key->ioqueue->lock);
486
0
}
487
#endif
488
489
490
/*
491
 * pj_ioqueue_unregister()
492
 *
493
 * Unregister handle from ioqueue.
494
 */
495
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
496
0
{
497
0
    pj_ioqueue_t *ioqueue;
498
499
0
    PJ_ASSERT_RETURN(key, PJ_EINVAL);
500
501
0
    ioqueue = key->ioqueue;
502
503
    /* Lock the key to make sure no callback is simultaneously modifying
504
     * the key. We need to lock the key before ioqueue here to prevent
505
     * deadlock.
506
     */
507
0
    pj_ioqueue_lock_key(key);
508
509
    /* Best effort to avoid double key-unregistration */
510
0
    if (IS_CLOSING(key)) {
511
0
        pj_ioqueue_unlock_key(key);
512
0
        return PJ_SUCCESS;
513
0
    }
514
515
    /* Also lock ioqueue */
516
0
    pj_lock_acquire(ioqueue->lock);
517
518
    /* Avoid "negative" ioqueue count */
519
0
    if (ioqueue->count > 0) {
520
0
        --ioqueue->count;
521
0
    } else {
522
        /* If this happens, very likely there is double unregistration
523
         * of a key.
524
         */
525
0
        pj_assert(!"Bad ioqueue count in key unregistration!");
526
0
        PJ_LOG(1,(THIS_FILE, "Bad ioqueue count in key unregistration!"));
527
0
    }
528
529
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
530
    /* Ticket #520, key will be erased more than once */
531
    pj_list_erase(key);
532
#endif
533
534
    /* Remove socket from sets and close socket. */
535
0
    if (key->fd != PJ_INVALID_SOCKET) {
536
0
        PJ_FD_CLR(key->fd, &ioqueue->rfdset);
537
0
        PJ_FD_CLR(key->fd, &ioqueue->wfdset);
538
0
#if PJ_HAS_TCP
539
0
        PJ_FD_CLR(key->fd, &ioqueue->xfdset);
540
0
#endif
541
542
0
        pj_sock_close(key->fd);
543
0
        key->fd = PJ_INVALID_SOCKET;
544
0
    }
545
546
    /* Clear callback */
547
0
    key->cb.on_accept_complete = NULL;
548
0
    key->cb.on_connect_complete = NULL;
549
0
    key->cb.on_read_complete = NULL;
550
0
    key->cb.on_write_complete = NULL;
551
552
    /* Must release ioqueue lock first before decrementing counter, to
553
     * prevent deadlock.
554
     */
555
0
    pj_lock_release(ioqueue->lock);
556
557
    /* Mark key is closing. */
558
0
    key->closing = 1;
559
560
0
    pj_ioqueue_unlock_key(key);
561
562
0
#if PJ_IOQUEUE_HAS_SAFE_UNREG
563
    /* Decrement counter. */
564
0
    decrement_counter(key);
565
#else
566
    /* Destroy the key lock */
567
    pj_lock_destroy(key->lock);
568
#endif
569
570
    /* Done. */
571
0
    if (key->grp_lock) {
572
0
        pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0);
573
0
    }
574
575
0
    return PJ_SUCCESS;
576
0
}
577
578
579
/* Debug helper that is supposed to check whether the fd_set values are
 * consistent with the operations currently pending on each active key.
 * Only compiled when VALIDATE_FD_SET is non-zero.
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
                          const pj_fd_set_t *rfdset,
                          const pj_fd_set_t *wfdset,
                          const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    /*
     * This basically does not work anymore.
     * The key would have to be locked before performing the check, but
     * that is not possible because the ioqueue mutex is being held;
     * acquiring the key's mutex now would cause a deadlock.
     */
    pj_assert(0);

    for (key = ioqueue->active_list.next;
         key != &ioqueue->active_list;
         key = key->next)
    {
        int expect_read, expect_write;

        /* Readable set: pending reads (and accepts, with TCP). */
        expect_read = !pj_list_empty(&key->read_list);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        if (!expect_read)
            expect_read = !pj_list_empty(&key->accept_list);
#endif
        pj_assert(((PJ_FD_ISSET(key->fd, rfdset)) != 0) ==
                  (expect_read != 0));

        /* Writable set: pending writes (and connects, with TCP). */
        expect_write = !pj_list_empty(&key->write_list);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        if (!expect_write)
            expect_write = (key->connecting != 0);
#endif
        pj_assert(((PJ_FD_ISSET(key->fd, wfdset)) != 0) ==
                  (expect_write != 0));

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        /* Exception set: only while connecting. */
        pj_assert(((PJ_FD_ISSET(key->fd, xfdset)) != 0) ==
                  (key->connecting != 0));
#endif /* PJ_HAS_TCP */
    }
}
#endif  /* VALIDATE_FD_SET */
636
637
638
/* ioqueue_remove_from_set()
639
 * This function is called from ioqueue_dispatch_event() to instruct
640
 * the ioqueue to remove the specified descriptor from ioqueue's descriptor
641
 * set for the specified event.
642
 */
643
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
644
                                     pj_ioqueue_key_t *key,
645
                                     enum ioqueue_event_type event_type )
646
0
{
647
0
    ioqueue_remove_from_set2(ioqueue, key, event_type);
648
0
}
649
650
static void ioqueue_remove_from_set2(pj_ioqueue_t *ioqueue,
651
                                     pj_ioqueue_key_t *key, 
652
                                     unsigned event_types)
653
0
{
654
0
    pj_lock_acquire(ioqueue->lock);
655
656
0
    if (event_types & READABLE_EVENT)
657
0
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset);
658
0
    if (event_types & WRITEABLE_EVENT)
659
0
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);
660
0
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
661
0
    if (event_types & EXCEPTION_EVENT)
662
0
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);
663
0
#endif
664
665
0
    pj_lock_release(ioqueue->lock);
666
0
}
667
668
/*
669
 * ioqueue_add_to_set()
670
 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
671
 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
672
 * set for the specified event.
673
 */
674
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
675
                                pj_ioqueue_key_t *key,
676
                                enum ioqueue_event_type event_type )
677
0
{
678
0
    ioqueue_add_to_set2(ioqueue, key, event_type);
679
0
}
680
681
static void ioqueue_add_to_set2(pj_ioqueue_t *ioqueue,
682
                                pj_ioqueue_key_t *key,
683
                                unsigned event_types )
684
0
{
685
0
    pj_lock_acquire(ioqueue->lock);
686
687
0
    if (event_types & READABLE_EVENT)
688
0
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset);
689
0
    if (event_types & WRITEABLE_EVENT)
690
0
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);
691
0
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
692
0
    if (event_types & EXCEPTION_EVENT)
693
0
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);
694
0
#endif
695
696
0
    pj_lock_release(ioqueue->lock);
697
0
}
698
699
#if PJ_IOQUEUE_HAS_SAFE_UNREG
700
/* Scan closing keys to be put to free list again */
701
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
702
0
{
703
0
    pj_time_val now;
704
0
    pj_ioqueue_key_t *h;
705
706
0
    pj_gettickcount(&now);
707
0
    h = ioqueue->closing_list.next;
708
0
    while (h != &ioqueue->closing_list) {
709
0
        pj_ioqueue_key_t *next = h->next;
710
711
0
        pj_assert(h->closing != 0);
712
713
0
        if (PJ_TIME_VAL_GTE(now, h->free_time)) {
714
0
            pj_list_erase(h);
715
            // Don't set grp_lock to NULL otherwise the other thread
716
            // will crash. Just leave it as dangling pointer, but this
717
            // should be safe
718
            //h->grp_lock = NULL;
719
0
            pj_list_push_back(&ioqueue->free_list, h);
720
0
        }
721
0
        h = next;
722
0
    }
723
0
}
724
#endif
725
726
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
727
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
728
/* Replace the UDP socket of a key with a freshly created one.
 *
 * The old socket's local address, peer address (if connected) and QoS
 * parameters are queried, the old socket is closed, and a new datagram
 * socket is created, bound to the same local address, given the same QoS
 * parameters and peer (where available) and set to non-blocking mode.
 * The whole sequence is retried with a doubling delay (starting at 20 ms,
 * while below 1000 ms). The ioqueue lock is held for the entire call.
 *
 * On success the new descriptor replaces the old one in the ioqueue's
 * descriptor sets and in h->fd, and PJ_SUCCESS is returned. On failure
 * the descriptor is removed from the sets, h->fd becomes
 * PJ_INVALID_SOCKET and PJ_ESOCKETSTOP is returned.
 *
 * NOTE(review): if h->fd is already PJ_INVALID_SOCKET on entry,
 * local_addr is never filled in before it is used - confirm that callers
 * never do that.
 */
static pj_status_t replace_udp_sock(pj_ioqueue_key_t *h)
{
    enum flags {
        HAS_PEER_ADDR = 1,
        HAS_QOS = 2
    };
    pj_sock_t old_sock, new_sock = PJ_INVALID_SOCKET;
    pj_sockaddr local_addr, rem_addr;
    int val, addr_len;
    pj_fd_set_t *fds[3];
    unsigned i, fds_cnt, flags=0;
    pj_qos_params qos_params;
    unsigned msec, msec2;
    pj_status_t status = PJ_EUNKNOWN;

    pj_lock_acquire(h->ioqueue->lock);

    old_sock = h->fd;

    /* Collect the descriptor sets that may contain the socket. */
    fds_cnt = 0;
    fds[fds_cnt++] = &h->ioqueue->rfdset;
    fds[fds_cnt++] = &h->ioqueue->wfdset;
#if PJ_HAS_TCP
    fds[fds_cnt++] = &h->ioqueue->xfdset;
#endif

    /* Can only replace UDP socket */
    pj_assert(h->fd_type == pj_SOCK_DGRAM());

    PJ_LOG(4,(THIS_FILE, "Attempting to replace UDP socket %ld", old_sock));

    for (msec=20; (msec<1000 && status != PJ_SUCCESS); msec=msec*2)
    {
        if (msec > 20) {
            PJ_LOG(4,(THIS_FILE, "Retry to replace UDP socket %ld", h->fd));
            pj_thread_sleep(msec);
        }
        
        if (old_sock != PJ_INVALID_SOCKET) {
            /* Investigate the old socket */
            addr_len = sizeof(local_addr);
            status = pj_sock_getsockname(old_sock, &local_addr, &addr_len);
            if (status != PJ_SUCCESS) {
                PJ_PERROR(5,(THIS_FILE, status, "Error get socket name"));
                continue;
            }
        
            addr_len = sizeof(rem_addr);
            status = pj_sock_getpeername(old_sock, &rem_addr, &addr_len);
            if (status != PJ_SUCCESS) {
                PJ_PERROR(5,(THIS_FILE, status, "Error get peer name"));
            } else {
                flags |= HAS_PEER_ADDR;
            }

            status = pj_sock_get_qos_params(old_sock, &qos_params);
            if (status == PJ_STATUS_FROM_OS(EBADF) ||
                status == PJ_STATUS_FROM_OS(EINVAL))
            {
                PJ_PERROR(5,(THIS_FILE, status, "Error get qos param"));
                continue;
            }
        
            if (status != PJ_SUCCESS) {
                PJ_PERROR(5,(THIS_FILE, status, "Error get qos param"));
            } else {
                flags |= HAS_QOS;
            }

            /* We're done with the old socket, close it otherwise we'll get
             * error in bind()
             */
            status = pj_sock_close(old_sock);
            if (status != PJ_SUCCESS) {
                PJ_PERROR(5,(THIS_FILE, status, "Error closing socket"));
            }
            
            old_sock = PJ_INVALID_SOCKET;
        }

        /* A previous iteration may have created a socket and then failed
         * in a later step. Close that leftover now, otherwise the
         * descriptor would be overwritten below and leak.
         */
        if (new_sock != PJ_INVALID_SOCKET) {
            pj_sock_close(new_sock);
            new_sock = PJ_INVALID_SOCKET;
        }

        /* Prepare the new socket */
        status = pj_sock_socket(local_addr.addr.sa_family, PJ_SOCK_DGRAM, 0,
                                &new_sock);
        if (status != PJ_SUCCESS) {
            PJ_PERROR(5,(THIS_FILE, status, "Error create socket"));
            continue;
        }

        /* Even after the socket is closed, we'll still get "Address in use"
         * errors, so force it with SO_REUSEADDR
         */
        val = 1;
        status = pj_sock_setsockopt(new_sock, SOL_SOCKET, SO_REUSEADDR,
                                    &val, sizeof(val));
        if (status == PJ_STATUS_FROM_OS(EBADF) ||
            status == PJ_STATUS_FROM_OS(EINVAL))
        {
            PJ_PERROR(5,(THIS_FILE, status, "Error set socket option"));
            continue;
        }

        /* The loop is silly, but what else can we do? */
        addr_len = pj_sockaddr_get_len(&local_addr);
        for (msec2=20; msec2<1000 ; msec2=msec2*2) {
            status = pj_sock_bind(new_sock, &local_addr, addr_len);
            if (status != PJ_STATUS_FROM_OS(EADDRINUSE))
                break;
            PJ_LOG(4,(THIS_FILE, "Address is still in use, retrying.."));
            pj_thread_sleep(msec2);
        }

        if (status != PJ_SUCCESS)
            continue;

        if (flags & HAS_QOS) {
            status = pj_sock_set_qos_params(new_sock, &qos_params);
            if (status == PJ_STATUS_FROM_OS(EINVAL)) {
                PJ_PERROR(5,(THIS_FILE, status, "Error set qos param"));
                continue;
            }
        }

        if (flags & HAS_PEER_ADDR) {
            status = pj_sock_connect(new_sock, &rem_addr, addr_len);
            if (status != PJ_SUCCESS) {
                PJ_PERROR(5,(THIS_FILE, status, "Error connect socket"));
                continue;
            }
        }
    }
    
    if (status != PJ_SUCCESS)
        goto on_error;
    
    /* Set socket to nonblocking. */
    val = 1;
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
    defined(PJ_WIN64) && PJ_WIN64 != 0 || \
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
    if (ioctlsocket(new_sock, FIONBIO, &val)) {
#else
    if (ioctl(new_sock, FIONBIO, &val)) {
#endif
        status = pj_get_netos_error();
        goto on_error;
    }

    /* Replace the occurrence of old socket with new socket in the
     * fd sets.
     */
    for (i=0; i<fds_cnt; ++i) {
        if (PJ_FD_ISSET(h->fd, fds[i])) {
            PJ_FD_CLR(h->fd, fds[i]);
            PJ_FD_SET(new_sock, fds[i]);
        }
    }

    /* And finally replace the fd in the key */
    h->fd = new_sock;

    PJ_LOG(4,(THIS_FILE, "UDP has been replaced successfully!"));

    pj_lock_release(h->ioqueue->lock);

    return PJ_SUCCESS;

on_error:
    if (new_sock != PJ_INVALID_SOCKET)
        pj_sock_close(new_sock);
    if (old_sock != PJ_INVALID_SOCKET)
        pj_sock_close(old_sock);

    /* Clear the occurrence of old socket in the fd sets. */
    for (i=0; i<fds_cnt; ++i) {
        if (PJ_FD_ISSET(h->fd, fds[i])) {
            PJ_FD_CLR(h->fd, fds[i]);
        }
    }

    /* Log the descriptor still stored in the key; old_sock may already
     * have been reset to PJ_INVALID_SOCKET by the loop above.
     */
    PJ_PERROR(1,(THIS_FILE, status, "Error replacing socket %ld", h->fd));
    h->fd = PJ_INVALID_SOCKET;
    pj_lock_release(h->ioqueue->lock);
    return PJ_ESOCKETSTOP;
}
912
#endif
913
914
915
/*
916
 * pj_ioqueue_poll()
917
 *
918
 * A few things worth noting:
919
 *
920
 *  - we used to do only one callback called per poll, but it didn't go
921
 *    very well. The reason is that in some situations, the write
922
 *    callback gets called all the time, thus doesn't give the read
923
 *    callback to get called. This happens, for example, when user
924
 *    submit write operation inside the write callback.
925
 *    As the result, we changed the behaviour so that now multiple
926
 *    callbacks are called in a single poll. It should be fast too,
927
 *    just that we need to be careful with the ioqueue data structs.
928
 *
929
 *  - to guarantee preemptiveness etc, the poll function must strictly
930
 *    work on fd_set copy of the ioqueue (not the original one).
931
 */
932
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
933
0
{
934
0
    pj_fd_set_t rfdset, wfdset, xfdset;
935
0
    int nfds;
936
0
    int i, count, event_cnt, processed_cnt;
937
0
    pj_ioqueue_key_t *h;
938
0
    enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS };
939
0
    struct event
940
0
    {
941
0
        pj_ioqueue_key_t        *key;
942
0
        enum ioqueue_event_type  event_type;
943
0
    } event[MAX_EVENTS];
944
945
0
    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
946
947
#if defined(PJ_HAS_SSL_SOCK) && PJ_HAS_SSL_SOCK != 0 && \
948
    (PJ_SSL_SOCK_IMP == PJ_SSL_SOCK_IMP_APPLE)
949
    /* Call SSL Network framework event poll */
950
    ssl_network_event_poll();
951
#endif
952
953
    /* Lock ioqueue before making fd_set copies */
954
0
    pj_lock_acquire(ioqueue->lock);
955
956
    /* We will only do select() when there are sockets to be polled.
957
     * Otherwise select() will return error.
958
     */
959
0
    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
960
0
        PJ_FD_COUNT(&ioqueue->wfdset)==0 
961
0
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
962
0
        && PJ_FD_COUNT(&ioqueue->xfdset)==0
963
0
#endif
964
0
        )
965
0
    {
966
0
#if PJ_IOQUEUE_HAS_SAFE_UNREG
967
0
        scan_closing_keys(ioqueue);
968
0
#endif
969
0
        pj_lock_release(ioqueue->lock);
970
0
        TRACE__((THIS_FILE, "     poll: no fd is set"));
971
0
        if (timeout)
972
0
            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
973
0
        return 0;
974
0
    }
975
976
    /* Copy ioqueue's pj_fd_set_t to local variables. */
977
0
    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
978
0
    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
979
0
#if PJ_HAS_TCP
980
0
    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
981
#else
982
    PJ_FD_ZERO(&xfdset);
983
#endif
984
985
#if VALIDATE_FD_SET
986
    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
987
#endif
988
989
0
    nfds = ioqueue->nfds;
990
991
    /* Unlock ioqueue before select(). */
992
0
    pj_lock_release(ioqueue->lock);
993
994
#if defined(PJ_WIN32_WINPHONE8) && PJ_WIN32_WINPHONE8
995
    count = 0;
996
    __try {
997
#endif
998
999
0
    count = pj_sock_select(nfds+1, &rfdset, &wfdset, &xfdset, 
1000
0
                           timeout);
1001
1002
#if defined(PJ_WIN32_WINPHONE8) && PJ_WIN32_WINPHONE8
1003
    /* Ignore Invalid Handle Exception raised by select().*/
1004
    }
1005
    __except (GetExceptionCode() == STATUS_INVALID_HANDLE ?
1006
              EXCEPTION_CONTINUE_EXECUTION : EXCEPTION_CONTINUE_SEARCH) {
1007
    }
1008
#endif    
1009
    
1010
0
    if (count == 0)
1011
0
        return 0;
1012
0
    else if (count < 0)
1013
0
        return -pj_get_netos_error();
1014
1015
    /* Scan descriptor sets for event and add the events in the event
1016
     * array to be processed later in this function. We do this so that
1017
     * events can be processed in parallel without holding ioqueue lock.
1018
     */
1019
0
    pj_lock_acquire(ioqueue->lock);
1020
1021
0
    event_cnt = 0;
1022
1023
    /* Scan for writable sockets first to handle piggy-back data
1024
     * coming with accept().
1025
     */
1026
0
    for (h = ioqueue->active_list.next;
1027
0
         h != &ioqueue->active_list && event_cnt < MAX_EVENTS;
1028
0
         h = h->next)
1029
0
    {
1030
0
        if (h->fd == PJ_INVALID_SOCKET)
1031
0
            continue;
1032
1033
0
        if ( (key_has_pending_write(h) || key_has_pending_connect(h))
1034
0
             && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h))
1035
0
        {
1036
0
#if PJ_IOQUEUE_HAS_SAFE_UNREG
1037
0
            increment_counter(h);
1038
0
#endif
1039
0
            event[event_cnt].key = h;
1040
0
            event[event_cnt].event_type = WRITEABLE_EVENT;
1041
0
            ++event_cnt;
1042
0
        }
1043
1044
        /* Scan for readable socket. */
1045
0
        if ((key_has_pending_read(h) || key_has_pending_accept(h))
1046
0
            && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h) &&
1047
0
            event_cnt < MAX_EVENTS)
1048
0
        {
1049
0
#if PJ_IOQUEUE_HAS_SAFE_UNREG
1050
0
            increment_counter(h);
1051
0
#endif
1052
0
            event[event_cnt].key = h;
1053
0
            event[event_cnt].event_type = READABLE_EVENT;
1054
0
            ++event_cnt;
1055
0
        }
1056
1057
0
#if PJ_HAS_TCP
1058
0
        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&
1059
0
            !IS_CLOSING(h) && event_cnt < MAX_EVENTS)
1060
0
        {
1061
0
#if PJ_IOQUEUE_HAS_SAFE_UNREG
1062
0
            increment_counter(h);
1063
0
#endif
1064
0
            event[event_cnt].key = h;
1065
0
            event[event_cnt].event_type = EXCEPTION_EVENT;
1066
0
            ++event_cnt;
1067
0
        }
1068
0
#endif
1069
0
    }
1070
1071
0
    for (i=0; i<event_cnt; ++i) {
1072
0
        if (event[i].key->grp_lock)
1073
0
            pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0);
1074
0
    }
1075
1076
0
    PJ_RACE_ME(5);
1077
1078
0
    pj_lock_release(ioqueue->lock);
1079
1080
0
    PJ_RACE_ME(5);
1081
1082
0
    processed_cnt = 0;
1083
1084
    /* Now process all events. The dispatch functions will take care
1085
     * of locking in each of the key
1086
     */
1087
0
    for (i=0; i<event_cnt; ++i) {
1088
1089
        /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */
1090
0
        if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) {
1091
0
            switch (event[i].event_type) {
1092
0
            case READABLE_EVENT:
1093
0
                if (ioqueue_dispatch_read_event(ioqueue, event[i].key))
1094
0
                    ++processed_cnt;
1095
0
                break;
1096
0
            case WRITEABLE_EVENT:
1097
0
                if (ioqueue_dispatch_write_event(ioqueue, event[i].key))
1098
0
                    ++processed_cnt;
1099
0
                break;
1100
0
            case EXCEPTION_EVENT:
1101
0
                if (ioqueue_dispatch_exception_event(ioqueue, event[i].key))
1102
0
                    ++processed_cnt;
1103
0
                break;
1104
0
            case NO_EVENT:
1105
0
                pj_assert(!"Invalid event!");
1106
0
                break;
1107
0
            }
1108
0
        }
1109
1110
0
#if PJ_IOQUEUE_HAS_SAFE_UNREG
1111
0
        decrement_counter(event[i].key);
1112
0
#endif
1113
1114
0
        if (event[i].key->grp_lock)
1115
0
            pj_grp_lock_dec_ref_dbg(event[i].key->grp_lock,
1116
0
                                    "ioqueue", 0);
1117
0
    }
1118
1119
0
    TRACE__((THIS_FILE, "     poll: count=%d events=%d processed=%d",
1120
0
             count, event_cnt, processed_cnt));
1121
1122
0
    return processed_cnt;
1123
0
}
1124
1125
/*
 * pj_ioqueue_get_os_handle()
 *
 * This implementation ignores the ioqueue argument and always returns
 * NULL.
 */
PJ_DEF(pj_oshandle_t) pj_ioqueue_get_os_handle( pj_ioqueue_t *ioqueue )
{
    /* The argument is intentionally unused here. */
    PJ_UNUSED_ARG(ioqueue);

    return NULL;
}
1130
1131
#endif /* PJ_IOQUEUE_IMP == PJ_IOQUEUE_IMP_SELECT */