Coverage Report

Created: 2018-09-25 14:53

/src/mozilla-central/ipc/glue/IPCStreamDestination.cpp
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
 * License, v. 2.0. If a copy of the MPL was not distributed with this
5
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
#include "IPCStreamDestination.h"
8
#include "mozilla/InputStreamLengthWrapper.h"
9
#include "mozilla/Mutex.h"
10
#include "nsIAsyncInputStream.h"
11
#include "nsIAsyncOutputStream.h"
12
#include "nsIBufferedStreams.h"
13
#include "nsICloneableInputStream.h"
14
#include "nsIPipe.h"
15
16
namespace mozilla {
17
namespace ipc {
18
19
// ----------------------------------------------------------------------------
20
// IPCStreamDestination::DelayedStartInputStream
21
//
22
// When AutoIPCStream is used with delayedStart, we need to ask for data at the
23
// first real use of the nsIInputStream. In order to do so, we wrap the
24
// nsIInputStream, created by the nsIPipe, with this wrapper.
25
26
class IPCStreamDestination::DelayedStartInputStream final
27
  : public nsIAsyncInputStream
28
  , public nsISearchableInputStream
29
  , public nsICloneableInputStream
30
  , public nsIBufferedInputStream
31
{
32
public:
33
  NS_DECL_THREADSAFE_ISUPPORTS
34
35
  DelayedStartInputStream(IPCStreamDestination* aDestination,
36
                          already_AddRefed<nsIAsyncInputStream>&& aStream)
37
    : mDestination(aDestination)
38
    , mStream(aStream)
39
    , mMutex("IPCStreamDestination::DelayedStartInputStream::mMutex")
40
0
  {
41
0
    MOZ_ASSERT(mDestination);
42
0
    MOZ_ASSERT(mStream);
43
0
  }
44
45
  void
46
  DestinationShutdown()
47
0
  {
48
0
    MutexAutoLock lock(mMutex);
49
0
    mDestination = nullptr;
50
0
  }
51
52
  // nsIInputStream interface
53
54
  NS_IMETHOD
55
  Close() override
56
0
  {
57
0
    MaybeCloseDestination();
58
0
    return mStream->Close();
59
0
  }
60
61
  NS_IMETHOD
62
  Available(uint64_t* aLength) override
63
0
  {
64
0
    MaybeStartReading();
65
0
    return mStream->Available(aLength);
66
0
  }
67
68
  NS_IMETHOD
69
  Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override
70
0
  {
71
0
    MaybeStartReading();
72
0
    return mStream->Read(aBuffer, aCount, aReadCount);
73
0
  }
74
75
  NS_IMETHOD
76
  ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
77
               uint32_t *aResult) override
78
0
  {
79
0
    MaybeStartReading();
80
0
    return mStream->ReadSegments(aWriter, aClosure, aCount, aResult);
81
0
  }
82
83
  NS_IMETHOD
84
  IsNonBlocking(bool* aNonBlocking) override
85
0
  {
86
0
    MaybeStartReading();
87
0
    return mStream->IsNonBlocking(aNonBlocking);
88
0
  }
89
90
  // nsIAsyncInputStream interface
91
92
  NS_IMETHOD
93
  CloseWithStatus(nsresult aReason) override
94
0
  {
95
0
    MaybeCloseDestination();
96
0
    return mStream->CloseWithStatus(aReason);
97
0
  }
98
99
  NS_IMETHOD
100
  AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags,
101
            uint32_t aRequestedCount, nsIEventTarget* aTarget) override
102
0
  {
103
0
    MaybeStartReading();
104
0
    return mStream->AsyncWait(aCallback, aFlags, aRequestedCount, aTarget);
105
0
  }
106
107
  NS_IMETHOD
108
  Search(const char* aForString, bool aIgnoreCase, bool* aFound,
109
         uint32_t* aOffsetSearchedTo) override
110
0
  {
111
0
    MaybeStartReading();
112
0
    nsCOMPtr<nsISearchableInputStream> searchable = do_QueryInterface(mStream);
113
0
    MOZ_ASSERT(searchable);
114
0
    return searchable->Search(aForString, aIgnoreCase, aFound, aOffsetSearchedTo);
115
0
  }
116
117
  // nsICloneableInputStream interface
118
119
  NS_IMETHOD
120
  GetCloneable(bool* aCloneable) override
121
0
  {
122
0
    MaybeStartReading();
123
0
    nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(mStream);
124
0
    MOZ_ASSERT(cloneable);
125
0
    return cloneable->GetCloneable(aCloneable);
126
0
  }
127
128
  NS_IMETHOD
129
  Clone(nsIInputStream** aResult) override
130
0
  {
131
0
    MaybeStartReading();
132
0
    nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(mStream);
133
0
    MOZ_ASSERT(cloneable);
134
0
    return cloneable->Clone(aResult);
135
0
  }
136
137
  // nsIBufferedInputStream
138
139
  NS_IMETHOD
140
  Init(nsIInputStream* aStream, uint32_t aBufferSize) override
141
0
  {
142
0
    MaybeStartReading();
143
0
    nsCOMPtr<nsIBufferedInputStream> stream = do_QueryInterface(mStream);
144
0
    MOZ_ASSERT(stream);
145
0
    return stream->Init(aStream, aBufferSize);
146
0
  }
147
148
  NS_IMETHODIMP
149
  GetData(nsIInputStream **aResult) override
150
0
  {
151
0
    return NS_ERROR_NOT_IMPLEMENTED;
152
0
  }
153
154
  void
155
  MaybeStartReading();
156
157
  void
158
  MaybeCloseDestination();
159
160
private:
161
0
  ~DelayedStartInputStream() = default;
162
163
  IPCStreamDestination* mDestination;
164
  nsCOMPtr<nsIAsyncInputStream> mStream;
165
166
  // This protects mDestination: any method can be called by any thread.
167
  Mutex mMutex;
168
169
  class HelperRunnable;
170
};
171
172
class IPCStreamDestination::DelayedStartInputStream::HelperRunnable final
173
  : public Runnable
