Coverage Report

Created: 2025-07-01 06:25

/src/nspr/pr/src/io/prmwait.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* This Source Code Form is subject to the terms of the Mozilla Public
3
 * License, v. 2.0. If a copy of the MPL was not distributed with this
4
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5
6
#include "primpl.h"
7
#include "pprmwait.h"
8
9
0
#define _MW_REHASH_MAX 11
10
11
static PRLock* mw_lock = NULL;
12
static _PRGlobalState* mw_state = NULL;
13
14
static PRIntervalTime max_polling_interval;
15
16
#ifdef WINNT
17
18
typedef struct TimerEvent {
19
  PRIntervalTime absolute;
20
  void (*func)(void*);
21
  void* arg;
22
  LONG ref_count;
23
  PRCList links;
24
} TimerEvent;
25
26
#  define TIMER_EVENT_PTR(_qp) \
27
    ((TimerEvent*)((char*)(_qp) - offsetof(TimerEvent, links)))
28
29
struct {
30
  PRLock* ml;
31
  PRCondVar* new_timer;
32
  PRCondVar* cancel_timer;
33
  PRThread* manager_thread;
34
  PRCList timer_queue;
35
} tm_vars;
36
37
static PRStatus TimerInit(void);
38
static void TimerManager(void* arg);
39
static TimerEvent* CreateTimer(PRIntervalTime timeout, void (*func)(void*),
40
                               void* arg);
41
static PRBool CancelTimer(TimerEvent* timer);
42
43
static void TimerManager(void* arg) {
44
  PRIntervalTime now;
45
  PRIntervalTime timeout;
46
  PRCList* head;
47
  TimerEvent* timer;
48
49
  PR_Lock(tm_vars.ml);
50
  while (1) {
51
    if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue)) {
52
      PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
53
    } else {
54
      now = PR_IntervalNow();
55
      head = PR_LIST_HEAD(&tm_vars.timer_queue);
56
      timer = TIMER_EVENT_PTR(head);
57
      if ((PRInt32)(now - timer->absolute) >= 0) {
58
        PR_REMOVE_LINK(head);
59
        /*
60
         * make its prev and next point to itself so that
61
         * it's obvious that it's not on the timer_queue.
62
         */
63
        PR_INIT_CLIST(head);
64
        PR_ASSERT(2 == timer->ref_count);
65
        PR_Unlock(tm_vars.ml);
66
        timer->func(timer->arg);
67
        PR_Lock(tm_vars.ml);
68
        timer->ref_count -= 1;
69
        if (0 == timer->ref_count) {
70
          PR_NotifyAllCondVar(tm_vars.cancel_timer);
71
        }
72
      } else {
73
        timeout = (PRIntervalTime)(timer->absolute - now);
74
        PR_WaitCondVar(tm_vars.new_timer, timeout);
75
      }
76
    }
77
  }
78
  PR_Unlock(tm_vars.ml);
79
}
80
81
static TimerEvent* CreateTimer(PRIntervalTime timeout, void (*func)(void*),
82
                               void* arg) {
83
  TimerEvent* timer;
84
  PRCList *links, *tail;
85
  TimerEvent* elem;
86
87
  timer = PR_NEW(TimerEvent);
88
  if (NULL == timer) {
89
    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
90
    return timer;
91
  }
92
  timer->absolute = PR_IntervalNow() + timeout;
93
  timer->func = func;
94
  timer->arg = arg;
95
  timer->ref_count = 2;
96
  PR_Lock(tm_vars.ml);
97
  tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
98
  while (links->prev != tail) {
99
    elem = TIMER_EVENT_PTR(links);
100
    if ((PRInt32)(timer->absolute - elem->absolute) >= 0) {
101
      break;
102
    }
103
    links = links->prev;
104
  }
105
  PR_INSERT_AFTER(&timer->links, links);
106
  PR_NotifyCondVar(tm_vars.new_timer);
107
  PR_Unlock(tm_vars.ml);
108
  return timer;
109
}
110
111
static PRBool CancelTimer(TimerEvent* timer) {
112
  PRBool canceled = PR_FALSE;
113
114
  PR_Lock(tm_vars.ml);
115
  timer->ref_count -= 1;
116
  if (timer->links.prev == &timer->links) {
117
    while (timer->ref_count == 1) {
118
      PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
119
    }
120
  } else {
121
    PR_REMOVE_LINK(&timer->links);
122
    canceled = PR_TRUE;
123
  }
124
  PR_Unlock(tm_vars.ml);
125
  PR_DELETE(timer);
126
  return canceled;
127
}
128
129
static PRStatus TimerInit(void) {
130
  tm_vars.ml = PR_NewLock();
131
  if (NULL == tm_vars.ml) {
132
    goto failed;
133
  }
134
  tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
135
  if (NULL == tm_vars.new_timer) {
136
    goto failed;
137
  }
138
  tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
139
  if (NULL == tm_vars.cancel_timer) {
140
    goto failed;
141
  }
142
  PR_INIT_CLIST(&tm_vars.timer_queue);
143
  tm_vars.manager_thread =
144
      PR_CreateThread(PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
145
                      PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
146
  if (NULL == tm_vars.manager_thread) {
147
    goto failed;
148
  }
149
  return PR_SUCCESS;
150
151
failed:
152
  if (NULL != tm_vars.cancel_timer) {
153
    PR_DestroyCondVar(tm_vars.cancel_timer);
154
  }
155
  if (NULL != tm_vars.new_timer) {
156
    PR_DestroyCondVar(tm_vars.new_timer);
157
  }
158
  if (NULL != tm_vars.ml) {
159
    PR_DestroyLock(tm_vars.ml);
160
  }
161
  return PR_FAILURE;
162
}
163
164
#endif /* WINNT */
165
166
/******************************************************************/
167
/******************************************************************/
168
/************************ The private portion *********************/
169
/******************************************************************/
170
/******************************************************************/
171
0
void _PR_InitMW(void) {
172
#ifdef WINNT
173
  /*
174
   * We use NT 4's InterlockedCompareExchange() to operate
175
   * on PRMWStatus variables.
176
   */
177
  PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus));
178
  TimerInit();
179
#endif
180
0
  mw_lock = PR_NewLock();
181
0
  PR_ASSERT(NULL != mw_lock);
182
0
  mw_state = PR_NEWZAP(_PRGlobalState);
183
0
  PR_ASSERT(NULL != mw_state);
184
0
  PR_INIT_CLIST(&mw_state->group_list);
185
0
  max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
