Coverage Report

Created: 2024-05-20 06:23

/src/nspr/pr/src/io/prmwait.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* This Source Code Form is subject to the terms of the Mozilla Public
3
 * License, v. 2.0. If a copy of the MPL was not distributed with this
4
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5
6
#include "primpl.h"
7
#include "pprmwait.h"
8
9
0
#define _MW_REHASH_MAX 11
10
11
static PRLock *mw_lock = NULL;
12
static _PRGlobalState *mw_state = NULL;
13
14
static PRIntervalTime max_polling_interval;
15
16
#ifdef WINNT
17
18
typedef struct TimerEvent {
19
    PRIntervalTime absolute;
20
    void (*func)(void *);
21
    void *arg;
22
    LONG ref_count;
23
    PRCList links;
24
} TimerEvent;
25
26
#define TIMER_EVENT_PTR(_qp) \
27
    ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))
28
29
struct {
30
    PRLock *ml;
31
    PRCondVar *new_timer;
32
    PRCondVar *cancel_timer;
33
    PRThread *manager_thread;
34
    PRCList timer_queue;
35
} tm_vars;
36
37
static PRStatus TimerInit(void);
38
static void TimerManager(void *arg);
39
static TimerEvent *CreateTimer(PRIntervalTime timeout,
40
                               void (*func)(void *), void *arg);
41
static PRBool CancelTimer(TimerEvent *timer);
42
43
static void TimerManager(void *arg)
44
{
45
    PRIntervalTime now;
46
    PRIntervalTime timeout;
47
    PRCList *head;
48
    TimerEvent *timer;
49
50
    PR_Lock(tm_vars.ml);
51
    while (1)
52
    {
53
        if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue))
54
        {
55
            PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
56
        }
57
        else
58
        {
59
            now = PR_IntervalNow();
60
            head = PR_LIST_HEAD(&tm_vars.timer_queue);
61
            timer = TIMER_EVENT_PTR(head);
62
            if ((PRInt32) (now - timer->absolute) >= 0)
63
            {
64
                PR_REMOVE_LINK(head);
65
                /*
66
                 * make its prev and next point to itself so that
67
                 * it's obvious that it's not on the timer_queue.
68
                 */
69
                PR_INIT_CLIST(head);
70
                PR_ASSERT(2 == timer->ref_count);
71
                PR_Unlock(tm_vars.ml);
72
                timer->func(timer->arg);
73
                PR_Lock(tm_vars.ml);
74
                timer->ref_count -= 1;
75
                if (0 == timer->ref_count)
76
                {
77
                    PR_NotifyAllCondVar(tm_vars.cancel_timer);
78
                }
79
            }
80
            else
81
            {
82
                timeout = (PRIntervalTime)(timer->absolute - now);
83
                PR_WaitCondVar(tm_vars.new_timer, timeout);
84
            }
85
        }
86
    }
87
    PR_Unlock(tm_vars.ml);
88
}
89
90
static TimerEvent *CreateTimer(
91
    PRIntervalTime timeout,
92
    void (*func)(void *),
93
    void *arg)
94
{
95
    TimerEvent *timer;
96
    PRCList *links, *tail;
97
    TimerEvent *elem;
98
99
    timer = PR_NEW(TimerEvent);
100
    if (NULL == timer)
101
    {
102
        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
103
        return timer;
104
    }
105
    timer->absolute = PR_IntervalNow() + timeout;
106
    timer->func = func;
107
    timer->arg = arg;
108
    timer->ref_count = 2;
109
    PR_Lock(tm_vars.ml);
110
    tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
111
    while (links->prev != tail)
112
    {
113
        elem = TIMER_EVENT_PTR(links);
114
        if ((PRInt32)(timer->absolute - elem->absolute) >= 0)
115
        {
116
            break;
117
        }
118
        links = links->prev;
119
    }
120
    PR_INSERT_AFTER(&timer->links, links);
121
    PR_NotifyCondVar(tm_vars.new_timer);
122
    PR_Unlock(tm_vars.ml);
123
    return timer;
124
}
125
126
/*
 * Drop the caller's reference on 'timer' and free it.
 *
 * Returns PR_TRUE when the timer was still queued and has been removed
 * before firing. Returns PR_FALSE when the manager thread had already
 * dequeued it; in that case this call waits until the manager has
 * dropped its own reference before freeing the timer.
 */
static PRBool CancelTimer(TimerEvent *timer)
{
    PRBool still_queued;

    PR_Lock(tm_vars.ml);
    timer->ref_count -= 1;

    /* A node linked to itself has been taken off the queue by TimerManager. */
    still_queued = (timer->links.prev == &timer->links) ? PR_FALSE : PR_TRUE;
    if (still_queued) {
        PR_REMOVE_LINK(&timer->links);
    } else {
        /* ref_count == 1 means the manager still holds its reference. */
        while (1 == timer->ref_count) {
            PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
        }
    }
    PR_Unlock(tm_vars.ml);

    PR_DELETE(timer);
    return still_queued;
}
148
149
static PRStatus TimerInit(void)
150
{
151
    tm_vars.ml = PR_NewLock();
152
    if (NULL == tm_vars.ml)
153
    {
154
        goto failed;
155
    }
156
    tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
157
    if (NULL == tm_vars.new_timer)
158
    {
159
        goto failed;
160
    }
161
    tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
162
    if (NULL == tm_vars.cancel_timer)
163
    {
164
        goto failed;
165
    }
166
    PR_INIT_CLIST(&tm_vars.timer_queue);
167
    tm_vars.manager_thread = PR_CreateThread(
168
                                 PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
169
                                 PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
170
    if (NULL == tm_vars.manager_thread)
171
    {
172
        goto failed;
173
    }
174
    return PR_SUCCESS;
175
176
failed:
177
    if (NULL != tm_vars.cancel_timer)
178
    {
179
        PR_DestroyCondVar(tm_vars.cancel_timer);
180
    }
181
    if (NULL != tm_vars.new_timer)
182
    {
183
        PR_DestroyCondVar(tm_vars.new_timer);
184
    }
185
    if (NULL != tm_vars.ml)
186
    {
187
        PR_DestroyLock(tm_vars.ml);
188
    }
189
    return PR_FAILURE;
190
}
191
192
#endif /* WINNT */
193
194
/******************************************************************/
195
/******************************************************************/
196
/************************ The private portion *********************/
197
/******************************************************************/
198
/******************************************************************/
199
void _PR_InitMW(void)
200
1
{
201
#ifdef WINNT
202
    /*
203
     * We use NT 4's InterlockedCompareExchange() to operate
204
     * on PRMWStatus variables.
205
     */
206
    PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus));
207
    TimerInit();
208
#endif
209
1
    mw_lock = PR_NewLock();
210
1
    PR_ASSERT(NULL != mw_lock);
211
1
    mw_state = PR_NEWZAP(_PRGlobalState);
212
1
    PR_ASSERT(NULL != mw_state);
213
1
    PR_INIT_CLIST(&mw_state->group_list);
214
1
    max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
