/src/mozilla-central/xpcom/io/NonBlockingAsyncInputStream.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
3 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
4 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
5 | | |
6 | | #include "NonBlockingAsyncInputStream.h" |
7 | | #include "mozilla/ipc/InputStreamUtils.h" |
8 | | #include "nsISeekableStream.h" |
9 | | #include "nsStreamUtils.h" |
10 | | |
11 | | namespace mozilla { |
12 | | |
13 | | using namespace ipc; |
14 | | |
15 | | class NonBlockingAsyncInputStream::AsyncWaitRunnable final : public CancelableRunnable |
16 | | { |
17 | | RefPtr<NonBlockingAsyncInputStream> mStream; |
18 | | nsCOMPtr<nsIInputStreamCallback> mCallback; |
19 | | |
20 | | public: |
21 | | AsyncWaitRunnable(NonBlockingAsyncInputStream* aStream, |
22 | | nsIInputStreamCallback* aCallback) |
23 | | : CancelableRunnable("AsyncWaitRunnable") |
24 | | , mStream(aStream) |
25 | | , mCallback(aCallback) |
26 | 0 | {} |
27 | | |
28 | | NS_IMETHOD |
29 | | Run() override |
30 | 0 | { |
31 | 0 | mStream->RunAsyncWaitCallback(this, mCallback.forget()); |
32 | 0 | return NS_OK; |
33 | 0 | } |
34 | | }; |
35 | | |
// AddRef()/Release() for NonBlockingAsyncInputStream are generated by the
// XPCOM implementation macros below.
NS_IMPL_ADDREF(NonBlockingAsyncInputStream);
NS_IMPL_RELEASE(NonBlockingAsyncInputStream);
38 | | |
39 | | NonBlockingAsyncInputStream::WaitClosureOnly::WaitClosureOnly(AsyncWaitRunnable* aRunnable, |
40 | | nsIEventTarget* aEventTarget) |
41 | | : mRunnable(aRunnable) |
42 | | , mEventTarget(aEventTarget) |
43 | 0 | {} |
44 | | |
// QueryInterface table. nsIInputStream and nsIAsyncInputStream are always
// listed; nsICloneableInputStream, nsIIPCSerializableInputStream and
// nsISeekableStream are conditional on the matching mWeak* member, which the
// constructor only sets when the wrapped stream implements that interface.
NS_INTERFACE_MAP_BEGIN(NonBlockingAsyncInputStream)
  NS_INTERFACE_MAP_ENTRY(nsIInputStream)
  NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream)
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream,
                                     mWeakCloneableInputStream)
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
                                     mWeakIPCSerializableInputStream)
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream,
                                     mWeakSeekableInputStream)
  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStream)
