Coverage Report

Created: 2025-02-21 07:11

/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.43.0/src/runtime/coop.rs
Line
Count
Source (jump to first uncovered line)
1
#![cfg_attr(not(feature = "full"), allow(dead_code))]
2
3
//! Yield points for improved cooperative scheduling.
4
//!
5
//! Documentation for this can be found in the [`tokio::task`] module.
6
//!
7
//! [`tokio::task`]: crate::task
8
9
// ```ignore
10
// # use tokio_stream::{Stream, StreamExt};
11
// async fn drop_all<I: Stream + Unpin>(mut input: I) {
12
//     while let Some(_) = input.next().await {
13
//         tokio::coop::proceed().await;
14
//     }
15
// }
16
// ```
17
//
18
// The `proceed` future will coordinate with the executor to make sure that
19
// every so often control is yielded back to the executor so it can run other
20
// tasks.
21
//
22
// # Placing yield points
23
//
24
// Voluntary yield points should be placed _after_ at least some work has been
25
// done. If they are not, a future sufficiently deep in the task hierarchy may
26
// end up _never_ getting to run because of the number of yield points that
27
// inevitably appear before it is reached. In general, you will want yield
28
// points to only appear in "leaf" futures -- those that do not themselves poll
29
// other futures. By doing this, you avoid double-counting each iteration of
30
// the outer future against the cooperating budget.
31
32
use crate::runtime::context;
33
34
/// Opaque type tracking the amount of "work" a task may still do before
35
/// yielding back to the scheduler.
36
#[derive(Debug, Copy, Clone)]
37
pub(crate) struct Budget(Option<u8>);
38
39
pub(crate) struct BudgetDecrement {
40
    success: bool,
41
    hit_zero: bool,
42
}
43
44
impl Budget {
45
    /// Budget assigned to a task on each poll.
46
    ///
47
    /// The value itself is chosen somewhat arbitrarily. It needs to be high
48
    /// enough to amortize wakeup and scheduling costs, but low enough that we
49
    /// do not starve other tasks for too long. The value also needs to be high
50
    /// enough that particularly deep tasks are able to do at least some useful
51
    /// work at all.
52
    ///
53
    /// Note that as more yield points are added in the ecosystem, this value
54
    /// will probably also have to be raised.
55
86.3k
    const fn initial() -> Budget {
56
86.3k
        Budget(Some(128))
57
86.3k
    }
58
59
    /// Returns an unconstrained budget. Operations will not be limited.
60
91.1k
    pub(super) const fn unconstrained() -> Budget {
61
91.1k
        Budget(None)
62
91.1k
    }
63
64
0
    fn has_remaining(self) -> bool {
65
0
        self.0.map_or(true, |budget| budget > 0)
66
0
    }
67
}
68
69
/// Runs the given closure with a cooperative task budget. When the function
70
/// returns, the budget is reset to the value prior to calling the function.
71
#[inline(always)]
72
86.3k
pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
73
86.3k
    with_budget(Budget::initial(), f)
