Coverage Report

Created: 2025-12-08 06:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/combine-4.6.7/src/stream/buffered.rs
Line
Count
Source
1
use alloc::collections::VecDeque;
2
3
use crate::{
4
    error::StreamError,
5
    stream::{ParseError, Positioned, ResetStream, StreamErrorFor, StreamOnce},
6
};
7
8
/// `Stream` which buffers items from an instance of `StreamOnce` into a ring buffer.
9
/// Instances of `StreamOnce` which are not able to implement `ResetStream` (such as `ReadStream`) may
10
/// use this as a way to implement `ResetStream` and become a full `Stream` instance.
11
///
12
/// The drawback is that the buffer only stores a limited number of items, which limits how many
13
/// tokens can be reset and replayed. If a `buffered::Stream` is reset past this limit an error
14
/// is returned: `reset` rejects the checkpoint, and `uncons` carries the same check.
15
///
16
/// NOTE: If this stream is used in conjunction with an error enhancing stream such as
17
/// `easy::Stream` (also via the `easy_parser` method) it is recommended that the `buffered::Stream`
18
/// instance wraps the `easy::Stream` instance instead of the other way around.
19
///
20
/// ```ignore
21
/// // DO
22
/// buffered::Stream::new(easy::Stream(..), ..)
23
/// // DON'T
24
/// easy::Stream(buffered::Stream::new(.., ..))
25
/// parser.easy_parse(buffered::Stream::new(..));
26
/// ```
27
#[derive(Debug, PartialEq)]
28
pub struct Stream<Input>
29
where
30
    Input: StreamOnce + Positioned,
31
{
32
    offset: usize, // current read position in tokens; doubles as the checkpoint value
33
    iter: Input, // underlying input that new tokens are pulled from
34
    buffer_offset: usize, // number of tokens pulled from `iter` so far
35
    buffer: VecDeque<(Input::Token, Input::Position)>, // latest `(token, position)` pairs, oldest first
36
}
37
38
impl<Input> ResetStream for Stream<Input>
39
where
40
    Input: Positioned,
41
{
42
    type Checkpoint = usize;
43
    /// Returns the current read offset, which is what a checkpoint consists of.
44
0
    fn checkpoint(&self) -> Self::Checkpoint {
45
0
        self.offset
46
0
    }
47
    /// Sets the read offset to `checkpoint`; errors if it precedes the oldest buffered token.
48
0
    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error> {
49
0
        if checkpoint < self.buffer_offset - self.buffer.len() {
50
            // We have backtracked too far: `checkpoint` is older than the oldest buffered token
51
0
            Err(Self::Error::from_error(
52
0
                self.position(),
53
0
                StreamErrorFor::<Self>::message_static_message("Backtracked to far"),
54
0
            ))
55
        } else {
56
0
            self.offset = checkpoint;
57
0
            Ok(())
58
        }
59
0
    }
60
}
61
62
impl<Input> Stream<Input>
63
where
64
    Input: StreamOnce + Positioned,
65
    Input::Position: Clone,
66
    Input::Token: Clone,
67
{
68
    /// Constructs a new `buffered::Stream` from a `StreamOnce` instance with a `lookahead`
69
    /// number of elements that can be stored in the buffer.
70
0
    pub fn new(iter: Input, lookahead: usize) -> Stream<Input> {
71
0
        Stream {
72
0
            offset: 0,
73
0
            iter,
74
0
            buffer_offset: 0,
75
            // `uncons` evicts the oldest entry whenever `len() == capacity()`.
            // NOTE(review): `with_capacity` may allocate more than `lookahead` - confirm.
0
            buffer: VecDeque::with_capacity(lookahead),
76
0
        }
77
0
    }
78
}
79
80
impl<Input> Positioned for Stream<Input>
81
where
82
    Input: StreamOnce + Positioned,
83
{
    /// Returns the position recorded for the token at the current read offset.
84
    #[inline]
85
0
    fn position(&self) -> Self::Position {
86
0
        if self.offset >= self.buffer_offset { // not replaying: ask the underlying input
87
0
            self.iter.position()
88
0
        } else if self.offset < self.buffer_offset - self.buffer.len() { // offset precedes the buffer
89
0
            self.buffer
90
0
                .front() // oldest buffered entry
91
0
                .expect("At least 1 element in the buffer")
92
0
                .1
93
0
                .clone()
94
        } else {
95
            // Replaying: `buffer_offset - offset` entries back from the end of the buffer.
0
            self.buffer[self.buffer.len() - (self.buffer_offset - self.offset)]
96
0
                .1
97
0
                .clone()
98
        }
99
0
    }
100
}
101
102
impl<Input> StreamOnce for Stream<Input>
103
where
104
    Input: StreamOnce + Positioned,
105
    Input::Token: Clone,
106
{
107
    type Token = Input::Token;
108
    type Range = Input::Range;
109
    type Position = Input::Position;
110
    type Error = Input::Error;
111
    /// Yields the next token, either pulled from `iter` or replayed (cloned) from `buffer`.
112
    #[inline]
113
0
    fn uncons(&mut self) -> Result<Input::Token, StreamErrorFor<Self>> {
114
0
        if self.offset >= self.buffer_offset {
115
            // Not replaying: record the position before the token is consumed from `iter`.
0
            let position = self.iter.position();
116
0
            let token = self.iter.uncons()?;
117
0
            self.buffer_offset += 1;
118
            // We want the VecDeque to only keep the last .capacity() elements so we need to remove
119
            // an element if it gets too large
120
0
            if self.buffer.len() == self.buffer.capacity() {
121
0
                self.buffer.pop_front();
122
0
            }
123
0
            self.buffer.push_back((token.clone(), position));
124
0
            self.offset += 1;
125
0
            Ok(token)
126
0
        } else if self.offset < self.buffer_offset - self.buffer.len() {
127
            // We have backtracked too far: the offset precedes the oldest buffered token
128
0
            Err(StreamError::message_static_message("Backtracked to far"))
129
        } else {
130
            // Replaying: the wanted token sits `buffer_offset - offset` entries from the end.
0
            let value = self.buffer[self.buffer.len() - (self.buffer_offset - self.offset)]
131
0
                .0
132
0
                .clone();
133
0
            self.offset += 1;
134
0
            Ok(value)
135
        }
136
0
    }
137
    /// Forwards to the wrapped input's `is_partial`.
138
0
    fn is_partial(&self) -> bool {
139
0
        self.iter.is_partial()
140
0
    }
141
}