Coverage Report

Created: 2025-12-31 06:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.48.0/src/runtime/runtime.rs
Line
Count
Source
1
use super::BOX_FUTURE_THRESHOLD;
2
use crate::runtime::blocking::BlockingPool;
3
use crate::runtime::scheduler::CurrentThread;
4
use crate::runtime::{context, EnterGuard, Handle};
5
use crate::task::JoinHandle;
6
use crate::util::trace::SpawnMeta;
7
8
use std::future::Future;
9
use std::mem;
10
use std::time::Duration;
11
12
cfg_rt_multi_thread! {
13
    use crate::runtime::Builder;
14
    use crate::runtime::scheduler::MultiThread;
15
}
16
17
/// The Tokio runtime.
18
///
19
/// The runtime provides an I/O driver, task scheduler, [timer], and
20
/// blocking pool, necessary for running asynchronous tasks.
21
///
22
/// Instances of `Runtime` can be created using [`new`], or [`Builder`].
23
/// However, most users will use the [`#[tokio::main]`][main] annotation on
24
/// their entry point instead.
25
///
26
/// See [module level][mod] documentation for more details.
27
///
28
/// # Shutdown
29
///
30
/// Shutting down the runtime is done by dropping the value, or calling
31
/// [`shutdown_background`] or [`shutdown_timeout`].
32
///
33
/// Tasks spawned through [`Runtime::spawn`] keep running until they yield.
34
/// Then they are dropped. They are not *guaranteed* to run to completion, but
35
/// *might* do so if they do not yield until completion.
36
///
37
/// Blocking functions spawned through [`Runtime::spawn_blocking`] keep running
38
/// until they return.
39
///
40
/// The thread initiating the shutdown blocks until all spawned work has been
41
/// stopped. This can take an indefinite amount of time. The `Drop`
42
/// implementation waits forever for this.
43
///
44
/// The [`shutdown_background`] and [`shutdown_timeout`] methods can be used if
45
/// waiting forever is undesired. When the timeout is reached, spawned work that
46
/// did not stop in time and threads running it are leaked. The work continues
47
/// to run until one of the stopping conditions is fulfilled, but the thread
48
/// initiating the shutdown is unblocked.
49
///
50
/// Once the runtime has been dropped, any outstanding I/O resources bound to
51
/// it will no longer function. Calling any method on them will result in an
52
/// error.
53
///
54
/// # Sharing
55
///
56
/// There are several ways to establish shared access to a Tokio runtime:
57
///
58
///  * Using an <code>[Arc]\<Runtime></code>.
59
///  * Using a [`Handle`].
60
///  * Entering the runtime context.
61
///
62
/// Using an <code>[Arc]\<Runtime></code> or [`Handle`] allows you to do various
63
/// things with the runtime such as spawning new tasks or entering the runtime
64
/// context. Both types can be cloned to create a new handle that allows access
65
/// to the same runtime. By passing clones into different tasks or threads, you
66
/// will be able to access the runtime from those tasks or threads.
67
///
68
/// The difference between <code>[Arc]\<Runtime></code> and [`Handle`] is that
69
/// an <code>[Arc]\<Runtime></code> will prevent the runtime from shutting down,
70
/// whereas a [`Handle`] does not prevent that. This is because shutdown of the
71
/// runtime happens when the destructor of the `Runtime` object runs.
72
///
73
/// Calls to [`shutdown_background`] and [`shutdown_timeout`] require exclusive
74
/// ownership of the `Runtime` type. When using an <code>[Arc]\<Runtime></code>,
75
/// this can be achieved via [`Arc::try_unwrap`] when only one strong count
76
/// reference is left over.
77
///
78
/// The runtime context is entered using the [`Runtime::enter`] or
79
/// [`Handle::enter`] methods, which use a thread-local variable to store the
80
/// current runtime. Whenever you are inside the runtime context, methods such
81
/// as [`tokio::spawn`] will use the runtime whose context you are inside.
82
///
83
/// [timer]: crate::time
84
/// [mod]: index.html
85
/// [`new`]: method@Self::new
86
/// [`Builder`]: struct@Builder
87
/// [`Handle`]: struct@Handle
88
/// [main]: macro@crate::main
89
/// [`tokio::spawn`]: crate::spawn
90
/// [`Arc::try_unwrap`]: std::sync::Arc::try_unwrap
91
/// [Arc]: std::sync::Arc
92
/// [`shutdown_background`]: method@Runtime::shutdown_background
93
/// [`shutdown_timeout`]: method@Runtime::shutdown_timeout
94
#[derive(Debug)]
95
pub struct Runtime {
96
    /// Task scheduler
97
    scheduler: Scheduler,
98
99
    /// Handle to runtime, also contains driver handles
100
    handle: Handle,
101
102
    /// Blocking pool handle, used to signal shutdown
103
    blocking_pool: BlockingPool,
104
}
105
106
/// The flavor of a `Runtime`.
107
///
108
/// This is the return type for [`Handle::runtime_flavor`](crate::runtime::Handle::runtime_flavor()).
109
#[derive(Debug, PartialEq, Eq)]
110
#[non_exhaustive]
111
pub enum RuntimeFlavor {
112
    /// The flavor that executes all tasks on the current thread.
113
    CurrentThread,
114
    /// The flavor that executes tasks across multiple threads.
115
    MultiThread,
116
}
117
118
/// The runtime scheduler is either a multi-thread or a current-thread executor.
119
#[derive(Debug)]
120
pub(super) enum Scheduler {
121
    /// Execute all tasks on the current-thread.
122
    CurrentThread(CurrentThread),
123
124
    /// Execute tasks across multiple threads.
125
    #[cfg(feature = "rt-multi-thread")]
126
    MultiThread(MultiThread),
127
}
128
129
impl Runtime {
130
18.0k
    pub(super) fn from_parts(
131
18.0k
        scheduler: Scheduler,
132
18.0k
        handle: Handle,
133
18.0k
        blocking_pool: BlockingPool,
134
18.0k
    ) -> Runtime {
135
18.0k
        Runtime {
136
18.0k
            scheduler,
137
18.0k
            handle,
138
18.0k
            blocking_pool,
139
18.0k
        }
140
18.0k
    }
141
142
    /// Creates a new runtime instance with default configuration values.
143
    ///
144
    /// This results in the multi threaded scheduler, I/O driver, and time driver being
145
    /// initialized.
146
    ///
147
    /// Most applications will not need to call this function directly. Instead,
148
    /// they will use the  [`#[tokio::main]` attribute][main]. When a more complex
149
    /// configuration is necessary, the [runtime builder] may be used.
150
    ///
151
    /// See [module level][mod] documentation for more details.
152
    ///
153
    /// # Examples
154
    ///
155
    /// Creating a new `Runtime` with default configuration values.
156
    ///
157
    /// ```
158
    /// use tokio::runtime::Runtime;
159
    ///
160
    /// let rt = Runtime::new()
161
    ///     .unwrap();
162
    ///
163
    /// // Use the runtime...
164
    /// ```
165
    ///
166
    /// [mod]: index.html
167
    /// [main]: ../attr.main.html
168
    /// [threaded scheduler]: index.html#threaded-scheduler
169
    /// [runtime builder]: crate::runtime::Builder
170
    #[cfg(feature = "rt-multi-thread")]
171
    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
172
18.0k
    pub fn new() -> std::io::Result<Runtime> {
173
18.0k
        Builder::new_multi_thread().enable_all().build()
174
18.0k
    }
175
176
    /// Returns a handle to the runtime's spawner.
177
    ///
178
    /// The returned handle can be used to spawn tasks that run on this runtime, and can
179
    /// be cloned to allow moving the `Handle` to other threads.
180
    ///
181
    /// Calling [`Handle::block_on`] on a handle to a `current_thread` runtime is error-prone.
182
    /// Refer to the documentation of [`Handle::block_on`] for more.
183
    ///
184
    /// # Examples
185
    ///
186
    /// ```
187
    /// # #[cfg(not(target_family = "wasm"))]
188
    /// # {
189
    /// use tokio::runtime::Runtime;
190
    ///
191
    /// let rt = Runtime::new()
192
    ///     .unwrap();
193
    ///
194
    /// let handle = rt.handle();
195
    ///
196
    /// // Use the handle...
197
    /// # }
198
    /// ```
199
0
    pub fn handle(&self) -> &Handle {
200
0
        &self.handle
201
0
    }
202
203
    /// Spawns a future onto the Tokio runtime.
204
    ///
205
    /// This spawns the given future onto the runtime's executor, usually a
206
    /// thread pool. The thread pool is then responsible for polling the future
207
    /// until it completes.
208
    ///
209
    /// The provided future will start running in the background immediately
210
    /// when `spawn` is called, even if you don't await the returned
211
    /// `JoinHandle`.
212
    ///
213
    /// See [module level][mod] documentation for more details.
214
    ///
215
    /// [mod]: index.html
216
    ///
217
    /// # Examples
218
    ///
219
    /// ```
220
    /// # #[cfg(not(target_family = "wasm"))]
221
    /// # {
222
    /// use tokio::runtime::Runtime;
223
    ///
224
    /// # fn dox() {
225
    /// // Create the runtime
226
    /// let rt = Runtime::new().unwrap();
227
    ///
228
    /// // Spawn a future onto the runtime
229
    /// rt.spawn(async {
230
    ///     println!("now running on a worker thread");
231
    /// });
232
    /// # }
233
    /// # }
234
    /// ```
235
    #[track_caller]
236
0
    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
237
0
    where
238
0
        F: Future + Send + 'static,
239
0
        F::Output: Send + 'static,
240
    {
241
0
        let fut_size = mem::size_of::<F>();
242
0
        if fut_size > BOX_FUTURE_THRESHOLD {
243
0
            self.handle
244
0
                .spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
245
        } else {
246
0
            self.handle
247
0
                .spawn_named(future, SpawnMeta::new_unnamed(fut_size))
248
        }
249
0
    }
250
251
    /// Runs the provided function on an executor dedicated to blocking operations.
252
    ///
253
    /// # Examples
254
    ///
255
    /// ```
256
    /// # #[cfg(not(target_family = "wasm"))]
257
    /// # {
258
    /// use tokio::runtime::Runtime;
259
    ///
260
    /// # fn dox() {
261
    /// // Create the runtime
262
    /// let rt = Runtime::new().unwrap();
263
    ///
264
    /// // Spawn a blocking function onto the runtime
265
    /// rt.spawn_blocking(|| {
266
    ///     println!("now running on a worker thread");
267
    /// });
268
    /// # }
269
    /// # }
270
    /// ```
271
    #[track_caller]
272
0
    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
273
0
    where
274
0
        F: FnOnce() -> R + Send + 'static,
275
0
        R: Send + 'static,
276
    {
277
0
        self.handle.spawn_blocking(func)
278
0
    }
279
280
    /// Runs a future to completion on the Tokio runtime. This is the
281
    /// runtime's entry point.
282
    ///
283
    /// This runs the given future on the current thread, blocking until it is
284
    /// complete, and yielding its resolved result. Any tasks or timers
285
    /// which the future spawns internally will be executed on the runtime.
286
    ///
287
    /// # Non-worker future
288
    ///
289
    /// Note that the future required by this function does not run as a
290
    /// worker. The expectation is that other tasks are spawned by the future here.
291
    /// Awaiting on other futures from the future provided here will not
292
    /// perform as fast as those spawned as workers.
293
    ///
294
    /// # Multi thread scheduler
295
    ///
296
    /// When the multi thread scheduler is used this will allow futures
297
    /// to run within the io driver and timer context of the overall runtime.
298
    ///
299
    /// Any spawned tasks will continue running after `block_on` returns.
300
    ///
301
    /// # Current thread scheduler
302
    ///
303
    /// When the current thread scheduler is enabled `block_on`
304
    /// can be called concurrently from multiple threads. The first call
305
    /// will take ownership of the io and timer drivers. This means
306
    /// other threads which do not own the drivers will hook into that one.
307
    /// When the first `block_on` completes, other threads will be able to
308
    /// "steal" the driver to allow continued execution of their futures.
309
    ///
310
    /// Any spawned tasks will be suspended after `block_on` returns. Calling
311
    /// `block_on` again will resume previously spawned tasks.
312
    ///
313
    /// # Panics
314
    ///
315
    /// This function panics if the provided future panics, or if called within an
316
    /// asynchronous execution context.
317
    ///
318
    /// # Examples
319
    ///
320
    /// ```no_run
321
    /// # #[cfg(not(target_family = "wasm"))]
322
    /// # {
323
    /// use tokio::runtime::Runtime;
324
    ///
325
    /// // Create the runtime
326
    /// let rt  = Runtime::new().unwrap();
327
    ///
328
    /// // Execute the future, blocking the current thread until completion
329
    /// rt.block_on(async {
330
    ///     println!("hello");
331
    /// });
332
    /// # }
333
    /// ```
334
    ///
335
    /// [handle]: fn@Handle::block_on
336
    #[track_caller]
337
18.0k
    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
338
18.0k
        let fut_size = mem::size_of::<F>();
339
18.0k
        if fut_size > BOX_FUTURE_THRESHOLD {
340
0
            self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
341
        } else {
342
18.0k
            self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
343
        }
344
18.0k
    }
