Coverage Report

Created: 2025-10-12 06:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/pjsip/pjlib/src/pj/activesock.c
Line
Count
Source
1
/* 
2
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
3
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License
16
 * along with this program; if not, write to the Free Software
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
18
 */
19
#include <pj/activesock.h>
20
#include <pj/compat/socket.h>
21
#include <pj/assert.h>
22
#include <pj/errno.h>
23
#include <pj/log.h>
24
#include <pj/pool.h>
25
#include <pj/sock.h>
26
#include <pj/string.h>
27
28
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
29
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
30
#   include <CFNetwork/CFNetwork.h>
31
32
    static pj_bool_t ios_bg_support = PJ_TRUE;
33
#endif
34
35
0
#define PJ_ACTIVESOCK_MAX_LOOP      50
36
37
38
enum read_type
39
{
40
    TYPE_NONE,
41
    TYPE_RECV,
42
    TYPE_RECV_FROM
43
};
44
45
enum shutdown_dir
46
{
47
    SHUT_NONE = 0,
48
    SHUT_RX = 1,
49
    SHUT_TX = 2
50
};
51
52
struct read_op
53
{
54
    pj_ioqueue_op_key_t  op_key;
55
    pj_uint8_t          *pkt;
56
    unsigned             max_size;
57
    pj_size_t            size;
58
    pj_sockaddr          src_addr;
59
    int                  src_addr_len;
60
};
61
62
struct accept_op
63
{
64
    pj_ioqueue_op_key_t  op_key;
65
    pj_sock_t            new_sock;
66
    pj_sockaddr          rem_addr;
67
    int                  rem_addr_len;
68
};
69
70
struct send_data
71
{
72
    pj_uint8_t          *data;
73
    pj_ssize_t           len;
74
    pj_ssize_t           sent;
75
    unsigned             flags;
76
};
77
78
struct pj_activesock_t
79
{
80
    pj_ioqueue_key_t    *key;
81
    pj_bool_t            stream_oriented;
82
    pj_bool_t            whole_data;
83
    pj_ioqueue_t        *ioqueue;
84
    void                *user_data;
85
    unsigned             async_count;
86
    unsigned             shutdown;
87
    unsigned             max_loop;
88
    pj_activesock_cb     cb;
89
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
90
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
91
    int                  bg_setting;
92
    pj_sock_t            sock;
93
    CFReadStreamRef      readStream;
94
#endif
95
    
96
    unsigned             err_counter;
97
    pj_status_t          last_err;
98
99
    struct send_data     send_data;
100
101
    struct read_op      *read_op;
102
    pj_uint32_t          read_flags;
103
    enum read_type       read_type;
104
105
    struct accept_op    *accept_op;
106
};
107
108
109
static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, 
110
                                     pj_ioqueue_op_key_t *op_key, 
111
                                     pj_ssize_t bytes_read);
112
static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, 
113
                                      pj_ioqueue_op_key_t *op_key,
114
                                      pj_ssize_t bytes_sent);
115
#if PJ_HAS_TCP
116
static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, 
117
                                       pj_ioqueue_op_key_t *op_key,
118
                                       pj_sock_t sock, 
119
                                       pj_status_t status);
120
static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, 
121
                                        pj_status_t status);
122
#endif
123
124
PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
125
0
{
126
0
    pj_bzero(cfg, sizeof(*cfg));
127
0
    cfg->async_cnt = 1;
128
0
    cfg->concurrency = -1;
129
0
    cfg->whole_data = PJ_TRUE;
130
0
    cfg->sock_cloexec = PJ_TRUE;
131
0
}
132
133
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
134
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
135
static void activesock_destroy_iphone_os_stream(pj_activesock_t *asock)
136
{
137
    if (asock->readStream) {
138
        CFReadStreamClose(asock->readStream);
139
        CFRelease(asock->readStream);
140
        asock->readStream = NULL;
141
    }
142
}
143
144
static void activesock_create_iphone_os_stream(pj_activesock_t *asock)
145
{
146
#if (defined(__IPHONE_OS_VERSION_MIN_REQUIRED) && \
147
     __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0)