186
0
} /* _PR_InitMW */
187
188
0
void _PR_CleanupMW(void) {
189
0
  PR_DestroyLock(mw_lock);
190
0
  mw_lock = NULL;
191
0
  if (mw_state->group) {
192
0
    PR_DestroyWaitGroup(mw_state->group);
193
    /* mw_state->group is set to NULL as a side effect. */
194
0
  }
195
0
  PR_DELETE(mw_state);
196
0
} /* _PR_CleanupMW */
197
198
0
static PRWaitGroup* MW_Init2(void) {
199
0
  PRWaitGroup* group = mw_state->group; /* it's the null group */
200
0
  if (NULL == group)                    /* there is this special case */
201
0
  {
202
0
    group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
203
0
    if (NULL == group) {
204
0
      goto failed_alloc;
205
0
    }
206
0
    PR_Lock(mw_lock);
207
0
    if (NULL == mw_state->group) {
208
0
      mw_state->group = group;
209
0
      group = NULL;
210
0
    }
211
0
    PR_Unlock(mw_lock);
212
0
    if (group != NULL) {
213
0
      (void)PR_DestroyWaitGroup(group);
214
0
    }
215
0
    group = mw_state->group; /* somebody beat us to it */
216
0
  }
217
0
failed_alloc:
218
0
  return group; /* whatever */
219
0
} /* MW_Init2 */
220
221
0
static _PR_HashStory MW_AddHashInternal(PRRecvWait* desc, _PRWaiterHash* hash) {
222
  /*
223
  ** The entries are put in the table using the fd (PRFileDesc*) of
224
  ** the receive descriptor as the key. This allows us to locate
225
  ** the appropriate entry aqain when the poll operation finishes.
226
  **
227
  ** The pointer to the file descriptor object is first divided by
228
  ** the natural alignment of a pointer in the belief that object
229
  ** will have at least that many zeros in the low order bits.
230
  ** This may not be a good assuption.
231
  **
232
  ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
233
  ** that we declare defeat and force the table to be reconstructed.
234
  ** Since some fds might be added more than once, won't that cause
235
  ** collisions even in an empty table?
236
  */
237
0
  PRIntn rehash = _MW_REHASH_MAX;
238
0
  PRRecvWait** waiter;
239
0
  PRUintn hidx = _MW_HASH(desc->fd, hash->length);
240
0
  PRUintn hoffset = 0;
241
242
0
  while (rehash-- > 0) {
243
0
    waiter = &hash->recv_wait;
244
0
    if (NULL == waiter[hidx]) {
245
0
      waiter[hidx] = desc;
246
0
      hash->count += 1;
247
#if 0
248
            printf("Adding 0x%x->0x%x ", desc, desc->fd);
249
            printf(
250
                "table[%u:%u:*%u]: 0x%x->0x%x\n",
251
                hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
252
#endif
253
0
      return _prmw_success;
254
0
    }
255
0
    if (desc == waiter[hidx]) {
256
0
      PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table */
257
0
      return _prmw_error;
258
0
    }
259
#if 0
260
        printf("Failing 0x%x->0x%x ", desc, desc->fd);
261
        printf(
262
            "table[*%u:%u:%u]: 0x%x->0x%x\n",
263
            hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
264
#endif
265
0
    if (0 == hoffset) {
266
0
      hoffset = _MW_HASH2(desc->fd, hash->length);
267
0
      PR_ASSERT(0 != hoffset);
268
0
    }
269
0
    hidx = (hidx + hoffset) % (hash->length);
270
0
  }
271
0
  return _prmw_rehash;
272
0
} /* MW_AddHashInternal */
273
274
0
static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup* group) {
275
0
  PRRecvWait** desc;
276
0
  PRUint32 pidx, length;
277
0
  _PRWaiterHash *newHash, *oldHash = group->waiter;
278
0
  PRBool retry;
279
0
  _PR_HashStory hrv;
280
281
0
  static const PRInt32 prime_number[] = {_PR_DEFAULT_HASH_LENGTH,
282
0
                                         179,
283
0
                                         521,
284
0
                                         907,
285
0
                                         1427,
286
0
                                         2711,
287
0
                                         3917,
288
0
                                         5021,
289
0
                                         8219,
290
0
                                         11549,
291
0
                                         18911,
292
0
                                         26711,
293
0
                                         33749,
294
0
                                         44771};
295
0
  PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));
296
297
  /* look up the next size we'd like to use for the hash table */
298
0
  for (pidx = 0; pidx < primes; ++pidx) {
299
0
    if (prime_number[pidx] == oldHash->length) {
300
0
      break;
301
0
    }
302
0
  }
303
  /* table size must be one of the prime numbers */
304
0
  PR_ASSERT(pidx < primes);
305
306
  /* if pidx == primes - 1, we can't expand the table any more */
307
0
  while (pidx < primes - 1) {
308
    /* next size */
309
0
    ++pidx;
310
0
    length = prime_number[pidx];
311
312
    /* allocate the new hash table and fill it in with the old */
313
0
    newHash = (_PRWaiterHash*)PR_CALLOC(sizeof(_PRWaiterHash) +
314
0
                                        (length * sizeof(PRRecvWait*)));
315
0
    if (NULL == newHash) {
316
0
      PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
317
0
      return _prmw_error;
318
0
    }
319
320
0
    newHash->length = length;
321
0
    retry = PR_FALSE;
322
0
    for (desc = &oldHash->recv_wait; newHash->count < oldHash->count; ++desc) {
323
0
      PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
324
0
      if (NULL != *desc) {
325
0
        hrv = MW_AddHashInternal(*desc, newHash);
326
0
        PR_ASSERT(_prmw_error != hrv);
327
0
        if (_prmw_success != hrv) {
328
0
          PR_DELETE(newHash);
329
0
          retry = PR_TRUE;
330
0
          break;
331
0
        }
332
0
      }
333
0
    }
334
0
    if (retry) {
335
0
      continue;
336
0
    }
337
338
0
    PR_DELETE(group->waiter);
339
0
    group->waiter = newHash;
340
0
    group->p_timestamp += 1;
341
0
    return _prmw_success;
342
0
  }
343
344
0
  PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
345
0
  return _prmw_error; /* we're hosed */
