Coverage Report

Created: 2025-08-28 07:05

/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.47.1/src/runtime/runtime.rs
Line
Count
Source (jump to first uncovered line)
1
use super::BOX_FUTURE_THRESHOLD;
2
use crate::runtime::blocking::BlockingPool;
3
use crate::runtime::scheduler::CurrentThread;
4
use crate::runtime::{context, EnterGuard, Handle};
5
use crate::task::JoinHandle;
6
use crate::util::trace::SpawnMeta;
7
8
use std::future::Future;
9
use std::mem;
10
use std::time::Duration;
11
12
cfg_rt_multi_thread! {
13
    use crate::runtime::Builder;
14
    use crate::runtime::scheduler::MultiThread;
15
}
16
17
/// The Tokio runtime.
18
///
19
/// The runtime provides an I/O driver, task scheduler, [timer], and
20
/// blocking pool, necessary for running asynchronous tasks.
21
///
22
/// Instances of `Runtime` can be created using [`new`], or [`Builder`].
23
/// However, most users will use the [`#[tokio::main]`][main] annotation on
24
/// their entry point instead.
25
///
26
/// See [module level][mod] documentation for more details.
27
///
28
/// # Shutdown
29
///
30
/// Shutting down the runtime is done by dropping the value, or calling
31
/// [`shutdown_background`] or [`shutdown_timeout`].
32
///
33
/// Tasks spawned through [`Runtime::spawn`] keep running until they yield.
34
/// Then they are dropped. They are not *guaranteed* to run to completion, but
35
/// *might* do so if they do not yield until completion.
36
///
37
/// Blocking functions spawned through [`Runtime::spawn_blocking`] keep running
38
/// until they return.
39
///
40
/// The thread initiating the shutdown blocks until all spawned work has been
41
/// stopped. This can take an indefinite amount of time. The `Drop`
42
/// implementation waits forever for this.
43
///
44
/// The [`shutdown_background`] and [`shutdown_timeout`] methods can be used if
45
/// waiting forever is undesired. When the timeout is reached, spawned work that
46
/// did not stop in time and threads running it are leaked. The work continues
47
/// to run until one of the stopping conditions is fulfilled, but the thread
48
/// initiating the shutdown is unblocked.
49
///
50
/// Once the runtime has been dropped, any outstanding I/O resources bound to
51
/// it will no longer function. Calling any method on them will result in an
52
/// error.
53
///
54
/// # Sharing
55
///
56
/// There are several ways to establish shared access to a Tokio runtime:
57
///
58
///  * Using an <code>[Arc]\<Runtime></code>.
59
///  * Using a [`Handle`].
60
///  * Entering the runtime context.
61
///
62
/// Using an <code>[Arc]\<Runtime></code> or [`Handle`] allows you to do various
63
/// things with the runtime such as spawning new tasks or entering the runtime
64
/// context. Both types can be cloned to create a new handle that allows access
65
/// to the same runtime. By passing clones into different tasks or threads, you
66
/// will be able to access the runtime from those tasks or threads.
67
///
68
/// The difference between <code>[Arc]\<Runtime></code> and [`Handle`] is that
69
/// an <code>[Arc]\<Runtime></code> will prevent the runtime from shutting down,
70
/// whereas a [`Handle`] does not prevent that. This is because shutdown of the
71
/// runtime happens when the destructor of the `Runtime` object runs.
72
///
73
/// Calls to [`shutdown_background`] and [`shutdown_timeout`] require exclusive
74
/// ownership of the `Runtime` type. When using an <code>[Arc]\<Runtime></code>,
75
/// this can be achieved via [`Arc::try_unwrap`] when only one strong count
76
/// reference is left over.
77
///
78
/// The runtime context is entered using the [`Runtime::enter`] or
79
/// [`Handle::enter`] methods, which use a thread-local variable to store the
80
/// current runtime. Whenever you are inside the runtime context, methods such
81
/// as [`tokio::spawn`] will use the runtime whose context you are inside.
82
///
83
/// [timer]: crate::time
84
/// [mod]: index.html
85
/// [`new`]: method@Self::new
86
/// [`Builder`]: struct@Builder
87
/// [`Handle`]: struct@Handle
88
/// [main]: macro@crate::main
89
/// [`tokio::spawn`]: crate::spawn
90
/// [`Arc::try_unwrap`]: std::sync::Arc::try_unwrap
91
/// [Arc]: std::sync::Arc
92
/// [`shutdown_background`]: method@Runtime::shutdown_background
93
/// [`shutdown_timeout`]: method@Runtime::shutdown_timeout
94
#[derive(Debug)]
95
pub struct Runtime {
96
    /// Task scheduler
97
    scheduler: Scheduler,
98
99
    /// Handle to runtime, also contains driver handles
100
    handle: Handle,
101
102
    /// Blocking pool handle, used to signal shutdown
103
    blocking_pool: BlockingPool,
104
}
105
106
/// The flavor of a `Runtime`.
107
///
108
/// This is the return type for [`Handle::runtime_flavor`](crate::runtime::Handle::runtime_flavor()).
109
#[derive(Debug, PartialEq, Eq)]
110
#[non_exhaustive]
111
pub enum RuntimeFlavor {
112
    /// The flavor that executes all tasks on the current thread.
113
    CurrentThread,
114
    /// The flavor that executes tasks across multiple threads.
115
    MultiThread,
116
}
117
118
/// The runtime scheduler is either a multi-thread or a current-thread executor.
119
#[derive(Debug)]
120
pub(super) enum Scheduler {
121
    /// Execute all tasks on the current-thread.
122
    CurrentThread(CurrentThread),
123
124
    /// Execute tasks across multiple threads.
125
    #[cfg(feature = "rt-multi-thread")]
126
    MultiThread(MultiThread),
127
}
128
129
impl Runtime {
130
17.3k
    pub(super) fn from_parts(
131
17.3k
        scheduler: Scheduler,
132
17.3k
        handle: Handle,
133
17.3k
        blocking_pool: BlockingPool,
134
17.3k
    ) -> Runtime {
135
17.3k
        Runtime {
136
17.3k
            scheduler,
137
17.3k
            handle,
138
17.3k
            blocking_pool,
139
17.3k
        }
140
17.3k
    }
141
142
    /// Creates a new runtime instance with default configuration values.
143
    ///
144
    /// This results in the multi threaded scheduler, I/O driver, and time driver being
145
    /// initialized.
146
    ///
147
    /// Most applications will not need to call this function directly. Instead,
148
    /// they will use the  [`#[tokio::main]` attribute][main]. When a more complex
149
    /// configuration is necessary, the [runtime builder] may be used.
150
    ///
151
    /// See [module level][mod] documentation for more details.
152
    ///
153
    /// # Examples
154
    ///
155
    /// Creating a new `Runtime` with default configuration values.
156
    ///
157
    /// ```
158
    /// use tokio::runtime::Runtime;
159
    ///
160
    /// let rt = Runtime::new()
161
    ///     .unwrap();
162
    ///
163
    /// // Use the runtime...
164
    /// ```
165
    ///
166
    /// [mod]: index.html
167
    /// [main]: ../attr.main.html
168
    /// [threaded scheduler]: index.html#threaded-scheduler
169
    /// [runtime builder]: crate::runtime::Builder
170
    #[cfg(feature = "rt-multi-thread")]
171
    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
172
17.3k
    pub fn new() -> std::io::Result<Runtime> {
173
17.3k
        Builder::new_multi_thread().enable_all().build()
174
17.3k
    }
175
176
    /// Returns a handle to the runtime's spawner.
177
    ///
178
    /// The returned handle can be used to spawn tasks that run on this runtime, and can
179
    /// be cloned to allow moving the `Handle` to other threads.
180
    ///
181
    /// Calling [`Handle::block_on`] on a handle to a `current_thread` runtime is error-prone.
182
    /// Refer to the documentation of [`Handle::block_on`] for more.
183
    ///
184
    /// # Examples
185
    ///
186
    /// ```
187
    /// use tokio::runtime::Runtime;
188
    ///
189
    /// let rt = Runtime::new()
190
    ///     .unwrap();
191
    ///
192
    /// let handle = rt.handle();
193
    ///
194
    /// // Use the handle...
195
    /// ```
196
0
    pub fn handle(&self) -> &Handle {
197
0
        &self.handle
198
0
    }
199
200
    /// Spawns a future onto the Tokio runtime.
201
    ///
202
    /// This spawns the given future onto the runtime's executor, usually a
203
    /// thread pool. The thread pool is then responsible for polling the future
204
    /// until it completes.
205
    ///
206
    /// The provided future will start running in the background immediately
207
    /// when `spawn` is called, even if you don't await the returned
208
    /// `JoinHandle`.
209
    ///
210
    /// See [module level][mod] documentation for more details.
211
    ///
212
    /// [mod]: index.html
213
    ///
214
    /// # Examples
215
    ///
216
    /// ```
217
    /// use tokio::runtime::Runtime;
218
    ///
219
    /// # fn dox() {
220
    /// // Create the runtime
221
    /// let rt = Runtime::new().unwrap();
222
    ///
223
    /// // Spawn a future onto the runtime
224
    /// rt.spawn(async {
225
    ///     println!("now running on a worker thread");
226
    /// });
227
    /// # }
228
    /// ```
229
    #[track_caller]
230
0
    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
231
0
    where
232
0
        F: Future + Send + 'static,
233
0
        F::Output: Send + 'static,
234
0
    {
235
0
        let fut_size = mem::size_of::<F>();
236
0
        if fut_size > BOX_FUTURE_THRESHOLD {
237
0
            self.handle
238
0
                .spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
239
        } else {
240
0
            self.handle
241
0
                .spawn_named(future, SpawnMeta::new_unnamed(fut_size))
242
        }
243
0
    }
244
245
    /// Runs the provided function on an executor dedicated to blocking operations.
246
    ///
247
    /// # Examples
248
    ///
249
    /// ```
250
    /// use tokio::runtime::Runtime;
251
    ///
252
    /// # fn dox() {
253
    /// // Create the runtime
254
    /// let rt = Runtime::new().unwrap();
255
    ///
256
    /// // Spawn a blocking function onto the runtime
257
    /// rt.spawn_blocking(|| {
258
    ///     println!("now running on a worker thread");
259
    /// });
260
    /// # }
261
    /// ```
262
    #[track_caller]
263
0
    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
264
0
    where
265
0
        F: FnOnce() -> R + Send + 'static,
266
0
        R: Send + 'static,
267
0
    {
268
0
        self.handle.spawn_blocking(func)
269
0
    }
270
271
    /// Runs a future to completion on the Tokio runtime. This is the
272
    /// runtime's entry point.
273
    ///
274
    /// This runs the given future on the current thread, blocking until it is
275
    /// complete, and yielding its resolved result. Any tasks or timers
276
    /// which the future spawns internally will be executed on the runtime.
277
    ///
278
    /// # Non-worker future
279
    ///
280
    /// Note that the future required by this function does not run as a
281
    /// worker. The expectation is that other tasks are spawned by the future here.
282
    /// Awaiting on other futures from the future provided here will not
283
    /// perform as fast as those spawned as workers.
284
    ///
285
    /// # Multi thread scheduler
286
    ///
287
    /// When the multi thread scheduler is used this will allow futures
288
    /// to run within the io driver and timer context of the overall runtime.
289
    ///
290
    /// Any spawned tasks will continue running after `block_on` returns.
291
    ///
292
    /// # Current thread scheduler
293
    ///
294
    /// When the current thread scheduler is enabled `block_on`
295
    /// can be called concurrently from multiple threads. The first call
296
    /// will take ownership of the io and timer drivers. This means
297
    /// other threads which do not own the drivers will hook into that one.
298
    /// When the first `block_on` completes, other threads will be able to
299
    /// "steal" the driver to allow continued execution of their futures.
300
    ///
301
    /// Any spawned tasks will be suspended after `block_on` returns. Calling
302
    /// `block_on` again will resume previously spawned tasks.
303
    ///
304
    /// # Panics
305
    ///
306
    /// This function panics if the provided future panics, or if called within an
307
    /// asynchronous execution context.
308
    ///
309
    /// # Examples
310
    ///
311
    /// ```no_run
312
    /// use tokio::runtime::Runtime;
313
    ///
314
    /// // Create the runtime
315
    /// let rt  = Runtime::new().unwrap();
316
    ///
317
    /// // Execute the future, blocking the current thread until completion
318
    /// rt.block_on(async {
319
    ///     println!("hello");
320
    /// });
321
    /// ```
322
    ///
323
    /// [handle]: fn@Handle::block_on
324
    #[track_caller]
325
17.3k
    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
326
17.3k
        let fut_size = mem::size_of::<F>();
327
17.3k
        if fut_size > BOX_FUTURE_THRESHOLD {
328
0
            self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
329
        } else {
330
17.3k
            self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
331
        }
332
17.3k
    }