148
149
    if (ios_bg_support && asock->bg_setting && asock->stream_oriented) {
150
        activesock_destroy_iphone_os_stream(asock);
151
152
        CFStreamCreatePairWithSocket(kCFAllocatorDefault, asock->sock,
153
                                     &asock->readStream, NULL);
154
155
        if (!asock->readStream ||
156
            CFReadStreamSetProperty(asock->readStream,
157
                                    kCFStreamNetworkServiceType,
158
                                    kCFStreamNetworkServiceTypeVoIP)
159
            != TRUE ||
160
            CFReadStreamOpen(asock->readStream) != TRUE)
161
        {
162
            PJ_LOG(2,("", "Failed to configure TCP transport for VoIP "
163
                      "usage. Usage of THIS particular TCP transport in "
164
                      "background mode will not be supported."));
165
166
            
167
            activesock_destroy_iphone_os_stream(asock);
168
        }
169
    }
170
171
#endif
172
}
173
174
175
PJ_DEF(void) pj_activesock_set_iphone_os_bg(pj_activesock_t *asock,
176
                                            int val)
177
{
178
    asock->bg_setting = val;
179
    if (asock->bg_setting)
180
        activesock_create_iphone_os_stream(asock);
181
    else
182
        activesock_destroy_iphone_os_stream(asock);
183
}
184
185
PJ_DEF(void) pj_activesock_enable_iphone_os_bg(pj_bool_t val)
186
{
187
    ios_bg_support = val;
188
}
189
#endif
190
191
PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
192
                                          pj_sock_t sock,
193
                                          int sock_type,
194
                                          const pj_activesock_cfg *opt,
195
                                          pj_ioqueue_t *ioqueue,
196
                                          const pj_activesock_cb *cb,
197
                                          void *user_data,
198
                                          pj_activesock_t **p_asock)
199
0
{
200
0
    pj_activesock_t *asock;
201
0
    pj_ioqueue_callback ioq_cb;
202
0
    pj_status_t status;
203
204
0
    PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL);
205
0
    PJ_ASSERT_RETURN(sock>=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL);
206
0
    PJ_ASSERT_RETURN((sock_type & 0xF)==pj_SOCK_STREAM() ||
207
0
                     (sock_type & 0xF)==pj_SOCK_DGRAM(), PJ_EINVAL);
208
0
    PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL);
209
210
0
    asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t);
211
0
    asock->ioqueue = ioqueue;
212
0
    asock->stream_oriented = ((sock_type & 0xF) == pj_SOCK_STREAM());
213
0
    asock->async_count = (opt? opt->async_cnt : 1);
214
0
    asock->whole_data = (opt? opt->whole_data : 1);
215
0
    asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
216
0
    asock->user_data = user_data;
217
0
    pj_memcpy(&asock->cb, cb, sizeof(*cb));
218
219
0
    pj_bzero(&ioq_cb, sizeof(ioq_cb));
220
0
    ioq_cb.on_read_complete = &ioqueue_on_read_complete;
221
0
    ioq_cb.on_write_complete = &ioqueue_on_write_complete;
222
0
#if PJ_HAS_TCP
223
0
    ioq_cb.on_connect_complete = &ioqueue_on_connect_complete;
224
0
    ioq_cb.on_accept_complete = &ioqueue_on_accept_complete;
225
0
#endif
226
227
0
    status = pj_ioqueue_register_sock2(pool, ioqueue, sock,
228
0
                                       (opt? opt->grp_lock : NULL),
229
0
                                       asock, &ioq_cb, &asock->key);
230
0
    if (status != PJ_SUCCESS) {
231
0
        pj_activesock_close(asock);
232
0
        return status;
233
0
    }
234
    
235
0
    if (asock->whole_data) {
236
        /* Must disable concurrency otherwise there is a race condition */
237
0
        pj_ioqueue_set_concurrency(asock->key, 0);
238
0
    } else if (opt && opt->concurrency >= 0) {
239
0
        pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
240
0
    }
241
242
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
243
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
244
    asock->sock = sock;
245
    asock->bg_setting = PJ_ACTIVESOCK_TCP_IPHONE_OS_BG;
246
#endif
247
248
0
    *p_asock = asock;
249
0
    return PJ_SUCCESS;
250
0
}
251
252
253
PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool,
254
                                              const pj_sockaddr *addr,
