Coverage Report

Created: 2018-09-25 14:53

/src/mozilla-central/xpcom/io/nsPipe3.cpp
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
 * License, v. 2.0. If a copy of the MPL was not distributed with this
5
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
#include <algorithm>
8
#include "mozilla/Attributes.h"
9
#include "mozilla/IntegerPrintfMacros.h"
10
#include "mozilla/ReentrantMonitor.h"
11
#include "nsIBufferedStreams.h"
12
#include "nsICloneableInputStream.h"
13
#include "nsIPipe.h"
14
#include "nsIEventTarget.h"
15
#include "nsISeekableStream.h"
16
#include "mozilla/RefPtr.h"
17
#include "nsSegmentedBuffer.h"
18
#include "nsStreamUtils.h"
19
#include "nsCOMPtr.h"
20
#include "nsCRT.h"
21
#include "mozilla/Logging.h"
22
#include "nsIClassInfoImpl.h"
23
#include "nsAlgorithm.h"
24
#include "nsMemory.h"
25
#include "nsIAsyncInputStream.h"
26
#include "nsIAsyncOutputStream.h"
27
28
using namespace mozilla;
29
30
#ifdef LOG
31
#undef LOG
32
#endif
33
//
34
// set MOZ_LOG=nsPipe:5
35
//
36
static LazyLogModule sPipeLog("nsPipe");
37
0
#define LOG(args) MOZ_LOG(sPipeLog, mozilla::LogLevel::Debug, args)
38
39
0
#define DEFAULT_SEGMENT_SIZE  4096
40
0
#define DEFAULT_SEGMENT_COUNT 16
41
42
class nsPipe;
43
class nsPipeEvents;
44
class nsPipeInputStream;
45
class nsPipeOutputStream;
46
class AutoReadSegment;
47
48
namespace {

// Returned by the stream notification hooks (OnInputReadable(),
// OnOutputWritable(), ...).  Callers in this file invoke NotifyAll() on the
// pipe's monitor when they get NotifyMonitor back.
enum MonitorAction
{
  DoNotNotifyMonitor = 0,
  NotifyMonitor = 1
};

// Result of nsPipe::AdvanceReadSegment().  SegmentAdvanceBufferRead is
// returned when the move took a segment out of a full advance buffer;
// otherwise the result is SegmentNotChanged.
enum SegmentChangeResult
{
  SegmentNotChanged = 0,
  SegmentAdvanceBufferRead = 1
};

} // namespace
63
64
//-----------------------------------------------------------------------------
65
66
// this class is used to delay notifications until the end of a particular
67
// scope.  it helps avoid the complexity of issuing callbacks while inside
68
// a critical section.
69
class nsPipeEvents
70
{
71
public:
72
0
  nsPipeEvents() { }
73
  ~nsPipeEvents();
74
75
  inline void NotifyInputReady(nsIAsyncInputStream* aStream,
76
                               nsIInputStreamCallback* aCallback)
77
0
  {
78
0
    mInputList.AppendElement(InputEntry(aStream, aCallback));
79
0
  }
80
81
  inline void NotifyOutputReady(nsIAsyncOutputStream* aStream,
82
                                nsIOutputStreamCallback* aCallback)
83
0
  {
84
0
    MOZ_DIAGNOSTIC_ASSERT(!mOutputCallback);
85
0
    mOutputStream = aStream;
86
0
    mOutputCallback = aCallback;
87
0
  }
88
89
private:
90
  struct InputEntry
91
  {
92
    InputEntry(nsIAsyncInputStream* aStream, nsIInputStreamCallback* aCallback)
93
      : mStream(aStream)
94
      , mCallback(aCallback)
95
0
    {
96
0
      MOZ_DIAGNOSTIC_ASSERT(mStream);
97
0
      MOZ_DIAGNOSTIC_ASSERT(mCallback);
98
0
    }
99
100
    nsCOMPtr<nsIAsyncInputStream> mStream;
101
    nsCOMPtr<nsIInputStreamCallback> mCallback;
102
  };
103
104
  nsTArray<InputEntry> mInputList;
105
106
  nsCOMPtr<nsIAsyncOutputStream>    mOutputStream;
107
  nsCOMPtr<nsIOutputStreamCallback> mOutputCallback;
108
};
109
110
//-----------------------------------------------------------------------------
111
112
// This class is used to maintain input stream state.  It's broken out from the
113
// nsPipeInputStream class because generally the nsPipe should be modifying
114
// this state and not the input stream itself.
115
struct nsPipeReadState
{
  // Every field starts out zeroed / null / false.
  nsPipeReadState() = default;

  // Current read position and the end of the readable data in the segment
  // being read.  Both are null when there is nothing to read.
  char*    mReadCursor = nullptr;
  char*    mReadLimit = nullptr;

  // Index of the buffer segment this reader is currently positioned on.
  int32_t  mSegment = 0;

  // Byte count tracked for this reader; nsPipe::AdvanceReadCursor()
  // subtracts the number of bytes consumed from it.
  uint32_t mAvailable = 0;

  // Managed through the AutoReadSegment RAII stack class: true while a read
  // segment is checked out.
  bool     mActiveRead = false;

  // Set when the input stream has been closed and should be drained, but the
  // drain had to be postponed because a read was active.  Once that read
  // completes, this flag indicates the drain should be performed.
  bool     mNeedDrain = false;
};
139
140
//-----------------------------------------------------------------------------
141
142
// an input end of a pipe (maintained as a list of refs within the pipe)
143
class nsPipeInputStream final
144
  : public nsIAsyncInputStream
145
  , public nsISeekableStream
146
  , public nsISearchableInputStream
147
  , public nsICloneableInputStream
148
  , public nsIClassInfo
149
  , public nsIBufferedInputStream
150
{
151
public:
152
  // Pipe input streams preserve their refcount changes when record/replaying,
153
  // as otherwise the thread which destroys the stream may vary between
154
  // recording and replaying.
155
  NS_DECL_THREADSAFE_ISUPPORTS_WITH_RECORDING(recordreplay::Behavior::Preserve)
156
  NS_DECL_NSIINPUTSTREAM
157
  NS_DECL_NSIASYNCINPUTSTREAM
158
  NS_DECL_NSISEEKABLESTREAM
159
  NS_DECL_NSISEARCHABLEINPUTSTREAM
160
  NS_DECL_NSICLONEABLEINPUTSTREAM
161
  NS_DECL_NSICLASSINFO
162
  NS_DECL_NSIBUFFEREDINPUTSTREAM
163
164
  explicit nsPipeInputStream(nsPipe* aPipe)
165
    : mPipe(aPipe)
166
    , mLogicalOffset(0)
167
    , mInputStatus(NS_OK)
168
    , mBlocking(true)
169
    , mBlocked(false)
170
    , mCallbackFlags(0)
171
0
  { }
172
173
  explicit nsPipeInputStream(const nsPipeInputStream& aOther)
174
    : mPipe(aOther.mPipe)
175
    , mLogicalOffset(aOther.mLogicalOffset)
176
    , mInputStatus(aOther.mInputStatus)
177
    , mBlocking(aOther.mBlocking)
178
    , mBlocked(false)
179
    , mCallbackFlags(0)
180
    , mReadState(aOther.mReadState)
181
0
  { }
182
183
  nsresult Fill();
184
  void SetNonBlocking(bool aNonBlocking)
185
0
  {
186
0
    mBlocking = !aNonBlocking;
187
0
  }
188
189
  uint32_t Available();
190
191
  // synchronously wait for the pipe to become readable.
192
  nsresult Wait();
193
194
  // These two don't acquire the monitor themselves.  Instead they
195
  // expect their caller to have done so and to pass the monitor as
196
  // evidence.
197
  MonitorAction OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&,
198
                                const ReentrantMonitorAutoEnter& ev);
199
  MonitorAction OnInputException(nsresult, nsPipeEvents&,
200
                                 const ReentrantMonitorAutoEnter& ev);
201
202
  nsPipeReadState& ReadState()
203
0
  {
204
0
    return mReadState;
205
0
  }
206
207
  const nsPipeReadState& ReadState() const
208
0
  {
209
0
    return mReadState;
210
0
  }
211
212
  nsresult Status() const;
213
214
  // A version of Status() that doesn't acquire the monitor.
215
  nsresult Status(const ReentrantMonitorAutoEnter& ev) const;
216
217
private:
218
  virtual ~nsPipeInputStream();
219
220
  RefPtr<nsPipe>               mPipe;
221
222
  int64_t                        mLogicalOffset;
223
  // Individual input streams can be closed without effecting the rest of the
224
  // pipe.  So track individual input stream status separately.  |mInputStatus|
225
  // is protected by |mPipe->mReentrantMonitor|.
226
  nsresult                       mInputStatus;
227
  bool                           mBlocking;
228
229
  // these variables can only be accessed while inside the pipe's monitor
230
  bool                           mBlocked;
231
  nsCOMPtr<nsIInputStreamCallback> mCallback;
232
  uint32_t                       mCallbackFlags;
233
234
  // requires pipe's monitor; usually treat as an opaque token to pass to nsPipe
235
  nsPipeReadState                mReadState;
236
};
237
238
//-----------------------------------------------------------------------------
239
240
// the output end of a pipe (allocated as a member of the pipe).
241
class nsPipeOutputStream
242
  : public nsIAsyncOutputStream
243
  , public nsIClassInfo
244
{
245
public:
246
  // since this class will be allocated as a member of the pipe, we do not
247
  // need our own ref count.  instead, we share the lifetime (the ref count)
248
  // of the entire pipe.  this macro is just convenience since it does not
249
  // declare a mRefCount variable; however, don't let the name fool you...
250
  // we are not inheriting from nsPipe ;-)
251
  NS_DECL_ISUPPORTS_INHERITED
252
253
  NS_DECL_NSIOUTPUTSTREAM
254
  NS_DECL_NSIASYNCOUTPUTSTREAM
255
  NS_DECL_NSICLASSINFO
256
257
  explicit nsPipeOutputStream(nsPipe* aPipe)
258
    : mPipe(aPipe)
259
    , mWriterRefCnt(0)
260
    , mLogicalOffset(0)
261
    , mBlocking(true)
262
    , mBlocked(false)
263
    , mWritable(true)
264
    , mCallbackFlags(0)
265
0
  { }
266
267
  void SetNonBlocking(bool aNonBlocking)
268
0
  {
269
0
    mBlocking = !aNonBlocking;
270
0
  }
271
  void SetWritable(bool aWritable)
272
0
  {
273
0
    mWritable = aWritable;
274
0
  }
275
276
  // synchronously wait for the pipe to become writable.
277
  nsresult Wait();
278
279
  MonitorAction OnOutputWritable(nsPipeEvents&);
280
  MonitorAction OnOutputException(nsresult, nsPipeEvents&);
281
282
private:
283
  nsPipe*                         mPipe;
284
285
  // separate refcnt so that we know when to close the producer
286
  ThreadSafeAutoRefCntWithRecording<recordreplay::Behavior::Preserve> mWriterRefCnt;
287
  int64_t                         mLogicalOffset;
288
  bool                            mBlocking;
289
290
  // these variables can only be accessed while inside the pipe's monitor
291
  bool                            mBlocked;
292
  bool                            mWritable;
293
  nsCOMPtr<nsIOutputStreamCallback> mCallback;
294
  uint32_t                        mCallbackFlags;
295
};
296
297
//-----------------------------------------------------------------------------
298
299
class nsPipe final : public nsIPipe
300
{
301
public:
302
  friend class nsPipeInputStream;
303
  friend class nsPipeOutputStream;
304
  friend class AutoReadSegment;
305
306
  // As for nsPipeInputStream, preserve refcount changes when recording or
307
  // replaying.
308
  NS_DECL_THREADSAFE_ISUPPORTS_WITH_RECORDING(recordreplay::Behavior::Preserve)
309
  NS_DECL_NSIPIPE
310
311
  // nsPipe methods:
312
  nsPipe();
313
314
private:
315
  ~nsPipe();
316
317
  //
318
  // Methods below may only be called while inside the pipe's monitor.  Some
319
  // of these methods require passing a ReentrantMonitorAutoEnter to prove the
320
  // monitor is held.
321
  //
322
323
  void PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
324
                   char*& aCursor, char*& aLimit);
