Coverage Report

Created: 2026-04-29 06:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/ureq-3.3.0/src/run.rs
Line
Count
Source
1
use std::sync::{Arc, LazyLock};
2
use std::{io, mem};
3
4
use http::uri::Scheme;
5
use http::{HeaderValue, Method, Request, Response, Uri, header};
6
use ureq_proto::BodyMode;
7
use ureq_proto::client::state::{Await100, RecvBody, RecvResponse, Redirect, SendRequest};
8
use ureq_proto::client::state::{Prepare, SendBody as SendBodyState};
9
use ureq_proto::client::{Await100Result, RecvBodyResult};
10
use ureq_proto::client::{RecvResponseResult, SendRequestResult};
11
12
use crate::body::ResponseInfo;
13
use crate::config::{Config, DEFAULT_USER_AGENT, RequestLevelConfig};
14
use crate::http;
15
use crate::pool::Connection;
16
use crate::request::ForceSendBody;
17
use crate::response::{RedirectHistory, ResponseUri};
18
use crate::timings::{CallTimings, CurrentTime};
19
use crate::transport::ConnectionDetails;
20
use crate::transport::time::{Duration, Instant};
21
use crate::util::{DebugRequest, DebugResponse, DebugUri, HeaderMapExt, UriExt};
22
use crate::{Agent, Body, Error, SendBody, Timeout};
23
24
type Call<T> = ureq_proto::client::Call<T>;
25
26
/// Run a request.
27
///
28
/// This is the "main loop" of the entire ureq crate.
29
0
pub(crate) fn run(
30
0
    agent: &Agent,
31
0
    mut request: Request<()>,
32
0
    mut body: SendBody,
33
0
) -> Result<Response<Body>, Error> {
34
0
    let mut redirect_count = 0;
35
36
    // Configuration on the request level overrides the agent level.
37
0
    let (config, request_level) = request
38
0
        .extensions_mut()
39
0
        .remove::<RequestLevelConfig>()
40
0
        .map(|rl| (Arc::new(rl.0), true))
41
0
        .unwrap_or_else(|| (agent.config.clone(), false));
42
43
0
    let force_send_body = request.extensions_mut().remove::<ForceSendBody>().is_some();
44
45
0
    let mut redirect_history: Option<Vec<Uri>> =
46
0
        config.save_redirect_history().then_some(Vec::new());
47
48
0
    let timeouts = config.timeouts();
49
50
0
    let mut timings = CallTimings::new(timeouts, CurrentTime::default());
51
52
0
    let mut call = Call::new(request)?;
53
54
0
    if force_send_body {
55
0
        call.force_send_body();
56
0
    }
57
58
0
    call.allow_non_standard_methods(config.allow_non_standard_methods());
59
60
0
    let (response, handler) = loop {
61
0
        let timeout = timings.next_timeout(Timeout::Global);
62
0
        let timed_out = match timeout.after {
63
0
            Duration::Exact(v) => v.is_zero(),
64
0
            Duration::NotHappening => false,
65
        };
66
0
        if timed_out {
67
0
            return Err(Error::Timeout(Timeout::Global));
68
0
        }
69
70
0
        match call_run(
71
0
            agent,
72
0
            &config,
73
0
            request_level,
74
0
            call,
75
0
            &mut body,
76
0
            redirect_count,
77
0
            &mut redirect_history,
78
0
            &mut timings,
79
0
        )? {
80
            // Follow redirect
81
0
            CallResult::Redirect(rcall, rtimings) => {
82
0
                redirect_count += 1;
83
84
0
                call = handle_redirect(rcall, &config)?;
85
86
                // If the new method doesn't need a body, clear it.
87
                // This prevents Content-Length/Transfer-Encoding headers from being added
88
                // on methods like GET that don't send bodies (e.g., POST->GET redirects).
89
0
                let method = call.method();
90
0
                if !matches!(method, &Method::POST | &Method::PUT | &Method::PATCH) {
91
0
                    body.remove();
92
0
                }
93
94
0
                timings = rtimings.new_call();
95
            }
96
97
            // Return response
98
0
            CallResult::Response(response, handler) => break (response, handler),
99
        }
100
    };
101
102
0
    let (mut parts, _) = response.into_parts();
103
104
0
    let recv_body_mode = handler
105
0
        .call
106
0
        .as_ref()
107
0
        .map(|f| f.body_mode())
108
0
        .unwrap_or(BodyMode::NoBody);
109
110
0
    let info = ResponseInfo::new(&parts.headers, recv_body_mode);
111
112
    // If the body will be decompressed, strip Content-Encoding and Content-Length
113
    // from the response headers. The Content-Length no longer matches the
114
    // decompressed body size, and Content-Encoding no longer applies since
115
    // the body is delivered to the caller already decompressed (RFC 9110 §8.7).
116
0
    if info.is_decompressing() {
117
0
        parts.headers.remove(http::header::CONTENT_ENCODING);
118
0
        parts.headers.remove(http::header::CONTENT_LENGTH);
119
0
    }
120
121
0
    let body = Body::new(handler, info);
122
123
0
    let response = Response::from_parts(parts, body);
124
125
0
    let status = response.status();
126
0
    let is_err = status.is_client_error() || status.is_server_error();
127
128
0
    if config.http_status_as_error() && is_err {
129
0
        return Err(Error::StatusCode(status.as_u16()));
130
0
    }
131
132
0
    Ok(response)
133
0
}
134
135
#[allow(clippy::too_many_arguments)]
136
0
fn call_run(
137
0
    agent: &Agent,
138
0
    config: &Config,
139
0
    request_level: bool,
140
0
    mut call: Call<Prepare>,
141
0
    body: &mut SendBody,
142
0
    redirect_count: u32,
143
0
    redirect_history: &mut Option<Vec<Uri>>,
144
0
    timings: &mut CallTimings,
145
0
) -> Result<CallResult, Error> {
146
0
    let uri = call.uri().clone();
147
0
    debug!("{} {:?}", call.method(), &DebugUri(call.uri()));
148
149
0
    if config.https_only() && uri.scheme() != Some(&Scheme::HTTPS) {
150
0
        return Err(Error::RequireHttpsOnly(uri.to_string()));
151
0
    }
152
153
0
    add_headers(&mut call, agent, config, body, &uri)?;
154
155
0
    let mut connection = connect(agent, config, request_level, &uri, timings)?;
156
157
0
    let mut call = call.proceed();
158
159
0
    if log_enabled!(log::Level::Debug) {
160
0
        let headers = call.headers_map()?;
161
162
0
        let r = DebugRequest {
163
0
            method: call.method(),
164
0
            uri: call.uri(),
165
0
            version: call.version(),
166
0
            headers,
167
0
        };
168
0
        debug!("{:?}", r);
169
0
    }
170
171
0
    let call = match send_request(call, &mut connection, timings)? {
172
0
        SendRequestResult::Await100(call) => match await_100(call, &mut connection, timings)? {
173
0
            Await100Result::SendBody(call) => send_body(call, body, &mut connection, timings)?,
174
0
            Await100Result::RecvResponse(call) => call,
175
        },
176
0
        SendRequestResult::SendBody(call) => send_body(call, body, &mut connection, timings)?,
177
0
        SendRequestResult::RecvResponse(call) => call,
178
    };
179
180
0
    let is_following_redirects = redirect_count < config.max_redirects();
181
182
0
    let (mut response, response_result) = recv_response(
183
0
        call,
184
0
        &mut connection,
185
0
        config,
186
0
        timings,
187
0
        is_following_redirects,
188
0
    )?;
189
190
0
    debug!("{:?}", DebugResponse(&response));
191
192
    #[cfg(feature = "cookies")]
193
    {
194
        let mut jar = agent.cookie_jar_lock();
195
196
        let iter = response
197
            .headers()
198
            .get_all(http::header::SET_COOKIE)
199
            .iter()
200
            .filter_map(|h| h.to_str().ok())
201
            .filter_map(|s| crate::Cookie::parse(s, &uri).ok());
202
203
        jar.store_response_cookies(iter, &uri);
204
    }
205
206
0
    if let Some(history) = redirect_history.as_mut() {
207
0
        history.push(uri.clone());
208
0
        response
209
0
            .extensions_mut()
210
0
            .insert(RedirectHistory(history.clone()));
211
0
    }
212
0
    response.extensions_mut().insert(ResponseUri(uri));
213
214
0
    let ret = match response_result {
215
0
        RecvResponseResult::RecvBody(call) => {
216
0
            let timings = mem::take(timings);
217
0
            let mut handler = BodyHandler {
218
0
                call: Some(call),
219
0
                connection: Some(connection),
220
0
                timings,
221
0
                ..Default::default()
222
0
            };
223
224
0
            if response.status().is_redirection() {
225
0
                if is_following_redirects {
226
0
                    let call = handler.consume_redirect_body()?;
227
228
0
                    CallResult::Redirect(call, handler.timings)
229
0
                } else if config.max_redirects_do_error() {
230
0
                    return Err(Error::TooManyRedirects);
231
                } else {
232
0
                    CallResult::Response(response, handler)
233
                }
234
            } else {
235
0
                CallResult::Response(response, handler)
236
            }
237
        }
238
0
        RecvResponseResult::Redirect(call) => {
239
0
            cleanup(connection, call.must_close_connection(), timings.now());
240
241
0
            if is_following_redirects {
242
0
                CallResult::Redirect(call, mem::take(timings))
243
0
            } else if config.max_redirects_do_error() {
244
0
                return Err(Error::TooManyRedirects);
245
            } else {
246
0
                CallResult::Response(response, BodyHandler::default())
247
            }
248
        }
249
0
        RecvResponseResult::Cleanup(call) => {
250
0
            cleanup(connection, call.must_close_connection(), timings.now());
251
0
            CallResult::Response(response, BodyHandler::default())
252
        }
253
    };
254
255
0
    Ok(ret)
256
0
}
257
258
/// Return type of [`call_run`].
259
#[allow(clippy::large_enum_variant)]
260
enum CallResult {
261
    /// Call resulted in a redirect.
262
    Redirect(Call<Redirect>, CallTimings),
263
264
    /// Call resulted in a response.
265
    Response(Response<()>, BodyHandler),
266
}
267
268
0
fn add_headers(
269
0
    call: &mut Call<Prepare>,
270
0
    agent: &Agent,
271
0
    config: &Config,
272
0
    body: &mut SendBody,
273
0
    uri: &Uri,
274
0
) -> Result<(), Error> {
275
0
    let headers = call.headers();
276
277
0
    let send_body_mode = if headers.has_send_body_mode() {
278
0
        None
279
    } else {
280
0
        Some(body.body_mode()?)
281
    };
282
0
    let has_header_accept_enc = headers.has_accept_encoding();
283
0
    let has_header_ua = headers.has_user_agent();
284
0
    let has_header_accept = headers.has_accept();
285
286
    #[cfg(not(feature = "cookies"))]
287
0
    {
288
0
        let _ = agent;
289
0
        let _ = uri;
290
0
    }
291
    #[cfg(feature = "cookies")]
292
    {
293
        let value = agent.jar.get_request_cookies(uri);
294
        if !value.is_empty() {
295
            let value = HeaderValue::from_str(&value)
296
                .map_err(|_| Error::CookieValue("Cookie value is an invalid http-header"))?;
297
            call.header(header::COOKIE, value)?;
298
        }
299
    }
300
301
    {
302
0
        static ACCEPTS: LazyLock<String> = LazyLock::new(|| {
303
            #[allow(unused_mut)]
304
0
            let mut value = String::with_capacity(10);
305
            #[cfg(feature = "gzip")]
306
            value.push_str("gzip");
307
            #[cfg(all(feature = "gzip", feature = "brotli"))]
308
            value.push_str(", ");
309
            #[cfg(feature = "brotli")]
310
            value.push_str("br");
311
0
            value
312
0
        });
313
314
0
        let accepts = &*ACCEPTS;
315
316
0
        if !has_header_accept_enc {
317
0
            if let Some(v) = config.accept_encoding().as_str(accepts) {
318
                // unwrap is ok because above ACCEPTS will produce a valid value,
319
                // or the value is user provided in which case it must be valid.
320
0
                let value = HeaderValue::from_str(v).unwrap();
321
0
                call.header(header::ACCEPT_ENCODING, value)?;
322
0
            }
323
0
        }
324
    }
325
326
0
    if let Some(send_body_mode) = send_body_mode {
327
0
        match send_body_mode {
328
0
            BodyMode::LengthDelimited(v) => {
329
0
                let value = HeaderValue::from(v);
330
0
                call.header(header::CONTENT_LENGTH, value)?;
331
            }
332
            BodyMode::Chunked => {
333
0
                let value = HeaderValue::from_static("chunked");
334
0
                call.header(header::TRANSFER_ENCODING, value)?;
335
            }
336
0
            _ => {}
337
        }
338
0
    }
339
340
0
    if !has_header_ua {
341
        // unwrap is ok because a user might override the agent, and if they
342
        // set bad values, it's not really ureq's problem.
343
0
        if let Some(v) = config.user_agent().as_str(DEFAULT_USER_AGENT) {
344
0
            let value = HeaderValue::from_str(v).unwrap();
345
0
            call.header(header::USER_AGENT, value)?;
346
0
        }
347
0
    }
348
349
0
    if !has_header_accept {
350
        // unwrap is ok because a user might override accepts header, and if they
351
        // set bad values, it's not really ureq's problem.
352
0
        if let Some(v) = config.accept().as_str("*/*") {
353
0
            let value = HeaderValue::from_str(v).unwrap();
354
0
            call.header(header::ACCEPT, value)?;
355
0
        }
356
0
    }
357
358
0
    Ok(())
359
0
}
360
361
0
fn connect(
362
0
    agent: &Agent,
363
0
    config: &Config,
364
0
    request_level: bool,
365
0
    uri: &Uri,
366
0
    timings: &mut CallTimings,
367
0
) -> Result<Connection, Error> {
368
    // Before resolving the URI we need to ensure it is a full URI. We
369
    // cannot make requests with partial uri like "/path".
370
0
    uri.ensure_valid_url()?;
371
372
0
    let is_proxy = config.proxy().is_some();
373
374
    // For most proxy configs, the proxy itself should resolve the host name we are connecting to.
375
    // However for SOCKS4, we must do it and pass the resolved IP to the proxy.
376
0
    let is_proxy_local_resolve = config.proxy().map(|p| p.resolve_target()).unwrap_or(false);
377
378
    // Tells if this host matches NO_PROXY
379
0
    let is_no_proxy = config.proxy().map(|p| p.is_no_proxy(uri)).unwrap_or(false);
380
381
0
    let addrs = if is_no_proxy || !is_proxy || is_proxy_local_resolve {
382
0
        agent
383
0
            .resolver
384
0
            .resolve(uri, config, timings.next_timeout(Timeout::Resolve))?
385
    } else {
386
0
        agent.resolver.empty()
387
    };
388
389
0
    timings.record_time(Timeout::Resolve);
390
391
0
    let details = ConnectionDetails {
392
0
        uri,
393
0
        addrs,
394
0
        resolver: &*agent.resolver,
395
0
        config,
396
0
        request_level,
397
0
        now: timings.now(),
398
0
        timeout: timings.next_timeout(Timeout::Connect),
399
0
        current_time: timings.current_time().clone(),
400
0
        run_connector: agent.run_connector.clone(),
401
0
    };
402
403
0
    let connection = agent.pool.connect(&details, config.max_idle_age().into())?;
404
405
0
    if details.needs_tls() && !connection.is_tls() {
406
0
        return Err(Error::TlsRequired);
407
0
    }
408
409
0
    timings.record_time(Timeout::Connect);
410
411
0
    Ok(connection)
412
0
}
413
414
0
fn send_request(
415
0
    mut call: Call<SendRequest>,
416
0
    connection: &mut Connection,
417
0
    timings: &mut CallTimings,
418
0
) -> Result<SendRequestResult, Error> {
419
    loop {
420
0
        if call.can_proceed() {
421
0
            break;
422
0
        }
423
424
0
        let buffers = connection.buffers();
425
0
        let amount = call.write(buffers.output())?;
426
0
        let timeout = timings.next_timeout(Timeout::SendRequest);
427
0
        connection.transmit_output(amount, timeout)?;
428
    }
429
430
0
    timings.record_time(Timeout::SendRequest);
431
432
    // The request might be misconfigured.
433
0
    let call = call.proceed()?;
434
435
    // We checked can_proceed() above, this unwrap is fine.
436
0
    Ok(call.unwrap())
437
0
}
438
439
0
fn await_100(
440
0
    mut call: Call<Await100>,
441
0
    connection: &mut Connection,
442
0
    timings: &mut CallTimings,
443
0
) -> Result<Await100Result, Error> {
444
0
    while call.can_keep_await_100() {
445
0
        let timeout = timings.next_timeout(Timeout::Await100);
446
447
0
        if timeout.after.is_zero() {
448
            // Stop waiting for 100-continue.
449
0
            break;
450
0
        }
451
452
0
        match connection.maybe_await_input(timeout) {
453
            Ok(_) => {
454
0
                let input = connection.buffers().input();
455
0
                if input.is_empty() {
456
0
                    return Err(Error::disconnected("await_100 empty input"));
457
0
                }
458
459
0
                let amount = call.try_read_100(input)?;
460
0
                if amount > 0 {
461
0
                    connection.consume_input(amount);
462
0
                    break;
463
0
                }
464
            }
465
            Err(Error::Timeout(_)) => {
466
                // If we get a timeout while waiting for input, that is not an error,
467
                // we progress to send the request body.
468
0
                break;
469
            }
470
0
            Err(e) => return Err(e),
471
        }
472
    }
473
474
0
    timings.record_time(Timeout::Await100);
475
476
    // A misconfigured request might surface here.
477
0
    let call = call.proceed()?;
478
479
0
    Ok(call)
480
0
}
481
482
0
fn send_body(
483
0
    mut call: Call<SendBodyState>,
484
0
    body: &mut SendBody,
485
0
    connection: &mut Connection,
486
0
    timings: &mut CallTimings,
487
0
) -> Result<Call<RecvResponse>, Error> {
488
    loop {
489
0
        if call.can_proceed() {
490
0
            break;
491
0
        }
492
493
0
        let buffers = connection.buffers();
494
495
0
        let (tmp, output) = buffers.tmp_and_output();
496
497
0
        let input_len = tmp.len();
498
499
0
        let input_fitting_in_output = call.calculate_max_input(output.len());
500
0
        let max_input = input_len.min(input_fitting_in_output);
501
502
0
        let output_used = if !call.is_chunked() {
503
            // For non-chunked, the body can be written directly to the output.
504
            // This optimizes away a memcopy if we were to go via call.write().
505
0
            let output_used = body.read(output)?;
506
507
            // Size checking is still in the call.
508
0
            call.consume_direct_write(output_used)?;
509
510
0
            output_used
511
        } else {
512
0
            let tmp = &mut tmp[..max_input];
513
0
            let n = body.read(tmp)?;
514
515
0
            let (input_used, output_used) = call.write(&tmp[..n], output)?;
516
517
            // Since output is "a bit" larger than the input (compensate for chunk overhead),
518
            // the entire input we read from the body should also be shipped to the output.
519
0
            assert!(input_used == n);
520
521
0
            output_used
522
        };
523
524
0
        let timeout = timings.next_timeout(Timeout::SendBody);
525
0
        connection.transmit_output(output_used, timeout)?;
526
    }
527
528
0
    timings.record_time(Timeout::SendBody);
529
0
    Ok(call.proceed().unwrap())
530
0
}
531
532
0
fn recv_response(
533
0
    mut call: Call<RecvResponse>,
534
0
    connection: &mut Connection,
535
0
    config: &Config,
536
0
    timings: &mut CallTimings,
537
0
    is_following_redirects: bool,
538
0
) -> Result<(Response<()>, RecvResponseResult), Error> {
539
0
    let response = loop {
540
0
        let timeout = timings.next_timeout(Timeout::RecvResponse);
541
0
        let made_progress = connection.maybe_await_input(timeout)?;
542
543
0
        let input = connection.buffers().input();
544
545
        // If cookies are disabled, we can allow ourselves to follow
546
        // the redirect as soon as we discover the `Location` header.
547
        // There are plenty of broken servers out there that don't send
548
        // the finishing \r\n on redirects. With cookies off, we can
549
        // handle that situation.
550
        //
551
        // If cookies are enabled, we risk missing a `Set-Cookie` header
552
        // with such a strategy.
553
0
        let cookies_enabled = cfg!(feature = "cookies");
554
555
        // If we are not following redirects, do not allow partial returned
556
        // 302 responses.
557
0
        let allow_partial_redirect = !cookies_enabled && is_following_redirects;
558
559
0
        let (amount, maybe_response) = call.try_response(input, allow_partial_redirect)?;
560
561
0
        let check_size = if maybe_response.is_some() {
562
            // We got a parsed response, ensure the size is within
563
            // configured parameters.
564
0
            amount
565
        } else {
566
            // We did not parse a response, if input is too large,
567
            // we stop trying to get more data.
568
0
            input.len()
569
        };
570
571
0
        if check_size > config.max_response_header_size() {
572
0
            return Err(Error::LargeResponseHeader(
573
0
                input.len(),
574
0
                config.max_response_header_size(),
575
0
            ));
576
0
        }
577
578
0
        connection.consume_input(amount);
579
580
0
        if let Some(response) = maybe_response {
581
0
            assert!(call.can_proceed());
582
0
            break response;
583
0
        } else if !made_progress {
584
0
            return Err(Error::disconnected("recv_respone made no progress"));
585
0
        }
586
    };
587
588
0
    timings.record_time(Timeout::RecvResponse);
589
0
    Ok((response, call.proceed().unwrap()))
590
0
}
591
592
0
fn handle_redirect(mut call: Call<Redirect>, config: &Config) -> Result<Call<Prepare>, Error> {
593
0
    let maybe_new_call = call.as_new_call(config.redirect_auth_headers())?;
594
0
    let status = call.status();
595
596
0
    if let Some(call) = maybe_new_call {
597
0
        debug!(
598
0
            "Redirect ({}): {} {:?}",
599
            status,
600
0
            call.method(),
601
0
            DebugUri(call.uri())
602
        );
603
604
0
        Ok(call)
605
    } else {
606
0
        Err(Error::RedirectFailed)
607
    }
608
0
}
609
610
0
fn cleanup(connection: Connection, must_close: bool, now: Instant) {
611
0
    if must_close {
612
0
        connection.close();
613
0
    } else {
614
0
        connection.reuse(now)
615
    }
616
0
}
617
618
#[derive(Default)]
619
pub(crate) struct BodyHandler {
620
    call: Option<Call<RecvBody>>,
621
    connection: Option<Connection>,
622
    timings: CallTimings,
623
    remote_closed: bool,
624
    redirect: Option<Box<Call<Redirect>>>,
625
}
626
627
impl BodyHandler {
628
0
    fn do_read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
629
0
        let (Some(call), Some(connection), timings) =
630
0
            (&mut self.call, &mut self.connection, &mut self.timings)
631
        else {
632
0
            return Ok(0);
633
        };
634
635
        loop {
636
0
            let body_fulfilled = match call.body_mode() {
637
0
                BodyMode::NoBody => unreachable!("must be a BodyMode for BodyHandler"),
638
                // These modes are fulfilled by either reaching the content-length or
639
                // receiving an end chunk delimiter.
640
0
                BodyMode::LengthDelimited(_) | BodyMode::Chunked => call.can_proceed(),
641
                // This mode can only end once remote closes
642
0
                BodyMode::CloseDelimited => false,
643
            };
644
645
0
            if body_fulfilled {
646
0
                self.ended()?;
647
0
                return Ok(0);
648
0
            }
649
650
0
            let has_buffered_input = connection.buffers().can_use_input();
651
652
            // First try to use input already buffered
653
0
            if has_buffered_input {
654
0
                let input = connection.buffers().input();
655
0
                let (input_used, output_used) = call.read(input, buf)?;
656
0
                connection.consume_input(input_used);
657
658
0
                if output_used > 0 {
659
0
                    return Ok(output_used);
660
0
                }
661
662
0
                if input_used > 0 {
663
                    // Body might be fulfilled now.
664
0
                    continue;
665
0
                }
666
0
            }
667
668
0
            if self.remote_closed {
669
                // we should not try to await_input again.
670
0
                self.ended()?;
671
0
                return Ok(0);
672
0
            }
673
674
0
            let timeout = timings.next_timeout(Timeout::RecvBody);
675
676
0
            let made_progress = match connection.maybe_await_input(timeout) {
677
0
                Ok(v) => v,
678
0
                Err(Error::Io(e)) => match e.kind() {
679
                    io::ErrorKind::UnexpectedEof
680
                    | io::ErrorKind::ConnectionAborted
681
                    | io::ErrorKind::ConnectionReset => {
682
0
                        self.remote_closed = true;
683
0
                        true
684
                    }
685
0
                    _ => return Err(Error::Io(e)),
686
                },
687
0
                Err(e) => return Err(e),
688
            };
689
690
0
            let input = connection.buffers().input();
691
0
            let input_ended = input.is_empty();
692
693
0
            let (input_used, output_used) = call.read(input, buf)?;
694
0
            connection.consume_input(input_used);
695
696
0
            if output_used > 0 {
697
0
                return Ok(output_used);
698
0
            } else if input_ended {
699
0
                self.ended()?;
700
0
                return Ok(0);
701
0
            } else if made_progress {
702
                // The maybe_await_input() made progress, but handled amount is 0. This
703
                // can for instance happen if we read some data, but not enough for
704
                // decoding any gzip.
705
0
                continue;
706
            } else {
707
                // This is an error case we don't want to see.
708
0
                return Err(Error::BodyStalled);
709
            }
710
        }
711
0
    }