Unexecuted instantiation: <tokio::runtime::runtime::Runtime>::block_on::<_>
<tokio::runtime::runtime::Runtime>::block_on::<fuzz_client::fuzz_entry::{closure#0}>
Line
Count
Source
337
5.05k
    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
338
5.05k
        let fut_size = mem::size_of::<F>();
339
5.05k
        if fut_size > BOX_FUTURE_THRESHOLD {
340
0
            self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
341
        } else {
342
5.05k
            self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
343
        }
344
5.05k
    }
<tokio::runtime::runtime::Runtime>::block_on::<fuzz_e2e::run::{closure#0}>
Line
Count
Source
337
13.0k
    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
338
13.0k
        let fut_size = mem::size_of::<F>();
339
13.0k
        if fut_size > BOX_FUTURE_THRESHOLD {
340
0
            self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
341
        } else {
342
13.0k
            self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
343
        }
344
13.0k
    }
345
346
    #[track_caller]
347
18.0k
    fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
348
        #[cfg(all(
349
            tokio_unstable,
350
            feature = "taskdump",
351
            feature = "rt",
352
            target_os = "linux",
353
            any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
354
        ))]
355
        let future = super::task::trace::Trace::root(future);
356
357
        #[cfg(all(tokio_unstable, feature = "tracing"))]