NS_INTERFACE_MAP_END
56 | | |
57 | | /* static */ nsresult |
58 | | NonBlockingAsyncInputStream::Create(already_AddRefed<nsIInputStream> aInputStream, |
59 | | nsIAsyncInputStream** aResult) |
60 | 0 | { |
61 | 0 | MOZ_DIAGNOSTIC_ASSERT(aResult); |
62 | 0 |
|
63 | 0 | nsCOMPtr<nsIInputStream> inputStream = std::move(aInputStream); |
64 | 0 |
|
65 | 0 | bool nonBlocking = false; |
66 | 0 | nsresult rv = inputStream->IsNonBlocking(&nonBlocking); |
67 | 0 | if (NS_WARN_IF(NS_FAILED(rv))) { |
68 | 0 | return rv; |
69 | 0 | } |
70 | 0 | |
71 | 0 | MOZ_DIAGNOSTIC_ASSERT(nonBlocking); |
72 | 0 |
|
73 | 0 | #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED |
74 | 0 | nsCOMPtr<nsIAsyncInputStream> asyncInputStream = |
75 | 0 | do_QueryInterface(inputStream); |
76 | 0 | MOZ_DIAGNOSTIC_ASSERT(!asyncInputStream); |
77 | 0 | #endif // MOZ_DIAGNOSTIC_ASSERT_ENABLED |
78 | 0 |
|
79 | 0 | RefPtr<NonBlockingAsyncInputStream> stream = |
80 | 0 | new NonBlockingAsyncInputStream(inputStream.forget()); |
81 | 0 |
|
82 | 0 | stream.forget(aResult); |
83 | 0 | return NS_OK; |
84 | 0 | } |
85 | | |
86 | | NonBlockingAsyncInputStream::NonBlockingAsyncInputStream(already_AddRefed<nsIInputStream> aInputStream) |
87 | | : mInputStream(std::move(aInputStream)) |
88 | | , mWeakCloneableInputStream(nullptr) |
89 | | , mWeakIPCSerializableInputStream(nullptr) |
90 | | , mWeakSeekableInputStream(nullptr) |
91 | | , mLock("NonBlockingAsyncInputStream::mLock") |
92 | | , mClosed(false) |
93 | 0 | { |
94 | 0 | MOZ_ASSERT(mInputStream); |
95 | 0 |
|
96 | 0 | nsCOMPtr<nsICloneableInputStream> cloneableStream = |
97 | 0 | do_QueryInterface(mInputStream); |
98 | 0 | if (cloneableStream && SameCOMIdentity(mInputStream, cloneableStream)) { |
99 | 0 | mWeakCloneableInputStream = cloneableStream; |
100 | 0 | } |
101 | 0 |
|
102 | 0 | nsCOMPtr<nsIIPCSerializableInputStream> serializableStream = |
103 | 0 | do_QueryInterface(mInputStream); |
104 | 0 | if (serializableStream && |
105 | 0 | SameCOMIdentity(mInputStream, serializableStream)) { |
106 | 0 | mWeakIPCSerializableInputStream = serializableStream; |
107 | 0 | } |
108 | 0 |
|
109 | 0 | nsCOMPtr<nsISeekableStream> seekableStream = |
110 | 0 | do_QueryInterface(mInputStream); |
111 | 0 | if (seekableStream && SameCOMIdentity(mInputStream, seekableStream)) { |
112 | 0 | mWeakSeekableInputStream = seekableStream; |
113 | 0 | } |
114 | 0 | } |
115 | | |
116 | | NonBlockingAsyncInputStream::~NonBlockingAsyncInputStream() |
117 | 0 | {} |
118 | | |
119 | | NS_IMETHODIMP |
120 | | NonBlockingAsyncInputStream::Close() |
121 | 0 | { |
122 | 0 | RefPtr<AsyncWaitRunnable> waitClosureOnlyRunnable; |
123 | 0 | nsCOMPtr<nsIEventTarget> waitClosureOnlyEventTarget; |
124 | 0 |
|
125 | 0 | { |
126 | 0 | MutexAutoLock lock(mLock); |
127 | 0 |
|
128 | 0 | if (mClosed) { |
129 | 0 | // Here we could return NS_BASE_STREAM_CLOSED as well, but just to avoid |
130 | 0 | // warning messages, let's make everybody happy with a NS_OK. |
131 | 0 | return NS_OK; |
132 | 0 | } |
133 | 0 | |
134 | 0 | mClosed = true; |
135 | 0 |
|
136 | 0 | NS_ENSURE_STATE(mInputStream); |
137 | 0 | nsresult rv = mInputStream->Close(); |
138 | 0 | if (NS_WARN_IF(NS_FAILED(rv))) { |
139 | 0 | mWaitClosureOnly.reset(); |
140 | 0 | return rv; |
141 | 0 | } |
142 | 0 | |
143 | 0 | // If we have a WaitClosureOnly runnable, it's time to use it. |
144 | 0 | if (mWaitClosureOnly.isSome()) { |
145 | 0 | waitClosureOnlyRunnable = std::move(mWaitClosureOnly->mRunnable); |
146 | 0 | waitClosureOnlyEventTarget = std::move(mWaitClosureOnly->mEventTarget); |
147 | 0 |
|
148 | 0 | mWaitClosureOnly.reset(); |
149 | 0 |
|
150 | 0 | // Now we want to dispatch the asyncWaitCallback. |
151 | 0 | mAsyncWaitCallback = waitClosureOnlyRunnable; |
152 | 0 | } |
153 | 0 | } |
154 | 0 |
|
155 | 0 | if (waitClosureOnlyRunnable) { |
156 | 0 | if (waitClosureOnlyEventTarget) { |
157 | 0 | waitClosureOnlyEventTarget->Dispatch(waitClosureOnlyRunnable, |
158 | 0 | NS_DISPATCH_NORMAL); |
159 | 0 | } else { |
160 | 0 | waitClosureOnlyRunnable->Run(); |
161 | 0 | } |
162 | 0 | } |
163 | 0 |
|
164 | 0 | return NS_OK; |
165 | 0 | } |
166 | | |
167 | | // nsIInputStream interface |
168 | | |
169 | | NS_IMETHODIMP |
170 | | NonBlockingAsyncInputStream::Available(uint64_t* aLength) |
171 | 0 | { |
172 | 0 | return mInputStream->Available(aLength); |
173 | 0 | } |
174 | | |
175 | | NS_IMETHODIMP |
176 | | NonBlockingAsyncInputStream::Read(char* aBuffer, uint32_t aCount, |
177 | | uint32_t* aReadCount) |
178 | 0 | { |
179 | 0 | return mInputStream->Read(aBuffer, aCount, aReadCount); |
180 | 0 | } |
181 | | |
182 | | namespace { |
183 | | |
184 | | class MOZ_RAII ReadSegmentsData |
185 | | { |
186 | | public: |
187 | | ReadSegmentsData(NonBlockingAsyncInputStream* aStream, |
188 | | nsWriteSegmentFun aFunc, |
189 | | void* aClosure) |
190 | | : mStream(aStream) |
191 | | , mFunc(aFunc) |
192 | | , mClosure(aClosure) |
193 | 0 | {} |
194 | | |
195 | | NonBlockingAsyncInputStream* mStream; |
196 | | nsWriteSegmentFun mFunc; |
197 | | void* mClosure; |
198 | | }; |
199 | | |
200 | | nsresult |
201 | | ReadSegmentsWriter(nsIInputStream* aInStream, |
202 | | void* aClosure, |
203 | | const char* aFromSegment, |
204 | | uint32_t aToOffset, |
205 | | uint32_t aCount, |
206 | | uint32_t* aWriteCount) |
207 | 0 | { |
208 | 0 | ReadSegmentsData* data = static_cast<ReadSegmentsData*>(aClosure); |
209 | 0 | return data->mFunc(data->mStream, data->mClosure, aFromSegment, aToOffset, |
210 | 0 | aCount, aWriteCount); |
211 | 0 | } |
212 | | |
213 | | } // anonymous |
214 | | |
215 | | NS_IMETHODIMP |
216 | | NonBlockingAsyncInputStream::ReadSegments(nsWriteSegmentFun aWriter, |
217 | | void* aClosure, uint32_t aCount, |
218 | | uint32_t* aResult) |
219 | 0 | { |
220 | 0 | ReadSegmentsData data(this, aWriter, aClosure); |
221 | 0 | return mInputStream->ReadSegments(ReadSegmentsWriter, &data, aCount, aResult); |
222 | 0 | } |
223 | | |
224 | | NS_IMETHODIMP |
225 | | NonBlockingAsyncInputStream::IsNonBlocking(bool* aNonBlocking) |
226 | 0 | { |
227 | 0 | *aNonBlocking = true; |
228 | 0 | return NS_OK; |
229 | 0 | } |
230 | | |
231 | | // nsICloneableInputStream interface |
232 | | |
233 | | NS_IMETHODIMP |
234 | | NonBlockingAsyncInputStream::GetCloneable(bool* aCloneable) |
235 | 0 | { |
236 | 0 | NS_ENSURE_STATE(mWeakCloneableInputStream); |
237 | 0 | return mWeakCloneableInputStream->GetCloneable(aCloneable); |
238 | 0 | } |
239 | | |
240 | | NS_IMETHODIMP |
241 | | NonBlockingAsyncInputStream::Clone(nsIInputStream** aResult) |
242 | 0 | { |
243 | 0 | NS_ENSURE_STATE(mWeakCloneableInputStream); |
244 | 0 |
|
245 | 0 | nsCOMPtr<nsIInputStream> clonedStream; |
246 | 0 | nsresult rv = mWeakCloneableInputStream->Clone(getter_AddRefs(clonedStream)); |
247 | 0 | if (NS_WARN_IF(NS_FAILED(rv))) { |
248 | 0 | return rv; |
249 | 0 | } |
250 | 0 | |
251 | 0 | nsCOMPtr<nsIAsyncInputStream> asyncStream; |
252 | 0 | rv = Create(clonedStream.forget(), getter_AddRefs(asyncStream)); |
253 | 0 | if (NS_WARN_IF(NS_FAILED(rv))) { |
254 | 0 | return rv; |
255 | 0 | } |
256 | 0 | |
257 | 0 | asyncStream.forget(aResult); |
258 | 0 | return NS_OK; |
259 | 0 | } |
260 | | |
261 | | // nsIAsyncInputStream interface |
262 | | |
263 | | NS_IMETHODIMP |
264 | | NonBlockingAsyncInputStream::CloseWithStatus(nsresult aStatus) |
265 | 0 | { |
266 | 0 | return Close(); |
267 | 0 | } |
268 | | |
269 | | NS_IMETHODIMP |
270 | | NonBlockingAsyncInputStream::AsyncWait(nsIInputStreamCallback* aCallback, |
271 | | uint32_t aFlags, |
272 | | uint32_t aRequestedCount, |
273 | | nsIEventTarget* aEventTarget) |
274 | 0 | { |
275 | 0 | RefPtr<AsyncWaitRunnable> runnable; |
276 | 0 | { |
277 | 0 | MutexAutoLock lock(mLock); |
278 | 0 |
|
279 | 0 | if (aCallback && (mWaitClosureOnly.isSome() || mAsyncWaitCallback)) { |
280 | 0 | return NS_ERROR_FAILURE; |
281 | 0 | } |
282 | 0 | |
283 | 0 | if (!aCallback) { |
284 | 0 | // Canceling previous callbacks. |
285 | 0 | mWaitClosureOnly.reset(); |
286 | 0 | mAsyncWaitCallback = nullptr; |
287 | 0 | return NS_OK; |
288 | 0 | } |
289 | 0 | |
290 | 0 | // Maybe the stream is already closed. |
291 | 0 | if (!mClosed) { |
292 | 0 | uint64_t length; |
293 | 0 | nsresult rv = mInputStream->Available(&length); |
294 | 0 | if (NS_SUCCEEDED(rv) && length == 0) { |
295 | 0 | mInputStream->Close(); |
296 | 0 | mClosed = true; |
297 | 0 | } |
298 | 0 | } |
299 | 0 |
|
300 | 0 | runnable = new AsyncWaitRunnable(this, aCallback); |
301 | 0 | if ((aFlags & nsIAsyncInputStream::WAIT_CLOSURE_ONLY) && !mClosed) { |
302 | 0 | mWaitClosureOnly.emplace(runnable, aEventTarget); |
303 | 0 | return NS_OK; |
304 | 0 | } |
305 | 0 | |
306 | 0 | mAsyncWaitCallback = runnable; |
307 | 0 | } |
308 | 0 |
|
309 | 0 | MOZ_ASSERT(runnable); |
310 | 0 |
|
311 | 0 | if (aEventTarget) { |
312 | 0 | return aEventTarget->Dispatch(runnable.forget()); |
313 | 0 | } |
314 | 0 | |
315 | 0 | return runnable->Run(); |
316 | 0 | } |
317 | | |
318 | | // nsIIPCSerializableInputStream |
319 | | |
// Serializes the wrapped stream (not this wrapper) into aParams and
// aFileDescriptors through InputStreamHelper. Only expected to be called when
// the wrapped stream is IPC-serializable, which is asserted.
void
NonBlockingAsyncInputStream::Serialize(mozilla::ipc::InputStreamParams& aParams,
                                       FileDescriptorArray& aFileDescriptors)
{
  MOZ_ASSERT(mWeakIPCSerializableInputStream);
  InputStreamHelper::SerializeInputStream(mInputStream, aParams,
                                          aFileDescriptors);
}
328 | | |
// Deserialization is not supported for this wrapper: calling this crashes
// deliberately via MOZ_CRASH. The trailing return is presumably only there to
// satisfy the signature.
bool
NonBlockingAsyncInputStream::Deserialize(const mozilla::ipc::InputStreamParams& aParams,
                                         const FileDescriptorArray& aFileDescriptors)
{
  MOZ_CRASH("NonBlockingAsyncInputStream cannot be deserialized!");
  return true;
}
336 | | |
337 | | Maybe<uint64_t> |
338 | | NonBlockingAsyncInputStream::ExpectedSerializedLength() |
339 | 0 | { |
340 | 0 | NS_ENSURE_TRUE(mWeakIPCSerializableInputStream, Nothing()); |
341 | 0 | return mWeakIPCSerializableInputStream->ExpectedSerializedLength(); |
342 | 0 | } |
343 | | |
344 | | // nsISeekableStream |
345 | | |
346 | | NS_IMETHODIMP |
347 | | NonBlockingAsyncInputStream::Seek(int32_t aWhence, int64_t aOffset) |
348 | 0 | { |
349 | 0 | NS_ENSURE_STATE(mWeakSeekableInputStream); |
350 | 0 | return mWeakSeekableInputStream->Seek(aWhence, aOffset); |
351 | 0 | } |
352 | | |
353 | | NS_IMETHODIMP |
354 | | NonBlockingAsyncInputStream::Tell(int64_t* aResult) |
355 | 0 | { |
356 | 0 | NS_ENSURE_STATE(mWeakSeekableInputStream); |
357 | 0 | return mWeakSeekableInputStream->Tell(aResult); |
358 | 0 | } |
359 | | |
// Not supported: always returns NS_ERROR_NOT_IMPLEMENTED when the wrapped
// stream is seekable, and fails through NS_ENSURE_STATE otherwise.
NS_IMETHODIMP
NonBlockingAsyncInputStream::SetEOF()
{
  NS_ENSURE_STATE(mWeakSeekableInputStream);
  return NS_ERROR_NOT_IMPLEMENTED;
}
366 | | |
367 | | void |
368 | | NonBlockingAsyncInputStream::RunAsyncWaitCallback(NonBlockingAsyncInputStream::AsyncWaitRunnable* aRunnable, |
369 | | already_AddRefed<nsIInputStreamCallback> aCallback) |
370 | 0 | { |
371 | 0 | nsCOMPtr<nsIInputStreamCallback> callback = std::move(aCallback); |
372 | 0 |
|
373 | 0 | { |
374 | 0 | MutexAutoLock lock(mLock); |
375 | 0 | if (mAsyncWaitCallback != aRunnable) { |
376 | 0 | // The callback has been canceled in the meantime. |
377 | 0 | return; |
378 | 0 | } |
379 | 0 | |
380 | 0 | mAsyncWaitCallback = nullptr; |
381 | 0 | } |
382 | 0 |
|
383 | 0 | callback->OnInputStreamReady(this); |
384 | 0 | } |
385 | | |
386 | | } // mozilla namespace |