Coverage Report

Created: 2018-09-25 14:53

/src/mozilla-central/xpcom/io/nsMultiplexInputStream.cpp
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
 * License, v. 2.0. If a copy of the MPL was not distributed with this
5
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
/**
8
 * The multiplex stream concatenates a list of input streams into a single
9
 * stream.
10
 */
11
12
#include "mozilla/Attributes.h"
13
#include "mozilla/MathAlgorithms.h"
14
#include "mozilla/Mutex.h"
15
#include "mozilla/SystemGroup.h"
16
17
#include "base/basictypes.h"
18
19
#include "nsMultiplexInputStream.h"
20
#include "nsIBufferedStreams.h"
21
#include "nsICloneableInputStream.h"
22
#include "nsIMultiplexInputStream.h"
23
#include "nsISeekableStream.h"
24
#include "nsCOMPtr.h"
25
#include "nsCOMArray.h"
26
#include "nsIClassInfoImpl.h"
27
#include "nsIIPCSerializableInputStream.h"
28
#include "mozilla/ipc/InputStreamUtils.h"
29
#include "nsIAsyncInputStream.h"
30
#include "nsIInputStreamLength.h"
31
#include "nsNetUtil.h"
32
#include "nsStreamUtils.h"
33
34
using namespace mozilla;
35
using namespace mozilla::ipc;
36
37
using mozilla::DeprecatedAbs;
38
using mozilla::Maybe;
39
using mozilla::Nothing;
40
using mozilla::Some;
41
42
class nsMultiplexInputStream final
43
  : public nsIMultiplexInputStream
44
  , public nsISeekableStream
45
  , public nsIIPCSerializableInputStream
46
  , public nsICloneableInputStream
47
  , public nsIAsyncInputStream
48
  , public nsIInputStreamCallback
49
  , public nsIInputStreamLength
50
  , public nsIAsyncInputStreamLength
51
{
52
public:
53
  nsMultiplexInputStream();
54
55
  NS_DECL_THREADSAFE_ISUPPORTS
56
  NS_DECL_NSIINPUTSTREAM
57
  NS_DECL_NSIMULTIPLEXINPUTSTREAM
58
  NS_DECL_NSISEEKABLESTREAM
59
  NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
60
  NS_DECL_NSICLONEABLEINPUTSTREAM
61
  NS_DECL_NSIASYNCINPUTSTREAM
62
  NS_DECL_NSIINPUTSTREAMCALLBACK
63
  NS_DECL_NSIINPUTSTREAMLENGTH
64
  NS_DECL_NSIASYNCINPUTSTREAMLENGTH
65
66
  // This is used for nsIAsyncInputStream::AsyncWait
67
  void AsyncWaitCompleted();
68
69
  // This is used for nsIAsyncInputStreamLength::AsyncLengthWait
70
  void AsyncWaitCompleted(int64_t aLength,
71
                          const MutexAutoLock& aProofOfLock);
72
73
  struct StreamData
74
  {
75
    void Initialize(nsIInputStream* aStream, bool aBuffered)
76
0
    {
77
0
      mStream = aStream;
78
0
      mAsyncStream = do_QueryInterface(aStream);
79
0
      mSeekableStream = do_QueryInterface(aStream);
80
0
      mBuffered = aBuffered;
81
0
    }
82
83
    nsCOMPtr<nsIInputStream> mStream;
84
85
    // This can be null.
86
    nsCOMPtr<nsIAsyncInputStream> mAsyncStream;
87
    // This can be null.
88
    nsCOMPtr<nsISeekableStream> mSeekableStream;
89
90
    // True if the stream is wrapped with nsIBufferedInputStream.
91
    bool mBuffered;
92
  };
93
94
  Mutex& GetLock()
95
0
  {
96
0
    return mLock;
97
0
  }
98
99
private:
100
  ~nsMultiplexInputStream()
101
0
  {
102
0
  }
103
104
  nsresult
105
  AsyncWaitInternal();
106
107
  // This method updates mSeekableStreams, mIPCSerializableStreams,
108
  // mCloneableStreams and mAsyncInputStreams values.
109
  void UpdateQIMap(StreamData& aStream, int32_t aCount);
110
111
  struct MOZ_STACK_CLASS ReadSegmentsState
112
  {
113
    nsCOMPtr<nsIInputStream> mThisStream;
114
    uint32_t mOffset;
115
    nsWriteSegmentFun mWriter;
116
    void* mClosure;
117
    bool mDone;
118
  };
119
120
  static nsresult ReadSegCb(nsIInputStream* aIn, void* aClosure,
121
                            const char* aFromRawSegment, uint32_t aToOffset,
122
                            uint32_t aCount, uint32_t* aWriteCount);
123
124
  bool IsSeekable() const;
125
  bool IsIPCSerializable() const;
126
  bool IsCloneable() const;
127
  bool IsAsyncInputStream() const;
128
  bool IsInputStreamLength() const;
129
  bool IsAsyncInputStreamLength() const;
130
131
  Mutex mLock; // Protects access to all data members.
132
133
  nsTArray<StreamData> mStreams;
134
135
  uint32_t mCurrentStream;
136
  bool mStartedReadingCurrent;
137
  nsresult mStatus;
138
  nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback;
139
  uint32_t mAsyncWaitFlags;
140
  uint32_t mAsyncWaitRequestedCount;
141
  nsCOMPtr<nsIEventTarget> mAsyncWaitEventTarget;
142
  nsCOMPtr<nsIInputStreamLengthCallback> mAsyncWaitLengthCallback;
143
144
  class AsyncWaitLengthHelper;
145
  RefPtr<AsyncWaitLengthHelper> mAsyncWaitLengthHelper;
146
147
  uint32_t mSeekableStreams;
148
  uint32_t mIPCSerializableStreams;
149
  uint32_t mCloneableStreams;
150
  uint32_t mAsyncInputStreams;
151
  uint32_t mInputStreamLengths;
152
  uint32_t mAsyncInputStreamLengths;
153
};
154
155
// Reference counting for the class.
NS_IMPL_ADDREF(nsMultiplexInputStream)
NS_IMPL_RELEASE(nsMultiplexInputStream)

NS_IMPL_CLASSINFO(nsMultiplexInputStream,
                  nullptr,
                  nsIClassInfo::THREADSAFE,
                  NS_MULTIPLEXINPUTSTREAM_CID)

// QueryInterface table. nsIMultiplexInputStream and nsIInputStream are always
// available; every other interface is gated on the matching Is*() predicate.
NS_INTERFACE_MAP_BEGIN(nsMultiplexInputStream)
  NS_INTERFACE_MAP_ENTRY(nsIMultiplexInputStream)
  NS_INTERFACE_MAP_ENTRY(nsIInputStream)
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream, IsSeekable())
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream, IsIPCSerializable())
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream, IsCloneable())
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream, IsAsyncInputStream())
  // The callback interface is exposed under the same condition as the async
  // stream interface.
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback, IsAsyncInputStream())
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength, IsInputStreamLength())
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStreamLength, IsAsyncInputStreamLength())
  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIMultiplexInputStream)
  NS_IMPL_QUERY_CLASSINFO(nsMultiplexInputStream)
NS_INTERFACE_MAP_END

// Interfaces listed through the class-info interface getter.
NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream,
                            nsIMultiplexInputStream,
                            nsIInputStream,
                            nsISeekableStream)
185
186
/**
 * Calls Available() on aStream, retrying once after a no-op seek if the
 * stream reports NS_BASE_STREAM_CLOSED.
 *
 * If nsIFileInputStream is closed in Read() due to the CLOSE_ON_EOF flag,
 * Seek() could reopen the file if the REOPEN_ON_REWIND flag is set, so a blind
 * seek to the current position is attempted before giving up.
 *
 * @param aStream  entry whose mStream is queried; mSeekableStream may be null.
 * @param aResult  receives the available byte count.
 * @return the result of the last Available() call that was made.
 */
