Coverage Report

Created: 2018-09-25 14:53

/src/mozilla-central/xpcom/io/NonBlockingAsyncInputStream.cpp
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
/* This Source Code Form is subject to the terms of the Mozilla Public
3
 * License, v. 2.0. If a copy of the MPL was not distributed with this
4
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5
6
#include "NonBlockingAsyncInputStream.h"
7
#include "mozilla/ipc/InputStreamUtils.h"
8
#include "nsISeekableStream.h"
9
#include "nsStreamUtils.h"
10
11
namespace mozilla {
12
13
using namespace ipc;
14
15
// Runnable that delivers one AsyncWait() notification. It keeps the owning
// stream alive and hands the stored callback back to the stream when it runs;
// the stream then decides whether the notification is still wanted.
class NonBlockingAsyncInputStream::AsyncWaitRunnable final : public CancelableRunnable
{
public:
  AsyncWaitRunnable(NonBlockingAsyncInputStream* aStream,
                    nsIInputStreamCallback* aCallback)
    : CancelableRunnable("AsyncWaitRunnable"),
      mStream(aStream),
      mCallback(aCallback)
  {
  }

  // Transfers ownership of the callback to the stream. After the first call
  // mCallback is empty.
  NS_IMETHOD
  Run() override
  {
    nsCOMPtr<nsIInputStreamCallback> callback = mCallback.forget();
    mStream->RunAsyncWaitCallback(this, callback.forget());
    return NS_OK;
  }

private:
  RefPtr<NonBlockingAsyncInputStream> mStream;
  nsCOMPtr<nsIInputStreamCallback> mCallback;
};
35
36
// XPCOM reference-counting boilerplate for NonBlockingAsyncInputStream.
// NOTE(review): the macros are defined outside this file; presumably they
// expand to the AddRef()/Release() implementations - confirm in the XPCOM
// headers.
NS_IMPL_ADDREF(NonBlockingAsyncInputStream);
37
NS_IMPL_RELEASE(NonBlockingAsyncInputStream);
38
39
// Records a pending "wait for closure only" request: the runnable to fire once
// the stream is closed and the (possibly null) event target to fire it on.
NonBlockingAsyncInputStream::WaitClosureOnly::WaitClosureOnly(AsyncWaitRunnable* aRunnable,
                                                              nsIEventTarget* aEventTarget)
  : mRunnable(aRunnable),
    mEventTarget(aEventTarget)
{
}
44
45
0
// Interface map for NonBlockingAsyncInputStream.
// nsIInputStream and nsIAsyncInputStream are always listed. The cloneable,
// IPC-serializable and seekable entries are conditional on the matching
// mWeak* member, which the constructor only sets when the wrapped stream
// itself implements that interface. nsISupports is resolved through
// nsIInputStream.
NS_INTERFACE_MAP_BEGIN(NonBlockingAsyncInputStream)
46
0
  NS_INTERFACE_MAP_ENTRY(nsIInputStream)
47
0
  NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream)
48
0
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream,
49
0
                                     mWeakCloneableInputStream)
50
0
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
51
0
                                     mWeakIPCSerializableInputStream)
52
0
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream,
53
0
                                     mWeakSeekableInputStream)
54
0
  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStream)
55
0
NS_INTERFACE_MAP_END
56
57
// Factory: wraps aInputStream in a NonBlockingAsyncInputStream and returns the
// wrapper through aResult.
//
// The source stream must report itself as non-blocking and must not already
// implement nsIAsyncInputStream; both conditions are only enforced through
// diagnostic assertions. A failure from IsNonBlocking() is returned unchanged.
/* static */ nsresult
NonBlockingAsyncInputStream::Create(already_AddRefed<nsIInputStream> aInputStream,
                                    nsIAsyncInputStream** aResult)
{
  MOZ_DIAGNOSTIC_ASSERT(aResult);

  nsCOMPtr<nsIInputStream> source(std::move(aInputStream));

  bool nonBlocking = false;
  const nsresult rv = source->IsNonBlocking(&nonBlocking);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }
  MOZ_DIAGNOSTIC_ASSERT(nonBlocking);

#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
  // Wrapping a stream that is already async is a caller error.
  nsCOMPtr<nsIAsyncInputStream> asyncInputStream = do_QueryInterface(source);
  MOZ_DIAGNOSTIC_ASSERT(!asyncInputStream);
