Coverage Report

Created: 2018-09-25 14:53

/src/mozilla-central/xpcom/io/nsStreamUtils.cpp
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
 * License, v. 2.0. If a copy of the MPL was not distributed with this
5
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
#include "mozilla/Mutex.h"
8
#include "mozilla/Attributes.h"
9
#include "nsStreamUtils.h"
10
#include "nsAutoPtr.h"
11
#include "nsCOMPtr.h"
12
#include "nsIPipe.h"
13
#include "nsICloneableInputStream.h"
14
#include "nsIEventTarget.h"
15
#include "nsICancelableRunnable.h"
16
#include "nsISafeOutputStream.h"
17
#include "nsString.h"
18
#include "nsIAsyncInputStream.h"
19
#include "nsIAsyncOutputStream.h"
20
#include "nsIBufferedStreams.h"
21
#include "nsNetCID.h"
22
#include "nsServiceManagerUtils.h"
23
#include "nsThreadUtils.h"
24
#include "nsITransport.h"
25
#include "nsIStreamTransportService.h"
26
#include "NonBlockingAsyncInputStream.h"
27
28
using namespace mozilla;
29
30
static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
31
32
//-----------------------------------------------------------------------------
33
34
// This is a nsICancelableRunnable because we can dispatch it to Workers and
35
// those can be shut down at any time, and in these cases, Cancel() is called
36
// instead of Run().
37
class nsInputStreamReadyEvent final
38
  : public CancelableRunnable
39
  , public nsIInputStreamCallback
40
{
41
public:
42
  NS_DECL_ISUPPORTS_INHERITED
43
44
    nsInputStreamReadyEvent(const char* aName,
45
                            nsIInputStreamCallback* aCallback,
46
                            nsIEventTarget* aTarget)
47
    : CancelableRunnable(aName)
48
    , mCallback(aCallback)
49
    , mTarget(aTarget)
50
0
  {
51
0
  }
52
53
private:
54
  ~nsInputStreamReadyEvent()
55
0
  {
56
0
    if (!mCallback) {
57
0
      return;
58
0
    }
59
0
    //
60
0
    // whoa!!  looks like we never posted this event.  take care to
61
0
    // release mCallback on the correct thread.  if mTarget lives on the
62
0
    // calling thread, then we are ok.  otherwise, we have to try to
63
0
    // proxy the Release over the right thread.  if that thread is dead,
64
0
    // then there's nothing we can do... better to leak than crash.
65
0
    //
66
0
    bool val;
67
0
    nsresult rv = mTarget->IsOnCurrentThread(&val);
68
0
    if (NS_FAILED(rv) || !val) {
69
0
      nsCOMPtr<nsIInputStreamCallback> event =
70
0
        NS_NewInputStreamReadyEvent("~nsInputStreamReadyEvent", mCallback, mTarget);
71
0
      mCallback = nullptr;
72
0
      if (event) {
73
0
        rv = event->OnInputStreamReady(nullptr);
74
0
        if (NS_FAILED(rv)) {
75
0
          MOZ_ASSERT_UNREACHABLE("leaking stream event");
76
0
          nsISupports* sup = event;
77
0
          NS_ADDREF(sup);
78
0
        }
79
0
      }
80
0
    }
81
0
  }
82
83
public:
84
  NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aStream) override
85
0
  {
86
0
    mStream = aStream;
87
0
88
0
    nsresult rv =
89
0
      mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
90
0
    if (NS_FAILED(rv)) {
91
0
      NS_WARNING("Dispatch failed");
92
0
      return NS_ERROR_FAILURE;
93
0
    }
94
0
95
0
    return NS_OK;
96
0
  }
97
98
  NS_IMETHOD Run() override
99
0
  {
100
0
    if (mCallback) {
101
0
      if (mStream) {
102
0
        mCallback->OnInputStreamReady(mStream);
103
0
      }
104
0
      mCallback = nullptr;
105
0
    }
106
0
    return NS_OK;
107
0
  }
108
109
  nsresult Cancel() override
110
0
  {
111
0
    mCallback = nullptr;
112
0
    return NS_OK;
113
0
  }
114
115
private:
116
  nsCOMPtr<nsIAsyncInputStream>    mStream;
117
  nsCOMPtr<nsIInputStreamCallback> mCallback;
118
  nsCOMPtr<nsIEventTarget>         mTarget;
119
};
120
121
NS_IMPL_ISUPPORTS_INHERITED(nsInputStreamReadyEvent, CancelableRunnable,
122
                            nsIInputStreamCallback)
123
124
//-----------------------------------------------------------------------------
125
126
// This is a nsICancelableRunnable because we can dispatch it to Workers and
127
// those can be shut down at any time, and in these cases, Cancel() is called
128
// instead of Run().
129
class nsOutputStreamReadyEvent final
130
  : public CancelableRunnable
131
  , public nsIOutputStreamCallback
