Coverage Report

Created: 2025-06-22 07:07

/src/oatpp/src/oatpp/data/stream/Stream.cpp
Line
Count
Source (jump to first uncovered line)
1
/***************************************************************************
2
 *
3
 * Project         _____    __   ____   _      _
4
 *                (  _  )  /__\ (_  _)_| |_  _| |_
5
 *                 )(_)(  /(__)\  )( (_   _)(_   _)
6
 *                (_____)(__)(__)(__)  |_|    |_|
7
 *
8
 *
9
 * Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
10
 *
11
 * Licensed under the Apache License, Version 2.0 (the "License");
12
 * you may not use this file except in compliance with the License.
13
 * You may obtain a copy of the License at
14
 *
15
 *     http://www.apache.org/licenses/LICENSE-2.0
16
 *
17
 * Unless required by applicable law or agreed to in writing, software
18
 * distributed under the License is distributed on an "AS IS" BASIS,
19
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20
 * See the License for the specific language governing permissions and
21
 * limitations under the License.
22
 *
23
 ***************************************************************************/
24
25
#include "./Stream.hpp"
26
#include "oatpp/utils/Conversion.hpp"
27
#include "oatpp/base/Log.hpp"
28
29
namespace oatpp { namespace data{ namespace stream {
30
31
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
32
// WriteCallback
33
34
0
v_io_size WriteCallback::write(data::buffer::InlineWriteData& inlineData, async::Action& action) {
35
0
  auto res = write(inlineData.currBufferPtr, inlineData.bytesLeft, action);
36
0
  if(res > 0) {
37
0
    inlineData.inc(res);
38
0
  }
39
0
  return res;
40
0
}
41
42
5.72M
v_io_size WriteCallback::writeSimple(const void *data, v_buff_size count) {
43
5.72M
  async::Action action;
44
5.72M
  auto res = write(data, count, action);
45
5.72M
  if(!action.isNone()) {
46
0
    OATPP_LOGe("[oatpp::data::stream::WriteCallback::writeSimple()]", "Error. writeSimple is called on a stream in Async mode.")
47
0
    throw std::runtime_error("[oatpp::data::stream::WriteCallback::writeSimple()]: Error. writeSimple is called on a stream in Async mode.");
48
0
  }
49
5.72M
  return res;
50
5.72M
}
51
52
0
v_io_size WriteCallback::writeExactSizeDataSimple(data::buffer::InlineWriteData& inlineData) {
53
0
  auto initialCount = inlineData.bytesLeft;
54
0
  while(inlineData.bytesLeft > 0) {
55
0
    async::Action action;
56
0
    auto res = write(inlineData, action);
57
0
    if(!action.isNone()) {
58
0
      OATPP_LOGe("[oatpp::data::stream::WriteCallback::writeExactSizeDataSimple()]", "Error. writeExactSizeDataSimple() is called on a stream in Async mode.")
59
0
      throw std::runtime_error("[oatpp::data::stream::WriteCallback::writeExactSizeDataSimple()]: Error. writeExactSizeDataSimple() is called on a stream in Async mode.");
60
0
    }
61
0
    if(res == IOError::BROKEN_PIPE || res == IOError::ZERO_VALUE) {
62
0
      break;
63
0
    }
64
0
  }
65
0
  return initialCount - inlineData.bytesLeft;
66
0
}
67
68
0
v_io_size WriteCallback::writeExactSizeDataSimple(const void *data, v_buff_size count) {
69
0
  data::buffer::InlineWriteData inlineData(data, count);
70
0
  return writeExactSizeDataSimple(inlineData);
71
0
}
72
73
0
async::Action WriteCallback::writeExactSizeDataAsyncInline(data::buffer::InlineWriteData& inlineData, async::Action&& nextAction) {
74
75
0
  if(inlineData.bytesLeft > 0) {
76
77
0
    async::Action action;
78
0
    auto res = write(inlineData, action);
79
80
0
    if (!action.isNone()) {
81
0
      return action;
82
0
    }
83
84
0
    if (res > 0) {
85
0
      return async::Action::createActionByType(async::Action::TYPE_REPEAT);
86
0
    } else {
87
0
      switch (res) {
88
0
        case IOError::BROKEN_PIPE:
89
0
          return new AsyncIOError(IOError::BROKEN_PIPE);
90
0
        case IOError::ZERO_VALUE:
91
0
          break;
92
0
        case IOError::RETRY_READ:
93
0
          return async::Action::createActionByType(async::Action::TYPE_REPEAT);
94
0
        case IOError::RETRY_WRITE:
95
0
          return async::Action::createActionByType(async::Action::TYPE_REPEAT);
96
0
        default:
97
0
          OATPP_LOGe("[oatpp::data::stream::writeExactSizeDataAsyncInline()]", "Error. Unknown IO result.")
98
0
          return new async::Error(
99
0
            "[oatpp::data::stream::writeExactSizeDataAsyncInline()]: Error. Unknown IO result.");
100
0
      }
101
0
    }
102
103
0
  }
104
105
0
  return std::forward<async::Action>(nextAction);
106
107
0
}
108
109
0
async::CoroutineStarter WriteCallback::writeExactSizeDataAsync(const void* data, v_buff_size size) {
110
111
0
  class WriteDataCoroutine : public oatpp::async::Coroutine<WriteDataCoroutine> {
112
0
  private:
113
0
    WriteCallback* m_this;
114
0
    data::buffer::InlineWriteData m_inlineData;
115
0
  public:
116
117
0
    WriteDataCoroutine(WriteCallback* _this,
118
0
                       const void* data, v_buff_size size)
119
0
      : m_this(_this)
120
0
      , m_inlineData(data, size)
121
0
    {}
122
123
0
    Action act() override {
124
0
      return m_this->writeExactSizeDataAsyncInline(m_inlineData, finish());
125
0
    }
126
127
0
  };
128
129
0
  return WriteDataCoroutine::start(this, data, size);
130
131
0
}
132
133
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
134
// ReadCallback
135
136
0
v_io_size ReadCallback::read(data::buffer::InlineReadData& inlineData, async::Action& action) {
137
0
  auto res = read(inlineData.currBufferPtr, inlineData.bytesLeft, action);
138
0
  if(res > 0) {
139
0
    inlineData.inc(res);
140
0
  }
141
0
  return res;
142
0
}
143
144
0
v_io_size ReadCallback::readExactSizeDataSimple(data::buffer::InlineReadData& inlineData) {
145
0
  auto initialCount = inlineData.bytesLeft;
146
0
  while(inlineData.bytesLeft > 0) {
147
0
    async::Action action;
148
0
    auto res = read(inlineData, action);
149
0
    if(!action.isNone()) {
150
0
      OATPP_LOGe("[oatpp::data::stream::ReadCallback::readExactSizeDataSimple()]", "Error. readExactSizeDataSimple() is called on a stream in Async mode.")
151
0
      throw std::runtime_error("[oatpp::data::stream::ReadCallback::readExactSizeDataSimple()]: Error. readExactSizeDataSimple() is called on a stream in Async mode.");
152
0
    }
153
0
    if(res <= 0 && res != IOError::RETRY_READ && res != IOError::RETRY_WRITE) {
154
0
      break;
155
0
    }
156
0
  }
157
0
  return initialCount - inlineData.bytesLeft;
158
0
}
159
160
0
v_io_size ReadCallback::readExactSizeDataSimple(void *data, v_buff_size count) {
161
0
  data::buffer::InlineReadData inlineData(data, count);
162
0
  return readExactSizeDataSimple(inlineData);
163
0
}
164
165
0
async::Action ReadCallback::readExactSizeDataAsyncInline(data::buffer::InlineReadData& inlineData, async::Action&& nextAction) {
166
167
0
  if(inlineData.bytesLeft > 0) {
168
169
0
    async::Action action;
170
0
    auto res = read(inlineData, action);
171
172
0
    if (!action.isNone()) {
173
0
      return action;
174
0
    }
175
176
0
    if (res > 0) {
177
0
      return async::Action::createActionByType(async::Action::TYPE_REPEAT);
178
0
    } else {
179
0
      switch (res) {
180
0
        case IOError::BROKEN_PIPE:
181
0
          return new AsyncIOError("[oatpp::data::stream::readExactSizeDataAsyncInline()]: IOError::BROKEN_PIPE", IOError::BROKEN_PIPE);
182
0
        case IOError::ZERO_VALUE:
183
0
          break;
184
0
        case IOError::RETRY_READ:
185
0
          return async::Action::createActionByType(async::Action::TYPE_REPEAT);
186
0
        case IOError::RETRY_WRITE:
187
0
          return async::Action::createActionByType(async::Action::TYPE_REPEAT);
188
0
        default:
189
0
          OATPP_LOGe("[oatpp::data::stream::readExactSizeDataAsyncInline()]", "Error. Unknown IO result.")
190
0
          return new async::Error(
191
0
            "[oatpp::data::stream::readExactSizeDataAsyncInline()]: Error. Unknown IO result.");
192
0
      }
193
0
    }
194
195
0
  }
196
197
0
  return std::forward<async::Action>(nextAction);
198
199
0
}
200
201
0
async::Action ReadCallback::readSomeDataAsyncInline(data::buffer::InlineReadData& inlineData, async::Action&& nextAction) {
202
203
0
  if(inlineData.bytesLeft > 0) {
204
205
0
    async::Action action;
206
0
    auto res = read(inlineData, action);
207
208
0
    if(!action.isNone()) {
209
0
      return action;
210
0
    }
211
212
0
    if(res < 0) {
213
0
      switch (res) {
214
0
        case IOError::BROKEN_PIPE:
215
0
          return new AsyncIOError(IOError::BROKEN_PIPE);
216
//          case IOError::ZERO_VALUE:
217
//            break;
218
0
        case IOError::RETRY_READ:
219
0
          return async::Action::createActionByType(async::Action::TYPE_REPEAT);
220
0
        case IOError::RETRY_WRITE:
221
0
          return async::Action::createActionByType(async::Action::TYPE_REPEAT);
222
0
        default:
223
0
          OATPP_LOGe("[oatpp::data::stream::readSomeDataAsyncInline()]", "Error. Unknown IO result.")
224
0
          return new async::Error(
225
0
            "[oatpp::data::stream::readSomeDataAsyncInline()]: Error. Unknown IO result.");
226
0
      }
227
0
    }
228
229
0
  }
230
231
0
  return std::forward<async::Action>(nextAction);
232
233
0
}
234
235
0
v_io_size ReadCallback::readSimple(void *data, v_buff_size count) {
236
0
  async::Action action;
237
0
  auto res = read(data, count, action);
238
0
  if(!action.isNone()) {
239
0
    OATPP_LOGe("[oatpp::data::stream::ReadCallback::readSimple()]", "Error. readSimple is called on a stream in Async mode.")
240
0
    throw std::runtime_error("[oatpp::data::stream::ReadCallback::readSimple()]: Error. readSimple is called on a stream in Async mode.");
241
0
  }
242
0
  return res;
243
0
}
244
245
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
246
// Context
247
248
Context::Context(Properties&& properties)
249
0
  : m_properties(std::forward<Properties>(properties))
250
0
{}
251
252
0
const Context::Properties& Context::getProperties() const {
253
0
  return m_properties;
254
0
}
255
256
0
Context::Properties& Context::getMutableProperties() {
257
0
  return m_properties;
258
0
}
259
260
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
261
// DefaultInitializedContext
262
263
DefaultInitializedContext::DefaultInitializedContext(StreamType streamType)
264
4
  : m_streamType(streamType)
265
4
{}
266
267
DefaultInitializedContext::DefaultInitializedContext(StreamType streamType, Properties&& properties)
268
0
  : Context(std::forward<Properties>(properties))
269
0
  , m_streamType(streamType)
270
0
{}
271
272
0
void DefaultInitializedContext::init() {
273
  // DO NOTHING
274
0
}
275
276
0
async::CoroutineStarter DefaultInitializedContext::initAsync() {
277
0
  return nullptr;
278
0
}
279
280
0
bool DefaultInitializedContext::isInitialized() const {
281
0
  return true;
282
0
}
283
284
0
StreamType DefaultInitializedContext::getStreamType() const {
285
0
  return m_streamType;
286
0
}
287
288
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
289
// IOStream
290
291
0
void IOStream::initContexts() {
292
293
0
  auto& inStreamContext = getInputStreamContext();
294
0
  if (!inStreamContext.isInitialized()) {
295
0
    inStreamContext.init();
296
0
  }
297
298
0
  auto& outStreamContext = getOutputStreamContext();
299
0
  if(outStreamContext != inStreamContext && !outStreamContext.isInitialized()) {
300
0
    outStreamContext.init();
301
0
  }
302
303
0
}
304
305
/**
306
 * Init input/output stream contexts in an async manner.
307
 */
308
0
async::CoroutineStarter IOStream::initContextsAsync() {
309
310
0
  async::CoroutineStarter starter(nullptr);
311
312
0
  auto& inStreamContext = getInputStreamContext();
313
0
  if (!inStreamContext.isInitialized()) {
314
0
    starter.next(inStreamContext.initAsync());
315
0
  }
316
317
0
  auto& outStreamContext = getOutputStreamContext();
318
0
  if(outStreamContext != inStreamContext && !outStreamContext.isInitialized()) {
319
0
    starter.next(outStreamContext.initAsync());
320
0
  }
321
322
0
  return starter;
323
324
0
}
325
326
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
327
// ConsistentOutputStream
328
329
123k
v_io_size ConsistentOutputStream::writeAsString(v_int8 value){
330
123k
  v_char8 a[16];
331
123k
  auto size = utils::Conversion::int32ToCharSequence(value, &a[0], 16);
332
123k
  if(size > 0){
333
123k
    return writeSimple(&a[0], size);
334
123k
  }
335
0
  return 0;
336
123k
}
337
338
16.2k
v_io_size ConsistentOutputStream::writeAsString(v_uint8 value){
339
16.2k
  v_char8 a[16];
340
16.2k
  auto size = utils::Conversion::uint32ToCharSequence(value, &a[0], 16);
341
16.2k
  if(size > 0){
342
16.2k
    return writeSimple(&a[0], size);
343
16.2k
  }
344
0
  return 0;
345
16.2k
}
346
347
10
v_io_size ConsistentOutputStream::writeAsString(v_int16 value){
348
10
  v_char8 a[16];
349
10
  auto size = utils::Conversion::int32ToCharSequence(value, &a[0], 16);
350
10
  if(size > 0){
351
10
    return writeSimple(&a[0], size);
352
10
  }
353
0
  return 0;
354
10
}
355
356
10
v_io_size ConsistentOutputStream::writeAsString(v_uint16 value){
357
10
  v_char8 a[16];
358
10
  auto size = utils::Conversion::uint32ToCharSequence(value, &a[0], 16);
359
10
  if(size > 0){
360
10
    return writeSimple(&a[0], size);
361
10
  }
362
0
  return 0;
363
10
}
364
365
12
v_io_size ConsistentOutputStream::writeAsString(v_int32 value){
366
12
  v_char8 a[16];
367
12
  auto size = utils::Conversion::int32ToCharSequence(value, &a[0], 16);
368
12
  if(size > 0){
369
12
    return writeSimple(&a[0], size);
370
12
  }
371
0
  return 0;
372
12
}
373
374
2.08k
v_io_size ConsistentOutputStream::writeAsString(v_uint32 value){
375
2.08k
  v_char8 a[16];
376
2.08k
  auto size = utils::Conversion::uint32ToCharSequence(value, &a[0], 16);
377
2.08k
  if(size > 0){
378
2.08k
    return writeSimple(&a[0], size);
379
2.08k
  }
380
0
  return 0;
381
2.08k
}
382
383
533k
v_io_size ConsistentOutputStream::writeAsString(v_int64 value){
384
533k
  v_char8 a[32];
385
533k
  auto size = utils::Conversion::int64ToCharSequence(value, &a[0], 32);
386
533k
  if(size > 0){
387
533k
    return writeSimple(&a[0], size);
388
533k
  }
389
0
  return 0;
390
533k
}
391
392
10
v_io_size ConsistentOutputStream::writeAsString(v_uint64 value){
393
10
  v_char8 a[32];
394
10
  auto size = utils::Conversion::uint64ToCharSequence(value, &a[0], 32);
395
10
  if(size > 0){
396
10
    return writeSimple(&a[0], size);
397
10
  }
398
0
  return 0;
399
10
}
400
401
12
v_io_size ConsistentOutputStream::writeAsString(v_float32 value){
402
12
  v_char8 a[100];
403
12
  auto size = utils::Conversion::float32ToCharSequence(value, &a[0], 100);
404
12
  if(size > 0){
405
12
    return writeSimple(&a[0], size);
406
12
  }
407
0
  return 0;
408
12
}
409
410
2.14k
v_io_size ConsistentOutputStream::writeAsString(v_float64 value){
411
2.14k
  v_char8 a[100];
412
2.14k
  auto size = utils::Conversion::float64ToCharSequence(value, &a[0], 100);
413
2.14k
  if(size > 0){
414
2.14k
    return writeSimple(&a[0], size);
415
2.14k
  }
416
0
  return 0;
417
2.14k
}
418
  
419
1.34k
v_io_size ConsistentOutputStream::writeAsString(bool value) {
420
1.34k
  if(value){
421
773
    return writeSimple("true", 4);
422
773
  } else {
423
576
    return writeSimple("false", 5);
424
576
  }
425
1.34k
}
426
427
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
428
// Other functions
429
430
431
173k
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const oatpp::String& str) {
432
173k
  if(str) {
433
173k
    s.writeSimple(str);
434
173k
  } else {
435
0
    s.writeSimple("[<String(null)>]");
436
0
  }
437
173k
  return s;
438
173k
}
439
440
0
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Int8& value) {
441
0
  if(value.getPtr()) {
442
0
    return operator << (s, *value);
443
0
  }
444
0
  s.writeSimple("[<Int8(null)>]");
445
0
  return s;
446
0
}
447
448
0
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const UInt8& value) {
449
0
  if(value.getPtr()) {
450
0
    return operator << (s, *value);
451
0
  }
452
0
  s.writeSimple("[<UInt8(null)>]");
453
0
  return s;
454
0
}
455
456
0
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Int16& value) {
457
0
  if(value.getPtr()) {
458
0
    return operator << (s, *value);
459
0
  }
460
0
  s.writeSimple("[<Int16(null)>]");
461
0
  return s;
462
0
}
463
464
0
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const UInt16& value) {
465
0
  if(value.getPtr()) {
466
0
    return operator << (s, *value);
467
0
  }
468
0
  s.writeSimple("[<UInt16(null)>]");
469
0
  return s;
470
0
}
471
472
0
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Int32& value) {
473
0
  if(value.getPtr()) {
474
0
    return operator << (s, *value);
475
0
  }
476
0
  s.writeSimple("[<Int32(null)>]");
477
0
  return s;
478
0
}
479
480
0
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const UInt32& value) {
481
0
  if(value.getPtr()) {
482
0
    return operator << (s, *value);
483
0
  }
484
0
  s.writeSimple("[<UInt32(null)>]");
485
0
  return s;
486
0
}
487
488
0
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Int64& value) {
489
0
  if(value.getPtr()) {
490
0
    return operator << (s, *value);
491
0
  }
492
0
  s.writeSimple("[<Int64(null)>]");
493
0
  return s;
494
0
}
495
496
0
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const UInt64& value) {
497
0
  if(value.getPtr()) {
498
0
    return operator << (s, *value);
499
0
  }
500
0
  s.writeSimple("[<UInt64(null)>]");
501
0
  return s;
502
0
}
503
504
0
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Float32& value) {
505
0
  if(value.getPtr()) {
506
0
    return operator << (s, *value);
507
0
  }
508
0
  s.writeSimple("[<Float32(null)>]");
509
0
  return s;
510
0
}
511
512
0
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Float64& value) {
513
0
  if(value.getPtr()) {
514
0
    return operator << (s, *value);
515
0
  }
516
0
  s.writeSimple("[<Float64(null)>]");
517
0
  return s;
518
0
}
519
520
0
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Boolean& value) {
521
0
  if(value.getPtr()) { // use getPtr() here to avoid false to nullptr conversion
522
0
    return operator << (s, *value);
523
0
  }
524
0
  s.writeSimple("[<Boolean(null)>]");
525
0
  return s;
526
0
}
527
528
162k
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const char* str) {
529
162k
  if(str != nullptr) {
530
162k
    s.writeSimple(str);
531
162k
  } else {
532
0
    s.writeSimple("[<char*(null)>]");
533
0
  }
534
162k
  return s;
535
162k
}
536
537
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
538
// DataTransferProcessor
539
540
StatelessDataTransferProcessor StatelessDataTransferProcessor::INSTANCE;
541
542
0
v_io_size StatelessDataTransferProcessor::suggestInputStreamReadSize() {
543
0
  return 32767;
544
0
}
545
546
0
v_int32 StatelessDataTransferProcessor::iterate(data::buffer::InlineReadData& dataIn, data::buffer::InlineReadData& dataOut) {
547
548
0
  if(dataOut.bytesLeft > 0) {
549
0
    return Error::FLUSH_DATA_OUT;
550
0
  }
551
552
0
  if(dataIn.currBufferPtr != nullptr) {
553
554
0
    if(dataIn.bytesLeft == 0){
555
0
      return Error::PROVIDE_DATA_IN;
556
0
    }
557
558
0
    dataOut = dataIn;
559
0
    dataIn.setEof();
560
0
    return Error::FLUSH_DATA_OUT;
561
562
0
  }
563
564
0
  dataOut = dataIn;
565
0
  dataIn.setEof();
566
0
  return Error::FINISHED;
567
568
0
}
569
570
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
571
// Other functions
572
573
v_io_size transfer(const base::ObjectHandle<ReadCallback>& readCallback,
574
                         const base::ObjectHandle<WriteCallback>& writeCallback,
575
                         v_io_size transferSize,
576
                         void* buffer,
577
                         v_buff_size bufferSize,
578
                         const base::ObjectHandle<data::buffer::Processor>& processor)