174
{
175
public:
176
  enum Op {
177
    eStartReading,
178
    eCloseDestination,
179
  };
180
181
  HelperRunnable(
182
    IPCStreamDestination::DelayedStartInputStream* aDelayedStartInputStream,
183
    Op aOp)
184
    : Runnable(
185
        "ipc::IPCStreamDestination::DelayedStartInputStream::HelperRunnable")
186
    , mDelayedStartInputStream(aDelayedStartInputStream)
187
    , mOp(aOp)
188
0
  {
189
0
    MOZ_ASSERT(aDelayedStartInputStream);
190
0
  }
191
192
  NS_IMETHOD
193
  Run() override
194
0
  {
195
0
    switch (mOp) {
196
0
    case eStartReading:
197
0
      mDelayedStartInputStream->MaybeStartReading();
198
0
      break;
199
0
    case eCloseDestination:
200
0
      mDelayedStartInputStream->MaybeCloseDestination();
201
0
      break;
202
0
    }
203
0
204
0
    return NS_OK;
205
0
  }
206
207
private:
208
  RefPtr<IPCStreamDestination::DelayedStartInputStream> mDelayedStartInputStream;
209
  Op mOp;
210
};
211
212
void
213
IPCStreamDestination::DelayedStartInputStream::MaybeStartReading()
214
0
{
215
0
  MutexAutoLock lock(mMutex);
216
0
  if (!mDestination) {
217
0
    return;
218
0
  }
219
0
220
0
  if (mDestination->IsOnOwningThread()) {
221
0
    mDestination->StartReading();
222
0
    mDestination = nullptr;
223
0
    return;
224
0
  }
225
0
226
0
  RefPtr<Runnable> runnable =
227
0
    new HelperRunnable(this, HelperRunnable::eStartReading);
228
0
  mDestination->DispatchRunnable(runnable.forget());
229
0
}
230
231
void
232
IPCStreamDestination::DelayedStartInputStream::MaybeCloseDestination()
233
0
{
234
0
  MutexAutoLock lock(mMutex);
235
0
  if (!mDestination) {
236
0
    return;
237
0
  }
238
0
239
0
  if (mDestination->IsOnOwningThread()) {
240
0
    mDestination->RequestClose(NS_ERROR_ABORT);
241
0
    mDestination = nullptr;
242
0
    return;
243
0
  }
244
0
245
0
  RefPtr<Runnable> runnable =
246
0
    new HelperRunnable(this, HelperRunnable::eCloseDestination);
247
0
  mDestination->DispatchRunnable(runnable.forget());
248
0
}
249
250
// Reference counting for DelayedStartInputStream.
NS_IMPL_ADDREF(IPCStreamDestination::DelayedStartInputStream)
NS_IMPL_RELEASE(IPCStreamDestination::DelayedStartInputStream)

// QueryInterface table.  nsIInputStream and nsISupports are resolved through
// the nsIAsyncInputStream base.
NS_INTERFACE_MAP_BEGIN(IPCStreamDestination::DelayedStartInputStream)
  NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream)
  NS_INTERFACE_MAP_ENTRY(nsISearchableInputStream)
  NS_INTERFACE_MAP_ENTRY(nsICloneableInputStream)
  NS_INTERFACE_MAP_ENTRY(nsIBufferedInputStream)
  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsIInputStream, nsIAsyncInputStream)
  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIAsyncInputStream)
NS_INTERFACE_MAP_END
261
262
// ----------------------------------------------------------------------------
263
// IPCStreamDestination
264
265
// Bind the destination to the thread it is constructed on: that thread is
// what IsOnOwningThread() compares against and where DispatchRunnable() posts
// work.  Delayed start is off until SetDelayedStart() turns it on.
IPCStreamDestination::IPCStreamDestination()
  : mOwningThread(NS_GetCurrentThread())
  , mDelayedStart(false)
#ifdef DEBUG
  // Use the same guard as the code in SetLength() that writes this flag and
  // as MOZ_ASSERT, which reads it, so the flag is always initialized whenever
  // it is actually consulted.
  , mLengthSet(false)