132
{
133
public:
134
  NS_DECL_ISUPPORTS_INHERITED
135
136
  nsOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback,
137
                           nsIEventTarget* aTarget)
138
    : CancelableRunnable("nsOutputStreamReadyEvent")
139
    , mCallback(aCallback)
140
    , mTarget(aTarget)
141
0
  {
142
0
  }
143
144
private:
145
  ~nsOutputStreamReadyEvent()
146
0
  {
147
0
    if (!mCallback) {
148
0
      return;
149
0
    }
150
0
    //
151
0
    // whoa!!  looks like we never posted this event.  take care to
152
0
    // release mCallback on the correct thread.  if mTarget lives on the
153
0
    // calling thread, then we are ok.  otherwise, we have to try to
154
0
    // proxy the Release over the right thread.  if that thread is dead,
155
0
    // then there's nothing we can do... better to leak than crash.
156
0
    //
157
0
    bool val;
158
0
    nsresult rv = mTarget->IsOnCurrentThread(&val);
159
0
    if (NS_FAILED(rv) || !val) {
160
0
      nsCOMPtr<nsIOutputStreamCallback> event =
161
0
        NS_NewOutputStreamReadyEvent(mCallback, mTarget);
162
0
      mCallback = nullptr;
163
0
      if (event) {
164
0
        rv = event->OnOutputStreamReady(nullptr);
165
0
        if (NS_FAILED(rv)) {
166
0
          MOZ_ASSERT_UNREACHABLE("leaking stream event");
167
0
          nsISupports* sup = event;
168
0
          NS_ADDREF(sup);
169
0
        }
170
0
      }
171
0
    }
172
0
  }
173
174
public:
175
  NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aStream) override
176
0
  {
177
0
    mStream = aStream;
178
0
179
0
    nsresult rv =
180
0
      mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
181
0
    if (NS_FAILED(rv)) {
182
0
      NS_WARNING("PostEvent failed");
183
0
      return NS_ERROR_FAILURE;
184
0
    }
185
0
186
0
    return NS_OK;
187
0
  }
188
189
  NS_IMETHOD Run() override
190
0
  {
191
0
    if (mCallback) {
192
0
      if (mStream) {
193
0
        mCallback->OnOutputStreamReady(mStream);
194
0
      }
195
0
      mCallback = nullptr;
196
0
    }
197
0
    return NS_OK;
198
0
  }
199
200
  nsresult Cancel() override
201
0
  {
202
0
    mCallback = nullptr;
203
0
    return NS_OK;
204
0
  }
205
206
private:
207
  nsCOMPtr<nsIAsyncOutputStream>    mStream;
208
  nsCOMPtr<nsIOutputStreamCallback> mCallback;
209
  nsCOMPtr<nsIEventTarget>          mTarget;
210
};
211
212
NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent, CancelableRunnable,
213
                            nsIOutputStreamCallback)
214
215
//-----------------------------------------------------------------------------
216
217
already_AddRefed<nsIInputStreamCallback>
218
NS_NewInputStreamReadyEvent(const char* aName,
219
                            nsIInputStreamCallback* aCallback,
220
                            nsIEventTarget* aTarget)
221
0
{
222
0
  NS_ASSERTION(aCallback, "null callback");
223
0
  NS_ASSERTION(aTarget, "null target");
224
0
  RefPtr<nsInputStreamReadyEvent> ev =
225
0
    new nsInputStreamReadyEvent(aName, aCallback, aTarget);
226
0
  return ev.forget();
227
0
}
228
229
already_AddRefed<nsIOutputStreamCallback>
230
NS_NewOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback,
231
                             nsIEventTarget* aTarget)
232
0
{
233
0
  NS_ASSERTION(aCallback, "null callback");
234
0
  NS_ASSERTION(aTarget, "null target");
235
0
  RefPtr<nsOutputStreamReadyEvent> ev =
236
0
    new nsOutputStreamReadyEvent(aCallback, aTarget);
237
0
  return ev.forget();
238
0
}
239
240
//-----------------------------------------------------------------------------
241
// NS_AsyncCopy implementation
242
243
// abstract stream copier...
244
class nsAStreamCopier
245
  : public nsIInputStreamCallback
246
  , public nsIOutputStreamCallback
247
  , public CancelableRunnable