579
0
{
580
581
0
  data::buffer::InlineReadData inData;
582
0
  data::buffer::InlineReadData outData;
583
584
0
  v_int32 procRes = data::buffer::Processor::Error::PROVIDE_DATA_IN;
585
0
  v_io_size progress = 0;
586
587
0
  while(procRes != data::buffer::Processor::Error::FINISHED) {
588
589
0
    if(procRes == data::buffer::Processor::Error::PROVIDE_DATA_IN && inData.bytesLeft == 0) {
590
591
0
      v_buff_size desiredToRead = processor->suggestInputStreamReadSize();
592
593
0
      if (desiredToRead > bufferSize) {
594
0
        desiredToRead = bufferSize;
595
0
      }
596
597
0
      if(transferSize > 0 && progress + desiredToRead > transferSize) {
598
0
        desiredToRead = transferSize - progress;
599
0
      }
600
601
0
      v_io_size res = 0;
602
603
0
      if(desiredToRead > 0) {
604
0
        res = IOError::RETRY_READ;
605
0
        while (res == IOError::RETRY_READ || res == IOError::RETRY_WRITE) {
606
0
          res = readCallback->readSimple(buffer, desiredToRead);
607
0
        }
608
0
      }
609
610
0
      if (res > 0) {
611
0
        inData.set(buffer, res);
612
0
        progress += res;
613
0
      } else {
614
0
        inData.set(nullptr, 0);
615
0
      }
616
617
0
    }
618
619
0
    procRes = data::buffer::Processor::Error::OK;
620
0
    while(procRes == data::buffer::Processor::Error::OK) {
621
0
      procRes = processor->iterate(inData, outData);
622
0
    }
623
624
0
    switch(procRes) {
625
626
0
      case data::buffer::Processor::Error::PROVIDE_DATA_IN: {
627
0
        continue;
628
0
      }
629
630
0
      case data::buffer::Processor::Error::FLUSH_DATA_OUT: {
631
0
        v_io_size res = IOError::RETRY_WRITE;
632
0
        while(res == IOError::RETRY_WRITE || res == IOError::RETRY_READ) {
633
0
          res = writeCallback->writeSimple(outData.currBufferPtr, outData.bytesLeft);
634
0
        }
635
0
        if(res > 0) {
636
0
          outData.inc(res);
637
0
        } else {
638
0
          return progress;
639
0
        }
640
0
        break;
641
0
      }
642
643
0
      case data::buffer::Processor::Error::FINISHED:
644
0
        return progress;
645
646
0
      default:
647
        //throw std::runtime_error("Unknown buffer processor error.");
648
0
        return progress;
649
650
0
    }
651
652
0
  }
653
654
0
  return progress;
655
656
0
}
657
658
async::CoroutineStarter transferAsync(const base::ObjectHandle<ReadCallback>& readCallback,
659
                                      const base::ObjectHandle<WriteCallback>& writeCallback,
660
                                      v_buff_size transferSize,
661
                                      const base::ObjectHandle<data::buffer::IOBuffer>& buffer,
662
                                      const base::ObjectHandle<data::buffer::Processor>& processor)