358
        let future = crate::util::trace::task(
359
            future,
360
            "block_on",
361
            _meta,
362
            crate::runtime::task::Id::next().as_u64(),
363
        );
364
365
18.0k
        let _enter = self.enter();
366
367
18.0k
        match &self.scheduler {
368
0
            Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
369
            #[cfg(feature = "rt-multi-thread")]
370
18.0k
            Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
371
        }
372
18.0k
    }
Unexecuted instantiation: <tokio::runtime::runtime::Runtime>::block_on_inner::<_>
Unexecuted instantiation: <tokio::runtime::runtime::Runtime>::block_on_inner::<core::pin::Pin<alloc::boxed::Box<fuzz_client::fuzz_entry::{closure#0}>>>
<tokio::runtime::runtime::Runtime>::block_on_inner::<fuzz_client::fuzz_entry::{closure#0}>
Line
Count
Source
347
5.05k
    fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
348
        #[cfg(all(
349
            tokio_unstable,
350
            feature = "taskdump",
351
            feature = "rt",
352
            target_os = "linux",
353
            any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
354
        ))]
355
        let future = super::task::trace::Trace::root(future);
356
357
        #[cfg(all(tokio_unstable, feature = "tracing"))]
358
        let future = crate::util::trace::task(
359
            future,
360
            "block_on",
361
            _meta,
362
            crate::runtime::task::Id::next().as_u64(),
363
        );