248
{
249
public:
250
  NS_DECL_ISUPPORTS_INHERITED
251
252
  nsAStreamCopier()
253
    : CancelableRunnable("nsAStreamCopier")
254
    , mLock("nsAStreamCopier.mLock")
255
    , mCallback(nullptr)
256
    , mProgressCallback(nullptr)
257
    , mClosure(nullptr)
258
    , mChunkSize(0)
259
    , mEventInProcess(false)
260
    , mEventIsPending(false)
261
    , mCloseSource(true)
262
    , mCloseSink(true)
263
    , mCanceled(false)
264
    , mCancelStatus(NS_OK)
265
0
  {
266
0
  }
267
268
  // kick off the async copy...
269
  nsresult Start(nsIInputStream* aSource,
270
                 nsIOutputStream* aSink,
271
                 nsIEventTarget* aTarget,
272
                 nsAsyncCopyCallbackFun aCallback,
273
                 void* aClosure,
274
                 uint32_t aChunksize,
275
                 bool aCloseSource,
276
                 bool aCloseSink,
277
                 nsAsyncCopyProgressFun aProgressCallback)
278
0
  {
279
0
    mSource = aSource;
280
0
    mSink = aSink;
281
0
    mTarget = aTarget;
282
0
    mCallback = aCallback;
283
0
    mClosure = aClosure;
284
0
    mChunkSize = aChunksize;
285
0
    mCloseSource = aCloseSource;
286
0
    mCloseSink = aCloseSink;
287
0
    mProgressCallback = aProgressCallback;
288
0
289
0
    mAsyncSource = do_QueryInterface(mSource);
290
0
    mAsyncSink = do_QueryInterface(mSink);
291
0
292
0
    return PostContinuationEvent();
293
0
  }
294
295
  // implemented by subclasses, returns number of bytes copied and
296
  // sets source and sink condition before returning.
297
  virtual uint32_t DoCopy(nsresult* aSourceCondition,
298
                          nsresult* aSinkCondition) = 0;
299
300
  void Process()
301
0
  {
302
0
    if (!mSource || !mSink) {
303
0
      return;
304
0
    }
305
0
306
0
    nsresult cancelStatus;
307
0
    bool canceled;
308
0
    {
309
0
      MutexAutoLock lock(mLock);
310
0
      canceled = mCanceled;
311
0
      cancelStatus = mCancelStatus;
312
0
    }
313
0
314
0
    // If the copy was canceled before Process() was even called, then
315
0
    // sourceCondition and sinkCondition should be set to error results to
316
0
    // ensure we don't call Finish() on a canceled nsISafeOutputStream.
317
0
    MOZ_ASSERT(NS_FAILED(cancelStatus) == canceled, "cancel needs an error");
318
0
    nsresult sourceCondition = cancelStatus;
319
0
    nsresult sinkCondition = cancelStatus;
320
0
321
0
    // Copy data from the source to the sink until we hit failure or have
322
0
    // copied all the data.
323
0
    for (;;) {
324
0
      // Note: copyFailed will be true if the source or the sink have
325
0
      //       reported an error, or if we failed to write any bytes
326
0
      //       because we have consumed all of our data.
327
0
      bool copyFailed = false;
328
0
      if (!canceled) {
329
0
        uint32_t n = DoCopy(&sourceCondition, &sinkCondition);
330
0
        if (n > 0 && mProgressCallback) {
331
0
          mProgressCallback(mClosure, n);
332
0
        }
333
0
        copyFailed = NS_FAILED(sourceCondition) ||
334
0
                     NS_FAILED(sinkCondition) || n == 0;
335
0
336
0
        MutexAutoLock lock(mLock);
337
0
        canceled = mCanceled;
338
0
        cancelStatus = mCancelStatus;
339
0
      }
340
0
      if (copyFailed && !canceled) {
341
0
        if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) {
342
0
          // need to wait for more data from source.  while waiting for
343
0
          // more source data, be sure to observe failures on output end.
344
0
          mAsyncSource->AsyncWait(this, 0, 0, nullptr);
345
0
346
0
          if (mAsyncSink)
347
0
            mAsyncSink->AsyncWait(this,
348
0
                                  nsIAsyncOutputStream::WAIT_CLOSURE_ONLY,
349
0
                                  0, nullptr);
350
0
          break;
351
0
        } else if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) {
352
0
          // need to wait for more room in the sink.  while waiting for
353
0
          // more room in the sink, be sure to observer failures on the
354
0
          // input end.
355
0
          mAsyncSink->AsyncWait(this, 0, 0, nullptr);
356
0
357
0
          if (mAsyncSource)
358
0
            mAsyncSource->AsyncWait(this,
359
0
                                    nsIAsyncInputStream::WAIT_CLOSURE_ONLY,
360
0
                                    0, nullptr);
361
0
          break;
362
0
        }
363
0
      }
364
0
      if (copyFailed || canceled) {
365
0
        if (mCloseSource) {
366
0
          // close source
367
0
          if (mAsyncSource)
368
0
            mAsyncSource->CloseWithStatus(
369
0
              canceled ? cancelStatus : sinkCondition);
370
0
          else {
371
0
            mSource->Close();
372
0
          }
373
0
        }
374
0
        mAsyncSource = nullptr;
375
0
        mSource = nullptr;
376
0
377
0
        if (mCloseSink) {
378
0
          // close sink
379
0
          if (mAsyncSink)
380
0
            mAsyncSink->CloseWithStatus(
381
0
              canceled ? cancelStatus : sourceCondition);
382
0
          else {
383
0
            // If we have an nsISafeOutputStream, and our
384
0
            // sourceCondition and sinkCondition are not set to a
385
0
            // failure state, finish writing.
386
0
            nsCOMPtr<nsISafeOutputStream> sostream =
387
0
              do_QueryInterface(mSink);
388
0
            if (sostream && NS_SUCCEEDED(sourceCondition) &&
389
0
                NS_SUCCEEDED(sinkCondition)) {
390
0
              sostream->Finish();
391
0
            } else {
392
0
              mSink->Close();
393
0
            }
394
0
          }
395
0
        }
396
0
        mAsyncSink = nullptr;
397
0
        mSink = nullptr;
398
0
399
0
        // notify state complete...
400
0
        if (mCallback) {
401
0
          nsresult status;
402
0
          if (!canceled) {
403
0
            status = sourceCondition;
404
0
            if (NS_SUCCEEDED(status)) {
405
0
              status = sinkCondition;
406
0
            }
407
0
            if (status == NS_BASE_STREAM_CLOSED) {
408
0
              status = NS_OK;
409
0
            }
410
0
          } else {
411
0
            status = cancelStatus;
412
0
          }
413
0
          mCallback(mClosure, status);
414
0
        }
415
0
        break;
416
0
      }
417
0
    }