74
86.3k
}
Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}::{closure#0}>>::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}::{closure#0}>>::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#0}::{closure#0}>
tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#0}::{closure#0}>
Line
Count
Source
72
4
pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
73
4
    with_budget(Budget::initial(), f)
74
4
}
Unexecuted instantiation: tokio::runtime::coop::budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#1}>
Unexecuted instantiation: tokio::runtime::coop::budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#1}>
Unexecuted instantiation: tokio::runtime::coop::budget::<(), <tokio::task::local::LocalSet>::tick::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<core::result::Result<(), tokio::sync::oneshot::error::RecvError>>, <tokio::runtime::context::blocking::BlockingRegionGuard>::block_on_timeout<&mut tokio::sync::oneshot::Receiver<()>>::{closure#1}>
tokio::runtime::coop::budget::<core::task::poll::Poll<core::result::Result<(), tokio::sync::oneshot::error::RecvError>>, <tokio::runtime::park::CachedParkThread>::block_on<&mut tokio::sync::oneshot::Receiver<()>>::{closure#0}>
Line
Count
Source
72
21.4k
pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
73
21.4k
    with_budget(Budget::initial(), f)
74
21.4k
}
Unexecuted instantiation: tokio::runtime::coop::budget::<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Core>, ()>, <tokio::runtime::scheduler::multi_thread::worker::Context>::run_task::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#0}::{closure#0}>
tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#0}::{closure#0}>
Line
Count
Source
72
43.7k
pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
73
43.7k
    with_budget(Budget::initial(), f)
74
43.7k
}
Unexecuted instantiation: tokio::runtime::coop::budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#1}>
tokio::runtime::coop::budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#1}>
Line
Count
Source
72
21.1k
pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
73
21.1k
    with_budget(Budget::initial(), f)
74
21.1k
}
Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}::{closure#0}>>::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}::{closure#0}>>::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}>
75
76
/// Runs the given closure with an unconstrained task budget. When the function returns, the budget
77
/// is reset to the value prior to calling the function.
78
#[inline(always)]
79
0
pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
80
0
    with_budget(Budget::unconstrained(), f)
81
0
}
82
83
#[inline(always)]
84
86.3k
fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
85
    struct ResetGuard {
86
        prev: Budget,
87
    }
88
89
    impl Drop for ResetGuard {
90
86.3k
        fn drop(&mut self) {
91
86.3k
            let _ = context::budget(|cell| {
92
86.3k
                cell.set(self.prev);
93
86.3k
            });
94
86.3k
        }
95
    }
96
97
    #[allow(unused_variables)]