346
0
} /* MW_ExpandHashInternal */
347
348
#ifndef WINNT
349
static void _MW_DoneInternal(PRWaitGroup* group, PRRecvWait** waiter,
350
0
                             PRMWStatus outcome) {
351
  /*
352
  ** Add this receive wait object to the list of finished I/O
353
  ** operations for this particular group. If there are other
354
  ** threads waiting on the group, notify one. If not, arrange
355
  ** for this thread to return.
356
  */
357
358
#  if 0
359
    printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
360
#  endif
361
0
  (*waiter)->outcome = outcome;
362
0
  PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
363
0
  PR_NotifyCondVar(group->io_complete);
364
0
  PR_ASSERT(0 != group->waiter->count);
365
0
  group->waiter->count -= 1;
366
0
  *waiter = NULL;
367
0
} /* _MW_DoneInternal */
368
#endif /* WINNT */
369
370
0
static PRRecvWait** _MW_LookupInternal(PRWaitGroup* group, PRFileDesc* fd) {
371
  /*
372
  ** Find the receive wait object corresponding to the file descriptor.
373
  ** Only search the wait group specified.
374
  */
375
0
  PRRecvWait** desc;
376
0
  PRIntn rehash = _MW_REHASH_MAX;
377
0
  _PRWaiterHash* hash = group->waiter;
378
0
  PRUintn hidx = _MW_HASH(fd, hash->length);
379
0
  PRUintn hoffset = 0;
380
381
0
  while (rehash-- > 0) {
382
0
    desc = (&hash->recv_wait) + hidx;
383
0
    if ((*desc != NULL) && ((*desc)->fd == fd)) {
384
0
      return desc;
385
0
    }
386
0
    if (0 == hoffset) {
387
0
      hoffset = _MW_HASH2(fd, hash->length);
388
0
      PR_ASSERT(0 != hoffset);
389
0
    }
390
0
    hidx = (hidx + hoffset) % (hash->length);
391
0
  }
392
0
  return NULL;
393
0
} /* _MW_LookupInternal */
394
395
#ifndef WINNT
396
0
static PRStatus _MW_PollInternal(PRWaitGroup* group) {
397
0
  PRRecvWait** waiter;
398
0
  PRStatus rv = PR_FAILURE;
399
0
  PRInt32 count, count_ready;
400
0
  PRIntervalTime polling_interval;
401
402
0
  group->poller = PR_GetCurrentThread();
403
404
0
  while (PR_TRUE) {
405
0
    PRIntervalTime now, since_last_poll;
406
0
    PRPollDesc* poll_list;
407
408
0
    while (0 == group->waiter->count) {
409
0
      PRStatus st;
410
0
      st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
411
0
      if (_prmw_running != group->state) {
412
0
        PR_SetError(PR_INVALID_STATE_ERROR, 0);
413
0
        goto aborted;
414
0
      }
415
0
      if (_MW_ABORTED(st)) {
416
0
        goto aborted;
417
0
      }
418
0
    }
419
420
    /*
421
    ** There's something to do. See if our existing polling list
422
    ** is large enough for what we have to do?
423
    */
424
425
0
    while (group->polling_count < group->waiter->count) {
426
0
      PRUint32 old_count = group->waiter->count;
427
0
      PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
428
0
      PRSize new_size = sizeof(PRPollDesc) * new_count;
429
0
      PRPollDesc* old_polling_list = group->polling_list;
430
431
0
      PR_Unlock(group->ml);
432
0
      poll_list = (PRPollDesc*)PR_CALLOC(new_size);
433
0
      if (NULL == poll_list) {
434
0
        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
435
0
        PR_Lock(group->ml);
436
0
        goto failed_alloc;
437
0
      }
438
0
      if (NULL != old_polling_list) {
439
0
        PR_DELETE(old_polling_list);
440
0
      }
441
0
      PR_Lock(group->ml);
442
0
      if (_prmw_running != group->state) {
443
0
        PR_DELETE(poll_list);
444
0
        PR_SetError(PR_INVALID_STATE_ERROR, 0);
445
0
        goto aborted;
446
0
      }
447
0
      group->polling_list = poll_list;
448
0
      group->polling_count = new_count;
449
0
    }
450
451
0
    now = PR_IntervalNow();
452
0
    polling_interval = max_polling_interval;
453
0
    since_last_poll = now - group->last_poll;
454
455
0
    waiter = &group->waiter->recv_wait;
456
0
    poll_list = group->polling_list;
457
0
    for (count = 0; count < group->waiter->count; ++waiter) {
458
0
      PR_ASSERT(waiter < &group->waiter->recv_wait + group->waiter->length);
459
0
      if (NULL != *waiter) /* a live one! */
460
0
      {
461
0
        if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) &&
462
0
            (since_last_poll >= (*waiter)->timeout)) {
463
0
          _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
464
0
        } else {
465
0
          if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) {
466
0
            (*waiter)->timeout -= since_last_poll;
467
0
            if ((*waiter)->timeout < polling_interval) {
468
0
              polling_interval = (*waiter)->timeout;
469
0
            }
470
0
          }
471
0
          PR_ASSERT(poll_list < group->polling_list + group->polling_count);
472
0
          poll_list->fd = (*waiter)->fd;
473
0
          poll_list->in_flags = PR_POLL_READ;
474
0
          poll_list->out_flags = 0;
475
#  if 0
476
                    printf(
477
                        "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
478
                        poll_list, count, poll_list->fd, (*waiter)->timeout);
479
#  endif
480
0
          poll_list += 1;
481
0
          count += 1;
482
0
        }
483
0
      }
484
0
    }
485
486
0
    PR_ASSERT(count == group->waiter->count);
487
488
    /*
489
    ** If there are no more threads waiting for completion,
490
    ** we need to return.
491
    */
492
0
    if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) &&
493
0
        (1 == group->waiting_threads)) {
494
0
      break;
495
0
    }
496
497
0
    if (0 == count) {
498
0
      continue; /* wait for new business */
499
0
    }
500
501
0
    group->last_poll = now;
502
503
0
    PR_Unlock(group->ml);
504
505
0
    count_ready = PR_Poll(group->polling_list, count, polling_interval);
506
507
0
    PR_Lock(group->ml);
508
509
0
    if (_prmw_running != group->state) {
510
0
      PR_SetError(PR_INVALID_STATE_ERROR, 0);
511
0
      goto aborted;
512
0
    }
513
0
    if (-1 == count_ready) {
514
0
      goto failed_poll; /* that's a shame */
515
0
    } else if (0 < count_ready) {
516
0
      for (poll_list = group->polling_list; count > 0; poll_list++, count--) {
517
0
        PR_ASSERT(poll_list < group->polling_list + group->polling_count);
518
0
        if (poll_list->out_flags != 0) {
519
0
          waiter = _MW_LookupInternal(group, poll_list->fd);
520
          /*
521
          ** If 'waiter' is NULL, that means the wait receive
522
          ** descriptor has been canceled.
523
          */
524
0
          if (NULL != waiter) {
525
0
            _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
526
0
          }
527
0
        }
528
0
      }
529
0
    }