325
  SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState,
326
                                         const ReentrantMonitorAutoEnter &ev);
327
  bool ReadSegmentBeingWritten(nsPipeReadState& aReadState);
328
  uint32_t CountSegmentReferences(int32_t aSegment);
329
  void SetAllNullReadCursors();
330
  bool AllReadCursorsMatchWriteCursor();
331
  void RollBackAllReadCursors(char* aWriteCursor);
332
  void UpdateAllReadCursors(char* aWriteCursor);
333
  void ValidateAllReadCursors();
334
  uint32_t GetBufferSegmentCount(const nsPipeReadState& aReadState,
335
                                 const ReentrantMonitorAutoEnter& ev) const;
336
  bool IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const;
337
338
  //
339
  // methods below may be called while outside the pipe's monitor
340
  //
341
342
  void     DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents);
343
  nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen);
344
  void     AdvanceWriteCursor(uint32_t aCount);
345
346
  void     OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason);
347
  void     OnPipeException(nsresult aReason, bool aOutputOnly = false);
348
349
  nsresult CloneInputStream(nsPipeInputStream* aOriginal,
350
                            nsIInputStream** aCloneOut);
351
352
  // methods below should only be called by AutoReadSegment
353
  nsresult GetReadSegment(nsPipeReadState& aReadState, const char*& aSegment,
354
                          uint32_t& aLength);
355
  void     ReleaseReadSegment(nsPipeReadState& aReadState,
356
                              nsPipeEvents& aEvents);
357
  void     AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aCount);
358
359
  // We can't inherit from both nsIInputStream and nsIOutputStream
360
  // because they collide on their Close method. Consequently we nest their
361
  // implementations to avoid the extra object allocation.
362
  nsPipeOutputStream  mOutput;
363
364
  // Since the input stream can be cloned, we may have more than one.  Use
365
  // a weak reference as the streams will clear their entry here in their
366
  // destructor.  Using a strong reference would create a reference cycle.
367
  // Only usable while mReentrantMonitor is locked.
368
  nsTArray<nsPipeInputStream*> mInputList;
369
370
  // But hold a strong ref to our original input stream.  For backward
371
  // compatibility we need to be able to consistently return this same
372
  // object from GetInputStream().  Note, mOriginalInput is also stored
373
  // in mInputList as a weak ref.
374
  RefPtr<nsPipeInputStream> mOriginalInput;
375
376
  ReentrantMonitor    mReentrantMonitor;
377
  nsSegmentedBuffer   mBuffer;
378
379
  // The maximum number of segments to allow to be buffered in advance
380
  // of the fastest reader.  This is collection of segments is called
381
  // the "advance buffer".
382
  uint32_t            mMaxAdvanceBufferSegmentCount;
383
384
  int32_t             mWriteSegment;
385
  char*               mWriteCursor;
386
  char*               mWriteLimit;
387
388
  // |mStatus| is protected by |mReentrantMonitor|.
389
  nsresult            mStatus;
390
  bool                mInited;
391
};
392
393
//-----------------------------------------------------------------------------
394
395
// RAII class representing an active read segment.  When it goes out of scope
396
// it automatically updates the read cursor and releases the read segment.
397
class MOZ_STACK_CLASS AutoReadSegment final
{
public:
  // Checks a read segment out of |aPipe| for |aReadState|.  On success the
  // usable length is capped at |aMaxLength|; on failure Status() reports the
  // error and the destructor does nothing beyond its sanity assertion.
  AutoReadSegment(nsPipe* aPipe, nsPipeReadState& aReadState,
                  uint32_t aMaxLength)
    : mPipe(aPipe)
    , mReadState(aReadState)
    , mStatus(NS_ERROR_FAILURE)
    , mSegment(nullptr)
    , mLength(0)
    , mOffset(0)
  {
    MOZ_DIAGNOSTIC_ASSERT(mPipe);
    MOZ_DIAGNOSTIC_ASSERT(!mReadState.mActiveRead);

    mStatus = mPipe->GetReadSegment(mReadState, mSegment, mLength);
    if (NS_FAILED(mStatus)) {
      return;
    }

    MOZ_DIAGNOSTIC_ASSERT(mReadState.mActiveRead);
    MOZ_DIAGNOSTIC_ASSERT(mSegment);
    mLength = std::min(mLength, aMaxLength);
    MOZ_DIAGNOSTIC_ASSERT(mLength);
  }

  // Hands the segment back to the pipe.  If any bytes were consumed via
  // Advance() the read cursor is moved forward by that amount; otherwise
  // the segment is simply released.
  ~AutoReadSegment()
  {
    if (NS_SUCCEEDED(mStatus)) {
      if (mOffset != 0) {
        mPipe->AdvanceReadCursor(mReadState, mOffset);
      } else {
        nsPipeEvents events;
        mPipe->ReleaseReadSegment(mReadState, events);
      }
    }
    MOZ_DIAGNOSTIC_ASSERT(!mReadState.mActiveRead);
  }

  // Result of acquiring the segment.
  nsresult Status() const { return mStatus; }

  // Pointer to the first byte not yet consumed.  Only valid after a
  // successful acquisition.
  const char* Data() const
  {
    MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
    MOZ_DIAGNOSTIC_ASSERT(mSegment);
    return mSegment + mOffset;
  }

  // Number of bytes remaining after what has already been consumed.
  uint32_t Length() const
  {
    MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
    MOZ_DIAGNOSTIC_ASSERT(mLength >= mOffset);
    return mLength - mOffset;
  }

  // Marks |aCount| more bytes as consumed; must not exceed Length().
  void
  Advance(uint32_t aCount)
  {
    MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
    MOZ_DIAGNOSTIC_ASSERT(aCount <= (mLength - mOffset));
    mOffset += aCount;
  }

  nsPipeReadState&
  ReadState() const
  {
    return mReadState;
  }

private:
  // Raw pointer is fine: this object only ever lives on the stack, so the
  // pipe is guaranteed to outlive it.
  nsPipe* mPipe;
  nsPipeReadState& mReadState;
  nsresult mStatus;
  const char* mSegment;
  uint32_t mLength;
  uint32_t mOffset;
};
475
476
//
477
// NOTES on buffer architecture:
478
//
479
//       +-----------------+ - - mBuffer.GetSegment(0)
480
//       |                 |
481
//       + - - - - - - - - + - - nsPipeReadState.mReadCursor
482
//       |/////////////////|
483
//       |/////////////////|
484
//       |/////////////////|
485
//       |/////////////////|
486
//       +-----------------+ - - nsPipeReadState.mReadLimit
487
//                |
488
//       +-----------------+
489
//       |/////////////////|
490
//       |/////////////////|
491
//       |/////////////////|
492
//       |/////////////////|
493
//       |/////////////////|
494
//       |/////////////////|
495
//       +-----------------+
496
//                |
497
//       +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
498
//       |/////////////////|
499
//       |/////////////////|
500
//       |/////////////////|
501
//       + - - - - - - - - + - - mWriteCursor
502
//       |                 |
503
//       |                 |
504
//       +-----------------+ - - mWriteLimit
505
//
506
// (shaded region contains data)
507
//
508
// NOTE: Each input stream produced by the nsPipe contains its own, separate
509
//       nsPipeReadState.  This means there are multiple mReadCursor and
510
//       mReadLimit values in play.  The pipe cannot discard old data until
511
//       all mReadCursors have moved beyond that point in the stream.
512
//
513
//       Likewise, each input stream reader will have its own amount of
514
//       buffered data.  The pipe size threshold, however, is only applied
515
//       to the input stream that is being read fastest.  We call this
516
//       the "advance buffer" in that it's in advance of all readers.  We
517
//       allow slower input streams to buffer more data so that we don't
518
//       stall processing of the faster input stream.
519
//
520
// NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
521
// small allocations (e.g., 64 byte allocations).  this means that buffers may
522
// be allocated back-to-back.  in the diagram above, for example, mReadLimit
523
// would actually be pointing at the beginning of the next segment.  when
524
// making changes to this file, please keep this fact in mind.
525
//
526
527
//-----------------------------------------------------------------------------
528
// nsPipe methods:
529
//-----------------------------------------------------------------------------
530
531
// Builds an uninitialized pipe: the output stream is wired to this pipe, a
// first input stream is created, and the write position is set to "empty"
// (segment -1, null cursor and limit).  Init() must be called before the
// streams can be obtained.
nsPipe::nsPipe()
  : mOutput(this)
  , mOriginalInput(new nsPipeInputStream(this))
  , mReentrantMonitor("nsPipe.mReentrantMonitor")
  , mMaxAdvanceBufferSegmentCount(0)
  , mWriteSegment(-1)
  , mWriteCursor(nullptr)
  , mWriteLimit(nullptr)
  , mStatus(NS_OK)
  , mInited(false)
{
  // The original input stream is tracked in the (weak) input list as well.
  mInputList.AppendElement(mOriginalInput);
}
544
545
// Nothing to tear down explicitly; the members clean up after themselves.
nsPipe::~nsPipe() = default;
548
549
NS_IMPL_ADDREF(nsPipe)
NS_IMPL_QUERY_INTERFACE(nsPipe, nsIPipe)