215
1
}  /* _PR_InitMW */
216
217
void _PR_CleanupMW(void)
218
0
{
219
0
    PR_DestroyLock(mw_lock);
220
0
    mw_lock = NULL;
221
0
    if (mw_state->group) {
222
0
        PR_DestroyWaitGroup(mw_state->group);
223
        /* mw_state->group is set to NULL as a side effect. */
224
0
    }
225
0
    PR_DELETE(mw_state);
226
0
}  /* _PR_CleanupMW */
227
228
static PRWaitGroup *MW_Init2(void)
229
0
{
230
0
    PRWaitGroup *group = mw_state->group;  /* it's the null group */
231
0
    if (NULL == group)  /* there is this special case */
232
0
    {
233
0
        group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
234
0
        if (NULL == group) {
235
0
            goto failed_alloc;
236
0
        }
237
0
        PR_Lock(mw_lock);
238
0
        if (NULL == mw_state->group)
239
0
        {
240
0
            mw_state->group = group;
241
0
            group = NULL;
242
0
        }
243
0
        PR_Unlock(mw_lock);
244
0
        if (group != NULL) {
245
0
            (void)PR_DestroyWaitGroup(group);
246
0
        }
247
0
        group = mw_state->group;  /* somebody beat us to it */
248
0
    }
249
0
failed_alloc:
250
0
    return group;  /* whatever */
251
0
}  /* MW_Init2 */
252
253
/*
** Insert 'desc' into the open-addressed table 'hash', keyed by desc->fd.
**
** The start slot comes from _MW_HASH. On a collision the probe advances
** by a fixed step computed once with _MW_HASH2. At most _MW_REHASH_MAX
** slots are examined.
**
** Returns:
**   _prmw_success - desc stored, hash->count incremented
**   _prmw_error   - this very descriptor is already in the table
**                   (PR_INVALID_ARGUMENT_ERROR is set)
**   _prmw_rehash  - no free slot found within the probe limit; the caller
**                   is expected to rebuild the table at a larger size
*/
static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash)
{
    PRRecvWait **slots = &hash->recv_wait;
    PRUintn index = _MW_HASH(desc->fd, hash->length);
    PRUintn step = 0;
    PRIntn probes;

    for (probes = _MW_REHASH_MAX; probes > 0; --probes) {
        if (NULL == slots[index]) {
            slots[index] = desc;
            hash->count += 1;
#if 0
            printf("Adding 0x%x->0x%x ", desc, desc->fd);
            printf(
                "table[%u:%u:*%u]: 0x%x->0x%x\n",
                index, hash->count, hash->length, slots[index], slots[index]->fd);
#endif
            return _prmw_success;
        }
        if (desc == slots[index]) {
            PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);  /* desc already in table */
            return _prmw_error;
        }
#if 0
        printf("Failing 0x%x->0x%x ", desc, desc->fd);
        printf(
            "table[*%u:%u:%u]: 0x%x->0x%x\n",
            index, hash->count, hash->length, slots[index], slots[index]->fd);
#endif
        if (0 == step) {
            /* The secondary hash is computed lazily, on the first collision. */
            step = _MW_HASH2(desc->fd, hash->length);
            PR_ASSERT(0 != step);
        }
        index = (index + step) % (hash->length);
    }
    return _prmw_rehash;
}  /* MW_AddHashInternal */
310
311
/*
** Replace the group's waiter table with a larger one.
**
** Table sizes are taken from a fixed list of primes. Starting from the
** size after the current one, a new zeroed table is allocated and every
** entry of the old table is re-inserted. If an entry cannot be placed
** within the probe limit, that table is discarded and the next size is
** tried.
**
** Returns _prmw_success after installing the new table (and bumping
** group->p_timestamp), or _prmw_error with PR_OUT_OF_MEMORY_ERROR set
** when allocation fails or no larger size is left.
*/
static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group)
{
    static const PRInt32 prime_number[] = {
        _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427,
        2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771
    };
    PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));
    _PRWaiterHash *oldHash = group->waiter;
    PRUint32 pidx = 0;

    /* locate the current size in the list of allowed sizes */
    while ((pidx < primes) && (prime_number[pidx] != oldHash->length)) {
        ++pidx;
    }
    /* table size must be one of the prime numbers */
    PR_ASSERT(pidx < primes);

    /* try every larger size in turn; none left means we cannot expand */
    for (++pidx; pidx < primes; ++pidx) {
        PRUint32 length = prime_number[pidx];
        _PRWaiterHash *newHash = (_PRWaiterHash*)PR_CALLOC(
                                     sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*)));
        PRRecvWait **desc;
        PRBool filled = PR_TRUE;

        if (NULL == newHash) {
            PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
            return _prmw_error;
        }
        newHash->length = length;

        /* copy entries until the new table holds as many as the old one */
        for (desc = &oldHash->recv_wait;
             newHash->count < oldHash->count; ++desc) {
            _PR_HashStory hrv;

            PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
            if (NULL == *desc) {
                continue;
            }
            hrv = MW_AddHashInternal(*desc, newHash);
            PR_ASSERT(_prmw_error != hrv);
            if (_prmw_success != hrv) {
                filled = PR_FALSE;
                break;
            }
        }

        if (!filled) {
            /* this size still collides too much; move on to the next */
            PR_DELETE(newHash);
            continue;
        }

        PR_DELETE(group->waiter);
        group->waiter = newHash;
        group->p_timestamp += 1;
        return _prmw_success;
    }

    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
    return _prmw_error;  /* we're hosed */
}  /* MW_ExpandHashInternal */
383
384
#ifndef WINNT
385
static void _MW_DoneInternal(
386
    PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome)
