Coverage Report

Created: 2018-09-25 14:53

/src/mozilla-central/netwerk/base/nsInputStreamPump.cpp
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
/* vim:set ts=4 sts=4 sw=4 et cin: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
 * License, v. 2.0. If a copy of the MPL was not distributed with this
5
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
#include "nsIOService.h"
8
#include "nsInputStreamPump.h"
9
#include "nsIStreamTransportService.h"
10
#include "nsISeekableStream.h"
11
#include "nsITransport.h"
12
#include "nsIThreadRetargetableStreamListener.h"
13
#include "nsThreadUtils.h"
14
#include "nsCOMPtr.h"
15
#include "mozilla/Logging.h"
16
#include "mozilla/NonBlockingAsyncInputStream.h"
17
#include "mozilla/SlicedInputStream.h"
18
#include "GeckoProfiler.h"
19
#include "nsIStreamListener.h"
20
#include "nsILoadGroup.h"
21
#include "nsNetCID.h"
22
#include "nsStreamUtils.h"
23
#include <algorithm>
24
25
// Class ID of the stream transport service; AsyncRead() passes it to
// do_GetService() when the source stream has to be read through a transport.
static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
26
27
//
28
// MOZ_LOG=nsStreamPump:5
29
//
30
// Log module backing the LOG() macro below (enabled as shown above).
static mozilla::LazyLogModule gStreamPumpLog("nsStreamPump");
31
// Drop any LOG definition already in effect before redefining it for this
// file (presumably one inherited from another source in the same build unit).
#undef LOG
32
0
// LOG((fmt, ...)) emits a Debug-level message through gStreamPumpLog.
#define LOG(args) MOZ_LOG(gStreamPumpLog, mozilla::LogLevel::Debug, args)
33
34
//-----------------------------------------------------------------------------
35
// nsInputStreamPump methods
36
//-----------------------------------------------------------------------------
37
38
// Constructs an idle pump: no stream attached, all counters zeroed, status
// NS_OK and every state flag cleared. The stream and segment configuration
// are supplied later through Init().
nsInputStreamPump::nsInputStreamPump()
    : mState(STATE_IDLE),
      mStreamOffset(0),
      mStreamLength(0),
      mSegSize(0),
      mSegCount(0),
      mStatus(NS_OK),
      mSuspendCount(0),
      mLoadFlags(LOAD_NORMAL),
      mIsPending(false),
      mProcessingCallbacks(false),
      mWaitingForInputStreamReady(false),
      mCloseWhenDone(false),
      mRetargeting(false),
      mAsyncStreamIsBuffered(false),
      mMutex("nsInputStreamPump")
{
}
56
57
// Factory helper: allocates a pump and initializes it with the given stream
// and segment configuration.
//
// On success the new pump is handed to the caller through |result| and the
// value returned by Init() is propagated. On failure |result| is left
// untouched and the failure code is returned.
nsresult
nsInputStreamPump::Create(nsInputStreamPump  **result,
                          nsIInputStream      *stream,
                          uint32_t             segsize,
                          uint32_t             segcount,
                          bool                 closeWhenDone,
                          nsIEventTarget      *mainThreadTarget)
{
    RefPtr<nsInputStreamPump> pump = new nsInputStreamPump();
    if (!pump) {
        return NS_ERROR_OUT_OF_MEMORY;
    }

    nsresult rv = pump->Init(stream, segsize, segcount, closeWhenDone,
                             mainThreadTarget);
    if (NS_FAILED(rv)) {
        return rv;
    }

    // Transfer our reference to the caller.
    pump.forget(result);
    return rv;
}
76
77
struct PeekData {
78
  PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure)
79
0
    : mFunc(fun), mClosure(closure) {}
80
81
  nsInputStreamPump::PeekSegmentFun mFunc;
82
  void* mClosure;
83
};
84
85
// Segment-reader callback used by PeekStream(). |aClosure| must point at a
// PeekData; its callback is invoked once with the segment bytes and length.
//
// Always returns NS_BINDING_ABORTED and never writes |aWriteCount|
// (presumably so that ReadSegments stops after the first segment without
// treating any data as consumed -- confirm against the ReadSegments contract).
static nsresult
CallPeekFunc(nsIInputStream *aInStream, void *aClosure,
             const char *aFromSegment, uint32_t aToOffset, uint32_t aCount,
             uint32_t *aWriteCount)
{
  NS_ASSERTION(aToOffset == 0, "Called more than once?");
  NS_ASSERTION(aCount > 0, "Called without data?");

  auto* peek = static_cast<PeekData*>(aClosure);
  const uint8_t* segment = reinterpret_cast<const uint8_t*>(aFromSegment);
  peek->mFunc(peek->mClosure, segment, aCount);

  return NS_BINDING_ABORTED;
}
98
99
// Lets |callback| look at the data currently readable from the pump's async
// stream, via CallPeekFunc.
//
// Takes the pump mutex. The stream is wrapped in a buffered stream first if
// needed, because ReadSegments is required. Returns the failure from
// CreateBufferedStreamIfNeeded() or Available() if either fails, otherwise
// whatever ReadSegments returns.
nsresult
nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure)
{
  RecursiveMutexAutoLock guard(mMutex);

  MOZ_ASSERT(mAsyncStream, "PeekStream called without stream");

  nsresult rv = CreateBufferedStreamIfNeeded();
  NS_ENSURE_SUCCESS(rv, rv);

  // See if the pipe is closed by checking the return of Available.
  uint64_t available;
  rv = mAsyncStream->Available(&available);
  if (NS_FAILED(rv)) {
    return rv;
  }

  // Clamp to 32 bits; ReadSegments overwrites this with the count it reports.
  uint32_t readCount = static_cast<uint32_t>(
    std::min(available, static_cast<uint64_t>(UINT32_MAX)));

  PeekData peekData(callback, closure);
  return mAsyncStream->ReadSegments(CallPeekFunc,
                                    &peekData,
                                    net::nsIOService::gDefaultSegmentSize,
                                    &readCount);
}
122
123
// Arms an AsyncWait on the async stream so that OnInputStreamReady() is
// called on mTargetThread, unless a wait is already outstanding or callbacks
// are currently being processed.
//
// Must be called with mMutex held. Returns the AsyncWait failure code if the
// wait could not be started, NS_OK otherwise (including the no-op case).
nsresult
nsInputStreamPump::EnsureWaiting()
{
    mMutex.AssertCurrentThreadIn();

    // no need to worry about multiple threads... an input stream pump lives
    // on only one thread at a time.
    MOZ_ASSERT(mAsyncStream);

    if (mWaitingForInputStreamReady || mProcessingCallbacks) {
        return NS_OK;
    }

    // Ensure OnStateStop is called on the main thread.
    if (mState == STATE_STOP) {
        nsCOMPtr<nsIEventTarget> mainTarget = mLabeledMainThreadTarget;
        if (!mainTarget) {
            mainTarget = do_AddRef(GetMainThreadEventTarget());
        }
        if (mTargetThread != mainTarget) {
            mTargetThread = mainTarget;
        }
    }
    MOZ_ASSERT(mTargetThread);

    nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread);
    if (NS_FAILED(rv)) {
        NS_ERROR("AsyncWait failed");
        return rv;
    }

    // Any retargeting during STATE_START or START_TRANSFER is complete
    // after the call to AsyncWait; next callback will be on mTargetThread.
    mRetargeting = false;
    mWaitingForInputStreamReady = true;
    return NS_OK;
}
154
155
//-----------------------------------------------------------------------------
156
// nsInputStreamPump::nsISupports
157
//-----------------------------------------------------------------------------
158
159
// although this class can only be accessed from one thread at a time, we do
160
// allow its ownership to move from thread to thread, assuming the consumer
161
// understands the limitations of this.
162
// Interfaces exposed by nsInputStreamPump through QueryInterface.
NS_IMPL_ISUPPORTS(nsInputStreamPump, nsIRequest, nsIThreadRetargetableRequest,
                  nsIInputStreamCallback, nsIInputStreamPump)
167
168
//-----------------------------------------------------------------------------
169
// nsInputStreamPump::nsIRequest
170
//-----------------------------------------------------------------------------
171
172
// nsIRequest::GetName. The pump has no name of its own, so |result| is always
// emptied. Takes the pump mutex.
NS_IMETHODIMP
nsInputStreamPump::GetName(nsACString &result)
{
    RecursiveMutexAutoLock guard(mMutex);

    result.Truncate();

    return NS_OK;
}
180
181
// nsIRequest::IsPending. Reports true for every state other than STATE_IDLE.
// Takes the pump mutex.
NS_IMETHODIMP
nsInputStreamPump::IsPending(bool *result)
{
    RecursiveMutexAutoLock guard(mMutex);

    const bool active = (mState != STATE_IDLE);
    *result = active;

    return NS_OK;
}
189
190
// nsIRequest::GetStatus. Copies the pump's current status code into |status|.
// Takes the pump mutex.
NS_IMETHODIMP
nsInputStreamPump::GetStatus(nsresult *status)
{
    RecursiveMutexAutoLock guard(mMutex);

    const nsresult current = mStatus;
    *status = current;

    return NS_OK;
}
198
199
// nsIRequest::Cancel. Records |status| as the pump's status and closes the
// async stream with it. Asserted to run on the main thread; takes the pump
// mutex.
//
// The first failure status wins: if the pump already holds a failure status
// the call is a no-op. Always returns NS_OK.
NS_IMETHODIMP
nsInputStreamPump::Cancel(nsresult status)
{
    MOZ_ASSERT(NS_IsMainThread());

    RecursiveMutexAutoLock guard(mMutex);

    LOG(("nsInputStreamPump::Cancel [this=%p status=%" PRIx32 "]\n",
        this, static_cast<uint32_t>(status)));

    if (NS_FAILED(mStatus)) {
        LOG(("  already canceled\n"));
        return NS_OK;
    }

    NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code");
    mStatus = status;

    if (!mAsyncStream) {
        return NS_OK;
    }

    // close input stream
    mAsyncStream->CloseWithStatus(status);

    // While suspended we do not re-arm the wait here; EnsureWaiting will be
    // called by Resume(). Note that while suspended, OnInputStreamReady will
    // not do anything, and also note that calling asyncWait on a closed
    // stream works and will dispatch an event immediately.
    if (mSuspendCount == 0) {
        EnsureWaiting();
    }

    return NS_OK;
}
229
230
// nsIRequest::Suspend. Increments the suspend count; OnInputStreamReady()
// stops processing while the count is non-zero. Takes the pump mutex.
//
// Returns NS_ERROR_UNEXPECTED if the pump is idle.
NS_IMETHODIMP
nsInputStreamPump::Suspend()
{
    RecursiveMutexAutoLock guard(mMutex);

    LOG(("nsInputStreamPump::Suspend [this=%p]\n", this));
    NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);

    mSuspendCount += 1;

    return NS_OK;
}
240
241
// nsIRequest::Resume. Decrements the suspend count and, when it reaches zero
// and a stream is still attached, re-arms the asynchronous wait. Takes the
// pump mutex.
//
// Returns NS_ERROR_UNEXPECTED if the pump is not suspended or is idle.
NS_IMETHODIMP
nsInputStreamPump::Resume()
{
    RecursiveMutexAutoLock guard(mMutex);

    LOG(("nsInputStreamPump::Resume [this=%p]\n", this));
    NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED);
    NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);

    --mSuspendCount;

    // There is a brief in-between state when we null out mAsyncStream in
    // OnStateStop() before calling OnStopRequest, and only afterwards set
    // STATE_IDLE, which we need to handle gracefully.
    const bool fullyResumed = (mSuspendCount == 0);
    if (fullyResumed && mAsyncStream) {
        EnsureWaiting();
    }

    return NS_OK;
}
257
258
// nsIRequest::GetLoadFlags. Copies the stored load flags into |aLoadFlags|.
// Takes the pump mutex.
NS_IMETHODIMP
nsInputStreamPump::GetLoadFlags(nsLoadFlags *aLoadFlags)
{
    RecursiveMutexAutoLock guard(mMutex);

    const nsLoadFlags flags = mLoadFlags;
    *aLoadFlags = flags;

    return NS_OK;
}
266
267
// nsIRequest::SetLoadFlags. Stores |aLoadFlags|; the pump itself only keeps
// the value for GetLoadFlags(). Takes the pump mutex.
NS_IMETHODIMP
nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags)
{
    RecursiveMutexAutoLock guard(mMutex);

    mLoadFlags = aLoadFlags;

    return NS_OK;
}
275
276
// nsIRequest::GetLoadGroup. Hands out an owning reference to the current load
// group, or null when none is set. Takes the pump mutex.
NS_IMETHODIMP
nsInputStreamPump::GetLoadGroup(nsILoadGroup **aLoadGroup)
{
    RecursiveMutexAutoLock guard(mMutex);

    nsCOMPtr<nsILoadGroup> group = mLoadGroup;
    group.forget(aLoadGroup);

    return NS_OK;
}
284
285
// nsIRequest::SetLoadGroup. Remembers |aLoadGroup|; AsyncRead() adds the pump
// to it and OnStateStop() removes it again. Takes the pump mutex.
NS_IMETHODIMP
nsInputStreamPump::SetLoadGroup(nsILoadGroup *aLoadGroup)
{
    RecursiveMutexAutoLock guard(mMutex);

    mLoadGroup = aLoadGroup;

    return NS_OK;
}
293
294
//-----------------------------------------------------------------------------
295
// nsInputStreamPump::nsIInputStreamPump implementation
296
//-----------------------------------------------------------------------------
297
298
// nsIInputStreamPump::Init. Stores the source stream and the configuration
// that AsyncRead() uses later; nothing is read here.
//
//   stream            source stream to pump.
//   segsize, segcount passed to OpenInputStream when AsyncRead() has to go
//                     through the stream transport service.
//   closeWhenDone     whether the stream is closed once pumping finishes
//                     successfully.
//   mainThreadTarget  optional event target used for main-thread delivery;
//                     may be null.
//
// Returns NS_ERROR_IN_PROGRESS unless the pump is idle.
NS_IMETHODIMP
nsInputStreamPump::Init(nsIInputStream *stream,
                        uint32_t segsize, uint32_t segcount,
                        bool closeWhenDone, nsIEventTarget *mainThreadTarget)
{
    NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);

    // Source and delivery target.
    mStream = stream;
    mLabeledMainThreadTarget = mainThreadTarget;

    // Transport configuration.
    mSegSize = segsize;
    mSegCount = segcount;
    mCloseWhenDone = closeWhenDone;

    return NS_OK;
}
313
314
// nsIInputStreamPump::AsyncRead. Starts pumping the stream supplied to Init()
// into |listener|; |ctxt| is passed back on every listener callback.
//
// The source stream is turned into an nsIAsyncInputStream (directly, through
// NonBlockingAsyncInputStream, or through the stream transport service), the
// delivery target is captured from the calling thread, the first asynchronous
// wait is armed, and the pump enters STATE_START. Takes the pump mutex.
//
// Returns:
//   NS_ERROR_IN_PROGRESS      the pump is not idle.
//   NS_ERROR_NOT_INITIALIZED  no source stream is available: Init() was never
//                             called, or the stream from a previous
//                             AsyncRead() has already been consumed.
//   other failure codes       propagated from stream/transport setup or from
//                             EnsureWaiting().
NS_IMETHODIMP
nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt)
{
    RecursiveMutexAutoLock lock(mMutex);

    NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
    NS_ENSURE_ARG_POINTER(listener);
    // mStream is released below once reading starts, so an idle pump that has
    // not been (re)initialized has nothing to read. Fail cleanly instead of
    // dereferencing a null stream.
    NS_ENSURE_TRUE(mStream, NS_ERROR_NOT_INITIALIZED);
    MOZ_ASSERT(NS_IsMainThread(), "nsInputStreamPump should be read from the "
                                  "main thread only.");

    //
    // OK, we need to use the stream transport service if
    //
    // (1) the stream is blocking
    // (2) the stream does not support nsIAsyncInputStream
    //

    bool nonBlocking;
    nsresult rv = mStream->IsNonBlocking(&nonBlocking);
    if (NS_FAILED(rv)) return rv;

    if (nonBlocking) {
        mAsyncStream = do_QueryInterface(mStream);
        if (!mAsyncStream) {
            rv = NonBlockingAsyncInputStream::Create(mStream.forget(),
                                                     getter_AddRefs(mAsyncStream));
            if (NS_WARN_IF(NS_FAILED(rv))) return rv;
        }
        MOZ_ASSERT(mAsyncStream);
    }

    if (!mAsyncStream) {
        // ok, let's use the stream transport service to read this stream.
        nsCOMPtr<nsIStreamTransportService> sts =
            do_GetService(kStreamTransportServiceCID, &rv);
        if (NS_FAILED(rv)) return rv;

        nsCOMPtr<nsITransport> transport;
        rv = sts->CreateInputTransport(mStream, mCloseWhenDone, getter_AddRefs(transport));
        if (NS_FAILED(rv)) return rv;

        nsCOMPtr<nsIInputStream> wrapper;
        rv = transport->OpenInputStream(0, mSegSize, mSegCount, getter_AddRefs(wrapper));
        if (NS_FAILED(rv)) return rv;

        mAsyncStream = do_QueryInterface(wrapper, &rv);
        if (NS_FAILED(rv)) return rv;
    }

    // release our reference to the original stream.  from this point forward,
    // we only reference the "stream" via mAsyncStream.
    mStream = nullptr;

    // mStreamOffset now holds the number of bytes currently read.
    mStreamOffset = 0;

    // grab event queue (we must do this here by contract, since all notifications
    // must go to the thread which called AsyncRead)
    if (NS_IsMainThread() && mLabeledMainThreadTarget) {
        mTargetThread = mLabeledMainThreadTarget;
    } else {
        mTargetThread = GetCurrentThreadEventTarget();
    }
    NS_ENSURE_STATE(mTargetThread);

    rv = EnsureWaiting();
    if (NS_FAILED(rv)) return rv;

    if (mLoadGroup)
        mLoadGroup->AddRequest(this, nullptr);

    mState = STATE_START;
    mListener = listener;
    mListenerContext = ctxt;
    return NS_OK;
}
390
391
//-----------------------------------------------------------------------------
392
// nsInputStreamPump::nsIInputStreamCallback implementation
393
//-----------------------------------------------------------------------------
394
395
// nsIInputStreamCallback::OnInputStreamReady. Drives the pump's state machine
// (START -> TRANSFER -> STOP -> IDLE) each time the async stream signals
// readiness.
//
// Each loop iteration takes the pump mutex, runs the handler for the current
// state (the handlers drop the mutex around listener callbacks) and then
// either
//   - stops because the pump was suspended or became idle,
//   - re-arms the asynchronous wait and stops (more data expected, or delivery
//     is moving to another thread), or
//   - advances to the next state and loops again.
//
// mProcessingCallbacks marks the window in which a handler is running so that
// EnsureWaiting() cannot arm a second, parallel OnInputStreamReady().
//
// Returns NS_OK, or NS_ERROR_UNEXPECTED if mState holds an unknown value.
NS_IMETHODIMP
nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream *stream)
{
    LOG(("nsInputStreamPump::OnInputStreamReady [this=%p]\n", this));

    AUTO_PROFILER_LABEL("nsInputStreamPump::OnInputStreamReady", NETWORK);

    // this function has been called from a PLEvent, so we can safely call
    // any listener or progress sink methods directly from here.

    for (;;) {
        // There should only be one iteration of this loop happening at a time.
        // To prevent AsyncWait() (called during callbacks or on other threads)
        // from creating a parallel OnInputStreamReady(), we use:
        // -- a mutex; and
        // -- a boolean mProcessingCallbacks to detect parallel loops
        //    when exiting the mutex for callbacks.
        RecursiveMutexAutoLock lock(mMutex);

        // Prevent parallel execution during callbacks, while out of mutex.
        if (mProcessingCallbacks) {
            MOZ_ASSERT(!mProcessingCallbacks);
            break;
        }
        mProcessingCallbacks = true;
        if (mSuspendCount || mState == STATE_IDLE) {
            mWaitingForInputStreamReady = false;
            mProcessingCallbacks = false;
            break;
        }

        uint32_t nextState;
        switch (mState) {
        case STATE_START:
            nextState = OnStateStart();
            break;
        case STATE_TRANSFER:
            nextState = OnStateTransfer();
            break;
        case STATE_STOP:
            mRetargeting = false;
            nextState = OnStateStop();
            break;
        default:
            nextState = 0;
            MOZ_ASSERT_UNREACHABLE("Unknown enum value.");
            // Release the bookkeeping flags before bailing out, exactly as
            // the suspended/idle exit above does. Leaving
            // mProcessingCallbacks set would make every later
            // OnInputStreamReady() and EnsureWaiting() call a no-op.
            mWaitingForInputStreamReady = false;
            mProcessingCallbacks = false;
            return NS_ERROR_UNEXPECTED;
        }

        bool stillTransferring = (mState == STATE_TRANSFER &&
                                  nextState == STATE_TRANSFER);
        if (stillTransferring) {
            NS_ASSERTION(NS_SUCCEEDED(mStatus),
                         "Should not have failed status for ongoing transfer");
        } else {
            NS_ASSERTION(mState != nextState,
                         "Only OnStateTransfer can be called more than once.");
        }
        if (mRetargeting) {
            NS_ASSERTION(mState != STATE_STOP,
                         "Retargeting should not happen during OnStateStop.");
        }

        // Set mRetargeting so EnsureWaiting will be called. It ensures that
        // OnStateStop is called on the main thread.
        if (nextState == STATE_STOP && !NS_IsMainThread()) {
            mRetargeting = true;
        }

        // Unset mProcessingCallbacks here (while we have lock) so our own call to
        // EnsureWaiting isn't blocked by it.
        mProcessingCallbacks = false;

        // We must break the loop if suspended during one of the previous
        // operation.
        if (mSuspendCount) {
            mState = nextState;
            mWaitingForInputStreamReady = false;
            break;
        }

        // Wait asynchronously if there is still data to transfer, or we're
        // switching event delivery to another thread.
        if (stillTransferring || mRetargeting) {
            mState = nextState;
            mWaitingForInputStreamReady = false;
            nsresult rv = EnsureWaiting();
            if (NS_SUCCEEDED(rv))
                break;

            // Failure to start asynchronous wait: stop transfer.
            // Do not set mStatus if it was previously set to report a failure.
            if (NS_SUCCEEDED(mStatus)) {
                mStatus = rv;
            }
            nextState = STATE_STOP;
        }

        mState = nextState;
    }
    return NS_OK;
}
497
498
// STATE_START handler: works out the initial status and delivers
// OnStartRequest to the listener.
//
// Must be called with mMutex held; the mutex is released for the duration of
// the listener call. Returns STATE_TRANSFER when the pump's status is still a
// success afterwards, STATE_STOP otherwise.
uint32_t
nsInputStreamPump::OnStateStart()
{
    mMutex.AssertCurrentThreadIn();

    AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateStart", NETWORK);

    LOG(("  OnStateStart [this=%p]\n", this));

    // need to check the reason why the stream is ready.  this is required
    // so our listener can check our status from OnStartRequest.
    // XXX async streams should have a GetStatus method!
    if (NS_SUCCEEDED(mStatus)) {
        uint64_t ignoredAvail;
        const nsresult availRv = mAsyncStream->Available(&ignoredAvail);
        // A cleanly closed stream is not an error at this point.
        const bool realFailure =
            NS_FAILED(availRv) && availRv != NS_BASE_STREAM_CLOSED;
        if (realFailure) {
            mStatus = availRv;
        }
    }

    nsresult listenerRv;
    {
        // Note: Must exit mutex for call to OnStartRequest to avoid
        // deadlocks when calls to RetargetDeliveryTo for multiple
        // nsInputStreamPumps are needed (e.g. nsHttpChannel).
        RecursiveMutexAutoUnlock unlock(mMutex);
        listenerRv = mListener->OnStartRequest(this, mListenerContext);
    }

    // an error returned from OnStartRequest should cause us to abort; however,
    // we must not stomp on mStatus if already canceled.
    if (NS_SUCCEEDED(mStatus) && NS_FAILED(listenerRv)) {
        mStatus = listenerRv;
    }

    if (NS_FAILED(mStatus)) {
        return STATE_STOP;
    }
    return STATE_TRANSFER;
}
534
535
uint32_t
536
nsInputStreamPump::OnStateTransfer()
537
0
{
538
0
    mMutex.AssertCurrentThreadIn();
539
0
540
0
    AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateTransfer", NETWORK);
541
0
542
0
    LOG(("  OnStateTransfer [this=%p]\n", this));
543
0
544
0
    // if canceled, go directly to STATE_STOP...
545
0
    if (NS_FAILED(mStatus))
546
0
        return STATE_STOP;
547
0
548
0
    nsresult rv = CreateBufferedStreamIfNeeded();
549
0
    if (NS_WARN_IF(NS_FAILED(rv))) {
550
0
      return STATE_STOP;
551
0
    }
552
0
553
0
    uint64_t avail;
554
0
    rv = mAsyncStream->Available(&avail);
555
0
    LOG(("  Available returned [stream=%p rv=%" PRIx32 " avail=%" PRIu64 "]\n", mAsyncStream.get(),
556
0
         static_cast<uint32_t>(rv), avail));
557
0
558
0
    if (rv == NS_BASE_STREAM_CLOSED) {
559
0
        rv = NS_OK;
560
0
        avail = 0;
561
0
    }
562
0
    else if (NS_SUCCEEDED(rv) && avail) {
563
0
        // we used to limit avail to 16K - we were afraid some ODA handlers
564
0
        // might assume they wouldn't get more than 16K at once
565
0
        // we're removing that limit since it speeds up local file access.
566
0
        // Now there's an implicit 64K limit of 4 16K segments
567
0
        // NOTE: ok, so the story is as follows.  OnDataAvailable impls
568
0
        //       are by contract supposed to consume exactly |avail| bytes.
569
0
        //       however, many do not... mailnews... stream converters...
570
0
        //       cough, cough.  the input stream pump is fairly tolerant
571
0
        //       in this regard; however, if an ODA does not consume any
572
0
        //       data from the stream, then we could potentially end up in
573
0
        //       an infinite loop.  we do our best here to try to catch
574
0
        //       such an error.  (see bug 189672)
575
0
576
0
        // in most cases this QI will succeed (mAsyncStream is almost always
577
0
        // a nsPipeInputStream, which implements nsISeekableStream::Tell).
578
0
        int64_t offsetBefore;
579
0
        nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mAsyncStream);
580
0
        if (seekable && NS_FAILED(seekable->Tell(&offsetBefore))) {
581
0
            MOZ_ASSERT_UNREACHABLE("Tell failed on readable stream");
582
0
            offsetBefore = 0;
583
0
        }
584
0
585
0
        uint32_t odaAvail =
586
0
            avail > UINT32_MAX ?
587
0
            UINT32_MAX : uint32_t(avail);
588
0
589
0
        LOG(("  calling OnDataAvailable [offset=%" PRIu64 " count=%" PRIu64 "(%u)]\n",
590
0
            mStreamOffset, avail, odaAvail));
591
0
592
0
        {
593
0
            // Note: Must exit mutex for call to OnStartRequest to avoid
594
0
            // deadlocks when calls to RetargetDeliveryTo for multiple
595
0
            // nsInputStreamPumps are needed (e.g. nsHttpChannel).
596
0
            RecursiveMutexAutoUnlock unlock(mMutex);
597
0
            rv = mListener->OnDataAvailable(this, mListenerContext,
598
0
                                            mAsyncStream, mStreamOffset,
599
0
                                            odaAvail);
600
0
        }
601
0
602
0
        // don't enter this code if ODA failed or called Cancel
603
0
        if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) {
604
0
            // test to see if this ODA failed to consume data
605
0
            if (seekable) {
606
0
                // NOTE: if Tell fails, which can happen if the stream is
607
0
                // now closed, then we assume that everything was read.
608
0
                int64_t offsetAfter;
609
0
                if (NS_FAILED(seekable->Tell(&offsetAfter)))
610
0
                    offsetAfter = offsetBefore + odaAvail;
611
0
                if (offsetAfter > offsetBefore)
612
0
                    mStreamOffset += (offsetAfter - offsetBefore);
613
0
                else if (mSuspendCount == 0) {
614
0
                    //
615
0
                    // possible infinite loop if we continue pumping data!
616
0
                    //
617
0
                    // NOTE: although not allowed by nsIStreamListener, we
618
0
                    // will allow the ODA impl to Suspend the pump.  IMAP
619
0
                    // does this :-(
620
0
                    //
621
0
                    NS_ERROR("OnDataAvailable implementation consumed no data");
622
0
                    mStatus = NS_ERROR_UNEXPECTED;
623
0
                }
624
0
            }
625
0
            else
626
0
                mStreamOffset += odaAvail; // assume ODA behaved well
627
0
        }
628
0
    }
629
0
630
0
    // an error returned from Available or OnDataAvailable should cause us to
631
0
    // abort; however, we must not stop on mStatus if already canceled.
632
0
633
0
    if (NS_SUCCEEDED(mStatus)) {
634
0
        if (NS_FAILED(rv))
635
0
            mStatus = rv;
636
0
        else if (avail) {
637
0
            // if stream is now closed, advance to STATE_STOP right away.
638
0
            // Available may return 0 bytes available at the moment; that
639
0
            // would not mean that we are done.
640
0
            // XXX async streams should have a GetStatus method!
641
0
            rv = mAsyncStream->Available(&avail);
642
0
            if (NS_SUCCEEDED(rv))
643
0
                return STATE_TRANSFER;
644
0
            if (rv != NS_BASE_STREAM_CLOSED)
645
0
                mStatus = rv;
646
0
        }
647
0
    }
648
0
    return STATE_STOP;
649
0
}
650
651
// Main-thread entry point used when OnStateStop() was first reached on
// another thread: runs OnStateStop() under the pump mutex and stores the
// state it returns. Always returns NS_OK.
nsresult
nsInputStreamPump::CallOnStateStop()
{
    RecursiveMutexAutoLock guard(mMutex);

    MOZ_ASSERT(NS_IsMainThread(),
               "CallOnStateStop should only be called on the main thread.");

    const uint32_t stateAfterStop = OnStateStop();
    mState = stateAfterStop;

    return NS_OK;
}
662
663
// STATE_STOP handler: closes the async stream as required, delivers
// OnStopRequest to the listener, drops the listener references and removes
// the pump from its load group.
//
// Must be called with mMutex held; the mutex is released for the duration of
// the listener call. When reached off the main thread the work is re-posted
// to the main thread through CallOnStateStop() instead of being done here.
//
// Always returns STATE_IDLE.
uint32_t
nsInputStreamPump::OnStateStop()
{
    mMutex.AssertCurrentThreadIn();

    if (!NS_IsMainThread()) {
        // This method can be called on a different thread if nsInputStreamPump
        // is used off the main-thread.
        //
        // Init() accepts a null main-thread target, so fall back to the
        // default main-thread event target the same way EnsureWaiting() does
        // rather than dereferencing a possibly-null mLabeledMainThreadTarget.
        nsCOMPtr<nsIEventTarget> mainThread = mLabeledMainThreadTarget
            ? mLabeledMainThreadTarget
            : do_AddRef(GetMainThreadEventTarget());
        NS_ENSURE_TRUE(mainThread, STATE_IDLE);

        nsresult rv = mainThread->Dispatch(
          NewRunnableMethod("nsInputStreamPump::CallOnStateStop",
                            this,
                            &nsInputStreamPump::CallOnStateStop));
        NS_ENSURE_SUCCESS(rv, STATE_IDLE);
        return STATE_IDLE;
    }

    AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateStop", NETWORK);

    LOG(("  OnStateStop [this=%p status=%" PRIx32 "]\n", this, static_cast<uint32_t>(mStatus)));

    // if an error occurred, we must be sure to pass the error onto the async
    // stream.  in some cases, this is redundant, but since close is idempotent,
    // this is OK.  otherwise, be sure to honor the "close-when-done" option.

    if (!mAsyncStream || !mListener) {
        MOZ_ASSERT(mAsyncStream, "null mAsyncStream: OnStateStop called twice?");
        MOZ_ASSERT(mListener, "null mListener: OnStateStop called twice?");
        return STATE_IDLE;
    }

    if (NS_FAILED(mStatus))
        mAsyncStream->CloseWithStatus(mStatus);
    else if (mCloseWhenDone)
        mAsyncStream->Close();

    mAsyncStream = nullptr;
    mTargetThread = nullptr;
    mIsPending = false;
    {
        // Note: Must exit mutex for call to OnStopRequest to avoid
        // deadlocks when calls to RetargetDeliveryTo for multiple
        // nsInputStreamPumps are needed (e.g. nsHttpChannel).
        RecursiveMutexAutoUnlock unlock(mMutex);
        mListener->OnStopRequest(this, mListenerContext, mStatus);
    }
    mListener = nullptr;
    mListenerContext = nullptr;

    if (mLoadGroup)
        mLoadGroup->RemoveRequest(this, nullptr, mStatus);

    return STATE_IDLE;
}
716
717
// Makes sure mAsyncStream supports ReadSegments by wrapping it in a buffered
// input stream when it is not already buffered. The outcome is cached in
// mAsyncStreamIsBuffered so the check runs at most once per success.
//
// On failure mAsyncStream is left pointing at the original stream and the
// cached flag stays clear, so the pump remains usable for cleanup.
//
// Returns NS_OK on success, the NS_NewBufferedInputStream failure code, or
// NS_ERROR_NO_INTERFACE if the buffered stream is not an nsIAsyncInputStream.
nsresult
nsInputStreamPump::CreateBufferedStreamIfNeeded()
{
  if (mAsyncStreamIsBuffered) {
    return NS_OK;
  }

  // ReadSegments is not available for any nsIAsyncInputStream. In order to use
  // it, we wrap a nsIBufferedInputStream around it, if needed.

  if (NS_InputStreamIsBuffered(mAsyncStream)) {
    mAsyncStreamIsBuffered = true;
    return NS_OK;
  }

  // Hand a separate reference to the wrapper so that mAsyncStream is only
  // replaced once wrapping has fully succeeded.
  nsCOMPtr<nsIInputStream> source(mAsyncStream.get());

  nsCOMPtr<nsIInputStream> stream;
  nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(stream),
                                          source.forget(), 4096);
  NS_ENSURE_SUCCESS(rv, rv);

  // A buffered inputStream must implement nsIAsyncInputStream.
  nsCOMPtr<nsIAsyncInputStream> buffered = do_QueryInterface(stream);
  MOZ_DIAGNOSTIC_ASSERT(buffered);
  if (!buffered) {
    return NS_ERROR_NO_INTERFACE;
  }

  mAsyncStream = buffered;
  mAsyncStreamIsBuffered = true;

  return NS_OK;
}
744
745
//-----------------------------------------------------------------------------
746
// nsIThreadRetargetableRequest
747
//-----------------------------------------------------------------------------
748
749
// nsIThreadRetargetableRequest::RetargetDeliveryTo. Switches listener
// callbacks to |aNewTarget|, provided the listener supports retargeting and
// its CheckListenerChain() succeeds. Takes the pump mutex.
//
// Only allowed in STATE_START or STATE_TRANSFER (NS_ERROR_UNEXPECTED
// otherwise). A canceled pump returns its failure status unchanged, and
// retargeting to the current target is a no-op that returns NS_OK. Otherwise
// the result of the listener query / chain check is returned.
NS_IMETHODIMP
nsInputStreamPump::RetargetDeliveryTo(nsIEventTarget* aNewTarget)
{
    RecursiveMutexAutoLock guard(mMutex);

    NS_ENSURE_ARG(aNewTarget);
    NS_ENSURE_TRUE(mState == STATE_START || mState == STATE_TRANSFER,
                   NS_ERROR_UNEXPECTED);

    // If canceled, do not retarget. Return with canceled status.
    if (NS_FAILED(mStatus)) {
        return mStatus;
    }

    if (aNewTarget == mTargetThread) {
        NS_WARNING("Retargeting delivery to same thread");
        return NS_OK;
    }

    // Ensure that |mListener| and any subsequent listeners can be retargeted
    // to another thread.
    nsresult rv = NS_OK;
    nsCOMPtr<nsIThreadRetargetableStreamListener> retargetable =
        do_QueryInterface(mListener, &rv);
    const bool canAskListener = NS_SUCCEEDED(rv) && retargetable;
    if (canAskListener) {
        rv = retargetable->CheckListenerChain();
        if (NS_SUCCEEDED(rv)) {
            // The switch takes effect on the next EnsureWaiting() call.
            mTargetThread = aNewTarget;
            mRetargeting = true;
        }
    }

    LOG(("nsInputStreamPump::RetargetDeliveryTo [this=%p aNewTarget=%p] "
         "%s listener [%p] rv[%" PRIx32 "]",
         this, aNewTarget, (mTargetThread == aNewTarget ? "success" : "failure"),
         mListener.get(), static_cast<uint32_t>(rv)));
    return rv;
}
786
787
// nsIThreadRetargetableRequest::GetDeliveryTarget. Hands out an owning
// reference to the event target callbacks are currently delivered to, or null
// when none is set. Takes the pump mutex.
NS_IMETHODIMP
nsInputStreamPump::GetDeliveryTarget(nsIEventTarget** aNewTarget)
{
    RecursiveMutexAutoLock guard(mMutex);

    NS_IF_ADDREF(*aNewTarget = mTargetThread);

    return NS_OK;
}