// Hand-written Release(): in addition to the usual delete-at-zero handling,
// the pipe lets go of its original input stream once only one reference to
// the pipe remains.
NS_IMETHODIMP_(MozExternalRefCountType)
nsPipe::Release()
{
  MOZ_DIAGNOSTIC_ASSERT(int32_t(mRefCnt) > 0, "dup release");
  nsrefcnt remaining = --mRefCnt;
  NS_LOG_RELEASE(this, remaining, "nsPipe");

  if (remaining == 0) {
    delete this;
    return 0;
  }

  // |mOriginalInput| is only inspected when the refcount is 1.  At that
  // point a single pointer (and therefore a single thread) can reach us, so
  // there is nothing to race with.
  if (remaining == 1 && mOriginalInput) {
    mOriginalInput = nullptr;
    // NOTE(review): the input stream holds a RefPtr to this pipe, so dropping
    // it may re-enter Release().  Return a literal; do not touch members
    // after this point.
    return 1;
  }

  return remaining;
}
571
572
// Configures the pipe.
//
//   aNonBlockingIn  - whether the original input stream is non-blocking.
//   aNonBlockingOut - whether the output stream is non-blocking.
//   aSegmentSize    - size of each buffer segment in bytes; 0 selects
//                     DEFAULT_SEGMENT_SIZE.
//   aSegmentCount   - maximum number of segments buffered ahead of the
//                     fastest reader; 0 selects DEFAULT_SEGMENT_COUNT.
//
// Returns NS_OK on success, or the failure code from mBuffer.Init().  The
// pipe is only marked as initialized once setup has fully succeeded, so after
// a failed Init() GetInputStream()/GetOutputStream() keep returning
// NS_ERROR_NOT_INITIALIZED.
NS_IMETHODIMP
nsPipe::Init(bool aNonBlockingIn,
             bool aNonBlockingOut,
             uint32_t aSegmentSize,
             uint32_t aSegmentCount)
{
  if (aSegmentSize == 0) {
    aSegmentSize = DEFAULT_SEGMENT_SIZE;
  }
  if (aSegmentCount == 0) {
    aSegmentCount = DEFAULT_SEGMENT_COUNT;
  }

  // protect against overflow: size * count must fit in 32 bits
  const uint32_t maxCount = UINT32_MAX / aSegmentSize;
  aSegmentCount = std::min(aSegmentCount, maxCount);

  // The internal buffer is always "infinite" so that we can allow
  // the size to expand when cloned streams are read at different
  // rates.  We enforce a limit on how much data can be buffered
  // ahead of the fastest reader in GetWriteSegment().
  nsresult rv = mBuffer.Init(aSegmentSize, UINT32_MAX);
  if (NS_FAILED(rv)) {
    return rv;
  }

  mMaxAdvanceBufferSegmentCount = aSegmentCount;

  mOutput.SetNonBlocking(aNonBlockingOut);
  mOriginalInput->SetNonBlocking(aNonBlockingIn);

  // Only now is the pipe actually usable.
  mInited = true;
  return NS_OK;
}
609
610
// Returns an owning reference to the pipe's original input stream in
// |*aInputStream|, or NS_ERROR_NOT_INITIALIZED if Init() has not run.
NS_IMETHODIMP
nsPipe::GetInputStream(nsIAsyncInputStream** aInputStream)
{
  if (NS_WARN_IF(!mInited)) {
    return NS_ERROR_NOT_INITIALIZED;
  }

  // Take a local strong reference and transfer it to the out-param.
  RefPtr<nsPipeInputStream> stream = mOriginalInput;
  stream.forget(aInputStream);
  return NS_OK;
}
620
621
// Returns an add-ref'ed pointer to the embedded output stream in
// |*aOutputStream|, or NS_ERROR_NOT_INITIALIZED if Init() has not run.
NS_IMETHODIMP
nsPipe::GetOutputStream(nsIAsyncOutputStream** aOutputStream)
{
  if (NS_WARN_IF(!mInited)) {
    return NS_ERROR_NOT_INITIALIZED;
  }

  *aOutputStream = &mOutput;
  NS_ADDREF(*aOutputStream);
  return NS_OK;
}
630
631
void
632
nsPipe::PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
633
                    char*& aCursor, char*& aLimit)
634
0
{
635
0
  if (aIndex == 0) {
636
0
    MOZ_DIAGNOSTIC_ASSERT(!aReadState.mReadCursor || mBuffer.GetSegmentCount());
637
0
    aCursor = aReadState.mReadCursor;
638
0
    aLimit = aReadState.mReadLimit;
639
0
  } else {
640
0
    uint32_t absoluteIndex = aReadState.mSegment + aIndex;
641
0
    uint32_t numSegments = mBuffer.GetSegmentCount();
642
0
    if (absoluteIndex >= numSegments) {
643
0
      aCursor = aLimit = nullptr;
644
0
    } else {
645
0
      aCursor = mBuffer.GetSegment(absoluteIndex);
646
0
      if (mWriteSegment == (int32_t)absoluteIndex) {
647
0
        aLimit = mWriteCursor;
648
0
      } else {
649
0
        aLimit = aCursor + mBuffer.GetSegmentSize();
650
0
      }
651
0
    }
652
0
  }
653
0
}
654
655
// Checks out the reader's current readable range.  On success |aSegment| and
// |aLength| describe the unread bytes and the reader is flagged as having an
// active read.  When nothing is readable the pipe's failure status is
// returned if there is one, otherwise NS_BASE_STREAM_WOULD_BLOCK.
nsresult
nsPipe::GetReadSegment(nsPipeReadState& aReadState, const char*& aSegment,
                       uint32_t& aLength)
{
  ReentrantMonitorAutoEnter mon(mReentrantMonitor);

  const bool nothingToRead = aReadState.mReadCursor == aReadState.mReadLimit;
  if (nothingToRead) {
    if (NS_FAILED(mStatus)) {
      return mStatus;
    }
    return NS_BASE_STREAM_WOULD_BLOCK;
  }

  // The input stream holds the pipe's lock only while it obtains the buffer
  // to read from; the actual copying happens with the lock released.  To
  // keep the buffer from being deleted underneath that lockless read, mark
  // the read as active.  The flag is only ever changed with the lock held.
  MOZ_DIAGNOSTIC_ASSERT(!aReadState.mActiveRead);
  aReadState.mActiveRead = true;

  aSegment = aReadState.mReadCursor;
  aLength = aReadState.mReadLimit - aReadState.mReadCursor;
  MOZ_DIAGNOSTIC_ASSERT(aLength <= aReadState.mAvailable);

  return NS_OK;
}
679
680
void
681
nsPipe::ReleaseReadSegment(nsPipeReadState& aReadState, nsPipeEvents& aEvents)
682
0
{
683
0
  ReentrantMonitorAutoEnter mon(mReentrantMonitor);
684
0
685
0
  MOZ_DIAGNOSTIC_ASSERT(aReadState.mActiveRead);
686
0
  aReadState.mActiveRead = false;
687
0
688
0
  // When a read completes and releases the mActiveRead flag, we may have blocked
689
0
  // a drain from completing.  This occurs when the input stream is closed during
690
0
  // the read.  In these cases, we need to complete the drain as soon as the
691
0
  // active read completes.
692
0
  if (aReadState.mNeedDrain) {
693
0
    aReadState.mNeedDrain = false;
694
0
    DrainInputStream(aReadState, aEvents);
695
0
  }
696
0
}
697
698
void
699
nsPipe::AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aBytesRead)
700
0
{
701
0
  MOZ_DIAGNOSTIC_ASSERT(aBytesRead > 0);
702
0
703
0
  nsPipeEvents events;
704
0
  {
705
0
    ReentrantMonitorAutoEnter mon(mReentrantMonitor);
706
0
707
0
    LOG(("III advancing read cursor by %u\n", aBytesRead));
708
0
    MOZ_DIAGNOSTIC_ASSERT(aBytesRead <= mBuffer.GetSegmentSize());
709
0
710
0
    aReadState.mReadCursor += aBytesRead;
711
0
    MOZ_DIAGNOSTIC_ASSERT(aReadState.mReadCursor <= aReadState.mReadLimit);
712
0
713
0
    MOZ_DIAGNOSTIC_ASSERT(aReadState.mAvailable >= aBytesRead);
714
0
    aReadState.mAvailable -= aBytesRead;
715
0
716
0
    // Check to see if we're at the end of the available read data.  If we
717
0
    // are, and this segment is not still being written, then we can possibly
718
0
    // free up the segment.
719
0
    if (aReadState.mReadCursor == aReadState.mReadLimit &&
720
0
        !ReadSegmentBeingWritten(aReadState)) {
721
0
722
0
      // Advance the segment position.  If we have read any segments from the
723
0
      // advance buffer then we can potentially notify blocked writers.
724
0
      if (AdvanceReadSegment(aReadState, mon) == SegmentAdvanceBufferRead &&
725
0
          mOutput.OnOutputWritable(events) == NotifyMonitor) {
726
0
        mon.NotifyAll();
727
0
      }
728
0
    }
729
0
730
0
    ReleaseReadSegment(aReadState, events);
731
0
  }
732
0
}
733
734
// Moves |aReadState| onto the next buffer segment, deleting the first
// segment if no reader references it any more.  Returns
// SegmentAdvanceBufferRead when the move took this reader from "advance
// buffer full" to "not full", and SegmentNotChanged otherwise.
SegmentChangeResult
nsPipe::AdvanceReadSegment(nsPipeReadState& aReadState,
                           const ReentrantMonitorAutoEnter &ev)
{
  // How many segments were buffered for this stream before the move.
  const uint32_t segmentsBefore = GetBufferSegmentCount(aReadState, ev);

  const int32_t finishedSegment = aReadState.mSegment;

  // Step onto the next segment to read.
  aReadState.mSegment += 1;

  // Remove the first segment if that was the last reference to it.
  if (finishedSegment == 0 && CountSegmentReferences(finishedSegment) == 0) {

    // All segment indices shift down by one (-1 indicates an empty buffer).
    mWriteSegment -= 1;

    // Adjust the caller's read state by hand.  If its input stream is being
    // closed at the same time as it is read, it may no longer be present in
    // mInputList.
    aReadState.mSegment -= 1;

    for (nsPipeInputStream* input : mInputList) {
      // The caller's read state was already adjusted above.
      nsPipeReadState& other = input->ReadState();
      if (&other != &aReadState) {
        other.mSegment -= 1;
      }
    }

    // done with this segment
    mBuffer.DeleteFirstSegment();
    LOG(("III deleting first segment\n"));
  }

  if (mWriteSegment < aReadState.mSegment) {
    // The reader has caught up with the written data: clear its cursor.
    MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
    aReadState.mReadCursor = nullptr;
    aReadState.mReadLimit = nullptr;
    // If the buffer is now completely empty, clear the write cursor too.
    if (mWriteSegment == -1) {
      mWriteCursor = nullptr;
      mWriteLimit = nullptr;
    }
  } else {
    // Point cursor and limit at the next buffer segment.  The limit is the
    // write cursor when the writer is in that same segment.
    aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
    aReadState.mReadLimit =
      (mWriteSegment == aReadState.mSegment)
        ? mWriteCursor
        : aReadState.mReadCursor + mBuffer.GetSegmentSize();
  }

  // How many segments are buffered for this stream after the move.
  const uint32_t segmentsAfter = GetBufferSegmentCount(aReadState, ev);

  // The writer may advance if this stream just read a segment out of a full
  // set of advance buffer segments.
  const bool wasFull = segmentsBefore >= mMaxAdvanceBufferSegmentCount;
  const bool isFull = segmentsAfter >= mMaxAdvanceBufferSegmentCount;
  if (wasFull && !isFull) {
    return SegmentAdvanceBufferRead;
  }

  // Nothing significant changed in the segment structure.
  return SegmentNotChanged;
}
805
806
void
807
nsPipe::DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents)
808
0
{
809
0
  ReentrantMonitorAutoEnter mon(mReentrantMonitor);
810
0
811
0
  // If a segment is actively being read in ReadSegments() for this input
812
0
  // stream, then we cannot drain the stream.  This can happen because
813
0
  // ReadSegments() does not hold the lock while copying from the buffer.
814
0
  // If we detect this condition, simply note that we need a drain once
815
0
  // the read completes and return immediately.
816
0
  if (aReadState.mActiveRead) {
817
0
    MOZ_DIAGNOSTIC_ASSERT(!aReadState.mNeedDrain);
818
0
    aReadState.mNeedDrain = true;
819
0
    return;
820
0
  }
821
0
822
0
  while(mWriteSegment >= aReadState.mSegment) {
823
0
824
0
    // If the last segment to free is still being written to, we're done
825
0
    // draining.  We can't free any more.
826
0
    if (ReadSegmentBeingWritten(aReadState)) {
827
0
      break;
828
0
    }
829
0
830
0
    // Don't bother checking if this results in an advance buffer segment
831
0
    // read.  Since we are draining the entire stream we will read an
832
0
    // advance buffer segment no matter what.
833
0
    AdvanceReadSegment(aReadState, mon);
834
0
  }
835
0
836
0
  // Force the stream into an empty state.  Make sure mAvailable, mCursor, and
837
0
  // mReadLimit are consistent with one another.
838
0
  aReadState.mAvailable = 0;
839
0
  aReadState.mReadCursor = nullptr;
840
0
  aReadState.mReadLimit = nullptr;
841
0
842
0
  // Remove the input stream from the pipe's list of streams.  This will
843
0
  // prevent the pipe from holding the stream alive or trying to update
844
0
  // its read state any further.
845
0
  DebugOnly<uint32_t> numRemoved = 0;
846
0
  mInputList.RemoveElementsBy([&](nsPipeInputStream* aEntry) {
847
0
    bool result = &aReadState == &aEntry->ReadState();
848
0
    numRemoved += result ? 1 : 0;
849
0
    return result;
850
0
  });
851
0
  MOZ_ASSERT(numRemoved == 1);
852
0
853
0
  // If we have read any segments from the advance buffer then we can
854
0
  // potentially notify blocked writers.
855
0
  if (!IsAdvanceBufferFull(mon) &&
856
0
      mOutput.OnOutputWritable(aEvents) == NotifyMonitor) {
857
0
    mon.NotifyAll();
858
0
  }
859
0
}
860
861
bool
862
nsPipe::ReadSegmentBeingWritten(nsPipeReadState& aReadState)
863
0
{
864
0
  mReentrantMonitor.AssertCurrentThreadIn();
865
0
  bool beingWritten = mWriteSegment == aReadState.mSegment &&
866
0
                      mWriteLimit > mWriteCursor;
867
0
  MOZ_DIAGNOSTIC_ASSERT(!beingWritten || aReadState.mReadLimit == mWriteCursor);
868
0
  return beingWritten;
869
0
}
870
871
// Provides the writer with the free tail of the current write segment,
// appending a new segment first when the current one is full (or the buffer
// is empty).
//
// Returns the pipe's failure status if it has one,
// NS_BASE_STREAM_WOULD_BLOCK when the advance buffer is full,
// NS_ERROR_OUT_OF_MEMORY when no segment could be appended, and NS_OK with
// |aSegment| / |aSegmentLen| filled in otherwise.
nsresult
nsPipe::GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen)
{
  ReentrantMonitorAutoEnter mon(mReentrantMonitor);

  if (NS_FAILED(mStatus)) {
    return mStatus;
  }

  // Cursor and limit are equal when the segment is full; they may also both
  // be null, which indicates an empty buffer.
  const bool needNewSegment = mWriteCursor == mWriteLimit;
  if (needNewSegment) {
    // Hitting the limit on advance data buffering means the pipe is full:
    // even the fastest reader is consuming data more slowly than it is
    // being written.
    if (IsAdvanceBufferFull(mon)) {
      return NS_BASE_STREAM_WOULD_BLOCK;
    }

    // The nsSegmentedBuffer is configured to be "infinite", so a null
    // result is not expected here.
    char* newSegment = mBuffer.AppendNewSegment();
    if (!newSegment) {
      return NS_ERROR_OUT_OF_MEMORY;
    }

    LOG(("OOO appended new segment\n"));
    mWriteCursor = newSegment;
    mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
    ++mWriteSegment;
  }

  // make sure read cursor is initialized
  SetAllNullReadCursors();

  // Purely an optimization: when only the first segment exists and every
  // reader has caught up with the writer, rewind all cursors to the start of
  // that segment.
  if (mWriteSegment == 0 && AllReadCursorsMatchWriteCursor()) {
    char* segmentStart = mBuffer.GetSegment(0);
    LOG(("OOO rolling back write cursor %" PRId64 " bytes\n",
         static_cast<int64_t>(mWriteCursor - segmentStart)));
    RollBackAllReadCursors(segmentStart);
    mWriteCursor = segmentStart;
  }

  aSegment    = mWriteCursor;
  aSegmentLen = mWriteLimit - mWriteCursor;
  return NS_OK;
}
919
920
void
921
nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten)
922
0
{
923
0
  MOZ_DIAGNOSTIC_ASSERT(aBytesWritten > 0);
924
0
925
0
  nsPipeEvents events;
926
0
  {
927
0
    ReentrantMonitorAutoEnter mon(mReentrantMonitor);
928
0
929
0
    LOG(("OOO advancing write cursor by %u\n", aBytesWritten));
930
0
931
0
    char* newWriteCursor = mWriteCursor + aBytesWritten;
932
0
    MOZ_DIAGNOSTIC_ASSERT(newWriteCursor <= mWriteLimit);
933
0
934
0
    // update read limit if reading in the same segment
935
0
    UpdateAllReadCursors(newWriteCursor);
936
0
937
0
    mWriteCursor = newWriteCursor;
938
0
939
0
    ValidateAllReadCursors();
940
0
941
0
    // update the writable flag on the output stream
942
0
    if (mWriteCursor == mWriteLimit) {
943
0
      mOutput.SetWritable(!IsAdvanceBufferFull(mon));
944
0
    }
945
0
946
0
    // notify input stream that pipe now contains additional data
947
0
    bool needNotify = false;
948
0
    for (uint32_t i = 0; i < mInputList.Length(); ++i) {
949
0
      if (mInputList[i]->OnInputReadable(aBytesWritten, events, mon)
950
0
          == NotifyMonitor) {
951
0
        needNotify = true;
952
0
      }
953
0
    }
954
0
955
0
    if (needNotify) {
956
0
      mon.NotifyAll();
957
0
    }
958
0
  }
959
0
}
960
961
void
962
nsPipe::OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason)
963
0
{
964
0
  MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));