530
    /*
531
    ** If there are no more threads waiting for completion,
532
    ** we need to return.
533
    ** This thread was "borrowed" to do the polling, but it really
534
    ** belongs to the client.
535
    */
536
0
    if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) &&
537
0
        (1 == group->waiting_threads)) {
538
0
      break;
539
0
    }
540
0
  }
541
542
0
  rv = PR_SUCCESS;
543
544
0
aborted:
545
0
failed_poll:
546
0
failed_alloc:
547
0
  group->poller = NULL; /* we were that, not we ain't */
548
0
  if ((_prmw_running == group->state) && (group->waiting_threads > 1)) {
549
    /* Wake up one thread to become the new poller. */
550
0
    PR_NotifyCondVar(group->io_complete);
551
0
  }
552
0
  return rv; /* we return with the lock held */
553
0
} /* _MW_PollInternal */
554
#endif /* !WINNT */
555
556
0
static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup* group) {
557
0
  PRMWGroupState rv = group->state;
558
  /*
559
  ** Looking at the group's fields is safe because
560
  ** once the group's state is no longer running, it
561
  ** cannot revert and there is a safe check on entry
562
  ** to make sure no more threads are made to wait.
563
  */
564
0
  if ((_prmw_stopping == rv) && (0 == group->waiting_threads)) {
565
0
    rv = group->state = _prmw_stopped;
566
0
    PR_NotifyCondVar(group->mw_manage);
567
0
  }
568
0
  return rv;
569
0
} /* MW_TestForShutdownInternal */
570
571
#ifndef WINNT
572
0
static void _MW_InitialRecv(PRCList* io_ready) {
573
0
  PRRecvWait* desc = (PRRecvWait*)io_ready;
574
0
  if ((NULL == desc->buffer.start) || (0 == desc->buffer.length)) {
575
0
    desc->bytesRecv = 0;
576
0
  } else {
577
0
    desc->bytesRecv = (desc->fd->methods->recv)(
578
0
        desc->fd, desc->buffer.start, desc->buffer.length, 0, desc->timeout);
579
0
    if (desc->bytesRecv < 0) { /* SetError should already be there */
580
0
      desc->outcome = PR_MW_FAILURE;
581
0
    }
582
0
  }
583
0
} /* _MW_InitialRecv */
584
#endif
585
586
#ifdef WINNT
587
static void NT_TimeProc(void* arg) {
588
  _MDOverlapped* overlapped = (_MDOverlapped*)arg;
589
  PRRecvWait* desc = overlapped->data.mw.desc;
590
  PRFileDesc* bottom;
591
592
  if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_TIMEOUT,
593
                                 (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING) {
594
    /* This wait recv descriptor has already completed. */
595
    return;
596
  }
597
598
  /* close the osfd to abort the outstanding async io request */
599
  /* $$$$
600
  ** Little late to be checking if NSPR's on the bottom of stack,
601
  ** but if we don't check, we can't assert that the private data
602
  ** is what we think it is.
603
  ** $$$$
604
  */
605
  bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
606
  PR_ASSERT(NULL != bottom);
607
  if (NULL != bottom) /* now what!?!?! */
608
  {
609
    bottom->secret->state = _PR_FILEDESC_CLOSED;
610
    if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) {
611
      fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
612
      PR_NOT_REACHED("What shall I do?");
613
    }
614
  }
615
  return;
616
} /* NT_TimeProc */
617
618
static PRStatus NT_HashRemove(PRWaitGroup* group, PRFileDesc* fd) {
619
  PRRecvWait** waiter;
620
621
  _PR_MD_LOCK(&group->mdlock);
622
  waiter = _MW_LookupInternal(group, fd);
623
  if (NULL != waiter) {
624
    group->waiter->count -= 1;
625
    *waiter = NULL;
626
  }
627
  _PR_MD_UNLOCK(&group->mdlock);
628
  return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
629
}
630
631
PRStatus NT_HashRemoveInternal(PRWaitGroup* group, PRFileDesc* fd) {
632
  PRRecvWait** waiter;
633
634
  waiter = _MW_LookupInternal(group, fd);
635
  if (NULL != waiter) {
636
    group->waiter->count -= 1;
637
    *waiter = NULL;
638
  }
639
  return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
640
}
641
#endif /* WINNT */
642
643
/******************************************************************/
644
/******************************************************************/
645
/********************** The public API portion ********************/
646
/******************************************************************/
647
/******************************************************************/
648
PR_IMPLEMENT(PRStatus)
649
0
PR_AddWaitFileDesc(PRWaitGroup* group, PRRecvWait* desc) {
650
0
  _PR_HashStory hrv;
651
0
  PRStatus rv = PR_FAILURE;
652
#ifdef WINNT
653
  _MDOverlapped* overlapped;
654
  HANDLE hFile;
655
  BOOL bResult;
656
  DWORD dwError;
657
  PRFileDesc* bottom;
658
#endif
659
660
0
  if (!_pr_initialized) {
661
0
    _PR_ImplicitInitialization();
662
0
  }
663
0
  if ((NULL == group) && (NULL == (group = MW_Init2()))) {
664
0
    return rv;
665
0
  }
666
667
0
  PR_ASSERT(NULL != desc->fd);
668
669
0
  desc->outcome = PR_MW_PENDING; /* nice, well known value */
670
0
  desc->bytesRecv = 0;           /* likewise, though this value is ambiguious */
671
672
0
  PR_Lock(group->ml);
673
674
0
  if (_prmw_running != group->state) {
675
    /* Not allowed to add after cancelling the group */
676
0
    desc->outcome = PR_MW_INTERRUPT;
677
0
    PR_SetError(PR_INVALID_STATE_ERROR, 0);
678
0
    PR_Unlock(group->ml);
679
0
    return rv;
680
0
  }
681
682
#ifdef WINNT
683
  _PR_MD_LOCK(&group->mdlock);
684
#endif
685
686
  /*
687
  ** If the waiter count is zero at this point, there's no telling
688
  ** how long we've been idle. Therefore, initialize the beginning
689
  ** of the timing interval. As long as the list doesn't go empty,
690
  ** it will maintain itself.
691
  */
692
0
  if (0 == group->waiter->count) {
693
0
    group->last_poll = PR_IntervalNow();
694
0
  }
695
696
0
  do {
697
0
    hrv = MW_AddHashInternal(desc, group->waiter);
698
0
    if (_prmw_rehash != hrv) {
699
0
      break;
700
0
    }
701
0
    hrv = MW_ExpandHashInternal(group); /* gruesome */
702
0
    if (_prmw_success != hrv) {
703
0
      break;
704
0
    }
705
0
  } while (PR_TRUE);
706
707
#ifdef WINNT
708
  _PR_MD_UNLOCK(&group->mdlock);
709
#endif
710
711
0
  PR_NotifyCondVar(group->new_business); /* tell the world */
712
0
  rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
713
0
  PR_Unlock(group->ml);
714
715
#ifdef WINNT
716
  overlapped = PR_NEWZAP(_MDOverlapped);
717
  if (NULL == overlapped) {
718
    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
719
    NT_HashRemove(group, desc->fd);
720
    return rv;
721
  }
722
  overlapped->ioModel = _MD_MultiWaitIO;
723
  overlapped->data.mw.desc = desc;
724
  overlapped->data.mw.group = group;
725
  if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) {
726
    overlapped->data.mw.timer =
727
        CreateTimer(desc->timeout, NT_TimeProc, overlapped);
728
    if (0 == overlapped->data.mw.timer) {
729
      NT_HashRemove(group, desc->fd);
730
      PR_DELETE(overlapped);
731
      /*
732
       * XXX It appears that a maximum of 16 timer events can
733
       * be outstanding. GetLastError() returns 0 when I try it.
734
       */
735
      PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
736
      return PR_FAILURE;
737
    }
738
  }
