Coverage Report

Created: 2026-02-14 07:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/gitoxide/gix-packetline/src/blocking_io/read.rs
Line
Count
Source
1
use std::{
2
    io,
3
    ops::{Deref, DerefMut},
4
};
5
6
use bstr::ByteSlice;
7
8
pub use super::sidebands::WithSidebands;
9
use crate::{
10
    decode,
11
    read::{ExhaustiveOutcome, ProgressAction, StreamingPeekableIterState},
12
    PacketLineRef, MAX_LINE_LEN, U16_HEX_BYTES,
13
};
14
15
/// Read pack lines one after another, without consuming more than needed from the underlying
16
/// [`Read`][std::io::Read]. [`Flush`][PacketLineRef::Flush] lines cause the reader to stop producing lines forever,
17
/// leaving [`Read`][std::io::Read] at the start of whatever comes next.
18
///
19
/// This implementation tries hard not to allocate at all which leads to quite some added complexity and plenty of extra memory copies.
20
pub struct StreamingPeekableIter<T> {
21
    pub(super) state: StreamingPeekableIterState<T>,
22
}
23
24
/// Non-IO methods
25
impl<T> StreamingPeekableIter<T>
26
where
27
    T: io::Read,
28
{
29
    /// Return a new instance from `read` which will stop decoding packet lines when receiving one of the given `delimiters`.
30
    /// If `trace` is `true`, all packetlines received or sent will be passed to the facilities of the `gix-trace` crate.
31
0
    pub fn new(read: T, delimiters: &'static [PacketLineRef<'static>], trace: bool) -> Self {
32
0
        Self {
33
0
            state: StreamingPeekableIterState::new(read, delimiters, trace),
34
0
        }
35
0
    }
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::new
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock>>::new
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_>>::new
36
37
0
    fn read_line_inner<'a>(reader: &mut T, buf: &'a mut [u8]) -> io::Result<Result<PacketLineRef<'a>, decode::Error>> {
38
0
        let (hex_bytes, data_bytes) = buf.split_at_mut(4);
39
0
        reader.read_exact(hex_bytes)?;
40
0
        let num_data_bytes = match decode::hex_prefix(hex_bytes) {
41
0
            Ok(decode::PacketLineOrWantedSize::Line(line)) => return Ok(Ok(line)),
42
0
            Ok(decode::PacketLineOrWantedSize::Wanted(additional_bytes)) => additional_bytes as usize,
43
0
            Err(err) => return Ok(Err(err)),
44
        };
45
46
0
        let (data_bytes, _) = data_bytes.split_at_mut(num_data_bytes);
47
0
        reader.read_exact(data_bytes)?;
48
0
        match decode::to_data_line(data_bytes) {
49
0
            Ok(line) => Ok(Ok(line)),
50
0
            Err(err) => Ok(Err(err)),
51
        }
52
0
    }
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::read_line_inner
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock>>::read_line_inner
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_>>::read_line_inner
53
54
    /// This function is needed to help the borrow checker allow us to return references all the time
55
    /// It contains a bunch of logic shared between peek and `read_line` invocations.
56
0
    fn read_line_inner_exhaustive<'a>(
57
0
        reader: &mut T,
58
0
        buf: &'a mut Vec<u8>,
59
0
        delimiters: &[PacketLineRef<'static>],
60
0
        fail_on_err_lines: bool,
61
0
        buf_resize: bool,
62
0
        trace: bool,
63
0
    ) -> ExhaustiveOutcome<'a> {
