/rust/registry/src/index.crates.io-1949cf8c6b5b557f/ureq-3.3.0/src/run.rs
Line | Count | Source |
1 | | use std::sync::{Arc, LazyLock}; |
2 | | use std::{io, mem}; |
3 | | |
4 | | use http::uri::Scheme; |
5 | | use http::{HeaderValue, Method, Request, Response, Uri, header}; |
6 | | use ureq_proto::BodyMode; |
7 | | use ureq_proto::client::state::{Await100, RecvBody, RecvResponse, Redirect, SendRequest}; |
8 | | use ureq_proto::client::state::{Prepare, SendBody as SendBodyState}; |
9 | | use ureq_proto::client::{Await100Result, RecvBodyResult}; |
10 | | use ureq_proto::client::{RecvResponseResult, SendRequestResult}; |
11 | | |
12 | | use crate::body::ResponseInfo; |
13 | | use crate::config::{Config, DEFAULT_USER_AGENT, RequestLevelConfig}; |
14 | | use crate::http; |
15 | | use crate::pool::Connection; |
16 | | use crate::request::ForceSendBody; |
17 | | use crate::response::{RedirectHistory, ResponseUri}; |
18 | | use crate::timings::{CallTimings, CurrentTime}; |
19 | | use crate::transport::ConnectionDetails; |
20 | | use crate::transport::time::{Duration, Instant}; |
21 | | use crate::util::{DebugRequest, DebugResponse, DebugUri, HeaderMapExt, UriExt}; |
22 | | use crate::{Agent, Body, Error, SendBody, Timeout}; |
23 | | |
/// Local shorthand for [`ureq_proto::client::Call`].
type Call<T> = ureq_proto::client::Call<T>;
25 | | |
26 | | /// Run a request. |
27 | | /// |
28 | | /// This is the "main loop" of entire ureq. |
29 | 0 | pub(crate) fn run( |
30 | 0 | agent: &Agent, |
31 | 0 | mut request: Request<()>, |
32 | 0 | mut body: SendBody, |
33 | 0 | ) -> Result<Response<Body>, Error> { |
34 | 0 | let mut redirect_count = 0; |
35 | | |
36 | | // Configuration on the request level overrides the agent level. |
37 | 0 | let (config, request_level) = request |
38 | 0 | .extensions_mut() |
39 | 0 | .remove::<RequestLevelConfig>() |
40 | 0 | .map(|rl| (Arc::new(rl.0), true)) |
41 | 0 | .unwrap_or_else(|| (agent.config.clone(), false)); |
42 | | |
43 | 0 | let force_send_body = request.extensions_mut().remove::<ForceSendBody>().is_some(); |
44 | | |
45 | 0 | let mut redirect_history: Option<Vec<Uri>> = |
46 | 0 | config.save_redirect_history().then_some(Vec::new()); |
47 | | |
48 | 0 | let timeouts = config.timeouts(); |
49 | | |
50 | 0 | let mut timings = CallTimings::new(timeouts, CurrentTime::default()); |
51 | | |
52 | 0 | let mut call = Call::new(request)?; |
53 | | |
54 | 0 | if force_send_body { |
55 | 0 | call.force_send_body(); |
56 | 0 | } |
57 | | |
58 | 0 | call.allow_non_standard_methods(config.allow_non_standard_methods()); |
59 | | |
60 | 0 | let (response, handler) = loop { |
61 | 0 | let timeout = timings.next_timeout(Timeout::Global); |
62 | 0 | let timed_out = match timeout.after { |
63 | 0 | Duration::Exact(v) => v.is_zero(), |
64 | 0 | Duration::NotHappening => false, |
65 | | }; |
66 | 0 | if timed_out { |
67 | 0 | return Err(Error::Timeout(Timeout::Global)); |
68 | 0 | } |
69 | | |
70 | 0 | match call_run( |
71 | 0 | agent, |
72 | 0 | &config, |
73 | 0 | request_level, |
74 | 0 | call, |
75 | 0 | &mut body, |
76 | 0 | redirect_count, |
77 | 0 | &mut redirect_history, |
78 | 0 | &mut timings, |
79 | 0 | )? { |
80 | | // Follow redirect |
81 | 0 | CallResult::Redirect(rcall, rtimings) => { |
82 | 0 | redirect_count += 1; |
83 | | |
84 | 0 | call = handle_redirect(rcall, &config)?; |
85 | | |
86 | | // If the new method doesn't need a body, clear it. |
87 | | // This prevents Content-Length/Transfer-Encoding headers from being added |
88 | | // on methods like GET that don't send bodies (e.g., POST->GET redirects). |
89 | 0 | let method = call.method(); |
90 | 0 | if !matches!(method, &Method::POST | &Method::PUT | &Method::PATCH) { |
91 | 0 | body.remove(); |
92 | 0 | } |
93 | | |
94 | 0 | timings = rtimings.new_call(); |
95 | | } |
96 | | |
97 | | // Return response |
98 | 0 | CallResult::Response(response, handler) => break (response, handler), |
99 | | } |
100 | | }; |
101 | | |
102 | 0 | let (mut parts, _) = response.into_parts(); |
103 | | |
104 | 0 | let recv_body_mode = handler |
105 | 0 | .call |
106 | 0 | .as_ref() |
107 | 0 | .map(|f| f.body_mode()) |
108 | 0 | .unwrap_or(BodyMode::NoBody); |
109 | | |
110 | 0 | let info = ResponseInfo::new(&parts.headers, recv_body_mode); |
111 | | |
112 | | // If the body will be decompressed, strip Content-Encoding and Content-Length |
113 | | // from the response headers. The Content-Length no longer matches the |
114 | | // decompressed body size, and Content-Encoding no longer applies since |
115 | | // the body is delivered to the caller already decompressed (RFC 9110 ยง8.7). |
116 | 0 | if info.is_decompressing() { |
117 | 0 | parts.headers.remove(http::header::CONTENT_ENCODING); |
118 | 0 | parts.headers.remove(http::header::CONTENT_LENGTH); |
119 | 0 | } |
120 | | |
121 | 0 | let body = Body::new(handler, info); |
122 | | |
123 | 0 | let response = Response::from_parts(parts, body); |
124 | | |
125 | 0 | let status = response.status(); |
126 | 0 | let is_err = status.is_client_error() || status.is_server_error(); |
127 | | |
128 | 0 | if config.http_status_as_error() && is_err { |
129 | 0 | return Err(Error::StatusCode(status.as_u16())); |
130 | 0 | } |
131 | | |
132 | 0 | Ok(response) |
133 | 0 | } |
134 | | |
135 | | #[allow(clippy::too_many_arguments)] |
136 | 0 | fn call_run( |
137 | 0 | agent: &Agent, |
138 | 0 | config: &Config, |
139 | 0 | request_level: bool, |
140 | 0 | mut call: Call<Prepare>, |
141 | 0 | body: &mut SendBody, |
142 | 0 | redirect_count: u32, |
143 | 0 | redirect_history: &mut Option<Vec<Uri>>, |
144 | 0 | timings: &mut CallTimings, |
145 | 0 | ) -> Result<CallResult, Error> { |
146 | 0 | let uri = call.uri().clone(); |
147 | 0 | debug!("{} {:?}", call.method(), &DebugUri(call.uri())); |
148 | | |
149 | 0 | if config.https_only() && uri.scheme() != Some(&Scheme::HTTPS) { |
150 | 0 | return Err(Error::RequireHttpsOnly(uri.to_string())); |
151 | 0 | } |
152 | | |
153 | 0 | add_headers(&mut call, agent, config, body, &uri)?; |
154 | | |
155 | 0 | let mut connection = connect(agent, config, request_level, &uri, timings)?; |
156 | | |
157 | 0 | let mut call = call.proceed(); |
158 | | |
159 | 0 | if log_enabled!(log::Level::Debug) { |
160 | 0 | let headers = call.headers_map()?; |
161 | | |
162 | 0 | let r = DebugRequest { |
163 | 0 | method: call.method(), |
164 | 0 | uri: call.uri(), |
165 | 0 | version: call.version(), |
166 | 0 | headers, |
167 | 0 | }; |
168 | 0 | debug!("{:?}", r); |
169 | 0 | } |
170 | | |
171 | 0 | let call = match send_request(call, &mut connection, timings)? { |
172 | 0 | SendRequestResult::Await100(call) => match await_100(call, &mut connection, timings)? { |
173 | 0 | Await100Result::SendBody(call) => send_body(call, body, &mut connection, timings)?, |
174 | 0 | Await100Result::RecvResponse(call) => call, |
175 | | }, |
176 | 0 | SendRequestResult::SendBody(call) => send_body(call, body, &mut connection, timings)?, |
177 | 0 | SendRequestResult::RecvResponse(call) => call, |
178 | | }; |
179 | | |
180 | 0 | let is_following_redirects = redirect_count < config.max_redirects(); |
181 | | |
182 | 0 | let (mut response, response_result) = recv_response( |
183 | 0 | call, |
184 | 0 | &mut connection, |
185 | 0 | config, |
186 | 0 | timings, |
187 | 0 | is_following_redirects, |
188 | 0 | )?; |
189 | | |
190 | 0 | debug!("{:?}", DebugResponse(&response)); |
191 | | |
192 | | #[cfg(feature = "cookies")] |
193 | | { |
194 | | let mut jar = agent.cookie_jar_lock(); |
195 | | |
196 | | let iter = response |
197 | | .headers() |
198 | | .get_all(http::header::SET_COOKIE) |
199 | | .iter() |
200 | | .filter_map(|h| h.to_str().ok()) |
201 | | .filter_map(|s| crate::Cookie::parse(s, &uri).ok()); |
202 | | |
203 | | jar.store_response_cookies(iter, &uri); |
204 | | } |
205 | | |
206 | 0 | if let Some(history) = redirect_history.as_mut() { |
207 | 0 | history.push(uri.clone()); |
208 | 0 | response |
209 | 0 | .extensions_mut() |
210 | 0 | .insert(RedirectHistory(history.clone())); |
211 | 0 | } |
212 | 0 | response.extensions_mut().insert(ResponseUri(uri)); |
213 | | |
214 | 0 | let ret = match response_result { |
215 | 0 | RecvResponseResult::RecvBody(call) => { |
216 | 0 | let timings = mem::take(timings); |
217 | 0 | let mut handler = BodyHandler { |
218 | 0 | call: Some(call), |
219 | 0 | connection: Some(connection), |
220 | 0 | timings, |
221 | 0 | ..Default::default() |
222 | 0 | }; |
223 | | |
224 | 0 | if response.status().is_redirection() { |
225 | 0 | if is_following_redirects { |
226 | 0 | let call = handler.consume_redirect_body()?; |
227 | | |
228 | 0 | CallResult::Redirect(call, handler.timings) |
229 | 0 | } else if config.max_redirects_do_error() { |
230 | 0 | return Err(Error::TooManyRedirects); |
231 | | } else { |
232 | 0 | CallResult::Response(response, handler) |
233 | | } |
234 | | } else { |
235 | 0 | CallResult::Response(response, handler) |
236 | | } |
237 | | } |
238 | 0 | RecvResponseResult::Redirect(call) => { |
239 | 0 | cleanup(connection, call.must_close_connection(), timings.now()); |
240 | | |
241 | 0 | if is_following_redirects { |
242 | 0 | CallResult::Redirect(call, mem::take(timings)) |
243 | 0 | } else if config.max_redirects_do_error() { |
244 | 0 | return Err(Error::TooManyRedirects); |
245 | | } else { |
246 | 0 | CallResult::Response(response, BodyHandler::default()) |
247 | | } |
248 | | } |
249 | 0 | RecvResponseResult::Cleanup(call) => { |
250 | 0 | cleanup(connection, call.must_close_connection(), timings.now()); |
251 | 0 | CallResult::Response(response, BodyHandler::default()) |
252 | | } |
253 | | }; |
254 | | |
255 | 0 | Ok(ret) |
256 | 0 | } |
257 | | |
/// Return type of [`call_run`].
///
/// Tells the loop in [`run`] whether to go another round (redirect) or to
/// stop and hand the response back.
#[allow(clippy::large_enum_variant)]
enum CallResult {
    /// Call resulted in a redirect.
    ///
    /// Carries the call in its redirect state and the timings of the exchange.
    Redirect(Call<Redirect>, CallTimings),