739
740
  /* Reach to the bottom layer to get the OS fd */
741
  bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
742
  PR_ASSERT(NULL != bottom);
743
  if (NULL == bottom) {
744
    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
745
    return PR_FAILURE;
746
  }
747
  hFile = (HANDLE)bottom->secret->md.osfd;
748
  if (!bottom->secret->md.io_model_committed) {
749
    PRInt32 st;
750
    st = _md_Associate(hFile);
751
    PR_ASSERT(0 != st);
752
    bottom->secret->md.io_model_committed = PR_TRUE;
753
  }
754
  bResult = ReadFile(hFile, desc->buffer.start, (DWORD)desc->buffer.length,
755
                     NULL, &overlapped->overlapped);
756
  if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING) {
757
    if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) {
758
      if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_FAILURE,
759
                                     (LONG)PR_MW_PENDING) ==
760
          (LONG)PR_MW_PENDING) {
761
        CancelTimer(overlapped->data.mw.timer);
762
      }
763
      NT_HashRemove(group, desc->fd);
764
      PR_DELETE(overlapped);
765
    }
766
    _PR_MD_MAP_READ_ERROR(dwError);
767
    rv = PR_FAILURE;
768
  }
769
#endif
770
771
0
  return rv;
772
0
} /* PR_AddWaitFileDesc */
773
774
0
PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup* group) {
775
0
  PRCList* io_ready = NULL;
776
#ifdef WINNT
777
  PRThread* me = _PR_MD_CURRENT_THREAD();
778
  _MDOverlapped* overlapped;
779
#endif
780
781
0
  if (!_pr_initialized) {
782
0
    _PR_ImplicitInitialization();
783
0
  }
784
0
  if ((NULL == group) && (NULL == (group = MW_Init2()))) {
785
0
    goto failed_init;
786
0
  }
787
788
0
  PR_Lock(group->ml);
789
790
0
  if (_prmw_running != group->state) {
791
0
    PR_SetError(PR_INVALID_STATE_ERROR, 0);
792
0
    goto invalid_state;
793
0
  }
794
795
0
  group->waiting_threads += 1; /* the polling thread is counted */
796
797
#ifdef WINNT
798
  _PR_MD_LOCK(&group->mdlock);
799
  while (PR_CLIST_IS_EMPTY(&group->io_ready)) {
800
    _PR_THREAD_LOCK(me);
801
    me->state = _PR_IO_WAIT;
802
    PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
803
    if (!_PR_IS_NATIVE_THREAD(me)) {
804
      _PR_SLEEPQ_LOCK(me->cpu);
805
      _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
806
      _PR_SLEEPQ_UNLOCK(me->cpu);
807
    }
808
    _PR_THREAD_UNLOCK(me);
809
    _PR_MD_UNLOCK(&group->mdlock);
810
    PR_Unlock(group->ml);
811
    _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
812
    me->state = _PR_RUNNING;
813
    PR_Lock(group->ml);
814
    _PR_MD_LOCK(&group->mdlock);
815
    if (_PR_PENDING_INTERRUPT(me)) {
816
      PR_REMOVE_LINK(&me->waitQLinks);
817
      _PR_MD_UNLOCK(&group->mdlock);
818
      me->flags &= ~_PR_INTERRUPT;
819
      me->io_suspended = PR_FALSE;
820
      PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
821
      goto aborted;
822
    }
823
  }
824
  io_ready = PR_LIST_HEAD(&group->io_ready);
825
  PR_ASSERT(io_ready != NULL);
826
  PR_REMOVE_LINK(io_ready);
827
  _PR_MD_UNLOCK(&group->mdlock);
828
  overlapped =
829
      (_MDOverlapped*)((char*)io_ready - offsetof(_MDOverlapped, data));
830
  io_ready = &overlapped->data.mw.desc->internal;
831
#else
832
0
  do {
833
    /*
834
    ** If the I/O ready list isn't empty, have this thread
835
    ** return with the first receive wait object that's available.
836
    */
837
0
    if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
838
      /*
839
      ** Is there a polling thread yet? If not, grab this thread
840
      ** and use it.
841
      */
842
0
      if (NULL == group->poller) {
843
        /*
844
        ** This thread will stay do polling until it becomes the only one
845
        ** left to service a completion. Then it will return and there will
846
        ** be none left to actually poll or to run completions.
847
        **
848
        ** The polling function should only return w/ failure or
849
        ** with some I/O ready.
850
        */
851
0
        if (PR_FAILURE == _MW_PollInternal(group)) {
852
0
          goto failed_poll;
853
0
        }
854
0
      } else {
855
        /*
856
        ** There are four reasons a thread can be awakened from
857
        ** a wait on the io_complete condition variable.
858
        ** 1. Some I/O has completed, i.e., the io_ready list
859
        **    is nonempty.
860
        ** 2. The wait group is canceled.
861
        ** 3. The thread is interrupted.
862
        ** 4. The current polling thread has to leave and needs
863
        **    a replacement.
864
        ** The logic to find a new polling thread is made more
865
        ** complicated by all the other possible events.
866
        ** I tried my best to write the logic clearly, but
867
        ** it is still full of if's with continue and goto.
868
        */
869
0
        PRStatus st;
870
0
        do {
871
0
          st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
872
0
          if (_prmw_running != group->state) {
873
0
            PR_SetError(PR_INVALID_STATE_ERROR, 0);
874
0
            goto aborted;
875
0
          }
876
0
          if (_MW_ABORTED(st) || (NULL == group->poller)) {
877
0
            break;
878
0
          }
879
0
        } while (PR_CLIST_IS_EMPTY(&group->io_ready));
880
881
        /*
882
        ** The thread is interrupted and has to leave.  It might
883
        ** have also been awakened to process ready i/o or be the
884
        ** new poller.  To be safe, if either condition is true,
885
        ** we awaken another thread to take its place.
886
        */
887
0
        if (_MW_ABORTED(st)) {
888
0
          if ((NULL == group->poller || !PR_CLIST_IS_EMPTY(&group->io_ready)) &&
889
0
              group->waiting_threads > 1) {
890
0
            PR_NotifyCondVar(group->io_complete);
891
0
          }
892
0
          goto aborted;
893
0
        }
894
895
        /*
896
        ** A new poller is needed, but can I be the new poller?
897
        ** If there is no i/o ready, sure.  But if there is any
898
        ** i/o ready, it has a higher priority.  I want to
899
        ** process the ready i/o first and wake up another
900
        ** thread to be the new poller.
901
        */
902
0
        if (NULL == group->poller) {
903
0
          if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
904
0
            continue;
905
0
          }
906
0
          if (group->waiting_threads > 1) {
907
0
            PR_NotifyCondVar(group->io_complete);
908
0
          }
909
0
        }
910
0
      }
911
0
      PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
912
0
    }