418
0
  }
419
420
  nsresult Cancel(nsresult aReason)
421
0
  {
422
0
    MutexAutoLock lock(mLock);
423
0
    if (mCanceled) {
424
0
      return NS_ERROR_FAILURE;
425
0
    }
426
0
427
0
    if (NS_SUCCEEDED(aReason)) {
428
0
      NS_WARNING("cancel with non-failure status code");
429
0
      aReason = NS_BASE_STREAM_CLOSED;
430
0
    }
431
0
432
0
    mCanceled = true;
433
0
    mCancelStatus = aReason;
434
0
    return NS_OK;
435
0
  }
436
437
  NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aSource) override
438
0
  {
439
0
    PostContinuationEvent();
440
0
    return NS_OK;
441
0
  }
442
443
  NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aSink) override
444
0
  {
445
0
    PostContinuationEvent();
446
0
    return NS_OK;
447
0
  }
448
449
  // continuation event handler
450
  NS_IMETHOD Run() override
451
0
  {
452
0
    Process();
453
0
454
0
    // clear "in process" flag and post any pending continuation event
455
0
    MutexAutoLock lock(mLock);
456
0
    mEventInProcess = false;
457
0
    if (mEventIsPending) {
458
0
      mEventIsPending = false;
459
0
      PostContinuationEvent_Locked();
460
0
    }
461
0
462
0
    return NS_OK;
463
0
  }
464
465
  nsresult Cancel() MOZ_MUST_OVERRIDE override = 0;
466
467
  nsresult PostContinuationEvent()
468
0
  {
469
0
    // we cannot post a continuation event if there is currently
470
0
    // an event in process.  doing so could result in Process being
471
0
    // run simultaneously on multiple threads, so we mark the event
472
0
    // as pending, and if an event is already in process then we
473
0
    // just let that existing event take care of posting the real
474
0
    // continuation event.
475
0
476
0
    MutexAutoLock lock(mLock);
477
0
    return PostContinuationEvent_Locked();
478
0
  }
479
480
  nsresult PostContinuationEvent_Locked()
481
0
  {
482
0
    nsresult rv = NS_OK;
483
0
    if (mEventInProcess) {
484
0
      mEventIsPending = true;
485
0
    } else {
486
0
      rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
487
0
      if (NS_SUCCEEDED(rv)) {
488
0
        mEventInProcess = true;
489
0
      } else {
490
0
        NS_WARNING("unable to post continuation event");
491
0
      }
492
0
    }
493
0
    return rv;
494
0
  }
495
496
protected:
497
  nsCOMPtr<nsIInputStream>       mSource;
498
  nsCOMPtr<nsIOutputStream>      mSink;
499
  nsCOMPtr<nsIAsyncInputStream>  mAsyncSource;
500
  nsCOMPtr<nsIAsyncOutputStream> mAsyncSink;
501
  nsCOMPtr<nsIEventTarget>       mTarget;
502
  Mutex                          mLock;
503
  nsAsyncCopyCallbackFun         mCallback;
504
  nsAsyncCopyProgressFun         mProgressCallback;
505
  void*                          mClosure;
506
  uint32_t                       mChunkSize;
507
  bool                           mEventInProcess;
508
  bool                           mEventIsPending;
509
  bool                           mCloseSource;
510
  bool                           mCloseSink;
511
  bool                           mCanceled;
512
  nsresult                       mCancelStatus;
513
514
  // virtual since subclasses call superclass Release()
515
  virtual ~nsAStreamCopier()
516
0
  {
517
0
  }