    /// Call resulted in a response.
    ///
    /// Carries the response head and the handler used to read the body.
    Response(Response<()>, BodyHandler),
}
267 | | |
268 | 0 | fn add_headers( |
269 | 0 | call: &mut Call<Prepare>, |
270 | 0 | agent: &Agent, |
271 | 0 | config: &Config, |
272 | 0 | body: &mut SendBody, |
273 | 0 | uri: &Uri, |
274 | 0 | ) -> Result<(), Error> { |
275 | 0 | let headers = call.headers(); |
276 | | |
277 | 0 | let send_body_mode = if headers.has_send_body_mode() { |
278 | 0 | None |
279 | | } else { |
280 | 0 | Some(body.body_mode()?) |
281 | | }; |
282 | 0 | let has_header_accept_enc = headers.has_accept_encoding(); |
283 | 0 | let has_header_ua = headers.has_user_agent(); |
284 | 0 | let has_header_accept = headers.has_accept(); |
285 | | |
286 | | #[cfg(not(feature = "cookies"))] |
287 | 0 | { |
288 | 0 | let _ = agent; |
289 | 0 | let _ = uri; |
290 | 0 | } |
291 | | #[cfg(feature = "cookies")] |
292 | | { |
293 | | let value = agent.jar.get_request_cookies(uri); |
294 | | if !value.is_empty() { |
295 | | let value = HeaderValue::from_str(&value) |
296 | | .map_err(|_| Error::CookieValue("Cookie value is an invalid http-header"))?; |
297 | | call.header(header::COOKIE, value)?; |
298 | | } |
299 | | } |
300 | | |
301 | | { |
302 | 0 | static ACCEPTS: LazyLock<String> = LazyLock::new(|| { |
303 | | #[allow(unused_mut)] |
304 | 0 | let mut value = String::with_capacity(10); |
305 | | #[cfg(feature = "gzip")] |
306 | | value.push_str("gzip"); |
307 | | #[cfg(all(feature = "gzip", feature = "brotli"))] |
308 | | value.push_str(", "); |
309 | | #[cfg(feature = "brotli")] |
310 | | value.push_str("br"); |
311 | 0 | value |
312 | 0 | }); |
313 | | |
314 | 0 | let accepts = &*ACCEPTS; |
315 | | |
316 | 0 | if !has_header_accept_enc { |
317 | 0 | if let Some(v) = config.accept_encoding().as_str(accepts) { |
318 | | // unwrap is ok because above ACCEPTS will produce a valid value, |
319 | | // or the value is user provided in which case it must be valid. |
320 | 0 | let value = HeaderValue::from_str(v).unwrap(); |
321 | 0 | call.header(header::ACCEPT_ENCODING, value)?; |
322 | 0 | } |
323 | 0 | } |
324 | | } |
325 | | |
326 | 0 | if let Some(send_body_mode) = send_body_mode { |
327 | 0 | match send_body_mode { |
328 | 0 | BodyMode::LengthDelimited(v) => { |
329 | 0 | let value = HeaderValue::from(v); |
330 | 0 | call.header(header::CONTENT_LENGTH, value)?; |
331 | | } |
332 | | BodyMode::Chunked => { |
333 | 0 | let value = HeaderValue::from_static("chunked"); |
334 | 0 | call.header(header::TRANSFER_ENCODING, value)?; |
335 | | } |
336 | 0 | _ => {} |
337 | | } |
338 | 0 | } |
339 | | |
340 | 0 | if !has_header_ua { |
341 | | // unwrap is ok because a user might override the agent, and if they |
342 | | // set bad values, it's not really ureq's problem. |
343 | 0 | if let Some(v) = config.user_agent().as_str(DEFAULT_USER_AGENT) { |
344 | 0 | let value = HeaderValue::from_str(v).unwrap(); |
345 | 0 | call.header(header::USER_AGENT, value)?; |
346 | 0 | } |
347 | 0 | } |
348 | | |
349 | 0 | if !has_header_accept { |
350 | | // unwrap is ok because a user might override accepts header, and if they |
351 | | // set bad values, it's not really ureq's problem. |
352 | 0 | if let Some(v) = config.accept().as_str("*/*") { |
353 | 0 | let value = HeaderValue::from_str(v).unwrap(); |
354 | 0 | call.header(header::ACCEPT, value)?; |
355 | 0 | } |
356 | 0 | } |
357 | | |
358 | 0 | Ok(()) |
359 | 0 | } |
360 | | |
361 | 0 | fn connect( |
362 | 0 | agent: &Agent, |
363 | 0 | config: &Config, |
364 | 0 | request_level: bool, |
365 | 0 | uri: &Uri, |
366 | 0 | timings: &mut CallTimings, |
367 | 0 | ) -> Result<Connection, Error> { |
368 | | // Before resolving the URI we need to ensure it is a full URI. We |
369 | | // cannot make requests with partial uri like "/path". |
370 | 0 | uri.ensure_valid_url()?; |
371 | | |
372 | 0 | let is_proxy = config.proxy().is_some(); |
373 | | |
374 | | // For most proxy configs, the proxy itself should resolve the host name we are connecting to. |
375 | | // However for SOCKS4, we must do it and pass the resolved IP to the proxy. |
376 | 0 | let is_proxy_local_resolve = config.proxy().map(|p| p.resolve_target()).unwrap_or(false); |
377 | | |
378 | | // Tells if this host matches NO_PROXY |
379 | 0 | let is_no_proxy = config.proxy().map(|p| p.is_no_proxy(uri)).unwrap_or(false); |
380 | | |
381 | 0 | let addrs = if is_no_proxy || !is_proxy || is_proxy_local_resolve { |
382 | 0 | agent |
383 | 0 | .resolver |
384 | 0 | .resolve(uri, config, timings.next_timeout(Timeout::Resolve))? |
385 | | } else { |
386 | 0 | agent.resolver.empty() |
387 | | }; |
388 | | |
389 | 0 | timings.record_time(Timeout::Resolve); |
390 | | |
391 | 0 | let details = ConnectionDetails { |
392 | 0 | uri, |
393 | 0 | addrs, |
394 | 0 | resolver: &*agent.resolver, |
395 | 0 | config, |
396 | 0 | request_level, |
397 | 0 | now: timings.now(), |
398 | 0 | timeout: timings.next_timeout(Timeout::Connect), |
399 | 0 | current_time: timings.current_time().clone(), |
400 | 0 | run_connector: agent.run_connector.clone(), |
401 | 0 | }; |
402 | | |
403 | 0 | let connection = agent.pool.connect(&details, config.max_idle_age().into())?; |
404 | | |
405 | 0 | if details.needs_tls() && !connection.is_tls() { |
406 | 0 | return Err(Error::TlsRequired); |
407 | 0 | } |
408 | | |
409 | 0 | timings.record_time(Timeout::Connect); |
410 | | |
411 | 0 | Ok(connection) |
412 | 0 | } |
413 | | |
414 | 0 | fn send_request( |
415 | 0 | mut call: Call<SendRequest>, |
416 | 0 | connection: &mut Connection, |
417 | 0 | timings: &mut CallTimings, |
418 | 0 | ) -> Result<SendRequestResult, Error> { |
419 | | loop { |
420 | 0 | if call.can_proceed() { |
421 | 0 | break; |
422 | 0 | } |
423 | | |
424 | 0 | let buffers = connection.buffers(); |
425 | 0 | let amount = call.write(buffers.output())?; |
426 | 0 | let timeout = timings.next_timeout(Timeout::SendRequest); |
427 | 0 | connection.transmit_output(amount, timeout)?; |
428 | | } |
429 | | |
430 | 0 | timings.record_time(Timeout::SendRequest); |
431 | | |
432 | | // The request might be misconfigured. |
433 | 0 | let call = call.proceed()?; |
434 | | |
435 | | // We checked can_proceed() above, this unwrap is fine. |
436 | 0 | Ok(call.unwrap()) |
437 | 0 | } |
438 | | |
439 | 0 | fn await_100( |
440 | 0 | mut call: Call<Await100>, |
441 | 0 | connection: &mut Connection, |
442 | 0 | timings: &mut CallTimings, |
443 | 0 | ) -> Result<Await100Result, Error> { |
444 | 0 | while call.can_keep_await_100() { |
445 | 0 | let timeout = timings.next_timeout(Timeout::Await100); |
446 | | |
447 | 0 | if timeout.after.is_zero() { |
448 | | // Stop waiting for 100-continue. |
449 | 0 | break; |
450 | 0 | } |
451 | | |
452 | 0 | match connection.maybe_await_input(timeout) { |
453 | | Ok(_) => { |
454 | 0 | let input = connection.buffers().input(); |
455 | 0 | if input.is_empty() { |
456 | 0 | return Err(Error::disconnected("await_100 empty input")); |
457 | 0 | } |
458 | | |
459 | 0 | let amount = call.try_read_100(input)?; |
460 | 0 | if amount > 0 { |
461 | 0 | connection.consume_input(amount); |
462 | 0 | break; |
463 | 0 | } |
464 | | } |
465 | | Err(Error::Timeout(_)) => { |
466 | | // If we get a timeout while waiting for input, that is not an error, |
467 | | // we progress to send the request body. |
468 | 0 | break; |
469 | | } |
470 | 0 | Err(e) => return Err(e), |
471 | | } |
472 | | } |
473 | | |
474 | 0 | timings.record_time(Timeout::Await100); |
475 | | |
476 | | // A misconfigured request might surface here. |
477 | 0 | let call = call.proceed()?; |
478 | | |
479 | 0 | Ok(call) |
480 | 0 | } |
481 | | |
482 | 0 | fn send_body( |
483 | 0 | mut call: Call<SendBodyState>, |
484 | 0 | body: &mut SendBody, |
485 | 0 | connection: &mut Connection, |
486 | 0 | timings: &mut CallTimings, |
487 | 0 | ) -> Result<Call<RecvResponse>, Error> { |
488 | | loop { |
489 | 0 | if call.can_proceed() { |
490 | 0 | break; |
491 | 0 | } |
492 | | |
493 | 0 | let buffers = connection.buffers(); |
494 | | |
495 | 0 | let (tmp, output) = buffers.tmp_and_output(); |
496 | | |
497 | 0 | let input_len = tmp.len(); |
498 | | |
499 | 0 | let input_fitting_in_output = call.calculate_max_input(output.len()); |
500 | 0 | let max_input = input_len.min(input_fitting_in_output); |
501 | | |
502 | 0 | let output_used = if !call.is_chunked() { |
503 | | // For non-chunked, The body can be written directly to the output. |
504 | | // This optimizes away a memcopy if we were to go via call.write(). |
505 | 0 | let output_used = body.read(output)?; |
506 | | |
507 | | // Size checking is still in the call. |
508 | 0 | call.consume_direct_write(output_used)?; |
509 | | |
510 | 0 | output_used |
511 | | } else { |
512 | 0 | let tmp = &mut tmp[..max_input]; |
513 | 0 | let n = body.read(tmp)?; |
514 | | |
515 | 0 | let (input_used, output_used) = call.write(&tmp[..n], output)?; |
516 | | |
517 | | // Since output is "a bit" larger than the input (compensate for chunk ovexrhead), |
518 | | // the entire input we read from the body should also be shipped to the output. |
519 | 0 | assert!(input_used == n); |
520 | | |
521 | 0 | output_used |
522 | | }; |
523 | | |
524 | 0 | let timeout = timings.next_timeout(Timeout::SendBody); |
525 | 0 | connection.transmit_output(output_used, timeout)?; |
526 | | } |
527 | | |
528 | 0 | timings.record_time(Timeout::SendBody); |
529 | 0 | Ok(call.proceed().unwrap()) |
530 | 0 | } |
531 | | |
532 | 0 | fn recv_response( |
533 | 0 | mut call: Call<RecvResponse>, |
534 | 0 | connection: &mut Connection, |
535 | 0 | config: &Config, |
536 | 0 | timings: &mut CallTimings, |
537 | 0 | is_following_redirects: bool, |
538 | 0 | ) -> Result<(Response<()>, RecvResponseResult), Error> { |
539 | 0 | let response = loop { |
540 | 0 | let timeout = timings.next_timeout(Timeout::RecvResponse); |
541 | 0 | let made_progress = connection.maybe_await_input(timeout)?; |
542 | | |
543 | 0 | let input = connection.buffers().input(); |
544 | | |
545 | | // If cookies are disabled, we can allow ourselves to follow |
546 | | // the redirect as soon as we discover the `Location` header. |
547 | | // There are plenty of broken servers out there that don't send |
548 | | // the finishing \r\n on redirects. With cookies off, we can |
549 | | // handle that situation. |
550 | | // |
551 | | // If cookies are enabled, we risk mising a `Set-Cookie` header |
552 | | // with such a strategy. |
553 | 0 | let cookies_enabled = cfg!(feature = "cookies"); |
554 | | |
555 | | // If we are not following redirects, do not allow partial returned |
556 | | // 302 responses. |
557 | 0 | let allow_partial_redirect = !cookies_enabled && is_following_redirects; |
558 | | |
559 | 0 | let (amount, maybe_response) = call.try_response(input, allow_partial_redirect)?; |
560 | | |
561 | 0 | let check_size = if maybe_response.is_some() { |
562 | | // We got a parsed response, ensure the size is within |
563 | | // configured parameters. |
564 | 0 | amount |
565 | | } else { |
566 | | // We did not parse a response, if input is too large, |
567 | | // we stop trying to get more data. |
568 | 0 | input.len() |
569 | | }; |
570 | | |
571 | 0 | if check_size > config.max_response_header_size() { |
572 | 0 | return Err(Error::LargeResponseHeader( |
573 | 0 | input.len(), |
574 | 0 | config.max_response_header_size(), |
575 | 0 | )); |
576 | 0 | } |
577 | | |
578 | 0 | connection.consume_input(amount); |
579 | | |
580 | 0 | if let Some(response) = maybe_response { |
581 | 0 | assert!(call.can_proceed()); |
582 | 0 | break response; |
583 | 0 | } else if !made_progress { |
584 | 0 | return Err(Error::disconnected("recv_respone made no progress")); |
585 | 0 | } |
586 | | }; |
587 | | |
588 | 0 | timings.record_time(Timeout::RecvResponse); |
589 | 0 | Ok((response, call.proceed().unwrap())) |
590 | 0 | } |
591 | | |
592 | 0 | fn handle_redirect(mut call: Call<Redirect>, config: &Config) -> Result<Call<Prepare>, Error> { |
593 | 0 | let maybe_new_call = call.as_new_call(config.redirect_auth_headers())?; |
594 | 0 | let status = call.status(); |
595 | | |
596 | 0 | if let Some(call) = maybe_new_call { |
597 | 0 | debug!( |
598 | 0 | "Redirect ({}): {} {:?}", |
599 | | status, |
600 | 0 | call.method(), |
601 | 0 | DebugUri(call.uri()) |
602 | | ); |
603 | | |
604 | 0 | Ok(call) |
605 | | } else { |
606 | 0 | Err(Error::RedirectFailed) |
607 | | } |
608 | 0 | } |
609 | | |
610 | 0 | fn cleanup(connection: Connection, must_close: bool, now: Instant) { |
611 | 0 | if must_close { |
612 | 0 | connection.close(); |
613 | 0 | } else { |
614 | 0 | connection.reuse(now) |
615 | | } |
616 | 0 | } |
617 | | |
/// Reads a response body off a connection.
///
/// The default value has neither a call nor a connection; reading from it
/// yields zero bytes.
#[derive(Default)]
pub(crate) struct BodyHandler {
    /// The call in its body-receiving state. Taken when the body has ended.
    call: Option<Call<RecvBody>>,
    /// The connection the body is read from. Taken when the body has ended.
    connection: Option<Connection>,
    /// Timings for the call this body belongs to.
    timings: CallTimings,
    /// Set once awaiting input reported EOF/abort/reset from the remote.
    remote_closed: bool,
    /// The redirect state of the call, stored when the ended body belonged to a redirect.
    redirect: Option<Box<Call<Redirect>>>,
}
626 | | |
627 | | impl BodyHandler { |
628 | 0 | fn do_read(&mut self, buf: &mut [u8]) -> Result<usize, Error> { |
629 | 0 | let (Some(call), Some(connection), timings) = |
630 | 0 | (&mut self.call, &mut self.connection, &mut self.timings) |
631 | | else { |
632 | 0 | return Ok(0); |
633 | | }; |
634 | | |
635 | | loop { |
636 | 0 | let body_fulfilled = match call.body_mode() { |
637 | 0 | BodyMode::NoBody => unreachable!("must be a BodyMode for BodyHandler"), |
638 | | // These modes are fulfilled by either reaching the content-length or |
639 | | // receiving an end chunk delimiter. |
640 | 0 | BodyMode::LengthDelimited(_) | BodyMode::Chunked => call.can_proceed(), |
641 | | // This mode can only end once remote closes |
642 | 0 | BodyMode::CloseDelimited => false, |
643 | | }; |
644 | | |
645 | 0 | if body_fulfilled { |
646 | 0 | self.ended()?; |
647 | 0 | return Ok(0); |
648 | 0 | } |
649 | | |
650 | 0 | let has_buffered_input = connection.buffers().can_use_input(); |
651 | | |
652 | | // First try to use input already buffered |
653 | 0 | if has_buffered_input { |
654 | 0 | let input = connection.buffers().input(); |
655 | 0 | let (input_used, output_used) = call.read(input, buf)?; |
656 | 0 | connection.consume_input(input_used); |
657 | | |
658 | 0 | if output_used > 0 { |
659 | 0 | return Ok(output_used); |
660 | 0 | } |
661 | | |
662 | 0 | if input_used > 0 { |
663 | | // Body might be fulfilled now. |
664 | 0 | continue; |
665 | 0 | } |
666 | 0 | } |
667 | | |
668 | 0 | if self.remote_closed { |
669 | | // we should not try to await_input again. |
670 | 0 | self.ended()?; |
671 | 0 | return Ok(0); |
672 | 0 | } |
673 | | |
674 | 0 | let timeout = timings.next_timeout(Timeout::RecvBody); |
675 | | |
676 | 0 | let made_progress = match connection.maybe_await_input(timeout) { |
677 | 0 | Ok(v) => v, |
678 | 0 | Err(Error::Io(e)) => match e.kind() { |
679 | | io::ErrorKind::UnexpectedEof |
680 | | | io::ErrorKind::ConnectionAborted |
681 | | | io::ErrorKind::ConnectionReset => { |
682 | 0 | self.remote_closed = true; |
683 | 0 | true |
684 | | } |
685 | 0 | _ => return Err(Error::Io(e)), |
686 | | }, |
687 | 0 | Err(e) => return Err(e), |
688 | | }; |
689 | | |
690 | 0 | let input = connection.buffers().input(); |
691 | 0 | let input_ended = input.is_empty(); |
692 | | |
693 | 0 | let (input_used, output_used) = call.read(input, buf)?; |
694 | 0 | connection.consume_input(input_used); |
695 | | |
696 | 0 | if output_used > 0 { |
697 | 0 | return Ok(output_used); |
698 | 0 | } else if input_ended { |
699 | 0 | self.ended()?; |
700 | 0 | return Ok(0); |
701 | 0 | } else if made_progress { |
702 | | // The maybe_await_input() made progress, but handled amount is 0. This |
703 | | // can for instance happen if we read some data, but not enough for |
704 | | // decoding any gzip. |
705 | 0 | continue; |
706 | | } else { |
707 | | // This is an error case we don't want to see. |
708 | 0 | return Err(Error::BodyStalled); |
709 | | } |
710 | | } |
711 | 0 | } |
712 | | |
713 | 0 | fn ended(&mut self) -> Result<(), Error> { |
714 | 0 | self.timings.record_time(Timeout::RecvBody); |
715 | | |
716 | 0 | let call = self.call.take().expect("ended() called with body"); |
717 | | |
718 | | // In some cases, when reading chunked, the server send 0\r\n to indicate |
719 | | // the end of the body, and then abruptly does a FIN. In these cases we have |
720 | | // received the entire body, but must clean up the connection. |
721 | 0 | let is_ended_chunked = call.is_ended_chunked(); |
722 | | |
723 | 0 | let mut force_close = false; |
724 | | |
725 | 0 | if !call.can_proceed() { |
726 | 0 | if is_ended_chunked { |
727 | | // This case means we got 0\r\n, but can_proceed() is false because |
728 | | // it only goes true on fully received chunked bodies. |
729 | 0 | debug!("Server ended connection after sending chunked 0\\r\\n"); |
730 | 0 | force_close = true; |
731 | | } else { |
732 | 0 | return Err(Error::disconnected("ended call cannot proceed")); |
733 | | } |
734 | 0 | } |
735 | | |
736 | 0 | let must_close_connection = force_close |
737 | 0 | || match call.proceed().unwrap() { |
738 | 0 | RecvBodyResult::Redirect(call) => { |
739 | 0 | let c = call.must_close_connection(); |
740 | 0 | self.redirect = Some(Box::new(call)); |
741 | 0 | c |
742 | | } |
743 | 0 | RecvBodyResult::Cleanup(v) => v.must_close_connection(), |
744 | | }; |
745 | | |
746 | 0 | let connection = self.connection.take().expect("ended() called with body"); |
747 | 0 | cleanup(connection, must_close_connection, self.timings.now()); |
748 | | |
749 | 0 | Ok(()) |
750 | 0 | } |
751 | | |
752 | 0 | fn consume_redirect_body(&mut self) -> Result<Call<Redirect>, Error> { |
753 | 0 | let mut buf = vec![0; 1024]; |
754 | | loop { |
755 | 0 | let amount = self.do_read(&mut buf)?; |
756 | 0 | if amount == 0 { |
757 | 0 | break; |
758 | 0 | } |
759 | | } |
760 | | |
761 | | // Unwrap is OK, because we are only consuming the redirect body if |
762 | | // such a body was signalled by the remote. |
763 | 0 | let redirect = self.redirect.take().map(|b| *b); |
764 | 0 | Ok(redirect.expect("remote to have signaled redirect")) |
765 | 0 | } |
766 | | } |
767 | | |
768 | | impl io::Read for BodyHandler { |
769 | 0 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
770 | 0 | self.do_read(buf).map_err(|e| e.into_io()) |
771 | 0 | } |
772 | | } |