663
0
{
664
665
0
  class TransferCoroutine : public oatpp::async::Coroutine<TransferCoroutine> {
666
0
  private:
667
0
    base::ObjectHandle<ReadCallback> m_readCallback;
668
0
    base::ObjectHandle<WriteCallback> m_writeCallback;
669
0
    v_buff_size m_transferSize;
670
0
    base::ObjectHandle<oatpp::data::buffer::IOBuffer> m_buffer;
671
0
    base::ObjectHandle<data::buffer::Processor> m_processor;
672
0
  private:
673
0
    v_buff_size m_progress;
674
0
  private:
675
0
    v_int32 m_procRes;
676
0
    data::buffer::InlineReadData m_readData;
677
0
    data::buffer::InlineWriteData m_writeData;
678
0
    data::buffer::InlineReadData m_inData;
679
0
    data::buffer::InlineReadData m_outData;
680
0
  public:
681
682
0
    TransferCoroutine(const base::ObjectHandle<ReadCallback>& readCallback,
683
0
                      const base::ObjectHandle<WriteCallback>& writeCallback,
684
0
                      v_buff_size transferSize,
685
0
                      const base::ObjectHandle<buffer::IOBuffer>& buffer,
686
0
                      const base::ObjectHandle<buffer::Processor>& processor)
687
0
      : m_readCallback(readCallback)
688
0
      , m_writeCallback(writeCallback)
689
0
      , m_transferSize(transferSize)
690
0
      , m_buffer(buffer)
691
0
      , m_processor(processor)
692
0
      , m_progress(0)
693
0
      , m_procRes(data::buffer::Processor::Error::PROVIDE_DATA_IN)
694
0
      , m_readData(buffer->getData(), buffer->getSize())
695
0
    {}
696
697
0
    Action act() override {
698
699
0
      if(m_procRes == data::buffer::Processor::Error::FINISHED) {
700
0
        return finish();
701
0
      }
702
703
0
      if(m_procRes == data::buffer::Processor::Error::PROVIDE_DATA_IN && m_inData.bytesLeft == 0) {
704
705
0
        auto desiredToRead = m_processor->suggestInputStreamReadSize();
706
707
0
        if (desiredToRead > m_readData.bytesLeft) {
708
0
          desiredToRead = m_readData.bytesLeft;
709
0
        }
710
711
0
        if(m_transferSize > 0 && m_progress + desiredToRead > m_transferSize) {
712
0
          desiredToRead = m_transferSize - m_progress;
713
0
        }
714
715
0
        Action action;
716
0
        v_io_size res = 0;
717
718
0
        if(desiredToRead > 0) {
719
0
          res = m_readCallback->read(m_readData.currBufferPtr, desiredToRead, action);
720
0
        }
721
722
0
        if (res > 0) {
723
0
          m_readData.inc(res);
724
0
          m_inData.set(m_buffer->getData(), m_buffer->getSize() - m_readData.bytesLeft);
725
0
          m_progress += res;
726
0
        } else {
727
728
0
          switch(res) {
729
730
0
            case IOError::BROKEN_PIPE:
731
0
              if(m_transferSize > 0) {
732
0
                return error<AsyncTransferError>("[oatpp::data::stream::transferAsync]: Error. ReadCallback. BROKEN_PIPE.");
733
0
              }
734
0
              m_inData.set(nullptr, 0);
735
0
              break;
736
737
0
            case IOError::ZERO_VALUE:
738
0
              m_inData.set(nullptr, 0);
739
0
              break;
740
741
0
            case IOError::RETRY_READ:
742
0
              if(!action.isNone()) {
743
0
                return action;
744
0
              }
745
0
              return repeat();
746
747
0
            case IOError::RETRY_WRITE:
748
0
              if(!action.isNone()) {
749
0
                return action;
750
0
              }
751
0
              return repeat();
752
753
0
            default:
754
0
              if(m_transferSize > 0) {
755
0
                if (!action.isNone()) {
756
0
                  return action;
757
0
                }
758
0
                return error<AsyncTransferError>("[oatpp::data::stream::transferAsync]: Error. ReadCallback. Unknown IO error.");
759
0
              }
760
0
              m_inData.set(nullptr, 0);
761
762
0
          }
763
764
0
        }
765
766
0
        if(!action.isNone()){
767
0
          return action;
768
0
        }
769
770
0
      }
771
772
0
      return yieldTo(&TransferCoroutine::process);
773
774
0
    }
775
776
0
    Action process() {
777
778
0
      m_procRes = m_processor->iterate(m_inData, m_outData);
779
780
0
      switch(m_procRes) {
781
782
0
        case data::buffer::Processor::Error::OK:
783
0
          return repeat();
784
785
0
        case data::buffer::Processor::Error::PROVIDE_DATA_IN: {
786
0
          m_readData.set(m_buffer->getData(), m_buffer->getSize());
787
0
          return yieldTo(&TransferCoroutine::act);
788
0
        }
789
790
0
        case data::buffer::Processor::Error::FLUSH_DATA_OUT: {
791
0
          m_readData.set(m_buffer->getData(), m_buffer->getSize());
792
0
          m_writeData.set(m_outData.currBufferPtr, m_outData.bytesLeft);
793
0
          m_outData.setEof();
794
0
          return yieldTo(&TransferCoroutine::flushData);
795
0
        }
796
797
0
        case data::buffer::Processor::Error::FINISHED:
798
0
          return finish();
799
800
0
        default:
801
0
          return error<AsyncTransferError>("[oatpp::data::stream::transferAsync]: Error. ReadCallback. Unknown processing error.");
802
803
0
      }
804
805
0
    }
806
807
0
    Action flushData() {
808
0
      return m_writeCallback->writeExactSizeDataAsyncInline(m_writeData, yieldTo(&TransferCoroutine::act));
809
0
    }
810
811
0
  };
812
813
0
  return TransferCoroutine::start(readCallback, writeCallback, transferSize, buffer, processor);
814
815
0
}
816
  
817
}}}