965
0
966
0
  nsPipeEvents events;
967
0
  {
968
0
    ReentrantMonitorAutoEnter mon(mReentrantMonitor);
969
0
970
0
    // Its possible to re-enter this method when we call OnPipeException() or
971
0
    // OnInputExection() below.  If there is a caller stuck in our synchronous
972
0
    // Wait() method, then they will get woken up with a failure code which
973
0
    // re-enters this method.  Therefore, gracefully handle unknown streams
974
0
    // here.
975
0
976
0
    // If we only have one stream open and it is the given stream, then shut
977
0
    // down the entire pipe.
978
0
    if (mInputList.Length() == 1) {
979
0
      if (mInputList[0] == aStream) {
980
0
        OnPipeException(aReason);
981
0
      }
982
0
      return;
983
0
    }
984
0
985
0
    // Otherwise just close the particular stream that hit an exception.
986
0
    for (uint32_t i = 0; i < mInputList.Length(); ++i) {
987
0
      if (mInputList[i] != aStream) {
988
0
        continue;
989
0
      }
990
0
991
0
      MonitorAction action = mInputList[i]->OnInputException(aReason, events,
992
0
                                                             mon);
993
0
994
0
      // Notify after element is removed in case we re-enter as a result.
995
0
      if (action == NotifyMonitor) {
996
0
        mon.NotifyAll();
997
0
      }
998
0
999
0
      return;
1000
0
    }
1001
0
  }
1002
0
}
1003
1004
void
1005
nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly)
1006
0
{
1007
0
  LOG(("PPP nsPipe::OnPipeException [reason=%" PRIx32 " output-only=%d]\n",
1008
0
       static_cast<uint32_t>(aReason), aOutputOnly));
1009
0
1010
0
  nsPipeEvents events;
1011
0
  {
1012
0
    ReentrantMonitorAutoEnter mon(mReentrantMonitor);
1013
0
1014
0
    // if we've already hit an exception, then ignore this one.
1015
0
    if (NS_FAILED(mStatus)) {
1016
0
      return;
1017
0
    }
1018
0
1019
0
    mStatus = aReason;
1020
0
1021
0
    bool needNotify = false;
1022
0
1023
0
    // OnInputException() can drain the stream and remove it from
1024
0
    // mInputList.  So iterate over a temp list instead.
1025
0
    nsTArray<nsPipeInputStream*> list(mInputList);
1026
0
    for (uint32_t i = 0; i < list.Length(); ++i) {
1027
0
      // an output-only exception applies to the input end if the pipe has
1028
0
      // zero bytes available.
1029
0
      if (aOutputOnly && list[i]->Available()) {
1030
0
        continue;
1031
0
      }
1032
0
1033
0
      if (list[i]->OnInputException(aReason, events, mon) == NotifyMonitor) {
1034
0
        needNotify = true;
1035
0
      }
1036
0
    }
1037
0
1038
0
    if (mOutput.OnOutputException(aReason, events) == NotifyMonitor) {
1039
0
      needNotify = true;
1040
0
    }
1041
0
1042
0
    // Notify after we have removed any input streams from mInputList
1043
0
    if (needNotify) {
1044
0
      mon.NotifyAll();
1045
0
    }
1046
0
  }
1047
0
}
1048
1049
// Creates a new input stream copy-constructed from |aOriginal|, registers
// it in the pipe's input list and returns it through |aCloneOut|.
// Always returns NS_OK.
nsresult
nsPipe::CloneInputStream(nsPipeInputStream* aOriginal,
                         nsIInputStream** aCloneOut)
{
  ReentrantMonitorAutoEnter mon(mReentrantMonitor);

  RefPtr<nsPipeInputStream> clone = new nsPipeInputStream(*aOriginal);
  mInputList.AppendElement(clone);

  // Route through nsIAsyncInputStream before handing the reference out as
  // an nsIInputStream.
  nsCOMPtr<nsIAsyncInputStream> asAsyncStream = clone.forget();
  asAsyncStream.forget(aCloneOut);
  return NS_OK;
}
1060
1061
uint32_t
1062
nsPipe::CountSegmentReferences(int32_t aSegment)
1063
0
{
1064
0
  mReentrantMonitor.AssertCurrentThreadIn();
1065
0
  uint32_t count = 0;
1066
0
  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1067
0
    if (aSegment >= mInputList[i]->ReadState().mSegment) {
1068
0
      count += 1;
1069
0
    }
1070
0
  }
1071
0
  return count;
1072
0
}
1073
1074
void
1075
nsPipe::SetAllNullReadCursors()
1076
0
{
1077
0
  mReentrantMonitor.AssertCurrentThreadIn();
1078
0
  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1079
0
    nsPipeReadState& readState = mInputList[i]->ReadState();
1080
0
    if (!readState.mReadCursor) {
1081
0
      MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == readState.mSegment);
1082
0
      readState.mReadCursor = readState.mReadLimit = mWriteCursor;
1083
0
    }