255
                                              const pj_activesock_cfg *opt,
256
                                              pj_ioqueue_t *ioqueue,
257
                                              const pj_activesock_cb *cb,
258
                                              void *user_data,
259
                                              pj_activesock_t **p_asock,
260
                                              pj_sockaddr *bound_addr)
261
0
{
262
0
    pj_sock_t sock_fd;
263
0
    pj_sockaddr default_addr;
264
0
    pj_status_t status;
265
0
    int sock_type = pj_SOCK_DGRAM();
266
267
0
    if (opt && opt->sock_cloexec)
268
0
        sock_type |= pj_SOCK_CLOEXEC();
269
270
0
    if (addr == NULL) {
271
0
        pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0);
272
0
        addr = &default_addr;
273
0
    }
274
275
0
    status = pj_sock_socket(addr->addr.sa_family, sock_type, 0, 
276
0
                            &sock_fd);
277
0
    if (status != PJ_SUCCESS) {
278
0
        return status;
279
0
    }
280
281
0
    status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr));
282
0
    if (status != PJ_SUCCESS) {
283
0
        pj_sock_close(sock_fd);
284
0
        return status;
285
0
    }
286
287
0
    status = pj_activesock_create(pool, sock_fd, sock_type, opt,
288
0
                                  ioqueue, cb, user_data, p_asock);
289
0
    if (status != PJ_SUCCESS) {
290
0
        pj_sock_close(sock_fd);
291
0
        return status;
292
0
    }
293
294
0
    if (bound_addr) {
295
0
        int addr_len = sizeof(*bound_addr);
296
0
        status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len);
297
0
        if (status != PJ_SUCCESS) {
298
0
            pj_activesock_close(*p_asock);
299
0
            return status;
300
0
        }
301
0
    }
302
303
0
    return PJ_SUCCESS;
304
0
}
305
306
PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock)
307
0
{
308
0
    pj_ioqueue_key_t *key;
309
0
    pj_bool_t unregister = PJ_FALSE;
310
311
0
    PJ_ASSERT_RETURN(asock, PJ_EINVAL);
312
0
    asock->shutdown = SHUT_RX | SHUT_TX;
313
314
    /* Avoid double unregistration on the key */
315
0
    key = asock->key;
316
0
    if (key) {
317
0
        pj_ioqueue_lock_key(key);
318
0
        unregister = (asock->key != NULL);
319
0
        asock->key = NULL;
320
0
        pj_ioqueue_unlock_key(key);
321
0
    }
322
323
0
    if (unregister) {
324
0
        pj_ioqueue_unregister(key);
325
326
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
327
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
328
        activesock_destroy_iphone_os_stream(asock);
329
#endif  
330
0
    }
331
0
    return PJ_SUCCESS;
332
0
}
333
334
335
PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock,
336
                                                 void *user_data)
337
0
{
338
0
    PJ_ASSERT_RETURN(asock, PJ_EINVAL);
339
0
    asock->user_data = user_data;
340
0
    return PJ_SUCCESS;
341
0
}
342
343
344
PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock)
345
0
{
346
0
    PJ_ASSERT_RETURN(asock, NULL);
347
0
    return asock->user_data;
348
0
}
349
350
351
PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock,
352
                                             pj_pool_t *pool,
353
                                             unsigned buff_size,
354
                                             pj_uint32_t flags)
355
0
{
356
0
    void **readbuf;
357
0
    unsigned i;
358
359
0
    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
360
361
0
    readbuf = (void**) pj_pool_calloc(pool, asock->async_count, 
362
0
                                      sizeof(void*));
363
364
0
    for (i=0; i<asock->async_count; ++i) {
365
0
        readbuf[i] = pj_pool_alloc(pool, buff_size);
366
0
    }
367
368
0
    return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags);
369
0
}
370
371
372
PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock,
373
                                               pj_pool_t *pool,
374
                                               unsigned buff_size,
375
                                               void *readbuf[],
376
                                               pj_uint32_t flags)
377
0
{
378
0
    unsigned i;
379
0
    pj_status_t status;
380
381
0
    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
382
0
    PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
383
0
    PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP);
384
385
0
    asock->read_op = (struct read_op*)