364
365
5.05k
        let _enter = self.enter();
366
367
5.05k
        match &self.scheduler {
368
0
            Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
369
            #[cfg(feature = "rt-multi-thread")]
370
5.05k
            Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
371
        }
372
5.05k
    }
Unexecuted instantiation: <tokio::runtime::runtime::Runtime>::block_on_inner::<core::pin::Pin<alloc::boxed::Box<fuzz_e2e::run::{closure#0}>>>
<tokio::runtime::runtime::Runtime>::block_on_inner::<fuzz_e2e::run::{closure#0}>
Line
Count
Source
347
13.0k
    fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
348
        #[cfg(all(
349
            tokio_unstable,
350
            feature = "taskdump",
351
            feature = "rt",
352
            target_os = "linux",
353
            any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
354
        ))]
355
        let future = super::task::trace::Trace::root(future);
356
357
        #[cfg(all(tokio_unstable, feature = "tracing"))]
358
        let future = crate::util::trace::task(
359
            future,
360
            "block_on",
361
            _meta,
362
            crate::runtime::task::Id::next().as_u64(),
363
        );
364
365
13.0k
        let _enter = self.enter();
366
367
13.0k
        match &self.scheduler {
368
0
            Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
369
            #[cfg(feature = "rt-multi-thread")]
370
13.0k
            Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
371
        }
372
13.0k
    }
373
374
    /// Enters the runtime context.
375
    ///
376
    /// This allows you to construct types that must have an executor
377
    /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
378
    /// also allow you to call methods such as [`tokio::spawn`].
379
    ///
380
    /// [`Sleep`]: struct@crate::time::Sleep
381
    /// [`TcpStream`]: struct@crate::net::TcpStream
382
    /// [`tokio::spawn`]: fn@crate::spawn
383
    ///
384
    /// # Example
385
    ///
386
    /// ```
387
    /// # #[cfg(not(target_family = "wasm"))]
388
    /// # {
389
    /// use tokio::runtime::Runtime;
390
    /// use tokio::task::JoinHandle;
391
    ///
392
    /// fn function_that_spawns(msg: String) -> JoinHandle<()> {
393
    ///     // Had we not used `rt.enter` below, this would panic.
394
    ///     tokio::spawn(async move {
395
    ///         println!("{}", msg);
