/src/oatpp/src/oatpp/data/stream/Stream.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /*************************************************************************** |
2 | | * |
3 | | * Project _____ __ ____ _ _ |
4 | | * ( _ ) /__\ (_ _)_| |_ _| |_ |
5 | | * )(_)( /(__)\ )( (_ _)(_ _) |
6 | | * (_____)(__)(__)(__) |_| |_| |
7 | | * |
8 | | * |
9 | | * Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com> |
10 | | * |
11 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
12 | | * you may not use this file except in compliance with the License. |
13 | | * You may obtain a copy of the License at |
14 | | * |
15 | | * http://www.apache.org/licenses/LICENSE-2.0 |
16 | | * |
17 | | * Unless required by applicable law or agreed to in writing, software |
18 | | * distributed under the License is distributed on an "AS IS" BASIS, |
19 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
20 | | * See the License for the specific language governing permissions and |
21 | | * limitations under the License. |
22 | | * |
23 | | ***************************************************************************/ |
24 | | |
25 | | #include "./Stream.hpp" |
26 | | #include "oatpp/utils/Conversion.hpp" |
27 | | #include "oatpp/base/Log.hpp" |
28 | | |
29 | | namespace oatpp { namespace data{ namespace stream { |
30 | | |
31 | | //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
32 | | // WriteCallback |
33 | | |
34 | 0 | v_io_size WriteCallback::write(data::buffer::InlineWriteData& inlineData, async::Action& action) { |
35 | 0 | auto res = write(inlineData.currBufferPtr, inlineData.bytesLeft, action); |
36 | 0 | if(res > 0) { |
37 | 0 | inlineData.inc(res); |
38 | 0 | } |
39 | 0 | return res; |
40 | 0 | } |
41 | | |
42 | 5.72M | v_io_size WriteCallback::writeSimple(const void *data, v_buff_size count) { |
43 | 5.72M | async::Action action; |
44 | 5.72M | auto res = write(data, count, action); |
45 | 5.72M | if(!action.isNone()) { |
46 | 0 | OATPP_LOGe("[oatpp::data::stream::WriteCallback::writeSimple()]", "Error. writeSimple is called on a stream in Async mode.") |
47 | 0 | throw std::runtime_error("[oatpp::data::stream::WriteCallback::writeSimple()]: Error. writeSimple is called on a stream in Async mode."); |
48 | 0 | } |
49 | 5.72M | return res; |
50 | 5.72M | } |
51 | | |
52 | 0 | v_io_size WriteCallback::writeExactSizeDataSimple(data::buffer::InlineWriteData& inlineData) { |
53 | 0 | auto initialCount = inlineData.bytesLeft; |
54 | 0 | while(inlineData.bytesLeft > 0) { |
55 | 0 | async::Action action; |
56 | 0 | auto res = write(inlineData, action); |
57 | 0 | if(!action.isNone()) { |
58 | 0 | OATPP_LOGe("[oatpp::data::stream::WriteCallback::writeExactSizeDataSimple()]", "Error. writeExactSizeDataSimple() is called on a stream in Async mode.") |
59 | 0 | throw std::runtime_error("[oatpp::data::stream::WriteCallback::writeExactSizeDataSimple()]: Error. writeExactSizeDataSimple() is called on a stream in Async mode."); |
60 | 0 | } |
61 | 0 | if(res == IOError::BROKEN_PIPE || res == IOError::ZERO_VALUE) { |
62 | 0 | break; |
63 | 0 | } |
64 | 0 | } |
65 | 0 | return initialCount - inlineData.bytesLeft; |
66 | 0 | } |
67 | | |
68 | 0 | v_io_size WriteCallback::writeExactSizeDataSimple(const void *data, v_buff_size count) { |
69 | 0 | data::buffer::InlineWriteData inlineData(data, count); |
70 | 0 | return writeExactSizeDataSimple(inlineData); |
71 | 0 | } |
72 | | |
73 | 0 | async::Action WriteCallback::writeExactSizeDataAsyncInline(data::buffer::InlineWriteData& inlineData, async::Action&& nextAction) { |
74 | |
|
75 | 0 | if(inlineData.bytesLeft > 0) { |
76 | |
|
77 | 0 | async::Action action; |
78 | 0 | auto res = write(inlineData, action); |
79 | |
|
80 | 0 | if (!action.isNone()) { |
81 | 0 | return action; |
82 | 0 | } |
83 | | |
84 | 0 | if (res > 0) { |
85 | 0 | return async::Action::createActionByType(async::Action::TYPE_REPEAT); |
86 | 0 | } else { |
87 | 0 | switch (res) { |
88 | 0 | case IOError::BROKEN_PIPE: |
89 | 0 | return new AsyncIOError(IOError::BROKEN_PIPE); |
90 | 0 | case IOError::ZERO_VALUE: |
91 | 0 | break; |
92 | 0 | case IOError::RETRY_READ: |
93 | 0 | return async::Action::createActionByType(async::Action::TYPE_REPEAT); |
94 | 0 | case IOError::RETRY_WRITE: |
95 | 0 | return async::Action::createActionByType(async::Action::TYPE_REPEAT); |
96 | 0 | default: |
97 | 0 | OATPP_LOGe("[oatpp::data::stream::writeExactSizeDataAsyncInline()]", "Error. Unknown IO result.") |
98 | 0 | return new async::Error( |
99 | 0 | "[oatpp::data::stream::writeExactSizeDataAsyncInline()]: Error. Unknown IO result."); |
100 | 0 | } |
101 | 0 | } |
102 | |
|
103 | 0 | } |
104 | | |
105 | 0 | return std::forward<async::Action>(nextAction); |
106 | |
|
107 | 0 | } |
108 | | |
109 | 0 | async::CoroutineStarter WriteCallback::writeExactSizeDataAsync(const void* data, v_buff_size size) { |
110 | |
|
111 | 0 | class WriteDataCoroutine : public oatpp::async::Coroutine<WriteDataCoroutine> { |
112 | 0 | private: |
113 | 0 | WriteCallback* m_this; |
114 | 0 | data::buffer::InlineWriteData m_inlineData; |
115 | 0 | public: |
116 | |
|
117 | 0 | WriteDataCoroutine(WriteCallback* _this, |
118 | 0 | const void* data, v_buff_size size) |
119 | 0 | : m_this(_this) |
120 | 0 | , m_inlineData(data, size) |
121 | 0 | {} |
122 | |
|
123 | 0 | Action act() override { |
124 | 0 | return m_this->writeExactSizeDataAsyncInline(m_inlineData, finish()); |
125 | 0 | } |
126 | |
|
127 | 0 | }; |
128 | |
|
129 | 0 | return WriteDataCoroutine::start(this, data, size); |
130 | |
|
131 | 0 | } |
132 | | |
133 | | //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
134 | | // ReadCallback |
135 | | |
136 | 0 | v_io_size ReadCallback::read(data::buffer::InlineReadData& inlineData, async::Action& action) { |
137 | 0 | auto res = read(inlineData.currBufferPtr, inlineData.bytesLeft, action); |
138 | 0 | if(res > 0) { |
139 | 0 | inlineData.inc(res); |
140 | 0 | } |
141 | 0 | return res; |
142 | 0 | } |
143 | | |
144 | 0 | v_io_size ReadCallback::readExactSizeDataSimple(data::buffer::InlineReadData& inlineData) { |
145 | 0 | auto initialCount = inlineData.bytesLeft; |
146 | 0 | while(inlineData.bytesLeft > 0) { |
147 | 0 | async::Action action; |
148 | 0 | auto res = read(inlineData, action); |
149 | 0 | if(!action.isNone()) { |
150 | 0 | OATPP_LOGe("[oatpp::data::stream::ReadCallback::readExactSizeDataSimple()]", "Error. readExactSizeDataSimple() is called on a stream in Async mode.") |
151 | 0 | throw std::runtime_error("[oatpp::data::stream::ReadCallback::readExactSizeDataSimple()]: Error. readExactSizeDataSimple() is called on a stream in Async mode."); |
152 | 0 | } |
153 | 0 | if(res <= 0 && res != IOError::RETRY_READ && res != IOError::RETRY_WRITE) { |
154 | 0 | break; |
155 | 0 | } |
156 | 0 | } |
157 | 0 | return initialCount - inlineData.bytesLeft; |
158 | 0 | } |
159 | | |
160 | 0 | v_io_size ReadCallback::readExactSizeDataSimple(void *data, v_buff_size count) { |
161 | 0 | data::buffer::InlineReadData inlineData(data, count); |
162 | 0 | return readExactSizeDataSimple(inlineData); |
163 | 0 | } |
164 | | |
165 | 0 | async::Action ReadCallback::readExactSizeDataAsyncInline(data::buffer::InlineReadData& inlineData, async::Action&& nextAction) { |
166 | |
|
167 | 0 | if(inlineData.bytesLeft > 0) { |
168 | |
|
169 | 0 | async::Action action; |
170 | 0 | auto res = read(inlineData, action); |
171 | |
|
172 | 0 | if (!action.isNone()) { |
173 | 0 | return action; |
174 | 0 | } |
175 | | |
176 | 0 | if (res > 0) { |
177 | 0 | return async::Action::createActionByType(async::Action::TYPE_REPEAT); |
178 | 0 | } else { |
179 | 0 | switch (res) { |
180 | 0 | case IOError::BROKEN_PIPE: |
181 | 0 | return new AsyncIOError("[oatpp::data::stream::readExactSizeDataAsyncInline()]: IOError::BROKEN_PIPE", IOError::BROKEN_PIPE); |
182 | 0 | case IOError::ZERO_VALUE: |
183 | 0 | break; |
184 | 0 | case IOError::RETRY_READ: |
185 | 0 | return async::Action::createActionByType(async::Action::TYPE_REPEAT); |
186 | 0 | case IOError::RETRY_WRITE: |
187 | 0 | return async::Action::createActionByType(async::Action::TYPE_REPEAT); |
188 | 0 | default: |
189 | 0 | OATPP_LOGe("[oatpp::data::stream::readExactSizeDataAsyncInline()]", "Error. Unknown IO result.") |
190 | 0 | return new async::Error( |
191 | 0 | "[oatpp::data::stream::readExactSizeDataAsyncInline()]: Error. Unknown IO result."); |
192 | 0 | } |
193 | 0 | } |
194 | |
|
195 | 0 | } |
196 | | |
197 | 0 | return std::forward<async::Action>(nextAction); |
198 | |
|
199 | 0 | } |
200 | | |
201 | 0 | async::Action ReadCallback::readSomeDataAsyncInline(data::buffer::InlineReadData& inlineData, async::Action&& nextAction) { |
202 | |
|
203 | 0 | if(inlineData.bytesLeft > 0) { |
204 | |
|
205 | 0 | async::Action action; |
206 | 0 | auto res = read(inlineData, action); |
207 | |
|
208 | 0 | if(!action.isNone()) { |
209 | 0 | return action; |
210 | 0 | } |
211 | | |
212 | 0 | if(res < 0) { |
213 | 0 | switch (res) { |
214 | 0 | case IOError::BROKEN_PIPE: |
215 | 0 | return new AsyncIOError(IOError::BROKEN_PIPE); |
216 | | // case IOError::ZERO_VALUE: |
217 | | // break; |
218 | 0 | case IOError::RETRY_READ: |
219 | 0 | return async::Action::createActionByType(async::Action::TYPE_REPEAT); |
220 | 0 | case IOError::RETRY_WRITE: |
221 | 0 | return async::Action::createActionByType(async::Action::TYPE_REPEAT); |
222 | 0 | default: |
223 | 0 | OATPP_LOGe("[oatpp::data::stream::readSomeDataAsyncInline()]", "Error. Unknown IO result.") |
224 | 0 | return new async::Error( |
225 | 0 | "[oatpp::data::stream::readSomeDataAsyncInline()]: Error. Unknown IO result."); |
226 | 0 | } |
227 | 0 | } |
228 | |
|
229 | 0 | } |
230 | | |
231 | 0 | return std::forward<async::Action>(nextAction); |
232 | |
|
233 | 0 | } |
234 | | |
235 | 0 | v_io_size ReadCallback::readSimple(void *data, v_buff_size count) { |
236 | 0 | async::Action action; |
237 | 0 | auto res = read(data, count, action); |
238 | 0 | if(!action.isNone()) { |
239 | 0 | OATPP_LOGe("[oatpp::data::stream::ReadCallback::readSimple()]", "Error. readSimple is called on a stream in Async mode.") |
240 | 0 | throw std::runtime_error("[oatpp::data::stream::ReadCallback::readSimple()]: Error. readSimple is called on a stream in Async mode."); |
241 | 0 | } |
242 | 0 | return res; |
243 | 0 | } |
244 | | |
245 | | //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
246 | | // Context |
247 | | |
248 | | Context::Context(Properties&& properties) |
249 | 0 | : m_properties(std::forward<Properties>(properties)) |
250 | 0 | {} |
251 | | |
252 | 0 | const Context::Properties& Context::getProperties() const { |
253 | 0 | return m_properties; |
254 | 0 | } |
255 | | |
256 | 0 | Context::Properties& Context::getMutableProperties() { |
257 | 0 | return m_properties; |
258 | 0 | } |
259 | | |
260 | | //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
261 | | // DefaultInitializedContext |
262 | | |
263 | | DefaultInitializedContext::DefaultInitializedContext(StreamType streamType) |
264 | 4 | : m_streamType(streamType) |
265 | 4 | {} |
266 | | |
267 | | DefaultInitializedContext::DefaultInitializedContext(StreamType streamType, Properties&& properties) |
268 | 0 | : Context(std::forward<Properties>(properties)) |
269 | 0 | , m_streamType(streamType) |
270 | 0 | {} |
271 | | |
272 | 0 | void DefaultInitializedContext::init() { |
273 | | // DO NOTHING |
274 | 0 | } |
275 | | |
276 | 0 | async::CoroutineStarter DefaultInitializedContext::initAsync() { |
277 | 0 | return nullptr; |
278 | 0 | } |
279 | | |
280 | 0 | bool DefaultInitializedContext::isInitialized() const { |
281 | 0 | return true; |
282 | 0 | } |
283 | | |
284 | 0 | StreamType DefaultInitializedContext::getStreamType() const { |
285 | 0 | return m_streamType; |
286 | 0 | } |
287 | | |
288 | | //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
289 | | // IOStream |
290 | | |
291 | 0 | void IOStream::initContexts() { |
292 | |
|
293 | 0 | auto& inStreamContext = getInputStreamContext(); |
294 | 0 | if (!inStreamContext.isInitialized()) { |
295 | 0 | inStreamContext.init(); |
296 | 0 | } |
297 | |
|
298 | 0 | auto& outStreamContext = getOutputStreamContext(); |
299 | 0 | if(outStreamContext != inStreamContext && !outStreamContext.isInitialized()) { |
300 | 0 | outStreamContext.init(); |
301 | 0 | } |
302 | |
|
303 | 0 | } |
304 | | |
305 | | /** |
306 | | * Init input/output stream contexts in an async manner. |
307 | | */ |
308 | 0 | async::CoroutineStarter IOStream::initContextsAsync() { |
309 | |
|
310 | 0 | async::CoroutineStarter starter(nullptr); |
311 | |
|
312 | 0 | auto& inStreamContext = getInputStreamContext(); |
313 | 0 | if (!inStreamContext.isInitialized()) { |
314 | 0 | starter.next(inStreamContext.initAsync()); |
315 | 0 | } |
316 | |
|
317 | 0 | auto& outStreamContext = getOutputStreamContext(); |
318 | 0 | if(outStreamContext != inStreamContext && !outStreamContext.isInitialized()) { |
319 | 0 | starter.next(outStreamContext.initAsync()); |
320 | 0 | } |
321 | |
|
322 | 0 | return starter; |
323 | |
|
324 | 0 | } |
325 | | |
326 | | //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
327 | | // ConsistentOutputStream |
328 | | |
329 | 123k | v_io_size ConsistentOutputStream::writeAsString(v_int8 value){ |
330 | 123k | v_char8 a[16]; |
331 | 123k | auto size = utils::Conversion::int32ToCharSequence(value, &a[0], 16); |
332 | 123k | if(size > 0){ |
333 | 123k | return writeSimple(&a[0], size); |
334 | 123k | } |
335 | 0 | return 0; |
336 | 123k | } |
337 | | |
338 | 16.2k | v_io_size ConsistentOutputStream::writeAsString(v_uint8 value){ |
339 | 16.2k | v_char8 a[16]; |
340 | 16.2k | auto size = utils::Conversion::uint32ToCharSequence(value, &a[0], 16); |
341 | 16.2k | if(size > 0){ |
342 | 16.2k | return writeSimple(&a[0], size); |
343 | 16.2k | } |
344 | 0 | return 0; |
345 | 16.2k | } |
346 | | |
347 | 10 | v_io_size ConsistentOutputStream::writeAsString(v_int16 value){ |
348 | 10 | v_char8 a[16]; |
349 | 10 | auto size = utils::Conversion::int32ToCharSequence(value, &a[0], 16); |
350 | 10 | if(size > 0){ |
351 | 10 | return writeSimple(&a[0], size); |
352 | 10 | } |
353 | 0 | return 0; |
354 | 10 | } |
355 | | |
356 | 10 | v_io_size ConsistentOutputStream::writeAsString(v_uint16 value){ |
357 | 10 | v_char8 a[16]; |
358 | 10 | auto size = utils::Conversion::uint32ToCharSequence(value, &a[0], 16); |
359 | 10 | if(size > 0){ |
360 | 10 | return writeSimple(&a[0], size); |
361 | 10 | } |
362 | 0 | return 0; |
363 | 10 | } |
364 | | |
365 | 12 | v_io_size ConsistentOutputStream::writeAsString(v_int32 value){ |
366 | 12 | v_char8 a[16]; |
367 | 12 | auto size = utils::Conversion::int32ToCharSequence(value, &a[0], 16); |
368 | 12 | if(size > 0){ |
369 | 12 | return writeSimple(&a[0], size); |
370 | 12 | } |
371 | 0 | return 0; |
372 | 12 | } |
373 | | |
374 | 2.08k | v_io_size ConsistentOutputStream::writeAsString(v_uint32 value){ |
375 | 2.08k | v_char8 a[16]; |
376 | 2.08k | auto size = utils::Conversion::uint32ToCharSequence(value, &a[0], 16); |
377 | 2.08k | if(size > 0){ |
378 | 2.08k | return writeSimple(&a[0], size); |
379 | 2.08k | } |
380 | 0 | return 0; |
381 | 2.08k | } |
382 | | |
383 | 533k | v_io_size ConsistentOutputStream::writeAsString(v_int64 value){ |
384 | 533k | v_char8 a[32]; |
385 | 533k | auto size = utils::Conversion::int64ToCharSequence(value, &a[0], 32); |
386 | 533k | if(size > 0){ |
387 | 533k | return writeSimple(&a[0], size); |
388 | 533k | } |
389 | 0 | return 0; |
390 | 533k | } |
391 | | |
392 | 10 | v_io_size ConsistentOutputStream::writeAsString(v_uint64 value){ |
393 | 10 | v_char8 a[32]; |
394 | 10 | auto size = utils::Conversion::uint64ToCharSequence(value, &a[0], 32); |
395 | 10 | if(size > 0){ |
396 | 10 | return writeSimple(&a[0], size); |
397 | 10 | } |
398 | 0 | return 0; |
399 | 10 | } |
400 | | |
401 | 12 | v_io_size ConsistentOutputStream::writeAsString(v_float32 value){ |
402 | 12 | v_char8 a[100]; |
403 | 12 | auto size = utils::Conversion::float32ToCharSequence(value, &a[0], 100); |
404 | 12 | if(size > 0){ |
405 | 12 | return writeSimple(&a[0], size); |
406 | 12 | } |
407 | 0 | return 0; |
408 | 12 | } |
409 | | |
410 | 2.14k | v_io_size ConsistentOutputStream::writeAsString(v_float64 value){ |
411 | 2.14k | v_char8 a[100]; |
412 | 2.14k | auto size = utils::Conversion::float64ToCharSequence(value, &a[0], 100); |
413 | 2.14k | if(size > 0){ |
414 | 2.14k | return writeSimple(&a[0], size); |
415 | 2.14k | } |
416 | 0 | return 0; |
417 | 2.14k | } |
418 | | |
419 | 1.34k | v_io_size ConsistentOutputStream::writeAsString(bool value) { |
420 | 1.34k | if(value){ |
421 | 773 | return writeSimple("true", 4); |
422 | 773 | } else { |
423 | 576 | return writeSimple("false", 5); |
424 | 576 | } |
425 | 1.34k | } |
426 | | |
427 | | //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
428 | | // Other functions |
429 | | |
430 | | |
431 | 173k | ConsistentOutputStream& operator << (ConsistentOutputStream& s, const oatpp::String& str) { |
432 | 173k | if(str) { |
433 | 173k | s.writeSimple(str); |
434 | 173k | } else { |
435 | 0 | s.writeSimple("[<String(null)>]"); |
436 | 0 | } |
437 | 173k | return s; |
438 | 173k | } |
439 | | |
440 | 0 | ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Int8& value) { |
441 | 0 | if(value.getPtr()) { |
442 | 0 | return operator << (s, *value); |
443 | 0 | } |
444 | 0 | s.writeSimple("[<Int8(null)>]"); |
445 | 0 | return s; |
446 | 0 | } |
447 | | |
448 | 0 | ConsistentOutputStream& operator << (ConsistentOutputStream& s, const UInt8& value) { |
449 | 0 | if(value.getPtr()) { |
450 | 0 | return operator << (s, *value); |
451 | 0 | } |
452 | 0 | s.writeSimple("[<UInt8(null)>]"); |
453 | 0 | return s; |
454 | 0 | } |
455 | | |
456 | 0 | ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Int16& value) { |
457 | 0 | if(value.getPtr()) { |
458 | 0 | return operator << (s, *value); |
459 | 0 | } |
460 | 0 | s.writeSimple("[<Int16(null)>]"); |
461 | 0 | return s; |
462 | 0 | } |
463 | | |
464 | 0 | ConsistentOutputStream& operator << (ConsistentOutputStream& s, const UInt16& value) { |
465 | 0 | if(value.getPtr()) { |
466 | 0 | return operator << (s, *value); |
467 | 0 | } |
468 | 0 | s.writeSimple("[<UInt16(null)>]"); |
469 | 0 | return s; |
470 | 0 | } |
471 | | |
472 | 0 | ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Int32& value) { |
473 | 0 | if(value.getPtr()) { |
474 | 0 | return operator << (s, *value); |
475 | 0 | } |
476 | 0 | s.writeSimple("[<Int32(null)>]"); |
477 | 0 | return s; |
478 | 0 | } |
479 | | |
480 | 0 | ConsistentOutputStream& operator << (ConsistentOutputStream& s, const UInt32& value) { |
481 | 0 | if(value.getPtr()) { |
482 | 0 | return operator << (s, *value); |
483 | 0 | } |
484 | 0 | s.writeSimple("[<UInt32(null)>]"); |
485 | 0 | return s; |
486 | 0 | } |
487 | | |
488 | 0 | ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Int64& value) { |
489 | 0 | if(value.getPtr()) { |
490 | 0 | return operator << (s, *value); |
491 | 0 | } |
492 | 0 | s.writeSimple("[<Int64(null)>]"); |
493 | 0 | return s; |
494 | 0 | } |
495 | | |
496 | 0 | ConsistentOutputStream& operator << (ConsistentOutputStream& s, const UInt64& value) { |
497 | 0 | if(value.getPtr()) { |
498 | 0 | return operator << (s, *value); |
499 | 0 | } |
500 | 0 | s.writeSimple("[<UInt64(null)>]"); |
501 | 0 | return s; |
502 | 0 | } |
503 | | |
504 | 0 | ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Float32& value) { |
505 | 0 | if(value.getPtr()) { |
506 | 0 | return operator << (s, *value); |
507 | 0 | } |
508 | 0 | s.writeSimple("[<Float32(null)>]"); |
509 | 0 | return s; |
510 | 0 | } |
511 | | |
512 | 0 | ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Float64& value) { |
513 | 0 | if(value.getPtr()) { |
514 | 0 | return operator << (s, *value); |
515 | 0 | } |
516 | 0 | s.writeSimple("[<Float64(null)>]"); |
517 | 0 | return s; |
518 | 0 | } |
519 | | |
520 | 0 | ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Boolean& value) { |
521 | 0 | if(value.getPtr()) { // use getPtr() here to avoid false to nullptr conversion |
522 | 0 | return operator << (s, *value); |
523 | 0 | } |
524 | 0 | s.writeSimple("[<Boolean(null)>]"); |
525 | 0 | return s; |
526 | 0 | } |
527 | | |
528 | 162k | ConsistentOutputStream& operator << (ConsistentOutputStream& s, const char* str) { |
529 | 162k | if(str != nullptr) { |
530 | 162k | s.writeSimple(str); |
531 | 162k | } else { |
532 | 0 | s.writeSimple("[<char*(null)>]"); |
533 | 0 | } |
534 | 162k | return s; |
535 | 162k | } |
536 | | |
537 | | //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
538 | | // DataTransferProcessor |
539 | | |
540 | | StatelessDataTransferProcessor StatelessDataTransferProcessor::INSTANCE; |
541 | | |
542 | 0 | v_io_size StatelessDataTransferProcessor::suggestInputStreamReadSize() { |
543 | 0 | return 32767; |
544 | 0 | } |
545 | | |
546 | 0 | v_int32 StatelessDataTransferProcessor::iterate(data::buffer::InlineReadData& dataIn, data::buffer::InlineReadData& dataOut) { |
547 | |
|
548 | 0 | if(dataOut.bytesLeft > 0) { |
549 | 0 | return Error::FLUSH_DATA_OUT; |
550 | 0 | } |
551 | | |
552 | 0 | if(dataIn.currBufferPtr != nullptr) { |
553 | |
|
554 | 0 | if(dataIn.bytesLeft == 0){ |
555 | 0 | return Error::PROVIDE_DATA_IN; |
556 | 0 | } |
557 | | |
558 | 0 | dataOut = dataIn; |
559 | 0 | dataIn.setEof(); |
560 | 0 | return Error::FLUSH_DATA_OUT; |
561 | |
|
562 | 0 | } |
563 | | |
564 | 0 | dataOut = dataIn; |
565 | 0 | dataIn.setEof(); |
566 | 0 | return Error::FINISHED; |
567 | |
|
568 | 0 | } |
569 | | |
570 | | //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
571 | | // Other functions |
572 | | |
573 | | v_io_size transfer(const base::ObjectHandle<ReadCallback>& readCallback, |
574 | | const base::ObjectHandle<WriteCallback>& writeCallback, |
575 | | v_io_size transferSize, |
576 | | void* buffer, |
577 | | v_buff_size bufferSize, |
578 | | const base::ObjectHandle<data::buffer::Processor>& processor) |
579 | 0 | { |
580 | |
|
581 | 0 | data::buffer::InlineReadData inData; |
582 | 0 | data::buffer::InlineReadData outData; |
583 | |
|
584 | 0 | v_int32 procRes = data::buffer::Processor::Error::PROVIDE_DATA_IN; |
585 | 0 | v_io_size progress = 0; |
586 | |
|
587 | 0 | while(procRes != data::buffer::Processor::Error::FINISHED) { |
588 | |
|
589 | 0 | if(procRes == data::buffer::Processor::Error::PROVIDE_DATA_IN && inData.bytesLeft == 0) { |
590 | |
|
591 | 0 | v_buff_size desiredToRead = processor->suggestInputStreamReadSize(); |
592 | |
|
593 | 0 | if (desiredToRead > bufferSize) { |
594 | 0 | desiredToRead = bufferSize; |
595 | 0 | } |
596 | |
|
597 | 0 | if(transferSize > 0 && progress + desiredToRead > transferSize) { |
598 | 0 | desiredToRead = transferSize - progress; |
599 | 0 | } |
600 | |
|
601 | 0 | v_io_size res = 0; |
602 | |
|
603 | 0 | if(desiredToRead > 0) { |
604 | 0 | res = IOError::RETRY_READ; |
605 | 0 | while (res == IOError::RETRY_READ || res == IOError::RETRY_WRITE) { |
606 | 0 | res = readCallback->readSimple(buffer, desiredToRead); |
607 | 0 | } |
608 | 0 | } |
609 | |
|
610 | 0 | if (res > 0) { |
611 | 0 | inData.set(buffer, res); |
612 | 0 | progress += res; |
613 | 0 | } else { |
614 | 0 | inData.set(nullptr, 0); |
615 | 0 | } |
616 | |
|
617 | 0 | } |
618 | |
|
619 | 0 | procRes = data::buffer::Processor::Error::OK; |
620 | 0 | while(procRes == data::buffer::Processor::Error::OK) { |
621 | 0 | procRes = processor->iterate(inData, outData); |
622 | 0 | } |
623 | |
|
624 | 0 | switch(procRes) { |
625 | | |
626 | 0 | case data::buffer::Processor::Error::PROVIDE_DATA_IN: { |
627 | 0 | continue; |
628 | 0 | } |
629 | | |
630 | 0 | case data::buffer::Processor::Error::FLUSH_DATA_OUT: { |
631 | 0 | v_io_size res = IOError::RETRY_WRITE; |
632 | 0 | while(res == IOError::RETRY_WRITE || res == IOError::RETRY_READ) { |
633 | 0 | res = writeCallback->writeSimple(outData.currBufferPtr, outData.bytesLeft); |
634 | 0 | } |
635 | 0 | if(res > 0) { |
636 | 0 | outData.inc(res); |
637 | 0 | } else { |
638 | 0 | return progress; |
639 | 0 | } |
640 | 0 | break; |
641 | 0 | } |
642 | | |
643 | 0 | case data::buffer::Processor::Error::FINISHED: |
644 | 0 | return progress; |
645 | | |
646 | 0 | default: |
647 | | //throw std::runtime_error("Unknown buffer processor error."); |
648 | 0 | return progress; |
649 | |
|
650 | 0 | } |
651 | |
|
652 | 0 | } |
653 | | |
654 | 0 | return progress; |
655 | |
|
656 | 0 | } |
657 | | |
658 | | async::CoroutineStarter transferAsync(const base::ObjectHandle<ReadCallback>& readCallback, |
659 | | const base::ObjectHandle<WriteCallback>& writeCallback, |
660 | | v_buff_size transferSize, |
661 | | const base::ObjectHandle<data::buffer::IOBuffer>& buffer, |
662 | | const base::ObjectHandle<data::buffer::Processor>& processor) |
663 | 0 | { |
664 | |
|
665 | 0 | class TransferCoroutine : public oatpp::async::Coroutine<TransferCoroutine> { |
666 | 0 | private: |
667 | 0 | base::ObjectHandle<ReadCallback> m_readCallback; |
668 | 0 | base::ObjectHandle<WriteCallback> m_writeCallback; |
669 | 0 | v_buff_size m_transferSize; |
670 | 0 | base::ObjectHandle<oatpp::data::buffer::IOBuffer> m_buffer; |
671 | 0 | base::ObjectHandle<data::buffer::Processor> m_processor; |
672 | 0 | private: |
673 | 0 | v_buff_size m_progress; |
674 | 0 | private: |
675 | 0 | v_int32 m_procRes; |
676 | 0 | data::buffer::InlineReadData m_readData; |
677 | 0 | data::buffer::InlineWriteData m_writeData; |
678 | 0 | data::buffer::InlineReadData m_inData; |
679 | 0 | data::buffer::InlineReadData m_outData; |
680 | 0 | public: |
681 | |
|
682 | 0 | TransferCoroutine(const base::ObjectHandle<ReadCallback>& readCallback, |
683 | 0 | const base::ObjectHandle<WriteCallback>& writeCallback, |
684 | 0 | v_buff_size transferSize, |
685 | 0 | const base::ObjectHandle<buffer::IOBuffer>& buffer, |
686 | 0 | const base::ObjectHandle<buffer::Processor>& processor) |
687 | 0 | : m_readCallback(readCallback) |
688 | 0 | , m_writeCallback(writeCallback) |
689 | 0 | , m_transferSize(transferSize) |
690 | 0 | , m_buffer(buffer) |
691 | 0 | , m_processor(processor) |
692 | 0 | , m_progress(0) |
693 | 0 | , m_procRes(data::buffer::Processor::Error::PROVIDE_DATA_IN) |
694 | 0 | , m_readData(buffer->getData(), buffer->getSize()) |
695 | 0 | {} |
696 | |
|
697 | 0 | Action act() override { |
698 | |
|
699 | 0 | if(m_procRes == data::buffer::Processor::Error::FINISHED) { |
700 | 0 | return finish(); |
701 | 0 | } |
702 | | |
703 | 0 | if(m_procRes == data::buffer::Processor::Error::PROVIDE_DATA_IN && m_inData.bytesLeft == 0) { |
704 | |
|
705 | 0 | auto desiredToRead = m_processor->suggestInputStreamReadSize(); |
706 | |
|
707 | 0 | if (desiredToRead > m_readData.bytesLeft) { |
708 | 0 | desiredToRead = m_readData.bytesLeft; |
709 | 0 | } |
710 | |
|
711 | 0 | if(m_transferSize > 0 && m_progress + desiredToRead > m_transferSize) { |
712 | 0 | desiredToRead = m_transferSize - m_progress; |
713 | 0 | } |
714 | |
|
715 | 0 | Action action; |
716 | 0 | v_io_size res = 0; |
717 | |
|
718 | 0 | if(desiredToRead > 0) { |
719 | 0 | res = m_readCallback->read(m_readData.currBufferPtr, desiredToRead, action); |
720 | 0 | } |
721 | |
|
722 | 0 | if (res > 0) { |
723 | 0 | m_readData.inc(res); |
724 | 0 | m_inData.set(m_buffer->getData(), m_buffer->getSize() - m_readData.bytesLeft); |
725 | 0 | m_progress += res; |
726 | 0 | } else { |
727 | |
|
728 | 0 | switch(res) { |
729 | | |
730 | 0 | case IOError::BROKEN_PIPE: |
731 | 0 | if(m_transferSize > 0) { |
732 | 0 | return error<AsyncTransferError>("[oatpp::data::stream::transferAsync]: Error. ReadCallback. BROKEN_PIPE."); |
733 | 0 | } |
734 | 0 | m_inData.set(nullptr, 0); |
735 | 0 | break; |
736 | | |
737 | 0 | case IOError::ZERO_VALUE: |
738 | 0 | m_inData.set(nullptr, 0); |
739 | 0 | break; |
740 | | |
741 | 0 | case IOError::RETRY_READ: |
742 | 0 | if(!action.isNone()) { |
743 | 0 | return action; |
744 | 0 | } |
745 | 0 | return repeat(); |
746 | | |
747 | 0 | case IOError::RETRY_WRITE: |
748 | 0 | if(!action.isNone()) { |
749 | 0 | return action; |
750 | 0 | } |
751 | 0 | return repeat(); |
752 | | |
753 | 0 | default: |
754 | 0 | if(m_transferSize > 0) { |
755 | 0 | if (!action.isNone()) { |
756 | 0 | return action; |
757 | 0 | } |
758 | 0 | return error<AsyncTransferError>("[oatpp::data::stream::transferAsync]: Error. ReadCallback. Unknown IO error."); |
759 | 0 | } |
760 | 0 | m_inData.set(nullptr, 0); |
761 | |
|
762 | 0 | } |
763 | |
|
764 | 0 | } |
765 | | |
766 | 0 | if(!action.isNone()){ |
767 | 0 | return action; |
768 | 0 | } |
769 | |
|
770 | 0 | } |
771 | | |
772 | 0 | return yieldTo(&TransferCoroutine::process); |
773 | |
|
774 | 0 | } |
775 | |
|
776 | 0 | Action process() { |
777 | |
|
778 | 0 | m_procRes = m_processor->iterate(m_inData, m_outData); |
779 | |
|
780 | 0 | switch(m_procRes) { |
781 | | |
782 | 0 | case data::buffer::Processor::Error::OK: |
783 | 0 | return repeat(); |
784 | | |
785 | 0 | case data::buffer::Processor::Error::PROVIDE_DATA_IN: { |
786 | 0 | m_readData.set(m_buffer->getData(), m_buffer->getSize()); |
787 | 0 | return yieldTo(&TransferCoroutine::act); |
788 | 0 | } |
789 | | |
790 | 0 | case data::buffer::Processor::Error::FLUSH_DATA_OUT: { |
791 | 0 | m_readData.set(m_buffer->getData(), m_buffer->getSize()); |
792 | 0 | m_writeData.set(m_outData.currBufferPtr, m_outData.bytesLeft); |
793 | 0 | m_outData.setEof(); |
794 | 0 | return yieldTo(&TransferCoroutine::flushData); |
795 | 0 | } |
796 | | |
797 | 0 | case data::buffer::Processor::Error::FINISHED: |
798 | 0 | return finish(); |
799 | | |
800 | 0 | default: |
801 | 0 | return error<AsyncTransferError>("[oatpp::data::stream::transferAsync]: Error. ReadCallback. Unknown processing error."); |
802 | |
|
803 | 0 | } |
804 | |
|
805 | 0 | } |
806 | |
|
807 | 0 | Action flushData() { |
808 | 0 | return m_writeCallback->writeExactSizeDataAsyncInline(m_writeData, yieldTo(&TransferCoroutine::act)); |
809 | 0 | } |
810 | |
|
811 | 0 | }; |
812 | |
|
813 | 0 | return TransferCoroutine::start(readCallback, writeCallback, transferSize, buffer, processor); |
814 | |
|
815 | 0 | } |
816 | | |
817 | | }}} |