static nsresult
AvailableMaybeSeek(nsMultiplexInputStream::StreamData& aStream,
                   uint64_t* aResult)
{
  const nsresult firstTry = aStream.mStream->Available(aResult);

  // Only a "closed" answer from a seekable stream is worth a retry.
  if (firstTry != NS_BASE_STREAM_CLOSED || !aStream.mSeekableStream) {
    return firstTry;
  }

  const nsresult seekResult =
    aStream.mSeekableStream->Seek(nsISeekableStream::NS_SEEK_CUR, 0);
  if (NS_FAILED(seekResult)) {
    // The stream could not be reopened; report the original status.
    return firstTry;
  }

  return aStream.mStream->Available(aResult);
}
206
207
/**
 * Calls Tell() on aSeekable, retrying once after a no-op seek if the stream
 * reports NS_BASE_STREAM_CLOSED.
 *
 * If nsIFileInputStream is closed in Read() due to the CLOSE_ON_EOF flag,
 * Seek() could reopen the file if the REOPEN_ON_REWIND flag is set, so a blind
 * seek to the current position is attempted before giving up.
 *
 * @param aSeekable  stream to query; must not be null.
 * @param aResult    receives the stream position.
 * @return the result of the last Tell() call that was made.
 */
static nsresult
TellMaybeSeek(nsISeekableStream* aSeekable, int64_t* aResult)
{
  const nsresult firstTry = aSeekable->Tell(aResult);
  if (firstTry != NS_BASE_STREAM_CLOSED) {
    return firstTry;
  }

  const nsresult seekResult =
    aSeekable->Seek(nsISeekableStream::NS_SEEK_CUR, 0);
  if (NS_FAILED(seekResult)) {
    // The stream could not be reopened; report the original status.
    return firstTry;
  }

  return aSeekable->Tell(aResult);
}
223
224
// Creates an empty multiplex stream: no sub-streams, status NS_OK, no pending
// async wait, and all interface counters at zero.
nsMultiplexInputStream::nsMultiplexInputStream()
  : mLock("nsMultiplexInputStream lock"),
    mCurrentStream(0),
    mStartedReadingCurrent(false),
    mStatus(NS_OK),
    mAsyncWaitFlags(0),
    mAsyncWaitRequestedCount(0),
    mSeekableStreams(0),
    mIPCSerializableStreams(0),
    mCloneableStreams(0),
    mAsyncInputStreams(0),
    mInputStreamLengths(0),
    mAsyncInputStreamLengths(0)
{
}
238
239
// Reports how many sub-streams are currently held.
// @param aCount  receives the number of entries in mStreams.
NS_IMETHODIMP
nsMultiplexInputStream::GetCount(uint32_t* aCount)
{
  MutexAutoLock guard(mLock);

  const uint32_t streamCount = mStreams.Length();
  *aCount = streamCount;
  return NS_OK;
}
246
247
/**
 * Appends aStream to the end of the list of sub-streams.
 *
 * A stream that is not already buffered is wrapped in a 4096-byte buffered
 * input stream first. If this multiplex stream was previously closed, the
 * status is reset to NS_OK because there is data to read again.
 *
 * @param aStream  the stream to append; must not be null.
 * @return NS_ERROR_INVALID_ARG if aStream is null, the failure from
 *         NS_NewBufferedInputStream if wrapping fails,
 *         NS_ERROR_OUT_OF_MEMORY if the list cannot grow, NS_OK otherwise.
 */
NS_IMETHODIMP
nsMultiplexInputStream::AppendStream(nsIInputStream* aStream)
{
  // A null stream would otherwise be dereferenced by the buffering probe
  // below.
  if (NS_WARN_IF(!aStream)) {
    return NS_ERROR_INVALID_ARG;
  }

  nsCOMPtr<nsIInputStream> stream = aStream;

  bool buffered = false;
  if (!NS_InputStreamIsBuffered(stream)) {
    nsCOMPtr<nsIInputStream> bufferedStream;
    nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(bufferedStream),
                                            stream.forget(), 4096);
    NS_ENSURE_SUCCESS(rv, rv);
    stream = bufferedStream.forget();
    buffered = true;
  }

  // The wrapping above is done before taking the lock; only the bookkeeping
  // below runs under it.
  MutexAutoLock lock(mLock);

  StreamData* streamData = mStreams.AppendElement(fallible);
  if (NS_WARN_IF(!streamData)) {
    return NS_ERROR_OUT_OF_MEMORY;
  }

  streamData->Initialize(stream, buffered);

  UpdateQIMap(*streamData, 1);

  if (mStatus == NS_BASE_STREAM_CLOSED) {
    // We were closed, but now we have more data to read.
    mStatus = NS_OK;
  }

  return NS_OK;
}
280
281
/**
 * Returns the sub-stream at aIndex.
 *
 * If the entry is a buffering wrapper added by this class, the wrapper's
 * underlying data stream is returned instead of the wrapper itself.
 *
 * @param aIndex   position in the list of sub-streams.
 * @param aResult  receives an owning reference to the stream.
 * @return NS_ERROR_NOT_AVAILABLE if aIndex is out of range, the failure from
 *         GetData() if unwrapping fails, NS_OK otherwise.
 */
NS_IMETHODIMP
nsMultiplexInputStream::GetStream(uint32_t aIndex, nsIInputStream** aResult)
{
  MutexAutoLock guard(mLock);

  if (aIndex >= mStreams.Length()) {
    return NS_ERROR_NOT_AVAILABLE;
  }

  StreamData& entry = mStreams.ElementAt(aIndex);
  nsCOMPtr<nsIInputStream> result = entry.mStream;

  if (entry.mBuffered) {
    // Hand back the stream the caller originally appended, not our wrapper.
    nsCOMPtr<nsIBufferedInputStream> wrapper = do_QueryInterface(result);
    MOZ_ASSERT(wrapper);

    const nsresult unwrapResult = wrapper->GetData(getter_AddRefs(result));
    if (NS_WARN_IF(NS_FAILED(unwrapResult))) {
      return unwrapResult;
    }
  }

  result.forget(aResult);
  return NS_OK;
}
307
308
/**
 * Marks this stream as closed and closes every sub-stream.
 *
 * @return NS_ERROR_OUT_OF_MEMORY if the list of sub-streams could not be
 *         copied (no sub-stream is closed in that case); otherwise the last
 *         failure returned by a sub-stream's Close(), or NS_OK if all
 *         succeeded.
 */
NS_IMETHODIMP
nsMultiplexInputStream::Close()
{
  nsTArray<nsCOMPtr<nsIInputStream>> snapshot;

  // Take a copy of the streams because calling Close() on one of them could
  // trigger an nsIInputStreamCallback immediately, and we don't want to create
  // a deadlock with the mutex.
  {
    MutexAutoLock guard(mLock);

    // The status becomes "closed" whether or not the copy below succeeds.
    mStatus = NS_BASE_STREAM_CLOSED;

    const uint32_t count = mStreams.Length();
    for (uint32_t idx = 0; idx < count; ++idx) {
      if (NS_WARN_IF(!snapshot.AppendElement(mStreams[idx].mStream,
                                             fallible))) {
        return NS_ERROR_OUT_OF_MEMORY;
      }
    }
  }

  nsresult overall = NS_OK;

  const uint32_t toClose = snapshot.Length();
  for (uint32_t idx = 0; idx < toClose; ++idx) {
    const nsresult closeResult = snapshot[idx]->Close();
    // We still want to close all streams, but we should return an error.
    if (NS_FAILED(closeResult)) {
      overall = closeResult;
    }
  }

  return overall;
}
341
342
/**
 * Reports how many bytes can currently be read.
 *
 * Starting at the current sub-stream, the available counts are summed. Closed
 * sub-streams are skipped, and the current-stream index is advanced past a
 * closed current stream. Summation stops after the first async sub-stream,
 * because only what is available right now may be reported.
 *
 * @param aResult  receives the byte count; zero on any non-NS_OK return.
 * @return NS_OK with the count if anything is available. Otherwise the status
 *         of the last sub-stream queried (NS_BASE_STREAM_CLOSED if none was),
 *         which is also stored as the stream's status.
 */