1084
0
  }
1085
0
}
1086
1087
bool
1088
nsPipe::AllReadCursorsMatchWriteCursor()
1089
0
{
1090
0
  mReentrantMonitor.AssertCurrentThreadIn();
1091
0
  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1092
0
    const nsPipeReadState& readState = mInputList[i]->ReadState();
1093
0
    if (readState.mSegment != mWriteSegment ||
1094
0
        readState.mReadCursor != mWriteCursor) {
1095
0
      return false;
1096
0
    }
1097
0
  }
1098
0
  return true;
1099
0
}
1100
1101
void
1102
nsPipe::RollBackAllReadCursors(char* aWriteCursor)
1103
0
{
1104
0
  mReentrantMonitor.AssertCurrentThreadIn();
1105
0
  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1106
0
    nsPipeReadState& readState = mInputList[i]->ReadState();
1107
0
    MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == readState.mSegment);
1108
0
    MOZ_DIAGNOSTIC_ASSERT(mWriteCursor == readState.mReadCursor);
1109
0
    MOZ_DIAGNOSTIC_ASSERT(mWriteCursor == readState.mReadLimit);
1110
0
    readState.mReadCursor = aWriteCursor;
1111
0
    readState.mReadLimit = aWriteCursor;
1112
0
  }
1113
0
}
1114
1115
void
1116
nsPipe::UpdateAllReadCursors(char* aWriteCursor)
1117
0
{
1118
0
  mReentrantMonitor.AssertCurrentThreadIn();
1119
0
  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1120
0
    nsPipeReadState& readState = mInputList[i]->ReadState();
1121
0
    if (mWriteSegment == readState.mSegment &&
1122
0
        readState.mReadLimit == mWriteCursor) {
1123
0
      readState.mReadLimit = aWriteCursor;
1124
0
    }
1125
0
  }
1126
0
}
1127
1128
void
1129
nsPipe::ValidateAllReadCursors()
1130
0
{
1131
0
  mReentrantMonitor.AssertCurrentThreadIn();
1132
0
  // The only way mReadCursor == mWriteCursor is if:
1133
0
  //
1134
0
  // - mReadCursor is at the start of a segment (which, based on how
1135
0
  //   nsSegmentedBuffer works, means that this segment is the "first"
1136
0
  //   segment)
1137
0
  // - mWriteCursor points at the location past the end of the current
1138
0
  //   write segment (so the current write filled the current write
1139
0
  //   segment, so we've incremented mWriteCursor to point past the end
1140
0
  //   of it)
1141
0
  // - the segment to which data has just been written is located
1142
0
  //   exactly one segment's worth of bytes before the first segment
1143
0
  //   where mReadCursor is located
1144
0
  //
1145
0
  // Consequently, the byte immediately after the end of the current
1146
0
  // write segment is the first byte of the first segment, so
1147
0
  // mReadCursor == mWriteCursor.  (Another way to think about this is
1148
0
  // to consider the buffer architecture diagram above, but consider it
1149
0
  // with an arena allocator which allocates from the *end* of the
1150
0
  // arena to the *beginning* of the arena.)
1151
#ifdef DEBUG
1152
  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1153
    const nsPipeReadState& state = mInputList[i]->ReadState();
1154
    MOZ_ASSERT(state.mReadCursor != mWriteCursor ||
1155
               (mBuffer.GetSegment(state.mSegment) == state.mReadCursor &&
1156
               mWriteCursor == mWriteLimit));
1157
  }
1158
#endif
1159
}
1160
1161
uint32_t
1162
nsPipe::GetBufferSegmentCount(const nsPipeReadState& aReadState,
1163
                              const ReentrantMonitorAutoEnter& ev) const
1164
0
{
1165
0
  // The write segment can be smaller than the current reader position
1166
0
  // in some cases.  For example, when the first write segment has not
1167
0
  // been allocated yet mWriteSegment is negative.  In these cases
1168
0
  // the stream is effectively using zero segments.
1169
0
  if (mWriteSegment < aReadState.mSegment) {
1170
0
    return 0;
1171
0
  }
1172
0
1173
0
  MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= 0);
1174
0
  MOZ_DIAGNOSTIC_ASSERT(aReadState.mSegment >= 0);
1175
0
1176
0
  // Otherwise at least one segment is being used.  We add one here
1177
0
  // since a single segment is being used when the write and read
1178
0
  // segment indices are the same.
1179
0
  return 1 + mWriteSegment - aReadState.mSegment;
1180
0
}
1181
1182
bool
1183
nsPipe::IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const
1184
0
{
1185
0
  // If we have fewer total segments than the limit we can immediately
1186
0
  // determine we are not full.  Note, we must add one to mWriteSegment
1187
0
  // to convert from a index to a count.
1188
0
  MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= -1);
1189
0
  MOZ_DIAGNOSTIC_ASSERT(mWriteSegment < INT32_MAX);
1190
0
  uint32_t totalWriteSegments = mWriteSegment + 1;
1191
0
  if (totalWriteSegments < mMaxAdvanceBufferSegmentCount) {
1192
0
    return false;
1193
0
  }
1194
0
1195
0
  // Otherwise we must inspect all of our reader streams.  We need
1196
0
  // to determine the buffer depth of the fastest reader.
1197
0
  uint32_t minBufferSegments = UINT32_MAX;
1198
0
  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1199
0
    // Only count buffer segments from input streams that are open.
1200
0
    if (NS_FAILED(mInputList[i]->Status(ev))) {
1201
0
      continue;
1202
0
    }
1203
0
    const nsPipeReadState& state = mInputList[i]->ReadState();
1204
0
    uint32_t bufferSegments = GetBufferSegmentCount(state, ev);
1205
0
    minBufferSegments = std::min(minBufferSegments, bufferSegments);
1206
0
    // We only care if any reader has fewer segments buffered than
1207
0
    // our threshold.  We can stop once we hit that threshold.
1208
0
    if (minBufferSegments < mMaxAdvanceBufferSegmentCount) {
1209
0
      return false;
1210
0
    }
1211
0
  }
1212
0
1213
0
  // Note, its possible for minBufferSegments to exceed our
1214
0
  // mMaxAdvanceBufferSegmentCount here.  This happens when a cloned
1215
0
  // reader gets far behind, but then the fastest reader stream is
1216
0
  // closed.  This leaves us with a single stream that is buffered
1217
0
  // beyond our max.  Naturally we continue to indicate the pipe
1218
0
  // is full at this point.
1219
0
1220
0
  return true;
1221
0
}
1222
1223
//-----------------------------------------------------------------------------
1224
// nsPipeEvents methods:
1225
//-----------------------------------------------------------------------------
1226
1227
// Fires every notification that was queued on this object: first each
// pending input-ready callback in list order, then the output-ready
// callback if one was recorded.
nsPipeEvents::~nsPipeEvents()
{
  for (auto& pending : mInputList) {
    pending.mCallback->OnInputStreamReady(pending.mStream);
  }
  mInputList.Clear();

  if (!mOutputCallback) {
    return;
  }

  mOutputCallback->OnOutputStreamReady(mOutputStream);
  mOutputCallback = nullptr;
  mOutputStream = nullptr;
}
1242
1243
//-----------------------------------------------------------------------------
1244
// nsPipeInputStream methods:
1245
//-----------------------------------------------------------------------------
1246
1247
// XPCOM boilerplate for nsPipeInputStream, expressed entirely through
// macros: AddRef/Release, the interface table consulted when the object is
// queried for an interface, and the class-info interface list.
// nsIInputStream and nsISupports are registered through the _AMBIGUOUS
// variant with nsIAsyncInputStream as the intermediate type.
// NOTE(review): exact macro expansions live in XPCOM headers that are not
// visible here - confirm there before relying on details.
// The bare numeric lines interleaved below are line-number / hit-count
// residue from the coverage report this text was extracted from; they are
// not part of the program.
NS_IMPL_ADDREF(nsPipeInputStream);
1248
NS_IMPL_RELEASE(nsPipeInputStream);
1249
1250
0
NS_INTERFACE_TABLE_HEAD(nsPipeInputStream)
1251
0
  NS_INTERFACE_TABLE_BEGIN
1252
0
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIAsyncInputStream)
1253
0
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsISeekableStream)
1254
0
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsISearchableInputStream)
1255
0
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsICloneableInputStream)
1256
0
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIBufferedInputStream)
1257
0
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIClassInfo)
1258
0
    NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsIInputStream,
1259
0
                                       nsIAsyncInputStream)
1260
0
    NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsISupports,
1261
0
                                       nsIAsyncInputStream)
1262
0
  NS_INTERFACE_TABLE_END
1263
0
NS_INTERFACE_TABLE_TAIL
1264
1265
NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream,
1266
                            nsIInputStream,
1267
                            nsIAsyncInputStream,
1268
                            nsISeekableStream,
1269
                            nsISearchableInputStream,
1270
                            nsICloneableInputStream,
1271
                            nsIBufferedInputStream)