#endif // MOZ_DIAGNOSTIC_ASSERT_ENABLED

  RefPtr<NonBlockingAsyncInputStream> wrapper =
    new NonBlockingAsyncInputStream(source.forget());
  wrapper.forget(aResult);

  return NS_OK;
}
85
86
// Takes ownership of aInputStream and probes it for the optional interfaces
// (cloneable, IPC-serializable, seekable). Each mWeak* member is set only when
// the wrapped stream answers the QueryInterface and has the same COM identity
// as mInputStream; otherwise it stays null.
NonBlockingAsyncInputStream::NonBlockingAsyncInputStream(already_AddRefed<nsIInputStream> aInputStream)
  : mInputStream(std::move(aInputStream)),
    mWeakCloneableInputStream(nullptr),
    mWeakIPCSerializableInputStream(nullptr),
    mWeakSeekableInputStream(nullptr),
    mLock("NonBlockingAsyncInputStream::mLock"),
    mClosed(false)
{
  MOZ_ASSERT(mInputStream);

  nsCOMPtr<nsICloneableInputStream> asCloneable =
    do_QueryInterface(mInputStream);
  if (asCloneable) {
    if (SameCOMIdentity(mInputStream, asCloneable)) {
      mWeakCloneableInputStream = asCloneable;
    }
  }

  nsCOMPtr<nsIIPCSerializableInputStream> asSerializable =
    do_QueryInterface(mInputStream);
  if (asSerializable) {
    if (SameCOMIdentity(mInputStream, asSerializable)) {
      mWeakIPCSerializableInputStream = asSerializable;
    }
  }

  nsCOMPtr<nsISeekableStream> asSeekable =
    do_QueryInterface(mInputStream);
  if (asSeekable) {
    if (SameCOMIdentity(mInputStream, asSeekable)) {
      mWeakSeekableInputStream = asSeekable;
    }
  }
}
115
116
// No explicit teardown: all members clean up through their own destructors.
NonBlockingAsyncInputStream::~NonBlockingAsyncInputStream() = default;
118
119
// Closes the wrapped stream and, if a WAIT_CLOSURE_ONLY request is pending,
// fires its runnable.
//
// Returns NS_OK when the stream was already closed. If the wrapped stream
// fails to close, the pending closure-only request is dropped and the error is
// returned; mClosed stays true in that case.
NS_IMETHODIMP
NonBlockingAsyncInputStream::Close()
{
  RefPtr<AsyncWaitRunnable> pendingRunnable;
  nsCOMPtr<nsIEventTarget> pendingTarget;

  {
    MutexAutoLock lock(mLock);

    // Closing twice is reported as success rather than
    // NS_BASE_STREAM_CLOSED, purely to avoid warning noise.
    if (mClosed) {
      return NS_OK;
    }

    mClosed = true;

    NS_ENSURE_STATE(mInputStream);
    nsresult rv = mInputStream->Close();
    if (NS_WARN_IF(NS_FAILED(rv))) {
      mWaitClosureOnly.reset();
      return rv;
    }

    // Promote a pending closure-only waiter to the active async-wait
    // callback; it is fired below, after the lock has been released.
    if (mWaitClosureOnly.isSome()) {
      pendingRunnable = std::move(mWaitClosureOnly->mRunnable);
      pendingTarget = std::move(mWaitClosureOnly->mEventTarget);

      mWaitClosureOnly.reset();

      mAsyncWaitCallback = pendingRunnable;
    }
  }

  if (!pendingRunnable) {
    return NS_OK;
  }

  if (pendingTarget) {
    // The dispatch result is intentionally not propagated.
    pendingTarget->Dispatch(pendingRunnable, NS_DISPATCH_NORMAL);
    return NS_OK;
  }

  // No event target was supplied: run synchronously on this thread.
  pendingRunnable->Run();
  return NS_OK;
}
166
167
// nsIInputStream interface
168
169
// Forwards to the wrapped stream; aLength and the result code are passed
// through untouched.
NS_IMETHODIMP
NonBlockingAsyncInputStream::Available(uint64_t* aLength)
{
  const nsresult rv = mInputStream->Available(aLength);
  return rv;
}
174
175
// Forwards the read to the wrapped stream and returns its result unchanged.
NS_IMETHODIMP
NonBlockingAsyncInputStream::Read(char* aBuffer, uint32_t aCount,
                                  uint32_t* aReadCount)
{
  const nsresult rv = mInputStream->Read(aBuffer, aCount, aReadCount);
  return rv;
}
181
182
namespace {
183
184
class MOZ_RAII ReadSegmentsData
185
{
186
public:
187
  ReadSegmentsData(NonBlockingAsyncInputStream* aStream,
188
                   nsWriteSegmentFun aFunc,
189
                   void* aClosure)
190
    : mStream(aStream)
191
    , mFunc(aFunc)
192
    , mClosure(aClosure)
193
0
  {}
194
195
  NonBlockingAsyncInputStream* mStream;
196
  nsWriteSegmentFun mFunc;
197
  void* mClosure;
198
};
199
200
// Trampoline passed to the wrapped stream's ReadSegments(). aClosure is a
// ReadSegmentsData; the caller's writer is invoked with the wrapper stream in
// place of aInStream (which is ignored) and with the caller's own closure.
nsresult
ReadSegmentsWriter(nsIInputStream* aInStream,
                   void* aClosure,
                   const char* aFromSegment,
                   uint32_t aToOffset,
                   uint32_t aCount,
                   uint32_t* aWriteCount)
{
  ReadSegmentsData* forwarded = static_cast<ReadSegmentsData*>(aClosure);
  nsWriteSegmentFun writer = forwarded->mFunc;

  return writer(forwarded->mStream, forwarded->mClosure, aFromSegment,
                aToOffset, aCount, aWriteCount);
}
212
213
} // anonymous
214
215
// Reads segments from the wrapped stream, routing each segment through
// ReadSegmentsWriter so that aWriter is called with this wrapper as the
// stream argument.
NS_IMETHODIMP
NonBlockingAsyncInputStream::ReadSegments(nsWriteSegmentFun aWriter,
                                          void* aClosure, uint32_t aCount,
                                          uint32_t* aResult)
{
  // Lives on the stack for the duration of the wrapped call only.
  ReadSegmentsData forwarded(this, aWriter, aClosure);

  const nsresult rv =
    mInputStream->ReadSegments(ReadSegmentsWriter, &forwarded, aCount, aResult);
  return rv;
}
223
224
// Always reports true: this wrapper never asks the wrapped stream.
NS_IMETHODIMP
NonBlockingAsyncInputStream::IsNonBlocking(bool* aNonBlocking)
{
  bool& result = *aNonBlocking;
  result = true;
  return NS_OK;
}
230
231
// nsICloneableInputStream interface
232
233
// Forwards to the wrapped stream's nsICloneableInputStream implementation.
// Bails out through NS_ENSURE_STATE when the wrapped stream is not cloneable.
NS_IMETHODIMP
NonBlockingAsyncInputStream::GetCloneable(bool* aCloneable)
{
  NS_ENSURE_STATE(mWeakCloneableInputStream);

  const nsresult rv = mWeakCloneableInputStream->GetCloneable(aCloneable);
  return rv;
}
239
240
// Clones the wrapped stream and wraps the clone in a fresh
// NonBlockingAsyncInputStream via Create(), which is returned through aResult.
// Any failure from the inner Clone() or from Create() is returned unchanged.
NS_IMETHODIMP
NonBlockingAsyncInputStream::Clone(nsIInputStream** aResult)
{
  NS_ENSURE_STATE(mWeakCloneableInputStream);

  nsCOMPtr<nsIInputStream> innerClone;
  nsresult rv = mWeakCloneableInputStream->Clone(getter_AddRefs(innerClone));
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  nsCOMPtr<nsIAsyncInputStream> wrappedClone;
  rv = Create(innerClone.forget(), getter_AddRefs(wrappedClone));
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  wrappedClone.forget(aResult);
  return NS_OK;
}
260
261
// nsIAsyncInputStream interface
262
263
// Behaves exactly like Close(); aStatus is not used.
NS_IMETHODIMP
NonBlockingAsyncInputStream::CloseWithStatus(nsresult aStatus)
{
  const nsresult rv = Close();
  return rv;
}
268
269
// Registers aCallback to be notified through OnInputStreamReady().
//
// - aCallback == nullptr cancels any pending request and returns NS_OK.
// - A non-null aCallback while another request is pending fails with
//   NS_ERROR_FAILURE.
// - With WAIT_CLOSURE_ONLY set and the stream still open, the request is
//   parked until Close() runs.
// - Otherwise the callback is fired right away: dispatched to aEventTarget
//   when one is given, or run synchronously on the calling thread when not.
//
// aRequestedCount is not used. If the wrapped stream reports zero available
// bytes it is closed here and the wrapper is marked closed.
//
// If dispatching to aEventTarget fails, the registration made by this call is
// rolled back so that the failed request does not block later AsyncWait()
// calls; the dispatch error is returned.
NS_IMETHODIMP
NonBlockingAsyncInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
                                       uint32_t aFlags,
                                       uint32_t aRequestedCount,
                                       nsIEventTarget* aEventTarget)
{
  RefPtr<AsyncWaitRunnable> runnable;
  {
    MutexAutoLock lock(mLock);

    if (aCallback && (mWaitClosureOnly.isSome() || mAsyncWaitCallback)) {
      return NS_ERROR_FAILURE;
    }

    if (!aCallback) {
      // Canceling previous callbacks.
      mWaitClosureOnly.reset();
      mAsyncWaitCallback = nullptr;
      return NS_OK;
    }

    // Maybe the stream is already closed.
    if (!mClosed) {
      uint64_t length = 0;
      nsresult rv = mInputStream->Available(&length);
      if (NS_SUCCEEDED(rv) && length == 0) {
        mInputStream->Close();
        mClosed = true;
      }
    }

    runnable = new AsyncWaitRunnable(this, aCallback);
    if ((aFlags & nsIAsyncInputStream::WAIT_CLOSURE_ONLY) && !mClosed) {
      mWaitClosureOnly.emplace(runnable, aEventTarget);
      return NS_OK;
    }

    mAsyncWaitCallback = runnable;
  }

  MOZ_ASSERT(runnable);

  if (!aEventTarget) {
    // No target: deliver synchronously. Must happen outside the lock because
    // RunAsyncWaitCallback() takes mLock itself.
    return runnable->Run();
  }

  // Hand a separate reference to Dispatch() so `runnable` stays usable for
  // the rollback below.
  RefPtr<AsyncWaitRunnable> toDispatch = runnable;
  nsresult rv = aEventTarget->Dispatch(toDispatch.forget());
  if (NS_WARN_IF(NS_FAILED(rv))) {
    MutexAutoLock lock(mLock);
    // Only undo our own registration; another thread may have canceled or
    // replaced it in the meantime.
    if (mAsyncWaitCallback == runnable.get()) {
      mAsyncWaitCallback = nullptr;
    }
  }

  return rv;
}
317
318
// nsIIPCSerializableInputStream
319
320
void
321
NonBlockingAsyncInputStream::Serialize(mozilla::ipc::InputStreamParams& aParams,
322
                                       FileDescriptorArray& aFileDescriptors)
