/src/mozilla-central/netwerk/base/nsStreamTransportService.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
2 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
3 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
4 | | |
5 | | #include "nsStreamTransportService.h" |
6 | | #include "nsXPCOMCIDInternal.h" |
7 | | #include "nsNetSegmentUtils.h" |
8 | | #include "nsTransportUtils.h" |
9 | | #include "nsStreamUtils.h" |
10 | | #include "nsError.h" |
11 | | #include "nsNetCID.h" |
12 | | |
13 | | #include "nsIAsyncInputStream.h" |
14 | | #include "nsIAsyncOutputStream.h" |
15 | | #include "nsISeekableStream.h" |
16 | | #include "nsIPipe.h" |
17 | | #include "nsITransport.h" |
18 | | #include "nsIObserverService.h" |
19 | | #include "nsThreadPool.h" |
20 | | #include "mozilla/Services.h" |
21 | | |
22 | | namespace mozilla { |
23 | | namespace net { |
24 | | |
25 | | //----------------------------------------------------------------------------- |
26 | | // nsInputStreamTransport |
27 | | // |
28 | | // Implements nsIInputStream as a wrapper around the real input stream. This |
29 | | // allows the transport to support seeking, range-limiting, progress reporting, |
30 | | // and close-when-done semantics while utilizing NS_AsyncCopy. |
31 | | //----------------------------------------------------------------------------- |
32 | | |
33 | | class nsInputStreamTransport : public nsITransport |
34 | | , public nsIInputStream |
35 | | { |
36 | | public: |
37 | | // Record refcount changes to ensure that stream transports are destroyed |
38 | | // on consistent threads when recording/replaying. |
39 | | NS_DECL_THREADSAFE_ISUPPORTS_WITH_RECORDING(recordreplay::Behavior::Preserve) |
40 | | NS_DECL_NSITRANSPORT |
41 | | NS_DECL_NSIINPUTSTREAM |
42 | | |
43 | | nsInputStreamTransport(nsIInputStream *source, |
44 | | bool closeWhenDone) |
45 | | : mSource(source) |
46 | | , mOffset(0) |
47 | | , mCloseWhenDone(closeWhenDone) |
48 | | , mInProgress(false) |
49 | 0 | { |
50 | 0 | } |
51 | | |
52 | | private: |
53 | 0 | virtual ~nsInputStreamTransport() = default; |
54 | | |
55 | | nsCOMPtr<nsIAsyncInputStream> mPipeIn; |
56 | | |
57 | | // while the copy is active, these members may only be accessed from the |
58 | | // nsIInputStream implementation. |
59 | | nsCOMPtr<nsITransportEventSink> mEventSink; |
60 | | nsCOMPtr<nsIInputStream> mSource; |
61 | | int64_t mOffset; |
62 | | bool mCloseWhenDone; |
63 | | |
64 | | // this variable serves as a lock to prevent the state of the transport |
65 | | // from being modified once the copy is in progress. |
66 | | bool mInProgress; |
67 | | }; |
68 | | |
69 | | NS_IMPL_ISUPPORTS(nsInputStreamTransport, |
70 | | nsITransport, |
71 | | nsIInputStream) |
72 | | |
73 | | /** nsITransport **/ |
74 | | |
75 | | NS_IMETHODIMP |
76 | | nsInputStreamTransport::OpenInputStream(uint32_t flags, |
77 | | uint32_t segsize, |
78 | | uint32_t segcount, |
79 | | nsIInputStream **result) |
80 | 0 | { |
81 | 0 | NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); |
82 | 0 |
|
83 | 0 | nsresult rv; |
84 | 0 | nsCOMPtr<nsIEventTarget> target = |
85 | 0 | do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv); |
86 | 0 | if (NS_FAILED(rv)) return rv; |
87 | 0 | |
88 | 0 | // XXX if the caller requests an unbuffered stream, then perhaps |
89 | 0 | // we'd want to simply return mSource; however, then we would |
90 | 0 | // not be reading mSource on a background thread. is this ok? |
91 | 0 | |
92 | 0 | bool nonblocking = !(flags & OPEN_BLOCKING); |
93 | 0 |
|
94 | 0 | net_ResolveSegmentParams(segsize, segcount); |
95 | 0 |
|
96 | 0 | nsCOMPtr<nsIAsyncOutputStream> pipeOut; |
97 | 0 | rv = NS_NewPipe2(getter_AddRefs(mPipeIn), |
98 | 0 | getter_AddRefs(pipeOut), |
99 | 0 | nonblocking, true, |
100 | 0 | segsize, segcount); |
101 | 0 | if (NS_FAILED(rv)) return rv; |
102 | 0 | |
103 | 0 | mInProgress = true; |
104 | 0 |
|
105 | 0 | // startup async copy process... |
106 | 0 | rv = NS_AsyncCopy(this, pipeOut, target, |
107 | 0 | NS_ASYNCCOPY_VIA_WRITESEGMENTS, segsize); |
108 | 0 | if (NS_SUCCEEDED(rv)) |
109 | 0 | NS_ADDREF(*result = mPipeIn); |
110 | 0 |
|
111 | 0 | return rv; |
112 | 0 | } |
113 | | |
114 | | NS_IMETHODIMP |
115 | | nsInputStreamTransport::OpenOutputStream(uint32_t flags, |
116 | | uint32_t segsize, |
117 | | uint32_t segcount, |
118 | | nsIOutputStream **result) |
119 | 0 | { |
120 | 0 | // this transport only supports reading! |
121 | 0 | MOZ_ASSERT_UNREACHABLE("nsInputStreamTransport::OpenOutputStream"); |
122 | 0 | return NS_ERROR_UNEXPECTED; |
123 | 0 | } |
124 | | |
125 | | NS_IMETHODIMP |
126 | | nsInputStreamTransport::Close(nsresult reason) |
127 | 0 | { |
128 | 0 | if (NS_SUCCEEDED(reason)) |
129 | 0 | reason = NS_BASE_STREAM_CLOSED; |
130 | 0 |
|
131 | 0 | return mPipeIn->CloseWithStatus(reason); |
132 | 0 | } |
133 | | |
134 | | NS_IMETHODIMP |
135 | | nsInputStreamTransport::SetEventSink(nsITransportEventSink *sink, |
136 | | nsIEventTarget *target) |
137 | 0 | { |
138 | 0 | NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); |
139 | 0 |
|
140 | 0 | if (target) |
141 | 0 | return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink), |
142 | 0 | sink, target); |
143 | 0 | |
144 | 0 | mEventSink = sink; |
145 | 0 | return NS_OK; |
146 | 0 | } |
147 | | |
148 | | /** nsIInputStream **/ |
149 | | |
150 | | NS_IMETHODIMP |
151 | | nsInputStreamTransport::Close() |
152 | 0 | { |
153 | 0 | if (mCloseWhenDone) |
154 | 0 | mSource->Close(); |
155 | 0 |
|
156 | 0 | // make additional reads return early... |
157 | 0 | mOffset = 0; |
158 | 0 | return NS_OK; |
159 | 0 | } |
160 | | |
161 | | NS_IMETHODIMP |
162 | | nsInputStreamTransport::Available(uint64_t *result) |
163 | 0 | { |
164 | 0 | return NS_ERROR_NOT_IMPLEMENTED; |
165 | 0 | } |
166 | | |
167 | | NS_IMETHODIMP |
168 | | nsInputStreamTransport::Read(char *buf, uint32_t count, uint32_t *result) |
169 | 0 | { |
170 | 0 | nsresult rv = mSource->Read(buf, count, result); |
171 | 0 |
|
172 | 0 | if (NS_SUCCEEDED(rv)) { |
173 | 0 | mOffset += *result; |
174 | 0 | if (mEventSink) |
175 | 0 | mEventSink->OnTransportStatus(this, NS_NET_STATUS_READING, mOffset, |
176 | 0 | -1); |
177 | 0 | } |
178 | 0 | return rv; |
179 | 0 | } |
180 | | |
181 | | NS_IMETHODIMP |
182 | | nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer, void *closure, |
183 | | uint32_t count, uint32_t *result) |
184 | 0 | { |
185 | 0 | return NS_ERROR_NOT_IMPLEMENTED; |
186 | 0 | } |
187 | | |
188 | | NS_IMETHODIMP |
189 | | nsInputStreamTransport::IsNonBlocking(bool *result) |
190 | 0 | { |
191 | 0 | *result = false; |
192 | 0 | return NS_OK; |
193 | 0 | } |
194 | | |
195 | | //----------------------------------------------------------------------------- |
196 | | // nsStreamTransportService |
197 | | //----------------------------------------------------------------------------- |
198 | | |
199 | | nsStreamTransportService::~nsStreamTransportService() |
200 | 0 | { |
201 | 0 | NS_ASSERTION(!mPool, "thread pool wasn't shutdown"); |
202 | 0 | } |
203 | | |
204 | | nsresult |
205 | | nsStreamTransportService::Init() |
206 | 1 | { |
207 | 1 | mPool = new nsThreadPool(); |
208 | 1 | |
209 | 1 | // Configure the pool |
210 | 1 | mPool->SetName(NS_LITERAL_CSTRING("StreamTrans")); |
211 | 1 | mPool->SetThreadLimit(25); |
212 | 1 | mPool->SetIdleThreadLimit(1); |
213 | 1 | mPool->SetIdleThreadTimeout(PR_SecondsToInterval(30)); |
214 | 1 | |
215 | 1 | nsCOMPtr<nsIObserverService> obsSvc = |
216 | 1 | mozilla::services::GetObserverService(); |
217 | 1 | if (obsSvc) |
218 | 1 | obsSvc->AddObserver(this, "xpcom-shutdown-threads", false); |
219 | 1 | return NS_OK; |
220 | 1 | } |
221 | | |
222 | | NS_IMPL_ISUPPORTS(nsStreamTransportService, |
223 | | nsIStreamTransportService, |
224 | | nsIEventTarget, |
225 | | nsIObserver) |
226 | | |
227 | | NS_IMETHODIMP |
228 | | nsStreamTransportService::DispatchFromScript(nsIRunnable *task, uint32_t flags) |
229 | 0 | { |
230 | 0 | nsCOMPtr<nsIRunnable> event(task); |
231 | 0 | return Dispatch(event.forget(), flags); |
232 | 0 | } |
233 | | |
234 | | NS_IMETHODIMP |
235 | | nsStreamTransportService::Dispatch(already_AddRefed<nsIRunnable> task, uint32_t flags) |
236 | 0 | { |
237 | 0 | nsCOMPtr<nsIRunnable> event(task); // so it gets released on failure paths |
238 | 0 | nsCOMPtr<nsIThreadPool> pool; |
239 | 0 | { |
240 | 0 | mozilla::MutexAutoLock lock(mShutdownLock); |
241 | 0 | if (mIsShutdown) { |
242 | 0 | return NS_ERROR_NOT_INITIALIZED; |
243 | 0 | } |
244 | 0 | pool = mPool; |
245 | 0 | } |
246 | 0 | NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED); |
247 | 0 | return pool->Dispatch(event.forget(), flags); |
248 | 0 | } |
249 | | |
250 | | NS_IMETHODIMP |
251 | | nsStreamTransportService::DelayedDispatch(already_AddRefed<nsIRunnable>, uint32_t) |
252 | 0 | { |
253 | 0 | return NS_ERROR_NOT_IMPLEMENTED; |
254 | 0 | } |
255 | | |
256 | | NS_IMETHODIMP_(bool) |
257 | | nsStreamTransportService::IsOnCurrentThreadInfallible() |
258 | 0 | { |
259 | 0 | nsCOMPtr<nsIThreadPool> pool; |
260 | 0 | { |
261 | 0 | mozilla::MutexAutoLock lock(mShutdownLock); |
262 | 0 | pool = mPool; |
263 | 0 | } |
264 | 0 | if (!pool) { |
265 | 0 | return false; |
266 | 0 | } |
267 | 0 | return pool->IsOnCurrentThread(); |
268 | 0 | } |
269 | | |
270 | | NS_IMETHODIMP |
271 | | nsStreamTransportService::IsOnCurrentThread(bool *result) |
272 | 0 | { |
273 | 0 | nsCOMPtr<nsIThreadPool> pool; |
274 | 0 | { |
275 | 0 | mozilla::MutexAutoLock lock(mShutdownLock); |
276 | 0 | if (mIsShutdown) { |
277 | 0 | return NS_ERROR_NOT_INITIALIZED; |
278 | 0 | } |
279 | 0 | pool = mPool; |
280 | 0 | } |
281 | 0 | NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED); |
282 | 0 | return pool->IsOnCurrentThread(result); |
283 | 0 | } |
284 | | |
285 | | NS_IMETHODIMP |
286 | | nsStreamTransportService::CreateInputTransport(nsIInputStream *stream, |
287 | | bool closeWhenDone, |
288 | | nsITransport **result) |
289 | 0 | { |
290 | 0 | nsInputStreamTransport *trans = |
291 | 0 | new nsInputStreamTransport(stream, closeWhenDone); |
292 | 0 | if (!trans) |
293 | 0 | return NS_ERROR_OUT_OF_MEMORY; |
294 | 0 | NS_ADDREF(*result = trans); |
295 | 0 | return NS_OK; |
296 | 0 | } |
297 | | |
298 | | NS_IMETHODIMP |
299 | | nsStreamTransportService::Observe(nsISupports *subject, const char *topic, |
300 | | const char16_t *data) |
301 | 0 | { |
302 | 0 | NS_ASSERTION(strcmp(topic, "xpcom-shutdown-threads") == 0, "oops"); |
303 | 0 |
|
304 | 0 | { |
305 | 0 | mozilla::MutexAutoLock lock(mShutdownLock); |
306 | 0 | mIsShutdown = true; |
307 | 0 | } |
308 | 0 |
|
309 | 0 | if (mPool) { |
310 | 0 | mPool->Shutdown(); |
311 | 0 | mPool = nullptr; |
312 | 0 | } |
313 | 0 | return NS_OK; |
314 | 0 | } |
315 | | |
316 | | class AvailableEvent final : public Runnable |
317 | | { |
318 | | public: |
319 | | AvailableEvent(nsIInputStream* stream, |
320 | | nsIInputAvailableCallback* callback) |
321 | | : Runnable("net::AvailableEvent") |
322 | | , mStream(stream) |
323 | | , mCallback(callback) |
324 | | , mDoingCallback(false) |
325 | | , mSize(0) |
326 | | , mResultForCallback(NS_OK) |
327 | 0 | { |
328 | 0 | mCallbackTarget = GetCurrentThreadEventTarget(); |
329 | 0 | } |
330 | | |
331 | | NS_IMETHOD Run() override |
332 | 0 | { |
333 | 0 | if (mDoingCallback) { |
334 | 0 | // pong |
335 | 0 | mCallback->OnInputAvailableComplete(mSize, mResultForCallback); |
336 | 0 | mCallback = nullptr; |
337 | 0 | } else { |
338 | 0 | // ping |
339 | 0 | mResultForCallback = mStream->Available(&mSize); |
340 | 0 | mStream = nullptr; |
341 | 0 | mDoingCallback = true; |
342 | 0 |
|
343 | 0 | nsCOMPtr<nsIRunnable> event(this); // overly cute |
344 | 0 | mCallbackTarget->Dispatch(event.forget(), NS_DISPATCH_NORMAL); |
345 | 0 | mCallbackTarget = nullptr; |
346 | 0 | } |
347 | 0 | return NS_OK; |
348 | 0 | } |
349 | | |
350 | | private: |
351 | 0 | virtual ~AvailableEvent() = default; |
352 | | |
353 | | nsCOMPtr<nsIInputStream> mStream; |
354 | | nsCOMPtr<nsIInputAvailableCallback> mCallback; |
355 | | nsCOMPtr<nsIEventTarget> mCallbackTarget; |
356 | | bool mDoingCallback; |
357 | | uint64_t mSize; |
358 | | nsresult mResultForCallback; |
359 | | }; |
360 | | |
361 | | NS_IMETHODIMP |
362 | | nsStreamTransportService::InputAvailable(nsIInputStream *stream, |
363 | | nsIInputAvailableCallback *callback) |
364 | 0 | { |
365 | 0 | nsCOMPtr<nsIThreadPool> pool; |
366 | 0 | { |
367 | 0 | mozilla::MutexAutoLock lock(mShutdownLock); |
368 | 0 | if (mIsShutdown) { |
369 | 0 | return NS_ERROR_NOT_INITIALIZED; |
370 | 0 | } |
371 | 0 | pool = mPool; |
372 | 0 | } |
373 | 0 | nsCOMPtr<nsIRunnable> event = new AvailableEvent(stream, callback); |
374 | 0 | return pool->Dispatch(event.forget(), NS_DISPATCH_NORMAL); |
375 | 0 | } |
376 | | |
377 | | } // namespace net |
378 | | } // namespace mozilla |