1272
1273
NS_IMPL_THREADSAFE_CI(nsPipeInputStream)
1274
1275
// nsIBufferedInputStream::Init is not supported on a pipe input stream;
// calling it deliberately crashes the process.  Both arguments are ignored.
NS_IMETHODIMP
nsPipeInputStream::Init(nsIInputStream*, uint32_t)
{
  MOZ_CRASH(
    "nsPipeInputStream should never be initialized with "
    "nsIBufferedInputStream::Init!\n");
}
1281
1282
// This stream was not set up through Init(), so it wraps no underlying
// stream to report.  |aResult| is left untouched and
// NS_ERROR_NOT_IMPLEMENTED is returned.
NS_IMETHODIMP
nsPipeInputStream::GetData(nsIInputStream **aResult)
{
  return NS_ERROR_NOT_IMPLEMENTED;
}
1289
1290
uint32_t
1291
nsPipeInputStream::Available()
1292
0
{
1293
0
  mPipe->mReentrantMonitor.AssertCurrentThreadIn();
1294
0
  return mReadState.mAvailable;
1295
0
}
1296
1297
// Blocks the calling thread on the pipe's monitor until this stream has
// data to read or its status turns into a failure.  Only valid on blocking
// streams (asserted).  NS_BASE_STREAM_CLOSED is reported as NS_OK; any
// other status is returned unchanged.
nsresult
nsPipeInputStream::Wait()
{
  MOZ_DIAGNOSTIC_ASSERT(mBlocking);

  ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

  for (;;) {
    // Stop waiting once the stream has failed or data has shown up.
    if (NS_FAILED(Status(mon)) || mReadState.mAvailable != 0) {
      break;
    }

    LOG(("III pipe input: waiting for data\n"));

    // mBlocked is set only for the duration of the wait on the monitor.
    mBlocked = true;
    mon.Wait();
    mBlocked = false;

    LOG(("III pipe input: woke up [status=%" PRIx32 " available=%u]\n",
         static_cast<uint32_t>(Status(mon)), mReadState.mAvailable));
  }

  const nsresult finalStatus = Status(mon);
  if (finalStatus == NS_BASE_STREAM_CLOSED) {
    return NS_OK;
  }
  return finalStatus;
}
1317
1318
// Called by the pipe when |aBytesWritten| more bytes became readable.
// Adds them to the available count, then either queues the pending
// callback on |aEvents| (unless it asked for closure notifications only)
// or, when a thread is blocked in Wait(), asks the caller to notify the
// monitor.  The caller must hold the pipe's monitor.
MonitorAction
nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten,
                                   nsPipeEvents& aEvents,
                                   const ReentrantMonitorAutoEnter& ev)
{
  mPipe->mReentrantMonitor.AssertCurrentThreadIn();
  mReadState.mAvailable += aBytesWritten;

  const bool wantsDataCallback =
    mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY);
  if (wantsDataCallback) {
    aEvents.NotifyInputReady(this, mCallback);
    mCallback = nullptr;
    mCallbackFlags = 0;
    return DoNotNotifyMonitor;
  }

  return mBlocked ? NotifyMonitor : DoNotNotifyMonitor;
}
1338
1339
// Puts this stream into a failed state: records |aReason| unless a failure
// is already recorded, has the pipe drain this stream's read state, and
// then either queues any pending callback on |aEvents| or, when a thread
// is blocked in Wait(), asks the caller to notify the monitor.
MonitorAction
nsPipeInputStream::OnInputException(nsresult aReason, nsPipeEvents& aEvents,
                                    const ReentrantMonitorAutoEnter& ev)
{
  LOG(("nsPipeInputStream::OnInputException [this=%p reason=%" PRIx32 "]\n",
       this, static_cast<uint32_t>(aReason)));

  MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));

  // The first failure sticks.
  if (NS_SUCCEEDED(mInputStatus)) {
    mInputStatus = aReason;
  }

  // Forces the count of available bytes to zero.
  mPipe->DrainInputStream(mReadState, aEvents);

  // Unlike the readable case, a pending callback is fired regardless of
  // its flags.
  if (mCallback) {
    aEvents.NotifyInputReady(this, mCallback);
    mCallback = nullptr;
    mCallbackFlags = 0;
    return DoNotNotifyMonitor;
  }

  return mBlocked ? NotifyMonitor : DoNotNotifyMonitor;
}
1367
1368
// Closes this input stream with |aReason|.  A success code is replaced by
// NS_BASE_STREAM_CLOSED.  Does nothing if the stream already carries a
// failure status.  Always returns NS_OK.
NS_IMETHODIMP
nsPipeInputStream::CloseWithStatus(nsresult aReason)
{
  LOG(("III CloseWithStatus [this=%p reason=%" PRIx32 "]\n",
       this, static_cast<uint32_t>(aReason)));

  ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

  const bool alreadyClosed = NS_FAILED(mInputStatus);
  if (alreadyClosed) {
    return NS_OK;
  }

  const nsresult closeReason =
    NS_SUCCEEDED(aReason) ? NS_BASE_STREAM_CLOSED : aReason;

  mPipe->OnInputStreamException(this, closeReason);
  return NS_OK;
}
1387
1388
// Plain close: identical to CloseWithStatus(NS_BASE_STREAM_CLOSED).
NS_IMETHODIMP
nsPipeInputStream::Close()
{
  const nsresult result = CloseWithStatus(NS_BASE_STREAM_CLOSED);
  return result;
}
1393
1394
// Reports through |aResult| how many bytes can be read right now.  When
// nothing is available and the stream's status is a failure, that status
// is returned instead and |aResult| is not written.
NS_IMETHODIMP
nsPipeInputStream::Available(uint64_t* aResult)
{
  // nsPipeInputStream supports under 4GB stream only
  ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

  const bool nothingBuffered = (mReadState.mAvailable == 0);
  if (nothingBuffered && NS_FAILED(Status(mon))) {
    // Closed (or otherwise failed) with nothing left to hand out.
    return Status(mon);
  }

  *aResult = static_cast<uint64_t>(mReadState.mAvailable);
  return NS_OK;
}
1408
1409
// Feeds up to |aCount| buffered bytes to |aWriter|, one readable segment
// at a time, and reports the number consumed through |aReadCount|.
//
// - A blocking stream waits for data when the pipe is empty; a
//   non-blocking one returns NS_BASE_STREAM_WOULD_BLOCK in that case.
// - Once something has been read, a later failure to obtain a segment just
//   ends the call with NS_OK.
// - NS_BASE_STREAM_CLOSED is reported as NS_OK.
// - Any other failure is forwarded to the pipe as an input stream
//   exception and returned.
// - Failures returned by |aWriter| (or a zero-byte consume) end the call
//   but are not propagated to the caller.
NS_IMETHODIMP
nsPipeInputStream::ReadSegments(nsWriteSegmentFun aWriter,
                                void* aClosure,
                                uint32_t aCount,
                                uint32_t* aReadCount)
{
  LOG(("III ReadSegments [this=%p count=%u]\n", this, aCount));

  nsresult status = NS_OK;

  *aReadCount = 0;
  while (aCount) {
    AutoReadSegment segment(mPipe, mReadState, aCount);
    status = segment.Status();
    if (NS_FAILED(status)) {
      // Partial success wins over the error.
      if (*aReadCount > 0) {
        status = NS_OK;
        break;
      }
      if (status == NS_BASE_STREAM_WOULD_BLOCK) {
        // The pipe is empty.
        if (!mBlocking) {
          break;
        }
        // Block until some data has been written to the pipe, then retry.
        status = Wait();
        if (NS_SUCCEEDED(status)) {
          continue;
        }
      }
      // A closed stream is not an error for the reader; just stop.
      if (status == NS_BASE_STREAM_CLOSED) {
        status = NS_OK;
        break;
      }
      mPipe->OnInputStreamException(this, status);
      break;
    }

    while (segment.Length()) {
      uint32_t consumed = 0;

      status = aWriter(static_cast<nsIAsyncInputStream*>(this), aClosure,
                       segment.Data(), *aReadCount, segment.Length(),
                       &consumed);

      if (NS_FAILED(status) || consumed == 0) {
        // Zeroing aCount also terminates the outer loop.
        aCount = 0;
        // Errors from the writer stop here; they are not handed to the
        // caller of ReadSegments.
        status = NS_OK;
        break;
      }

      MOZ_DIAGNOSTIC_ASSERT(consumed <= segment.Length());
      segment.Advance(consumed);
      aCount -= consumed;
      *aReadCount += consumed;
      mLogicalOffset += consumed;
    }
  }

  return status;
}
1474
1475
// Copies up to |aBufLen| bytes into |aToBuf| by running ReadSegments with
// NS_CopySegmentToBuffer as the segment writer.
NS_IMETHODIMP
nsPipeInputStream::Read(char* aToBuf, uint32_t aBufLen, uint32_t* aReadCount)
{
  void* const closure = aToBuf;
  return ReadSegments(NS_CopySegmentToBuffer, closure, aBufLen, aReadCount);
}
1480
1481
// Writes true to |aNonBlocking| when this stream is not in blocking mode.
// Always returns NS_OK.
NS_IMETHODIMP
nsPipeInputStream::IsNonBlocking(bool* aNonBlocking)
{
  const bool nonBlocking = !mBlocking;
  *aNonBlocking = nonBlocking;
  return NS_OK;
}
1487
1488
// Registers |aCallback| to be told when the stream becomes readable or
// fails.  Any previously pending callback is dropped first; a null
// |aCallback| therefore just cancels.  When |aTarget| is given the callback
// is wrapped by NS_NewInputStreamReadyEvent for that target.  If the
// stream has already failed, or has data and WAIT_CLOSURE_ONLY was not
// requested, the callback is queued for notification right away; otherwise
// it is stored together with |aFlags|.  |aRequestedCount| is not used.
NS_IMETHODIMP
nsPipeInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
                             uint32_t aFlags,
                             uint32_t aRequestedCount,
                             nsIEventTarget* aTarget)
{
  LOG(("III AsyncWait [this=%p]\n", this));

  // Lives outside the monitor scope so it is destroyed only after the
  // monitor has been exited.
  nsPipeEvents pipeEvents;
  {
    ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

    // A new request always replaces whatever was pending.
    mCallback = nullptr;
    mCallbackFlags = 0;

    if (!aCallback) {
      return NS_OK;
    }

    nsCOMPtr<nsIInputStreamCallback> targetProxy;
    if (aTarget) {
      targetProxy = NS_NewInputStreamReadyEvent("nsPipeInputStream::AsyncWait",
                                                aCallback, aTarget);
      aCallback = targetProxy;
    }

    const bool readableNow =
      mReadState.mAvailable && !(aFlags & WAIT_CLOSURE_ONLY);
    if (NS_FAILED(Status(mon)) || readableNow) {
      // Already closed or readable: post the event.
      pipeEvents.NotifyInputReady(this, aCallback);
    } else {
      // Park the callback until data becomes available.
      mCallback = aCallback;
      mCallbackFlags = aFlags;
    }
  }
  return NS_OK;
}
1527
1528
// Seeking is not supported on a pipe input stream.  Asserts in builds
// where MOZ_ASSERT_UNREACHABLE is active and otherwise reports
// NS_ERROR_NOT_IMPLEMENTED; both arguments are ignored.
NS_IMETHODIMP
nsPipeInputStream::Seek(int32_t aWhence, int64_t aOffset)
{
  MOZ_ASSERT_UNREACHABLE("nsPipeInputStream::Seek");

  return NS_ERROR_NOT_IMPLEMENTED;
}
1534
1535
// Reports the stream's logical offset through |aOffset|.  When nothing is
// available and the stream's status is a failure, that status is returned
// instead and |aOffset| is not written.
NS_IMETHODIMP
nsPipeInputStream::Tell(int64_t* aOffset)
{
  ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

  const bool nothingBuffered = (mReadState.mAvailable == 0);
  if (nothingBuffered && NS_FAILED(Status(mon))) {
    // Closed (or otherwise failed) with nothing left to read.
    return Status(mon);
  }

  *aOffset = mLogicalOffset;
  return NS_OK;
}
1548
1549
// Truncation is not supported on a pipe input stream.  Asserts in builds
// where MOZ_ASSERT_UNREACHABLE is active and otherwise reports
// NS_ERROR_NOT_IMPLEMENTED.
NS_IMETHODIMP
nsPipeInputStream::SetEOF()
{
  MOZ_ASSERT_UNREACHABLE("nsPipeInputStream::SetEOF");

  return NS_ERROR_NOT_IMPLEMENTED;
}
1555
1556
static bool strings_equal(bool aIgnoreCase,
1557
                          const char* aS1, const char* aS2, uint32_t aLen)