387
0
{
388
    /*
389
    ** Add this receive wait object to the list of finished I/O
390
    ** operations for this particular group. If there are other
391
    ** threads waiting on the group, notify one. If not, arrange
392
    ** for this thread to return.
393
    */
394
395
#if 0
396
    printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
397
#endif
398
0
    (*waiter)->outcome = outcome;
399
0
    PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
400
0
    PR_NotifyCondVar(group->io_complete);
401
0
    PR_ASSERT(0 != group->waiter->count);
402
0
    group->waiter->count -= 1;
403
0
    *waiter = NULL;
404
0
}  /* _MW_DoneInternal */
405
#endif /* WINNT */
406
407
/*
** Find the hash slot holding the receive wait object for 'fd' in the
** given wait group (no other group is searched).
**
** Uses the same probe sequence as insertion: start at _MW_HASH, then
** advance by _MW_HASH2, examining at most _MW_REHASH_MAX slots.
** Returns the address of the slot, or NULL when no match is found.
*/
static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd)
{
    _PRWaiterHash *table = group->waiter;
    PRRecvWait **slots = &table->recv_wait;
    PRUintn index = _MW_HASH(fd, table->length);
    PRUintn step = 0;
    PRIntn probes;

    for (probes = 0; probes < _MW_REHASH_MAX; ++probes) {
        PRRecvWait **slot = slots + index;

        if ((NULL != *slot) && (fd == (*slot)->fd)) {
            return slot;
        }
        if (0 == step) {
            /* secondary hash, computed on the first miss only */
            step = _MW_HASH2(fd, table->length);
            PR_ASSERT(0 != step);
        }
        index = (index + step) % (table->length);
    }
    return NULL;
}  /* _MW_LookupInternal */
434
435
#ifndef WINNT
436
/*
** Act as the polling thread for 'group'.
**
** Called with group->ml held; returns with group->ml held. The lock is
** released while allocating a larger poll array and while blocked in
** PR_Poll.
**
** Each pass:
**   1. waits on new_business while the waiter table is empty;
**   2. grows group->polling_list until it can hold every waiter;
**   3. walks the waiter table, retiring descriptors whose timeout has
**      elapsed since the last poll (PR_MW_TIMEOUT) and building the poll
**      array from the rest;
**   4. polls, then retires every descriptor that reported activity
**      (PR_MW_SUCCESS).
** The loop ends when completed I/O is available and this thread is the
** only one waiting on the group.
**
** Returns PR_SUCCESS in that case, PR_FAILURE if the group left the
** running state, the wait was aborted, the poll failed or memory ran
** out. In every case group->poller is cleared and, if the group is still
** running with other waiting threads, one of them is notified so it can
** take over polling.
*/
static PRStatus _MW_PollInternal(PRWaitGroup *group)
{
    PRRecvWait **waiter;
    PRStatus rv = PR_FAILURE;
    PRInt32 count, count_ready;
    PRIntervalTime polling_interval;

    group->poller = PR_GetCurrentThread();

    while (PR_TRUE)
    {
        PRIntervalTime now, since_last_poll;
        PRPollDesc *poll_list;

        while (0 == group->waiter->count)
        {
            PRStatus st;
            st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
            if (_prmw_running != group->state)
            {
                PR_SetError(PR_INVALID_STATE_ERROR, 0);
                goto aborted;
            }
            if (_MW_ABORTED(st)) {
                goto aborted;
            }
        }

        /*
        ** There's something to do. See if our existing polling list
        ** is large enough for what we have to do?
        */

        while (group->polling_count < group->waiter->count)
        {
            PRUint32 old_count = group->waiter->count;
            PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
            PRSize new_size = sizeof(PRPollDesc) * new_count;

            /* Allocate without holding the lock. */
            PR_Unlock(group->ml);
            poll_list = (PRPollDesc*)PR_CALLOC(new_size);
            if (NULL == poll_list)
            {
                PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
                PR_Lock(group->ml);
                goto failed_alloc;
            }
            PR_Lock(group->ml);
            if (_prmw_running != group->state)
            {
                /*
                ** Leave group->polling_list/polling_count untouched so
                ** the group keeps describing a valid array.
                */
                PR_DELETE(poll_list);
                PR_SetError(PR_INVALID_STATE_ERROR, 0);
                goto aborted;
            }
            /*
            ** Release the old array only now that its replacement is
            ** being installed, so group->polling_list never points at
            ** freed memory.
            */
            if (NULL != group->polling_list) {
                PR_DELETE(group->polling_list);
            }
            group->polling_list = poll_list;
            group->polling_count = new_count;
        }

        now = PR_IntervalNow();
        polling_interval = max_polling_interval;
        since_last_poll = now - group->last_poll;

        waiter = &group->waiter->recv_wait;
        poll_list = group->polling_list;
        /*
        ** 'count' only advances for descriptors placed in the poll array.
        ** Timed-out descriptors shrink group->waiter->count instead, so
        ** the loop ends once every remaining waiter has been visited.
        */
        for (count = 0; count < group->waiter->count; ++waiter)
        {
            PR_ASSERT(waiter < &group->waiter->recv_wait
                      + group->waiter->length);
            if (NULL != *waiter)  /* a live one! */
            {
                if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
                    && (since_last_poll >= (*waiter)->timeout)) {
                    _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
                }
                else
                {
                    if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
                    {
                        /* charge the elapsed time against this waiter */
                        (*waiter)->timeout -= since_last_poll;
                        /* never sleep past the earliest remaining timeout */
                        if ((*waiter)->timeout < polling_interval) {
                            polling_interval = (*waiter)->timeout;
                        }
                    }
                    PR_ASSERT(poll_list < group->polling_list
                              + group->polling_count);
                    poll_list->fd = (*waiter)->fd;
                    poll_list->in_flags = PR_POLL_READ;
                    poll_list->out_flags = 0;
#if 0
                    printf(
                        "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
                        poll_list, count, poll_list->fd, (*waiter)->timeout);
#endif
                    poll_list += 1;
                    count += 1;
                }
            }
        }

        PR_ASSERT(count == group->waiter->count);

        /*
        ** If there are no more threads waiting for completion,
        ** we need to return.
        */
        if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
            && (1 == group->waiting_threads)) {
            break;
        }

        if (0 == count) {
            continue;    /* wait for new business */
        }

        group->last_poll = now;

        PR_Unlock(group->ml);

        count_ready = PR_Poll(group->polling_list, count, polling_interval);

        PR_Lock(group->ml);

        if (_prmw_running != group->state)
        {
            PR_SetError(PR_INVALID_STATE_ERROR, 0);
            goto aborted;
        }
        if (-1 == count_ready)
        {
            goto failed_poll;  /* that's a shame */
        }
        else if (0 < count_ready)
        {
            for (poll_list = group->polling_list; count > 0;
                 poll_list++, count--)
            {
                PR_ASSERT(
                    poll_list < group->polling_list + group->polling_count);
                if (poll_list->out_flags != 0)
                {
                    waiter = _MW_LookupInternal(group, poll_list->fd);
                    /*
                    ** If 'waiter' is NULL, that means the wait receive
                    ** descriptor has been canceled.
                    */
                    if (NULL != waiter) {
                        _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
                    }
                }
            }
        }
        /*
        ** If there are no more threads waiting for completion,
        ** we need to return.
        ** This thread was "borrowed" to do the polling, but it really
        ** belongs to the client.
        */
        if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
            && (1 == group->waiting_threads)) {
            break;
        }
    }

    rv = PR_SUCCESS;