Unexecuted instantiation: <tokio::runtime::runtime::Runtime>::block_on::<_>
<tokio::runtime::runtime::Runtime>::block_on::<fuzz_client::fuzz_entry::{closure#0}>
Line
Count
Source
325
4.86k
    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
326
4.86k
        let fut_size = mem::size_of::<F>();
327
4.86k
        if fut_size > BOX_FUTURE_THRESHOLD {
328
0
            self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
329
        } else {
330
4.86k
            self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
331
        }
332
4.86k
    }
<tokio::runtime::runtime::Runtime>::block_on::<fuzz_e2e::run::{closure#0}>
Line
Count
Source
325
12.5k
    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
326
12.5k
        let fut_size = mem::size_of::<F>();
327
12.5k
        if fut_size > BOX_FUTURE_THRESHOLD {
328
0
            self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
329
        } else {
330
12.5k
            self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
331
        }
332
12.5k
    }
333
334
    #[track_caller]
335
17.3k
    fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
336
17.3k
        #[cfg(all(
337
17.3k
            tokio_unstable,
338
17.3k
            tokio_taskdump,
339
17.3k
            feature = "rt",
340
17.3k
            target_os = "linux",
341
17.3k
            any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
342
17.3k
        ))]
343
17.3k
        let future = super::task::trace::Trace::root(future);