518
};
519
520
NS_IMPL_ISUPPORTS_INHERITED(nsAStreamCopier,
521
                            CancelableRunnable,
522
                            nsIInputStreamCallback,
523
                            nsIOutputStreamCallback)
524
525
class nsStreamCopierIB final : public nsAStreamCopier
526
{
527
public:
528
  nsStreamCopierIB() : nsAStreamCopier()
529
0
  {
530
0
  }
531
  virtual ~nsStreamCopierIB()
532
0
  {
533
0
  }
534
535
  struct MOZ_STACK_CLASS ReadSegmentsState
536
  {
537
    // the nsIOutputStream will outlive the ReadSegmentsState on the stack
538
    nsIOutputStream* MOZ_NON_OWNING_REF mSink;
539
    nsresult         mSinkCondition;
540
  };
541
542
  static nsresult ConsumeInputBuffer(nsIInputStream* aInStr,
543
                                     void* aClosure,
544
                                     const char* aBuffer,
545
                                     uint32_t aOffset,
546
                                     uint32_t aCount,
547
                                     uint32_t* aCountWritten)
548
0
  {
549
0
    ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
550
0
551
0
    nsresult rv = state->mSink->Write(aBuffer, aCount, aCountWritten);
552
0
    if (NS_FAILED(rv)) {
553
0
      state->mSinkCondition = rv;
554
0
    } else if (*aCountWritten == 0) {
555
0
      state->mSinkCondition = NS_BASE_STREAM_CLOSED;
556
0
    }
557
0
558
0
    return state->mSinkCondition;
559
0
  }
560
561
  uint32_t DoCopy(nsresult* aSourceCondition,
562
                  nsresult* aSinkCondition) override
563
0
  {
564
0
    ReadSegmentsState state;
565
0
    state.mSink = mSink;
566
0
    state.mSinkCondition = NS_OK;
567
0
568
0
    uint32_t n;
569
0
    *aSourceCondition =
570
0
      mSource->ReadSegments(ConsumeInputBuffer, &state, mChunkSize, &n);
571
0
    *aSinkCondition = state.mSinkCondition;
572
0
    return n;
573
0
  }
574
575
  nsresult Cancel() override
576
0
  {
577
0
    return NS_OK;
578
0
  }
579
};
580
581
class nsStreamCopierOB final : public nsAStreamCopier
582
{
583
public:
584
  nsStreamCopierOB() : nsAStreamCopier()
585
0
  {
586
0
  }
587
  virtual ~nsStreamCopierOB()
588
0
  {
589
0
  }
590
591
  struct MOZ_STACK_CLASS WriteSegmentsState
592
  {
593
    // the nsIInputStream will outlive the WriteSegmentsState on the stack
594
    nsIInputStream* MOZ_NON_OWNING_REF mSource;
595
    nsresult        mSourceCondition;
596
  };
597
598
  static nsresult FillOutputBuffer(nsIOutputStream* aOutStr,
599
                                   void* aClosure,
600
                                   char* aBuffer,
601
                                   uint32_t aOffset,
602
                                   uint32_t aCount,
603
                                   uint32_t* aCountRead)
604
0
  {
605
0
    WriteSegmentsState* state = (WriteSegmentsState*)aClosure;
606
0
607
0
    nsresult rv = state->mSource->Read(aBuffer, aCount, aCountRead);
608
0
    if (NS_FAILED(rv)) {
609
0
      state->mSourceCondition = rv;
610
0
    } else if (*aCountRead == 0) {
611
0
      state->mSourceCondition = NS_BASE_STREAM_CLOSED;
612
0
    }
613
0
614
0
    return state->mSourceCondition;
615
0
  }
616
617
  uint32_t DoCopy(nsresult* aSourceCondition,
618
                  nsresult* aSinkCondition) override
619
0
  {
620
0
    WriteSegmentsState state;
621
0
    state.mSource = mSource;
622
0
    state.mSourceCondition = NS_OK;
623
0
624
0
    uint32_t n;
625
0
    *aSinkCondition =
626
0
      mSink->WriteSegments(FillOutputBuffer, &state, mChunkSize, &n);
627
0
    *aSourceCondition = state.mSourceCondition;
628
0
    return n;
629
0
  }
630
631
  nsresult Cancel() override
632
0
  {
633
0
    return NS_OK;
634
0
  }
635
};
636
637
//-----------------------------------------------------------------------------
638
639
nsresult
640
NS_AsyncCopy(nsIInputStream*         aSource,
641
             nsIOutputStream*        aSink,
642
             nsIEventTarget*         aTarget,
643
             nsAsyncCopyMode         aMode,
644
             uint32_t                aChunkSize,
645
             nsAsyncCopyCallbackFun  aCallback,
646
             void*                   aClosure,
647
             bool                    aCloseSource,
648
             bool                    aCloseSink,
649
             nsISupports**           aCopierCtx,
650
             nsAsyncCopyProgressFun  aProgressCallback)