386
0
                     pj_pool_calloc(pool, asock->async_count, 
387
0
                                    sizeof(struct read_op));
388
0
    asock->read_type = TYPE_RECV;
389
0
    asock->read_flags = flags;
390
391
0
    for (i=0; i<asock->async_count; ++i) {
392
0
        struct read_op *r = &asock->read_op[i];
393
0
        pj_ssize_t size_to_read;
394
395
0
        r->pkt = (pj_uint8_t*)readbuf[i];
396
0
        size_to_read = r->max_size = buff_size;
397
398
0
        status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read,
399
0
                                 PJ_IOQUEUE_ALWAYS_ASYNC | flags);
400
0
        PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
401
402
0
        if (status != PJ_EPENDING)
403
0
            return status;
404
0
    }
405
406
0
    return PJ_SUCCESS;
407
0
}
408
409
410
PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock,
411
                                                 pj_pool_t *pool,
412
                                                 unsigned buff_size,
413
                                                 pj_uint32_t flags)
414
0
{
415
0
    void **readbuf;
416
0
    unsigned i;
417
418
0
    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
419
420
0
    readbuf = (void**) pj_pool_calloc(pool, asock->async_count, 
421
0
                                      sizeof(void*));
422
423
0
    for (i=0; i<asock->async_count; ++i) {
424
0
        readbuf[i] = pj_pool_alloc(pool, buff_size);
425
0
    }
426
427
0
    return pj_activesock_start_recvfrom2(asock, pool, buff_size, 
428
0
                                         readbuf, flags);
429
0
}
430
431
432
PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock,
433
                                                   pj_pool_t *pool,
434
                                                   unsigned buff_size,
435
                                                   void *readbuf[],
436
                                                   pj_uint32_t flags)
437
0
{
438
0
    unsigned i;
439
0
    pj_status_t status;
440
441
0
    PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL);
442
0
    PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP);
443
444
0
    asock->read_op = (struct read_op*)
445
0
                     pj_pool_calloc(pool, asock->async_count, 
446
0
                                    sizeof(struct read_op));
447
0
    asock->read_type = TYPE_RECV_FROM;
448
0
    asock->read_flags = flags;
449
450
0
    for (i=0; i<asock->async_count; ++i) {
451
0
        struct read_op *r = &asock->read_op[i];
452
0
        pj_ssize_t size_to_read;
453
454
0
        r->pkt = (pj_uint8_t*) readbuf[i];
455
0
        size_to_read = r->max_size = buff_size;
456
0
        r->src_addr_len = sizeof(r->src_addr);
457
458
0
        status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt,
459
0
                                     &size_to_read, 
460
0
                                     PJ_IOQUEUE_ALWAYS_ASYNC | flags,
461
0
                                     &r->src_addr, &r->src_addr_len);
462
0
        PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG);
463
464
0
        if (status != PJ_EPENDING)
465
0
            return status;
466
0
    }
467
468
0
    return PJ_SUCCESS;
469
0
}
470
471
472
static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, 
473
                                     pj_ioqueue_op_key_t *op_key, 
474
                                     pj_ssize_t bytes_read)
475
0
{
476
0
    pj_activesock_t *asock;
477
0
    struct read_op *r = (struct read_op*)op_key;
478
0
    unsigned loop = 0;
479
0
    pj_status_t status;
480
481
0
    asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
482
483
    /* Ignore if we've been shutdown */
484
0
    if (asock->shutdown & SHUT_RX)
485
0
        return;
486
487
0
    do {
488
0
        unsigned flags;
489
490
0
        if (bytes_read > 0) {
491
            /*
492
             * We've got new data.
493
             */
494
0
            pj_size_t remainder;
495
0
            pj_bool_t ret;
496
497
            /* Append this new data to existing data. If socket is stream 
498
             * oriented, user might have left some data in the buffer. 
499
             * Otherwise if socket is datagram there will be nothing in 
500
             * existing packet hence the packet will contain only the new
501
             * packet.
502
             */
503
0
            r->size += bytes_read;
504
505
            /* Set default remainder to zero */
506
0
            remainder = 0;
507
508
            /* And return value to TRUE */
509
0
            ret = PJ_TRUE;
510
511
            /* Notify callback */
512
0
            if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
513
0
                ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
514
0
                                                PJ_SUCCESS, &remainder);
515
0
                PJ_ASSERT_ON_FAIL(
516
0
                    !ret || !asock->stream_oriented || remainder <= r->size,
517
0
                    {
518
0
                        PJ_LOG(2, ("",
519
0
                                   "App bug! Invalid remainder length from "
520
0
                                   "activesock on_data_read()."));
521
0
                        remainder = 0;
522
0
                    });
523
0
            } else if (asock->read_type == TYPE_RECV_FROM && 
524
0
                       asock->cb.on_data_recvfrom) 
525
0
            {
526
0
                ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size,
527
0
                                                    &r->src_addr, 
528
0
                                                    r->src_addr_len,
529
0
                                                    PJ_SUCCESS);
530
0
            }