344
17.3k
345
17.3k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
346
17.3k
        let future = crate::util::trace::task(
347
17.3k
            future,
348
17.3k
            "block_on",
349
17.3k
            _meta,
350
17.3k
            crate::runtime::task::Id::next().as_u64(),
351
17.3k
        );
352
17.3k
353
17.3k
        let _enter = self.enter();
354
17.3k
355
17.3k
        match &self.scheduler {
356
0
            Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
357
            #[cfg(feature = "rt-multi-thread")]
358
17.3k
            Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
359
        }
360
17.3k
    }
Unexecuted instantiation: <tokio::runtime::runtime::Runtime>::block_on_inner::<_>
Unexecuted instantiation: <tokio::runtime::runtime::Runtime>::block_on_inner::<core::pin::Pin<alloc::boxed::Box<fuzz_client::fuzz_entry::{closure#0}>>>
<tokio::runtime::runtime::Runtime>::block_on_inner::<fuzz_client::fuzz_entry::{closure#0}>
Line
Count
Source
335
4.86k
    fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
336
4.86k
        #[cfg(all(
337
4.86k
            tokio_unstable,
338
4.86k
            tokio_taskdump,
339
4.86k
            feature = "rt",
340
4.86k
            target_os = "linux",
341
4.86k
            any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
342
4.86k
        ))]
