Coverage Report

Created: 2025-08-28 06:26

/src/serenity/AK/BufferedStream.h
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2021, sin-ack <sin-ack@protonmail.com>
3
 * Copyright (c) 2022, the SerenityOS developers.
4
 *
5
 * SPDX-License-Identifier: BSD-2-Clause
6
 */
7
8
#pragma once
9
10
#include <AK/CircularBuffer.h>
11
#include <AK/OwnPtr.h>
12
#include <AK/Stream.h>
13
14
namespace AK {
15
16
template<typename T>
17
concept StreamLike = IsBaseOf<Stream, T>;
18
template<typename T>
19
concept SeekableStreamLike = IsBaseOf<SeekableStream, T>;
20
21
template<typename T>
22
class BufferedHelper {
23
    AK_MAKE_NONCOPYABLE(BufferedHelper);
24
    AK_MAKE_DEFAULT_MOVABLE(BufferedHelper);
25
26
public:
27
    template<StreamLike U>
28
    BufferedHelper(Badge<U>, NonnullOwnPtr<T> stream, CircularBuffer buffer)
29
0
        : m_stream(move(stream))
30
0
        , m_buffer(move(buffer))
31
0
    {
32
0
    }
33
34
    template<template<typename> typename BufferedType>
35
    static ErrorOr<NonnullOwnPtr<BufferedType<T>>> create_buffered(NonnullOwnPtr<T> stream, size_t buffer_size)
36
0
    {
37
0
        if (!buffer_size)
38
0
            return Error::from_errno(EINVAL);
39
0
        if (!stream->is_open())
40
0
            return Error::from_errno(ENOTCONN);
41
42
0
        auto buffer = TRY(CircularBuffer::create_empty(buffer_size));
43
44
0
        return adopt_nonnull_own_or_enomem(new BufferedType<T>(move(stream), move(buffer)));
45
0
    }
46
47
0
    T& stream() { return *m_stream; }
48
0
    T const& stream() const { return *m_stream; }
49
50
    ErrorOr<Bytes> read(Bytes buffer)
51
0
    {
52
0
        if (!stream().is_open())
53
0
            return Error::from_errno(ENOTCONN);
54
0
        if (buffer.is_empty())
55
0
            return buffer;
56
57
        // Fill the internal buffer if it has run dry.
58
0
        if (m_buffer.used_space() == 0)
59
0
            TRY(populate_read_buffer());
60
61
        // Let's try to take all we can from the buffer first.
62
0
        return m_buffer.read(buffer);
63
0
    }
64
65
    // Reads into the buffer until \n is encountered.
66
    // The size of the Bytes object is the maximum amount of bytes that will be
67
    // read. Returns the bytes read as a StringView.
68
    ErrorOr<StringView> read_line(Bytes buffer)
69
0
    {
70
0
        return StringView { TRY(read_until(buffer, "\n"sv)) };
71
0
    }
72
73
    ErrorOr<Bytes> read_until(Bytes buffer, StringView candidate)
74
0
    {
75
0
        return read_until_any_of(buffer, Array { candidate });
76
0
    }
77
78
    template<size_t N>
79
    ErrorOr<Bytes> read_until_any_of(Bytes buffer, Array<StringView, N> candidates)
80
0
    {
81
0
        if (!stream().is_open())
82
0
            return Error::from_errno(ENOTCONN);
83
84
0
        if (buffer.is_empty())
85
0
            return buffer;
86
87
0
        auto const candidate = TRY(find_and_populate_until_any_of(candidates, buffer.size()));
88
89
0
        if (stream().is_eof()) {
90
0
            if ((candidate.has_value() && candidate->offset + candidate->size > buffer.size())
91
0
                || (!candidate.has_value() && buffer.size() < m_buffer.used_space())) {
92
                // Normally, reading from an EOFed stream and receiving bytes
93
                // would mean that the stream is no longer EOF. However, it's
94
                // possible with a buffered stream that the user is able to read
95
                // the buffer contents even when the underlying stream is EOF.
96
                // We already violate this invariant once by giving the user the
97
                // chance to read the remaining buffer contents, but if the user
98
                // doesn't give us a big enough buffer, then we would be
99
                // violating the invariant twice the next time the user attempts
100
                // to read, which is No Good. So let's give a descriptive error
101
                // to the caller about why it can't read.
102
0
                return Error::from_errno(EMSGSIZE);
103
0
            }
104
0
        }
105
106
0
        if (candidate.has_value()) {
107
0
            auto const read_bytes = m_buffer.read(buffer.trim(candidate->offset));
108
0
            TRY(m_buffer.discard(candidate->size));
109
0
            return read_bytes;
110
0
        }
111
112
        // If we still haven't found anything, then it's most likely the case
113
        // that the delimiter ends beyond the length of the caller-passed
114
        // buffer. Let's just fill the caller's buffer up.
115
0
        return m_buffer.read(buffer);
116
0
    }
117
118
    ErrorOr<StringView> read_line_with_resize(ByteBuffer& buffer)
119
    {
120
        return StringView { TRY(read_until_with_resize(buffer, "\n"sv)) };
121
    }
122
123
    ErrorOr<Bytes> read_until_with_resize(ByteBuffer& buffer, StringView candidate)
124
    {
125
        return read_until_any_of_with_resize(buffer, Array { candidate });
126
    }
127
128
    template<size_t N>
129
    ErrorOr<Bytes> read_until_any_of_with_resize(ByteBuffer& buffer, Array<StringView, N> candidates)
130
    {
131
        if (!stream().is_open())
132
            return Error::from_errno(ENOTCONN);
133
134
        auto candidate = TRY(find_and_populate_until_any_of(candidates));
135
136
        size_t bytes_read_to_user_buffer = 0;
137
        while (!candidate.has_value()) {
138
            if (m_buffer.used_space() == 0 && stream().is_eof()) {
139
                // If we read to the very end of the buffered and unbuffered data,
140
                // then treat the remainder as a full line (the last one), even if it
141
                // doesn't end in the delimiter.
142
                return buffer.span().trim(bytes_read_to_user_buffer);
143
            }
144
145
            if (buffer.size() - bytes_read_to_user_buffer < m_buffer.used_space()) {
146
                // Resize the user supplied buffer because it cannot fit
147
                // the contents of m_buffer.
148
                TRY(buffer.try_resize(buffer.size() + m_buffer.used_space()));
149
            }
150
151
            // Read bytes into the buffer starting from the offset of how many bytes have previously been read.
152
            bytes_read_to_user_buffer += m_buffer.read(buffer.span().slice(bytes_read_to_user_buffer)).size();
153
            candidate = TRY(find_and_populate_until_any_of(candidates));
154
        }
155
156
        // Once the candidate has been found, read the contents of m_buffer into the buffer,
157
        // offset by how many bytes have already been read in.
158
        TRY(buffer.try_resize(bytes_read_to_user_buffer + candidate->offset));
159
        m_buffer.read(buffer.span().slice(bytes_read_to_user_buffer));
160
        TRY(m_buffer.discard(candidate->size));
161
        return buffer.span();
162
    }
163
164
    struct Match {
165
        size_t offset {};
166
        size_t size {};
167
    };
168
169
    template<size_t N>
170
    ErrorOr<Optional<Match>> find_and_populate_until_any_of(Array<StringView, N> const& candidates, Optional<size_t> max_offset = {})
171
0
    {
172
0
        Optional<size_t> longest_candidate;
173
0
        for (auto& candidate : candidates) {
174
0
            if (candidate.length() >= longest_candidate.value_or(candidate.length()))
175
0
                longest_candidate = candidate.length();
176
0
        }
177
178
        // The intention here is to try to match all the possible
179
        // delimiter candidates and try to find the longest one we can
180
        // remove from the buffer after copying up to the delimiter to the
181
        // user buffer.
182
183
0
        auto const find_candidates = [this, &candidates, &longest_candidate](size_t min_offset, Optional<size_t> max_offset = {}) -> Optional<Match> {
184
0
            auto const corrected_minimum_offset = *longest_candidate > min_offset ? 0 : min_offset - *longest_candidate;
185
0
            max_offset = max_offset.value_or(m_buffer.used_space());
186
187
0
            Optional<size_t> longest_match;
188
0
            size_t match_size = 0;
189
0
            for (auto& candidate : candidates) {
190
0
                auto const result = m_buffer.offset_of(candidate, corrected_minimum_offset, *max_offset);
191
192
0
                if (result.has_value()) {
193
0
                    auto previous_match = longest_match.value_or(*result);
194
0
                    if ((previous_match < *result) || (previous_match == *result && match_size < candidate.length())) {
195
0
                        longest_match = result;
196
0
                        match_size = candidate.length();
197
0
                    }
198
0
                }
199
0
            }
200
201
0
            if (longest_match.has_value())
202
0
                return Match { *longest_match, match_size };
203
204
0
            return {};
205
0
        };
206
207
0
        if (auto first_find = find_candidates(0, max_offset); first_find.has_value())
208
0
            return first_find;
209
210
0
        auto last_size = m_buffer.used_space();
211
212
0
        while (m_buffer.used_space() < max_offset.value_or(m_buffer.capacity())) {
213
0
            auto const read_bytes = TRY(populate_read_buffer());
214
0
            if (read_bytes == 0)
215
0
                break;
216
217
0
            if (auto first_find = find_candidates(last_size, max_offset); first_find.has_value())
218
0
                return first_find;
219
0
            last_size = m_buffer.used_space();
220
0
        }
221
222
0
        return Optional<Match> {};
223
0
    }
224
225
    // Populates the buffer, and returns whether it is possible to read up to the given delimiter.
226
    ErrorOr<bool> can_read_up_to_delimiter(ReadonlyBytes delimiter)
227
0
    {
228
0
        if (stream().is_eof())
229
0
            return m_buffer.offset_of(delimiter).has_value();
230
231
0
        auto maybe_match = TRY(find_and_populate_until_any_of(Array { StringView { delimiter } }));
232
0
        if (maybe_match.has_value())
233
0
            return true;
234
235
0
        return stream().is_eof() && m_buffer.offset_of(delimiter).has_value();
236
0
    }
237
238
    bool is_eof_with_data_left_over() const
239
0
    {
240
0
        return stream().is_eof() && m_buffer.used_space() > 0;
241
0
    }
242
243
    bool is_eof() const
244
0
    {
245
0
        if (m_buffer.used_space() > 0) {
246
0
            return false;
247
0
        }
248
249
0
        return stream().is_eof();
250
0
    }
251
252
    size_t buffer_size() const
253
    {
254
        return m_buffer.capacity();
255
    }
256
257
    size_t buffered_data_size() const
258
0
    {
259
0
        return m_buffer.used_space();
260
0
    }
261
262
    void clear_buffer()
263
0
    {
264
0
        m_buffer.clear();
265
0
    }
266
267
    ErrorOr<void> discard_bytes(size_t count)
268
0
    {
269
0
        return m_buffer.discard(count);
270
0
    }
271
272
private:
273
    ErrorOr<size_t> populate_read_buffer()
274
0
    {
275
0
        if (m_buffer.empty_space() == 0)
276
0
            return 0;
277
278
0
        size_t nread = 0;
279
280
0
        while (true) {
281
0
            auto result = m_buffer.fill_from_stream(stream());
282
283
0
            if (result.is_error()) {
284
0
                if (!result.error().is_errno())
285
0
                    return result.release_error();
286
0
                if (result.error().code() == EINTR)
287
0
                    continue;
288
0
                if (result.error().code() == EAGAIN)
289
0
                    break;
290
0
                return result.release_error();
291
0
            }
292
293
0
            nread += result.value();
294
0
            break;
295
0
        }
296
297
0
        return nread;
298
0
    }
299
300
    NonnullOwnPtr<T> m_stream;
301
    CircularBuffer m_buffer;
302
};
303
304
// NOTE: A Buffered which accepts any Stream could be added here, but it is not
305
//       needed at the moment.
306
307
template<SeekableStreamLike T>
308
class InputBufferedSeekable final : public SeekableStream {
309
    friend BufferedHelper<T>;
310
311
public:
312
    static ErrorOr<NonnullOwnPtr<InputBufferedSeekable<T>>> create(NonnullOwnPtr<T> stream, size_t buffer_size = 16384)
313
0
    {
314
0
        return BufferedHelper<T>::template create_buffered<InputBufferedSeekable>(move(stream), buffer_size);
315
0
    }
316
317
    InputBufferedSeekable(InputBufferedSeekable&& other) = default;
318
    InputBufferedSeekable& operator=(InputBufferedSeekable&& other) = default;
319
320
0
    virtual ErrorOr<Bytes> read_some(Bytes buffer) override { return m_helper.read(buffer); }
321
0
    virtual ErrorOr<size_t> write_some(ReadonlyBytes buffer) override { return m_helper.stream().write_some(buffer); }
322
0
    virtual bool is_eof() const override { return m_helper.is_eof(); }
323
0
    virtual bool is_open() const override { return m_helper.stream().is_open(); }
324
0
    virtual void close() override { m_helper.stream().close(); }
325
    virtual ErrorOr<size_t> seek(i64 offset, SeekMode mode) override
326
0
    {
327
0
        if (mode == SeekMode::FromCurrentPosition) {
328
            // If possible, seek using the buffer alone.
329
0
            if (0 <= offset && static_cast<u64>(offset) <= m_helper.buffered_data_size()) {
330
0
                MUST(m_helper.discard_bytes(offset));
331
0
                return TRY(m_helper.stream().tell()) - m_helper.buffered_data_size();
332
0
            }
333
334
0
            offset = offset - m_helper.buffered_data_size();
335
0
        }
336
337
0
        auto result = TRY(m_helper.stream().seek(offset, mode));
338
0
        m_helper.clear_buffer();
339
340
0
        return result;
341
0
    }
342
    virtual ErrorOr<void> truncate(size_t length) override
343
0
    {
344
0
        return m_helper.stream().truncate(length);
345
0
    }
346
347
0
    ErrorOr<StringView> read_line(Bytes buffer) { return m_helper.read_line(buffer); }
348
    ErrorOr<bool> can_read_line()
349
0
    {
350
0
        return TRY(m_helper.can_read_up_to_delimiter("\n"sv.bytes())) || m_helper.is_eof_with_data_left_over();
351
0
    }
352
    ErrorOr<Bytes> read_until(Bytes buffer, StringView candidate) { return m_helper.read_until(buffer, candidate); }
353
    template<size_t N>
354
    ErrorOr<Bytes> read_until_any_of(Bytes buffer, Array<StringView, N> candidates) { return m_helper.read_until_any_of(buffer, move(candidates)); }
355
    ErrorOr<bool> can_read_up_to_delimiter(ReadonlyBytes delimiter) { return m_helper.can_read_up_to_delimiter(delimiter); }
356
357
    // Methods for reading stream into an auto-adjusting buffer
358
    ErrorOr<StringView> read_line_with_resize(ByteBuffer& buffer) { return m_helper.read_line_with_resize(buffer); }
359
    ErrorOr<Bytes> read_until_with_resize(ByteBuffer& buffer, StringView candidate) { return m_helper.read_until_with_resize(buffer, candidate); }
360
    template<size_t N>
361
    ErrorOr<Bytes> read_until_any_of_with_resize(ByteBuffer& buffer, Array<StringView, N> candidates) { return m_helper.read_until_any_of_with_resize(buffer, move(candidates)); }
362
363
    size_t buffer_size() const { return m_helper.buffer_size(); }
364
365
0
    virtual ~InputBufferedSeekable() override = default;
366
367
private:
368
    InputBufferedSeekable(NonnullOwnPtr<T> stream, CircularBuffer buffer)
369
0
        : m_helper(Badge<InputBufferedSeekable<T>> {}, move(stream), move(buffer))
370
0
    {
371
0
    }
372
373
    BufferedHelper<T> m_helper;
374
};
375
376
template<SeekableStreamLike T>
377
class OutputBufferedSeekable : public SeekableStream {
378
public:
379
    static ErrorOr<NonnullOwnPtr<OutputBufferedSeekable<T>>> create(NonnullOwnPtr<T> stream, size_t buffer_size = 16 * KiB)
380
    {
381
        if (buffer_size == 0)
382
            return Error::from_errno(EINVAL);
383
        if (!stream->is_open())
384
            return Error::from_errno(ENOTCONN);
385
386
        auto buffer = TRY(CircularBuffer::create_empty(buffer_size));
387
388
        return adopt_nonnull_own_or_enomem(new OutputBufferedSeekable<T>(move(stream), move(buffer)));
389
    }
390
391
    OutputBufferedSeekable(OutputBufferedSeekable&& other) = default;
392
    OutputBufferedSeekable& operator=(OutputBufferedSeekable&& other) = default;
393
394
    virtual ErrorOr<Bytes> read_some(Bytes buffer) override
395
    {
396
        TRY(flush_buffer());
397
        return m_stream->read_some(buffer);
398
    }
399
400
    virtual ErrorOr<size_t> write_some(ReadonlyBytes buffer) override
401
    {
402
        if (!m_stream->is_open())
403
            return Error::from_errno(ENOTCONN);
404
405
        auto const written = m_buffer.write(buffer);
406
407
        if (m_buffer.empty_space() == 0)
408
            TRY(m_buffer.flush_to_stream(*m_stream));
409
410
        return written;
411
    }
412
413
    virtual bool is_eof() const override
414
    {
415
        MUST(flush_buffer());
416
        return m_stream->is_eof();
417
    }
418
419
    virtual bool is_open() const override { return m_stream->is_open(); }
420
421
    virtual void close() override
422
    {
423
        MUST(flush_buffer());
424
        m_stream->close();
425
    }
426
427
    ErrorOr<void> flush_buffer() const
428
    {
429
        while (m_buffer.used_space() > 0)
430
            TRY(m_buffer.flush_to_stream(*m_stream));
431
        return {};
432
    }
433
434
    // Since tell() doesn't involve moving the write offset, we can skip flushing the buffer here.
435
    virtual ErrorOr<size_t> tell() const override
436
    {
437
        return TRY(m_stream->tell()) + m_buffer.used_space();
438
    }
439
440
    virtual ErrorOr<size_t> seek(i64 offset, SeekMode mode) override
441
    {
442
        TRY(flush_buffer());
443
        return m_stream->seek(offset, mode);
444
    }
445
446
    virtual ErrorOr<void> truncate(size_t length) override
447
    {
448
        TRY(flush_buffer());
449
        return m_stream->truncate(length);
450
    }
451
452
    virtual ~OutputBufferedSeekable() override
453
    {
454
        MUST(flush_buffer());
455
    }
456
457
private:
458
    OutputBufferedSeekable(NonnullOwnPtr<T> stream, CircularBuffer buffer)
459
        : m_stream(move(stream))
460
        , m_buffer(move(buffer))
461
    {
462
    }
463
464
    mutable NonnullOwnPtr<T> m_stream;
465
    mutable CircularBuffer m_buffer;
466
};
467
468
}
469
470
// When USING_AK_GLOBALLY is set, re-export this header's class templates at file
// scope so they can be named without the AK:: prefix.
#if USING_AK_GLOBALLY
471
using AK::BufferedHelper;
472
using AK::InputBufferedSeekable;
473
using AK::OutputBufferedSeekable;
474
#endif