651
0
{
652
0
  NS_ASSERTION(aTarget, "non-null target required");
653
0
654
0
  nsresult rv;
655
0
  nsAStreamCopier* copier;
656
0
657
0
  if (aMode == NS_ASYNCCOPY_VIA_READSEGMENTS) {
658
0
    copier = new nsStreamCopierIB();
659
0
  } else {
660
0
    copier = new nsStreamCopierOB();
661
0
  }
662
0
663
0
  // Start() takes an owning ref to the copier...
664
0
  NS_ADDREF(copier);
665
0
  rv = copier->Start(aSource, aSink, aTarget, aCallback, aClosure, aChunkSize,
666
0
                     aCloseSource, aCloseSink, aProgressCallback);
667
0
668
0
  if (aCopierCtx) {
669
0
    *aCopierCtx = static_cast<nsISupports*>(static_cast<nsIRunnable*>(copier));
670
0
    NS_ADDREF(*aCopierCtx);
671
0
  }
672
0
  NS_RELEASE(copier);
673
0
674
0
  return rv;
675
0
}
676
677
//-----------------------------------------------------------------------------
678
679
nsresult
680
NS_CancelAsyncCopy(nsISupports* aCopierCtx, nsresult aReason)
681
0
{
682
0
  nsAStreamCopier* copier =
683
0
    static_cast<nsAStreamCopier*>(static_cast<nsIRunnable *>(aCopierCtx));
684
0
  return copier->Cancel(aReason);
685
0
}
686
687
//-----------------------------------------------------------------------------
688
689
nsresult
690
NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
691
                 nsACString& aResult)
692
0
{
693
0
  nsresult rv = NS_OK;
694
0
  aResult.Truncate();
695
0
696
0
  while (aMaxCount) {
697
0
    uint64_t avail64;
698
0
    rv = aStream->Available(&avail64);
699
0
    if (NS_FAILED(rv)) {
700
0
      if (rv == NS_BASE_STREAM_CLOSED) {
701
0
        rv = NS_OK;
702
0
      }
703
0
      break;
704
0
    }
705
0
    if (avail64 == 0) {
706
0
      break;
707
0
    }
708
0
709
0
    uint32_t avail = (uint32_t)XPCOM_MIN<uint64_t>(avail64, aMaxCount);
710
0
711
0
    // resize aResult buffer
712
0
    uint32_t length = aResult.Length();
713
0
    if (avail > UINT32_MAX - length) {
714
0
      return NS_ERROR_FILE_TOO_BIG;
715
0
    }
716
0
717
0
    aResult.SetLength(length + avail);
718
0
    if (aResult.Length() != (length + avail)) {
719
0
      return NS_ERROR_OUT_OF_MEMORY;
720
0
    }
721
0
    char* buf = aResult.BeginWriting() + length;
722
0
723
0
    uint32_t n;
724
0
    rv = aStream->Read(buf, avail, &n);
725
0
    if (NS_FAILED(rv)) {
726
0
      break;
727
0
    }
728
0
    if (n != avail) {
729
0
      aResult.SetLength(length + n);
730
0
    }
731
0
    if (n == 0) {
732
0
      break;
733
0
    }
734
0
    aMaxCount -= n;
735
0
  }
736
0
737
0
  return rv;
738
0
}
739
740
//-----------------------------------------------------------------------------
741
742
static nsresult
743
TestInputStream(nsIInputStream* aInStr,
744
                void* aClosure,
745
                const char* aBuffer,
746
                uint32_t aOffset,
747
                uint32_t aCount,
748
                uint32_t* aCountWritten)
749
0
{
750
0
  bool* result = static_cast<bool*>(aClosure);
751
0
  *result = true;
752
0
  *aCountWritten = 0;
753
0
  return NS_ERROR_ABORT; // don't call me anymore
754
0
}
755
756
bool
757
NS_InputStreamIsBuffered(nsIInputStream* aStream)
758
0
{
759
0
  nsCOMPtr<nsIBufferedInputStream> bufferedIn = do_QueryInterface(aStream);
760
0
  if (bufferedIn) {
761
0
    return true;
762
0
  }
763
0
764
0
  bool result = false;
765
0
  uint32_t n;
766
0
  nsresult rv = aStream->ReadSegments(TestInputStream, &result, 1, &n);
767
0
  return result || NS_SUCCEEDED(rv);
768
0
}
769
770
static nsresult
771
TestOutputStream(nsIOutputStream* aOutStr,
772
                 void* aClosure,
773
                 char* aBuffer,
774
                 uint32_t aOffset,
775
                 uint32_t aCount,
776
                 uint32_t* aCountRead)
