/src/gitoxide/gix-packetline/src/blocking_io/read.rs
Line | Count | Source |
1 | | use std::{ |
2 | | io, |
3 | | ops::{Deref, DerefMut}, |
4 | | }; |
5 | | |
6 | | use bstr::ByteSlice; |
7 | | |
8 | | pub use super::sidebands::WithSidebands; |
9 | | use crate::{ |
10 | | decode, |
11 | | read::{ExhaustiveOutcome, ProgressAction, StreamingPeekableIterState}, |
12 | | PacketLineRef, MAX_LINE_LEN, U16_HEX_BYTES, |
13 | | }; |
14 | | |
15 | | /// Read packet lines one after another, without consuming more than needed from the underlying |
16 | | /// [`Read`][std::io::Read]. A line equal to one of the configured delimiters, e.g. [`Flush`][PacketLineRef::Flush], causes the reader to stop producing lines, |
17 | | /// leaving [`Read`][std::io::Read] at the start of whatever comes next. |
18 | | /// |
19 | | /// This implementation tries hard not to allocate at all, which leads to quite some added complexity and plenty of extra memory copies. |
20 | | pub struct StreamingPeekableIter<T> { |
21 | | pub(super) state: StreamingPeekableIterState<T>, |
22 | | } |
23 | | |
24 | | /// Non-IO methods |
25 | | impl<T> StreamingPeekableIter<T> |
26 | | where |
27 | | T: io::Read, |
28 | | { |
29 | | /// Return a new instance reading from `read`, which will stop decoding packet lines when receiving one of the given `delimiters`. |
30 | | /// If `trace` is `true`, all packet lines received or sent will be passed to the facilities of the `gix-trace` crate. All three arguments are forwarded unchanged to `StreamingPeekableIterState::new()`. |
31 | 0 | pub fn new(read: T, delimiters: &'static [PacketLineRef<'static>], trace: bool) -> Self { |
32 | 0 | Self { |
33 | 0 | state: StreamingPeekableIterState::new(read, delimiters, trace), |
34 | 0 | } |
35 | 0 | } Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::new Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock>>::new Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_>>::new |
36 | | |
37 | 0 | fn read_line_inner<'a>(reader: &mut T, buf: &'a mut [u8]) -> io::Result<Result<PacketLineRef<'a>, decode::Error>> { /* Read exactly one packet line from `reader` into `buf`; the outer `Result` carries I/O errors, the inner one decoding errors. */ |
38 | 0 | let (hex_bytes, data_bytes) = buf.split_at_mut(4); /* the first 4 bytes receive the hex length prefix, the remainder receives the payload; NOTE(review): panics if `buf` is shorter than 4 bytes - the callers visible here resize it to `MAX_LINE_LEN` first */ |
39 | 0 | reader.read_exact(hex_bytes)?; |
40 | 0 | let num_data_bytes = match decode::hex_prefix(hex_bytes) { |
41 | 0 | Ok(decode::PacketLineOrWantedSize::Line(line)) => return Ok(Ok(line)), /* the prefix alone already decoded to a complete line, so nothing more is read */ |
42 | 0 | Ok(decode::PacketLineOrWantedSize::Wanted(additional_bytes)) => additional_bytes as usize, /* this many payload bytes are read next */ |
43 | 0 | Err(err) => return Ok(Err(err)), |
44 | | }; |
45 | | |
46 | 0 | let (data_bytes, _) = data_bytes.split_at_mut(num_data_bytes); /* NOTE(review): would panic if more bytes are wanted than `buf` has left - presumably `hex_prefix` bounds the size by `MAX_LINE_LEN`; confirm */ |
47 | 0 | reader.read_exact(data_bytes)?; |
48 | 0 | match decode::to_data_line(data_bytes) { |
49 | 0 | Ok(line) => Ok(Ok(line)), |
50 | 0 | Err(err) => Ok(Err(err)), |
51 | | } |
52 | 0 | } Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::read_line_inner Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock>>::read_line_inner Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_>>::read_line_inner |
53 | | |
54 | | /// This function is needed to help the borrow checker allow us to return references all the time. |
55 | | /// It contains a bunch of logic shared between peek and `read_line` invocations: one line is read into `buf` and `(is_done, stopped_at, result)` is returned. A line equal to one of `delimiters` yields `(true, Some(delimiter), None)`; with `fail_on_err_lines` set, an ERR line yields `(true, None, Some(Err(..)))`; `buf` is cleared whenever no line is returned. With `buf_resize` set, `buf` is resized to the length of the line that was read. |
56 | 0 | fn read_line_inner_exhaustive<'a>( |
57 | 0 | reader: &mut T, |
58 | 0 | buf: &'a mut Vec<u8>, |
59 | 0 | delimiters: &[PacketLineRef<'static>], |
60 | 0 | fail_on_err_lines: bool, |
61 | 0 | buf_resize: bool, |
62 | 0 | trace: bool, |
63 | 0 | ) -> ExhaustiveOutcome<'a> { |
64 | | ( |
65 | | false, /* is_done */ |
66 | 0 | None, /* stopped_at */ |
67 | 0 | Some(match Self::read_line_inner(reader, buf) { |
68 | 0 | Ok(Ok(line)) => { |
69 | 0 | if trace { |
70 | 0 | match line { |
71 | | #[allow(unused_variables)] /* presumably `d` is unused when `gix_trace::trace!` expands to nothing - confirm */ |
72 | 0 | PacketLineRef::Data(d) => { |
73 | 0 | gix_trace::trace!("<< {}", d.as_bstr().trim().as_bstr()); |
74 | 0 | } |
75 | 0 | PacketLineRef::Flush => { |
76 | 0 | gix_trace::trace!("<< FLUSH"); |
77 | 0 | } |
78 | 0 | PacketLineRef::Delimiter => { |
79 | 0 | gix_trace::trace!("<< DELIM"); |
80 | 0 | } |
81 | 0 | PacketLineRef::ResponseEnd => { |
82 | 0 | gix_trace::trace!("<< RESPONSE_END"); |
83 | 0 | } |
84 | | } |
85 | 0 | } |
86 | 0 | if delimiters.contains(&line) { |
87 | 0 | let stopped_at = delimiters.iter().find(|l| **l == line).copied(); Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::read_line_inner_exhaustive::{closure#0}Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock>>::read_line_inner_exhaustive::{closure#0}Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_>>::read_line_inner_exhaustive::{closure#0} |
88 | 0 | buf.clear(); |
89 | 0 | return (true, stopped_at, None); /* delimiter reached: iteration is done and no line is returned */ |
90 | 0 | } else if fail_on_err_lines { |
91 | 0 | if let Some(err) = line.check_error() { |
92 | 0 | let err = err.0.as_bstr().to_owned(); /* copy the message out of `buf` before it is cleared */ |
93 | 0 | buf.clear(); |
94 | 0 | return ( |
95 | 0 | true, |
96 | 0 | None, |
97 | 0 | Some(Err(io::Error::other(crate::read::Error { message: err }))), |
98 | 0 | ); |
99 | 0 | } |
100 | 0 | } |
101 | 0 | let len = line.as_slice().map_or(U16_HEX_BYTES, |s| s.len() + U16_HEX_BYTES); Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::read_line_inner_exhaustive::{closure#1}Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock>>::read_line_inner_exhaustive::{closure#1}Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_>>::read_line_inner_exhaustive::{closure#1} |
102 | 0 | if buf_resize { |
103 | 0 | buf.resize(len, 0); /* make `buf` exactly `len` bytes long */ |
104 | 0 | } |
105 | | // TODO(borrowchk): remove additional decoding of internal buffer which is needed only to make it past borrowchk |
106 | 0 | Ok(Ok(crate::decode(buf).expect("only valid data here"))) |
107 | | } |
108 | 0 | Ok(Err(err)) => { |
109 | 0 | buf.clear(); |
110 | 0 | Ok(Err(err)) |
111 | | } |
112 | 0 | Err(err) => { |
113 | 0 | buf.clear(); |
114 | 0 | Err(err) |
115 | | } |
116 | | }), |
117 | | ) |
118 | 0 | } Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::read_line_inner_exhaustive Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock>>::read_line_inner_exhaustive Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_>>::read_line_inner_exhaustive |
119 | | |
120 | | /// Read a packet line into the internal buffer and return it. A line obtained by a previous peek is handed out first, without reading again. |
121 | | /// |
122 | | /// Returns `None` if the end of iteration is reached because of one of the following: |
123 | | /// |
124 | | /// * natural EOF - NOTE(review): in this function EOF surfaces from `read_exact` as `Some(Err(…))`; confirm where it is turned into `None` |
125 | | /// * ERR packet line encountered if [`fail_on_err_lines()`](StreamingPeekableIterState::fail_on_err_lines()) is true; the ERR line itself is returned once as `Some(Err(…))`, subsequent calls return `None` |
126 | | /// * A `delimiter` packet line encountered |
127 | 0 | pub fn read_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, decode::Error>>> { |
128 | 0 | let state = &mut self.state; |
129 | 0 | if state.is_done { |
130 | 0 | return None; |
131 | 0 | } |
132 | 0 | if !state.peek_buf.is_empty() { |
133 | 0 | std::mem::swap(&mut state.peek_buf, &mut state.buf); /* move the peeked line into `buf` so the returned line borrows from it */ |
134 | 0 | state.peek_buf.clear(); /* an empty `peek_buf` means that nothing is peeked anymore */ |
135 | 0 | Some(Ok(Ok(crate::decode(&state.buf).expect("only valid data in peek buf")))) |
136 | | } else { |
137 | 0 | if state.buf.len() != MAX_LINE_LEN { |
138 | 0 | state.buf.resize(MAX_LINE_LEN, 0); |
139 | 0 | } |
140 | 0 | let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive( |
141 | 0 | &mut state.read, |
142 | 0 | &mut state.buf, |
143 | 0 | state.delimiters, |
144 | 0 | state.fail_on_err_lines, |
145 | 0 | false, /* buf_resize */ |
146 | 0 | state.trace, |
147 | 0 | ); |
148 | 0 | state.is_done = is_done; |
149 | 0 | state.stopped_at = stopped_at; |
150 | 0 | res |
151 | | } |
152 | 0 | } Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::read_line Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock>>::read_line Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_>>::read_line |
153 | | |
154 | | /// Peek the next packet line without consuming it. Returns `None` if a delimiter packet line was read or if the iteration was |
155 | | /// already marked as done by an earlier call; I/O and decoding errors are returned as `Some(…)`. |
156 | | /// |
157 | | /// Multiple calls to peek will return the same packet line, if there is one. |
158 | 0 | pub fn peek_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, decode::Error>>> { |
159 | 0 | let state = &mut self.state; |
160 | 0 | if state.is_done { |
161 | 0 | return None; |
162 | 0 | } |
163 | 0 | if state.peek_buf.is_empty() { |
164 | 0 | state.peek_buf.resize(MAX_LINE_LEN, 0); |
165 | 0 | let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive( |
166 | 0 | &mut state.read, |
167 | 0 | &mut state.peek_buf, |
168 | 0 | state.delimiters, |
169 | 0 | state.fail_on_err_lines, |
170 | 0 | true, /* buf_resize: `peek_buf` ends up exactly as long as the peeked line */ |
171 | 0 | state.trace, |
172 | 0 | ); |
173 | 0 | state.is_done = is_done; |
174 | 0 | state.stopped_at = stopped_at; |
175 | 0 | res |
176 | | } else { |
177 | 0 | Some(Ok(Ok(crate::decode(&state.peek_buf).expect("only valid data here")))) |
178 | | } |
179 | 0 | } |
180 | | |
181 | | /// Return this instance as an implementor of [`Read`](io::Read), assuming side bands are used in all received packet lines. |
182 | | /// Each invocation of [`read_line()`](io::BufRead::read_line()) returns a packet line. |
183 | | /// |
184 | | /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool` |
185 | | /// being true in case the `text` is to be interpreted as an error. The function returns a [`ProgressAction`]. |
186 | | /// |
187 | | /// _Please note_ that side bands need to be negotiated with the server. |
188 | 0 | pub fn as_read_with_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction>( |
189 | 0 | &mut self, |
190 | 0 | handle_progress: F, |
191 | 0 | ) -> WithSidebands<'_, T, F> { |
192 | 0 | WithSidebands::with_progress_handler(self, handle_progress) |
193 | 0 | } |
194 | | |
195 | | /// Same as [`as_read_with_sidebands(…)`](StreamingPeekableIter::as_read_with_sidebands()), but for channels without side band support, so no progress handler is passed. |
196 | | /// |
197 | | /// The type parameter `F` does not appear in the arguments and thus needs to be specified using the 'turbofish' operator for this method to be callable. |
198 | | /// Use [`as_read()`][StreamingPeekableIter::as_read()] to avoid having to do that. |
199 | 0 | pub fn as_read_without_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction>(&mut self) -> WithSidebands<'_, T, F> { |
200 | 0 | WithSidebands::without_progress_handler(self) |
201 | 0 | } |
202 | | |
203 | | /// Same as [`as_read_with_sidebands(…)`](StreamingPeekableIter::as_read_with_sidebands()), but for channels without side band support. |
204 | | /// |
205 | | /// Due to the preconfigured function type `fn(bool, &[u8]) -> ProgressAction`, this method can be called without 'turbofish'. |
206 | | #[allow(clippy::type_complexity)] |
207 | 0 | pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8]) -> ProgressAction> { |
208 | 0 | WithSidebands::new(self) |
209 | 0 | } Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::as_read Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::as_read Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout>>::as_read Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock>>::as_read Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_>>::as_read |
210 | | } |
211 | | |
212 | | impl<T> StreamingPeekableIter<T> { |
213 | | /// Consume this instance and return the inner read. NOTE(review): the rest of the state, including anything still held in the peek buffer, is dropped - confirm callers expect that. |
214 | 0 | pub fn into_inner(self) -> T { |
215 | 0 | self.state.read |
216 | 0 | } |
217 | | } |
218 | | |
219 | | impl<T> Deref for StreamingPeekableIter<T> { /* Gives shared access to the wrapped `StreamingPeekableIterState` directly through the iterator. */ |
220 | | type Target = StreamingPeekableIterState<T>; |
221 | 0 | fn deref(&self) -> &Self::Target { |
222 | 0 | &self.state |
223 | 0 | } Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout> as core::ops::deref::Deref>::deref Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock> as core::ops::deref::Deref>::deref Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_> as core::ops::deref::Deref>::deref |
224 | | } |
225 | | |
226 | | impl<T> DerefMut for StreamingPeekableIter<T> { /* Gives mutable access to the wrapped `StreamingPeekableIterState` directly through the iterator. */ |
227 | 0 | fn deref_mut(&mut self) -> &mut Self::Target { |
228 | 0 | &mut self.state |
229 | 0 | } Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout> as core::ops::deref::DerefMut>::deref_mut Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout> as core::ops::deref::DerefMut>::deref_mut Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::process::ChildStdout> as core::ops::deref::DerefMut>::deref_mut Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<std::io::stdio::StdinLock> as core::ops::deref::DerefMut>::deref_mut Unexecuted instantiation: <gix_packetline::blocking_io::read::StreamingPeekableIter<_> as core::ops::deref::DerefMut>::deref_mut |
230 | | } |