913
0
    io_ready = PR_LIST_HEAD(&group->io_ready);
914
0
    PR_NotifyCondVar(group->io_taken);
915
0
    PR_ASSERT(io_ready != NULL);
916
0
    PR_REMOVE_LINK(io_ready);
917
0
  } while (NULL == io_ready);
918
919
0
failed_poll:
920
921
0
#endif
922
923
0
aborted:
924
925
0
  group->waiting_threads -= 1;
926
0
invalid_state:
927
0
  (void)MW_TestForShutdownInternal(group);
928
0
  PR_Unlock(group->ml);
929
930
0
failed_init:
931
0
  if (NULL != io_ready) {
932
    /* If the operation failed, record the reason why */
933
0
    switch (((PRRecvWait*)io_ready)->outcome) {
934
0
      case PR_MW_PENDING:
935
0
        PR_ASSERT(0);
936
0
        break;
937
0
      case PR_MW_SUCCESS:
938
0
#ifndef WINNT
939
0
        _MW_InitialRecv(io_ready);
940
0
#endif
941
0
        break;
942
#ifdef WINNT
943
      case PR_MW_FAILURE:
944
        _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
945
        break;
946
#endif
947
0
      case PR_MW_TIMEOUT:
948
0
        PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
949
0
        break;
950
0
      case PR_MW_INTERRUPT:
951
0
        PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
952
0
        break;
953
0
      default:
954
0
        break;
955
0
    }
956
#ifdef WINNT
957
    if (NULL != overlapped->data.mw.timer) {
958
      PR_ASSERT(PR_INTERVAL_NO_TIMEOUT != overlapped->data.mw.desc->timeout);
959
      CancelTimer(overlapped->data.mw.timer);
960
    } else {
961
      PR_ASSERT(PR_INTERVAL_NO_TIMEOUT == overlapped->data.mw.desc->timeout);
962
    }
963
    PR_DELETE(overlapped);
964
#endif
965
0
  }
966
0
  return (PRRecvWait*)io_ready;
967
0
} /* PR_WaitRecvReady */
968
969
PR_IMPLEMENT(PRStatus)
970
0
PR_CancelWaitFileDesc(PRWaitGroup* group, PRRecvWait* desc) {
971
0
#if !defined(WINNT)
972
0
  PRRecvWait** recv_wait;
973
0
#endif
974
0
  PRStatus rv = PR_SUCCESS;
975
0
  if (NULL == group) {
976
0
    group = mw_state->group;
977
0
  }
978
0
  PR_ASSERT(NULL != group);
979
0
  if (NULL == group) {
980
0
    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
981
0
    return PR_FAILURE;
982
0
  }
983
984
0
  PR_Lock(group->ml);
985
986
0
  if (_prmw_running != group->state) {
987
0
    PR_SetError(PR_INVALID_STATE_ERROR, 0);
988
0
    rv = PR_FAILURE;
989
0
    goto unlock;
990
0
  }
991
992
#ifdef WINNT
993
  if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_INTERRUPT,
994
                                 (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING) {
995
    PRFileDesc* bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
996
    PR_ASSERT(NULL != bottom);
997
    if (NULL == bottom) {
998
      PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
999
      goto unlock;
1000
    }
1001
    bottom->secret->state = _PR_FILEDESC_CLOSED;
1002
#  if 0
1003
        fprintf(stderr, "cancel wait recv: closing socket\n");
1004
#  endif
1005
    if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) {
1006
      fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
1007
      exit(1);
1008
    }
1009
  }
1010
#else
1011
0
  if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd))) {
1012
    /* it was in the wait table */
1013
0
    _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
1014
0
    goto unlock;
1015
0
  }
1016
0
  if (!PR_CLIST_IS_EMPTY(&group->io_ready)) {
1017
    /* is it already complete? */
1018
0
    PRCList* head = PR_LIST_HEAD(&group->io_ready);
1019
0
    do {
1020
0
      PRRecvWait* done = (PRRecvWait*)head;
1021
0
      if (done == desc) {
1022
0
        goto unlock;
1023
0
      }
1024
0
      head = PR_NEXT_LINK(head);
1025
0
    } while (head != &group->io_ready);
1026
0
  }
1027
0
  PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1028
0
  rv = PR_FAILURE;
1029
1030
0
#endif
1031
0
unlock:
1032
0
  PR_Unlock(group->ml);
1033
0
  return rv;