98
86.3k
    let maybe_guard = context::budget(|cell| {
99
86.3k
        let prev = cell.get();
100
86.3k
        cell.set(budget);
101
86.3k
102
86.3k
        ResetGuard { prev }
103
86.3k
    });
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}::{closure#0}>>::{closure#0}>::{closure#0}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}::{closure#0}>>::{closure#0}>::{closure#0}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}>::{closure#0}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}>::{closure#0}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#0}::{closure#0}>::{closure#0}
tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#0}::{closure#0}>::{closure#0}
Line
Count
Source
98
4
    let maybe_guard = context::budget(|cell| {
99
4
        let prev = cell.get();
100
4
        cell.set(budget);
101
4
102
4
        ResetGuard { prev }
103
4
    });
Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#1}>::{closure#0}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#1}>::{closure#0}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Core>, ()>, <tokio::runtime::scheduler::multi_thread::worker::Context>::run_task::{closure#0}>::{closure#0}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::result::Result<(), tokio::sync::oneshot::error::RecvError>>, <tokio::runtime::context::blocking::BlockingRegionGuard>::block_on_timeout<&mut tokio::sync::oneshot::Receiver<()>>::{closure#1}>::{closure#0}
tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::result::Result<(), tokio::sync::oneshot::error::RecvError>>, <tokio::runtime::park::CachedParkThread>::block_on<&mut tokio::sync::oneshot::Receiver<()>>::{closure#0}>::{closure#0}
Line
Count
Source
98
21.4k
    let maybe_guard = context::budget(|cell| {
99
21.4k
        let prev = cell.get();
100
21.4k
        cell.set(budget);
101
21.4k
102
21.4k
        ResetGuard { prev }
103
21.4k
    });
Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::task::local::LocalSet>::tick::{closure#0}>::{closure#0}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}::{closure#0}>>::{closure#0}>::{closure#0}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}::{closure#0}>>::{closure#0}>::{closure#0}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}>::{closure#0}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}>::{closure#0}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#0}::{closure#0}>::{closure#0}
tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#0}::{closure#0}>::{closure#0}
Line
Count
Source
98
43.7k
    let maybe_guard = context::budget(|cell| {
99
43.7k
        let prev = cell.get();
100
43.7k
        cell.set(budget);
101
43.7k
102
43.7k
        ResetGuard { prev }
103
43.7k
    });
Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#1}>::{closure#0}
tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#1}>::{closure#0}
Line
Count
Source
98
21.1k
    let maybe_guard = context::budget(|cell| {
99
21.1k
        let prev = cell.get();
100
21.1k
        cell.set(budget);
101
21.1k
102
21.1k
        ResetGuard { prev }
103
21.1k
    });
104
86.3k
105
86.3k
    // The function is called regardless even if the budget is not successfully
106
86.3k
    // set due to the thread-local being destroyed.
107
86.3k
    f()
108
86.3k
}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}::{closure#0}>>::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}::{closure#0}>>::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#0}::{closure#0}>
tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#0}::{closure#0}>
Line
Count
Source
84
4
fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
85
    struct ResetGuard {
86
        prev: Budget,
87
    }
88
89
    impl Drop for ResetGuard {
90
        fn drop(&mut self) {
91
            let _ = context::budget(|cell| {
92
                cell.set(self.prev);
93
            });
94
        }
95
    }
96
97
    #[allow(unused_variables)]
98
4
    let maybe_guard = context::budget(|cell| {
99
        let prev = cell.get();
100
        cell.set(budget);
101
102
        ResetGuard { prev }
103
4
    });
104
4
105
4
    // The function is called regardless even if the budget is not successfully
106
4
    // set due to the thread-local being destroyed.
107
4
    f()
108
4
}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#1}>
Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#1}>
Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::task::local::LocalSet>::tick::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::result::Result<(), tokio::sync::oneshot::error::RecvError>>, <tokio::runtime::context::blocking::BlockingRegionGuard>::block_on_timeout<&mut tokio::sync::oneshot::Receiver<()>>::{closure#1}>
tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::result::Result<(), tokio::sync::oneshot::error::RecvError>>, <tokio::runtime::park::CachedParkThread>::block_on<&mut tokio::sync::oneshot::Receiver<()>>::{closure#0}>
Line
Count
Source
84
21.4k
fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
85
    struct ResetGuard {
86
        prev: Budget,
87
    }
88
89
    impl Drop for ResetGuard {
90
        fn drop(&mut self) {
91
            let _ = context::budget(|cell| {
92
                cell.set(self.prev);
93
            });
94
        }
95
    }
96
97
    #[allow(unused_variables)]
98
21.4k
    let maybe_guard = context::budget(|cell| {
99
        let prev = cell.get();
100
        cell.set(budget);
101
102
        ResetGuard { prev }
103
21.4k
    });
104
21.4k
105
21.4k
    // The function is called regardless even if the budget is not successfully
106
21.4k
    // set due to the thread-local being destroyed.
107
21.4k
    f()
108
21.4k
}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Core>, ()>, <tokio::runtime::scheduler::multi_thread::worker::Context>::run_task::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#0}::{closure#0}>
tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#0}::{closure#0}>
Line
Count
Source
84
43.7k
fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
85
    struct ResetGuard {
86
        prev: Budget,
87
    }
88
89
    impl Drop for ResetGuard {
90
        fn drop(&mut self) {
91
            let _ = context::budget(|cell| {
92
                cell.set(self.prev);
93
            });
94
        }
95
    }
96
97
    #[allow(unused_variables)]
98
43.7k
    let maybe_guard = context::budget(|cell| {
99
        let prev = cell.get();
100
        cell.set(budget);
101
102
        ResetGuard { prev }
103
43.7k
    });
104
43.7k
105
43.7k
    // The function is called regardless even if the budget is not successfully
106
43.7k
    // set due to the thread-local being destroyed.
107
43.7k
    f()
108
43.7k
}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#1}>
tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#1}>
Line
Count
Source
84
21.1k
fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
85
    struct ResetGuard {
86
        prev: Budget,
87
    }
88
89
    impl Drop for ResetGuard {
90
        fn drop(&mut self) {
91
            let _ = context::budget(|cell| {
92
                cell.set(self.prev);
93
            });
94
        }
95
    }
96
97
    #[allow(unused_variables)]
98
21.1k
    let maybe_guard = context::budget(|cell| {
99
        let prev = cell.get();
100
        cell.set(budget);
101
102
        ResetGuard { prev }
103
21.1k
    });
104
21.1k
105
21.1k
    // The function is called regardless even if the budget is not successfully
106
21.1k
    // set due to the thread-local being destroyed.
107
21.1k
    f()
108
21.1k
}
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}::{closure#0}>>::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}::{closure#0}>>::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}>
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}>
109
110
#[inline(always)]
111
0
pub(crate) fn has_budget_remaining() -> bool {
112
0
    // If the current budget cannot be accessed due to the thread-local being
113
0
    // shutdown, then we assume there is budget remaining.
114
0
    context::budget(|cell| cell.get().has_remaining()).unwrap_or(true)
115
0
}
116
117
cfg_rt_multi_thread! {
118
    /// Sets the current task's budget.
119
0
    pub(crate) fn set(budget: Budget) {
120
0
        let _ = context::budget(|cell| cell.set(budget));
121
0
    }
122
}
123
124
cfg_rt! {
125
    /// Forcibly removes the budgeting constraints early.
126
    ///
127
    /// Returns the remaining budget
128
1.16k
    pub(crate) fn stop() -> Budget {
129
1.16k
        context::budget(|cell| {
130
1.16k
            let prev = cell.get();
131
1.16k
            cell.set(Budget::unconstrained());
132
1.16k
            prev
133
1.16k
        }).unwrap_or(Budget::unconstrained())
134
1.16k
    }
135
}
136
137
cfg_coop! {
138
    use pin_project_lite::pin_project;
139
    use std::cell::Cell;
140
    use std::future::Future;
141
    use std::pin::Pin;
142
    use std::task::{ready, Context, Poll};
143
144
    #[must_use]
145
    pub(crate) struct RestoreOnPending(Cell<Budget>);
146
147
    impl RestoreOnPending {
148
43.8k
        pub(crate) fn made_progress(&self) {
149
43.8k
            self.0.set(Budget::unconstrained());
150
43.8k
        }
151
    }
152
153
    impl Drop for RestoreOnPending {
154
90.1k
        fn drop(&mut self) {
155
90.1k
            // Don't reset if budget was unconstrained or if we made progress.
156
90.1k
            // They are both represented as the remembered budget being unconstrained.
157
90.1k
            let budget = self.0.get();
158
90.1k
            if !budget.is_unconstrained() {
159
1.23k
                let _ = context::budget(|cell| {
160
1.23k
                    cell.set(budget);
161
1.23k
                });
162
88.8k
            }
163
90.1k
        }
164
    }
165
166
    /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
167
    ///
168
    /// When you call this method, the current budget is decremented. However, to ensure that
169
    /// progress is made every time a task is polled, the budget is automatically restored to its
170
    /// former value if the returned `RestoreOnPending` is dropped. It is the caller's
171
    /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure
172
    /// that the budget empties appropriately.
173
    ///
174
    /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**.
175
    /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and
176
    /// `RestoreOnPending` is dropped, those adjustments are erased unless the caller indicates
177
    /// that progress was made.
178
    #[inline]
179
45.0k
    pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
180
45.0k
        context::budget(|cell| {
181
45.0k
            let mut budget = cell.get();
182
45.0k
183
45.0k
            let decrement = budget.decrement();
184
45.0k
185
45.0k
            if decrement.success {
186
45.0k
                let restore = RestoreOnPending(Cell::new(cell.get()));
187
45.0k
                cell.set(budget);
188
45.0k
189
45.0k
                // avoid double counting
190
45.0k
                if decrement.hit_zero {
191
0
                    inc_budget_forced_yield_count();
192
45.0k
                }
193
194
45.0k
                Poll::Ready(restore)
195
            } else {
196
0
                cx.waker().wake_by_ref();
197
0
                Poll::Pending
198
            }
199
45.0k
        }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained()))))
