Coverage Report

Created: 2025-10-04 07:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/oatpp/src/oatpp/data/stream/Stream.hpp
Line
Count
Source
1
/***************************************************************************
2
 *
3
 * Project         _____    __   ____   _      _
4
 *                (  _  )  /__\ (_  _)_| |_  _| |_
5
 *                 )(_)(  /(__)\  )( (_   _)(_   _)
6
 *                (_____)(__)(__)(__)  |_|    |_|
7
 *
8
 *
9
 * Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
10
 *
11
 * Licensed under the Apache License, Version 2.0 (the "License");
12
 * you may not use this file except in compliance with the License.
13
 * You may obtain a copy of the License at
14
 *
15
 *     http://www.apache.org/licenses/LICENSE-2.0
16
 *
17
 * Unless required by applicable law or agreed to in writing, software
18
 * distributed under the License is distributed on an "AS IS" BASIS,
19
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20
 * See the License for the specific language governing permissions and
21
 * limitations under the License.
22
 *
23
 ***************************************************************************/
24
25
#ifndef oatpp_data_Stream
26
#define oatpp_data_Stream
27
28
#include "oatpp/data/share/LazyStringMap.hpp"
29
#include "oatpp/async/Coroutine.hpp"
30
#include "oatpp/data/buffer/IOBuffer.hpp"
31
#include "oatpp/data/buffer/Processor.hpp"
32
33
#include "oatpp/IODefinitions.hpp"
34
35
namespace oatpp { namespace data{ namespace stream {
36
37
/**
38
 * Stream Type.
39
 */
40
enum StreamType : v_int32 {
41
42
  /**
43
   * Finite stream.
44
   */
45
  STREAM_FINITE = 0,
46
47
  /**
48
   * Infinite stream.
49
   */
50
  STREAM_INFINITE = 1
51
52
};
53
54
55
/**
56
 * Stream Context.
57
 */
58
class Context {
59
public:
60
  /**
61
   * Convenience typedef for &id:oatpp::data::share::LazyStringMap;.
62
   */
63
  typedef oatpp::data::share::LazyStringMap<oatpp::data::share::StringKeyLabel> Properties;
64
private:
65
  Properties m_properties;
66
protected:
67
  /**
68
   * `protected`. Get mutable additional optional context specific properties.
69
   * @return - &l:Context::Properties;.
70
   */
71
  Properties& getMutableProperties();
72
public:
73
74
  /**
75
   * Default constructor.
76
   */
77
4
  Context() = default;
78
79
  /**
80
   * Constructor.
81
   * @param properties - &l:Context::Properties;.
82
   */
83
  Context(Properties&& properties);
84
85
  /**
86
   * Virtual destructor.
87
   */
88
0
  virtual ~Context() = default;
89
90
  /**
91
   * Initialize stream context.
92
   */
93
  virtual void init() = 0;
94
95
  /**
96
   * Initialize stream context in an async manner.
97
   * @return - &id:oatpp::async::CoroutineStarter;.
98
   */
99
  virtual async::CoroutineStarter initAsync() = 0;
100
101
  /**
102
   * Check if the stream context is initialized.
103
   * @return - `bool`.
104
   */
105
  virtual bool isInitialized() const = 0;
106
107
  /**
108
   * Get stream type.
109
   * @return - &l:StreamType;.
110
   */
111
  virtual StreamType getStreamType() const = 0;
112
113
  /**
114
   * Additional optional context specific properties.
115
   * @return - &l:Context::Properties;.
116
   */
117
  const Properties& getProperties() const;
118
119
0
  inline bool operator == (const Context& other){
120
0
    return this == &other;
121
0
  }
122
123
0
  inline bool operator != (const Context& other){
124
0
    return this != &other;
125
0
  }
126
127
};
128
129
/**
130
 * The default implementation for context with no initialization.
131
 */
132
class DefaultInitializedContext : public oatpp::data::stream::Context {
133
private:
134
  StreamType m_streamType;
135
public:
136
137
  /**
138
   * Constructor.
139
   * @param streamType - &l:StreamType;.
140
   */
141
  DefaultInitializedContext(StreamType streamType);
142
143
  /**
144
   * Constructor.
145
   * @param streamType - &l:StreamType;.
146
   * @param properties - &l:Context::Properties;.
147
   */
148
  DefaultInitializedContext(StreamType streamType, Properties&& properties);
149
150
  /**
151
   * Initialize stream context. <br>
152
   * *This particular implementation does nothing.*
153
   */
154
  void init() override;
155
156
  /**
157
   * Initialize stream context in an async manner.
158
   * *This particular implementation does nothing.*
159
   * @return - &id:oatpp::async::CoroutineStarter;.
160
   */
161
  async::CoroutineStarter initAsync() override;
162
163
  /**
164
   * Check if the stream context is initialized.
165
   * *This particular implementation always returns `true`.*
166
   * @return - `bool`.
167
   */
168
  bool isInitialized() const override;
169
170
  /**
171
   * Get stream type.
172
   * @return - &l:StreamType;.
173
   */
174
  StreamType getStreamType() const override;
175
176
};
177
178
/**
179
 * Stream I/O mode.
180
 */
181
enum IOMode : v_int32 {
182
183
  /**
184
   * Blocking stream I/O mode.
185
   */
186
  BLOCKING = 0,
187
188
  /**
189
   * Non-blocking stream I/O mode.
190
   */
191
  ASYNCHRONOUS = 1
192
};
193
194
/**
195
 * Callback for stream write operation.
196
 */
197
class WriteCallback {
198
public:
199
200
  /**
201
   * Default virtual destructor.
202
   */
203
182k
  virtual ~WriteCallback() = default;
204
205
  /**
206
   * Write operation callback.
207
   * @param data - pointer to data.
208
   * @param count - size of the data in bytes.
209
   * @param action - async specific action. If action is NOT &id:oatpp::async::Action::TYPE_NONE;, then
210
   * caller MUST return this action on coroutine iteration.
211
   * @return - actual number of bytes written. 0 - to indicate end-of-file.
212
   */
213
  virtual v_io_size write(const void *data, v_buff_size count, async::Action& action) = 0;
214
215
  v_io_size write(data::buffer::InlineWriteData& inlineData, async::Action& action);
216
217
  v_io_size writeSimple(const void *data, v_buff_size count);
218
219
  v_io_size writeExactSizeDataSimple(data::buffer::InlineWriteData& inlineData);
220
221
  v_io_size writeExactSizeDataSimple(const void *data, v_buff_size count);
222
223
  async::Action writeExactSizeDataAsyncInline(data::buffer::InlineWriteData& inlineData, async::Action&& nextAction);
224
225
  async::CoroutineStarter writeExactSizeDataAsync(const void* data, v_buff_size size);
226
227
  /**
228
   * Same as `writeSimple(data, std::strlen(data));`.
229
   * @param data - data to write.
230
   * @return - actual number of bytes written. &id:oatpp::v_io_size;.
231
   */
232
240k
  v_io_size writeSimple(const char* data){
233
240k
    return writeSimple(data, static_cast<v_buff_size>(std::strlen(data)));
234
240k
  }
235
236
  /**
237
   * Same as `writeSimple(str->data(), str->size());`.
238
   * @param str - data to write.
239
   * @return - actual number of bytes written. &id:oatpp::v_io_size;.
240
   */
241
1.46M
  v_io_size writeSimple(const oatpp::String& str){
242
1.46M
    return writeSimple(str->data(), static_cast<v_buff_size>(str->size()));
243
1.46M
  }
244
245
  /**
246
   * Same as `writeSimple(&c, 1);`.
247
   * @param c - one char to write.
248
   * @return - actual number of bytes written. &id:oatpp::v_io_size;.
249
   */
250
2.72M
  v_io_size writeCharSimple(v_char8 c){
251
2.72M
    return writeSimple(&c, 1);
252
2.72M
  }
253
254
};
255
256
/**
257
 * Output Stream.
258
 */
259
class OutputStream : public WriteCallback {
260
public:
261
262
  /**
263
   * Default virtual destructor.
264
   */
265
  virtual ~OutputStream() override = default;
266
267
  /**
268
   * Set stream I/O mode.
269
   * @throws
270
   */
271
  virtual void setOutputStreamIOMode(IOMode ioMode) = 0;
272
273
  /**
274
   * Get stream I/O mode.
275
   * @return
276
   */
277
  virtual IOMode getOutputStreamIOMode() = 0;
278
279
  /**
280
   * Get stream context.
281
   * @return - &l:Context;.
282
   */
283
  virtual Context& getOutputStreamContext() = 0;
284
285
};
286
287
/**
288
 * Stream read callback.
289
 */
290
class ReadCallback {
291
public:
292
293
  /**
294
   * Default virtual destructor.
295
   */
296
0
  virtual ~ReadCallback() = default;
297
298
  /**
299
   * Read operation callback.
300
   * @param buffer - pointer to buffer.
301
   * @param count - size of the buffer in bytes.
302
   * @param action - async specific action. If action is NOT &id:oatpp::async::Action::TYPE_NONE;, then
303
   * caller MUST return this action on coroutine iteration.
304
   * @return - actual number of bytes written to buffer. 0 - to indicate end-of-file.
305
   */
306
  virtual v_io_size read(void *buffer, v_buff_size count, async::Action& action) = 0;
307
308
  v_io_size read(data::buffer::InlineReadData& inlineData, async::Action& action);
309
310
  v_io_size readExactSizeDataSimple(data::buffer::InlineReadData& inlineData);
311
312
  v_io_size readExactSizeDataSimple(void *data, v_buff_size count);
313
314
  async::Action readExactSizeDataAsyncInline(data::buffer::InlineReadData& inlineData, async::Action&& nextAction);
315
316
  async::Action readSomeDataAsyncInline(data::buffer::InlineReadData& inlineData, async::Action&& nextAction);
317
318
  v_io_size readSimple(void *data, v_buff_size count);
319
320
};
321
322
/**
323
 * Input Stream.
324
 */
325
class InputStream : public ReadCallback {
326
public:
327
328
  /**
329
   * Default virtual destructor.
330
   */
331
  virtual ~InputStream() override = default;
332
333
  /**
334
   * Set stream I/O mode.
335
   * @throws
336
   */
337
  virtual void setInputStreamIOMode(IOMode ioMode) = 0;
338
339
  /**
340
   * Get stream I/O mode.
341
   * @return
342
   */
343
  virtual IOMode getInputStreamIOMode() = 0;
344
345
  /**
346
   * Get stream context.
347
   * @return - &l:Context;.
348
   */
349
  virtual Context& getInputStreamContext() = 0;
350
351
};
352
353
/**
354
 * Buffered Input Stream
355
 */
356
class BufferedInputStream : public InputStream {
357
 public:
358
  /**
359
   * Default virtual destructor.
360
   */
361
  virtual ~BufferedInputStream() override = default;
362
363
  /**
364
   * Peek up to `count` bytes in the buffer.
365
   * @param data
366
   * @param count
367
   * @return [1..count], IOErrors.
368
   */
369
  virtual v_io_size peek(void *data, v_buff_size count, async::Action& action) = 0;
370
371
  /**
372
   * Amount of bytes currently available to read from buffer.
373
   * @return &id:oatpp::v_io_size;.
374
   */
375
  virtual v_io_size availableToRead() const = 0;
376
377
  /**
378
   * Commit read offset
379
   * @param count
380
   * @return [1..count], IOErrors.
381
   */
382
  virtual v_io_size commitReadOffset(v_buff_size count) = 0;
383
};
384
385
/**
386
 * I/O Stream.
387
 */
388
class IOStream : public InputStream, public OutputStream {
389
public:
390
391
  /**
392
   * Init input/output stream contexts.
393
   */
394
  void initContexts();
395
396
  /**
397
   * Init input/output stream contexts in an async manner.
398
   */
399
  async::CoroutineStarter initContextsAsync();
400
401
};
402
403
/**
404
 * Streams that guarantee data to be written in exact amount as specified in call to &l:OutputStream::write (); should extend this class.
405
 */
406
class ConsistentOutputStream : public OutputStream {
407
public:
408
409
  /**
410
   * Convert value to string and write to stream.
411
   * @param value
412
   * @return - actual number of bytes written. &id:oatpp::v_io_size;. <br>
413
   */
414
  v_io_size writeAsString(v_int8 value);
415
416
  /**
417
   * Convert value to string and write to stream.
418
   * @param value
419
   * @return - actual number of bytes written. &id:oatpp::v_io_size;. <br>
420
   */
421
  v_io_size writeAsString(v_uint8 value);
422
423
  /**
424
   * Convert value to string and write to stream.
425
   * @param value
426
   * @return - actual number of bytes written. &id:oatpp::v_io_size;. <br>
427
   */
428
  v_io_size writeAsString(v_int16 value);
429
430
  /**
431
   * Convert value to string and write to stream.
432
   * @param value
433
   * @return - actual number of bytes written. &id:oatpp::v_io_size;. <br>
434
   */
435
  v_io_size writeAsString(v_uint16 value);
436
437
  /**
438
   * Convert value to string and write to stream.
439
   * @param value
440
   * @return - actual number of bytes written. &id:oatpp::v_io_size;. <br>
441
   */
442
  v_io_size writeAsString(v_int32 value);
443
444
  /**
445
   * Convert value to string and write to stream.
446
   * @param value
447
   * @return - actual number of bytes written. &id:oatpp::v_io_size;. <br>
448
   */
449
  v_io_size writeAsString(v_uint32 value);
450
451
  /**
452
   * Convert value to string and write to stream.
453
   * @param value
454
   * @return - actual number of bytes written. &id:oatpp::v_io_size;. <br>
455
   */
456
  v_io_size writeAsString(v_int64 value);
457
458
  /**
459
   * Convert value to string and write to stream.
460
   * @param value
461
   * @return - actual number of bytes written. &id:oatpp::v_io_size;. <br>
462
   */
463
  v_io_size writeAsString(v_uint64 value);
464
465
  /**
466
   * Convert value to string and write to stream.
467
   * @param value
468
   * @return - actual number of bytes written. &id:oatpp::v_io_size;. <br>
469
   */
470
  v_io_size writeAsString(v_float32 value);
471
472
  /**
473
   * Convert value to string and write to stream.
474
   * @param value
475
   * @return - actual number of bytes written. &id:oatpp::v_io_size;. <br>
476
   */
477
  v_io_size writeAsString(v_float64 value);
478
479
  /**
480
   * Convert value to string and write to stream.
481
   * @param value
482
   * @return - actual number of bytes written. &id:oatpp::v_io_size;. <br>
483
   */
484
  v_io_size writeAsString(bool value);
485
486
};
487
488
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const oatpp::String& str);
489
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Int8& value);
490
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const UInt8& value);
491
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Int16& value);
492
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const UInt16& value);
493
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Int32& value);
494
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const UInt32& value);
495
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Int64& value);
496
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const UInt64& value);
497
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Float32& value);
498
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Float64& value);
499
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const Boolean& value);
500
501
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const char* str);
502
503
template<typename T>
504
82
ConsistentOutputStream& operator << (ConsistentOutputStream& s, T value) {
505
82
  s.writeAsString(value);
506
82
  return s;
507
82
}
Unexecuted instantiation: oatpp::data::stream::ConsistentOutputStream& oatpp::data::stream::operator<< <signed char>(oatpp::data::stream::ConsistentOutputStream&, signed char)
Unexecuted instantiation: oatpp::data::stream::ConsistentOutputStream& oatpp::data::stream::operator<< <unsigned char>(oatpp::data::stream::ConsistentOutputStream&, unsigned char)
Unexecuted instantiation: oatpp::data::stream::ConsistentOutputStream& oatpp::data::stream::operator<< <short>(oatpp::data::stream::ConsistentOutputStream&, short)
Unexecuted instantiation: oatpp::data::stream::ConsistentOutputStream& oatpp::data::stream::operator<< <unsigned short>(oatpp::data::stream::ConsistentOutputStream&, unsigned short)
Unexecuted instantiation: oatpp::data::stream::ConsistentOutputStream& oatpp::data::stream::operator<< <int>(oatpp::data::stream::ConsistentOutputStream&, int)
Unexecuted instantiation: oatpp::data::stream::ConsistentOutputStream& oatpp::data::stream::operator<< <unsigned int>(oatpp::data::stream::ConsistentOutputStream&, unsigned int)
oatpp::data::stream::ConsistentOutputStream& oatpp::data::stream::operator<< <long>(oatpp::data::stream::ConsistentOutputStream&, long)
Line
Count
Source
504
27
ConsistentOutputStream& operator << (ConsistentOutputStream& s, T value) {
505
27
  s.writeAsString(value);
506
27
  return s;
507
27
}
Unexecuted instantiation: oatpp::data::stream::ConsistentOutputStream& oatpp::data::stream::operator<< <unsigned long>(oatpp::data::stream::ConsistentOutputStream&, unsigned long)
Unexecuted instantiation: oatpp::data::stream::ConsistentOutputStream& oatpp::data::stream::operator<< <float>(oatpp::data::stream::ConsistentOutputStream&, float)
oatpp::data::stream::ConsistentOutputStream& oatpp::data::stream::operator<< <double>(oatpp::data::stream::ConsistentOutputStream&, double)
Line
Count
Source
504
51
ConsistentOutputStream& operator << (ConsistentOutputStream& s, T value) {
505
51
  s.writeAsString(value);
506
51
  return s;
507
51
}
oatpp::data::stream::ConsistentOutputStream& oatpp::data::stream::operator<< <bool>(oatpp::data::stream::ConsistentOutputStream&, bool)
Line
Count
Source
504
4
ConsistentOutputStream& operator << (ConsistentOutputStream& s, T value) {
505
4
  s.writeAsString(value);
506
4
  return s;
507
4
}
508
509
/**
510
 * Error of Asynchronous stream transfer.
511
 */