1034
0
} /* PR_CancelWaitFileDesc */
1035
1036
0
PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup* group) {
1037
0
  PRRecvWait** desc;
1038
0
  PRRecvWait* recv_wait = NULL;
1039
#ifdef WINNT
1040
  _MDOverlapped* overlapped;
1041
  PRRecvWait** end;
1042
  PRThread* me = _PR_MD_CURRENT_THREAD();
1043
#endif
1044
1045
0
  if (NULL == group) {
1046
0
    group = mw_state->group;
1047
0
  }
1048
0
  PR_ASSERT(NULL != group);
1049
0
  if (NULL == group) {
1050
0
    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1051
0
    return NULL;
1052
0
  }
1053
1054
0
  PR_Lock(group->ml);
1055
0
  if (_prmw_stopped != group->state) {
1056
0
    if (_prmw_running == group->state) {
1057
0
      group->state = _prmw_stopping; /* so nothing new comes in */
1058
0
    }
1059
0
    if (0 == group->waiting_threads) { /* is there anybody else? */
1060
0
      group->state = _prmw_stopped;    /* we can stop right now */
1061
0
    } else {
1062
0
      PR_NotifyAllCondVar(group->new_business);
1063
0
      PR_NotifyAllCondVar(group->io_complete);
1064
0
    }
1065
0
    while (_prmw_stopped != group->state) {
1066
0
      (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
1067
0
    }
1068
0
  }
1069
1070
#ifdef WINNT
1071
  _PR_MD_LOCK(&group->mdlock);
1072
#endif
1073
  /* make all the existing descriptors look done/interrupted */
1074
#ifdef WINNT
1075
  end = &group->waiter->recv_wait + group->waiter->length;
1076
  for (desc = &group->waiter->recv_wait; desc < end; ++desc) {
1077
    if (NULL != *desc) {
1078
      if (InterlockedCompareExchange(
1079
              (LONG*)&(*desc)->outcome, (LONG)PR_MW_INTERRUPT,
1080
              (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING) {
1081
        PRFileDesc* bottom =
1082
            PR_GetIdentitiesLayer((*desc)->fd, PR_NSPR_IO_LAYER);
1083
        PR_ASSERT(NULL != bottom);
1084
        if (NULL == bottom) {
1085
          PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1086
          goto invalid_arg;
1087
        }
1088
        bottom->secret->state = _PR_FILEDESC_CLOSED;
1089
#  if 0
1090
                fprintf(stderr, "cancel wait group: closing socket\n");
1091
#  endif
1092
        if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) {
1093
          fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
1094
          exit(1);
1095
        }
1096
      }
1097
    }
1098
  }
1099
  while (group->waiter->count > 0) {
1100
    _PR_THREAD_LOCK(me);
1101
    me->state = _PR_IO_WAIT;
1102
    PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
1103
    if (!_PR_IS_NATIVE_THREAD(me)) {
1104
      _PR_SLEEPQ_LOCK(me->cpu);
1105
      _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
1106
      _PR_SLEEPQ_UNLOCK(me->cpu);
1107
    }
1108
    _PR_THREAD_UNLOCK(me);
1109
    _PR_MD_UNLOCK(&group->mdlock);
1110
    PR_Unlock(group->ml);
1111
    _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
1112
    me->state = _PR_RUNNING;
1113
    PR_Lock(group->ml);
1114
    _PR_MD_LOCK(&group->mdlock);
1115
  }
1116
#else
1117
0
  for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc) {
1118
0
    PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
1119
0
    if (NULL != *desc) {
1120
0
      _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
1121
0
    }
1122
0
  }
1123
0
#endif
1124
1125
  /* take first element of finished list and return it or NULL */
1126
0
  if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
1127
0
    PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
1128
0
  } else {
1129
0
    PRCList* head = PR_LIST_HEAD(&group->io_ready);
1130
0
    PR_REMOVE_AND_INIT_LINK(head);
1131
#ifdef WINNT
1132
    overlapped = (_MDOverlapped*)((char*)head - offsetof(_MDOverlapped, data));
1133
    head = &overlapped->data.mw.desc->internal;
1134
    if (NULL != overlapped->data.mw.timer) {
1135
      PR_ASSERT(PR_INTERVAL_NO_TIMEOUT != overlapped->data.mw.desc->timeout);
1136
      CancelTimer(overlapped->data.mw.timer);
1137
    } else {
1138
      PR_ASSERT(PR_INTERVAL_NO_TIMEOUT == overlapped->data.mw.desc->timeout);
1139
    }
1140
    PR_DELETE(overlapped);
1141
#endif
1142
0
    recv_wait = (PRRecvWait*)head;
1143
0
  }
1144
#ifdef WINNT
1145
invalid_arg:
1146
  _PR_MD_UNLOCK(&group->mdlock);
1147
#endif
1148
0
  PR_Unlock(group->ml);
1149
1150
0
  return recv_wait;
1151
0
} /* PR_CancelWaitGroup */
1152
1153
0
PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */) {
1154
0
  PRWaitGroup* wg;
1155
1156
0
  if (NULL == (wg = PR_NEWZAP(PRWaitGroup))) {
1157
0
    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1158
0
    goto failed;
1159
0
  }
1160
  /* the wait group itself */
1161
0
  wg->ml = PR_NewLock();
1162
0
  if (NULL == wg->ml) {
1163
0
    goto failed_lock;
1164
0
  }
1165
0
  wg->io_taken = PR_NewCondVar(wg->ml);
1166
0
  if (NULL == wg->io_taken) {
1167
0
    goto failed_cvar0;
1168
0
  }
1169
0
  wg->io_complete = PR_NewCondVar(wg->ml);
1170
0
  if (NULL == wg->io_complete) {
1171
0
    goto failed_cvar1;
1172
0
  }
1173
0
  wg->new_business = PR_NewCondVar(wg->ml);
1174
0
  if (NULL == wg->new_business) {
1175
0
    goto failed_cvar2;
1176
0
  }
1177
0
  wg->mw_manage = PR_NewCondVar(wg->ml);
1178
0
  if (NULL == wg->mw_manage) {
1179
0
    goto failed_cvar3;
1180
0
  }
1181
1182
0
  PR_INIT_CLIST(&wg->group_link);
1183
0
  PR_INIT_CLIST(&wg->io_ready);
1184
1185
  /* the waiters sequence */
1186
0
  wg->waiter = (_PRWaiterHash*)PR_CALLOC(
1187
0
      sizeof(_PRWaiterHash) + (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
1188
0
  if (NULL == wg->waiter) {
1189
0
    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1190
0
    goto failed_waiter;
1191
0
  }
1192
0
  wg->waiter->count = 0;
1193
0
  wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
1194
1195
#ifdef WINNT
1196
  _PR_MD_NEW_LOCK(&wg->mdlock);
1197
  PR_INIT_CLIST(&wg->wait_list);
1198
#endif /* WINNT */
1199
1200
0
  PR_Lock(mw_lock);
1201
0
  PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
1202
0
  PR_Unlock(mw_lock);
1203
0
  return wg;
1204
1205
0
failed_waiter:
1206
0
  PR_DestroyCondVar(wg->mw_manage);
1207
0
failed_cvar3:
1208
0
  PR_DestroyCondVar(wg->new_business);
1209
0
failed_cvar2:
1210
0
  PR_DestroyCondVar(wg->io_complete);
1211
0
failed_cvar1:
1212
0
  PR_DestroyCondVar(wg->io_taken);
1213
0
failed_cvar0:
1214
0
  PR_DestroyLock(wg->ml);
1215
0
failed_lock:
1216
0
  PR_DELETE(wg);
1217
0
  wg = NULL;
1218
1219
0
failed:
1220
0
  return wg;
1221
0
} /* MW_CreateWaitGroup */
1222
1223
0
PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup* group) {
1224
0
  PRStatus rv = PR_SUCCESS;
1225
0
  if (NULL == group) {
1226
0
    group = mw_state->group;
1227
0
  }
1228
0
  PR_ASSERT(NULL != group);
1229
0
  if (NULL != group) {
1230
0
    PR_Lock(group->ml);
1231
0
    if ((group->waiting_threads == 0) && (group->waiter->count == 0) &&
1232
0
        PR_CLIST_IS_EMPTY(&group->io_ready)) {
1233
0
      group->state = _prmw_stopped;
1234
0
    } else {
1235
0
      PR_SetError(PR_INVALID_STATE_ERROR, 0);
1236
0
      rv = PR_FAILURE;
1237
0
    }
1238
0
    PR_Unlock(group->ml);
1239
0
    if (PR_FAILURE == rv) {
1240
0
      return rv;
1241
0
    }
1242
1243
0
    PR_Lock(mw_lock);
1244
0
    PR_REMOVE_LINK(&group->group_link);
1245
0
    PR_Unlock(mw_lock);
1246
1247
#ifdef WINNT
1248
    /*
1249
     * XXX make sure wait_list is empty and waiter is empty.
1250
     * These must be checked while holding mdlock.
1251
     */
1252
    _PR_MD_FREE_LOCK(&group->mdlock);
1253
#endif
1254
1255
0
    PR_DELETE(group->waiter);
1256
0
    PR_DELETE(group->polling_list);
1257
0
    PR_DestroyCondVar(group->mw_manage);
1258
0
    PR_DestroyCondVar(group->new_business);
1259
0
    PR_DestroyCondVar(group->io_complete);
1260
0
    PR_DestroyCondVar(group->io_taken);
1261
0
    PR_DestroyLock(group->ml);
1262
0
    if (group == mw_state->group) {
1263
0
      mw_state->group = NULL;
1264
0
    }
1265
0
    PR_DELETE(group);
1266
0
  } else {
1267
    /* The default wait group is not created yet. */
1268
0
    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1269
0
    rv = PR_FAILURE;
1270
0
  }
1271
0
  return rv;
1272
0
} /* PR_DestroyWaitGroup */
1273
1274
/**********************************************************************
1275
***********************************************************************
1276
******************** Wait group enumerations **************************
1277
***********************************************************************
1278
**********************************************************************/
1279
1280
0
PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup* group) {
1281
0
  PRMWaitEnumerator* enumerator = PR_NEWZAP(PRMWaitEnumerator);
1282
0
  if (NULL == enumerator) {
1283
0
    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1284
0
  } else {
1285
0
    enumerator->group = group;
1286
0
    enumerator->seal = _PR_ENUM_SEALED;
1287
0
  }
1288
0
  return enumerator;
1289
0
} /* PR_CreateMWaitEnumerator */
1290
1291
PR_IMPLEMENT(PRStatus)
1292
0
PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator) {
1293
0
  PR_ASSERT(NULL != enumerator);
1294
0
  PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
1295
0
  if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal)) {
1296
0
    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1297
0
    return PR_FAILURE;
1298
0
  }