aborted:
failed_poll:
failed_alloc:
    group->poller = NULL;  /* we were that, now we ain't */
    if ((_prmw_running == group->state) && (group->waiting_threads > 1))
    {
        /* Wake up one thread to become the new poller. */
        PR_NotifyCondVar(group->io_complete);
    }
    return rv;  /* we return with the lock held */
}  /* _MW_PollInternal */
616
#endif /* !WINNT */
617
618
/*
** Report the group's state, completing a pending shutdown if possible.
**
** When the group is stopping and no thread is waiting on it any more,
** the state advances to _prmw_stopped and mw_manage is notified.
** Returns the (possibly updated) state.
**
** Looking at the group's fields is safe because once the group's state
** is no longer running it cannot revert, and there is a check on entry
** to make sure no more threads are made to wait.
*/
static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group)
{
    PRMWGroupState current = group->state;

    if ((_prmw_stopping != current) || (0 != group->waiting_threads)) {
        return current;
    }

    /* last waiter is gone: finish the shutdown */
    group->state = _prmw_stopped;
    PR_NotifyCondVar(group->mw_manage);
    return _prmw_stopped;
}  /* MW_TestForShutdownInternal */
635
636
#ifndef WINNT
637
/*
** Perform the first receive on a descriptor that became ready.
**
** 'io_ready' is reinterpreted as the PRRecvWait itself (presumably the
** list link is the structure's first member - confirm against the
** PRRecvWait declaration). When the descriptor carries a buffer, the
** fd's recv method is called with the descriptor's timeout; the result
** is stored in bytesRecv and a negative result marks the outcome as
** PR_MW_FAILURE. Without a usable buffer, bytesRecv is simply set to 0.
*/
static void _MW_InitialRecv(PRCList *io_ready)
{
    PRRecvWait *ready = (PRRecvWait*)io_ready;

    if ((NULL != ready->buffer.start) && (0 != ready->buffer.length)) {
        ready->bytesRecv = (ready->fd->methods->recv)(
                               ready->fd, ready->buffer.start,
                               ready->buffer.length, 0, ready->timeout);
        if (ready->bytesRecv < 0) { /* SetError should already be there */
            ready->outcome = PR_MW_FAILURE;
        }
        return;
    }

    /* no buffer supplied: nothing to read into */
    ready->bytesRecv = 0;
}  /* _MW_InitialRecv */
654
#endif
655
656
#ifdef WINNT
657
static void NT_TimeProc(void *arg)
658
{
659
    _MDOverlapped *overlapped = (_MDOverlapped *)arg;
660
    PRRecvWait *desc =  overlapped->data.mw.desc;
661
    PRFileDesc *bottom;
662
663
    if (InterlockedCompareExchange((LONG *)&desc->outcome,
664
                                   (LONG)PR_MW_TIMEOUT, (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING)
665
    {
666
        /* This wait recv descriptor has already completed. */
667
        return;
668
    }
669
670
    /* close the osfd to abort the outstanding async io request */
671
    /* $$$$
672
    ** Little late to be checking if NSPR's on the bottom of stack,
673
    ** but if we don't check, we can't assert that the private data
674
    ** is what we think it is.
675
    ** $$$$
676
    */
677
    bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
678
    PR_ASSERT(NULL != bottom);
679
    if (NULL != bottom)  /* now what!?!?! */
680
    {
681
        bottom->secret->state = _PR_FILEDESC_CLOSED;
682
        if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
683
        {
684
            fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
685
            PR_NOT_REACHED("What shall I do?");
686
        }
687
    }
688
    return;
689
}  /* NT_TimeProc */
690
691
/*
 * Remove the receive wait object registered for 'fd' from the group's
 * waiter table, taking group->mdlock around the lookup and update.
 * Returns PR_SUCCESS if an entry was found and cleared, PR_FAILURE
 * otherwise.
 */
static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd)
{
    PRStatus rv = PR_FAILURE;
    PRRecvWait **slot;

    _PR_MD_LOCK(&group->mdlock);
    slot = _MW_LookupInternal(group, fd);
    if (NULL != slot) {
        group->waiter->count -= 1;
        *slot = NULL;
        rv = PR_SUCCESS;
    }
    _PR_MD_UNLOCK(&group->mdlock);

    return rv;
}
705
706
/*
 * Same as NT_HashRemove but takes no lock itself: clears the waiter
 * table entry for 'fd' and decrements the count.
 * Returns PR_SUCCESS if an entry was found, PR_FAILURE otherwise.
 */
PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd)
{
    PRRecvWait **slot = _MW_LookupInternal(group, fd);

    if (NULL == slot) {
        return PR_FAILURE;
    }
    group->waiter->count -= 1;
    *slot = NULL;
    return PR_SUCCESS;
}
718
#endif /* WINNT */
719
720
/******************************************************************/
721
/******************************************************************/
722
/********************** The public API portion ********************/
723
/******************************************************************/
724
/******************************************************************/
725
/*
** Register the receive wait descriptor 'desc' with 'group' (or with the
** default group when 'group' is NULL).
**
** The descriptor's outcome is reset to PR_MW_PENDING and bytesRecv to 0,
** then the descriptor is inserted into the group's waiter table, growing
** the table when the insert asks for a rehash. new_business is notified
** so a polling thread can pick the descriptor up.
**
** On WINNT, after a successful insert, an overlapped ReadFile is issued
** on the bottom (NSPR) layer's handle and, if the descriptor has a
** timeout, a timer is armed that fires NT_TimeProc.
**
** Returns PR_SUCCESS when the descriptor was registered, PR_FAILURE
** otherwise:
**   - the default group could not be created;
**   - the group is not running (outcome is set to PR_MW_INTERRUPT and
**     PR_INVALID_STATE_ERROR is set);
**   - the table insert or expansion failed;
**   - (WINNT) no NSPR layer, out of memory, no timer, or ReadFile failed
**     with an error other than ERROR_IO_PENDING. In these cases the
**     descriptor is removed from the waiter table again.
*/
PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc(
    PRWaitGroup *group, PRRecvWait *desc)
{
    _PR_HashStory hrv;
    PRStatus rv = PR_FAILURE;
#ifdef WINNT
    _MDOverlapped *overlapped;
    HANDLE hFile;
    BOOL bResult;
    DWORD dwError;
    PRFileDesc *bottom;
#endif

    if (!_pr_initialized) {
        _PR_ImplicitInitialization();
    }
    if ((NULL == group) && (NULL == (group = MW_Init2())))
    {
        return rv;
    }

    PR_ASSERT(NULL != desc->fd);

    desc->outcome = PR_MW_PENDING;  /* nice, well known value */
    desc->bytesRecv = 0;  /* likewise, though this value is ambiguous */

    PR_Lock(group->ml);

    if (_prmw_running != group->state)
    {
        /* Not allowed to add after cancelling the group */
        desc->outcome = PR_MW_INTERRUPT;
        PR_SetError(PR_INVALID_STATE_ERROR, 0);
        PR_Unlock(group->ml);
        return rv;
    }

#ifdef WINNT
    _PR_MD_LOCK(&group->mdlock);
#endif

    /*
    ** If the waiter count is zero at this point, there's no telling
    ** how long we've been idle. Therefore, initialize the beginning
    ** of the timing interval. As long as the list doesn't go empty,
    ** it will maintain itself.
    */
    if (0 == group->waiter->count) {
        group->last_poll = PR_IntervalNow();
    }

    /* Insert, expanding the table for as long as the insert asks for it. */
    do
    {
        hrv = MW_AddHashInternal(desc, group->waiter);
        if (_prmw_rehash != hrv) {
            break;
        }
        hrv = MW_ExpandHashInternal(group);  /* gruesome */
        if (_prmw_success != hrv) {
            break;
        }
    } while (PR_TRUE);

#ifdef WINNT
    _PR_MD_UNLOCK(&group->mdlock);
#endif

    PR_NotifyCondVar(group->new_business);  /* tell the world */
    rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
    PR_Unlock(group->ml);

#ifdef WINNT
    if (PR_SUCCESS != rv)
    {
        /*
        ** The descriptor was not added by this call (it may even be a
        ** duplicate of an entry that is already waiting), so there is
        ** nothing to start and nothing of ours to remove from the table.
        */
        return rv;
    }

    /*
    ** Reach to the bottom layer to get the OS fd. Done before anything
    ** is allocated so that this failure has nothing to release except
    ** the table entry.
    */
    bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
    PR_ASSERT(NULL != bottom);
    if (NULL == bottom)
    {
        NT_HashRemove(group, desc->fd);
        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
        return PR_FAILURE;
    }

    overlapped = PR_NEWZAP(_MDOverlapped);
    if (NULL == overlapped)
    {
        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
        NT_HashRemove(group, desc->fd);
        return PR_FAILURE;
    }
    overlapped->ioModel = _MD_MultiWaitIO;
    overlapped->data.mw.desc = desc;
    overlapped->data.mw.group = group;
    if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
    {
        overlapped->data.mw.timer = CreateTimer(
                                        desc->timeout,
                                        NT_TimeProc,
                                        overlapped);
        if (0 == overlapped->data.mw.timer)
        {
            NT_HashRemove(group, desc->fd);
            PR_DELETE(overlapped);
            /*
             * XXX It appears that a maximum of 16 timer events can
             * be outstanding. GetLastError() returns 0 when I try it.
             */
            PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
            return PR_FAILURE;
        }
    }

    hFile = (HANDLE)bottom->secret->md.osfd;
    if (!bottom->secret->md.io_model_committed)
    {
        PRInt32 st;
        st = _md_Associate(hFile);
        PR_ASSERT(0 != st);
        bottom->secret->md.io_model_committed = PR_TRUE;
    }
    bResult = ReadFile(hFile,
                       desc->buffer.start,
                       (DWORD)desc->buffer.length,
                       NULL,
                       &overlapped->overlapped);
    if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING)
    {
        if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
        {
            /*
            ** Only cancel the timer if it has not fired yet, i.e. if we
            ** are the ones moving the outcome away from PENDING.
            */
            if (InterlockedCompareExchange((LONG *)&desc->outcome,
                                           (LONG)PR_MW_FAILURE, (LONG)PR_MW_PENDING)
                == (LONG)PR_MW_PENDING)
            {
                CancelTimer(overlapped->data.mw.timer);
            }
        }
        /*
        ** The read was never started, so no completion will arrive to
        ** release these; drop the table entry and the overlapped block
        ** whether or not a timeout was requested.
        */
        NT_HashRemove(group, desc->fd);
        PR_DELETE(overlapped);
        _PR_MD_MAP_READ_ERROR(dwError);
        rv = PR_FAILURE;
    }