531
532
            /* If callback returns false, we have been destroyed! */
533
0
            if (!ret)
534
0
                return;
535
536
            /* Only stream oriented socket may leave data in the packet */
537
0
            if (asock->stream_oriented) {
538
0
                r->size = remainder;
539
0
            } else {
540
0
                r->size = 0;
541
0
            }
542
543
0
        } else if (bytes_read <= 0 &&
544
0
                   -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
545
0
                   -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && 
546
0
                   (asock->stream_oriented ||
547
0
                    -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))) 
548
0
        {
549
0
            pj_size_t remainder;
550
0
            pj_bool_t ret;
551
552
0
            if (bytes_read == 0) {
553
                /* For stream/connection oriented socket, this means the 
554
                 * connection has been closed. For datagram sockets, it means
555
                 * we've received datagram with zero length.
556
                 */
557
0
                if (asock->stream_oriented)
558
0
                    status = PJ_EEOF;
559
0
                else
560
0
                    status = PJ_SUCCESS;
561
0
            } else {
562
                /* This means we've got an error. If this is stream/connection
563
                 * oriented, it means connection has been closed. For datagram
564
                 * sockets, it means we've got some error (e.g. EWOULDBLOCK).
565
                 */
566
0
                status = (pj_status_t)-bytes_read;
567
0
            }
568
569
            /* Set default remainder to zero */
570
0
            remainder = 0;
571
572
            /* And return value to TRUE */
573
0
            ret = PJ_TRUE;
574
575
            /* Notify callback */
576
0
            if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) {
577
                /* For connection oriented socket, we still need to report 
578
                 * the remainder data (if any) to the user to let user do 
579
                 * processing with the remainder data before it closes the
580
                 * connection.
581
                 * If there is no remainder data, set the packet to NULL.
582
                 */
583
584
                /* Shouldn't set the packet to NULL, as there may be active 
585
                 * socket user, such as SSL socket, that needs to have access
586
                 * to the read buffer packet.
587
                 */
588
                //ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL),
589
                //                              r->size, status, &remainder);
590
0
                ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size,
591
0
                                                status, &remainder);
592
0
                PJ_ASSERT_ON_FAIL(
593
0
                    !ret || !asock->stream_oriented || remainder <= r->size,
594
0
                    {
595
0
                        PJ_LOG(2, ("",
596
0
                                   "App bug! Invalid remainder length from "
597
0
                                   "activesock on_data_read()."));
598
0
                        remainder = 0;
599
0
                    });
600
601
0
            } else if (asock->read_type == TYPE_RECV_FROM && 
602
0
                       asock->cb.on_data_recvfrom) 
603
0
            {
604
                /* This would always be datagram oriented hence there's 
605
                 * nothing in the packet. We can't be sure if there will be
606
                 * anything useful in the source_addr, so just put NULL
607
                 * there too.
608
                 */
609
                /* In some scenarios, status may be PJ_SUCCESS. The upper 
610
                 * layer application may not expect the callback to be called
611
                 * with successful status and NULL data, so lets not call the
612
                 * callback if the status is PJ_SUCCESS.
613
                 */
614
0
                if (status != PJ_SUCCESS ) {
615
0
                    ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0,
616
0
                                                        NULL, 0, status);
617
0
                }
618
0
            }
619
620
            /* If callback returns false, we have been destroyed! */
621
0
            if (!ret)
622
0
                return;