64
        (
65
            false,
66
0
            None,
67
0
            Some(match Self::read_line_inner(reader, buf) {
68
0
                Ok(Ok(line)) => {
69
0
                    if trace {
70
0
                        match line {
71
                            #[allow(unused_variables)]
72
0
                            PacketLineRef::Data(d) => {
73
0
                                gix_trace::trace!("<< {}", d.as_bstr().trim().as_bstr());
74
0
                            }
75
0
                            PacketLineRef::Flush => {
76
0
                                gix_trace::trace!("<< FLUSH");
77
0
                            }
78
0
                            PacketLineRef::Delimiter => {
79
0
                                gix_trace::trace!("<< DELIM");
80
0
                            }
81
0
                            PacketLineRef::ResponseEnd => {
82
0
                                gix_trace::trace!("<< RESPONSE_END");
83
0
                            }
84
                        }
85
0
                    }
86
0
                    if delimiters.contains(&line) {
87
0
                        let stopped_at = delimiters.iter().find(|l| **l == line).copied();
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::read_line_inner_exhaustive::{closure#0}
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock>>::read_line_inner_exhaustive::{closure#0}
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_>>::read_line_inner_exhaustive::{closure#0}
88
0
                        buf.clear();
89
0
                        return (true, stopped_at, None);
90
0
                    } else if fail_on_err_lines {
91
0
                        if let Some(err) = line.check_error() {
92
0
                            let err = err.0.as_bstr().to_owned();
93
0
                            buf.clear();
94
0
                            return (
95
0
                                true,
96
0
                                None,
97
0
                                Some(Err(io::Error::other(crate::read::Error { message: err }))),
98
0
                            );
99
0
                        }
100
0
                    }
101
0
                    let len = line.as_slice().map_or(U16_HEX_BYTES, |s| s.len() + U16_HEX_BYTES);
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::read_line_inner_exhaustive::{closure#1}
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock>>::read_line_inner_exhaustive::{closure#1}
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_>>::read_line_inner_exhaustive::{closure#1}
102
0
                    if buf_resize {
103
0
                        buf.resize(len, 0);
104
0
                    }
105
                    // TODO(borrowchk): remove additional decoding of internal buffer which is needed only to make it past borrowchk
106
0
                    Ok(Ok(crate::decode(buf).expect("only valid data here")))
107
                }
108
0
                Ok(Err(err)) => {
109
0
                    buf.clear();
110
0
                    Ok(Err(err))
111
                }
112
0
                Err(err) => {
113
0
                    buf.clear();
114
0
                    Err(err)
115
                }
116
            }),
117
        )
118
0
    }
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::read_line_inner_exhaustive
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock>>::read_line_inner_exhaustive
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_>>::read_line_inner_exhaustive
119
120
    /// Read a packet line into the internal buffer and return it.
121
    ///
122
    /// Returns `None` if the end of iteration is reached because of one of the following:
123
    ///
124
    ///  * natural EOF
125
    ///  * ERR packet line encountered if [`fail_on_err_lines()`](StreamingPeekableIterState::fail_on_err_lines()) is true.
126
    ///  * A `delimiter` packet line encountered
127
0
    pub fn read_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, decode::Error>>> {
128
0
        let state = &mut self.state;
129
0
        if state.is_done {
130
0
            return None;
131
0
        }
132
0
        if !state.peek_buf.is_empty() {
133
0
            std::mem::swap(&mut state.peek_buf, &mut state.buf);
134
0
            state.peek_buf.clear();
135
0
            Some(Ok(Ok(crate::decode(&state.buf).expect("only valid data in peek buf"))))
136
        } else {
137
0
            if state.buf.len() != MAX_LINE_LEN {
138
0
                state.buf.resize(MAX_LINE_LEN, 0);
139
0
            }
140
0
            let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive(
141
0
                &mut state.read,
142
0
                &mut state.buf,
143
0
                state.delimiters,
144
0
                state.fail_on_err_lines,
145
0
                false,
146
0
                state.trace,
147
0
            );
148
0
            state.is_done = is_done;
149
0
            state.stopped_at = stopped_at;
150
0
            res
151
        }
152
0
    }
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::read_line
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock>>::read_line
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_>>::read_line
153
154
    /// Peek the next packet line without consuming it. Returns `None` if a stop-packet or an error
155
    /// was encountered.
156
    ///
157
    /// Multiple calls to peek will return the same packet line, if there is one.
158
0
    pub fn peek_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, decode::Error>>> {
159
0
        let state = &mut self.state;
160
0
        if state.is_done {
161
0
            return None;
162
0
        }
163
0
        if state.peek_buf.is_empty() {
164
0
            state.peek_buf.resize(MAX_LINE_LEN, 0);
165
0
            let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive(
166
0
                &mut state.read,
167
0
                &mut state.peek_buf,
168
0
                state.delimiters,
169
0
                state.fail_on_err_lines,
170
0
                true,
171
0
                state.trace,
172
0
            );
173
0
            state.is_done = is_done;
174
0
            state.stopped_at = stopped_at;
175
0
            res
176
        } else {
177
0
            Some(Ok(Ok(crate::decode(&state.peek_buf).expect("only valid data here"))))
178
        }
179
0
    }
180
181
    /// Return this instance as implementor of [`Read`](io::Read) assuming side bands to be used in all received packet lines.
182
    /// Each invocation of [`read_line()`](io::BufRead::read_line()) returns a packet line.
183
    ///
184
    /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool`
185
    /// being true in case the `text` is to be interpreted as error.
186
    ///
187
    /// _Please note_ that side bands need to be negotiated with the server.
188
0
    pub fn as_read_with_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction>(
189
0
        &mut self,
190
0
        handle_progress: F,
191
0
    ) -> WithSidebands<'_, T, F> {
192
0
        WithSidebands::with_progress_handler(self, handle_progress)
193
0
    }
194
195
    /// Same as [`as_read_with_sidebands(…)`](StreamingPeekableIter::as_read_with_sidebands()), but for channels without side band support.
196
    ///
197
    /// The type parameter `F` needs to be configured for this method to be callable using the 'turbofish' operator.
198
    /// Use [`as_read()`][StreamingPeekableIter::as_read()].
199
0
    pub fn as_read_without_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction>(&mut self) -> WithSidebands<'_, T, F> {
200
0
        WithSidebands::without_progress_handler(self)
201
0
    }
202
203
    /// Same as [`as_read_with_sidebands(…)`](StreamingPeekableIter::as_read_with_sidebands()), but for channels without side band support.
204
    ///
205
    /// Due to the preconfigured function type this method can be called without 'turbofish'.
206
    #[allow(clippy::type_complexity)]
207
0
    pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8]) -> ProgressAction> {
208
0
        WithSidebands::new(self)
209
0
    }
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::as_read
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::as_read
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::as_read
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock>>::as_read
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_>>::as_read
210
}
211
212
impl<T> StreamingPeekableIter<T> {
213
    /// Return the inner read
214
0
    pub fn into_inner(self) -> T {
215
0
        self.state.read
216
0
    }
217
}
218
219
impl<T> Deref for StreamingPeekableIter<T> {
220
    type Target = StreamingPeekableIterState<T>;
221
0
    fn deref(&self) -> &Self::Target {
222
0
        &self.state
223
0
    }
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout> as core::ops::deref::Deref>::deref
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock> as core::ops::deref::Deref>::deref
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_> as core::ops::deref::Deref>::deref
224
}
225
226
impl<T> DerefMut for StreamingPeekableIter<T> {
227
0
    fn deref_mut(&mut self) -> &mut Self::Target {
228
0
        &mut self.state
229
0
    }
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout> as core::ops::deref::DerefMut>::deref_mut
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout> as core::ops::deref::DerefMut>::deref_mut
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout> as core::ops::deref::DerefMut>::deref_mut
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock> as core::ops::deref::DerefMut>::deref_mut
Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_> as core::ops::deref::DerefMut>::deref_mut
230
}