#endif

    return rv;
}  /* PR_AddWaitFileDesc */
867
868
PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group)
869
0
{
870
0
    PRCList *io_ready = NULL;
871
#ifdef WINNT
872
    PRThread *me = _PR_MD_CURRENT_THREAD();
873
    _MDOverlapped *overlapped;
874
#endif
875
876
0
    if (!_pr_initialized) {
877
0
        _PR_ImplicitInitialization();
878
0
    }
879
0
    if ((NULL == group) && (NULL == (group = MW_Init2()))) {
880
0
        goto failed_init;
881
0
    }
882
883
0
    PR_Lock(group->ml);
884
885
0
    if (_prmw_running != group->state)
886
0
    {
887
0
        PR_SetError(PR_INVALID_STATE_ERROR, 0);
888
0
        goto invalid_state;
889
0
    }
890
891
0
    group->waiting_threads += 1;  /* the polling thread is counted */
892
893
#ifdef WINNT
894
    _PR_MD_LOCK(&group->mdlock);
895
    while (PR_CLIST_IS_EMPTY(&group->io_ready))
896
    {
897
        _PR_THREAD_LOCK(me);
898
        me->state = _PR_IO_WAIT;
899
        PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
900
        if (!_PR_IS_NATIVE_THREAD(me))
901
        {
902
            _PR_SLEEPQ_LOCK(me->cpu);
903
            _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
904
            _PR_SLEEPQ_UNLOCK(me->cpu);
905
        }
906
        _PR_THREAD_UNLOCK(me);
907
        _PR_MD_UNLOCK(&group->mdlock);
908
        PR_Unlock(group->ml);
909
        _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
910
        me->state = _PR_RUNNING;
911
        PR_Lock(group->ml);
912
        _PR_MD_LOCK(&group->mdlock);
913
        if (_PR_PENDING_INTERRUPT(me)) {
914
            PR_REMOVE_LINK(&me->waitQLinks);
915
            _PR_MD_UNLOCK(&group->mdlock);
916
            me->flags &= ~_PR_INTERRUPT;
917
            me->io_suspended = PR_FALSE;
918
            PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
919
            goto aborted;
920
        }
921
    }
922
    io_ready = PR_LIST_HEAD(&group->io_ready);
923
    PR_ASSERT(io_ready != NULL);
924
    PR_REMOVE_LINK(io_ready);
925
    _PR_MD_UNLOCK(&group->mdlock);
926
    overlapped = (_MDOverlapped *)
927
                 ((char *)io_ready - offsetof(_MDOverlapped, data));
928
    io_ready = &overlapped->data.mw.desc->internal;
929
#else
930
0
    do
931
0
    {
932
        /*
933
        ** If the I/O ready list isn't empty, have this thread
934
        ** return with the first receive wait object that's available.
935
        */
936
0
        if (PR_CLIST_IS_EMPTY(&group->io_ready))
937
0
        {
938
            /*
939
            ** Is there a polling thread yet? If not, grab this thread
940
            ** and use it.
941
            */
942
0
            if (NULL == group->poller)
943
0
            {
944
                /*
945
                ** This thread will keep polling until it becomes the only one
946
                ** left to service a completion. Then it will return and there will
947
                ** be none left to actually poll or to run completions.
948
                **
949
                ** The polling function should only return w/ failure or
950
                ** with some I/O ready.
951
                */
952
0
                if (PR_FAILURE == _MW_PollInternal(group)) {
953
0
                    goto failed_poll;
954
0
                }
955
0
            }
956
0
            else
957
0
            {
958
                /*
959
                ** There are four reasons a thread can be awakened from
960
                ** a wait on the io_complete condition variable.
961
                ** 1. Some I/O has completed, i.e., the io_ready list
962
                **    is nonempty.
963
                ** 2. The wait group is canceled.
964
                ** 3. The thread is interrupted.
965
                ** 4. The current polling thread has to leave and needs
966
                **    a replacement.
967
                ** The logic to find a new polling thread is made more
968
                ** complicated by all the other possible events.
969
                ** I tried my best to write the logic clearly, but
970
                ** it is still full of if's with continue and goto.
971
                */
972
0
                PRStatus st;
973
0
                do
974
0
                {
975
0
                    st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
976
0
                    if (_prmw_running != group->state)
977
0
                    {
978
0
                        PR_SetError(PR_INVALID_STATE_ERROR, 0);
979
0
                        goto aborted;
980
0
                    }
981
0
                    if (_MW_ABORTED(st) || (NULL == group->poller)) {
982
0
                        break;
983
0
                    }
984
0
                } while (PR_CLIST_IS_EMPTY(&group->io_ready));
985
986
                /*
987
                ** The thread is interrupted and has to leave.  It might
988
                ** have also been awakened to process ready i/o or be the
989
                ** new poller.  To be safe, if either condition is true,
990
                ** we awaken another thread to take its place.
991
                */
992
0
                if (_MW_ABORTED(st))
993
0
                {
994
0
                    if ((NULL == group->poller
995
0
                         || !PR_CLIST_IS_EMPTY(&group->io_ready))
996
0
                        && group->waiting_threads > 1) {
997
0
                        PR_NotifyCondVar(group->io_complete);
998
0
                    }
999
0
                    goto aborted;
1000
0
                }
1001
1002
                /*
1003
                ** A new poller is needed, but can I be the new poller?
1004
                ** If there is no i/o ready, sure.  But if there is any
1005
                ** i/o ready, it has a higher priority.  I want to
1006
                ** process the ready i/o first and wake up another
1007
                ** thread to be the new poller.
1008
                */
1009
0
                if (NULL == group->poller)
1010
0
                {
1011
0
                    if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
1012
0
                        continue;
1013
0
                    }
1014
0
                    if (group->waiting_threads > 1) {
1015
0
                        PR_NotifyCondVar(group->io_complete);
1016
0
                    }
1017
0
                }
1018
0
            }
1019
0
            PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
1020
0
        }
1021
0
        io_ready = PR_LIST_HEAD(&group->io_ready);
1022
0
        PR_NotifyCondVar(group->io_taken);
1023
0
        PR_ASSERT(io_ready != NULL);
1024
0
        PR_REMOVE_LINK(io_ready);
1025
0
    } while (NULL == io_ready);