623
624
            /* Also stop further read if we've been shutdown */
625
0
            if (asock->shutdown & SHUT_RX)
626
0
                return;
627
628
            /* Only stream oriented socket may leave data in the packet */
629
0
            if (asock->stream_oriented) {
630
0
                r->size = remainder;
631
0
            } else {
632
0
                r->size = 0;
633
0
            }
634
0
        }
635
636
        /* Read next data. We limit ourselves to processing max_loop immediate
637
         * data, so when the loop counter has exceeded this value, force the
638
         * read()/recvfrom() to return pending operation to allow the program
639
         * to do other jobs.
640
         */
641
0
        bytes_read = r->max_size - r->size;
642
0
        flags = asock->read_flags;
643
0
        if (++loop >= asock->max_loop)
644
0
            flags |= PJ_IOQUEUE_ALWAYS_ASYNC;
645
646
0
        if (asock->read_type == TYPE_RECV) {
647
0
            status = pj_ioqueue_recv(key, op_key, r->pkt + r->size, 
648
0
                                     &bytes_read, flags);
649
0
        } else {
650
0
            r->src_addr_len = sizeof(r->src_addr);
651
0
            status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size,
652
0
                                         &bytes_read, flags,
653
0
                                         &r->src_addr, &r->src_addr_len);
654
0
        }
655
656
0
        if (status == PJ_SUCCESS) {
657
            /* Immediate data */
658
0
            ;
659
0
        } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) {
660
            /* Error */
661
0
            bytes_read = -status;
662
0
        } else {
663
0
            break;
664
0
        }
665
0
    } while (1);
666
667
0
}
668
669
670
static pj_status_t send_remaining(pj_activesock_t *asock, 
671
                                  pj_ioqueue_op_key_t *send_key)
672
0
{
673
0
    struct send_data *sd = (struct send_data*)send_key->activesock_data;
674
0
    pj_status_t status;
675
676
0
    do {
677
0
        pj_ssize_t size;
678
679
0
        size = sd->len - sd->sent;
680
0
        status = pj_ioqueue_send(asock->key, send_key, 
681
0
                                 sd->data+sd->sent, &size, sd->flags);
682
0
        if (status != PJ_SUCCESS) {
683
            /* Pending or error */
684
0
            break;
685
0
        }
686
687
0
        sd->sent += size;
688
0
        if (sd->sent == sd->len) {
689
            /* The whole data has been sent. */
690
0
            return PJ_SUCCESS;
691
0
        }
692
693
0
    } while (sd->sent < sd->len);
694
695
0
    return status;
696
0
}
697
698
699
PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
700
                                        pj_ioqueue_op_key_t *send_key,
701
                                        const void *data,
702
                                        pj_ssize_t *size,
703
                                        unsigned flags)
704
0
{
705
0
    PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
706
707
0
    if (asock->shutdown & SHUT_TX)
708
0
        return PJ_EINVALIDOP;
709
710
0
    send_key->activesock_data = NULL;
711
712
0
    if (asock->whole_data) {
713
0
        pj_ssize_t whole;
714
0
        pj_status_t status;
715
716
0
        whole = *size;
717
718
0
        status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
719
0
        if (status != PJ_SUCCESS) {
720
            /* Pending or error */
721
0
            return status;
722
0
        }
723
724
0
        if (*size == whole) {
725
            /* The whole data has been sent. */
726
0
            return PJ_SUCCESS;
727
0
        }
728
729
        /* Data was partially sent */
730
0
        asock->send_data.data = (pj_uint8_t*)data;
731
0
        asock->send_data.len = whole;
732
0
        asock->send_data.sent = *size;
733
0
        asock->send_data.flags = flags;
734
0
        send_key->activesock_data = &asock->send_data;
735
736
        /* Try again */
737
0
        status = send_remaining(asock, send_key);
738
0
        if (status == PJ_SUCCESS) {
739
0
            *size = whole;
740
0
        }
741
0
        return status;
742
743
0
    } else {
744
0
        return pj_ioqueue_send(asock->key, send_key, data, size, flags);
745
0
    }
746
0
}
747
748
749
PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock,
750
                                          pj_ioqueue_op_key_t *send_key,
751
                                          const void *data,