1558
0
{
1559
0
  return aIgnoreCase
1560
0
    ? !nsCRT::strncasecmp(aS1, aS2, aLen) : !strncmp(aS1, aS2, aLen);
1561
0
}
1562
1563
// Looks for |aForString| in the data currently buffered for this stream,
// without consuming any of it.
//
// aForString         NUL-terminated string to look for.
// aIgnoreCase        compare case-insensitively when true.
// aFound             [out] whether the string was found.
// aOffsetSearchedTo  [out] on success, the offset of the match from the
//                    current read position; on failure, the offset up to
//                    which a match has been ruled out.
//
// Matches are detected inside one segment or straddling two adjacent
// segments; a string spanning three or more segments is not detected.
// Always returns NS_OK.
//
// All lengths here are unsigned, so every subtraction below is guarded:
// the unguarded forms wrapped around whenever a segment was shorter than
// the search string and led to reads outside the buffered data.
NS_IMETHODIMP
nsPipeInputStream::Search(const char* aForString,
                          bool aIgnoreCase,
                          bool* aFound,
                          uint32_t* aOffsetSearchedTo)
{
  LOG(("III Search [for=%s ic=%u]\n", aForString, aIgnoreCase));

  ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

  char* cursor1;
  char* limit1;
  uint32_t index = 0, offset = 0;
  uint32_t strLen = strlen(aForString);

  mPipe->PeekSegment(mReadState, 0, cursor1, limit1);
  if (cursor1 == limit1) {
    *aFound = false;
    *aOffsetSearchedTo = 0;
    LOG(("  result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
    return NS_OK;
  }

  while (true) {
    uint32_t i, len1 = limit1 - cursor1;

    // check if the string is in the buffer segment.  Only possible when the
    // segment is at least as long as the string; without this guard
    // |len1 - strLen| would wrap around.
    if (len1 >= strLen) {
      for (i = 0; i <= len1 - strLen; i++) {
        if (strings_equal(aIgnoreCase, &cursor1[i], aForString, strLen)) {
          *aFound = true;
          *aOffsetSearchedTo = offset + i;
          LOG(("  result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
          return NS_OK;
        }
      }
    }

    // get the next segment
    char* cursor2;
    char* limit2;
    uint32_t len2;

    index++;
    offset += len1;

    mPipe->PeekSegment(mReadState, index, cursor2, limit2);
    if (cursor2 == limit2) {
      *aFound = false;
      // The last |strLen - 1| bytes could still be the start of a match
      // once more data arrives, so they do not count as searched.  Clamp
      // to zero when fewer bytes than that have been scanned.
      *aOffsetSearchedTo = offset >= strLen ? offset - strLen + 1 : 0;
      LOG(("  result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
      return NS_OK;
    }
    len2 = limit2 - cursor2;

    // check if the string is straddling the next buffer segment.  In
    // iteration |i| the second segment must supply |i + 1| bytes, so |i|
    // has to stay below |len2| to avoid comparing past |limit2|.
    uint32_t lim = XPCOM_MIN(strLen, len2);
    for (i = 0; i < lim; ++i) {
      uint32_t strPart1Len = strLen - i - 1;
      uint32_t strPart2Len = strLen - strPart1Len;
      if (strPart1Len > len1) {
        // The first segment is too short to hold this prefix.
        continue;
      }
      const char* strPart2 = &aForString[strLen - strPart2Len];
      uint32_t bufSeg1Offset = len1 - strPart1Len;
      if (strings_equal(aIgnoreCase, &cursor1[bufSeg1Offset], aForString, strPart1Len) &&
          strings_equal(aIgnoreCase, cursor2, strPart2, strPart2Len)) {
        *aFound = true;
        *aOffsetSearchedTo = offset - strPart1Len;
        LOG(("  result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
        return NS_OK;
      }
    }

    // finally continue with the next buffer
    cursor1 = cursor2;
    limit1 = limit2;
  }

  MOZ_ASSERT_UNREACHABLE("can't get here");
  return NS_ERROR_UNEXPECTED;    // keep compiler happy
}
1640
1641
// Always reports true through |aCloneableOut| and returns NS_OK.
NS_IMETHODIMP
nsPipeInputStream::GetCloneable(bool* aCloneableOut)
{
  const bool cloneable = true;
  *aCloneableOut = cloneable;
  return NS_OK;
}
1647
1648
// Delegates to the owning pipe, which creates the clone of this stream and
// returns it through |aCloneOut|.
NS_IMETHODIMP
nsPipeInputStream::Clone(nsIInputStream** aCloneOut)
{
  const nsresult result = mPipe->CloneInputStream(this, aCloneOut);
  return result;
}
1653
1654
// Computes this stream's effective status: its own failure status if it
// has one; otherwise NS_OK while bytes remain to be read; otherwise the
// pipe's status.  |ev| is not used by the body (presumably it documents
// that the caller holds the monitor - confirm).
nsresult
nsPipeInputStream::Status(const ReentrantMonitorAutoEnter& ev) const
{
  if (NS_FAILED(mInputStatus)) {
    return mInputStatus;
  }

  // With data still buffered the stream is fine.  With nothing left, fall
  // through to the pipe's state, which may reflect the state of its output
  // side (e.g. already closed).
  return mReadState.mAvailable ? NS_OK : mPipe->mStatus;
}
1670
1671
// Convenience overload: enters the pipe's monitor and returns the status
// computed by Status(mon).
nsresult
nsPipeInputStream::Status() const
{
  ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

  const nsresult current = Status(mon);
  return current;
}
1677
1678
// Closes the stream on destruction; the result of Close() is ignored.
nsPipeInputStream::~nsPipeInputStream()
{
  Close();
}
1682
1683
//-----------------------------------------------------------------------------
1684
// nsPipeOutputStream methods:
1685
//-----------------------------------------------------------------------------
1686
1687
// XPCOM boilerplate for nsPipeOutputStream, expressed entirely through
// macros: the QueryInterface implementation (nsIOutputStream,
// nsIAsyncOutputStream, nsIClassInfo) and the class-info interface list.
// AddRef/Release are not generated here; they are hand-written elsewhere
// in this file.
// NOTE(review): exact macro expansions live in XPCOM headers that are not
// visible here - confirm there before relying on details.
// The bare numeric lines interleaved below are line-number residue from
// the coverage report this text was extracted from; they are not part of
// the program.
NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream,
1688
                        nsIOutputStream,
1689
                        nsIAsyncOutputStream,
1690
                        nsIClassInfo)
1691
1692
NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream,
1693
                            nsIOutputStream,
1694
                            nsIAsyncOutputStream)
1695
1696
NS_IMPL_THREADSAFE_CI(nsPipeOutputStream)
1697
1698
// Waits on the pipe's monitor, once, if the pipe is healthy and the output
// is currently not writable.  Only valid on blocking streams (asserted).
// NS_BASE_STREAM_CLOSED is reported as NS_OK; any other pipe status is
// returned unchanged.  The wait is not repeated here; WriteSegments()
// re-checks the pipe and calls again when needed.
nsresult
nsPipeOutputStream::Wait()
{
  MOZ_DIAGNOSTIC_ASSERT(mBlocking);

  ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

  const bool mustWait = NS_SUCCEEDED(mPipe->mStatus) && !mWritable;
  if (mustWait) {
    LOG(("OOO pipe output: waiting for space\n"));

    // mBlocked is set only for the duration of the wait on the monitor.
    mBlocked = true;
    mon.Wait();
    mBlocked = false;

    LOG(("OOO pipe output: woke up [pipe-status=%" PRIx32 " writable=%u]\n",
         static_cast<uint32_t>(mPipe->mStatus), mWritable));
  }

  if (mPipe->mStatus == NS_BASE_STREAM_CLOSED) {
    return NS_OK;
  }
  return mPipe->mStatus;
}
1716
1717
// Marks the output as writable again.  Then either queues the pending
// callback on |aEvents| (unless it asked for closure notifications only)
// or, when a thread is blocked in Wait(), asks the caller to notify the
// monitor.
MonitorAction
nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents)
{
  mWritable = true;

  const bool wantsWritableCallback =
    mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY);
  if (wantsWritableCallback) {
    aEvents.NotifyOutputReady(this, mCallback);
    mCallback = nullptr;
    mCallbackFlags = 0;
    return DoNotNotifyMonitor;
  }

  return mBlocked ? NotifyMonitor : DoNotNotifyMonitor;
}
1734
1735
// Marks the output as no longer writable after a failure.  Then either
// queues any pending callback on |aEvents| (regardless of its flags) or,
// when a thread is blocked in Wait(), asks the caller to notify the
// monitor.  |aReason| is only logged and asserted to be a failure code.
MonitorAction
nsPipeOutputStream::OnOutputException(nsresult aReason, nsPipeEvents& aEvents)
{
  LOG(("nsPipeOutputStream::OnOutputException [this=%p reason=%" PRIx32 "]\n",
       this, static_cast<uint32_t>(aReason)));

  MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));
  mWritable = false;

  if (mCallback) {
    aEvents.NotifyOutputReady(this, mCallback);
    mCallback = nullptr;
    mCallbackFlags = 0;
    return DoNotNotifyMonitor;
  }

  return mBlocked ? NotifyMonitor : DoNotNotifyMonitor;
}
1756
1757
1758
// Counts a writer reference locally and forwards the actual reference
// count to the owning pipe, whose AddRef() result is returned.
NS_IMETHODIMP_(MozExternalRefCountType)
nsPipeOutputStream::AddRef()
{
  ++mWriterRefCnt;

  return mPipe->AddRef();
}
1764
1765
// Drops a writer reference.  When the last writer reference goes away the
// output stream is closed; the actual reference count lives on the owning
// pipe, whose Release() result is returned.
NS_IMETHODIMP_(MozExternalRefCountType)
nsPipeOutputStream::Release()
{
  const bool lastWriterReference = (--mWriterRefCnt == 0);
  if (lastWriterReference) {
    Close();
  }

  // No member access after this call; its value is returned directly.
  return mPipe->Release();
}
1773
1774
// Closes the output side of the pipe with |aReason|.  A success code is
// replaced by NS_BASE_STREAM_CLOSED.  The exception is raised as
// output-only, so input streams may remain open.  Always returns NS_OK.
NS_IMETHODIMP
nsPipeOutputStream::CloseWithStatus(nsresult aReason)
{
  LOG(("OOO CloseWithStatus [this=%p reason=%" PRIx32 "]\n",
       this, static_cast<uint32_t>(aReason)));

  const nsresult closeReason =
    NS_SUCCEEDED(aReason) ? NS_BASE_STREAM_CLOSED : aReason;

  const bool outputOnly = true;
  mPipe->OnPipeException(closeReason, outputOnly);
  return NS_OK;
}
1788
1789
// Plain close: identical to CloseWithStatus(NS_BASE_STREAM_CLOSED).
NS_IMETHODIMP
nsPipeOutputStream::Close()
{
  const nsresult result = CloseWithStatus(NS_BASE_STREAM_CLOSED);
  return result;
}
1794
1795
// Lets |aReader| fill up to |aCount| bytes of pipe buffer space, one write
// segment at a time, and reports the number of bytes produced through
// |aWriteCount|.
//
// - A blocking stream waits for space when the pipe is full; a
//   non-blocking one stops, returning NS_OK if something was already
//   written and NS_BASE_STREAM_WOULD_BLOCK otherwise.
// - Any other failure to obtain a write segment is raised on the pipe as
//   an exception and returned.
// - Failures returned by |aReader| (or a zero-byte read) end the call but
//   are not propagated to the caller.
NS_IMETHODIMP
nsPipeOutputStream::WriteSegments(nsReadSegmentFun aReader,
                                  void* aClosure,
                                  uint32_t aCount,
                                  uint32_t* aWriteCount)
{
  LOG(("OOO WriteSegments [this=%p count=%u]\n", this, aCount));

  nsresult status = NS_OK;

  char* dest;
  uint32_t room;

  *aWriteCount = 0;
  while (aCount) {
    status = mPipe->GetWriteSegment(dest, room);
    if (NS_FAILED(status)) {
      if (status == NS_BASE_STREAM_WOULD_BLOCK) {
        // The pipe is full.
        if (!mBlocking) {
          // Partial success wins over the would-block error.
          if (*aWriteCount > 0) {
            status = NS_OK;
          }
          break;
        }
        // Block until the pipe has an empty segment, then retry.
        status = Wait();
        if (NS_SUCCEEDED(status)) {
          continue;
        }
      }
      mPipe->OnPipeException(status);
      break;
    }

    // Never hand the reader more room than the caller asked to fill.
    if (room > aCount) {
      room = aCount;
    }

    const uint32_t roomAtStart = room;
    while (room) {
      uint32_t produced = 0;

      status = aReader(this, aClosure, dest, *aWriteCount, room, &produced);

      if (NS_FAILED(status) || produced == 0) {
        // Zeroing aCount also terminates the outer loop.
        aCount = 0;
        // Errors from the reader stop here; they are not handed to the
        // caller of WriteSegments.
        status = NS_OK;
        break;
      }

      MOZ_DIAGNOSTIC_ASSERT(produced <= room);
      dest += produced;
      room -= produced;
      aCount -= produced;
      *aWriteCount += produced;
      mLogicalOffset += produced;
    }

    // Commit whatever was produced into this segment, if anything.
    if (room < roomAtStart) {
      mPipe->AdvanceWriteCursor(roomAtStart - room);
    }
  }

  return status;
}
1865
1866
// Reader callback that Write() hands to WriteSegments().  It fills a pipe
// segment from a flat caller-supplied buffer.
//
// aOutStr       - the stream being written to (unused here).
// aClosure      - the source buffer, smuggled through as a void*.
// aToRawSegment - destination inside the pipe segment.
// aOffset       - how far into the source buffer to start copying.
// aCount        - number of bytes to copy.
// aReadCount    - out: always set to aCount, the whole request is satisfied.
//
// Always returns NS_OK.
static nsresult
nsReadFromRawBuffer(nsIOutputStream* aOutStr,
                    void* aClosure,
                    char* aToRawSegment,
                    uint32_t aOffset,
                    uint32_t aCount,
                    uint32_t* aReadCount)
{
  const char* source = static_cast<const char*>(aClosure) + aOffset;
  memcpy(aToRawSegment, source, aCount);
  *aReadCount = aCount;
  return NS_OK;
}
1879
1880
// Writes up to aBufLen bytes from aFromBuf into the pipe by delegating to
// WriteSegments() with nsReadFromRawBuffer as the reader callback.  The
// number of bytes accepted is reported through aWriteCount, and the result
// of WriteSegments() is returned unchanged.
NS_IMETHODIMP
nsPipeOutputStream::Write(const char* aFromBuf,
                          uint32_t aBufLen,
                          uint32_t* aWriteCount)
{
  // The closure slot is an untyped, non-const pointer, so constness has to be
  // dropped here; nsReadFromRawBuffer only ever reads through it.
  void* closure = const_cast<char*>(aFromBuf);
  return WriteSegments(nsReadFromRawBuffer, closure, aBufLen, aWriteCount);
}
1887
1888
// Flush is a no-op for this stream: the method performs no work and
// unconditionally reports success.
NS_IMETHODIMP
nsPipeOutputStream::Flush()
{
  return NS_OK;
}
1894
1895
// Reader callback that WriteFrom() hands to WriteSegments().  It fills a pipe
// segment by reading from an input stream.
//
// aOutStr       - the stream being written to (unused here).
// aClosure      - the source nsIInputStream, passed through as a void*.
// aToRawSegment - destination inside the pipe segment.
// aOffset       - unused; the source stream keeps its own position.
// aCount        - maximum number of bytes to read.
// aReadCount    - out: number of bytes the source stream actually produced.
//
// Returns whatever the source stream's Read() returns.
static nsresult
nsReadFromInputStream(nsIOutputStream* aOutStr,
                      void* aClosure,
                      char* aToRawSegment,
                      uint32_t aOffset,
                      uint32_t aCount,
                      uint32_t* aReadCount)
{
  auto* source = static_cast<nsIInputStream*>(aClosure);
  nsresult readResult = source->Read(aToRawSegment, aCount, aReadCount);
  return readResult;
}
1906
1907
// Moves up to aCount bytes from aFromStream into the pipe by delegating to
// WriteSegments() with nsReadFromInputStream as the reader callback.  The
// number of bytes accepted is reported through aWriteCount, and the result
// of WriteSegments() is returned unchanged.
NS_IMETHODIMP
nsPipeOutputStream::WriteFrom(nsIInputStream* aFromStream,
                              uint32_t aCount,
                              uint32_t* aWriteCount)
{
  // The source stream travels to the callback through the untyped closure.
  void* closure = aFromStream;
  return WriteSegments(nsReadFromInputStream, closure, aCount, aWriteCount);
}
1914
1915
// Reports through aNonBlocking the inverse of this stream's mBlocking flag.
// Always returns NS_OK.
NS_IMETHODIMP
nsPipeOutputStream::IsNonBlocking(bool* aNonBlocking)
{
  if (mBlocking) {
    *aNonBlocking = false;
  } else {
    *aNonBlocking = true;
  }
  return NS_OK;
}
1921
1922
// Registers (or clears) the callback to be told when this output stream is
// ready.
//
// aCallback       - callback to notify; nullptr just cancels any pending one.
// aFlags          - wait flags; WAIT_CLOSURE_ONLY suppresses the immediate
//                   "already writable" notification.
// aRequestedCount - not consulted by this implementation.
// aTarget         - optional event target; when given, the callback is
//                   wrapped via NS_NewOutputStreamReadyEvent for that target.
//
// Always returns NS_OK.
NS_IMETHODIMP
nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback* aCallback,
                              uint32_t aFlags,
                              uint32_t aRequestedCount,
                              nsIEventTarget* aTarget)
{
  LOG(("OOO AsyncWait [this=%p]\n", this));

  // Declared ahead of the monitor scope on purpose: it outlives the monitor
  // guard below, so queued notifications are delivered only once the monitor
  // has been left.
  nsPipeEvents events;
  {
    ReentrantMonitorAutoEnter lock(mPipe->mReentrantMonitor);

    // Any previously registered callback is discarded first.
    mCallback = nullptr;
    mCallbackFlags = 0;

    if (aCallback) {
      // Keeps the wrapper alive while aCallback points at it.
      nsCOMPtr<nsIOutputStreamCallback> proxy;
      if (aTarget) {
        proxy = NS_NewOutputStreamReadyEvent(aCallback, aTarget);
        aCallback = proxy;
      }

      const bool notifyNow =
        NS_FAILED(mPipe->mStatus) ||
        (mWritable && !(aFlags & WAIT_CLOSURE_ONLY));

      if (notifyNow) {
        // The pipe has already failed/closed, or it is writable right now:
        // schedule the notification immediately.
        events.NotifyOutputReady(this, aCallback);
      } else {
        // Otherwise remember the callback until the stream's state changes.
        mCallback = aCallback;
        mCallbackFlags = aFlags;
      }
    }
  }
  return NS_OK;
}
1960
1961
////////////////////////////////////////////////////////////////////////////////
1962
1963
// Creates a pipe and hands back its two ends as plain nsIInputStream /
// nsIOutputStream pointers.
//
// aPipeIn / aPipeOut - out: the reading and writing ends (set only on success).
// aSegmentSize       - bytes per segment; 0 selects DEFAULT_SEGMENT_SIZE.
// aMaxSize           - total capacity in bytes, converted to a segment count
//                      by integer division; UINT32_MAX is passed through as a
//                      segment count of UINT32_MAX instead of being divided.
// aNonBlockingInput / aNonBlockingOutput - forwarded to NS_NewPipe2.
//
// Returns the failure code from NS_NewPipe2, or NS_OK.
nsresult
NS_NewPipe(nsIInputStream** aPipeIn,
           nsIOutputStream** aPipeOut,
           uint32_t aSegmentSize,
           uint32_t aMaxSize,
           bool aNonBlockingInput,
           bool aNonBlockingOutput)
{
  const uint32_t segmentSize =
    aSegmentSize != 0 ? aSegmentSize : DEFAULT_SEGMENT_SIZE;

  // UINT32_MAX is a sentinel and must not go through the division.
  const uint32_t segmentCount =
    aMaxSize == UINT32_MAX ? UINT32_MAX : aMaxSize / segmentSize;

  nsIAsyncInputStream* asyncIn = nullptr;
  nsIAsyncOutputStream* asyncOut = nullptr;
  const nsresult rv = NS_NewPipe2(&asyncIn, &asyncOut,
                                  aNonBlockingInput, aNonBlockingOutput,
                                  segmentSize, segmentCount);
  if (NS_FAILED(rv)) {
    return rv;
  }

  // Ownership of the references obtained above passes to the caller.
  *aPipeIn = asyncIn;
  *aPipeOut = asyncOut;
  return NS_OK;
}
1995
1996
// Creates and initializes a new nsPipe and returns its asynchronous input
// and output ends.
//
// aPipeIn / aPipeOut - out: the two ends of the pipe (set only on success).
// aNonBlockingInput / aNonBlockingOutput / aSegmentSize / aSegmentCount -
//                      forwarded unchanged to nsPipe::Init.
//
// Returns the failure code from Init, or NS_OK.
nsresult
NS_NewPipe2(nsIAsyncInputStream** aPipeIn,
            nsIAsyncOutputStream** aPipeOut,
            bool aNonBlockingInput,
            bool aNonBlockingOutput,
            uint32_t aSegmentSize,
            uint32_t aSegmentCount)
{
  nsPipe* newPipe = new nsPipe();

  const nsresult initResult = newPipe->Init(aNonBlockingInput,
                                            aNonBlockingOutput,
                                            aSegmentSize,
                                            aSegmentCount);
  if (NS_FAILED(initResult)) {
    // Take a reference and drop it again right away.
    // NOTE(review): presumably this routes disposal of the unused pipe
    // through its normal Release path -- confirm against nsPipe::Release.
    NS_ADDREF(newPipe);
    NS_RELEASE(newPipe);
    return initResult;
  }

  // Neither getter can fail once Init has succeeded.
  MOZ_ALWAYS_SUCCEEDS(newPipe->GetInputStream(aPipeIn));
  MOZ_ALWAYS_SUCCEEDS(newPipe->GetOutputStream(aPipeOut));
  return NS_OK;
}
2020
2021
// Factory entry point: creates an nsPipe and returns the interface
// requested by the caller.
//
// aOuter  - must be null; aggregation is rejected with
//           NS_ERROR_NO_AGGREGATION.
// aIID    - interface to query the new pipe for.
// aResult - out: receives the interface pointer when the query succeeds.
//
// Returns the result of QueryInterface on the new pipe.
nsresult
nsPipeConstructor(nsISupports* aOuter, REFNSIID aIID, void** aResult)
{
  if (aOuter) {
    return NS_ERROR_NO_AGGREGATION;
  }

  // The holder takes a reference for the duration of the query and drops it
  // on scope exit, after the return value has been computed.
  RefPtr<nsPipe> holder = new nsPipe();
  return holder->QueryInterface(aIID, aResult);
}
2033
2034
////////////////////////////////////////////////////////////////////////////////