1026
1027
0
failed_poll:
1028
1029
0
#endif
1030
1031
0
aborted:
1032
1033
0
    group->waiting_threads -= 1;
1034
0
invalid_state:
1035
0
    (void)MW_TestForShutdownInternal(group);
1036
0
    PR_Unlock(group->ml);
1037
1038
0
failed_init:
1039
0
    if (NULL != io_ready)
1040
0
    {
1041
        /* If the operation failed, record the reason why */
1042
0
        switch (((PRRecvWait*)io_ready)->outcome)
1043
0
        {
1044
0
            case PR_MW_PENDING:
1045
0
                PR_ASSERT(0);
1046
0
                break;
1047
0
            case PR_MW_SUCCESS:
1048
0
#ifndef WINNT
1049
0
                _MW_InitialRecv(io_ready);
1050
0
#endif
1051
0
                break;
1052
#ifdef WINNT
1053
            case PR_MW_FAILURE:
1054
                _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
1055
                break;
1056
#endif
1057
0
            case PR_MW_TIMEOUT:
1058
0
                PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
1059
0
                break;
1060
0
            case PR_MW_INTERRUPT:
1061
0
                PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
1062
0
                break;
1063
0
            default: break;
1064
0
        }
1065
#ifdef WINNT
1066
        if (NULL != overlapped->data.mw.timer)
1067
        {
1068
            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1069
                      != overlapped->data.mw.desc->timeout);
1070
            CancelTimer(overlapped->data.mw.timer);
1071
        }
1072
        else
1073
        {
1074
            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1075
                      == overlapped->data.mw.desc->timeout);
1076
        }
1077
        PR_DELETE(overlapped);
1078
#endif
1079
0
    }
1080
0
    return (PRRecvWait*)io_ready;
1081
0
}  /* PR_WaitRecvReady */
1082
1083
/*
** Cancel the receive-wait described by 'desc' in the wait group 'group'.
**
** group  The wait group to operate on. NULL selects mw_state->group
**        (the default group); if that is NULL as well the call fails.
** desc   The receive-wait object to cancel. Must not be NULL.
**
** Returns PR_SUCCESS or PR_FAILURE. On failure the error is
**   PR_INVALID_ARGUMENT_ERROR  no usable group, NULL 'desc', 'desc' not
**                              found in the group (non-WINNT), or no
**                              NSPR I/O layer on desc->fd (WINNT);
**   PR_INVALID_STATE_ERROR     the group is not in the running state.
**
** The group's lock (group->ml) is held for the duration of the lookup.
*/
PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *desc)
{
#if !defined(WINNT)
    PRRecvWait **recv_wait;
#endif
    PRStatus rv = PR_SUCCESS;

    if (NULL == group) {
        group = mw_state->group;
    }
    PR_ASSERT(NULL != group);
    /* 'desc' is dereferenced on every path below, so reject NULL up front. */
    if ((NULL == group) || (NULL == desc))
    {
        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
        return PR_FAILURE;
    }

    PR_Lock(group->ml);

    if (_prmw_running != group->state)
    {
        PR_SetError(PR_INVALID_STATE_ERROR, 0);
        rv = PR_FAILURE;
        goto unlock;
    }

#ifdef WINNT
    /*
    ** Atomically flip the outcome from PENDING to INTERRUPT. Only the
    ** caller that wins this exchange goes on to close the socket.
    */
    if (InterlockedCompareExchange((LONG *)&desc->outcome,
                                   (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING)
    {
        PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
        PR_ASSERT(NULL != bottom);
        if (NULL == bottom)
        {
            /* An error is recorded, so the status must report failure too. */
            PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
            rv = PR_FAILURE;
            goto unlock;
        }
        bottom->secret->state = _PR_FILEDESC_CLOSED;
        /*
        ** NOTE(review): a closesocket() failure terminates the whole
        ** process. Kept as-is because callers may rely on it; consider
        ** mapping the error and returning PR_FAILURE instead.
        */
        if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
        {
            fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
            exit(1);
        }
    }
#else
    if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd)))
    {
        /* it was in the wait table: finish it with an INTERRUPT outcome */
        _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
        goto unlock;
    }
    if (!PR_CLIST_IS_EMPTY(&group->io_ready))
    {
        /* is it already complete?  Then there is nothing left to cancel. */
        PRCList *head = PR_LIST_HEAD(&group->io_ready);
        do
        {
            PRRecvWait *done = (PRRecvWait*)head;
            if (done == desc) {
                goto unlock;
            }
            head = PR_NEXT_LINK(head);
        } while (head != &group->io_ready);
    }
    /* neither waiting nor ready: the descriptor is not part of this group */
    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    rv = PR_FAILURE;