#endif
{
}
273
274
// Nothing to tear down explicitly; members release themselves.
IPCStreamDestination::~IPCStreamDestination() = default;
277
278
// Create the pipe whose two ends become mReader and mWriter.  Must be called
// before either end exists.  Returns the NS_NewPipe2 failure code on error,
// NS_OK otherwise.
nsresult
IPCStreamDestination::Initialize()
{
  MOZ_ASSERT(!mReader);
  MOZ_ASSERT(!mWriter);

  // Both ends are async (non-blocking), even though the writer side is
  // opened as an infinite stream: CloseWithStatus() is needed to push errors
  // through the pipe.
  const bool kNonBlocking = true;

  const uint32_t kSegmentSize = 0;

  // The pipe is "infinite" because back-pressure cannot currently be applied
  // through the async IPC layer, and blocking the IPC worker thread is not
  // desirable either.
  const uint32_t kInfinitePipe = UINT32_MAX;

  nsresult rv = NS_NewPipe2(getter_AddRefs(mReader),
                            getter_AddRefs(mWriter),
                            kNonBlocking, kNonBlocking,
                            kSegmentSize,
                            kInfinitePipe);

  return NS_WARN_IF(NS_FAILED(rv)) ? rv : NS_OK;
}
302
303
void
304
IPCStreamDestination::SetDelayedStart(bool aDelayedStart)
305
0
{
306
0
  mDelayedStart = aDelayedStart;
307
0
}
308
309
void
310
IPCStreamDestination::SetLength(int64_t aLength)
311
0
{
312
0
  MOZ_ASSERT(mReader);
313
0
  MOZ_ASSERT(!mLengthSet);
314
0
315
#ifdef DEBUG
316
  mLengthSet = true;
317
#endif
318
319
0
  if (aLength != -1) {
320
0
    nsCOMPtr<nsIInputStream> finalStream;
321
0
    finalStream = new InputStreamLengthWrapper(mReader.forget(), aLength);
322
0
    mReader = do_QueryInterface(finalStream);
323
0
    MOZ_ASSERT(mReader);
324
0
  }
325
0
}
326
327
// Give up ownership of the reader.  Without delayed start the reader itself
// is returned.  With delayed start the reader is moved into a new
// DelayedStartInputStream, which is remembered in mDelayedStartInputStream
// and returned instead.
already_AddRefed<nsIInputStream>
IPCStreamDestination::TakeReader()
{
  MOZ_ASSERT(mReader);
  MOZ_ASSERT(!mDelayedStartInputStream);

  if (!mDelayedStart) {
    return mReader.forget();
  }

  mDelayedStartInputStream =
    new DelayedStartInputStream(this, mReader.forget());

  // Go through nsIAsyncInputStream to reach nsIInputStream.
  RefPtr<nsIAsyncInputStream> wrapper = mDelayedStartInputStream;
  return wrapper.forget();
}
342
343
bool
344
IPCStreamDestination::IsOnOwningThread() const
345
0
{
346
0
  return mOwningThread == NS_GetCurrentThread();
347
0
}
348
349
void
350
IPCStreamDestination::DispatchRunnable(already_AddRefed<nsIRunnable>&& aRunnable)
351
0
{
352
0
  nsCOMPtr<nsIRunnable> runnable = aRunnable;
353
0
  mOwningThread->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
354
0
}
355
356
void
357
IPCStreamDestination::ActorDestroyed()
358
0
{
359
0
  MOZ_ASSERT(mWriter);
360
0
361
0
  // If we were gracefully closed we should have gotten RecvClose().  In
362
0
  // that case, the writer will already be closed and this will have no
363
0
  // effect.  This just aborts the writer in the case where the child process
364
0
  // crashes.
365
0
  mWriter->CloseWithStatus(NS_ERROR_ABORT);
366
0
367
0
  if (mDelayedStartInputStream) {
368
0
    mDelayedStartInputStream->DestinationShutdown();
369
0
    mDelayedStartInputStream = nullptr;
370
0
  }
371
0
}
372
373
void
374
IPCStreamDestination::BufferReceived(const wr::ByteBuffer& aBuffer)
375
0
{
376
0
  MOZ_ASSERT(mWriter);
377
0
378
0
  uint32_t numWritten = 0;
379
0
380
0
  // This should only fail if we hit an OOM condition.
381
0
  nsresult rv = mWriter->Write(reinterpret_cast<char*>(aBuffer.mData),
382
0
                               aBuffer.mLength,
383
0
                               &numWritten);
384
0
  if (NS_WARN_IF(NS_FAILED(rv))) {
385
0
    RequestClose(rv);
386
0
  }
387
0
}
388
389
void
390
IPCStreamDestination::CloseReceived(nsresult aRv)
391
0
{
392
0
  MOZ_ASSERT(mWriter);
393
0
  mWriter->CloseWithStatus(aRv);
394
0
  TerminateDestination();
395
0
}
396
397
} // namespace ipc
398
} // namespace mozilla