777
0
{
778
0
  bool* result = static_cast<bool*>(aClosure);
779
0
  *result = true;
780
0
  *aCountRead = 0;
781
0
  return NS_ERROR_ABORT; // don't call me anymore
782
0
}
783
784
bool
785
NS_OutputStreamIsBuffered(nsIOutputStream* aStream)
786
0
{
787
0
  nsCOMPtr<nsIBufferedOutputStream> bufferedOut = do_QueryInterface(aStream);
788
0
  if (bufferedOut) {
789
0
    return true;
790
0
  }
791
0
792
0
  bool result = false;
793
0
  uint32_t n;
794
0
  aStream->WriteSegments(TestOutputStream, &result, 1, &n);
795
0
  return result;
796
0
}
797
798
//-----------------------------------------------------------------------------
799
800
nsresult
801
NS_CopySegmentToStream(nsIInputStream* aInStr,
802
                       void* aClosure,
803
                       const char* aBuffer,
804
                       uint32_t aOffset,
805
                       uint32_t aCount,
806
                       uint32_t* aCountWritten)
807
0
{
808
0
  nsIOutputStream* outStr = static_cast<nsIOutputStream*>(aClosure);
809
0
  *aCountWritten = 0;
810
0
  while (aCount) {
811
0
    uint32_t n;
812
0
    nsresult rv = outStr->Write(aBuffer, aCount, &n);
813
0
    if (NS_FAILED(rv)) {
814
0
      return rv;
815
0
    }
816
0
    aBuffer += n;
817
0
    aCount -= n;
818
0
    *aCountWritten += n;
819
0
  }
820
0
  return NS_OK;
821
0
}
822
823
nsresult
824
NS_CopySegmentToBuffer(nsIInputStream* aInStr,
825
                       void* aClosure,
826
                       const char* aBuffer,
827
                       uint32_t aOffset,
828
                       uint32_t aCount,
829
                       uint32_t* aCountWritten)
830
0
{
831
0
  char* toBuf = static_cast<char*>(aClosure);
832
0
  memcpy(&toBuf[aOffset], aBuffer, aCount);
833
0
  *aCountWritten = aCount;
834
0
  return NS_OK;
835
0
}
836
837
nsresult
838
NS_CopySegmentToBuffer(nsIOutputStream* aOutStr,
839
                       void* aClosure,
840
                       char* aBuffer,
841
                       uint32_t aOffset,
842
                       uint32_t aCount,
843
                       uint32_t* aCountRead)
844
0
{
845
0
  const char* fromBuf = static_cast<const char*>(aClosure);
846
0
  memcpy(aBuffer, &fromBuf[aOffset], aCount);
847
0
  *aCountRead = aCount;
848
0
  return NS_OK;
849
0
}
850
851
nsresult
852
NS_DiscardSegment(nsIInputStream* aInStr,
853
                  void* aClosure,
854
                  const char* aBuffer,
855
                  uint32_t aOffset,
856
                  uint32_t aCount,
857
                  uint32_t* aCountWritten)
858
0
{
859
0
  *aCountWritten = aCount;
860
0
  return NS_OK;
861
0
}
862
863
//-----------------------------------------------------------------------------
864
865
nsresult
866
NS_WriteSegmentThunk(nsIInputStream* aInStr,
867
                     void* aClosure,
868
                     const char* aBuffer,
869
                     uint32_t aOffset,
870
                     uint32_t aCount,
871
                     uint32_t* aCountWritten)
872
0
{
873
0
  nsWriteSegmentThunk* thunk = static_cast<nsWriteSegmentThunk*>(aClosure);
874
0
  return thunk->mFun(thunk->mStream, thunk->mClosure, aBuffer, aOffset, aCount,
875
0
                     aCountWritten);
876
0
}
877
878
nsresult
879
NS_FillArray(FallibleTArray<char>& aDest, nsIInputStream* aInput,
880
             uint32_t aKeep, uint32_t* aNewBytes)
881
0
{
882
0
  MOZ_ASSERT(aInput, "null stream");
883
0
  MOZ_ASSERT(aKeep <= aDest.Length(), "illegal keep count");
884
0
885
0
  char* aBuffer = aDest.Elements();
886
0
  int64_t keepOffset = int64_t(aDest.Length()) - aKeep;
887
0
  if (aKeep != 0 && keepOffset > 0) {
888
0
    memmove(aBuffer, aBuffer + keepOffset, aKeep);
889
0
  }
890
0
891
0
  nsresult rv =
892
0
    aInput->Read(aBuffer + aKeep, aDest.Capacity() - aKeep, aNewBytes);
893
0
  if (NS_FAILED(rv)) {
894
0
    *aNewBytes = 0;
895
0
  }
896
0
  // NOTE: we rely on the fact that the new slots are NOT initialized by
897
0
  // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct()
898
0
  // in nsTArray.h:
899
0
  aDest.SetLengthAndRetainStorage(aKeep + *aNewBytes);
900
0
901
0
  MOZ_ASSERT(aDest.Length() <= aDest.Capacity(), "buffer overflow");
902
0
  return rv;
903
0
}
904
905
bool
906
NS_InputStreamIsCloneable(nsIInputStream* aSource)
907
0
{
908
0
  if (!aSource) {
909
0
    return false;
910
0
  }
911
0
912
0
  nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
913
0
  return cloneable && cloneable->GetCloneable();
914
0
}
915
916
nsresult
917
NS_CloneInputStream(nsIInputStream* aSource, nsIInputStream** aCloneOut,
918
                    nsIInputStream** aReplacementOut)
