Coverage Report

Created: 2018-09-25 14:53

/src/mozilla-central/dom/file/ipc/IPCBlobInputStreamChild.cpp
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
 * License, v. 2.0. If a copy of the MPL was not distributed with this
5
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
#include "IPCBlobInputStreamChild.h"
8
#include "IPCBlobInputStreamThread.h"
9
10
#include "mozilla/ipc/IPCStreamUtils.h"
11
#include "mozilla/dom/WorkerCommon.h"
12
#include "mozilla/dom/WorkerRef.h"
13
14
namespace mozilla {
15
namespace dom {
16
17
namespace {
18
19
// This runnable is used in case the last stream is forgotten on the 'wrong'
20
// thread.
21
class ShutdownRunnable final : public CancelableRunnable
22
{
23
public:
24
  explicit ShutdownRunnable(IPCBlobInputStreamChild* aActor)
25
    : CancelableRunnable("dom::ShutdownRunnable")
26
    , mActor(aActor)
27
0
  {}
28
29
  NS_IMETHOD
30
  Run() override
31
0
  {
32
0
    mActor->Shutdown();
33
0
    return NS_OK;
34
0
  }
35
36
private:
37
  RefPtr<IPCBlobInputStreamChild> mActor;
38
};
39
40
// This runnable is used in case StreamNeeded() has been called on a non-owning
41
// thread.
42
class StreamNeededRunnable final : public CancelableRunnable
43
{
44
public:
45
  explicit StreamNeededRunnable(IPCBlobInputStreamChild* aActor)
46
    : CancelableRunnable("dom::StreamNeededRunnable")
47
    , mActor(aActor)
48
0
  {}
49
50
  NS_IMETHOD
51
  Run() override
52
0
  {
53
0
    MOZ_ASSERT(mActor->State() != IPCBlobInputStreamChild::eActiveMigrating &&
54
0
               mActor->State() != IPCBlobInputStreamChild::eInactiveMigrating);
55
0
    if (mActor->State() == IPCBlobInputStreamChild::eActive) {
56
0
      mActor->SendStreamNeeded();
57
0
    }
58
0
    return NS_OK;
59
0
  }
60
61
private:
62
  RefPtr<IPCBlobInputStreamChild> mActor;
63
};
64
65
// When the stream has been received from the parent, we inform the
66
// IPCBlobInputStream.
67
class StreamReadyRunnable final : public CancelableRunnable
68
{
69
public:
70
  StreamReadyRunnable(IPCBlobInputStream* aDestinationStream,
71
                      already_AddRefed<nsIInputStream> aCreatedStream)
72
    : CancelableRunnable("dom::StreamReadyRunnable")
73
    , mDestinationStream(aDestinationStream)
74
    , mCreatedStream(std::move(aCreatedStream))
75
0
  {
76
0
    MOZ_ASSERT(mDestinationStream);
77
0
    // mCreatedStream can be null.
78
0
  }
79
80
  NS_IMETHOD
81
  Run() override
82
0
  {
83
0
    mDestinationStream->StreamReady(mCreatedStream.forget());
84
0
    return NS_OK;
85
0
  }
86
87
private:
88
  RefPtr<IPCBlobInputStream> mDestinationStream;
89
  nsCOMPtr<nsIInputStream> mCreatedStream;
90
};
91
92
// This runnable is used in case LengthNeeded() has been called on a non-owning
93
// thread.
94
class LengthNeededRunnable final : public CancelableRunnable
95
{
96
public:
97
  explicit LengthNeededRunnable(IPCBlobInputStreamChild* aActor)
98
    : CancelableRunnable("dom::LengthNeededRunnable")
99
    , mActor(aActor)
100
0
  {}
101
102
  NS_IMETHOD
103
  Run() override
104
0
  {
105
0
    MOZ_ASSERT(mActor->State() != IPCBlobInputStreamChild::eActiveMigrating &&
106
0
               mActor->State() != IPCBlobInputStreamChild::eInactiveMigrating);
107
0
    if (mActor->State() == IPCBlobInputStreamChild::eActive) {
108
0
      mActor->SendLengthNeeded();
109
0
    }
110
0
    return NS_OK;
111
0
  }
112
113
private:
114
  RefPtr<IPCBlobInputStreamChild> mActor;
115
};
116
117
// When the stream has been received from the parent, we inform the
118
// IPCBlobInputStream.
119
class LengthReadyRunnable final : public CancelableRunnable
120
{
121
public:
122
  LengthReadyRunnable(IPCBlobInputStream* aDestinationStream, int64_t aSize)
123
    : CancelableRunnable("dom::LengthReadyRunnable")
124
    , mDestinationStream(aDestinationStream)
125
    , mSize(aSize)
126
0
  {
127
0
    MOZ_ASSERT(mDestinationStream);
128
0
  }
129
130
  NS_IMETHOD
131
  Run() override
132
0
  {
133
0
    mDestinationStream->LengthReady(mSize);
134
0
    return NS_OK;
135
0
  }
136
137
private:
138
  RefPtr<IPCBlobInputStream> mDestinationStream;
139
  int64_t mSize;
140
};
141
142
} // anonymous
143
144
// Creates an active actor bound to the current thread's serial event target.
// When constructed off the main thread on a worker, a thread-safe worker
// reference is also taken (see the comments in the body).
IPCBlobInputStreamChild::IPCBlobInputStreamChild(const nsID& aID,
                                                 uint64_t aSize)
  : mMutex("IPCBlobInputStreamChild::mMutex")
  , mID(aID)
  , mSize(aSize)
  , mState(eActive)
  , mOwningEventTarget(GetCurrentThreadSerialEventTarget())
{
  // Only worker threads need extra care: there, a Close() has to reach the
  // parent side before the thread is released.
  if (NS_IsMainThread()) {
    return;
  }

  // Off the main thread but not on a worker: nothing to hold on to.
  WorkerPrivate* const worker = GetCurrentThreadWorkerPrivate();
  if (!worker) {
    return;
  }

  // Creating the strong reference can fail; in that case no reference is kept.
  RefPtr<StrongWorkerRef> strongRef =
    StrongWorkerRef::Create(worker, "IPCBlobInputStreamChild");
  if (!strongRef) {
    return;
  }

  // The worker must stay alive until the migration is completed.
  mWorkerRef = new ThreadSafeWorkerRef(strongRef);
}
170
171
// Nothing to release by hand: every member cleans up after itself.
IPCBlobInputStreamChild::~IPCBlobInputStreamChild() = default;
173
174
void
175
IPCBlobInputStreamChild::Shutdown()
176
0
{
177
0
  MutexAutoLock lock(mMutex);
178
0
179
0
  RefPtr<IPCBlobInputStreamChild> kungFuDeathGrip = this;
180
0
181
0
  mWorkerRef = nullptr;
182
0
  mPendingOperations.Clear();
183
0
184
0
  if (mState == eActive) {
185
0
    SendClose();
186
0
    mState = eInactive;
187
0
  }
188
0
}
189
190
void
191
IPCBlobInputStreamChild::ActorDestroy(IProtocol::ActorDestroyReason aReason)
192
0
{
193
0
  bool migrating = false;
194
0
195
0
  {
196
0
    MutexAutoLock lock(mMutex);
197
0
    migrating = mState == eActiveMigrating;
198
0
    mState = migrating ? eInactiveMigrating : eInactive;
199
0
  }
200
0
201
0
  if (migrating) {
202
0
    // We were waiting for this! Now we can migrate the actor in the correct
203
0
    // thread.
204
0
    RefPtr<IPCBlobInputStreamThread> thread =
205
0
      IPCBlobInputStreamThread::GetOrCreate();
206
0
    MOZ_ASSERT(thread, "We cannot continue without DOMFile thread.");
207
0
208
0
    ResetManager();
209
0
    thread->MigrateActor(this);
210
0
    return;
211
0
  }
212
0
213
0
  // Let's cleanup the workerRef and the pending operation queue.
214
0
  Shutdown();
215
0
}
216
217
// Returns a snapshot of the actor state, read while holding mMutex. The value
// may already be stale by the time the caller inspects it.
IPCBlobInputStreamChild::ActorState
IPCBlobInputStreamChild::State()
{
  MutexAutoLock lock(mMutex);
  const ActorState snapshot = mState;
  return snapshot;
}
223
224
// Creates a new IPCBlobInputStream backed by this actor and registers it in
// mStreams. Returns null when the actor is already inactive. If the actor is
// active but not owned by the file event target, a migration is started.
already_AddRefed<IPCBlobInputStream>
IPCBlobInputStreamChild::CreateStream()
{
  RefPtr<IPCBlobInputStream> newStream = new IPCBlobInputStream(this);
  bool needsMigration = false;

  {
    MutexAutoLock lock(mMutex);

    if (mState == eInactive) {
      return nullptr;
    }

    // An active actor that does not live on the DOM-File thread has to be
    // migrated there.
    needsMigration =
      mState == eActive &&
      !IPCBlobInputStreamThread::IsOnFileEventTarget(mOwningEventTarget);
    if (needsMigration) {
      MOZ_ASSERT(mStreams.IsEmpty());
      mState = eActiveMigrating;
    }

    mStreams.AppendElement(newStream);
  }

  // Send__delete__ ends up in ActorDestroy(), which takes mMutex, so it has to
  // be called only after the lock above has been released.
  if (needsMigration) {
    Send__delete__(this);
  }

  return newStream.forget();
}
258
259
void
260
IPCBlobInputStreamChild::ForgetStream(IPCBlobInputStream* aStream)
261
0
{
262
0
  MOZ_ASSERT(aStream);
263
0
264
0
  RefPtr<IPCBlobInputStreamChild> kungFuDeathGrip = this;
265
0
266
0
  {
267
0
    MutexAutoLock lock(mMutex);
268
0
    mStreams.RemoveElement(aStream);
269
0
270
0
    if (!mStreams.IsEmpty() || mState != eActive) {
271
0
      return;
272
0
    }
273
0
  }
274
0
275
0
  if (mOwningEventTarget->IsOnCurrentThread()) {
276
0
    Shutdown();
277
0
    return;
278
0
  }
279
0
280
0
  RefPtr<ShutdownRunnable> runnable = new ShutdownRunnable(this);
281
0
  mOwningEventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
282
0
}
283
284
void
285
IPCBlobInputStreamChild::StreamNeeded(IPCBlobInputStream* aStream,
286
                                      nsIEventTarget* aEventTarget)
287
0
{
288
0
  MutexAutoLock lock(mMutex);
289
0
290
0
  if (mState == eInactive) {
291
0
    return;
292
0
  }
293
0
294
0
  MOZ_ASSERT(mStreams.Contains(aStream));
295
0
296
0
  PendingOperation* opt = mPendingOperations.AppendElement();
297
0
  opt->mStream = aStream;
298
0
  opt->mEventTarget = aEventTarget;
299
0
  opt->mOp = PendingOperation::eStreamNeeded;
300
0
301
0
  if (mState == eActiveMigrating || mState == eInactiveMigrating) {
302
0
    // This operation will be continued when the migration is completed.
303
0
    return;
304
0
  }
305
0
306
0
  MOZ_ASSERT(mState == eActive);
307
0
308
0
  if (mOwningEventTarget->IsOnCurrentThread()) {
309
0
    SendStreamNeeded();
310
0
    return;
311
0
  }
312
0
313
0
  RefPtr<StreamNeededRunnable> runnable = new StreamNeededRunnable(this);
314
0
  mOwningEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
315
0
}
316
317
// Handles the parent's reply to a StreamNeeded message: deserializes the
// stream, pops the oldest pending operation and notifies the stream that
// requested it. Always returns IPC_OK().
mozilla::ipc::IPCResult
IPCBlobInputStreamChild::RecvStreamReady(const OptionalIPCStream& aStream)
{
  nsCOMPtr<nsIInputStream> received =
    mozilla::ipc::DeserializeIPCStream(aStream);

  RefPtr<IPCBlobInputStream> destination;
  nsCOMPtr<nsIEventTarget> target;

  {
    MutexAutoLock lock(mMutex);

    // The actor has been shut down in the meantime: drop the reply.
    if (mState == eInactive) {
      return IPC_OK();
    }

    MOZ_ASSERT(!mPendingOperations.IsEmpty());
    MOZ_ASSERT(mState == eActive);

    // Replies are matched with the operation at the front of the queue. The
    // reference is not used after the element is removed below.
    const PendingOperation& head = mPendingOperations[0];
    MOZ_ASSERT(head.mOp == PendingOperation::eStreamNeeded);
    destination = head.mStream;
    target = head.mEventTarget;

    mPendingOperations.RemoveElementAt(0);
  }

  RefPtr<StreamReadyRunnable> notifyTask =
    new StreamReadyRunnable(destination, received.forget());

  // When IPCBlobInputStream::AsyncWait() was called without an event target,
  // no particular thread is the right one, so the callback is run
  // synchronously here. See the nsIAsyncInputStream::asyncWait documentation.
  if (!target) {
    notifyTask->Run();
    return IPC_OK();
  }

  target->Dispatch(notifyTask, NS_DISPATCH_NORMAL);
  return IPC_OK();
}
358
359
void
360
IPCBlobInputStreamChild::LengthNeeded(IPCBlobInputStream* aStream,
361
                                      nsIEventTarget* aEventTarget)
362
0
{
363
0
  MutexAutoLock lock(mMutex);
364
0
365
0
  if (mState == eInactive) {
366
0
    return;
367
0
  }
368
0
369
0
  MOZ_ASSERT(mStreams.Contains(aStream));
370
0
371
0
  PendingOperation* opt = mPendingOperations.AppendElement();
372
0
  opt->mStream = aStream;
373
0
  opt->mEventTarget = aEventTarget;
374
0
  opt->mOp = PendingOperation::eLengthNeeded;
375
0
376
0
  if (mState == eActiveMigrating || mState == eInactiveMigrating) {
377
0
    // This operation will be continued when the migration is completed.
378
0
    return;
379
0
  }
380
0
381
0
  MOZ_ASSERT(mState == eActive);
382
0
383
0
  if (mOwningEventTarget->IsOnCurrentThread()) {
384
0
    SendLengthNeeded();
385
0
    return;
386
0
  }
387
0
388
0
  RefPtr<LengthNeededRunnable> runnable = new LengthNeededRunnable(this);
389
0
  mOwningEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
390
0
}
391
392
// Handles the parent's reply to a LengthNeeded message: pops the oldest
// pending operation and dispatches the length to the stream that requested it.
// Always returns IPC_OK().
mozilla::ipc::IPCResult
IPCBlobInputStreamChild::RecvLengthReady(const int64_t& aLength)
{
  RefPtr<IPCBlobInputStream> destination;
  nsCOMPtr<nsIEventTarget> target;

  {
    MutexAutoLock lock(mMutex);

    // The actor has been shut down in the meantime: drop the reply.
    if (mState == eInactive) {
      return IPC_OK();
    }

    MOZ_ASSERT(!mPendingOperations.IsEmpty());
    MOZ_ASSERT(mState == eActive);

    // Replies are matched with the operation at the front of the queue. The
    // reference is not used after the element is removed below.
    const PendingOperation& head = mPendingOperations[0];
    MOZ_ASSERT(head.mOp == PendingOperation::eLengthNeeded);
    destination = head.mStream;
    target = head.mEventTarget;

    mPendingOperations.RemoveElementAt(0);
  }

  RefPtr<LengthReadyRunnable> notifyTask =
    new LengthReadyRunnable(destination, aLength);

  // NOTE(review): unlike RecvStreamReady(), there is no synchronous fallback
  // here; a null event target is only caught by this debug assertion. Confirm
  // that callers of LengthNeeded() always provide one.
  MOZ_ASSERT(target);
  target->Dispatch(notifyTask, NS_DISPATCH_NORMAL);

  return IPC_OK();
}
424
void
425
IPCBlobInputStreamChild::Migrated()
426
0
{
427
0
  MutexAutoLock lock(mMutex);
428
0
  MOZ_ASSERT(mState == eInactiveMigrating);
429
0
430
0
  mWorkerRef = nullptr;
431
0
432
0
  mOwningEventTarget = GetCurrentThreadSerialEventTarget();
433
0
  MOZ_ASSERT(IPCBlobInputStreamThread::IsOnFileEventTarget(mOwningEventTarget));
434
0
435
0
  // Maybe we have no reasons to keep this actor alive.
436
0
  if (mStreams.IsEmpty()) {
437
0
    mState = eInactive;
438
0
    SendClose();
439
0
    return;
440
0
  }
441
0
442
0
  mState = eActive;
443
0
444
0
  // Let's processing the pending operations. We need a stream for each pending
445
0
  // operation.
446
0
  for (uint32_t i = 0; i < mPendingOperations.Length(); ++i) {
447
0
    if (mPendingOperations[i].mOp == PendingOperation::eStreamNeeded) {
448
0
      SendStreamNeeded();
449
0
    } else {
450
0
      MOZ_ASSERT(mPendingOperations[i].mOp == PendingOperation::eLengthNeeded);
451
0
      SendLengthNeeded();
452
0
    }
453
0
  }
454
0
}
455
456
} // namespace dom
457
} // namespace mozilla