NS_IMETHODIMP
nsMultiplexInputStream::Available(uint64_t* aResult)
{
  *aResult = 0;

  MutexAutoLock guard(mLock);
  if (NS_FAILED(mStatus)) {
    return mStatus;
  }

  uint64_t total = 0;
  nsresult rv = NS_BASE_STREAM_CLOSED;

  const uint32_t count = mStreams.Length();
  for (uint32_t idx = mCurrentStream; idx < count; ++idx) {
    uint64_t chunk;
    rv = AvailableMaybeSeek(mStreams[idx], &chunk);

    if (rv == NS_BASE_STREAM_CLOSED) {
      // If a stream is closed, we continue with the next one. If this is the
      // current stream we move to the following stream. If this is the last
      // stream, this error code is what ends up being returned.
      if (idx == mCurrentStream) {
        ++mCurrentStream;
      }
      continue;
    }

    if (NS_WARN_IF(NS_FAILED(rv))) {
      mStatus = rv;
      return mStatus;
    }

    // An empty stream simply contributes nothing to the sum.
    total += chunk;

    // If the current stream is async, we have to return what we have so far
    // without processing the following streams. ::Available should return
    // only what is currently available; for an nsIAsyncInputStream, the
    // caller has to use AsyncWait() in order to read more.
    if (mStreams[idx].mAsyncStream) {
      break;
    }
  }

  // We still have something to read. We don't want to return an error code
  // yet.
  if (total != 0) {
    *aResult = total;
    return NS_OK;
  }

  // Let's propagate the last error message.
  mStatus = rv;
  return rv;
}
402
403
/**
 * Reads up to aCount bytes into aBuf, moving on to the next sub-stream
 * whenever the current one yields zero bytes.
 *
 * It is tempting to implement this method in terms of ReadSegments, but that
 * would prevent this class from being used with streams that only implement
 * Read (e.g., file streams).
 *
 * @param aBuf     destination buffer.
 * @param aCount   maximum number of bytes to read.
 * @param aResult  receives the number of bytes actually read.
 * @return NS_OK if any bytes were read or the stream is closed; otherwise the
 *         stored failure status or the status of the last sub-stream Read().
 */
NS_IMETHODIMP
nsMultiplexInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult)
{
  MutexAutoLock guard(mLock);

  *aResult = 0;

  // A closed stream reads as end-of-file rather than as an error.
  if (mStatus == NS_BASE_STREAM_CLOSED) {
    return NS_OK;
  }
  if (NS_FAILED(mStatus)) {
    return mStatus;
  }

  nsresult rv = NS_OK;

  const uint32_t streamCount = mStreams.Length();
  while (mCurrentStream < streamCount && aCount) {
    uint32_t got;
    rv = mStreams[mCurrentStream].mStream->Read(aBuf, aCount, &got);

    // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
    // (This is a bug in those stream implementations)
    if (rv == NS_BASE_STREAM_CLOSED) {
      MOZ_ASSERT_UNREACHABLE("Input stream's Read method returned "
                             "NS_BASE_STREAM_CLOSED");
      rv = NS_OK;
      got = 0;
    } else if (NS_FAILED(rv)) {
      break;
    }

    if (got == 0) {
      // Current stream is exhausted; continue with the next one.
      ++mCurrentStream;
      mStartedReadingCurrent = false;
      continue;
    }

    NS_ASSERTION(aCount >= got, "Read more than requested");
    *aResult += got;
    aCount -= got;
    aBuf += got;
    mStartedReadingCurrent = true;
  }

  // Partial success hides any trailing error from the caller.
  if (*aResult) {
    return NS_OK;
  }
  return rv;
}
451
452
/**
 * Feeds up to aCount bytes to aWriter, segment by segment, moving on to the
 * next sub-stream whenever the current one yields zero bytes.
 *
 * @param aWriter   callback that consumes each segment; must not be null.
 * @param aClosure  opaque pointer passed through to aWriter.
 * @param aCount    maximum number of bytes to deliver.
 * @param aResult   receives the number of bytes delivered; written on every
 *                  return path, including failures.
 * @return NS_OK if any bytes were delivered or the stream is closed;
 *         otherwise the stored failure status or the status of the last
 *         sub-stream ReadSegments() call.
 */
NS_IMETHODIMP
nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
                                     uint32_t aCount, uint32_t* aResult)
{
  MutexAutoLock lock(mLock);

  // Match Read(): the out-param is defined even when we bail out early.
  *aResult = 0;

  // A closed stream reads as end-of-file rather than as an error.
  if (mStatus == NS_BASE_STREAM_CLOSED) {
    return NS_OK;
  }
  if (NS_FAILED(mStatus)) {
    return mStatus;
  }

  NS_ASSERTION(aWriter, "missing aWriter");

  nsresult rv = NS_OK;
  ReadSegmentsState state;
  state.mThisStream = this;
  state.mOffset = 0;
  state.mWriter = aWriter;
  state.mClosure = aClosure;
  state.mDone = false;

  uint32_t len = mStreams.Length();
  while (mCurrentStream < len && aCount) {
    uint32_t read;
    rv = mStreams[mCurrentStream].mStream->ReadSegments(ReadSegCb, &state,
                                                        aCount, &read);

    // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
    // (This is a bug in those stream implementations)
    if (rv == NS_BASE_STREAM_CLOSED) {
      MOZ_ASSERT_UNREACHABLE("Input stream's Read method returned "
                             "NS_BASE_STREAM_CLOSED");
      rv = NS_OK;
      read = 0;
    }

    // if |aWriter| decided to stop reading segments...
    if (state.mDone || NS_FAILED(rv)) {
      break;
    }

    // if stream is empty, then advance to the next stream.
    if (read == 0) {
      ++mCurrentStream;
      mStartedReadingCurrent = false;
    } else {
      NS_ASSERTION(aCount >= read, "Read more than requested");
      state.mOffset += read;
      aCount -= read;
      mStartedReadingCurrent = true;
    }
  }

  // if we successfully read some data, then this call succeeded.
  *aResult = state.mOffset;
  return state.mOffset ? NS_OK : rv;
}
512
513
/**
 * Trampoline passed to each sub-stream's ReadSegments().
 *
 * Forwards the segment to the caller's writer, presenting the multiplex
 * stream as the source and shifting the offset by the number of bytes already
 * delivered in this ReadSegments() call. If the writer fails, the shared state
 * is flagged as done.
 *
 * @param aIn              the sub-stream producing the segment (unused).
 * @param aClosure         pointer to the ReadSegmentsState of the call.
 * @param aFromRawSegment  start of the segment data.
 * @param aToOffset        offset of the segment within the sub-stream's call.
 * @param aCount           number of bytes in the segment.
 * @param aWriteCount      receives the number of bytes the writer consumed.
 * @return whatever the caller's writer returned.
 */
nsresult
nsMultiplexInputStream::ReadSegCb(nsIInputStream* aIn, void* aClosure,
                                  const char* aFromRawSegment,
                                  uint32_t aToOffset, uint32_t aCount,
                                  uint32_t* aWriteCount)
{
  auto* state = static_cast<ReadSegmentsState*>(aClosure);

  const uint32_t overallOffset = aToOffset + state->mOffset;
  const nsresult writerResult = (state->mWriter)(state->mThisStream,
                                                 state->mClosure,
                                                 aFromRawSegment,
                                                 overallOffset,
                                                 aCount,
                                                 aWriteCount);
  if (NS_FAILED(writerResult)) {
    state->mDone = true;
  }
  return writerResult;
}
532
533
/**
 * Reports whether reads from this stream never block.
 *
 * An empty multiplex stream claims to be non-blocking. Otherwise the answer
 * is non-blocking only if every sub-stream is non-blocking.
 *
 * @param aNonBlocking  receives the answer.
 * @return the first failure from a sub-stream's IsNonBlocking(), else NS_OK.
 */
NS_IMETHODIMP
nsMultiplexInputStream::IsNonBlocking(bool* aNonBlocking)
{
  MutexAutoLock guard(mLock);

  const uint32_t count = mStreams.Length();
  if (count == 0) {
    // Claim to be non-blocking, since we won't block the caller.
    *aNonBlocking = true;
    return NS_OK;
  }

  for (uint32_t idx = 0; idx < count; ++idx) {
    const nsresult rv = mStreams[idx].mStream->IsNonBlocking(aNonBlocking);
    if (NS_WARN_IF(NS_FAILED(rv))) {
      return rv;
    }

    // If one is blocking the entire stream becomes blocking.
    if (!*aNonBlocking) {
      break;
    }
  }

  return NS_OK;
}
556
557
/**
 * Repositions the multiplex stream by seeking the individual sub-streams.
 *
 * Supports NS_SEEK_SET, NS_SEEK_CUR (forward, backward and zero) and
 * NS_SEEK_END with a non-positive offset. mCurrentStream and
 * mStartedReadingCurrent are updated to describe the new position.
 *
 * @param aWhence  one of the nsISeekableStream NS_SEEK_* constants.
 * @param aOffset  offset relative to aWhence.
 * @return the stored failure status if the stream is in a failed state;
 *         NS_ERROR_FAILURE if a sub-stream that has to be repositioned is not
 *         seekable; NS_ERROR_INVALID_ARG for NS_SEEK_END with a positive
 *         offset; NS_ERROR_NOT_IMPLEMENTED for an unknown aWhence; the
 *         failure of any sub-stream operation; NS_OK otherwise.
 */
