Coverage Report

Created: 2018-09-25 14:53

/src/mozilla-central/ipc/glue/IPCStreamSource.cpp
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
 * License, v. 2.0. If a copy of the MPL was not distributed with this
5
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
#include "IPCStreamSource.h"
8
#include "BackgroundParent.h" // for AssertIsOnBackgroundThread
9
#include "mozilla/webrender/WebRenderTypes.h"
10
#include "nsIAsyncInputStream.h"
11
#include "nsICancelableRunnable.h"
12
#include "nsIRunnable.h"
13
#include "nsISerialEventTarget.h"
14
#include "nsStreamUtils.h"
15
#include "nsThreadUtils.h"
16
17
using mozilla::wr::ByteBuffer;
18
19
namespace mozilla {
20
namespace ipc {
21
22
class IPCStreamSource::Callback final : public nsIInputStreamCallback
23
                                      , public nsIRunnable
24
                                      , public nsICancelableRunnable
25
{
26
public:
27
  explicit Callback(IPCStreamSource* aSource)
28
    : mSource(aSource)
29
    , mOwningEventTarget(GetCurrentThreadSerialEventTarget())
30
0
  {
31
0
    MOZ_ASSERT(mSource);
32
0
  }
33
34
  NS_IMETHOD
35
  OnInputStreamReady(nsIAsyncInputStream* aStream) override
36
0
  {
37
0
    // any thread
38
0
    if (mOwningEventTarget->IsOnCurrentThread()) {
39
0
      return Run();
40
0
    }
41
0
42
0
    // If this fails, then it means the owning thread is a Worker that has
43
0
    // been shut down.  It's OK to lose the event in this case because the
44
0
    // IPCStreamChild listens for this event through the WorkerRef.
45
0
    nsresult rv = mOwningEventTarget->Dispatch(this, nsIThread::DISPATCH_NORMAL);
46
0
    if (NS_FAILED(rv)) {
47
0
      NS_WARNING("Failed to dispatch stream readable event to owning thread");
48
0
    }
49
0
50
0
    return NS_OK;
51
0
  }
52
53
  NS_IMETHOD
54
  Run() override
55
0
  {
56
0
    MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
57
0
    if (mSource) {
58
0
      mSource->OnStreamReady(this);
59
0
    }
60
0
    return NS_OK;
61
0
  }
62
63
  nsresult
64
  Cancel() override
65
0
  {
66
0
    // Cancel() gets called when the Worker thread is being shut down.  We have
67
0
    // nothing to do here because IPCStreamChild handles this case via
68
0
    // the WorkerRef.
69
0
    return NS_OK;
70
0
  }
71
72
  void
73
  ClearSource()
74
0
  {
75
0
    MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
76
0
    MOZ_ASSERT(mSource);
77
0
    mSource = nullptr;
78
0
  }
79
80
private:
81
  ~Callback()
82
0
  {
83
0
    // called on any thread
84
0
85
0
    // ClearSource() should be called before the Callback is destroyed
86
0
    MOZ_ASSERT(!mSource);
87
0
  }
88
89
  // This is a raw pointer because the source keeps alive the callback and,
90
  // before being destroyed, it nullifies this pointer (this happens when
91
  // ActorDestroyed() is called).
92
  IPCStreamSource* mSource;
93
94
  nsCOMPtr<nsISerialEventTarget> mOwningEventTarget;
95
96
  NS_DECL_THREADSAFE_ISUPPORTS
97
};
98
99
NS_IMPL_ISUPPORTS(IPCStreamSource::Callback, nsIInputStreamCallback,
100
                                             nsIRunnable,
101
                                             nsICancelableRunnable);
102
103
IPCStreamSource::IPCStreamSource(nsIAsyncInputStream* aInputStream)
104
  : mStream(aInputStream)
105
  , mState(ePending)
106
0
{
107
0
  MOZ_ASSERT(aInputStream);
108
0
}
109
110
IPCStreamSource::~IPCStreamSource()
111
0
{
112
0
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
113
0
  MOZ_ASSERT(mState == eClosed);
114
0
  MOZ_ASSERT(!mCallback);
115
0
  MOZ_ASSERT(!mWorkerRef);
116
0
}
117
118
bool
119
IPCStreamSource::Initialize()
120
0
{
121
0
  bool nonBlocking = false;
122
0
  MOZ_ALWAYS_TRUE(NS_SUCCEEDED(mStream->IsNonBlocking(&nonBlocking)));
123
0
  // IPCStreamChild reads in the current thread, so it is only supported on
124
0
  // non-blocking, async channels
125
0
  if (!nonBlocking) {
126
0
    return false;
127
0
  }
128
0
129
0
  // A source can be used on any thread, but we only support IPCStream on
130
0
  // main thread, Workers and PBackground thread right now.  This is due
131
0
  // to the requirement that the thread be guaranteed to live long enough to
132
0
  // receive messages. We can enforce this guarantee with a StrongWorkerRef on
133
0
  // worker threads, but not other threads. Main-thread and PBackground thread
134
0
  // do not need anything special in order to be kept alive.
135
0
  if (!NS_IsMainThread()) {
136
0
    mozilla::dom::WorkerPrivate* workerPrivate =
137
0
      mozilla::dom::GetCurrentThreadWorkerPrivate();
138
0
    if (workerPrivate) {
139
0
      RefPtr<mozilla::dom::StrongWorkerRef> workerRef =
140
0
        mozilla::dom::StrongWorkerRef::Create(workerPrivate, "IPCStreamSource");
141
0
      if (NS_WARN_IF(!workerRef)) {
142
0
        return false;
143
0
      }
144
0
145
0
      mWorkerRef = std::move(workerRef);
146
0
    } else {
147
0
      AssertIsOnBackgroundThread();
148
0
    }
149
0
  }
150
0
151
0
  return true;
152
0
}
153
154
void
155
IPCStreamSource::ActorConstructed()
156
0
{
157
0
  MOZ_ASSERT(mState == ePending);
158
0
  mState = eActorConstructed;
159
0
}
160
161
void
162
IPCStreamSource::ActorDestroyed()
163
0
{
164
0
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
165
0
166
0
  mState = eClosed;
167
0
168
0
  if (mCallback) {
169
0
    mCallback->ClearSource();
170
0
    mCallback = nullptr;
171
0
  }
172
0
173
0
  mWorkerRef = nullptr;
174
0
}
175
176
void
177
IPCStreamSource::Start()
178
0
{
179
0
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
180
0
  DoRead();
181
0
}
182
183
void
184
IPCStreamSource::StartDestroy()
185
0
{
186
0
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
187
0
  OnEnd(NS_ERROR_ABORT);
188
0
}
189
190
void
191
IPCStreamSource::DoRead()
192
0
{
193
0
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
194
0
  MOZ_ASSERT(mState == eActorConstructed);
195
0
  MOZ_ASSERT(!mCallback);
196
0
197
0
  // The input stream (likely a pipe) probably uses a segment size of
198
0
  // 4kb.  If there is data already buffered it would be nice to aggregate
199
0
  // multiple segments into a single IPC call.  Conversely, don't send too
200
0
  // large of a buffer in a single call to avoid spiking memory.
201
0
  static const uint64_t kMaxBytesPerMessage = 32 * 1024;
202
0
  static_assert(kMaxBytesPerMessage <= static_cast<uint64_t>(UINT32_MAX),
203
0
                "kMaxBytesPerMessage must cleanly cast to uint32_t");
204
0
205
0
  char buffer[kMaxBytesPerMessage];
206
0
207
0
  while (true) {
208
0
    // It should not be possible to transition to closed state without
209
0
    // this loop terminating via a return.
210
0
    MOZ_ASSERT(mState == eActorConstructed);
211
0
212
0
    // See if the stream is closed by checking the return of Available.
213
0
    uint64_t dummy;
214
0
    nsresult rv = mStream->Available(&dummy);
215
0
    if (NS_FAILED(rv)) {
216
0
      OnEnd(rv);
217
0
      return;
218
0
    }
219
0
220
0
    uint32_t bytesRead = 0;
221
0
    rv = mStream->Read(buffer, kMaxBytesPerMessage, &bytesRead);
222
0
223
0
    if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
224
0
      MOZ_ASSERT(bytesRead == 0);
225
0
      Wait();
226
0
      return;
227
0
    }
228
0
229
0
    if (NS_FAILED(rv)) {
230
0
      MOZ_ASSERT(bytesRead == 0);
231
0
      OnEnd(rv);
232
0
      return;
233
0
    }
234
0
235
0
    // Zero-byte read indicates end-of-stream.
236
0
    if (bytesRead == 0) {
237
0
      OnEnd(NS_BASE_STREAM_CLOSED);
238
0
      return;
239
0
    }
240
0
241
0
    // We read some data from the stream, send it across.
242
0
    SendData(ByteBuffer(bytesRead, reinterpret_cast<uint8_t*>(buffer)));
243
0
  }
244
0
}
245
246
void
247
IPCStreamSource::Wait()
248
0
{
249
0
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
250
0
  MOZ_ASSERT(mState == eActorConstructed);
251
0
  MOZ_ASSERT(!mCallback);
252
0
253
0
  // Set mCallback immediately instead of waiting for success.  It's possible
254
0
  // AsyncWait() will call back synchronously.
255
0
  mCallback = new Callback(this);
256
0
  nsresult rv = mStream->AsyncWait(mCallback, 0, 0, nullptr);
257
0
  if (NS_FAILED(rv)) {
258
0
    OnEnd(rv);
259
0
    return;
260
0
  }
261
0
}
262
263
void
264
IPCStreamSource::OnStreamReady(Callback* aCallback)
265
0
{
266
0
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
267
0
  MOZ_ASSERT(mCallback);
268
0
  MOZ_ASSERT(aCallback == mCallback);
269
0
  mCallback->ClearSource();
270
0
  mCallback = nullptr;
271
0
  DoRead();
272
0
}
273
274
void
275
IPCStreamSource::OnEnd(nsresult aRv)
276
0
{
277
0
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
278
0
  MOZ_ASSERT(aRv != NS_BASE_STREAM_WOULD_BLOCK);
279
0
280
0
  if (mState == eClosed) {
281
0
    return;
282
0
  }
283
0
284
0
  mState = eClosed;
285
0
286
0
  mStream->CloseWithStatus(aRv);
287
0
288
0
  if (aRv == NS_BASE_STREAM_CLOSED) {
289
0
    aRv = NS_OK;
290
0
  }
291
0
292
0
  // This will trigger an ActorDestroy() from the other side
293
0
  Close(aRv);
294
0
}
295
296
} // namespace ipc
297
} // namespace mozilla