/src/mozilla-central/xpcom/io/nsInputStreamTee.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ |
2 | | /* vim: set ts=8 sts=2 et sw=2 tw=80: */ |
3 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
4 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
5 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
6 | | |
7 | | #include <stdlib.h> |
8 | | #include "mozilla/Logging.h" |
9 | | |
10 | | #include "mozilla/Mutex.h" |
11 | | #include "mozilla/Attributes.h" |
12 | | #include "nsIInputStreamTee.h" |
13 | | #include "nsIInputStream.h" |
14 | | #include "nsIOutputStream.h" |
15 | | #include "nsCOMPtr.h" |
16 | | #include "nsAutoPtr.h" |
17 | | #include "nsIEventTarget.h" |
18 | | #include "nsThreadUtils.h" |
19 | | |
20 | | using namespace mozilla; |
21 | | |
22 | | #ifdef LOG |
23 | | #undef LOG |
24 | | #endif |
25 | | |
26 | | static LazyLogModule sTeeLog("nsInputStreamTee"); |
27 | 0 | #define LOG(args) MOZ_LOG(sTeeLog, mozilla::LogLevel::Debug, args) |
28 | | |
29 | | class nsInputStreamTee final : public nsIInputStreamTee |
30 | | { |
31 | | public: |
32 | | NS_DECL_THREADSAFE_ISUPPORTS |
33 | | NS_DECL_NSIINPUTSTREAM |
34 | | NS_DECL_NSIINPUTSTREAMTEE |
35 | | |
36 | | nsInputStreamTee(); |
37 | | bool SinkIsValid(); |
38 | | void InvalidateSink(); |
39 | | |
40 | | private: |
41 | | ~nsInputStreamTee() |
42 | 0 | { |
43 | 0 | } |
44 | | |
45 | | nsresult TeeSegment(const char* aBuf, uint32_t aCount); |
46 | | |
47 | | static nsresult WriteSegmentFun(nsIInputStream*, void*, const char*, |
48 | | uint32_t, uint32_t, uint32_t*); |
49 | | |
50 | | private: |
51 | | nsCOMPtr<nsIInputStream> mSource; |
52 | | nsCOMPtr<nsIOutputStream> mSink; |
53 | | nsCOMPtr<nsIEventTarget> mEventTarget; |
54 | | nsWriteSegmentFun mWriter; // for implementing ReadSegments |
55 | | void* mClosure; // for implementing ReadSegments |
56 | | nsAutoPtr<Mutex> mLock; // synchronize access to mSinkIsValid |
57 | | bool mSinkIsValid; // False if TeeWriteEvent fails |
58 | | }; |
59 | | |
60 | | class nsInputStreamTeeWriteEvent : public Runnable |
61 | | { |
62 | | public: |
63 | | // aTee's lock is held across construction of this object |
64 | | nsInputStreamTeeWriteEvent(const char* aBuf, |
65 | | uint32_t aCount, |
66 | | nsIOutputStream* aSink, |
67 | | nsInputStreamTee* aTee) |
68 | | : mozilla::Runnable("nsInputStreamTeeWriteEvent") |
69 | 0 | { |
70 | 0 | // copy the buffer - will be free'd by dtor |
71 | 0 | mBuf = (char*)malloc(aCount); |
72 | 0 | if (mBuf) { |
73 | 0 | memcpy(mBuf, (char*)aBuf, aCount); |
74 | 0 | } |
75 | 0 | mCount = aCount; |
76 | 0 | mSink = aSink; |
77 | 0 | bool isNonBlocking; |
78 | 0 | mSink->IsNonBlocking(&isNonBlocking); |
79 | 0 | NS_ASSERTION(isNonBlocking == false, "mSink is nonblocking"); |
80 | 0 | mTee = aTee; |
81 | 0 | } |
82 | | |
83 | | NS_IMETHOD Run() override |
84 | 0 | { |
85 | 0 | if (!mBuf) { |
86 | 0 | NS_WARNING("nsInputStreamTeeWriteEvent::Run() " |
87 | 0 | "memory not allocated\n"); |
88 | 0 | return NS_OK; |
89 | 0 | } |
90 | 0 | MOZ_ASSERT(mSink, "mSink is null!"); |
91 | 0 |
|
92 | 0 | // The output stream could have been invalidated between when |
93 | 0 | // this event was dispatched and now, so check before writing. |
94 | 0 | if (!mTee->SinkIsValid()) { |
95 | 0 | return NS_OK; |
96 | 0 | } |
97 | 0 | |
98 | 0 | LOG(("nsInputStreamTeeWriteEvent::Run() [%p]" |
99 | 0 | "will write %u bytes to %p\n", |
100 | 0 | this, mCount, mSink.get())); |
101 | 0 |
|
102 | 0 | uint32_t totalBytesWritten = 0; |
103 | 0 | while (mCount) { |
104 | 0 | nsresult rv; |
105 | 0 | uint32_t bytesWritten = 0; |
106 | 0 | rv = mSink->Write(mBuf + totalBytesWritten, mCount, &bytesWritten); |
107 | 0 | if (NS_FAILED(rv)) { |
108 | 0 | LOG(("nsInputStreamTeeWriteEvent::Run[%p] error %" PRIx32 " in writing", |
109 | 0 | this, static_cast<uint32_t>(rv))); |
110 | 0 | mTee->InvalidateSink(); |
111 | 0 | break; |
112 | 0 | } |
113 | 0 | totalBytesWritten += bytesWritten; |
114 | 0 | NS_ASSERTION(bytesWritten <= mCount, "wrote too much"); |
115 | 0 | mCount -= bytesWritten; |
116 | 0 | } |
117 | 0 | return NS_OK; |
118 | 0 | } |
119 | | |
120 | | protected: |
121 | | virtual ~nsInputStreamTeeWriteEvent() |
122 | 0 | { |
123 | 0 | if (mBuf) { |
124 | 0 | free(mBuf); |
125 | 0 | } |
126 | 0 | mBuf = nullptr; |
127 | 0 | } |
128 | | |
129 | | private: |
130 | | char* mBuf; |
131 | | uint32_t mCount; |
132 | | nsCOMPtr<nsIOutputStream> mSink; |
133 | | // back pointer to the tee that created this runnable |
134 | | RefPtr<nsInputStreamTee> mTee; |
135 | | }; |
136 | | |
137 | | nsInputStreamTee::nsInputStreamTee() |
138 | | : mWriter(nullptr) |
139 | | , mClosure(nullptr) |
140 | | , mLock(nullptr) |
141 | | , mSinkIsValid(true) |
142 | 0 | { |
143 | 0 | } |
144 | | |
145 | | bool |
146 | | nsInputStreamTee::SinkIsValid() |
147 | 0 | { |
148 | 0 | MutexAutoLock lock(*mLock); |
149 | 0 | return mSinkIsValid; |
150 | 0 | } |
151 | | |
152 | | void |
153 | | nsInputStreamTee::InvalidateSink() |
154 | 0 | { |
155 | 0 | MutexAutoLock lock(*mLock); |
156 | 0 | mSinkIsValid = false; |
157 | 0 | } |
158 | | |
159 | | nsresult |
160 | | nsInputStreamTee::TeeSegment(const char* aBuf, uint32_t aCount) |
161 | 0 | { |
162 | 0 | if (!mSink) { |
163 | 0 | return NS_OK; // nothing to do |
164 | 0 | } |
165 | 0 | if (mLock) { // asynchronous case |
166 | 0 | NS_ASSERTION(mEventTarget, "mEventTarget is null, mLock is not null."); |
167 | 0 | if (!SinkIsValid()) { |
168 | 0 | return NS_OK; // nothing to do |
169 | 0 | } |
170 | 0 | nsCOMPtr<nsIRunnable> event = |
171 | 0 | new nsInputStreamTeeWriteEvent(aBuf, aCount, mSink, this); |
172 | 0 | LOG(("nsInputStreamTee::TeeSegment [%p] dispatching write %u bytes\n", |
173 | 0 | this, aCount)); |
174 | 0 | return mEventTarget->Dispatch(event, NS_DISPATCH_NORMAL); |
175 | 0 | } else { // synchronous case |
176 | 0 | NS_ASSERTION(!mEventTarget, "mEventTarget is not null, mLock is null."); |
177 | 0 | nsresult rv; |
178 | 0 | uint32_t totalBytesWritten = 0; |
179 | 0 | while (aCount) { |
180 | 0 | uint32_t bytesWritten = 0; |
181 | 0 | rv = mSink->Write(aBuf + totalBytesWritten, aCount, &bytesWritten); |
182 | 0 | if (NS_FAILED(rv)) { |
183 | 0 | // ok, this is not a fatal error... just drop our reference to mSink |
184 | 0 | // and continue on as if nothing happened. |
185 | 0 | NS_WARNING("Write failed (non-fatal)"); |
186 | 0 | // catch possible misuse of the input stream tee |
187 | 0 | NS_ASSERTION(rv != NS_BASE_STREAM_WOULD_BLOCK, "sink must be a blocking stream"); |
188 | 0 | mSink = nullptr; |
189 | 0 | break; |
190 | 0 | } |
191 | 0 | totalBytesWritten += bytesWritten; |
192 | 0 | NS_ASSERTION(bytesWritten <= aCount, "wrote too much"); |
193 | 0 | aCount -= bytesWritten; |
194 | 0 | } |
195 | 0 | return NS_OK; |
196 | 0 | } |
197 | 0 | } |
198 | | |
199 | | nsresult |
200 | | nsInputStreamTee::WriteSegmentFun(nsIInputStream* aIn, void* aClosure, |
201 | | const char* aFromSegment, uint32_t aOffset, |
202 | | uint32_t aCount, uint32_t* aWriteCount) |
203 | 0 | { |
204 | 0 | nsInputStreamTee* tee = reinterpret_cast<nsInputStreamTee*>(aClosure); |
205 | 0 | nsresult rv = tee->mWriter(aIn, tee->mClosure, aFromSegment, aOffset, |
206 | 0 | aCount, aWriteCount); |
207 | 0 | if (NS_FAILED(rv) || (*aWriteCount == 0)) { |
208 | 0 | NS_ASSERTION((NS_FAILED(rv) ? (*aWriteCount == 0) : true), |
209 | 0 | "writer returned an error with non-zero writeCount"); |
210 | 0 | return rv; |
211 | 0 | } |
212 | 0 |
|
213 | 0 | return tee->TeeSegment(aFromSegment, *aWriteCount); |
214 | 0 | } |
215 | | |
216 | | NS_IMPL_ISUPPORTS(nsInputStreamTee, |
217 | | nsIInputStreamTee, |
218 | | nsIInputStream) |
219 | | NS_IMETHODIMP |
220 | | nsInputStreamTee::Close() |
221 | 0 | { |
222 | 0 | if (NS_WARN_IF(!mSource)) { |
223 | 0 | return NS_ERROR_NOT_INITIALIZED; |
224 | 0 | } |
225 | 0 | nsresult rv = mSource->Close(); |
226 | 0 | mSource = nullptr; |
227 | 0 | mSink = nullptr; |
228 | 0 | return rv; |
229 | 0 | } |
230 | | |
231 | | NS_IMETHODIMP |
232 | | nsInputStreamTee::Available(uint64_t* aAvail) |
233 | 0 | { |
234 | 0 | if (NS_WARN_IF(!mSource)) { |
235 | 0 | return NS_ERROR_NOT_INITIALIZED; |
236 | 0 | } |
237 | 0 | return mSource->Available(aAvail); |
238 | 0 | } |
239 | | |
240 | | NS_IMETHODIMP |
241 | | nsInputStreamTee::Read(char* aBuf, uint32_t aCount, uint32_t* aBytesRead) |
242 | 0 | { |
243 | 0 | if (NS_WARN_IF(!mSource)) { |
244 | 0 | return NS_ERROR_NOT_INITIALIZED; |
245 | 0 | } |
246 | 0 | |
247 | 0 | nsresult rv = mSource->Read(aBuf, aCount, aBytesRead); |
248 | 0 | if (NS_FAILED(rv) || (*aBytesRead == 0)) { |
249 | 0 | return rv; |
250 | 0 | } |
251 | 0 | |
252 | 0 | return TeeSegment(aBuf, *aBytesRead); |
253 | 0 | } |
254 | | |
255 | | NS_IMETHODIMP |
256 | | nsInputStreamTee::ReadSegments(nsWriteSegmentFun aWriter, |
257 | | void* aClosure, |
258 | | uint32_t aCount, |
259 | | uint32_t* aBytesRead) |
260 | 0 | { |
261 | 0 | if (NS_WARN_IF(!mSource)) { |
262 | 0 | return NS_ERROR_NOT_INITIALIZED; |
263 | 0 | } |
264 | 0 | |
265 | 0 | mWriter = aWriter; |
266 | 0 | mClosure = aClosure; |
267 | 0 |
|
268 | 0 | return mSource->ReadSegments(WriteSegmentFun, this, aCount, aBytesRead); |
269 | 0 | } |
270 | | |
271 | | NS_IMETHODIMP |
272 | | nsInputStreamTee::IsNonBlocking(bool* aResult) |
273 | 0 | { |
274 | 0 | if (NS_WARN_IF(!mSource)) { |
275 | 0 | return NS_ERROR_NOT_INITIALIZED; |
276 | 0 | } |
277 | 0 | return mSource->IsNonBlocking(aResult); |
278 | 0 | } |
279 | | |
280 | | NS_IMETHODIMP |
281 | | nsInputStreamTee::SetSource(nsIInputStream* aSource) |
282 | 0 | { |
283 | 0 | mSource = aSource; |
284 | 0 | return NS_OK; |
285 | 0 | } |
286 | | |
287 | | NS_IMETHODIMP |
288 | | nsInputStreamTee::GetSource(nsIInputStream** aSource) |
289 | 0 | { |
290 | 0 | NS_IF_ADDREF(*aSource = mSource); |
291 | 0 | return NS_OK; |
292 | 0 | } |
293 | | |
294 | | NS_IMETHODIMP |
295 | | nsInputStreamTee::SetSink(nsIOutputStream* aSink) |
296 | 0 | { |
297 | | #ifdef DEBUG |
298 | | if (aSink) { |
299 | | bool nonBlocking; |
300 | | nsresult rv = aSink->IsNonBlocking(&nonBlocking); |
301 | | if (NS_FAILED(rv) || nonBlocking) { |
302 | | NS_ERROR("aSink should be a blocking stream"); |
303 | | } |
304 | | } |
305 | | #endif |
306 | | mSink = aSink; |
307 | 0 | return NS_OK; |
308 | 0 | } |
309 | | |
310 | | NS_IMETHODIMP |
311 | | nsInputStreamTee::GetSink(nsIOutputStream** aSink) |
312 | 0 | { |
313 | 0 | NS_IF_ADDREF(*aSink = mSink); |
314 | 0 | return NS_OK; |
315 | 0 | } |
316 | | |
317 | | NS_IMETHODIMP |
318 | | nsInputStreamTee::SetEventTarget(nsIEventTarget* aEventTarget) |
319 | 0 | { |
320 | 0 | mEventTarget = aEventTarget; |
321 | 0 | if (mEventTarget) { |
322 | 0 | // Only need synchronization if this is an async tee |
323 | 0 | mLock = new Mutex("nsInputStreamTee.mLock"); |
324 | 0 | } |
325 | 0 | return NS_OK; |
326 | 0 | } |
327 | | |
328 | | NS_IMETHODIMP |
329 | | nsInputStreamTee::GetEventTarget(nsIEventTarget** aEventTarget) |
330 | 0 | { |
331 | 0 | NS_IF_ADDREF(*aEventTarget = mEventTarget); |
332 | 0 | return NS_OK; |
333 | 0 | } |
334 | | |
335 | | |
336 | | nsresult |
337 | | NS_NewInputStreamTeeAsync(nsIInputStream** aResult, |
338 | | nsIInputStream* aSource, |
339 | | nsIOutputStream* aSink, |
340 | | nsIEventTarget* aEventTarget) |
341 | 0 | { |
342 | 0 | nsresult rv; |
343 | 0 |
|
344 | 0 | nsCOMPtr<nsIInputStreamTee> tee = new nsInputStreamTee(); |
345 | 0 | rv = tee->SetSource(aSource); |
346 | 0 | if (NS_FAILED(rv)) { |
347 | 0 | return rv; |
348 | 0 | } |
349 | 0 | |
350 | 0 | rv = tee->SetSink(aSink); |
351 | 0 | if (NS_FAILED(rv)) { |
352 | 0 | return rv; |
353 | 0 | } |
354 | 0 | |
355 | 0 | rv = tee->SetEventTarget(aEventTarget); |
356 | 0 | if (NS_FAILED(rv)) { |
357 | 0 | return rv; |
358 | 0 | } |
359 | 0 | |
360 | 0 | tee.forget(aResult); |
361 | 0 | return rv; |
362 | 0 | } |
363 | | |
364 | | nsresult |
365 | | NS_NewInputStreamTee(nsIInputStream** aResult, |
366 | | nsIInputStream* aSource, |
367 | | nsIOutputStream* aSink) |
368 | 0 | { |
369 | 0 | return NS_NewInputStreamTeeAsync(aResult, aSource, aSink, nullptr); |
370 | 0 | } |
371 | | |
372 | | #undef LOG |