NS_IMETHODIMP
nsMultiplexInputStream::Seek(int32_t aWhence, int64_t aOffset)
{
  MutexAutoLock lock(mLock);

  if (NS_FAILED(mStatus)) {
    return mStatus;
  }

  nsresult rv;

  // The position before this call; used to decide which sub-streams have
  // been touched and therefore need to be repositioned.
  uint32_t oldCurrentStream = mCurrentStream;
  bool oldStartedReadingCurrent = mStartedReadingCurrent;

  if (aWhence == NS_SEEK_SET) {
    int64_t remaining = aOffset;
    if (aOffset == 0) {
      mCurrentStream = 0;
    }
    for (uint32_t i = 0; i < mStreams.Length(); ++i) {
      nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
      if (!stream) {
        return NS_ERROR_FAILURE;
      }

      // See if all remaining streams should be rewound
      if (remaining == 0) {
        if (i < oldCurrentStream ||
            (i == oldCurrentStream && oldStartedReadingCurrent)) {
          rv = stream->Seek(NS_SEEK_SET, 0);
          if (NS_WARN_IF(NS_FAILED(rv))) {
            return rv;
          }
          continue;
        } else {
          break;
        }
      }

      // Get position in current stream
      int64_t streamPos;
      if (i > oldCurrentStream ||
          (i == oldCurrentStream && !oldStartedReadingCurrent)) {
        streamPos = 0;
      } else {
        rv = TellMaybeSeek(stream, &streamPos);
        if (NS_WARN_IF(NS_FAILED(rv))) {
          return rv;
        }
      }

      // See if we need to seek current stream forward or backward
      if (remaining < streamPos) {
        rv = stream->Seek(NS_SEEK_SET, remaining);
        if (NS_WARN_IF(NS_FAILED(rv))) {
          return rv;
        }

        mCurrentStream = i;
        mStartedReadingCurrent = remaining != 0;

        remaining = 0;
      } else if (remaining > streamPos) {
        if (i < oldCurrentStream) {
          // We're already at end so no need to seek this stream
          remaining -= streamPos;
          NS_ASSERTION(remaining >= 0, "Remaining invalid");
        } else {
          uint64_t avail;
          rv = AvailableMaybeSeek(mStreams[i], &avail);
          if (NS_WARN_IF(NS_FAILED(rv))) {
            return rv;
          }

          int64_t newPos = XPCOM_MIN(remaining, streamPos + (int64_t)avail);

          rv = stream->Seek(NS_SEEK_SET, newPos);
          if (NS_WARN_IF(NS_FAILED(rv))) {
            return rv;
          }

          mCurrentStream = i;
          mStartedReadingCurrent = true;

          remaining -= newPos;
          NS_ASSERTION(remaining >= 0, "Remaining invalid");
        }
      } else {
        NS_ASSERTION(remaining == streamPos, "Huh?");
        MOZ_ASSERT(remaining != 0, "Zero remaining should be handled earlier");
        remaining = 0;
        mCurrentStream = i;
        mStartedReadingCurrent = true;
      }
    }

    return NS_OK;
  }

  if (aWhence == NS_SEEK_CUR && aOffset > 0) {
    int64_t remaining = aOffset;
    for (uint32_t i = mCurrentStream; remaining && i < mStreams.Length(); ++i) {
      uint64_t avail;
      rv = AvailableMaybeSeek(mStreams[i], &avail);
      if (NS_WARN_IF(NS_FAILED(rv))) {
        return rv;
      }

      // Same policy as NS_SEEK_SET: a non-seekable sub-stream is an error,
      // not a null dereference.
      if (!mStreams[i].mSeekableStream) {
        return NS_ERROR_FAILURE;
      }

      int64_t seek = XPCOM_MIN((int64_t)avail, remaining);

      rv = mStreams[i].mSeekableStream->Seek(NS_SEEK_CUR, seek);
      if (NS_WARN_IF(NS_FAILED(rv))) {
        return rv;
      }

      mCurrentStream = i;
      mStartedReadingCurrent = true;

      remaining -= seek;
    }

    return NS_OK;
  }

  if (aWhence == NS_SEEK_CUR && aOffset < 0) {
    int64_t remaining = -aOffset;

    // mCurrentStream equals mStreams.Length() once every sub-stream has been
    // read to the end, so start from the last valid index in that case. With
    // an empty list the start index becomes (uint32_t)-1 and the loop body
    // never runs.
    const uint32_t streamCount = mStreams.Length();
    const uint32_t start =
      mCurrentStream < streamCount ? mCurrentStream : streamCount - 1;

    for (uint32_t i = start; remaining && i != (uint32_t)-1; --i) {
      // Same policy as NS_SEEK_SET: a non-seekable sub-stream is an error,
      // not a null dereference.
      if (!mStreams[i].mSeekableStream) {
        return NS_ERROR_FAILURE;
      }

      int64_t pos;
      rv = TellMaybeSeek(mStreams[i].mSeekableStream, &pos);
      if (NS_WARN_IF(NS_FAILED(rv))) {
        return rv;
      }

      int64_t seek = XPCOM_MIN(pos, remaining);

      rv = mStreams[i].mSeekableStream->Seek(NS_SEEK_CUR, -seek);
      if (NS_WARN_IF(NS_FAILED(rv))) {
        return rv;
      }

      mCurrentStream = i;
      // The sub-stream now sits at (pos - seek); it counts as "started" only
      // if that is not its beginning.
      mStartedReadingCurrent = seek != pos;

      remaining -= seek;
    }

    return NS_OK;
  }

  if (aWhence == NS_SEEK_CUR) {
    NS_ASSERTION(aOffset == 0, "Should have handled all non-zero values");

    return NS_OK;
  }

  if (aWhence == NS_SEEK_END) {
    if (aOffset > 0) {
      return NS_ERROR_INVALID_ARG;
    }
    int64_t remaining = aOffset;
    // Walk backwards from the last sub-stream. With an empty list the start
    // index is (uint32_t)-1 and the loop body never runs.
    for (uint32_t i = mStreams.Length() - 1; i != (uint32_t)-1; --i) {
      nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;

      // See if all remaining streams should be seeked to end
      if (remaining == 0) {
        if (i >= oldCurrentStream) {
          if (!stream) {
            return NS_ERROR_FAILURE;
          }
          rv = stream->Seek(NS_SEEK_END, 0);
          if (NS_WARN_IF(NS_FAILED(rv))) {
            return rv;
          }
        } else {
          break;
        }
      }

      // Get position in current stream
      int64_t streamPos;
      if (i < oldCurrentStream) {
        streamPos = 0;
      } else {
        uint64_t avail;
        rv = AvailableMaybeSeek(mStreams[i], &avail);
        if (NS_WARN_IF(NS_FAILED(rv))) {
          return rv;
        }

        streamPos = avail;
      }

      // See if we have enough data in the current stream.
      if (DeprecatedAbs(remaining) < streamPos) {
        if (!stream) {
          return NS_ERROR_FAILURE;
        }
        rv = stream->Seek(NS_SEEK_END, remaining);
        if (NS_WARN_IF(NS_FAILED(rv))) {
          return rv;
        }

        mCurrentStream = i;
        mStartedReadingCurrent = true;

        remaining = 0;
      } else if (DeprecatedAbs(remaining) > streamPos) {
        if (i > oldCurrentStream ||
            (i == oldCurrentStream && !oldStartedReadingCurrent)) {
          // We're already at start so no need to seek this stream
          remaining += streamPos;
        } else {
          if (!stream) {
            return NS_ERROR_FAILURE;
          }
          int64_t avail;
          rv = TellMaybeSeek(stream, &avail);
          if (NS_WARN_IF(NS_FAILED(rv))) {
            return rv;
          }

          int64_t newPos = streamPos + XPCOM_MIN(avail, DeprecatedAbs(remaining));

          rv = stream->Seek(NS_SEEK_END, -newPos);
          if (NS_WARN_IF(NS_FAILED(rv))) {
            return rv;
          }

          mCurrentStream = i;
          mStartedReadingCurrent = true;

          remaining += newPos;
        }
      } else {
        NS_ASSERTION(remaining == streamPos, "Huh?");
        remaining = 0;
      }
    }

    return NS_OK;
  }

  // other Seeks not implemented yet
  return NS_ERROR_NOT_IMPLEMENTED;
}
793
794
/**
 * Reports the current position in the multiplex stream.
 *
 * The position is the sum of the positions of every sub-stream before the
 * current one, plus the current one if reading from it has started.
 *
 * @param aResult  receives the position; written only on success.
 * @return the stored failure status if the stream is in a failed state;
 *         NS_ERROR_NO_INTERFACE if a contributing sub-stream is not seekable;
 *         the failure of a sub-stream Tell(); NS_OK otherwise.
 */