323
0
{
324
0
  MOZ_ASSERT(mWeakIPCSerializableInputStream);
325
0
  InputStreamHelper::SerializeInputStream(mInputStream, aParams,
326
0
                                          aFileDescriptors);
327
0
}
328
329
bool
330
NonBlockingAsyncInputStream::Deserialize(const mozilla::ipc::InputStreamParams& aParams,
331
                                         const FileDescriptorArray& aFileDescriptors)
332
0
{
333
0
  MOZ_CRASH("NonBlockingAsyncInputStream cannot be deserialized!");
334
0
  return true;
335
0
}
336
337
// Asks the wrapped stream for its expected serialized length. Yields
// Nothing() when the wrapped stream is not IPC-serializable.
Maybe<uint64_t>
NonBlockingAsyncInputStream::ExpectedSerializedLength()
{
  NS_ENSURE_TRUE(mWeakIPCSerializableInputStream, Nothing());

  Maybe<uint64_t> expected =
    mWeakIPCSerializableInputStream->ExpectedSerializedLength();
  return expected;
}
343
344
// nsISeekableStream
345
346
// Forwards the seek to the wrapped stream's nsISeekableStream implementation.
// Bails out through NS_ENSURE_STATE when the wrapped stream is not seekable.
NS_IMETHODIMP
NonBlockingAsyncInputStream::Seek(int32_t aWhence, int64_t aOffset)
{
  NS_ENSURE_STATE(mWeakSeekableInputStream);

  const nsresult rv = mWeakSeekableInputStream->Seek(aWhence, aOffset);
  return rv;
}
352
353
// Forwards the position query to the wrapped stream's nsISeekableStream
// implementation. Bails out through NS_ENSURE_STATE when the wrapped stream is
// not seekable.
NS_IMETHODIMP
NonBlockingAsyncInputStream::Tell(int64_t* aResult)
{
  NS_ENSURE_STATE(mWeakSeekableInputStream);

  const nsresult rv = mWeakSeekableInputStream->Tell(aResult);
  return rv;
}
359
360
// Truncation is not supported: once the seekable check passes, this always
// returns NS_ERROR_NOT_IMPLEMENTED without touching the wrapped stream.
NS_IMETHODIMP
NonBlockingAsyncInputStream::SetEOF()
{
  NS_ENSURE_STATE(mWeakSeekableInputStream);

  const nsresult unsupported = NS_ERROR_NOT_IMPLEMENTED;
  return unsupported;
}
366
367
void
368
NonBlockingAsyncInputStream::RunAsyncWaitCallback(NonBlockingAsyncInputStream::AsyncWaitRunnable* aRunnable,
369
                                                  already_AddRefed<nsIInputStreamCallback> aCallback)
370
0
{
371
0
  nsCOMPtr<nsIInputStreamCallback> callback = std::move(aCallback);
372
0
373
0
  {
374
0
    MutexAutoLock lock(mLock);
375
0
    if (mAsyncWaitCallback != aRunnable) {
376
0
      // The callback has been canceled in the meantime.
377
0
      return;
378
0
    }
379
0
380
0
    mAsyncWaitCallback = nullptr;
381
0
  }
382
0
383
0
  callback->OnInputStreamReady(this);
384
0
}
385
386
} // mozilla namespace