343
4.86k
        let future = super::task::trace::Trace::root(future);
344
4.86k
345
4.86k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
346
4.86k
        let future = crate::util::trace::task(
347
4.86k
            future,
348
4.86k
            "block_on",
349
4.86k
            _meta,
350
4.86k
            crate::runtime::task::Id::next().as_u64(),
351
4.86k
        );
352
4.86k
353
4.86k
        let _enter = self.enter();
354
4.86k
355
4.86k
        match &self.scheduler {
356
0
            Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
357
            #[cfg(feature = "rt-multi-thread")]
358
4.86k
            Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
359
        }
360
4.86k
    }
Unexecuted instantiation: <tokio::runtime::runtime::Runtime>::block_on_inner::<core::pin::Pin<alloc::boxed::Box<fuzz_e2e::run::{closure#0}>>>
<tokio::runtime::runtime::Runtime>::block_on_inner::<fuzz_e2e::run::{closure#0}>
Line
Count
Source
335
12.5k
    fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
336
12.5k
        #[cfg(all(
337
12.5k
            tokio_unstable,
338
12.5k
            tokio_taskdump,
339
12.5k
            feature = "rt",
340
12.5k
            target_os = "linux",
341
12.5k
            any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
342
12.5k
        ))]
343
12.5k
        let future = super::task::trace::Trace::root(future);
344
12.5k
345
12.5k
        #[cfg(all(tokio_unstable, feature = "tracing"))]
346
12.5k
        let future = crate::util::trace::task(
347
12.5k
            future,
348
12.5k
            "block_on",
349
12.5k
            _meta,
350
12.5k
            crate::runtime::task::Id::next().as_u64(),
351
12.5k
        );
352
12.5k
353
12.5k
        let _enter = self.enter();
354
12.5k
355
12.5k
        match &self.scheduler {
356
0
            Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
357
            #[cfg(feature = "rt-multi-thread")]
358
12.5k
            Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
359
        }
360
12.5k
    }
361
362
    /// Enters the runtime context.
363
    ///
364
    /// This allows you to construct types that must have an executor
365
    /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
366
    /// also allow you to call methods such as [`tokio::spawn`].
367
    ///
368
    /// [`Sleep`]: struct@crate::time::Sleep
369
    /// [`TcpStream`]: struct@crate::net::TcpStream
370
    /// [`tokio::spawn`]: fn@crate::spawn
371
    ///
372
    /// # Example
373
    ///
374
    /// ```
375
    /// use tokio::runtime::Runtime;
376
    /// use tokio::task::JoinHandle;
377
    ///