NS_IMETHODIMP
nsMultiplexInputStream::Tell(int64_t* aResult)
{
  MutexAutoLock guard(mLock);

  if (NS_FAILED(mStatus)) {
    return mStatus;
  }

  // Number of leading sub-streams that contribute to the position.
  uint32_t contributing = mCurrentStream;
  if (mStartedReadingCurrent) {
    ++contributing;
  }

  int64_t total = 0;
  for (uint32_t idx = 0; idx < contributing; ++idx) {
    StreamData& entry = mStreams[idx];
    if (NS_WARN_IF(!entry.mSeekableStream)) {
      return NS_ERROR_NO_INTERFACE;
    }

    int64_t streamPos;
    const nsresult rv = TellMaybeSeek(entry.mSeekableStream, &streamPos);
    if (NS_WARN_IF(NS_FAILED(rv))) {
      return rv;
    }
    total += streamPos;
  }

  *aResult = total;
  return NS_OK;
}
823
824
// Truncation is not supported by the multiplex stream.
// @return NS_ERROR_NOT_IMPLEMENTED, always.
NS_IMETHODIMP
nsMultiplexInputStream::SetEOF()
{
  return NS_ERROR_NOT_IMPLEMENTED;
}
829
830
// Closes the stream exactly as Close() does. aStatus is not used.
// @return the result of Close().
NS_IMETHODIMP
nsMultiplexInputStream::CloseWithStatus(nsresult aStatus)
{
  const nsresult closeResult = Close();
  return closeResult;
}
835
836
// This class is used to inform nsMultiplexInputStream that it's time to execute
837
// the asyncWait callback.
838
class AsyncWaitRunnable final : public CancelableRunnable
839
{
840
  RefPtr<nsMultiplexInputStream> mStream;
841
842
public:
843
  static void
844
  Create(nsMultiplexInputStream* aStream, nsIEventTarget* aEventTarget)
845
0
  {
846
0
    RefPtr<AsyncWaitRunnable> runnable = new AsyncWaitRunnable(aStream);
847
0
    if (aEventTarget) {
848
0
      aEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
849
0
    } else {
850
0
      runnable->Run();
851
0
    }
852
0
  }
853
854
  NS_IMETHOD
855
  Run() override
856
0
  {
857
0
    mStream->AsyncWaitCompleted();
858
0
    return NS_OK;
859
0
  }
860
861
private:
862
  explicit AsyncWaitRunnable(nsMultiplexInputStream* aStream)
863
    : CancelableRunnable("AsyncWaitRunnable")
864
    , mStream(aStream)
865
0
  {
866
0
    MOZ_ASSERT(aStream);
867
0
  }
868
869
};
870
871
/**
 * Registers (or clears, when aCallback is null) the asyncWait callback.
 *
 * @param aCallback        callback to notify; null cancels a pending one.
 * @param aFlags           flags forwarded to the substream's AsyncWait.
 * @param aRequestedCount  requested count forwarded to the substream.
 * @param aEventTarget     optional target on which the callback is delivered.
 * @return mStatus when the stream failed with something other than
 *         NS_BASE_STREAM_CLOSED, NS_ERROR_FAILURE when a callback is already
 *         pending and a new one is supplied, otherwise NS_OK or the result of
 *         AsyncWaitInternal().
 */
NS_IMETHODIMP
nsMultiplexInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
                                  uint32_t aFlags,
                                  uint32_t aRequestedCount,
                                  nsIEventTarget* aEventTarget)
{
  {
    MutexAutoLock lock(mLock);

    // A closed stream is not an error here: the callback must still be
    // executed in that case.
    const bool closed = mStatus == NS_BASE_STREAM_CLOSED;
    if (!closed && NS_FAILED(mStatus)) {
      return mStatus;
    }

    // Only one pending callback at a time.
    if (aCallback && mAsyncWaitCallback) {
      return NS_ERROR_FAILURE;
    }

    mAsyncWaitEventTarget = aEventTarget;
    mAsyncWaitRequestedCount = aRequestedCount;
    mAsyncWaitFlags = aFlags;
    mAsyncWaitCallback = aCallback;

    // A null callback only resets the state; there is nothing to wait for.
    if (!aCallback) {
      return NS_OK;
    }
  }

  // The lock is released before waiting on the substream.
  return AsyncWaitInternal();
}
901
902
/**
 * Starts (or continues) the asynchronous wait registered by AsyncWait().
 *
 * Under the lock, skips every substream that is closed or has nothing left to
 * read until it finds either an async substream or a sync substream with data.
 * If an async substream is found, the wait is forwarded to it with |this| as
 * callback. Otherwise (stream closed, no substreams left, or the current
 * substream is synchronous) the callback is scheduled through
 * AsyncWaitRunnable.
 *
 * @return the failure reported by AvailableMaybeSeek, the result of the
 *         substream's AsyncWait, or NS_OK.
 */
nsresult
nsMultiplexInputStream::AsyncWaitInternal()
{
  nsCOMPtr<nsIAsyncInputStream> stream;
  uint32_t asyncWaitFlags = 0;
  uint32_t asyncWaitRequestedCount = 0;
  nsCOMPtr<nsIEventTarget> asyncWaitEventTarget;

  {
    MutexAutoLock lock(mLock);

    // Let's take the first async stream if we are not already closed, and if
    // it has data to read or if it is async.
    if (mStatus != NS_BASE_STREAM_CLOSED) {
      while (mCurrentStream < mStreams.Length()) {
        stream = mStreams[mCurrentStream].mAsyncStream;
        if (stream) {
          break;
        }

        uint64_t avail = 0;
        nsresult rv = AvailableMaybeSeek(mStreams[mCurrentStream], &avail);
        if (rv != NS_BASE_STREAM_CLOSED && NS_FAILED(rv)) {
          return rv;
        }

        if (NS_SUCCEEDED(rv) && avail > 0) {
          // Sync stream with data: the callback can run right away.
          break;
        }

        // Nothing to read here. Let's move on. The new current stream has not
        // been read yet, so the flag must be cleared: Tell() indexes
        // mStreams[mCurrentStream] whenever mStartedReadingCurrent is set,
        // which would be out of range once mCurrentStream reaches Length().
        ++mCurrentStream;
        mStartedReadingCurrent = false;
      }
    }

    // mStatus is protected by mLock, so it has to be checked here.
    MOZ_ASSERT_IF(stream, NS_SUCCEEDED(mStatus));

    asyncWaitFlags = mAsyncWaitFlags;
    asyncWaitRequestedCount = mAsyncWaitRequestedCount;
    asyncWaitEventTarget = mAsyncWaitEventTarget;
  }

  // If we are here it's because we are already closed, or if the current stream
  // is not async. In both cases we have to execute the callback.
  if (!stream) {
    AsyncWaitRunnable::Create(this, asyncWaitEventTarget);
    return NS_OK;
  }

  return stream->AsyncWait(this, asyncWaitFlags, asyncWaitRequestedCount,
                           asyncWaitEventTarget);
}
954
955
/**
 * Callback invoked by the async substream we were waiting on.
 *
 * When OnInputStreamReady is called, we could be in 2 scenarios:
 *  a. there is something to read;
 *  b. the stream is closed.
 * If the substream is closed (or empty) we move to the following substream
 * and wait again, so that the consumer's callback only runs when there is
 * something to read or everything has been consumed.
 *
 * @param aStream the substream that became ready.
 * @return NS_OK if the callback was cancelled in the meantime, the result of
 *         AsyncWaitInternal() when moving to the next substream, otherwise
 *         the result of the consumer's OnInputStreamReady().
 */