1299
0
  enumerator->seal = _PR_ENUM_UNSEALED;
1300
0
  PR_Free(enumerator);
1301
0
  return PR_SUCCESS;
1302
0
} /* PR_DestroyMWaitEnumerator */
1303
1304
PR_IMPLEMENT(PRRecvWait*)
1305
PR_EnumerateWaitGroup(PRMWaitEnumerator* enumerator,
1306
0
                      const PRRecvWait* previous) {
1307
0
  PRRecvWait* result = NULL;
1308
1309
  /* entry point sanity checking */
1310
0
  PR_ASSERT(NULL != enumerator);
1311
0
  PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
1312
0
  if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal)) {
1313
0
    goto bad_argument;
1314
0
  }
1315
1316
  /* beginning of enumeration */
1317
0
  if (NULL == previous) {
1318
0
    if (NULL == enumerator->group) {
1319
0
      enumerator->group = mw_state->group;
1320
0
      if (NULL == enumerator->group) {
1321
0
        PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
1322
0
        return NULL;
1323
0
      }
1324
0
    }
1325
0
    enumerator->waiter = &enumerator->group->waiter->recv_wait;
1326
0
    enumerator->p_timestamp = enumerator->group->p_timestamp;
1327
0
    enumerator->thread = PR_GetCurrentThread();
1328
0
    enumerator->index = 0;
1329
0
  }
1330
  /* continuing an enumeration */
1331
0
  else {
1332
0
    PRThread* me = PR_GetCurrentThread();
1333
0
    PR_ASSERT(me == enumerator->thread);
1334
0
    if (me != enumerator->thread) {
1335
0
      goto bad_argument;
1336
0
    }
1337
1338
    /* need to restart the enumeration */
1339
0
    if (enumerator->p_timestamp != enumerator->group->p_timestamp) {
1340
0
      return PR_EnumerateWaitGroup(enumerator, NULL);
1341
0
    }
1342
0
  }
1343
1344
  /* actually progress the enumeration */
1345
#if defined(WINNT)
1346
  _PR_MD_LOCK(&enumerator->group->mdlock);
1347
#else
1348
0
  PR_Lock(enumerator->group->ml);
1349
0
#endif
1350
0
  while (enumerator->index++ < enumerator->group->waiter->length) {
1351
0
    if (NULL != (result = *(enumerator->waiter)++)) {
1352
0
      break;
1353
0
    }
1354
0
  }
1355
#if defined(WINNT)
1356
  _PR_MD_UNLOCK(&enumerator->group->mdlock);
1357
#else
1358
0
  PR_Unlock(enumerator->group->ml);
1359
0
#endif
1360
1361
0
  return result; /* what we live for */
1362
1363
0
bad_argument:
1364
0
  PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1365
0
  return NULL; /* probably ambiguous */
1366
0
} /* PR_EnumerateWaitGroup */
1367
1368
/* prmwait.c */