200
45.0k
    }
Unexecuted instantiation: tokio::runtime::coop::poll_proceed
tokio::runtime::coop::poll_proceed
Line
Count
Source
179
2.32k
    pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
180
2.32k
        context::budget(|cell| {
181
            let mut budget = cell.get();
182
183
            let decrement = budget.decrement();
184
185
            if decrement.success {
186
                let restore = RestoreOnPending(Cell::new(cell.get()));
187
                cell.set(budget);
188
189
                // avoid double counting
190
                if decrement.hit_zero {
191
                    inc_budget_forced_yield_count();
192
                }
193
194
                Poll::Ready(restore)
195
            } else {
196
                cx.waker().wake_by_ref();
197
                Poll::Pending
198
            }
199
2.32k
        }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained()))))
200
2.32k
    }
Unexecuted instantiation: tokio::runtime::coop::poll_proceed
tokio::runtime::coop::poll_proceed
Line
Count
Source
179
42.7k
    pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
180
42.7k
        context::budget(|cell| {
181
            let mut budget = cell.get();
182
183
            let decrement = budget.decrement();
184
185
            if decrement.success {
186
                let restore = RestoreOnPending(Cell::new(cell.get()));
187
                cell.set(budget);
188
189
                // avoid double counting
190
                if decrement.hit_zero {
191
                    inc_budget_forced_yield_count();
192
                }
193
194
                Poll::Ready(restore)
195
            } else {
196
                cx.waker().wake_by_ref();
197
                Poll::Pending
198
            }
199
42.7k
        }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained()))))