NS_IMETHODIMP
nsMultiplexInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream)
{
  nsCOMPtr<nsIInputStreamCallback> callback;

  {
    MutexAutoLock lock(mLock);

    // The callback has been nullified in the meantime.
    if (!mAsyncWaitCallback) {
      return NS_OK;
    }

    if (NS_SUCCEEDED(mStatus)) {
      uint64_t avail = 0;
      nsresult rv = aStream->Available(&avail);
      if (rv == NS_BASE_STREAM_CLOSED || avail == 0) {
        // This stream is closed or empty, let's move to the following one.
        // Nothing has been read from the new current stream yet, so the flag
        // must be cleared together with the index; otherwise Tell() would
        // index mStreams[mCurrentStream], possibly past the end of the array.
        ++mCurrentStream;
        mStartedReadingCurrent = false;
        MutexAutoUnlock unlock(mLock);
        return AsyncWaitInternal();
      }
    }

    // Take ownership of the callback so that it fires only once.
    mAsyncWaitCallback.swap(callback);
    mAsyncWaitEventTarget = nullptr;
  }

  // Notify the consumer without holding the lock.
  return callback->OnInputStreamReady(this);
}
991
992
void
993
nsMultiplexInputStream::AsyncWaitCompleted()
994
0
{
995
0
  nsCOMPtr<nsIInputStreamCallback> callback;
996
0
997
0
  {
998
0
    MutexAutoLock lock(mLock);
999
0
1000
0
    // The callback has been nullified in the meantime.
1001
0
    if (!mAsyncWaitCallback) {
1002
0
      return;
1003
0
    }
1004
0
1005
0
    mAsyncWaitCallback.swap(callback);
1006
0
    mAsyncWaitEventTarget = nullptr;
1007
0
  }
1008
0
1009
0
  callback->OnInputStreamReady(this);
1010
0
}
1011
1012
/**
 * XPCOM constructor: creates a new nsMultiplexInputStream and returns the
 * interface requested through aIID.
 *
 * @param aOuter  must be null; aggregation is not supported.
 * @param aIID    interface requested by the caller.
 * @param aResult receives the new instance; set to null on failure.
 * @return NS_ERROR_NO_AGGREGATION when aOuter is set, otherwise the result of
 *         QueryInterface on the new instance.
 */
nsresult
nsMultiplexInputStreamConstructor(nsISupports* aOuter,
                                  REFNSIID aIID,
                                  void** aResult)
{
  *aResult = nullptr;

  if (!aOuter) {
    RefPtr<nsMultiplexInputStream> stream = new nsMultiplexInputStream();
    return stream->QueryInterface(aIID, aResult);
  }

  return NS_ERROR_NO_AGGREGATION;
}
1027
1028
void
1029
nsMultiplexInputStream::Serialize(InputStreamParams& aParams,
1030
                                  FileDescriptorArray& aFileDescriptors)
1031
0
{
1032
0
  MutexAutoLock lock(mLock);
1033
0
1034
0
  MultiplexInputStreamParams params;
1035
0
1036
0
  uint32_t streamCount = mStreams.Length();
1037
0
1038
0
  if (streamCount) {
1039
0
    InfallibleTArray<InputStreamParams>& streams = params.streams();
1040
0
1041
0
    streams.SetCapacity(streamCount);
1042
0
    for (uint32_t index = 0; index < streamCount; index++) {
1043
0
      InputStreamParams childStreamParams;
1044
0
      InputStreamHelper::SerializeInputStream(mStreams[index].mStream,
1045
0
                                              childStreamParams,
1046
0
                                              aFileDescriptors);
1047
0
1048
0
      streams.AppendElement(childStreamParams);
1049
0
    }
1050
0
  }
1051
0
1052
0
  params.currentStream() = mCurrentStream;
1053
0
  params.status() = mStatus;
1054
0
  params.startedReadingCurrent() = mStartedReadingCurrent;
1055
0
1056
0
  aParams = params;
1057
0
}
1058
1059
bool
1060
nsMultiplexInputStream::Deserialize(const InputStreamParams& aParams,
1061
                                    const FileDescriptorArray& aFileDescriptors)
1062
0
{
1063
0
  if (aParams.type() !=
1064
0
      InputStreamParams::TMultiplexInputStreamParams) {
1065
0
    NS_ERROR("Received unknown parameters from the other process!");
1066
0
    return false;
1067
0
  }
1068
0
1069
0
  const MultiplexInputStreamParams& params =
1070
0
    aParams.get_MultiplexInputStreamParams();
1071
0
1072
0
  const InfallibleTArray<InputStreamParams>& streams = params.streams();
1073
0
1074
0
  uint32_t streamCount = streams.Length();
1075
0
  for (uint32_t index = 0; index < streamCount; index++) {
1076
0
    nsCOMPtr<nsIInputStream> stream =
1077
0
      InputStreamHelper::DeserializeInputStream(streams[index],
1078
0
                                                aFileDescriptors);
1079
0
    if (!stream) {
1080
0
      NS_WARNING("Deserialize failed!");
1081
0
      return false;
1082
0
    }
1083
0
1084
0
    if (NS_FAILED(AppendStream(stream))) {
1085
0
      NS_WARNING("AppendStream failed!");
1086
0
      return false;
1087
0
    }
1088
0
  }
1089
0
1090
0
  mCurrentStream = params.currentStream();
1091
0
  mStatus = params.status();
1092
0
  mStartedReadingCurrent = params.startedReadingCurrent();
1093
0
1094
0
  return true;
1095
0
}
1096
1097
/**
 * Sums the expected serialized length of the substreams.
 *
 * Substreams that are not nsIIPCSerializableInputStream, or that report no
 * expected length, are ignored.
 *
 * @return Some(sum) if at least one substream reported a length, Nothing()
 *         otherwise.
 */
Maybe<uint64_t>
nsMultiplexInputStream::ExpectedSerializedLength()
{
  MutexAutoLock lock(mLock);

  uint64_t total = 0;
  bool haveLength = false;

  const uint32_t count = mStreams.Length();
  for (uint32_t i = 0; i < count; ++i) {
    nsCOMPtr<nsIIPCSerializableInputStream> serializable =
      do_QueryInterface(mStreams[i].mStream);
    if (serializable) {
      Maybe<uint64_t> childLength = serializable->ExpectedSerializedLength();
      if (childLength.isSome()) {
        total += childLength.value();
        haveLength = true;
      }
    }
  }

  if (!haveLength) {
    return Nothing();
  }

  return Some(total);
}
1120
1121
/**
 * Tells whether Clone() can currently succeed.
 *
 * The stream is cloneable only if reading has not started and every substream
 * implements nsICloneableInputStream and reports itself as cloneable.
 *
 * @param aCloneable receives the answer.
 * @return always NS_OK.
 */
NS_IMETHODIMP
nsMultiplexInputStream::GetCloneable(bool* aCloneable)
{
  MutexAutoLock lock(mLock);

  // Pessimistic default; only flipped once every check below has passed.
  *aCloneable = false;

  //XXXnsm Cloning a multiplex stream which has started reading is not permitted
  //right now.
  if (mStartedReadingCurrent || mCurrentStream > 0) {
    return NS_OK;
  }

  const uint32_t count = mStreams.Length();
  for (uint32_t index = 0; index < count; ++index) {
    nsCOMPtr<nsICloneableInputStream> cloneable =
      do_QueryInterface(mStreams[index].mStream);
    if (!cloneable || !cloneable->GetCloneable()) {
      return NS_OK;
    }
  }

  *aCloneable = true;
  return NS_OK;
}
1145
1146
/**
 * Creates a new multiplex stream made of clones of every substream.
 *
 * @param aClone receives the new stream on success.
 * @return NS_ERROR_FAILURE if reading has already started or if a substream
 *         is not an nsICloneableInputStream; the failure reported by a
 *         substream's Clone() or by AppendStream(); NS_OK otherwise.
 */