712
713
0
    fn ended(&mut self) -> Result<(), Error> {
714
0
        self.timings.record_time(Timeout::RecvBody);
715
716
0
        let call = self.call.take().expect("ended() called with body");
717
718
        // In some cases, when reading chunked, the server sends 0\r\n to indicate
719
        // the end of the body, and then abruptly does a FIN. In these cases we have
720
        // received the entire body, but must clean up the connection.
721
0
        let is_ended_chunked = call.is_ended_chunked();
722
723
0
        let mut force_close = false;
724
725
0
        if !call.can_proceed() {
726
0
            if is_ended_chunked {
727
                // This case means we got 0\r\n, but can_proceed() is false because
728
                // it only goes true on fully received chunked bodies.
729
0
                debug!("Server ended connection after sending chunked 0\\r\\n");
730
0
                force_close = true;
731
            } else {
732
0
                return Err(Error::disconnected("ended call cannot proceed"));
733
            }
734
0
        }
735
736
0
        let must_close_connection = force_close
737
0
            || match call.proceed().unwrap() {
738
0
                RecvBodyResult::Redirect(call) => {
739
0
                    let c = call.must_close_connection();
740
0
                    self.redirect = Some(Box::new(call));
741
0
                    c
742
                }
743
0
                RecvBodyResult::Cleanup(v) => v.must_close_connection(),
744
            };
745
746
0
        let connection = self.connection.take().expect("ended() called with body");
747
0
        cleanup(connection, must_close_connection, self.timings.now());
748
749
0
        Ok(())
750
0
    }
751
752
0
    fn consume_redirect_body(&mut self) -> Result<Call<Redirect>, Error> {
753
0
        let mut buf = vec![0; 1024];
754
        loop {
755
0
            let amount = self.do_read(&mut buf)?;
756
0
            if amount == 0 {
757
0
                break;
758
0
            }
759
        }
760
761
        // Unwrap is OK, because we are only consuming the redirect body if
762
        // such a body was signalled by the remote.
763
0
        let redirect = self.redirect.take().map(|b| *b);
764
0
        Ok(redirect.expect("remote to have signaled redirect"))
765
0
    }
766
}
767
768
impl io::Read for BodyHandler {
769
0
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
770
0
        self.do_read(buf).map_err(|e| e.into_io())
771
0
    }
772
}