752
                                          pj_ssize_t *size,
753
                                          unsigned flags,
754
                                          const pj_sockaddr_t *addr,
755
                                          int addr_len)
756
0
{
757
0
    PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len, 
758
0
                     PJ_EINVAL);
759
760
0
    if (asock->shutdown & SHUT_TX)
761
0
        return PJ_EINVALIDOP;
762
763
0
    return pj_ioqueue_sendto(asock->key, send_key, data, size, flags,
764
0
                             addr, addr_len);
765
0
}
766
767
768
static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, 
769
                                      pj_ioqueue_op_key_t *op_key,
770
                                      pj_ssize_t bytes_sent)
771
0
{
772
0
    pj_activesock_t *asock;
773
774
0
    asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
775
776
    /* Ignore if we've been shutdown. This may cause data to be partially
777
     * sent even when 'wholedata' was requested if the OS only sent partial
778
     * buffer.
779
     */
780
0
    if (asock->shutdown & SHUT_TX)
781
0
        return;
782
783
0
    if (bytes_sent > 0 && op_key->activesock_data) {
784
        /* whole_data is requested. Make sure we send all the data */
785
0
        struct send_data *sd = (struct send_data*)op_key->activesock_data;
786
787
0
        sd->sent += bytes_sent;
788
0
        if (sd->sent == sd->len) {
789
            /* all has been sent */
790
0
            bytes_sent = sd->sent;
791
0
            op_key->activesock_data = NULL;
792
0
        } else {
793
            /* send remaining data */
794
0
            pj_status_t status;
795
796
0
            status = send_remaining(asock, op_key);
797
0
            if (status == PJ_EPENDING)
798
0
                return;
799
0
            else if (status == PJ_SUCCESS)
800
0
                bytes_sent = sd->sent;
801
0
            else
802
0
                bytes_sent = -status;
803
804
0
            op_key->activesock_data = NULL;
805
0
        }
806
0
    } 
807
808
0
    if (asock->cb.on_data_sent) {
809
0
        pj_bool_t ret;
810
811
0
        ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent);
812
813
        /* If callback returns false, we have been destroyed! */
814
0
        if (!ret)
815
0
            return;
816
0
    }
817
0
}
818
819
#if PJ_HAS_TCP
820
PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock,
821
                                               pj_pool_t *pool)
822
0
{
823
0
    unsigned i;
824
825
0
    PJ_ASSERT_RETURN(asock, PJ_EINVAL);
826
0
    PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP);
827
828
    /* Ignore if we've been shutdown */
829
0
    if (asock->shutdown)
830
0
        return PJ_EINVALIDOP;
831
832
0
    asock->accept_op = (struct accept_op*)
833
0
                       pj_pool_calloc(pool, asock->async_count,
834
0
                                      sizeof(struct accept_op));
835
0
    for (i=0; i<asock->async_count; ++i) {
836
0
        struct accept_op *a = &asock->accept_op[i];
837
0
        pj_status_t status;
838
839
0
        do {
840
0
            a->new_sock = PJ_INVALID_SOCKET;
841
0
            a->rem_addr_len = sizeof(a->rem_addr);
842
843
0
            status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock,
844
0
                                       NULL, &a->rem_addr, &a->rem_addr_len);
845
0
            if (status == PJ_SUCCESS) {
846
                /* We've got immediate connection. Not sure if it's a good
847
                 * idea to call the callback now (probably application will
848
                 * not be prepared to process it), so lets just silently
849
                 * close the socket.
850
                 */
851
0
                pj_sock_close(a->new_sock);
852
0
            }
853
0
        } while (status == PJ_SUCCESS);
854
855
0
        if (status != PJ_EPENDING) {
856
0
            return status;
857
0
        }
858
0
    }
859
860
0
    return PJ_SUCCESS;
861
0
}
862
863
864
static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, 
865
                                       pj_ioqueue_op_key_t *op_key,
866
                                       pj_sock_t new_sock, 
867
                                       pj_status_t status)