NS_IMETHODIMP
nsMultiplexInputStream::Clone(nsIInputStream** aClone)
{
  MutexAutoLock lock(mLock);

  //XXXnsm Cloning a multiplex stream which has started reading is not permitted
  //right now.
  if (mStartedReadingCurrent || mCurrentStream > 0) {
    return NS_ERROR_FAILURE;
  }

  RefPtr<nsMultiplexInputStream> copy = new nsMultiplexInputStream();

  const uint32_t count = mStreams.Length();
  for (uint32_t index = 0; index < count; ++index) {
    nsCOMPtr<nsICloneableInputStream> cloneable =
      do_QueryInterface(mStreams[index].mStream);
    if (NS_WARN_IF(!cloneable)) {
      return NS_ERROR_FAILURE;
    }

    nsCOMPtr<nsIInputStream> clonedChild;
    nsresult rv = cloneable->Clone(getter_AddRefs(clonedChild));
    if (NS_WARN_IF(NS_FAILED(rv))) {
      return rv;
    }

    rv = copy->AppendStream(clonedChild);
    if (NS_WARN_IF(NS_FAILED(rv))) {
      return rv;
    }
  }

  copy.forget(aClone);
  return NS_OK;
}
1183
1184
/**
 * Computes the total length as the sum of the substream lengths.
 *
 * Substreams implementing nsIInputStreamLength are asked for their length;
 * for the others the available byte count (AvailableMaybeSeek) is used as a
 * fallback. Closed substreams are skipped.
 *
 * @param aLength receives the total, or -1 as soon as one substream reports
 *                an unknown (-1) length.
 * @return NS_ERROR_NOT_AVAILABLE if reading has started or a substream
 *         reports it; NS_BASE_STREAM_WOULD_BLOCK if at least one substream
 *         would block (and none reported -1); NS_ERROR_OUT_OF_MEMORY if the
 *         sum overflows; any other substream failure; NS_OK otherwise.
 */
NS_IMETHODIMP
nsMultiplexInputStream::Length(int64_t* aLength)
{
  MutexAutoLock lock(mLock);

  if (mStartedReadingCurrent || mCurrentStream > 0) {
    return NS_ERROR_NOT_AVAILABLE;
  }

  CheckedInt64 total = 0;
  nsresult result = NS_OK;

  const uint32_t count = mStreams.Length();
  for (uint32_t index = 0; index < count; ++index) {
    nsCOMPtr<nsIInputStreamLength> lengthStream =
      do_QueryInterface(mStreams[index].mStream);

    if (lengthStream) {
      int64_t size = 0;
      nsresult rv = lengthStream->Length(&size);
      if (rv == NS_BASE_STREAM_CLOSED) {
        continue;
      }

      if (rv == NS_ERROR_NOT_AVAILABLE) {
        return rv;
      }

      // If one stream blocks, we all block. We still keep scanning, because
      // we want to see if there are other streams with length = -1.
      if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
        result = NS_BASE_STREAM_WOULD_BLOCK;
        continue;
      }

      if (NS_WARN_IF(NS_FAILED(rv))) {
        return rv;
      }

      // If one of the streams doesn't know the size, we all don't know it.
      if (size == -1) {
        *aLength = -1;
        return NS_OK;
      }

      total += size;
    } else {
      // Let's use available as fallback.
      uint64_t avail = 0;
      nsresult rv = AvailableMaybeSeek(mStreams[index], &avail);
      if (rv == NS_BASE_STREAM_CLOSED) {
        continue;
      }

      if (NS_WARN_IF(NS_FAILED(rv))) {
        mStatus = rv;
        return mStatus;
      }

      total += avail;
    }

    // Overflow check, common to both ways of obtaining a size.
    if (!total.isValid()) {
      return NS_ERROR_OUT_OF_MEMORY;
    }
  }

  *aLength = total.value();
  return result;
}
1258
1259
// Helper used by nsMultiplexInputStream::AsyncLengthWait(). It accumulates the
// sizes that are known synchronously, collects the substreams whose length
// must be requested asynchronously, and notifies the parent stream (through
// AsyncWaitCompleted(length, lock)) once the total is known or is known to be
// unavailable (-1).
//
// All the state is touched with the parent stream's lock held: Proceed()
// receives a proof of lock and OnInputStreamLengthReady() takes the lock
// itself.
class nsMultiplexInputStream::AsyncWaitLengthHelper final : public nsIInputStreamLengthCallback

{
public:
  NS_DECL_ISUPPORTS

  AsyncWaitLengthHelper()
    : mStreamNotified(false)
    , mLength(0)
    , mNegativeSize(false)
  {}

  // Registers a substream whose length has to be requested asynchronously.
  // Returns false if the (fallible) append fails.
  bool
  AddStream(nsIAsyncInputStreamLength* aStream)
  {
    return mPendingStreams.AppendElement(aStream, fallible);
  }

  // Adds a synchronously known size to the total. Returns false on overflow.
  bool
  AddSize(int64_t aSize)
  {
    MOZ_ASSERT(!mNegativeSize);

    mLength += aSize;
    return mLength.isValid();
  }

  // Records that one substream has an unknown length: the final result is -1.
  void
  NegativeSize()
  {
    MOZ_ASSERT(!mNegativeSize);
    mNegativeSize = true;
  }

  // Starts the operation. If nothing has to be awaited, the parent stream is
  // notified through a runnable dispatched to aEventTarget; otherwise every
  // pending substream is asked for its length and the parent is notified when
  // all of them have answered.
  //
  // Returns the dispatch result, the first failure (other than
  // NS_BASE_STREAM_CLOSED) reported by a substream's AsyncLengthWait, or
  // NS_OK.
  nsresult
  Proceed(nsMultiplexInputStream* aParentStream,
          nsIEventTarget* aEventTarget,
          const MutexAutoLock& aProofOfLock)
  {
    MOZ_ASSERT(!mStream);

    // If we don't need to wait, let's inform the callback immediately.
    if (mPendingStreams.IsEmpty() || mNegativeSize) {
      return DispatchCompletion(aParentStream, aEventTarget);
    }

    // Let's store the parent stream until we have notifications from the
    // async length streams.

    mStream = aParentStream;

    // Let's activate all the pending streams. A closed stream contributes
    // nothing and will never call us back, so it must be dropped from the
    // pending list; leaving it there would make us wait forever.
    uint32_t index = 0;
    while (index < mPendingStreams.Length()) {
      nsCOMPtr<nsIAsyncInputStreamLength> stream = mPendingStreams[index];
      nsresult rv = stream->AsyncLengthWait(this, aEventTarget);
      if (rv == NS_BASE_STREAM_CLOSED) {
        mPendingStreams.RemoveElementAt(index);
        continue;
      }

      if (NS_WARN_IF(NS_FAILED(rv))) {
        return rv;
      }

      ++index;
    }

    // Every pending stream turned out to be closed: nobody is going to call
    // OnInputStreamLengthReady(), so the parent must be notified from here.
    if (mPendingStreams.IsEmpty()) {
      mStreamNotified = true;
      return DispatchCompletion(aParentStream, aEventTarget);
    }

    return NS_OK;
  }

  // Called by a pending substream when its length is known (-1 if unknown).
  // Notifies the parent stream once the last pending substream has answered,
  // or as soon as the total is known to be unavailable.
  NS_IMETHOD
  OnInputStreamLengthReady(nsIAsyncInputStreamLength* aStream,
                           int64_t aLength) override
  {
    MutexAutoLock lock(mStream->GetLock());

    MOZ_ASSERT(mPendingStreams.Contains(aStream));
    mPendingStreams.RemoveElement(aStream);

    // Already notified.
    if (mStreamNotified) {
      return NS_OK;
    }

    if (aLength == -1) {
      mNegativeSize = true;
    } else {
      mLength += aLength;
      // An overflowing total is reported as an unknown length.
      if (!mLength.isValid()) {
        mNegativeSize = true;
      }
    }

    // We need to wait.
    if (!mNegativeSize && !mPendingStreams.IsEmpty()) {
      return NS_OK;
    }

    // Let's notify the parent stream.
    mStreamNotified = true;
    mStream->AsyncWaitCompleted(mNegativeSize ? -1 : mLength.value(), lock);
    return NS_OK;
  }

private:
  ~AsyncWaitLengthHelper() = default;