512
class AsyncTransferError : public oatpp::async::Error {
513
public:
514
  /**
515
   * Constructor.
516
   * @param what
517
   */
518
0
  AsyncTransferError(const char* what) : oatpp::async::Error(what) {}
519
};
520
521
/**
522
 * Plain data transfer processor.
523
 * Transfers data as is.
524
 */
525
class StatelessDataTransferProcessor : public data::buffer::Processor {
526
public:
527
  static StatelessDataTransferProcessor INSTANCE;
528
public:
529
  v_io_size suggestInputStreamReadSize() override;
530
  v_int32 iterate(data::buffer::InlineReadData& dataIn, data::buffer::InlineReadData& dataOut) override;
531
};
532
533
/**
534
 * Transfer data from `readCallback` to `writeCallback`.
535
 * @param readCallback - &l:ReadCallback;.
536
 * @param writeCallback - &l:WriteCallback;.
537
 * @param transferSize - how much data should be read from the `readCallback`. `0` - to read until error.
538
 * @param buffer - pointer to buffer used to do the transfer by chunks.
539
 * @param bufferSize - size of the buffer.
540
 * @param processor - data processing to be applied during the transfer.
541
 * @return - the actual amount of bytes read from the `readCallback`.
542
 */
543
v_io_size transfer(const base::ObjectHandle<ReadCallback>& readCallback,
544
                   const base::ObjectHandle<WriteCallback>& writeCallback,
545
                   v_io_size transferSize,
546
                   void* buffer,
547
                   v_buff_size bufferSize,
548
                   const base::ObjectHandle<data::buffer::Processor>& processor = &StatelessDataTransferProcessor::INSTANCE);
549
550
/**
551
 * Transfer data from `readCallback` to `writeCallback` in Async manner.
552
 * @param readCallback - &l:ReadCallback;.
553
 * @param writeCallback - &l:WriteCallback;.
554
 * @param transferSize - how much data should be read from the `readCallback`. `0` - to read until error.
555
 * @param buffer - &id:oatpp::data::buffer::IOBuffer; used to do the transfer by chunks.
556
 * @param processor - data processing to be applied during the transfer.
557
 * @return - &id:oatpp::async::CoroutineStarter;.
558
 */
559
async::CoroutineStarter transferAsync(const base::ObjectHandle<ReadCallback>& readCallback,
560
                                      const base::ObjectHandle<WriteCallback>& writeCallback,
561
                                      v_buff_size transferSize,
562
                                      const base::ObjectHandle<data::buffer::IOBuffer>& buffer,
563
                                      const base::ObjectHandle<data::buffer::Processor>& processor = &StatelessDataTransferProcessor::INSTANCE);
564
565
  
566
}}}
567
568
#endif /* oatpp_data_Stream */