868
0
{
869
0
    pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
870
0
    struct accept_op *accept_op = (struct accept_op*) op_key;
871
872
0
    PJ_UNUSED_ARG(new_sock);
873
874
    /* Ignore if we've been shutdown */
875
0
    if (asock->shutdown)
876
0
        return;
877
878
0
    do {
879
0
        if (status == asock->last_err && status != PJ_SUCCESS) {
880
0
            asock->err_counter++;
881
0
            if (asock->err_counter >= PJ_ACTIVESOCK_MAX_CONSECUTIVE_ACCEPT_ERROR) {
882
0
                PJ_LOG(3, ("", "Received %d consecutive errors: %d for the accept()"
883
0
                               " operation, stopping further ioqueue accepts.",
884
0
                               asock->err_counter, asock->last_err));
885
                
886
0
                if ((status == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) && 
887
0
                    (asock->cb.on_accept_complete2)) 
888
0
                {
889
0
                    (*asock->cb.on_accept_complete2)(asock, 
890
0
                                                     accept_op->new_sock,
891
0
                                                     &accept_op->rem_addr,
892
0
                                                     accept_op->rem_addr_len,
893
0
                                                     PJ_ESOCKETSTOP);
894
0
                }
895
0
                return;
896
0
            }
897
0
        } else {
898
0
            asock->err_counter = 0;
899
0
            asock->last_err = status;
900
0
        }
901
902
0
        if (status==PJ_SUCCESS && (asock->cb.on_accept_complete2 || 
903
0
                                   asock->cb.on_accept_complete)) {
904
0
            pj_bool_t ret;
905
906
            /* Notify callback */
907
0
            if (asock->cb.on_accept_complete2) {
908
0
                ret = (*asock->cb.on_accept_complete2)(asock, 
909
0
                                                       accept_op->new_sock,
910
0
                                                       &accept_op->rem_addr,
911
0
                                                       accept_op->rem_addr_len,
912
0
                                                       status);
913
0
            } else {
914
0
                ret = (*asock->cb.on_accept_complete)(asock, 
915
0
                                                      accept_op->new_sock,
916
0
                                                      &accept_op->rem_addr,
917
0
                                                      accept_op->rem_addr_len);     
918
0
            }
919
920
            /* If callback returns false, we have been destroyed! */
921
0
            if (!ret)
922
0
                return;
923
924
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
925
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
926
            activesock_create_iphone_os_stream(asock);
927
#endif
928
0
        } else if (status==PJ_SUCCESS) {
929
            /* Application doesn't handle the new socket, we need to 
930
             * close it to avoid resource leak.
931
             */
932
0
            pj_sock_close(accept_op->new_sock);
933
0
        }
934
935
        /* Don't start another accept() if we've been shutdown */
936
0
        if (asock->shutdown)
937
0
            return;
938
939
        /* Prepare next accept() */
940
0
        accept_op->new_sock = PJ_INVALID_SOCKET;
941
0
        accept_op->rem_addr_len = sizeof(accept_op->rem_addr);
942
943
0
        status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock,
944
0
                                   NULL, &accept_op->rem_addr, 
945
0
                                   &accept_op->rem_addr_len);
946
947
0
    } while (status != PJ_EPENDING && status != PJ_ECANCELLED);
948
0
}
949
950
951
PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock,
952
                                                 pj_pool_t *pool,
953
                                                 const pj_sockaddr_t *remaddr,
954
                                                 int addr_len)
955
0
{
956
0
    PJ_UNUSED_ARG(pool);
957
958
0
    if (asock->shutdown)
959
0
        return PJ_EINVALIDOP;
960
961
0
    return pj_ioqueue_connect(asock->key, remaddr, addr_len);
962
0
}
963
964
static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, 
965
                                        pj_status_t status)
966
0
{
967
0
    pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
968
969
    /* Ignore if we've been shutdown */
970
0
    if (asock->shutdown)
971
0
        return;
972
973
0
    if (asock->cb.on_connect_complete) {
974
0
        pj_bool_t ret;
975
976
0
        ret = (*asock->cb.on_connect_complete)(asock, status);
977
978
0
        if (!ret) {
979
            /* We've been destroyed */
980
0
            return;
981
0
        }
982
        
983
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
984
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
985
        activesock_create_iphone_os_stream(asock);
986
#endif
987
        
988
0
    }
989
0
}
990
#endif  /* PJ_HAS_TCP */
991