  // Dispatches to aEventTarget a runnable that reports the final length to
  // aParentStream: the accumulated total, or -1 when a substream has an
  // unknown size or the total is not valid.
  nsresult
  DispatchCompletion(nsMultiplexInputStream* aParentStream,
                     nsIEventTarget* aEventTarget)
  {
    RefPtr<nsMultiplexInputStream> parentStream = aParentStream;
    int64_t length = -1;
    if (!mNegativeSize && mLength.isValid()) {
      length = mLength.value();
    }
    nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
      "AsyncWaitLengthHelper",
      [parentStream, length]() {
        MutexAutoLock lock(parentStream->GetLock());
        parentStream->AsyncWaitCompleted(length, lock);
      });
    return aEventTarget->Dispatch(r.forget(), NS_DISPATCH_NORMAL);
  }

  // Parent stream; set by Proceed() only when we have to wait.
  RefPtr<nsMultiplexInputStream> mStream;
  // True once the parent has been (or is about to be) notified.
  bool mStreamNotified;

  // Sum of the sizes known so far.
  CheckedInt64 mLength;
  // True when the final result must be -1.
  bool mNegativeSize;

  // Substreams that still have to report their length.
  nsTArray<nsCOMPtr<nsIAsyncInputStreamLength>> mPendingStreams;
};
1381
1382
NS_IMPL_ISUPPORTS(nsMultiplexInputStream::AsyncWaitLengthHelper,
1383
                  nsIInputStreamLengthCallback)
1384
1385
/**
 * Asynchronously computes the total length of the multiplexed stream.
 *
 * Sizes that can be obtained synchronously (nsIInputStreamLength, or the
 * available byte count as a fallback) are added up right away; substreams
 * implementing nsIAsyncInputStreamLength are handed to an
 * AsyncWaitLengthHelper, which notifies aCallback when everything is known.
 *
 * @param aCallback    callback to notify; null cancels a pending request.
 * @param aEventTarget target used for the notification; must not be null.
 * @return NS_ERROR_NULL_POINTER without an event target,
 *         NS_ERROR_NOT_AVAILABLE if reading has started,
 *         NS_ERROR_OUT_OF_MEMORY on allocation failure or size overflow, any
 *         failure reported by a substream or by the helper, NS_OK otherwise.
 */
NS_IMETHODIMP
nsMultiplexInputStream::AsyncLengthWait(nsIInputStreamLengthCallback* aCallback,
                                        nsIEventTarget* aEventTarget)
{
  if (NS_WARN_IF(!aEventTarget)) {
    return NS_ERROR_NULL_POINTER;
  }

  MutexAutoLock lock(mLock);

  if (mStartedReadingCurrent || mCurrentStream > 0) {
    return NS_ERROR_NOT_AVAILABLE;
  }

  // Two cases only need the stored callback to be updated:
  //  - a null callback, which cancels the pending request;
  //  - a pending operation already exists: let's use it instead of creating
  //    a new one.
  if (!aCallback || mAsyncWaitLengthHelper) {
    mAsyncWaitLengthCallback = aCallback;
    return NS_OK;
  }

  RefPtr<AsyncWaitLengthHelper> helper = new AsyncWaitLengthHelper();

  const uint32_t count = mStreams.Length();
  for (uint32_t index = 0; index < count; ++index) {
    // Async length streams are resolved later by the helper.
    nsCOMPtr<nsIAsyncInputStreamLength> asyncLength =
      do_QueryInterface(mStreams[index].mStream);
    if (asyncLength) {
      if (NS_WARN_IF(!helper->AddStream(asyncLength))) {
        return NS_ERROR_OUT_OF_MEMORY;
      }
      continue;
    }

    nsCOMPtr<nsIInputStreamLength> syncLength =
      do_QueryInterface(mStreams[index].mStream);
    if (syncLength) {
      int64_t size = 0;
      nsresult rv = syncLength->Length(&size);
      if (rv == NS_BASE_STREAM_CLOSED) {
        continue;
      }

      MOZ_ASSERT(rv != NS_BASE_STREAM_WOULD_BLOCK,
                 "A nsILengthInutStream returns NS_BASE_STREAM_WOULD_BLOCK but it doesn't implement nsIAsyncInputStreamLength.");

      if (NS_WARN_IF(NS_FAILED(rv))) {
        return rv;
      }

      // One unknown size makes the total unknown: no need to look further.
      if (size == -1) {
        helper->NegativeSize();
        break;
      }

      if (NS_WARN_IF(!helper->AddSize(size))) {
        return NS_ERROR_OUT_OF_MEMORY;
      }
    } else {
      // Let's use available as fallback.
      uint64_t avail = 0;
      nsresult rv = AvailableMaybeSeek(mStreams[index], &avail);
      if (rv == NS_BASE_STREAM_CLOSED) {
        continue;
      }

      if (NS_WARN_IF(NS_FAILED(rv))) {
        mStatus = rv;
        return mStatus;
      }

      if (NS_WARN_IF(!helper->AddSize(avail))) {
        return NS_ERROR_OUT_OF_MEMORY;
      }
    }
  }

  nsresult rv = helper->Proceed(this, aEventTarget, lock);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  // The operation is in flight: remember the helper and who to notify.
  mAsyncWaitLengthCallback = aCallback;
  mAsyncWaitLengthHelper = helper;
  return NS_OK;
}
1476
1477
void
1478
nsMultiplexInputStream::AsyncWaitCompleted(int64_t aLength,
1479
                                           const MutexAutoLock& aProofOfLock)
1480
0
{
1481
0
  nsCOMPtr<nsIInputStreamLengthCallback> callback;
1482
0
  callback.swap(mAsyncWaitLengthCallback);
1483
0
1484
0
  mAsyncWaitLengthHelper = nullptr;
1485
0
1486
0
  // Already canceled.
1487
0
  if (!callback) {
1488
0
    return;
1489
0
  }
1490
0
1491
0
  MutexAutoUnlock unlock(mLock);
1492
0
  callback->OnInputStreamLengthReady(this, aLength);
1493
0
}
1494
1495
#define MAYBE_UPDATE_VALUE_REAL(x, y) \
1496
0
  if (y) {                            \
1497
0
    if (aCount == 1) {                \
1498
0
      ++x;                            \
1499
0
    } else if (x > 0) {               \
1500
0
      --x;                            \
1501
0
    } else {                          \
1502
0
      MOZ_CRASH("A nsIInputStream changed QI map when stored in a nsMultiplexInputStream!"); \
1503
0
    }                                 \
1504
0
  }
1505
1506
#define MAYBE_UPDATE_VALUE(x, y)                                \
1507
0
  {                                                             \
1508
0
    nsCOMPtr<y> substream = do_QueryInterface(aStream.mStream); \
1509
0
    MAYBE_UPDATE_VALUE_REAL(x, substream)                       \
1510
0
  }
1511
1512
void
1513
nsMultiplexInputStream::UpdateQIMap(StreamData& aStream, int32_t aCount)
1514
0
{
1515
0
  MOZ_ASSERT(aCount == -1 || aCount == 1);
1516
0
1517
0
  MAYBE_UPDATE_VALUE_REAL(mSeekableStreams, aStream.mSeekableStream)
1518
0
  MAYBE_UPDATE_VALUE(mIPCSerializableStreams, nsIIPCSerializableInputStream)
1519
0
  MAYBE_UPDATE_VALUE(mCloneableStreams, nsICloneableInputStream)
1520
0
  MAYBE_UPDATE_VALUE_REAL(mAsyncInputStreams, aStream.mAsyncStream)
1521
0
  MAYBE_UPDATE_VALUE(mInputStreamLengths, nsIInputStreamLength)
1522
0
  MAYBE_UPDATE_VALUE(mAsyncInputStreamLengths, nsIAsyncInputStreamLength)
1523
0
}
1524
1525
#undef MAYBE_UPDATE_VALUE
1526
1527
bool
1528
nsMultiplexInputStream::IsSeekable() const
1529
0
{
1530
0
  return mStreams.Length() == mSeekableStreams;
1531
0
}
1532
1533
bool
1534
nsMultiplexInputStream::IsIPCSerializable() const
1535
0
{
1536
0
  return mStreams.Length() == mIPCSerializableStreams;
1537
0
}
1538
1539
bool
1540
nsMultiplexInputStream::IsCloneable() const
1541
0
{
1542
0
  return mStreams.Length() == mCloneableStreams;
1543
0
}
1544
1545
bool
1546
nsMultiplexInputStream::IsAsyncInputStream() const
1547
0
{
1548
0
  // nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the
1549
0
  // substream implements that interface.
1550
0
  return !!mAsyncInputStreams;
1551
0
}
1552
1553
bool
1554
nsMultiplexInputStream::IsInputStreamLength() const
1555
0
{
1556
0
  return !!mInputStreamLengths;
1557
0
}
1558
1559
bool
1560
nsMultiplexInputStream::IsAsyncInputStreamLength() const
1561
0
{
1562
0
  return !!mAsyncInputStreamLengths;
1563
0
}