396
    ///     })
397
    /// }
398
    ///
399
    /// fn main() {
400
    ///     let rt = Runtime::new().unwrap();
401
    ///
402
    ///     let s = "Hello World!".to_string();
403
    ///
404
    ///     // By entering the context, we tie `tokio::spawn` to this executor.
405
    ///     let _guard = rt.enter();
406
    ///     let handle = function_that_spawns(s);
407
    ///
408
    ///     // Wait for the task before we end the test.
409
    ///     rt.block_on(handle).unwrap();
410
    /// }
411
    /// # }
412
    /// ```
413
18.0k
    pub fn enter(&self) -> EnterGuard<'_> {
414
18.0k
        self.handle.enter()
415
18.0k
    }
416
417
    /// Shuts down the runtime, waiting for at most `duration` for all spawned
418
    /// work to stop.
419
    ///
420
    /// See the [struct level documentation](Runtime#shutdown) for more details.
421
    ///
422
    /// # Examples
423
    ///
424
    /// ```
425
    /// # #[cfg(not(target_family = "wasm"))]
426
    /// # {
427
    /// use tokio::runtime::Runtime;
428
    /// use tokio::task;
429
    ///
430
    /// use std::thread;
431
    /// use std::time::Duration;
432
    ///
433
    /// fn main() {
434
    /// #  if cfg!(miri) { return } // Miri reports error when main thread terminated without waiting all remaining threads.
435
    ///    let runtime = Runtime::new().unwrap();
436
    ///
437
    ///    runtime.block_on(async move {
438
    ///        task::spawn_blocking(move || {
439
    ///            thread::sleep(Duration::from_secs(10_000));
440
    ///        });
441
    ///    });
442
    ///
443
    ///    runtime.shutdown_timeout(Duration::from_millis(100));
444
    /// }
445
    /// # }
446
    /// ```
447
0
    pub fn shutdown_timeout(mut self, duration: Duration) {
448
        // Wakeup and shutdown all the worker threads
449
0
        self.handle.inner.shutdown();
450
0
        self.blocking_pool.shutdown(Some(duration));
451
0
    }
452
453
    /// Shuts down the runtime, without waiting for any spawned work to stop.
454
    ///
455
    /// This can be useful if you want to drop a runtime from within another runtime.
456
    /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
457
    /// to complete, which would normally not be permitted within an asynchronous context.
458
    /// By calling `shutdown_background()`, you can drop the runtime from such a context.
459
    ///
460
    /// Note however, that because we do not wait for any blocking tasks to complete, this
461
    /// may result in a resource leak (in that any blocking tasks are still running until they
462
    /// return.
463
    ///
464
    /// See the [struct level documentation](Runtime#shutdown) for more details.
465
    ///
466
    /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
467
    ///
468
    /// ```
469
    /// # #[cfg(not(target_family = "wasm"))]
470
    /// # {
471
    /// use tokio::runtime::Runtime;
472
    ///
473
    /// fn main() {
474
    ///    let runtime = Runtime::new().unwrap();
475
    ///
476
    ///    runtime.block_on(async move {
477
    ///        let inner_runtime = Runtime::new().unwrap();
478
    ///        // ...
479
    ///        inner_runtime.shutdown_background();
480
    ///    });
481
    /// }
482
    /// # }
483
    /// ```
484
0
    pub fn shutdown_background(self) {
485
0
        self.shutdown_timeout(Duration::from_nanos(0));
486
0
    }
487
488
    /// Returns a view that lets you get information about how the runtime
489
    /// is performing.
490
0
    pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
491
0
        self.handle.metrics()
492
0
    }
493
}
494
495
impl Drop for Runtime {
496
18.0k
    fn drop(&mut self) {
497
18.0k
        match &mut self.scheduler {
498
0
            Scheduler::CurrentThread(current_thread) => {
499
0
                // This ensures that tasks spawned on the current-thread
500
0
                // runtime are dropped inside the runtime's context.
501
0
                let _guard = context::try_set_current(&self.handle.inner);
502
0
                current_thread.shutdown(&self.handle.inner);
503
0
            }
504
            #[cfg(feature = "rt-multi-thread")]
505
18.0k
            Scheduler::MultiThread(multi_thread) => {
506
18.0k
                // The threaded scheduler drops its tasks on its worker threads, which is
507
18.0k
                // already in the runtime's context.
508
18.0k
                multi_thread.shutdown(&self.handle.inner);
509
18.0k
            }
510
        }
511
18.0k
    }
512
}
513
514
impl std::panic::UnwindSafe for Runtime {}
515
516
impl std::panic::RefUnwindSafe for Runtime {}