378
    /// fn function_that_spawns(msg: String) -> JoinHandle<()> {
379
    ///     // Had we not used `rt.enter` below, this would panic.
380
    ///     tokio::spawn(async move {
381
    ///         println!("{}", msg);
382
    ///     })
383
    /// }
384
    ///
385
    /// fn main() {
386
    ///     let rt = Runtime::new().unwrap();
387
    ///
388
    ///     let s = "Hello World!".to_string();
389
    ///
390
    ///     // By entering the context, we tie `tokio::spawn` to this executor.
391
    ///     let _guard = rt.enter();
392
    ///     let handle = function_that_spawns(s);
393
    ///
394
    ///     // Wait for the task before we end the test.
395
    ///     rt.block_on(handle).unwrap();
396
    /// }
397
    /// ```
398
17.3k
    pub fn enter(&self) -> EnterGuard<'_> {
399
17.3k
        self.handle.enter()
400
17.3k
    }
401
402
    /// Shuts down the runtime, waiting for at most `duration` for all spawned
403
    /// work to stop.
404
    ///
405
    /// See the [struct level documentation](Runtime#shutdown) for more details.
406
    ///
407
    /// # Examples
408
    ///
409
    /// ```
410
    /// use tokio::runtime::Runtime;
411
    /// use tokio::task;
412
    ///
413
    /// use std::thread;
414
    /// use std::time::Duration;
415
    ///
416
    /// fn main() {
417
    /// #  if cfg!(miri) { return } // Miri reports error when main thread terminated without waiting all remaining threads.
418
    ///    let runtime = Runtime::new().unwrap();
419
    ///
420
    ///    runtime.block_on(async move {
421
    ///        task::spawn_blocking(move || {
422
    ///            thread::sleep(Duration::from_secs(10_000));
423
    ///        });
424
    ///    });
425
    ///
426
    ///    runtime.shutdown_timeout(Duration::from_millis(100));
427
    /// }
428
    /// ```
429
0
    pub fn shutdown_timeout(mut self, duration: Duration) {
430
0
        // Wakeup and shutdown all the worker threads
431
0
        self.handle.inner.shutdown();
432
0
        self.blocking_pool.shutdown(Some(duration));
433
0
    }
434
435
    /// Shuts down the runtime, without waiting for any spawned work to stop.
436
    ///
437
    /// This can be useful if you want to drop a runtime from within another runtime.
438
    /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
439
    /// to complete, which would normally not be permitted within an asynchronous context.
440
    /// By calling `shutdown_background()`, you can drop the runtime from such a context.
441
    ///
442
    /// Note however, that because we do not wait for any blocking tasks to complete, this
443
    /// may result in a resource leak (in that any blocking tasks are still running until they
444
    /// return.
445
    ///
446
    /// See the [struct level documentation](Runtime#shutdown) for more details.
447
    ///
448
    /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
449
    ///
450
    /// ```
451
    /// use tokio::runtime::Runtime;
452
    ///
453
    /// fn main() {
454
    ///    let runtime = Runtime::new().unwrap();
455
    ///
456
    ///    runtime.block_on(async move {
457
    ///        let inner_runtime = Runtime::new().unwrap();
458
    ///        // ...
459
    ///        inner_runtime.shutdown_background();
460
    ///    });
461
    /// }
462
    /// ```
463
0
    pub fn shutdown_background(self) {
464
0
        self.shutdown_timeout(Duration::from_nanos(0));
465
0
    }
466
467
    /// Returns a view that lets you get information about how the runtime
468
    /// is performing.
469
0
    pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
470
0
        self.handle.metrics()
471
0
    }
472
}
473
474
impl Drop for Runtime {
475
17.3k
    fn drop(&mut self) {
476
17.3k
        match &mut self.scheduler {
477
0
            Scheduler::CurrentThread(current_thread) => {
478
0
                // This ensures that tasks spawned on the current-thread
479
0
                // runtime are dropped inside the runtime's context.
480
0
                let _guard = context::try_set_current(&self.handle.inner);
481
0
                current_thread.shutdown(&self.handle.inner);
482
0
            }
483
            #[cfg(feature = "rt-multi-thread")]
484
17.3k
            Scheduler::MultiThread(multi_thread) => {
485
17.3k
                // The threaded scheduler drops its tasks on its worker threads, which is
486
17.3k
                // already in the runtime's context.
487
17.3k
                multi_thread.shutdown(&self.handle.inner);
488
17.3k
            }
489
        }
490
17.3k
    }
491
}
492
493
impl std::panic::UnwindSafe for Runtime {}
494
495
impl std::panic::RefUnwindSafe for Runtime {}