200
42.7k
    }
201
202
    cfg_rt! {
203
        cfg_unstable_metrics! {
204
            #[inline(always)]
205
            fn inc_budget_forced_yield_count() {
206
                let _ = context::with_current(|handle| {
207
                    handle.scheduler_metrics().inc_budget_forced_yield_count();
208
                });
209
            }
210
        }
211
212
        cfg_not_unstable_metrics! {
213
            #[inline(always)]
214
0
            fn inc_budget_forced_yield_count() {}
215
        }
216
    }
217
218
    cfg_not_rt! {
219
        #[inline(always)]
220
        fn inc_budget_forced_yield_count() {}
221
    }
222
223
    impl Budget {
224
        /// Decrements the budget. The returned `success` flag is `true` if successful. Decrementing fails
225
        /// when there is not enough remaining budget.
226
45.0k
        fn decrement(&mut self) -> BudgetDecrement {
227
45.0k
            if let Some(num) = &mut self.0 {
228
45.0k
                if *num > 0 {
229
45.0k
                    *num -= 1;
230
45.0k
231
45.0k
                    let hit_zero = *num == 0;
232
45.0k
233
45.0k
                    BudgetDecrement { success: true, hit_zero }
234
                } else {
235
0
                    BudgetDecrement { success: false, hit_zero: false }
236
                }
237
            } else {
238
0
                BudgetDecrement { success: true, hit_zero: false }
239
            }
240
45.0k
        }
241
242
90.1k
        fn is_unconstrained(self) -> bool {
243
90.1k
            self.0.is_none()
244
90.1k
        }
245
    }
246
247
    pin_project! {
248
        /// Future wrapper to ensure cooperative scheduling.
249
        ///
250
        /// When being polled `poll_proceed` is called before the inner future is polled to check
251
        /// if the inner future has exceeded its budget. If the inner future resolves, this will
252
        /// automatically call `RestoreOnPending::made_progress` before resolving this future with
253
        /// the result of the inner one. If polling the inner future is pending, polling this future
254
        /// type will also return a `Poll::Pending`.
255
        #[must_use = "futures do nothing unless polled"]
256
        pub(crate) struct Coop<F: Future> {
257
            #[pin]
258
            // The wrapped future. `Coop::poll` only polls it after `poll_proceed`
            // has returned `Poll::Ready` for the current poll.
            pub(crate) fut: F,
259
        }
260
    }
