/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.43.0/src/runtime/coop.rs
Line | Count | Source (jump to first uncovered line) |
1 | | #![cfg_attr(not(feature = "full"), allow(dead_code))] |
2 | | |
3 | | //! Yield points for improved cooperative scheduling. |
4 | | //! |
5 | | //! Documentation for this can be found in the [`tokio::task`] module. |
6 | | //! |
7 | | //! [`tokio::task`]: crate::task |
8 | | |
9 | | // ```ignore |
10 | | // # use tokio_stream::{Stream, StreamExt}; |
11 | | // async fn drop_all<I: Stream + Unpin>(mut input: I) { |
12 | | // while let Some(_) = input.next().await { |
13 | | // tokio::coop::proceed().await; |
14 | | // } |
15 | | // } |
16 | | // ``` |
17 | | // |
18 | | // The `proceed` future will coordinate with the executor to make sure that |
19 | | // every so often control is yielded back to the executor so it can run other |
20 | | // tasks. |
21 | | // |
22 | | // # Placing yield points |
23 | | // |
24 | | // Voluntary yield points should be placed _after_ at least some work has been |
25 | | // done. If they are not, a future sufficiently deep in the task hierarchy may |
26 | | // end up _never_ getting to run because of the number of yield points that |
27 | | // inevitably appear before it is reached. In general, you will want yield |
28 | | // points to only appear in "leaf" futures -- those that do not themselves poll |
29 | | // other futures. By doing this, you avoid double-counting each iteration of |
30 | | // the outer future against the cooperating budget. |
31 | | |
32 | | use crate::runtime::context; |
33 | | |
34 | | /// Opaque type tracking the amount of "work" a task may still do before |
35 | | /// yielding back to the scheduler. |
36 | | #[derive(Debug, Copy, Clone)] |
37 | | pub(crate) struct Budget(Option<u8>); |
38 | | |
39 | | pub(crate) struct BudgetDecrement { |
40 | | success: bool, |
41 | | hit_zero: bool, |
42 | | } |
43 | | |
44 | | impl Budget { |
45 | | /// Budget assigned to a task on each poll. |
46 | | /// |
47 | | /// The value itself is chosen somewhat arbitrarily. It needs to be high |
48 | | /// enough to amortize wakeup and scheduling costs, but low enough that we |
49 | | /// do not starve other tasks for too long. The value also needs to be high |
50 | | /// enough that particularly deep tasks are able to do at least some useful |
51 | | /// work at all. |
52 | | /// |
53 | | /// Note that as more yield points are added in the ecosystem, this value |
54 | | /// will probably also have to be raised. |
55 | 86.3k | const fn initial() -> Budget { |
56 | 86.3k | Budget(Some(128)) |
57 | 86.3k | } |
58 | | |
59 | | /// Returns an unconstrained budget. Operations will not be limited. |
60 | 91.1k | pub(super) const fn unconstrained() -> Budget { |
61 | 91.1k | Budget(None) |
62 | 91.1k | } |
63 | | |
64 | 0 | fn has_remaining(self) -> bool { |
65 | 0 | self.0.map_or(true, |budget| budget > 0) |
66 | 0 | } |
67 | | } |
68 | | |
69 | | /// Runs the given closure with a cooperative task budget. When the function |
70 | | /// returns, the budget is reset to the value prior to calling the function. |
71 | | #[inline(always)] |
72 | 86.3k | pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R { |
73 | 86.3k | with_budget(Budget::initial(), f) |
74 | 86.3k | } Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}::{closure#0}>>::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}::{closure#0}>>::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#0}::{closure#0}> tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#0}::{closure#0}> Line | Count | Source | 72 | 4 | pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R { | 73 | 4 | with_budget(Budget::initial(), f) | 74 | 4 | } |
Unexecuted instantiation: tokio::runtime::coop::budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#1}> Unexecuted instantiation: tokio::runtime::coop::budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#1}> Unexecuted instantiation: tokio::runtime::coop::budget::<(), <tokio::task::local::LocalSet>::tick::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<core::result::Result<(), tokio::sync::oneshot::error::RecvError>>, <tokio::runtime::context::blocking::BlockingRegionGuard>::block_on_timeout<&mut tokio::sync::oneshot::Receiver<()>>::{closure#1}> tokio::runtime::coop::budget::<core::task::poll::Poll<core::result::Result<(), tokio::sync::oneshot::error::RecvError>>, <tokio::runtime::park::CachedParkThread>::block_on<&mut tokio::sync::oneshot::Receiver<()>>::{closure#0}> Line | Count | Source | 72 | 21.4k | pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R { | 73 | 21.4k | with_budget(Budget::initial(), f) | 74 | 21.4k | } |
Unexecuted instantiation: tokio::runtime::coop::budget::<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Core>, ()>, <tokio::runtime::scheduler::multi_thread::worker::Context>::run_task::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#0}::{closure#0}> tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#0}::{closure#0}> Line | Count | Source | 72 | 43.7k | pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R { | 73 | 43.7k | with_budget(Budget::initial(), f) | 74 | 43.7k | } |
Unexecuted instantiation: tokio::runtime::coop::budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#1}> tokio::runtime::coop::budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#1}> Line | Count | Source | 72 | 21.1k | pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R { | 73 | 21.1k | with_budget(Budget::initial(), f) | 74 | 21.1k | } |
Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}::{closure#0}>>::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}::{closure#0}>>::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}> |
75 | | |
76 | | /// Runs the given closure with an unconstrained task budget. When the function returns, the budget |
77 | | /// is reset to the value prior to calling the function. |
78 | | #[inline(always)] |
79 | 0 | pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R { |
80 | 0 | with_budget(Budget::unconstrained(), f) |
81 | 0 | } |
82 | | |
83 | | #[inline(always)] |
84 | 86.3k | fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { |
85 | | struct ResetGuard { |
86 | | prev: Budget, |
87 | | } |
88 | | |
89 | | impl Drop for ResetGuard { |
90 | 86.3k | fn drop(&mut self) { |
91 | 86.3k | let _ = context::budget(|cell| { |
92 | 86.3k | cell.set(self.prev); |
93 | 86.3k | }); |
94 | 86.3k | } |
95 | | } |
96 | | |
97 | | #[allow(unused_variables)] |
98 | 86.3k | let maybe_guard = context::budget(|cell| { |
99 | 86.3k | let prev = cell.get(); |
100 | 86.3k | cell.set(budget); |
101 | 86.3k | |
102 | 86.3k | ResetGuard { prev } |
103 | 86.3k | }); Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}::{closure#0}>>::{closure#0}>::{closure#0} Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}::{closure#0}>>::{closure#0}>::{closure#0} Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}>::{closure#0} Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}>::{closure#0} Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#0}::{closure#0}>::{closure#0} tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#0}::{closure#0}>::{closure#0} Line | Count | Source | 98 | 4 | let maybe_guard = context::budget(|cell| { | 99 | 4 | let prev 
= cell.get(); | 100 | 4 | cell.set(budget); | 101 | 4 | | 102 | 4 | ResetGuard { prev } | 103 | 4 | }); |
Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#1}>::{closure#0} Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#1}>::{closure#0} Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Core>, ()>, <tokio::runtime::scheduler::multi_thread::worker::Context>::run_task::{closure#0}>::{closure#0} Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::result::Result<(), tokio::sync::oneshot::error::RecvError>>, <tokio::runtime::context::blocking::BlockingRegionGuard>::block_on_timeout<&mut tokio::sync::oneshot::Receiver<()>>::{closure#1}>::{closure#0} tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::result::Result<(), tokio::sync::oneshot::error::RecvError>>, <tokio::runtime::park::CachedParkThread>::block_on<&mut tokio::sync::oneshot::Receiver<()>>::{closure#0}>::{closure#0} Line | Count | Source | 98 | 21.4k | let maybe_guard = context::budget(|cell| { | 99 | 21.4k | let prev = cell.get(); | 100 | 21.4k | cell.set(budget); | 101 | 21.4k | | 102 | 21.4k | ResetGuard { prev } | 103 | 21.4k | }); |
Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::task::local::LocalSet>::tick::{closure#0}>::{closure#0} Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}::{closure#0}>>::{closure#0}>::{closure#0} Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}::{closure#0}>>::{closure#0}>::{closure#0} Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}>::{closure#0} Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}>::{closure#0} Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#0}::{closure#0}>::{closure#0} tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#0}::{closure#0}>::{closure#0} Line | Count | Source | 98 | 43.7k | let 
maybe_guard = context::budget(|cell| { | 99 | 43.7k | let prev = cell.get(); | 100 | 43.7k | cell.set(budget); | 101 | 43.7k | | 102 | 43.7k | ResetGuard { prev } | 103 | 43.7k | }); |
Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#1}>::{closure#0} tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#1}>::{closure#0} Line | Count | Source | 98 | 21.1k | let maybe_guard = context::budget(|cell| { | 99 | 21.1k | let prev = cell.get(); | 100 | 21.1k | cell.set(budget); | 101 | 21.1k | | 102 | 21.1k | ResetGuard { prev } | 103 | 21.1k | }); |
|
104 | 86.3k | |
105 | 86.3k | // The function is called regardless even if the budget is not successfully |
106 | 86.3k | // set due to the thread-local being destroyed. |
107 | 86.3k | f() |
108 | 86.3k | } Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}::{closure#0}>>::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}::{closure#0}>>::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#0}::{closure#0}> tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#0}::{closure#0}> Line | Count | Source | 84 | 4 | fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { | 85 | | struct ResetGuard { | 86 | | prev: Budget, | 87 | | } | 88 | | | 89 | 
| impl Drop for ResetGuard { | 90 | | fn drop(&mut self) { | 91 | | let _ = context::budget(|cell| { | 92 | | cell.set(self.prev); | 93 | | }); | 94 | | } | 95 | | } | 96 | | | 97 | | #[allow(unused_variables)] | 98 | 4 | let maybe_guard = context::budget(|cell| { | 99 | | let prev = cell.get(); | 100 | | cell.set(budget); | 101 | | | 102 | | ResetGuard { prev } | 103 | 4 | }); | 104 | 4 | | 105 | 4 | // The function is called regardless even if the budget is not successfully | 106 | 4 | // set due to the thread-local being destroyed. | 107 | 4 | f() | 108 | 4 | } |
Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#1}> Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_structured_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#1}> Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::task::local::LocalSet>::tick::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::result::Result<(), tokio::sync::oneshot::error::RecvError>>, <tokio::runtime::context::blocking::BlockingRegionGuard>::block_on_timeout<&mut tokio::sync::oneshot::Receiver<()>>::{closure#1}> tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::result::Result<(), tokio::sync::oneshot::error::RecvError>>, <tokio::runtime::park::CachedParkThread>::block_on<&mut tokio::sync::oneshot::Receiver<()>>::{closure#0}> Line | Count | Source | 84 | 21.4k | fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { | 85 | | struct ResetGuard { | 86 | | prev: Budget, | 87 | | } | 88 | | | 89 | | impl Drop for ResetGuard { | 90 | | fn drop(&mut self) { | 91 | | let _ = context::budget(|cell| { | 92 | | cell.set(self.prev); | 93 | | }); | 94 | | } | 95 | | } | 96 | | | 97 | | #[allow(unused_variables)] | 98 | 21.4k | let maybe_guard = context::budget(|cell| { | 99 | | let prev = cell.get(); | 100 | | cell.set(budget); | 101 | | | 102 | | ResetGuard { prev } | 103 | 21.4k | }); | 104 | 21.4k | | 105 | 21.4k | // The function is called regardless even if the budget is not successfully | 106 | 21.4k | // set due to the thread-local being destroyed. | 107 | 21.4k | f() | 108 | 21.4k | } |
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Core>, ()>, <tokio::runtime::scheduler::multi_thread::worker::Context>::run_task::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#0}::{closure#0}> tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#0}::{closure#0}> Line | Count | Source | 84 | 43.7k | fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { | 85 | | struct ResetGuard { | 86 | | prev: Budget, | 87 | | } | 88 | | | 89 | | impl Drop for ResetGuard { | 90 | | fn drop(&mut self) { | 91 | | let _ = context::budget(|cell| { | 92 | | cell.set(self.prev); | 93 | | }); | 94 | | } | 95 | | } | 96 | | | 97 | | #[allow(unused_variables)] | 98 | 43.7k | let maybe_guard = context::budget(|cell| { | 99 | | let prev = cell.get(); | 100 | | cell.set(budget); | 101 | | | 102 | | ResetGuard { prev } | 103 | 43.7k | }); | 104 | 43.7k | | 105 | 43.7k | // The function is called regardless even if the budget is not successfully | 106 | 43.7k | // set due to the thread-local being destroyed. | 107 | 43.7k | f() | 108 | 43.7k | } |
Unexecuted instantiation: tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>>::{closure#0}::{closure#1}> tokio::runtime::coop::with_budget::<(), <tokio::runtime::scheduler::current_thread::CoreGuard>::block_on<core::pin::Pin<&mut fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>::{closure#0}::{closure#1}> Line | Count | Source | 84 | 21.1k | fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { | 85 | | struct ResetGuard { | 86 | | prev: Budget, | 87 | | } | 88 | | | 89 | | impl Drop for ResetGuard { | 90 | | fn drop(&mut self) { | 91 | | let _ = context::budget(|cell| { | 92 | | cell.set(self.prev); | 93 | | }); | 94 | | } | 95 | | } | 96 | | | 97 | | #[allow(unused_variables)] | 98 | 21.1k | let maybe_guard = context::budget(|cell| { | 99 | | let prev = cell.get(); | 100 | | cell.set(budget); | 101 | | | 102 | | ResetGuard { prev } | 103 | 21.1k | }); | 104 | 21.1k | | 105 | 21.1k | // The function is called regardless even if the budget is not successfully | 106 | 21.1k | // set due to the thread-local being destroyed. | 107 | 21.1k | f() | 108 | 21.1k | } |
Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}::{closure#0}>>::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<core::option::Option<()>>, <tokio::runtime::park::CachedParkThread>::block_on<core::future::poll_fn::PollFn<<tokio::runtime::scheduler::current_thread::CurrentThread>::block_on<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}::{closure#0}>>::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<core::pin::Pin<alloc::boxed::Box<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>>>::{closure#0}> Unexecuted instantiation: tokio::runtime::coop::with_budget::<core::task::poll::Poll<()>, <tokio::runtime::park::CachedParkThread>::block_on<fuzz_executor::_::__libfuzzer_sys_run::{closure#0}>::{closure#0}> |
109 | | |
110 | | #[inline(always)] |
111 | 0 | pub(crate) fn has_budget_remaining() -> bool { |
112 | 0 | // If the current budget cannot be accessed due to the thread-local being |
113 | 0 | // shut down, then we assume there is budget remaining. |
114 | 0 | context::budget(|cell| cell.get().has_remaining()).unwrap_or(true) |
115 | 0 | } |
116 | | |
117 | | cfg_rt_multi_thread! { |
118 | | /// Sets the current task's budget. |
119 | 0 | pub(crate) fn set(budget: Budget) { |
120 | 0 | let _ = context::budget(|cell| cell.set(budget)); |
121 | 0 | } |
122 | | } |
123 | | |
124 | | cfg_rt! { |
125 | | /// Forcibly removes the budgeting constraints early. |
126 | | /// |
127 | | /// Returns the budget that was remaining when `stop` was called. |
128 | 1.16k | pub(crate) fn stop() -> Budget { |
129 | 1.16k | context::budget(|cell| { |
130 | 1.16k | let prev = cell.get(); |
131 | 1.16k | cell.set(Budget::unconstrained()); |
132 | 1.16k | prev |
133 | 1.16k | }).unwrap_or(Budget::unconstrained()) |
134 | 1.16k | } |
135 | | } |
136 | | |
137 | | cfg_coop! { |
138 | | use pin_project_lite::pin_project; |
139 | | use std::cell::Cell; |
140 | | use std::future::Future; |
141 | | use std::pin::Pin; |
142 | | use std::task::{ready, Context, Poll}; |
143 | | |
144 | | #[must_use] |
145 | | pub(crate) struct RestoreOnPending(Cell<Budget>); |
146 | | |
147 | | impl RestoreOnPending { |
148 | 43.8k | pub(crate) fn made_progress(&self) { |
149 | 43.8k | self.0.set(Budget::unconstrained()); |
150 | 43.8k | } |
151 | | } |
152 | | |
153 | | impl Drop for RestoreOnPending { |
154 | 90.1k | fn drop(&mut self) { |
155 | 90.1k | // Don't reset if budget was unconstrained or if we made progress. |
156 | 90.1k | // They are both represented as the remembered budget being unconstrained. |
157 | 90.1k | let budget = self.0.get(); |
158 | 90.1k | if !budget.is_unconstrained() { |
159 | 1.23k | let _ = context::budget(|cell| { |
160 | 1.23k | cell.set(budget); |
161 | 1.23k | }); |
162 | 88.8k | } |
163 | 90.1k | } |
164 | | } |
165 | | |
166 | | /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield. |
167 | | /// |
168 | | /// When you call this method, the current budget is decremented. However, to ensure that |
169 | | /// progress is made every time a task is polled, the budget is automatically restored to its |
170 | | /// former value if the returned `RestoreOnPending` is dropped. It is the caller's |
171 | | /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure |
172 | | /// that the budget empties appropriately. |
173 | | /// |
174 | | /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**. |
175 | | /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and |
176 | | /// `RestoreOnPending` is dropped, those adjustments are erased unless the caller indicates |
177 | | /// that progress was made. |
178 | | #[inline] |
179 | 45.0k | pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> { |
180 | 45.0k | context::budget(|cell| { |
181 | 45.0k | let mut budget = cell.get(); |
182 | 45.0k | |
183 | 45.0k | let decrement = budget.decrement(); |
184 | 45.0k | |
185 | 45.0k | if decrement.success { |
186 | 45.0k | let restore = RestoreOnPending(Cell::new(cell.get())); |
187 | 45.0k | cell.set(budget); |
188 | 45.0k | |
189 | 45.0k | // avoid double counting |
190 | 45.0k | if decrement.hit_zero { |
191 | 0 | inc_budget_forced_yield_count(); |
192 | 45.0k | } |
193 | | |
194 | 45.0k | Poll::Ready(restore) |
195 | | } else { |
196 | 0 | cx.waker().wake_by_ref(); |
197 | 0 | Poll::Pending |
198 | | } |
199 | 45.0k | }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained())))) |
200 | 45.0k | } Unexecuted instantiation: tokio::runtime::coop::poll_proceed tokio::runtime::coop::poll_proceed Line | Count | Source | 179 | 2.32k | pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> { | 180 | 2.32k | context::budget(|cell| { | 181 | | let mut budget = cell.get(); | 182 | | | 183 | | let decrement = budget.decrement(); | 184 | | | 185 | | if decrement.success { | 186 | | let restore = RestoreOnPending(Cell::new(cell.get())); | 187 | | cell.set(budget); | 188 | | | 189 | | // avoid double counting | 190 | | if decrement.hit_zero { | 191 | | inc_budget_forced_yield_count(); | 192 | | } | 193 | | | 194 | | Poll::Ready(restore) | 195 | | } else { | 196 | | cx.waker().wake_by_ref(); | 197 | | Poll::Pending | 198 | | } | 199 | 2.32k | }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained())))) | 200 | 2.32k | } |
Unexecuted instantiation: tokio::runtime::coop::poll_proceed tokio::runtime::coop::poll_proceed Line | Count | Source | 179 | 42.7k | pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> { | 180 | 42.7k | context::budget(|cell| { | 181 | | let mut budget = cell.get(); | 182 | | | 183 | | let decrement = budget.decrement(); | 184 | | | 185 | | if decrement.success { | 186 | | let restore = RestoreOnPending(Cell::new(cell.get())); | 187 | | cell.set(budget); | 188 | | | 189 | | // avoid double counting | 190 | | if decrement.hit_zero { | 191 | | inc_budget_forced_yield_count(); | 192 | | } | 193 | | | 194 | | Poll::Ready(restore) | 195 | | } else { | 196 | | cx.waker().wake_by_ref(); | 197 | | Poll::Pending | 198 | | } | 199 | 42.7k | }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained())))) | 200 | 42.7k | } |
|
201 | | |
202 | | cfg_rt! { |
203 | | cfg_unstable_metrics! { |
204 | | #[inline(always)] |
205 | | fn inc_budget_forced_yield_count() { |
206 | | let _ = context::with_current(|handle| { |
207 | | handle.scheduler_metrics().inc_budget_forced_yield_count(); |
208 | | }); |
209 | | } |
210 | | } |
211 | | |
212 | | cfg_not_unstable_metrics! { |
213 | | #[inline(always)] |
214 | 0 | fn inc_budget_forced_yield_count() {} |
215 | | } |
216 | | } |
217 | | |
218 | | cfg_not_rt! { |
219 | | #[inline(always)] |
220 | | fn inc_budget_forced_yield_count() {} |
221 | | } |
222 | | |
223 | | impl Budget { |
224 | | /// Decrements the budget, reporting the outcome as a `BudgetDecrement`: `success` is |
225 | | /// `false` only when no budget remains; `hit_zero` is `true` when this call used the last unit. |
226 | 45.0k | fn decrement(&mut self) -> BudgetDecrement { |
227 | 45.0k | if let Some(num) = &mut self.0 { |
228 | 45.0k | if *num > 0 { |
229 | 45.0k | *num -= 1; |
230 | 45.0k | |
231 | 45.0k | let hit_zero = *num == 0; |
232 | 45.0k | |
233 | 45.0k | BudgetDecrement { success: true, hit_zero } |
234 | | } else { |
235 | 0 | BudgetDecrement { success: false, hit_zero: false } |
236 | | } |
237 | | } else { |
238 | 0 | BudgetDecrement { success: true, hit_zero: false } |
239 | | } |
240 | 45.0k | } |
241 | | |
242 | 90.1k | fn is_unconstrained(self) -> bool { |
243 | 90.1k | self.0.is_none() |
244 | 90.1k | } |
245 | | } |
246 | | |
247 | | pin_project! { |
248 | | /// Future wrapper to ensure cooperative scheduling. |
249 | | /// |
250 | | /// When being polled `poll_proceed` is called before the inner future is polled to check |
251 | | /// if the inner future has exceeded its budget. If the inner future resolves, this will |
252 | | /// automatically call `RestoreOnPending::made_progress` before resolving this future with |
253 | | /// the result of the inner one. If polling the inner future is pending, polling this future |
254 | | /// type will also return a `Poll::Pending`. |
255 | | #[must_use = "futures do nothing unless polled"] |
256 | | pub(crate) struct Coop<F: Future> { |
257 | | #[pin] |
258 | | pub(crate) fut: F, |
259 | | } |
260 | | } |
261 | | |
262 | | impl<F: Future> Future for Coop<F> { |
263 | | type Output = F::Output; |
264 | | |
265 | 0 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
266 | 0 | let coop = ready!(poll_proceed(cx)); |
267 | 0 | let me = self.project(); |
268 | 0 | if let Poll::Ready(ret) = me.fut.poll(cx) { |
269 | 0 | coop.made_progress(); |
270 | 0 | Poll::Ready(ret) |
271 | | } else { |
272 | 0 | Poll::Pending |
273 | | } |
274 | 0 | } |
275 | | } |
276 | | |
277 | | /// Run a future with a budget constraint for cooperative scheduling. |
278 | | /// If the future exceeds its budget while being polled, control is yielded back to the |
279 | | /// runtime. |
280 | | #[inline] |
281 | 0 | pub(crate) fn cooperative<F: Future>(fut: F) -> Coop<F> { |
282 | 0 | Coop { fut } |
283 | 0 | } |
284 | | } |
285 | | |
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Reads the budget currently held by the context, or an unconstrained
    /// budget when `context::budget` does not yield a value.
    fn get() -> Budget {
        let current = context::budget(|slot| slot.get());
        current.unwrap_or(Budget::unconstrained())
    }

    /// Walks through spending, restoring and exhausting the budget.
    #[test]
    fn budgeting() {
        use std::future::poll_fn;
        use tokio_test::*;

        // One `poll_proceed` attempt made from inside a throwaway test task.
        let proceed = || task::spawn(()).enter(|cx, _| poll_proceed(cx));

        let initial = Budget::initial().0;
        let full = initial.unwrap();

        // Outside of `budget(..)` no limit is tracked, before or after a
        // `poll_proceed` call.
        assert!(get().0.is_none());

        let guard = assert_ready!(proceed());

        assert!(get().0.is_none());
        drop(guard);
        assert!(get().0.is_none());

        budget(|| {
            assert_eq!(get().0, initial);

            let guard = assert_ready!(proceed());
            assert_eq!(get().0.unwrap(), full - 1);
            drop(guard);
            // Dropped without reporting progress: the unit is handed back.
            assert_eq!(get().0, initial);

            let guard = assert_ready!(proceed());
            assert_eq!(get().0.unwrap(), full - 1);
            guard.made_progress();
            drop(guard);
            // Progress was reported: the unit stays spent.
            assert_eq!(get().0.unwrap(), full - 1);

            let guard = assert_ready!(proceed());
            assert_eq!(get().0.unwrap(), full - 2);
            guard.made_progress();
            drop(guard);
            assert_eq!(get().0.unwrap(), full - 2);

            // A nested scope starts over from the initial budget...
            budget(|| {
                assert_eq!(get().0, initial);

                let guard = assert_ready!(proceed());
                assert_eq!(get().0.unwrap(), full - 1);
                guard.made_progress();
                drop(guard);
                assert_eq!(get().0.unwrap(), full - 1);
            });

            // ...and leaving it brings back what the outer scope had left.
            assert_eq!(get().0.unwrap(), full - 2);
        });

        assert!(get().0.is_none());

        budget(|| {
            let remaining = get().0.unwrap();

            // Spend every available unit, reporting progress each time.
            for _ in 0..remaining {
                let guard = assert_ready!(proceed());
                guard.made_progress();
            }

            let mut exhausted = task::spawn(poll_fn(|cx| {
                let guard = std::task::ready!(poll_proceed(cx));
                guard.made_progress();
                Poll::Ready(())
            }));

            // With nothing left, the next attempt has to come back pending.
            assert_pending!(exhausted.poll());
        });
    }
}