#endif
unlock:
    PR_Unlock(group->ml);
    return rv;
}  /* PR_CancelWaitFileDesc */
1157
1158
/*
** Stop a wait group and hand back one of its finished receive-wait
** objects.
**
** group  The wait group to cancel. NULL selects mw_state->group; if
**        that is NULL too, the call fails with PR_INVALID_ARGUMENT_ERROR.
**
** Unless the group is already stopped, it is moved towards the stopped
** state: waiters on new_business and io_complete are notified, and the
** caller blocks on mw_manage until the state reads _prmw_stopped.
** Every descriptor still in the waiter table is then marked
** PR_MW_INTERRUPT.
**
** Returns the first entry removed from the io_ready list, or NULL with
** PR_GROUP_EMPTY_ERROR when that list is empty.
*/
PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group)
{
    PRRecvWait **slot;
    PRRecvWait *cancelled = NULL;
#ifdef WINNT
    _MDOverlapped *overlapped;
    PRRecvWait **limit;
    PRThread *me = _PR_MD_CURRENT_THREAD();
#endif

    if (NULL == group) {
        group = mw_state->group;
    }
    PR_ASSERT(NULL != group);
    if (NULL == group)
    {
        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
        return NULL;
    }

    PR_Lock(group->ml);
    if (_prmw_stopped != group->state)
    {
        /* refuse new work from here on */
        if (_prmw_running == group->state) {
            group->state = _prmw_stopping;
        }
        if (0 != group->waiting_threads)
        {
            /* other threads are still inside the group: wake them all */
            PR_NotifyAllCondVar(group->new_business);
            PR_NotifyAllCondVar(group->io_complete);
        }
        else {
            /* nobody else is around, so the group can stop immediately */
            group->state = _prmw_stopped;
        }
        while (_prmw_stopped != group->state) {
            (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
        }
    }

#ifdef WINNT
    _PR_MD_LOCK(&group->mdlock);
#endif
    /* make all the existing descriptors look done/interrupted */
#ifdef WINNT
    limit = &group->waiter->recv_wait + group->waiter->length;
    for (slot = &group->waiter->recv_wait; slot < limit; ++slot)
    {
        PRFileDesc *bottom;

        if (NULL == *slot) {
            continue;
        }
        /* only entries that are still PENDING get interrupted and closed */
        if (InterlockedCompareExchange((LONG *)&(*slot)->outcome,
                                       (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING)
            != (LONG)PR_MW_PENDING) {
            continue;
        }
        bottom = PR_GetIdentitiesLayer((*slot)->fd, PR_NSPR_IO_LAYER);
        PR_ASSERT(NULL != bottom);
        if (NULL == bottom)
        {
            PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
            goto invalid_arg;
        }
        bottom->secret->state = _PR_FILEDESC_CLOSED;
#if 0
        fprintf(stderr, "cancel wait group: closing socket\n");
#endif
        if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
        {
            fprintf(stderr, "closesocket failed: %d\n",
                    WSAGetLastError());
            exit(1);
        }
    }
    /* park this thread on the group's wait list until the table drains */
    while (group->waiter->count > 0)
    {
        _PR_THREAD_LOCK(me);
        me->state = _PR_IO_WAIT;
        PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
        if (!_PR_IS_NATIVE_THREAD(me))
        {
            _PR_SLEEPQ_LOCK(me->cpu);
            _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
            _PR_SLEEPQ_UNLOCK(me->cpu);
        }
        _PR_THREAD_UNLOCK(me);
        /* both locks are dropped across the wait and retaken afterwards */
        _PR_MD_UNLOCK(&group->mdlock);
        PR_Unlock(group->ml);
        _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
        me->state = _PR_RUNNING;
        PR_Lock(group->ml);
        _PR_MD_LOCK(&group->mdlock);
    }
#else
    /* walk the table until the entry count reaches zero */
    slot = &group->waiter->recv_wait;
    while (group->waiter->count > 0)
    {
        PR_ASSERT(slot < &group->waiter->recv_wait + group->waiter->length);
        if (NULL != *slot) {
            _MW_DoneInternal(group, slot, PR_MW_INTERRUPT);
        }
        ++slot;
    }
#endif

    /* take first element of finished list and return it or NULL */
    if (!PR_CLIST_IS_EMPTY(&group->io_ready))
    {
        PRCList *first = PR_LIST_HEAD(&group->io_ready);
        PR_REMOVE_AND_INIT_LINK(first);
#ifdef WINNT
        /* the list links live inside an _MDOverlapped; recover it and the
        ** user's descriptor, then release the overlapped record */
        overlapped = (_MDOverlapped *)
                     ((char *)first - offsetof(_MDOverlapped, data));
        first = &overlapped->data.mw.desc->internal;
        if (NULL != overlapped->data.mw.timer)
        {
            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
                      != overlapped->data.mw.desc->timeout);
            CancelTimer(overlapped->data.mw.timer);
        }
        else
        {
            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
                      == overlapped->data.mw.desc->timeout);
        }
        PR_DELETE(overlapped);
#endif
        cancelled = (PRRecvWait*)first;
    }
    else {
        PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
    }
#ifdef WINNT
invalid_arg:
    _PR_MD_UNLOCK(&group->mdlock);
#endif
    PR_Unlock(group->ml);

    return cancelled;
}  /* PR_CancelWaitGroup */
1296
1297
/*
** Allocate and initialize a new wait group and link it onto the global
** list of groups (mw_state->group_list).
**
** size   Ignored.
**
** Returns the new group, or NULL if any allocation fails. When the
** group structure or the waiter table cannot be allocated the error is
** PR_OUT_OF_MEMORY_ERROR; failures of PR_NewLock/PR_NewCondVar leave
** whatever error those calls recorded.
*/
PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */)
{
    PRWaitGroup *wg;

    /*
    ** mw_lock and mw_state are used below, so make sure the runtime has
    ** been initialized first, exactly as PR_WaitRecvReady does.
    */
    if (!_pr_initialized) {
        _PR_ImplicitInitialization();
    }

    if (NULL == (wg = PR_NEWZAP(PRWaitGroup)))
    {
        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
        goto failed;
    }
    /* the wait group itself: one lock shared by four condition variables */
    wg->ml = PR_NewLock();
    if (NULL == wg->ml) {
        goto failed_lock;
    }
    wg->io_taken = PR_NewCondVar(wg->ml);
    if (NULL == wg->io_taken) {
        goto failed_cvar0;
    }
    wg->io_complete = PR_NewCondVar(wg->ml);
    if (NULL == wg->io_complete) {
        goto failed_cvar1;
    }
    wg->new_business = PR_NewCondVar(wg->ml);
    if (NULL == wg->new_business) {
        goto failed_cvar2;
    }
    wg->mw_manage = PR_NewCondVar(wg->ml);
    if (NULL == wg->mw_manage) {
        goto failed_cvar3;
    }

    PR_INIT_CLIST(&wg->group_link);
    PR_INIT_CLIST(&wg->io_ready);

    /* the waiters sequence: header plus the default number of slots */
    wg->waiter = (_PRWaiterHash*)PR_CALLOC(
                     sizeof(_PRWaiterHash) +
                     (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
    if (NULL == wg->waiter)
    {
        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
        goto failed_waiter;
    }
    wg->waiter->count = 0;
    wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;

#ifdef WINNT
    _PR_MD_NEW_LOCK(&wg->mdlock);
    PR_INIT_CLIST(&wg->wait_list);
#endif /* WINNT */

    /* publish the group on the global list */
    PR_Lock(mw_lock);
    PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
    PR_Unlock(mw_lock);
    return wg;

    /* unwind in reverse order of construction */
failed_waiter:
    PR_DestroyCondVar(wg->mw_manage);
failed_cvar3:
    PR_DestroyCondVar(wg->new_business);
failed_cvar2:
    PR_DestroyCondVar(wg->io_complete);
failed_cvar1:
    PR_DestroyCondVar(wg->io_taken);
failed_cvar0:
    PR_DestroyLock(wg->ml);
failed_lock:
    PR_DELETE(wg);
    wg = NULL;

failed:
    return wg;
}  /* PR_CreateWaitGroup */
1370
1371
/*
** Tear down a wait group and release everything it owns.
**
** group  The group to destroy. NULL selects mw_state->group; if that is
**        NULL as well the call fails with PR_INVALID_ARGUMENT_ERROR.
**
** The group is only destroyed when it has no waiting threads, no
** entries in its waiter table and an empty io_ready list. Otherwise
** PR_FAILURE is returned with PR_INVALID_STATE_ERROR and the group is
** left untouched.
*/
PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group)
{
    PRStatus rv = PR_SUCCESS;

    if (NULL == group) {
        group = mw_state->group;
    }
    PR_ASSERT(NULL != group);
    if (NULL == group)
    {
        /* The default wait group is not created yet. */
        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
        return PR_FAILURE;
    }

    /* decide, under the group's lock, whether it is idle enough to go away */
    PR_Lock(group->ml);
    if ((0 != group->waiting_threads)
        || (0 != group->waiter->count)
        || !PR_CLIST_IS_EMPTY(&group->io_ready))
    {
        PR_SetError(PR_INVALID_STATE_ERROR, 0);
        rv = PR_FAILURE;
    }
    else
    {
        group->state = _prmw_stopped;
    }
    PR_Unlock(group->ml);
    if (PR_SUCCESS != rv) {
        return rv;
    }

    /* unhook from the global list of groups */
    PR_Lock(mw_lock);
    PR_REMOVE_LINK(&group->group_link);
    PR_Unlock(mw_lock);

#ifdef WINNT
    /*
     * XXX make sure wait_list is empty and waiter is empty.
     * These must be checked while holding mdlock.
     */
    _PR_MD_FREE_LOCK(&group->mdlock);
#endif

    /* release the tables, then the condition variables, then the lock */
    PR_DELETE(group->waiter);
    PR_DELETE(group->polling_list);
    PR_DestroyCondVar(group->mw_manage);
    PR_DestroyCondVar(group->new_business);
    PR_DestroyCondVar(group->io_complete);
    PR_DestroyCondVar(group->io_taken);
    PR_DestroyLock(group->ml);
    /* forget the default group if that is the one being destroyed */
    if (mw_state->group == group) {
        mw_state->group = NULL;
    }
    PR_DELETE(group);

    return rv;
}  /* PR_DestroyWaitGroup */
1429
1430
/**********************************************************************
1431
***********************************************************************
1432
******************** Wait group enumerations **************************
1433
***********************************************************************
1434
**********************************************************************/
1435
1436
/*
** Allocate an enumerator bound to 'group' and mark it sealed.
**
** group  Stored in the enumerator as given (NULL is accepted here).
**
** Returns the new zero-initialized enumerator, or NULL with
** PR_OUT_OF_MEMORY_ERROR if the allocation fails.
*/
PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group)
{
    PRMWaitEnumerator *fresh = PR_NEWZAP(PRMWaitEnumerator);

    if (NULL != fresh)
    {
        fresh->group = group;
        fresh->seal = _PR_ENUM_SEALED;
    }
    else {
        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
    }
    return fresh;
}  /* PR_CreateMWaitEnumerator */
1449
1450
/*
** Release an enumerator obtained from PR_CreateMWaitEnumerator.
**
** enumerator  Must be non-NULL and still carry the sealed marker.
**
** Returns PR_SUCCESS after freeing the enumerator, or PR_FAILURE with
** PR_INVALID_ARGUMENT_ERROR when the argument is NULL or not sealed.
*/
PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator)
{
    PR_ASSERT(NULL != enumerator);
    PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);

    if ((NULL != enumerator) && (_PR_ENUM_SEALED == enumerator->seal))
    {
        /* break the seal before freeing so a stale pointer is rejected */
        enumerator->seal = _PR_ENUM_UNSEALED;
        PR_Free(enumerator);
        return PR_SUCCESS;
    }

    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    return PR_FAILURE;
}  /* PR_DestroyMWaitEnumerator */
1463
1464
/*
** Step through the receive-wait objects currently stored in a wait
** group's waiter table.
**
** enumerator  A sealed enumerator; NULL or unsealed is rejected with
**             PR_INVALID_ARGUMENT_ERROR.
** previous    NULL starts (or restarts) the enumeration. Any non-NULL
**             value continues it; the value itself is not inspected.
**
** A continuing call must come from the thread that started the
** enumeration, otherwise PR_INVALID_ARGUMENT_ERROR is set. If the
** group's p_timestamp differs from the one recorded at the start, the
** enumeration silently restarts from the beginning.
**
** Returns the next non-NULL table entry, or NULL when the table is
** exhausted. NULL is also returned on the error paths above and, with
** PR_GROUP_EMPTY_ERROR, when no group is available.
*/
PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup(
    PRMWaitEnumerator *enumerator, const PRRecvWait *previous)
{
    PRRecvWait *found = NULL;

    /* entry point sanity checking */
    PR_ASSERT(NULL != enumerator);
    PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
    if ((NULL == enumerator)
        || (_PR_ENUM_SEALED != enumerator->seal))
    {
        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
        return NULL;  /* probably ambiguous */
    }

    if (NULL != previous)
    {
        /* continuing an enumeration: same thread only */
        PRThread *caller = PR_GetCurrentThread();
        PR_ASSERT(caller == enumerator->thread);
        if (caller != enumerator->thread)
        {
            PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
            return NULL;  /* probably ambiguous */
        }

        /* need to restart the enumeration */
        if (enumerator->group->p_timestamp != enumerator->p_timestamp) {
            return PR_EnumerateWaitGroup(enumerator, NULL);
        }
    }
    else
    {
        /* beginning of enumeration: fall back to the default group */
        if (NULL == enumerator->group)
        {
            enumerator->group = mw_state->group;
            if (NULL == enumerator->group)
            {
                PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
                return NULL;
            }
        }
        /* record where to start and who started */
        enumerator->waiter = &enumerator->group->waiter->recv_wait;
        enumerator->p_timestamp = enumerator->group->p_timestamp;
        enumerator->thread = PR_GetCurrentThread();
        enumerator->index = 0;
    }

    /* actually progress the enumeration */
#if defined(WINNT)
    _PR_MD_LOCK(&enumerator->group->mdlock);
#else
    PR_Lock(enumerator->group->ml);
#endif
    /* the index advances on every test, including the one that ends the scan */
    while (enumerator->index++ < enumerator->group->waiter->length)
    {
        found = *enumerator->waiter;
        enumerator->waiter += 1;
        if (NULL != found) {
            break;
        }
    }
#if defined(WINNT)
    _PR_MD_UNLOCK(&enumerator->group->mdlock);
#else
    PR_Unlock(enumerator->group->ml);
#endif

    return found;  /* what we live for */
}  /* PR_EnumerateWaitGroup */
1533
1534
/* prmwait.c */