919
0
{
920
0
  if (NS_WARN_IF(!aSource)) {
921
0
    return NS_ERROR_FAILURE;
922
0
  }
923
0
924
0
  // Attempt to perform the clone directly on the source stream
925
0
  nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
926
0
  if (cloneable && cloneable->GetCloneable()) {
927
0
    if (aReplacementOut) {
928
0
      *aReplacementOut = nullptr;
929
0
    }
930
0
    return cloneable->Clone(aCloneOut);
931
0
  }
932
0
933
0
  // If we failed the clone and the caller does not want to replace their
934
0
  // original stream, then we are done.  Return error.
935
0
  if (!aReplacementOut) {
936
0
    return NS_ERROR_FAILURE;
937
0
  }
938
0
939
0
  // The caller has opted-in to the fallback clone support that replaces
940
0
  // the original stream.  Copy the data to a pipe and return two cloned
941
0
  // input streams.
942
0
943
0
  nsCOMPtr<nsIInputStream> reader;
944
0
  nsCOMPtr<nsIInputStream> readerClone;
945
0
  nsCOMPtr<nsIOutputStream> writer;
946
0
947
0
  nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
948
0
                           0, 0,        // default segment size and max size
949
0
                           true, true); // non-blocking
950
0
  if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
951
0
952
0
  cloneable = do_QueryInterface(reader);
953
0
  MOZ_ASSERT(cloneable && cloneable->GetCloneable());
954
0
955
0
  rv = cloneable->Clone(getter_AddRefs(readerClone));
956
0
  if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
957
0
958
0
  nsCOMPtr<nsIEventTarget> target =
959
0
    do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
960
0
  if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
961
0
962
0
  rv = NS_AsyncCopy(aSource, writer, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS);
963
0
  if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
964
0
965
0
  readerClone.forget(aCloneOut);
966
0
  reader.forget(aReplacementOut);
967
0
968
0
  return NS_OK;
969
0
}
970
971
nsresult
972
NS_MakeAsyncNonBlockingInputStream(already_AddRefed<nsIInputStream> aSource,
973
                                   nsIAsyncInputStream** aAsyncInputStream)
974
0
{
975
0
  nsCOMPtr<nsIInputStream> source = std::move(aSource);
976
0
  if (NS_WARN_IF(!aAsyncInputStream)) {
977
0
    return NS_ERROR_FAILURE;
978
0
  }
979
0
980
0
  bool nonBlocking = false;
981
0
  nsresult rv = source->IsNonBlocking(&nonBlocking);
982
0
  if (NS_WARN_IF(NS_FAILED(rv))) {
983
0
    return rv;
984
0
  }
985
0
986
0
  nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(source);
987
0
988
0
  if (nonBlocking && asyncStream) {
989
0
    // This stream is perfect!
990
0
    asyncStream.forget(aAsyncInputStream);
991
0
    return NS_OK;
992
0
  }
993
0
994
0
  if (nonBlocking) {
995
0
    // If the stream is non-blocking but not async, we wrap it.
996
0
    return NonBlockingAsyncInputStream::Create(source.forget(),
997
0
                                               aAsyncInputStream);
998
0
  }
999
0
1000
0
  nsCOMPtr<nsIStreamTransportService> sts =
1001
0
    do_GetService(kStreamTransportServiceCID, &rv);
1002
0
  if (NS_WARN_IF(NS_FAILED(rv))) {
1003
0
    return rv;
1004
0
  }
1005
0
1006
0
  nsCOMPtr<nsITransport> transport;
1007
0
  rv = sts->CreateInputTransport(source,
1008
0
                                 /* aCloseWhenDone */ true,
1009
0
                                 getter_AddRefs(transport));
1010
0
  if (NS_WARN_IF(NS_FAILED(rv))) {
1011
0
    return rv;
1012
0
  }
1013
0
1014
0
  nsCOMPtr<nsIInputStream> wrapper;
1015
0
  rv = transport->OpenInputStream(/* aFlags */ 0,
1016
0
                                  /* aSegmentSize */ 0,
1017
0
                                  /* aSegmentCount */ 0,
1018
0
                                  getter_AddRefs(wrapper));
1019
0
  if (NS_WARN_IF(NS_FAILED(rv))) {
1020
0
    return rv;
1021
0
  }
1022
0
1023
0
  asyncStream = do_QueryInterface(wrapper);
1024
0
  MOZ_ASSERT(asyncStream);
1025
0
1026
0
  asyncStream.forget(aAsyncInputStream);
1027
0
  return NS_OK;
1028
0
}