/src/serenity/AK/BufferedStream.h
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (c) 2021, sin-ack <sin-ack@protonmail.com> |
3 | | * Copyright (c) 2022, the SerenityOS developers. |
4 | | * |
5 | | * SPDX-License-Identifier: BSD-2-Clause |
6 | | */ |
7 | | |
8 | | #pragma once |
9 | | |
10 | | #include <AK/CircularBuffer.h> |
11 | | #include <AK/OwnPtr.h> |
12 | | #include <AK/Stream.h> |
13 | | |
14 | | namespace AK { |
15 | | |
// Satisfied by types for which AK's IsBaseOf<Stream, T> holds, i.e. types
// deriving from AK::Stream. Used to constrain the Badge type accepted by
// BufferedHelper's constructor.
template<typename T>
concept StreamLike = IsBaseOf<Stream, T>;
// Satisfied by types for which AK's IsBaseOf<SeekableStream, T> holds. This is
// the requirement on the wrapped stream type of the buffered seekable wrappers.
template<typename T>
concept SeekableStreamLike = IsBaseOf<SeekableStream, T>;
20 | | |
21 | | template<typename T> |
22 | | class BufferedHelper { |
23 | | AK_MAKE_NONCOPYABLE(BufferedHelper); |
24 | | AK_MAKE_DEFAULT_MOVABLE(BufferedHelper); |
25 | | |
26 | | public: |
27 | | template<StreamLike U> |
28 | | BufferedHelper(Badge<U>, NonnullOwnPtr<T> stream, CircularBuffer buffer) |
29 | 0 | : m_stream(move(stream)) |
30 | 0 | , m_buffer(move(buffer)) |
31 | 0 | { |
32 | 0 | } |
33 | | |
34 | | template<template<typename> typename BufferedType> |
35 | | static ErrorOr<NonnullOwnPtr<BufferedType<T>>> create_buffered(NonnullOwnPtr<T> stream, size_t buffer_size) |
36 | 0 | { |
37 | 0 | if (!buffer_size) |
38 | 0 | return Error::from_errno(EINVAL); |
39 | 0 | if (!stream->is_open()) |
40 | 0 | return Error::from_errno(ENOTCONN); |
41 | | |
42 | 0 | auto buffer = TRY(CircularBuffer::create_empty(buffer_size)); |
43 | | |
44 | 0 | return adopt_nonnull_own_or_enomem(new BufferedType<T>(move(stream), move(buffer))); |
45 | 0 | } |
46 | | |
47 | 0 | T& stream() { return *m_stream; } |
48 | 0 | T const& stream() const { return *m_stream; } |
49 | | |
50 | | ErrorOr<Bytes> read(Bytes buffer) |
51 | 0 | { |
52 | 0 | if (!stream().is_open()) |
53 | 0 | return Error::from_errno(ENOTCONN); |
54 | 0 | if (buffer.is_empty()) |
55 | 0 | return buffer; |
56 | | |
57 | | // Fill the internal buffer if it has run dry. |
58 | 0 | if (m_buffer.used_space() == 0) |
59 | 0 | TRY(populate_read_buffer()); |
60 | | |
61 | | // Let's try to take all we can from the buffer first. |
62 | 0 | return m_buffer.read(buffer); |
63 | 0 | } |
64 | | |
65 | | // Reads into the buffer until \n is encountered. |
66 | | // The size of the Bytes object is the maximum amount of bytes that will be |
67 | | // read. Returns the bytes read as a StringView. |
68 | | ErrorOr<StringView> read_line(Bytes buffer) |
69 | 0 | { |
70 | 0 | return StringView { TRY(read_until(buffer, "\n"sv)) }; |
71 | 0 | } |
72 | | |
73 | | ErrorOr<Bytes> read_until(Bytes buffer, StringView candidate) |
74 | 0 | { |
75 | 0 | return read_until_any_of(buffer, Array { candidate }); |
76 | 0 | } |
77 | | |
78 | | template<size_t N> |
79 | | ErrorOr<Bytes> read_until_any_of(Bytes buffer, Array<StringView, N> candidates) |
80 | 0 | { |
81 | 0 | if (!stream().is_open()) |
82 | 0 | return Error::from_errno(ENOTCONN); |
83 | | |
84 | 0 | if (buffer.is_empty()) |
85 | 0 | return buffer; |
86 | | |
87 | 0 | auto const candidate = TRY(find_and_populate_until_any_of(candidates, buffer.size())); |
88 | | |
89 | 0 | if (stream().is_eof()) { |
90 | 0 | if ((candidate.has_value() && candidate->offset + candidate->size > buffer.size()) |
91 | 0 | || (!candidate.has_value() && buffer.size() < m_buffer.used_space())) { |
92 | | // Normally, reading from an EOFed stream and receiving bytes |
93 | | // would mean that the stream is no longer EOF. However, it's |
94 | | // possible with a buffered stream that the user is able to read |
95 | | // the buffer contents even when the underlying stream is EOF. |
96 | | // We already violate this invariant once by giving the user the |
97 | | // chance to read the remaining buffer contents, but if the user |
98 | | // doesn't give us a big enough buffer, then we would be |
99 | | // violating the invariant twice the next time the user attempts |
100 | | // to read, which is No Good. So let's give a descriptive error |
101 | | // to the caller about why it can't read. |
102 | 0 | return Error::from_errno(EMSGSIZE); |
103 | 0 | } |
104 | 0 | } |
105 | | |
106 | 0 | if (candidate.has_value()) { |
107 | 0 | auto const read_bytes = m_buffer.read(buffer.trim(candidate->offset)); |
108 | 0 | TRY(m_buffer.discard(candidate->size)); |
109 | 0 | return read_bytes; |
110 | 0 | } |
111 | | |
112 | | // If we still haven't found anything, then it's most likely the case |
113 | | // that the delimiter ends beyond the length of the caller-passed |
114 | | // buffer. Let's just fill the caller's buffer up. |
115 | 0 | return m_buffer.read(buffer); |
116 | 0 | } |
117 | | |
118 | | ErrorOr<StringView> read_line_with_resize(ByteBuffer& buffer) |
119 | | { |
120 | | return StringView { TRY(read_until_with_resize(buffer, "\n"sv)) }; |
121 | | } |
122 | | |
123 | | ErrorOr<Bytes> read_until_with_resize(ByteBuffer& buffer, StringView candidate) |
124 | | { |
125 | | return read_until_any_of_with_resize(buffer, Array { candidate }); |
126 | | } |
127 | | |
128 | | template<size_t N> |
129 | | ErrorOr<Bytes> read_until_any_of_with_resize(ByteBuffer& buffer, Array<StringView, N> candidates) |
130 | | { |
131 | | if (!stream().is_open()) |
132 | | return Error::from_errno(ENOTCONN); |
133 | | |
134 | | auto candidate = TRY(find_and_populate_until_any_of(candidates)); |
135 | | |
136 | | size_t bytes_read_to_user_buffer = 0; |
137 | | while (!candidate.has_value()) { |
138 | | if (m_buffer.used_space() == 0 && stream().is_eof()) { |
139 | | // If we read to the very end of the buffered and unbuffered data, |
140 | | // then treat the remainder as a full line (the last one), even if it |
141 | | // doesn't end in the delimiter. |
142 | | return buffer.span().trim(bytes_read_to_user_buffer); |
143 | | } |
144 | | |
145 | | if (buffer.size() - bytes_read_to_user_buffer < m_buffer.used_space()) { |
146 | | // Resize the user supplied buffer because it cannot fit |
147 | | // the contents of m_buffer. |
148 | | TRY(buffer.try_resize(buffer.size() + m_buffer.used_space())); |
149 | | } |
150 | | |
151 | | // Read bytes into the buffer starting from the offset of how many bytes have previously been read. |
152 | | bytes_read_to_user_buffer += m_buffer.read(buffer.span().slice(bytes_read_to_user_buffer)).size(); |
153 | | candidate = TRY(find_and_populate_until_any_of(candidates)); |
154 | | } |
155 | | |
156 | | // Once the candidate has been found, read the contents of m_buffer into the buffer, |
157 | | // offset by how many bytes have already been read in. |
158 | | TRY(buffer.try_resize(bytes_read_to_user_buffer + candidate->offset)); |
159 | | m_buffer.read(buffer.span().slice(bytes_read_to_user_buffer)); |
160 | | TRY(m_buffer.discard(candidate->size)); |
161 | | return buffer.span(); |
162 | | } |
163 | | |
164 | | struct Match { |
165 | | size_t offset {}; |
166 | | size_t size {}; |
167 | | }; |
168 | | |
169 | | template<size_t N> |
170 | | ErrorOr<Optional<Match>> find_and_populate_until_any_of(Array<StringView, N> const& candidates, Optional<size_t> max_offset = {}) |
171 | 0 | { |
172 | 0 | Optional<size_t> longest_candidate; |
173 | 0 | for (auto& candidate : candidates) { |
174 | 0 | if (candidate.length() >= longest_candidate.value_or(candidate.length())) |
175 | 0 | longest_candidate = candidate.length(); |
176 | 0 | } |
177 | | |
178 | | // The intention here is to try to match all the possible |
179 | | // delimiter candidates and try to find the longest one we can |
180 | | // remove from the buffer after copying up to the delimiter to the |
181 | | // user buffer. |
182 | |
|
183 | 0 | auto const find_candidates = [this, &candidates, &longest_candidate](size_t min_offset, Optional<size_t> max_offset = {}) -> Optional<Match> { |
184 | 0 | auto const corrected_minimum_offset = *longest_candidate > min_offset ? 0 : min_offset - *longest_candidate; |
185 | 0 | max_offset = max_offset.value_or(m_buffer.used_space()); |
186 | |
|
187 | 0 | Optional<size_t> longest_match; |
188 | 0 | size_t match_size = 0; |
189 | 0 | for (auto& candidate : candidates) { |
190 | 0 | auto const result = m_buffer.offset_of(candidate, corrected_minimum_offset, *max_offset); |
191 | |
|
192 | 0 | if (result.has_value()) { |
193 | 0 | auto previous_match = longest_match.value_or(*result); |
194 | 0 | if ((previous_match < *result) || (previous_match == *result && match_size < candidate.length())) { |
195 | 0 | longest_match = result; |
196 | 0 | match_size = candidate.length(); |
197 | 0 | } |
198 | 0 | } |
199 | 0 | } |
200 | |
|
201 | 0 | if (longest_match.has_value()) |
202 | 0 | return Match { *longest_match, match_size }; |
203 | | |
204 | 0 | return {}; |
205 | 0 | }; |
206 | |
|
207 | 0 | if (auto first_find = find_candidates(0, max_offset); first_find.has_value()) |
208 | 0 | return first_find; |
209 | | |
210 | 0 | auto last_size = m_buffer.used_space(); |
211 | |
|
212 | 0 | while (m_buffer.used_space() < max_offset.value_or(m_buffer.capacity())) { |
213 | 0 | auto const read_bytes = TRY(populate_read_buffer()); |
214 | 0 | if (read_bytes == 0) |
215 | 0 | break; |
216 | | |
217 | 0 | if (auto first_find = find_candidates(last_size, max_offset); first_find.has_value()) |
218 | 0 | return first_find; |
219 | 0 | last_size = m_buffer.used_space(); |
220 | 0 | } |
221 | | |
222 | 0 | return Optional<Match> {}; |
223 | 0 | } |
224 | | |
225 | | // Populates the buffer, and returns whether it is possible to read up to the given delimiter. |
226 | | ErrorOr<bool> can_read_up_to_delimiter(ReadonlyBytes delimiter) |
227 | 0 | { |
228 | 0 | if (stream().is_eof()) |
229 | 0 | return m_buffer.offset_of(delimiter).has_value(); |
230 | | |
231 | 0 | auto maybe_match = TRY(find_and_populate_until_any_of(Array { StringView { delimiter } })); |
232 | 0 | if (maybe_match.has_value()) |
233 | 0 | return true; |
234 | | |
235 | 0 | return stream().is_eof() && m_buffer.offset_of(delimiter).has_value(); |
236 | 0 | } |
237 | | |
238 | | bool is_eof_with_data_left_over() const |
239 | 0 | { |
240 | 0 | return stream().is_eof() && m_buffer.used_space() > 0; |
241 | 0 | } |
242 | | |
243 | | bool is_eof() const |
244 | 0 | { |
245 | 0 | if (m_buffer.used_space() > 0) { |
246 | 0 | return false; |
247 | 0 | } |
248 | | |
249 | 0 | return stream().is_eof(); |
250 | 0 | } |
251 | | |
252 | | size_t buffer_size() const |
253 | | { |
254 | | return m_buffer.capacity(); |
255 | | } |
256 | | |
257 | | size_t buffered_data_size() const |
258 | 0 | { |
259 | 0 | return m_buffer.used_space(); |
260 | 0 | } |
261 | | |
262 | | void clear_buffer() |
263 | 0 | { |
264 | 0 | m_buffer.clear(); |
265 | 0 | } |
266 | | |
267 | | ErrorOr<void> discard_bytes(size_t count) |
268 | 0 | { |
269 | 0 | return m_buffer.discard(count); |
270 | 0 | } |
271 | | |
272 | | private: |
273 | | ErrorOr<size_t> populate_read_buffer() |
274 | 0 | { |
275 | 0 | if (m_buffer.empty_space() == 0) |
276 | 0 | return 0; |
277 | | |
278 | 0 | size_t nread = 0; |
279 | |
|
280 | 0 | while (true) { |
281 | 0 | auto result = m_buffer.fill_from_stream(stream()); |
282 | |
|
283 | 0 | if (result.is_error()) { |
284 | 0 | if (!result.error().is_errno()) |
285 | 0 | return result.release_error(); |
286 | 0 | if (result.error().code() == EINTR) |
287 | 0 | continue; |
288 | 0 | if (result.error().code() == EAGAIN) |
289 | 0 | break; |
290 | 0 | return result.release_error(); |
291 | 0 | } |
292 | | |
293 | 0 | nread += result.value(); |
294 | 0 | break; |
295 | 0 | } |
296 | | |
297 | 0 | return nread; |
298 | 0 | } |
299 | | |
300 | | NonnullOwnPtr<T> m_stream; |
301 | | CircularBuffer m_buffer; |
302 | | }; |
303 | | |
304 | | // NOTE: A Buffered which accepts any Stream could be added here, but it is not |
305 | | // needed at the moment. |
306 | | |
307 | | template<SeekableStreamLike T> |
308 | | class InputBufferedSeekable final : public SeekableStream { |
309 | | friend BufferedHelper<T>; |
310 | | |
311 | | public: |
312 | | static ErrorOr<NonnullOwnPtr<InputBufferedSeekable<T>>> create(NonnullOwnPtr<T> stream, size_t buffer_size = 16384) |
313 | 0 | { |
314 | 0 | return BufferedHelper<T>::template create_buffered<InputBufferedSeekable>(move(stream), buffer_size); |
315 | 0 | } |
316 | | |
317 | | InputBufferedSeekable(InputBufferedSeekable&& other) = default; |
318 | | InputBufferedSeekable& operator=(InputBufferedSeekable&& other) = default; |
319 | | |
320 | 0 | virtual ErrorOr<Bytes> read_some(Bytes buffer) override { return m_helper.read(buffer); } |
321 | 0 | virtual ErrorOr<size_t> write_some(ReadonlyBytes buffer) override { return m_helper.stream().write_some(buffer); } |
322 | 0 | virtual bool is_eof() const override { return m_helper.is_eof(); } |
323 | 0 | virtual bool is_open() const override { return m_helper.stream().is_open(); } |
324 | 0 | virtual void close() override { m_helper.stream().close(); } |
325 | | virtual ErrorOr<size_t> seek(i64 offset, SeekMode mode) override |
326 | 0 | { |
327 | 0 | if (mode == SeekMode::FromCurrentPosition) { |
328 | | // If possible, seek using the buffer alone. |
329 | 0 | if (0 <= offset && static_cast<u64>(offset) <= m_helper.buffered_data_size()) { |
330 | 0 | MUST(m_helper.discard_bytes(offset)); |
331 | 0 | return TRY(m_helper.stream().tell()) - m_helper.buffered_data_size(); |
332 | 0 | } |
333 | | |
334 | 0 | offset = offset - m_helper.buffered_data_size(); |
335 | 0 | } |
336 | | |
337 | 0 | auto result = TRY(m_helper.stream().seek(offset, mode)); |
338 | 0 | m_helper.clear_buffer(); |
339 | |
|
340 | 0 | return result; |
341 | 0 | } |
342 | | virtual ErrorOr<void> truncate(size_t length) override |
343 | 0 | { |
344 | 0 | return m_helper.stream().truncate(length); |
345 | 0 | } |
346 | | |
347 | 0 | ErrorOr<StringView> read_line(Bytes buffer) { return m_helper.read_line(buffer); } |
348 | | ErrorOr<bool> can_read_line() |
349 | 0 | { |
350 | 0 | return TRY(m_helper.can_read_up_to_delimiter("\n"sv.bytes())) || m_helper.is_eof_with_data_left_over(); |
351 | 0 | } |
352 | | ErrorOr<Bytes> read_until(Bytes buffer, StringView candidate) { return m_helper.read_until(buffer, candidate); } |
353 | | template<size_t N> |
354 | | ErrorOr<Bytes> read_until_any_of(Bytes buffer, Array<StringView, N> candidates) { return m_helper.read_until_any_of(buffer, move(candidates)); } |
355 | | ErrorOr<bool> can_read_up_to_delimiter(ReadonlyBytes delimiter) { return m_helper.can_read_up_to_delimiter(delimiter); } |
356 | | |
357 | | // Methods for reading stream into an auto-adjusting buffer |
358 | | ErrorOr<StringView> read_line_with_resize(ByteBuffer& buffer) { return m_helper.read_line_with_resize(buffer); } |
359 | | ErrorOr<Bytes> read_until_with_resize(ByteBuffer& buffer, StringView candidate) { return m_helper.read_until_with_resize(buffer, candidate); } |
360 | | template<size_t N> |
361 | | ErrorOr<Bytes> read_until_any_of_with_resize(ByteBuffer& buffer, Array<StringView, N> candidates) { return m_helper.read_until_any_of_with_resize(buffer, move(candidates)); } |
362 | | |
363 | | size_t buffer_size() const { return m_helper.buffer_size(); } |
364 | | |
365 | 0 | virtual ~InputBufferedSeekable() override = default; |
366 | | |
367 | | private: |
368 | | InputBufferedSeekable(NonnullOwnPtr<T> stream, CircularBuffer buffer) |
369 | 0 | : m_helper(Badge<InputBufferedSeekable<T>> {}, move(stream), move(buffer)) |
370 | 0 | { |
371 | 0 | } |
372 | | |
373 | | BufferedHelper<T> m_helper; |
374 | | }; |
375 | | |
376 | | template<SeekableStreamLike T> |
377 | | class OutputBufferedSeekable : public SeekableStream { |
378 | | public: |
379 | | static ErrorOr<NonnullOwnPtr<OutputBufferedSeekable<T>>> create(NonnullOwnPtr<T> stream, size_t buffer_size = 16 * KiB) |
380 | | { |
381 | | if (buffer_size == 0) |
382 | | return Error::from_errno(EINVAL); |
383 | | if (!stream->is_open()) |
384 | | return Error::from_errno(ENOTCONN); |
385 | | |
386 | | auto buffer = TRY(CircularBuffer::create_empty(buffer_size)); |
387 | | |
388 | | return adopt_nonnull_own_or_enomem(new OutputBufferedSeekable<T>(move(stream), move(buffer))); |
389 | | } |
390 | | |
391 | | OutputBufferedSeekable(OutputBufferedSeekable&& other) = default; |
392 | | OutputBufferedSeekable& operator=(OutputBufferedSeekable&& other) = default; |
393 | | |
394 | | virtual ErrorOr<Bytes> read_some(Bytes buffer) override |
395 | | { |
396 | | TRY(flush_buffer()); |
397 | | return m_stream->read_some(buffer); |
398 | | } |
399 | | |
400 | | virtual ErrorOr<size_t> write_some(ReadonlyBytes buffer) override |
401 | | { |
402 | | if (!m_stream->is_open()) |
403 | | return Error::from_errno(ENOTCONN); |
404 | | |
405 | | auto const written = m_buffer.write(buffer); |
406 | | |
407 | | if (m_buffer.empty_space() == 0) |
408 | | TRY(m_buffer.flush_to_stream(*m_stream)); |
409 | | |
410 | | return written; |
411 | | } |
412 | | |
413 | | virtual bool is_eof() const override |
414 | | { |
415 | | MUST(flush_buffer()); |
416 | | return m_stream->is_eof(); |
417 | | } |
418 | | |
419 | | virtual bool is_open() const override { return m_stream->is_open(); } |
420 | | |
421 | | virtual void close() override |
422 | | { |
423 | | MUST(flush_buffer()); |
424 | | m_stream->close(); |
425 | | } |
426 | | |
427 | | ErrorOr<void> flush_buffer() const |
428 | | { |
429 | | while (m_buffer.used_space() > 0) |
430 | | TRY(m_buffer.flush_to_stream(*m_stream)); |
431 | | return {}; |
432 | | } |
433 | | |
434 | | // Since tell() doesn't involve moving the write offset, we can skip flushing the buffer here. |
435 | | virtual ErrorOr<size_t> tell() const override |
436 | | { |
437 | | return TRY(m_stream->tell()) + m_buffer.used_space(); |
438 | | } |
439 | | |
440 | | virtual ErrorOr<size_t> seek(i64 offset, SeekMode mode) override |
441 | | { |
442 | | TRY(flush_buffer()); |
443 | | return m_stream->seek(offset, mode); |
444 | | } |
445 | | |
446 | | virtual ErrorOr<void> truncate(size_t length) override |
447 | | { |
448 | | TRY(flush_buffer()); |
449 | | return m_stream->truncate(length); |
450 | | } |
451 | | |
452 | | virtual ~OutputBufferedSeekable() override |
453 | | { |
454 | | MUST(flush_buffer()); |
455 | | } |
456 | | |
457 | | private: |
458 | | OutputBufferedSeekable(NonnullOwnPtr<T> stream, CircularBuffer buffer) |
459 | | : m_stream(move(stream)) |
460 | | , m_buffer(move(buffer)) |
461 | | { |
462 | | } |
463 | | |
464 | | mutable NonnullOwnPtr<T> m_stream; |
465 | | mutable CircularBuffer m_buffer; |
466 | | }; |
467 | | |
468 | | } |
469 | | |
470 | | #if USING_AK_GLOBALLY |
471 | | using AK::BufferedHelper; |
472 | | using AK::InputBufferedSeekable; |
473 | | using AK::OutputBufferedSeekable; |
474 | | #endif |