261
262
    impl<F: Future> Future for Coop<F> {
263
        type Output = F::Output;
264
265
0
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
266
0
            let coop = ready!(poll_proceed(cx));
267
0
            let me = self.project();
268
0
            if let Poll::Ready(ret) = me.fut.poll(cx) {
269
0
                coop.made_progress();
270
0
                Poll::Ready(ret)
271
            } else {
272
0
                Poll::Pending
273
            }
274
0
        }
275
    }
276
277
    /// Wraps `fut` in a [`Coop`] so that it runs under the cooperative scheduling budget.
278
    /// Nothing is polled here: the budget check happens each time the returned wrapper is
279
    /// polled, and a future that has exceeded its budget yields control back to the runtime.
280
    #[inline]
281
0
    pub(crate) fn cooperative<F: Future>(fut: F) -> Coop<F> {
282
0
        Coop { fut }
283
0
    }
284
}
285
286
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Reads the budget held in the runtime context, falling back to
    /// `Budget::unconstrained()` when `context::budget` does not produce one.
    fn get() -> Budget {
        let current = context::budget(|cell| cell.get());
        current.unwrap_or(Budget::unconstrained())
    }

    /// Walks `poll_proceed` through the unconstrained case, nested `budget`
    /// scopes, progress vs. no-progress restoration, and budget exhaustion.
    #[test]
    fn budgeting() {
        use std::future::poll_fn;
        use tokio_test::*;

        // Runs `poll_proceed` once on a throwaway task and insists it is ready.
        let proceed = || assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
        // Raw counter of the budget currently in effect.
        let remaining = || get().0;
        let initial = Budget::initial().0;

        // Outside of `budget` nothing is constrained, before or after a poll.
        assert!(remaining().is_none());
        let guard = proceed();
        assert!(remaining().is_none());
        drop(guard);
        assert!(remaining().is_none());

        budget(|| {
            assert_eq!(remaining(), initial);

            let guard = proceed();
            assert_eq!(remaining().unwrap(), initial.unwrap() - 1);
            drop(guard);
            // we didn't make progress
            assert_eq!(remaining(), initial);

            let guard = proceed();
            assert_eq!(remaining().unwrap(), initial.unwrap() - 1);
            guard.made_progress();
            drop(guard);
            // we _did_ make progress
            assert_eq!(remaining().unwrap(), initial.unwrap() - 1);

            let guard = proceed();
            assert_eq!(remaining().unwrap(), initial.unwrap() - 2);
            guard.made_progress();
            drop(guard);
            assert_eq!(remaining().unwrap(), initial.unwrap() - 2);

            // A nested scope starts from a fresh budget ...
            budget(|| {
                assert_eq!(remaining(), initial);

                let guard = proceed();
                assert_eq!(remaining().unwrap(), initial.unwrap() - 1);
                guard.made_progress();
                drop(guard);
                assert_eq!(remaining().unwrap(), initial.unwrap() - 1);
            });

            // ... and the outer one is back in effect afterwards.
            assert_eq!(remaining().unwrap(), initial.unwrap() - 2);
        });

        assert!(remaining().is_none());

        budget(|| {
            let units = remaining().unwrap();

            // Spend every unit of the budget.
            (0..units).for_each(|_| {
                proceed().made_progress();
            });

            let mut exhausted = task::spawn(poll_fn(|cx| {
                poll_proceed(cx).map(|guard| {
                    guard.made_progress();
                })
            }));

            assert_pending!(exhausted.poll());
        });
    }
}