Coverage Report

Created: 2026-02-14 06:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.49.0/src/runtime/builder.rs
Line
Count
Source
1
#![cfg_attr(loom, allow(unused_imports))]
2
3
use crate::runtime::handle::Handle;
4
use crate::runtime::{
5
    blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback, TimerFlavor,
6
};
7
#[cfg(tokio_unstable)]
8
use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
9
use crate::util::rand::{RngSeed, RngSeedGenerator};
10
11
use crate::runtime::blocking::BlockingPool;
12
use crate::runtime::scheduler::CurrentThread;
13
use std::fmt;
14
use std::io;
15
use std::thread::ThreadId;
16
use std::time::Duration;
17
18
/// Builds Tokio Runtime with custom configuration values.
19
///
20
/// Methods can be chained in order to set the configuration values. The
21
/// Runtime is constructed by calling [`build`].
22
///
23
/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
24
/// or [`Builder::new_current_thread`].
25
///
26
/// See function level documentation for details on the various configuration
27
/// settings.
28
///
29
/// [`build`]: method@Self::build
30
/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
31
/// [`Builder::new_current_thread`]: method@Self::new_current_thread
32
///
33
/// # Examples
34
///
35
/// ```
36
/// # #[cfg(not(target_family = "wasm"))]
37
/// # {
38
/// use tokio::runtime::Builder;
39
///
40
/// fn main() {
41
///     // build runtime
42
///     let runtime = Builder::new_multi_thread()
43
///         .worker_threads(4)
44
///         .thread_name("my-custom-name")
45
///         .thread_stack_size(3 * 1024 * 1024)
46
///         .build()
47
///         .unwrap();
48
///
49
///     // use runtime ...
50
/// }
51
/// # }
52
/// ```
53
pub struct Builder {
54
    /// Runtime type
55
    kind: Kind,
56
57
    /// Whether or not to enable the I/O driver
58
    enable_io: bool,
59
    nevents: usize,
60
61
    /// Whether or not to enable the time driver
62
    enable_time: bool,
63
64
    /// Whether or not the clock should start paused.
65
    start_paused: bool,
66
67
    /// The number of worker threads, used by Runtime.
68
    ///
69
    /// Only used when not using the current-thread executor.
70
    worker_threads: Option<usize>,
71
72
    /// Cap on thread usage.
73
    max_blocking_threads: usize,
74
75
    /// Name fn used for threads spawned by the runtime.
76
    pub(super) thread_name: ThreadNameFn,
77
78
    /// Stack size used for threads spawned by the runtime.
79
    pub(super) thread_stack_size: Option<usize>,
80
81
    /// Callback to run after each thread starts.
82
    pub(super) after_start: Option<Callback>,
83
84
    /// To run before each worker thread stops
85
    pub(super) before_stop: Option<Callback>,
86
87
    /// To run before each worker thread is parked.
88
    pub(super) before_park: Option<Callback>,
89
90
    /// To run after each thread is unparked.
91
    pub(super) after_unpark: Option<Callback>,
92
93
    /// To run before each task is spawned.
94
    pub(super) before_spawn: Option<TaskCallback>,
95
96
    /// To run before each poll
97
    #[cfg(tokio_unstable)]
98
    pub(super) before_poll: Option<TaskCallback>,
99
100
    /// To run after each poll
101
    #[cfg(tokio_unstable)]
102
    pub(super) after_poll: Option<TaskCallback>,
103
104
    /// To run after each task is terminated.
105
    pub(super) after_termination: Option<TaskCallback>,
106
107
    /// Customizable keep alive timeout for `BlockingPool`
108
    pub(super) keep_alive: Option<Duration>,
109
110
    /// How many ticks before pulling a task from the global/remote queue?
111
    ///
112
    /// When `None`, the value is unspecified and behavior details are left to
113
    /// the scheduler. Each scheduler flavor could choose to either pick its own
114
    /// default value or use some other strategy to decide when to poll from the
115
    /// global queue. For example, the multi-threaded scheduler uses a
116
    /// self-tuning strategy based on mean task poll times.
117
    pub(super) global_queue_interval: Option<u32>,
118
119
    /// How many ticks before yielding to the driver for timer and I/O events?
120
    pub(super) event_interval: u32,
121
122
    /// When true, the multi-threade scheduler LIFO slot should not be used.
123
    ///
124
    /// This option should only be exposed as unstable.
125
    pub(super) disable_lifo_slot: bool,
126
127
    /// Specify a random number generator seed to provide deterministic results
128
    pub(super) seed_generator: RngSeedGenerator,
129
130
    /// When true, enables task poll count histogram instrumentation.
131
    pub(super) metrics_poll_count_histogram_enable: bool,
132
133
    /// Configures the task poll count histogram
134
    pub(super) metrics_poll_count_histogram: HistogramBuilder,
135
136
    #[cfg(tokio_unstable)]
137
    pub(super) unhandled_panic: UnhandledPanic,
138
139
    timer_flavor: TimerFlavor,
140
}
141
142
cfg_unstable! {
143
    /// How the runtime should respond to unhandled panics.
144
    ///
145
    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
146
    /// to configure the runtime behavior when a spawned task panics.
147
    ///
148
    /// See [`Builder::unhandled_panic`] for more details.
149
    #[derive(Debug, Clone)]
150
    #[non_exhaustive]
151
    pub enum UnhandledPanic {
152
        /// The runtime should ignore panics on spawned tasks.
153
        ///
154
        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
155
        /// tasks continue running normally.
156
        ///
157
        /// This is the default behavior.
158
        ///
159
        /// # Examples
160
        ///
161
        /// ```
162
        /// # #[cfg(not(target_family = "wasm"))]
163
        /// # {
164
        /// use tokio::runtime::{self, UnhandledPanic};
165
        ///
166
        /// # pub fn main() {
167
        /// let rt = runtime::Builder::new_current_thread()
168
        ///     .unhandled_panic(UnhandledPanic::Ignore)
169
        ///     .build()
170
        ///     .unwrap();
171
        ///
172
        /// let task1 = rt.spawn(async { panic!("boom"); });
173
        /// let task2 = rt.spawn(async {
174
        ///     // This task completes normally
175
        ///     "done"
176
        /// });
177
        ///
178
        /// rt.block_on(async {
179
        ///     // The panic on the first task is forwarded to the `JoinHandle`
180
        ///     assert!(task1.await.is_err());
181
        ///
182
        ///     // The second task completes normally
183
        ///     assert!(task2.await.is_ok());
184
        /// })
185
        /// # }
186
        /// # }
187
        /// ```
188
        ///
189
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
190
        Ignore,
191
192
        /// The runtime should immediately shutdown if a spawned task panics.
193
        ///
194
        /// The runtime will immediately shutdown even if the panicked task's
195
        /// [`JoinHandle`] is still available. All further spawned tasks will be
196
        /// immediately dropped and call to [`Runtime::block_on`] will panic.
197
        ///
198
        /// # Examples
199
        ///
200
        /// ```should_panic
201
        /// use tokio::runtime::{self, UnhandledPanic};
202
        ///
203
        /// # pub fn main() {
204
        /// let rt = runtime::Builder::new_current_thread()
205
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
206
        ///     .build()
207
        ///     .unwrap();
208
        ///
209
        /// rt.spawn(async { panic!("boom"); });
210
        /// rt.spawn(async {
211
        ///     // This task never completes.
212
        /// });
213
        ///
214
        /// rt.block_on(async {
215
        ///     // Do some work
216
        /// # loop { tokio::task::yield_now().await; }
217
        /// })
218
        /// # }
219
        /// ```
220
        ///
221
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
222
        ShutdownRuntime,
223
    }
224
}
225
226
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
227
228
#[derive(Clone, Copy)]
229
pub(crate) enum Kind {
230
    CurrentThread,
231
    #[cfg(feature = "rt-multi-thread")]
232
    MultiThread,
233
}
234
235
impl Builder {
236
    /// Returns a new builder with the current thread scheduler selected.
237
    ///
238
    /// Configuration methods can be chained on the return value.
239
    ///
240
    /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
241
    /// [`LocalSet`], or call [`build_local`] to create a [`LocalRuntime`] (unstable).
242
    ///
243
    /// [`LocalSet`]: crate::task::LocalSet
244
    /// [`LocalRuntime`]: crate::runtime::LocalRuntime
245
    /// [`build_local`]: crate::runtime::Builder::build_local
246
0
    pub fn new_current_thread() -> Builder {
247
        #[cfg(loom)]
248
        const EVENT_INTERVAL: u32 = 4;
249
        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
250
        #[cfg(not(loom))]
251
        const EVENT_INTERVAL: u32 = 61;
252
253
0
        Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
254
0
    }
255
256
    /// Returns a new builder with the multi thread scheduler selected.
257
    ///
258
    /// Configuration methods can be chained on the return value.
259
    #[cfg(feature = "rt-multi-thread")]
260
    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
261
    pub fn new_multi_thread() -> Builder {
262
        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
263
        Builder::new(Kind::MultiThread, 61)
264
    }
265
266
    /// Returns a new runtime builder initialized with default configuration
267
    /// values.
268
    ///
269
    /// Configuration methods can be chained on the return value.
270
0
    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
271
        Builder {
272
0
            kind,
273
274
            // I/O defaults to "off"
275
            enable_io: false,
276
            nevents: 1024,
277
278
            // Time defaults to "off"
279
            enable_time: false,
280
281
            // The clock starts not-paused
282
            start_paused: false,
283
284
            // Read from environment variable first in multi-threaded mode.
285
            // Default to lazy auto-detection (one thread per CPU core)
286
0
            worker_threads: None,
287
288
            max_blocking_threads: 512,
289
290
            // Default thread name
291
0
            thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
292
293
            // Do not set a stack size by default
294
0
            thread_stack_size: None,
295
296
            // No worker thread callbacks
297
0
            after_start: None,
298
0
            before_stop: None,
299
0
            before_park: None,
300
0
            after_unpark: None,
301
302
0
            before_spawn: None,
303
0
            after_termination: None,
304
305
            #[cfg(tokio_unstable)]
306
            before_poll: None,
307
            #[cfg(tokio_unstable)]
308
            after_poll: None,
309
310
0
            keep_alive: None,
311
312
            // Defaults for these values depend on the scheduler kind, so we get them
313
            // as parameters.
314
0
            global_queue_interval: None,
315
0
            event_interval,
316
317
0
            seed_generator: RngSeedGenerator::new(RngSeed::new()),
318
319
            #[cfg(tokio_unstable)]
320
            unhandled_panic: UnhandledPanic::Ignore,
321
322
            metrics_poll_count_histogram_enable: false,
323
324
0
            metrics_poll_count_histogram: HistogramBuilder::default(),
325
326
            disable_lifo_slot: false,
327
328
0
            timer_flavor: TimerFlavor::Traditional,
329
        }
330
0
    }
331
332
    /// Enables both I/O and time drivers.
333
    ///
334
    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
335
    /// individually. If additional components are added to Tokio in the future,
336
    /// `enable_all` will include these future components.
337
    ///
338
    /// # Examples
339
    ///
340
    /// ```
341
    /// # #[cfg(not(target_family = "wasm"))]
342
    /// # {
343
    /// use tokio::runtime;
344
    ///
345
    /// let rt = runtime::Builder::new_multi_thread()
346
    ///     .enable_all()
347
    ///     .build()
348
    ///     .unwrap();
349
    /// # }
350
    /// ```
351
0
    pub fn enable_all(&mut self) -> &mut Self {
352
        #[cfg(any(
353
            feature = "net",
354
            all(unix, feature = "process"),
355
            all(unix, feature = "signal")
356
        ))]
357
        self.enable_io();
358
359
        #[cfg(all(
360
            tokio_unstable,
361
            feature = "io-uring",
362
            feature = "rt",
363
            feature = "fs",
364
            target_os = "linux",
365
        ))]
366
        self.enable_io_uring();
367
368
        #[cfg(feature = "time")]
369
0
        self.enable_time();
370
371
0
        self
372
0
    }
373
374
    /// Enables the alternative timer implementation, which is disabled by default.
375
    ///
376
    /// The alternative timer implementation is an unstable feature that may
377
    /// provide better performance on multi-threaded runtimes with a large number
378
    /// of worker threads.
379
    ///
380
    /// This option only applies to multi-threaded runtimes. Attempting to use
381
    /// this option with any other runtime type will have no effect.
382
    ///
383
    /// [Click here to share your experience with the alternative timer](https://github.com/tokio-rs/tokio/issues/7745)
384
    ///
385
    /// # Examples
386
    ///
387
    /// ```
388
    /// # #[cfg(not(target_family = "wasm"))]
389
    /// # {
390
    /// use tokio::runtime;
391
    ///
392
    /// let rt = runtime::Builder::new_multi_thread()
393
    ///   .enable_alt_timer()
394
    ///   .build()
395
    ///   .unwrap();
396
    /// # }
397
    /// ```
398
    #[cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))]
399
    #[cfg_attr(
400
        docsrs,
401
        doc(cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread")))
402
    )]
403
    pub fn enable_alt_timer(&mut self) -> &mut Self {
404
        self.enable_time();
405
        self.timer_flavor = TimerFlavor::Alternative;
406
        self
407
    }
408
409
    /// Sets the number of worker threads the `Runtime` will use.
410
    ///
411
    /// This can be any number above 0 though it is advised to keep this value
412
    /// on the smaller side.
413
    ///
414
    /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
415
    ///
416
    /// # Default
417
    ///
418
    /// The default value is the number of cores available to the system.
419
    ///
420
    /// When using the `current_thread` runtime this method has no effect.
421
    ///
422
    /// # Examples
423
    ///
424
    /// ## Multi threaded runtime with 4 threads
425
    ///
426
    /// ```
427
    /// # #[cfg(not(target_family = "wasm"))]
428
    /// # {
429
    /// use tokio::runtime;
430
    ///
431
    /// // This will spawn a work-stealing runtime with 4 worker threads.
432
    /// let rt = runtime::Builder::new_multi_thread()
433
    ///     .worker_threads(4)
434
    ///     .build()
435
    ///     .unwrap();
436
    ///
437
    /// rt.spawn(async move {});
438
    /// # }
439
    /// ```
440
    ///
441
    /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
442
    ///
443
    /// ```
444
    /// use tokio::runtime;
445
    ///
446
    /// // Create a runtime that _must_ be driven from a call
447
    /// // to `Runtime::block_on`.
448
    /// let rt = runtime::Builder::new_current_thread()
449
    ///     .build()
450
    ///     .unwrap();
451
    ///
452
    /// // This will run the runtime and future on the current thread
453
    /// rt.block_on(async move {});
454
    /// ```
455
    ///
456
    /// # Panics
457
    ///
458
    /// This will panic if `val` is not larger than `0`.
459
    #[track_caller]
460
0
    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
461
0
        assert!(val > 0, "Worker threads cannot be set to 0");
462
0
        self.worker_threads = Some(val);
463
0
        self
464
0
    }
465
466
    /// Specifies the limit for additional threads spawned by the Runtime.
467
    ///
468
    /// These threads are used for blocking operations like tasks spawned
469
    /// through [`spawn_blocking`], this includes but is not limited to:
470
    /// - [`fs`] operations
471
    /// - dns resolution through [`ToSocketAddrs`]
472
    /// - writing to [`Stdout`] or [`Stderr`]
473
    /// - reading from [`Stdin`]
474
    ///
475
    /// Unlike the [`worker_threads`], they are not always active and will exit
476
    /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
477
    ///
478
    /// It's recommended to not set this limit too low in order to avoid hanging on operations
479
    /// requiring [`spawn_blocking`].
480
    ///
481
    /// The default value is 512.
482
    ///
483
    /// # Queue Behavior
484
    ///
485
    /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
486
    /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
487
    /// method has not been reached, a new thread will be spawned. If no idle thread is available
488
    /// and no more threads are allowed to be spawned, the task will remain in the queue until one
489
    /// of the busy threads pick it up. Note that since the queue does not apply any backpressure,
490
    /// it could potentially grow unbounded.
491
    ///
492
    /// # Panics
493
    ///
494
    /// This will panic if `val` is not larger than `0`.
495
    ///
496
    /// # Upgrading from 0.x
497
    ///
498
    /// In old versions `max_threads` limited both blocking and worker threads, but the
499
    /// current `max_blocking_threads` does not include async worker threads in the count.
500
    ///
501
    /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
502
    /// [`fs`]: mod@crate::fs
503
    /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
504
    /// [`Stdout`]: struct@crate::io::Stdout
505
    /// [`Stdin`]: struct@crate::io::Stdin
506
    /// [`Stderr`]: struct@crate::io::Stderr
507
    /// [`worker_threads`]: Self::worker_threads
508
    /// [`thread_keep_alive`]: Self::thread_keep_alive
509
    #[track_caller]
510
    #[cfg_attr(docsrs, doc(alias = "max_threads"))]
511
0
    pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
512
0
        assert!(val > 0, "Max blocking threads cannot be set to 0");
513
0
        self.max_blocking_threads = val;
514
0
        self
515
0
    }
516
517
    /// Sets name of threads spawned by the `Runtime`'s thread pool.
518
    ///
519
    /// The default name is "tokio-runtime-worker".
520
    ///
521
    /// # Examples
522
    ///
523
    /// ```
524
    /// # #[cfg(not(target_family = "wasm"))]
525
    /// # {
526
    /// # use tokio::runtime;
527
    ///
528
    /// # pub fn main() {
529
    /// let rt = runtime::Builder::new_multi_thread()
530
    ///     .thread_name("my-pool")
531
    ///     .build();
532
    /// # }
533
    /// # }
534
    /// ```
535
0
    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
536
0
        let val = val.into();
537
0
        self.thread_name = std::sync::Arc::new(move || val.clone());
538
0
        self
539
0
    }
540
541
    /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
542
    ///
543
    /// The default name fn is `|| "tokio-runtime-worker".into()`.
544
    ///
545
    /// # Examples
546
    ///
547
    /// ```
548
    /// # #[cfg(not(target_family = "wasm"))]
549
    /// # {
550
    /// # use tokio::runtime;
551
    /// # use std::sync::atomic::{AtomicUsize, Ordering};
552
    /// # pub fn main() {
553
    /// let rt = runtime::Builder::new_multi_thread()
554
    ///     .thread_name_fn(|| {
555
    ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
556
    ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
557
    ///        format!("my-pool-{}", id)
558
    ///     })
559
    ///     .build();
560
    /// # }
561
    /// # }
562
    /// ```
563
0
    pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
564
0
    where
565
0
        F: Fn() -> String + Send + Sync + 'static,
566
    {
567
0
        self.thread_name = std::sync::Arc::new(f);
568
0
        self
569
0
    }
570
571
    /// Sets the stack size (in bytes) for worker threads.
572
    ///
573
    /// The actual stack size may be greater than this value if the platform
574
    /// specifies minimal stack size.
575
    ///
576
    /// The default stack size for spawned threads is 2 MiB, though this
577
    /// particular stack size is subject to change in the future.
578
    ///
579
    /// # Examples
580
    ///
581
    /// ```
582
    /// # #[cfg(not(target_family = "wasm"))]
583
    /// # {
584
    /// # use tokio::runtime;
585
    ///
586
    /// # pub fn main() {
587
    /// let rt = runtime::Builder::new_multi_thread()
588
    ///     .thread_stack_size(32 * 1024)
589
    ///     .build();
590
    /// # }
591
    /// # }
592
    /// ```
593
0
    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
594
0
        self.thread_stack_size = Some(val);
595
0
        self
596
0
    }
597
598
    /// Executes function `f` after each thread is started but before it starts
599
    /// doing work.
600
    ///
601
    /// This is intended for bookkeeping and monitoring use cases.
602
    ///
603
    /// # Examples
604
    ///
605
    /// ```
606
    /// # #[cfg(not(target_family = "wasm"))]
607
    /// # {
608
    /// # use tokio::runtime;
609
    /// # pub fn main() {
610
    /// let runtime = runtime::Builder::new_multi_thread()
611
    ///     .on_thread_start(|| {
612
    ///         println!("thread started");
613
    ///     })
614
    ///     .build();
615
    /// # }
616
    /// # }
617
    /// ```
618
    #[cfg(not(loom))]
619
0
    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
620
0
    where
621
0
        F: Fn() + Send + Sync + 'static,
622
    {
623
0
        self.after_start = Some(std::sync::Arc::new(f));
624
0
        self
625
0
    }
626
627
    /// Executes function `f` before each thread stops.
628
    ///
629
    /// This is intended for bookkeeping and monitoring use cases.
630
    ///
631
    /// # Examples
632
    ///
633
    /// ```
634
    /// # #[cfg(not(target_family = "wasm"))]
635
    /// {
636
    /// # use tokio::runtime;
637
    /// # pub fn main() {
638
    /// let runtime = runtime::Builder::new_multi_thread()
639
    ///     .on_thread_stop(|| {
640
    ///         println!("thread stopping");
641
    ///     })
642
    ///     .build();
643
    /// # }
644
    /// # }
645
    /// ```
646
    #[cfg(not(loom))]
647
0
    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
648
0
    where
649
0
        F: Fn() + Send + Sync + 'static,
650
    {
651
0
        self.before_stop = Some(std::sync::Arc::new(f));
652
0
        self
653
0
    }
654
655
    /// Executes function `f` just before a thread is parked (goes idle).
656
    /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
657
    /// can be called, and may result in this thread being unparked immediately.
658
    ///
659
    /// This can be used to start work only when the executor is idle, or for bookkeeping
660
    /// and monitoring purposes.
661
    ///
662
    /// Note: There can only be one park callback for a runtime; calling this function
663
    /// more than once replaces the last callback defined, rather than adding to it.
664
    ///
665
    /// # Examples
666
    ///
667
    /// ## Multithreaded executor
668
    /// ```
669
    /// # #[cfg(not(target_family = "wasm"))]
670
    /// # {
671
    /// # use std::sync::Arc;
672
    /// # use std::sync::atomic::{AtomicBool, Ordering};
673
    /// # use tokio::runtime;
674
    /// # use tokio::sync::Barrier;
675
    /// # pub fn main() {
676
    /// let once = AtomicBool::new(true);
677
    /// let barrier = Arc::new(Barrier::new(2));
678
    ///
679
    /// let runtime = runtime::Builder::new_multi_thread()
680
    ///     .worker_threads(1)
681
    ///     .on_thread_park({
682
    ///         let barrier = barrier.clone();
683
    ///         move || {
684
    ///             let barrier = barrier.clone();
685
    ///             if once.swap(false, Ordering::Relaxed) {
686
    ///                 tokio::spawn(async move { barrier.wait().await; });
687
    ///            }
688
    ///         }
689
    ///     })
690
    ///     .build()
691
    ///     .unwrap();
692
    ///
693
    /// runtime.block_on(async {
694
    ///    barrier.wait().await;
695
    /// })
696
    /// # }
697
    /// # }
698
    /// ```
699
    /// ## Current thread executor
700
    /// ```
701
    /// # use std::sync::Arc;
702
    /// # use std::sync::atomic::{AtomicBool, Ordering};
703
    /// # use tokio::runtime;
704
    /// # use tokio::sync::Barrier;
705
    /// # pub fn main() {
706
    /// let once = AtomicBool::new(true);
707
    /// let barrier = Arc::new(Barrier::new(2));
708
    ///
709
    /// let runtime = runtime::Builder::new_current_thread()
710
    ///     .on_thread_park({
711
    ///         let barrier = barrier.clone();
712
    ///         move || {
713
    ///             let barrier = barrier.clone();
714
    ///             if once.swap(false, Ordering::Relaxed) {
715
    ///                 tokio::spawn(async move { barrier.wait().await; });
716
    ///            }
717
    ///         }
718
    ///     })
719
    ///     .build()
720
    ///     .unwrap();
721
    ///
722
    /// runtime.block_on(async {
723
    ///    barrier.wait().await;
724
    /// })
725
    /// # }
726
    /// ```
727
    #[cfg(not(loom))]
728
0
    pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
729
0
    where
730
0
        F: Fn() + Send + Sync + 'static,
731
    {
732
0
        self.before_park = Some(std::sync::Arc::new(f));
733
0
        self
734
0
    }
735
736
    /// Executes function `f` just after a thread unparks (starts executing tasks).
737
    ///
738
    /// This is intended for bookkeeping and monitoring use cases; note that work
739
    /// in this callback will increase latencies when the application has allowed one or
740
    /// more runtime threads to go idle.
741
    ///
742
    /// Note: There can only be one unpark callback for a runtime; calling this function
743
    /// more than once replaces the last callback defined, rather than adding to it.
744
    ///
745
    /// # Examples
746
    ///
747
    /// ```
748
    /// # #[cfg(not(target_family = "wasm"))]
749
    /// # {
750
    /// # use tokio::runtime;
751
    /// # pub fn main() {
752
    /// let runtime = runtime::Builder::new_multi_thread()
753
    ///     .on_thread_unpark(|| {
754
    ///         println!("thread unparking");
755
    ///     })
756
    ///     .build();
757
    ///
758
    /// runtime.unwrap().block_on(async {
759
    ///    tokio::task::yield_now().await;
760
    ///    println!("Hello from Tokio!");
761
    /// })
762
    /// # }
763
    /// # }
764
    /// ```
765
    #[cfg(not(loom))]
766
0
    pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
767
0
    where
768
0
        F: Fn() + Send + Sync + 'static,
769
    {
770
0
        self.after_unpark = Some(std::sync::Arc::new(f));
771
0
        self
772
0
    }
773
774
    /// Executes function `f` just before a task is spawned.
775
    ///
776
    /// `f` is called within the Tokio context, so functions like
777
    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
778
    /// invoked immediately.
779
    ///
780
    /// This can be used for bookkeeping or monitoring purposes.
781
    ///
782
    /// Note: There can only be one spawn callback for a runtime; calling this function more
783
    /// than once replaces the last callback defined, rather than adding to it.
784
    ///
785
    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
786
    ///
787
    /// **Note**: This is an [unstable API][unstable]. The public API of this type
788
    /// may break in 1.x releases. See [the documentation on unstable
789
    /// features][unstable] for details.
790
    ///
791
    /// [unstable]: crate#unstable-features
792
    ///
793
    /// # Examples
794
    ///
795
    /// ```
796
    /// # use tokio::runtime;
797
    /// # pub fn main() {
798
    /// let runtime = runtime::Builder::new_current_thread()
799
    ///     .on_task_spawn(|_| {
800
    ///         println!("spawning task");
801
    ///     })
802
    ///     .build()
803
    ///     .unwrap();
804
    ///
805
    /// runtime.block_on(async {
806
    ///     tokio::task::spawn(std::future::ready(()));
807
    ///
808
    ///     for _ in 0..64 {
809
    ///         tokio::task::yield_now().await;
810
    ///     }
811
    /// })
812
    /// # }
813
    /// ```
814
    #[cfg(all(not(loom), tokio_unstable))]
815
    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
816
    pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
817
    where
818
        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
819
    {
820
        self.before_spawn = Some(std::sync::Arc::new(f));
821
        self
822
    }
823
824
    /// Executes function `f` just before a task is polled
825
    ///
826
    /// `f` is called within the Tokio context, so functions like
827
    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
828
    /// invoked immediately.
829
    ///
830
    /// **Note**: This is an [unstable API][unstable]. The public API of this type
831
    /// may break in 1.x releases. See [the documentation on unstable
832
    /// features][unstable] for details.
833
    ///
834
    /// [unstable]: crate#unstable-features
835
    ///
836
    /// # Examples
837
    ///
838
    /// ```
839
    /// # #[cfg(not(target_family = "wasm"))]
840
    /// # {
841
    /// # use std::sync::{atomic::AtomicUsize, Arc};
842
    /// # use tokio::task::yield_now;
843
    /// # pub fn main() {
844
    /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
845
    /// let poll_start = poll_start_counter.clone();
846
    /// let rt = tokio::runtime::Builder::new_multi_thread()
847
    ///     .enable_all()
848
    ///     .on_before_task_poll(move |meta| {
849
    ///         println!("task {} is about to be polled", meta.id())
850
    ///     })
851
    ///     .build()
852
    ///     .unwrap();
853
    /// let task = rt.spawn(async {
854
    ///     yield_now().await;
855
    /// });
856
    /// let _ = rt.block_on(task);
857
    ///
858
    /// # }
859
    /// # }
860
    /// ```
861
    #[cfg(tokio_unstable)]
862
    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
863
    pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
864
    where
865
        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
866
    {
867
        self.before_poll = Some(std::sync::Arc::new(f));
868
        self
869
    }
870
871
    /// Executes function `f` just after a task is polled
872
    ///
873
    /// `f` is called within the Tokio context, so functions like
874
    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
875
    /// invoked immediately.
876
    ///
877
    /// **Note**: This is an [unstable API][unstable]. The public API of this type
878
    /// may break in 1.x releases. See [the documentation on unstable
879
    /// features][unstable] for details.
880
    ///
881
    /// [unstable]: crate#unstable-features
882
    ///
883
    /// # Examples
884
    ///
885
    /// ```
886
    /// # #[cfg(not(target_family = "wasm"))]
887
    /// # {
888
    /// # use std::sync::{atomic::AtomicUsize, Arc};
889
    /// # use tokio::task::yield_now;
890
    /// # pub fn main() {
891
    /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
892
    /// let poll_stop = poll_stop_counter.clone();
893
    /// let rt = tokio::runtime::Builder::new_multi_thread()
894
    ///     .enable_all()
895
    ///     .on_after_task_poll(move |meta| {
896
    ///         println!("task {} completed polling", meta.id());
897
    ///     })
898
    ///     .build()
899
    ///     .unwrap();
900
    /// let task = rt.spawn(async {
901
    ///     yield_now().await;
902
    /// });
903
    /// let _ = rt.block_on(task);
904
    ///
905
    /// # }
906
    /// # }
907
    /// ```
908
    #[cfg(tokio_unstable)]
909
    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
910
    pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
911
    where
912
        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
913
    {
914
        self.after_poll = Some(std::sync::Arc::new(f));
915
        self
916
    }
917
918
    /// Executes function `f` just after a task is terminated.
919
    ///
920
    /// `f` is called within the Tokio context, so functions like
921
    /// [`tokio::spawn`](crate::spawn) can be called.
922
    ///
923
    /// This can be used for bookkeeping or monitoring purposes.
924
    ///
925
    /// Note: There can only be one task termination callback for a runtime; calling this
926
    /// function more than once replaces the last callback defined, rather than adding to it.
927
    ///
928
    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
929
    ///
930
    /// **Note**: This is an [unstable API][unstable]. The public API of this type
931
    /// may break in 1.x releases. See [the documentation on unstable
932
    /// features][unstable] for details.
933
    ///
934
    /// [unstable]: crate#unstable-features
935
    ///
936
    /// # Examples
937
    ///
938
    /// ```
939
    /// # use tokio::runtime;
940
    /// # pub fn main() {
941
    /// let runtime = runtime::Builder::new_current_thread()
942
    ///     .on_task_terminate(|_| {
943
    ///         println!("killing task");
944
    ///     })
945
    ///     .build()
946
    ///     .unwrap();
947
    ///
948
    /// runtime.block_on(async {
949
    ///     tokio::task::spawn(std::future::ready(()));
950
    ///
951
    ///     for _ in 0..64 {
952
    ///         tokio::task::yield_now().await;
953
    ///     }
954
    /// })
955
    /// # }
956
    /// ```
957
    #[cfg(all(not(loom), tokio_unstable))]
958
    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
959
    pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
960
    where
961
        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
962
    {
963
        self.after_termination = Some(std::sync::Arc::new(f));
964
        self
965
    }
966
967
    /// Creates the configured `Runtime`.
968
    ///
969
    /// The returned `Runtime` instance is ready to spawn tasks.
970
    ///
971
    /// # Examples
972
    ///
973
    /// ```
974
    /// # #[cfg(not(target_family = "wasm"))]
975
    /// # {
976
    /// use tokio::runtime::Builder;
977
    ///
978
    /// let rt  = Builder::new_multi_thread().build().unwrap();
979
    ///
980
    /// rt.block_on(async {
981
    ///     println!("Hello from the Tokio runtime");
982
    /// });
983
    /// # }
984
    /// ```
985
0
    pub fn build(&mut self) -> io::Result<Runtime> {
986
0
        match &self.kind {
987
0
            Kind::CurrentThread => self.build_current_thread_runtime(),
988
            #[cfg(feature = "rt-multi-thread")]
989
            Kind::MultiThread => self.build_threaded_runtime(),
990
        }
991
0
    }
992
993
    /// Creates the configured [`LocalRuntime`].
994
    ///
995
    /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
996
    ///
997
    /// # Panics
998
    ///
999
    /// This will panic if the runtime is configured with [`new_multi_thread()`].
1000
    ///
1001
    /// [`new_multi_thread()`]: Builder::new_multi_thread
1002
    ///
1003
    /// # Examples
1004
    ///
1005
    /// ```
1006
    /// use tokio::runtime::{Builder, LocalOptions};
1007
    ///
1008
    /// let rt = Builder::new_current_thread()
1009
    ///     .build_local(LocalOptions::default())
1010
    ///     .unwrap();
1011
    ///
1012
    /// rt.spawn_local(async {
1013
    ///     println!("Hello from the Tokio runtime");
1014
    /// });
1015
    /// ```
1016
    #[allow(unused_variables, unreachable_patterns)]
1017
    #[cfg(tokio_unstable)]
1018
    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
1019
    pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
1020
        match &self.kind {
1021
            Kind::CurrentThread => self.build_current_thread_local_runtime(),
1022
            #[cfg(feature = "rt-multi-thread")]
1023
            Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
1024
        }
1025
    }
1026
1027
0
    fn get_cfg(&self) -> driver::Cfg {
1028
0
        driver::Cfg {
1029
0
            enable_pause_time: match self.kind {
1030
0
                Kind::CurrentThread => true,
1031
0
                #[cfg(feature = "rt-multi-thread")]
1032
0
                Kind::MultiThread => false,
1033
0
            },
1034
0
            enable_io: self.enable_io,
1035
0
            enable_time: self.enable_time,
1036
0
            start_paused: self.start_paused,
1037
0
            nevents: self.nevents,
1038
0
            timer_flavor: self.timer_flavor,
1039
0
        }
1040
0
    }
1041
1042
    /// Sets a custom timeout for a thread in the blocking pool.
1043
    ///
1044
    /// By default, the timeout for a thread is set to 10 seconds. This can
1045
    /// be overridden using `.thread_keep_alive()`.
1046
    ///
1047
    /// # Example
1048
    ///
1049
    /// ```
1050
    /// # #[cfg(not(target_family = "wasm"))]
1051
    /// # {
1052
    /// # use tokio::runtime;
1053
    /// # use std::time::Duration;
1054
    /// # pub fn main() {
1055
    /// let rt = runtime::Builder::new_multi_thread()
1056
    ///     .thread_keep_alive(Duration::from_millis(100))
1057
    ///     .build();
1058
    /// # }
1059
    /// # }
1060
    /// ```
1061
0
    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
1062
0
        self.keep_alive = Some(duration);
1063
0
        self
1064
0
    }
1065
1066
    /// Sets the number of scheduler ticks after which the scheduler will poll the global
1067
    /// task queue.
1068
    ///
1069
    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1070
    ///
1071
    /// By default the global queue interval is 31 for the current-thread scheduler. Please see
1072
    /// [the module documentation] for the default behavior of the multi-thread scheduler.
1073
    ///
1074
    /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
1075
    /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
1076
    /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
1077
    /// getting started on new work, especially if tasks frequently yield rather than complete
1078
    /// or await on further I/O. Setting the interval to `1` will prioritize the global queue and
1079
    /// tasks from the local queue will be executed only if the global queue is empty.
1080
    /// Conversely, a higher value prioritizes existing work, and is a good choice when most
1081
    /// tasks quickly complete polling.
1082
    ///
1083
    /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1084
    ///
1085
    /// # Panics
1086
    ///
1087
    /// This function will panic if 0 is passed as an argument.
1088
    ///
1089
    /// # Examples
1090
    ///
1091
    /// ```
1092
    /// # #[cfg(not(target_family = "wasm"))]
1093
    /// # {
1094
    /// # use tokio::runtime;
1095
    /// # pub fn main() {
1096
    /// let rt = runtime::Builder::new_multi_thread()
1097
    ///     .global_queue_interval(31)
1098
    ///     .build();
1099
    /// # }
1100
    /// # }
1101
    /// ```
1102
    #[track_caller]
1103
0
    pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1104
0
        assert!(val > 0, "global_queue_interval must be greater than 0");
1105
0
        self.global_queue_interval = Some(val);
1106
0
        self
1107
0
    }
1108
1109
    /// Sets the number of scheduler ticks after which the scheduler will poll for
1110
    /// external events (timers, I/O, and so on).
1111
    ///
1112
    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1113
    ///
1114
    /// By default, the event interval is `61` for all scheduler types.
1115
    ///
1116
    /// Setting the event interval determines the effective "priority" of delivering
1117
    /// these external events (which may wake up additional tasks), compared to
1118
    /// executing tasks that are currently ready to run. A smaller value is useful
1119
    /// when tasks frequently spend a long time in polling, or frequently yield,
1120
    /// which can result in overly long delays picking up I/O events. Conversely,
1121
    /// picking up new events requires extra synchronization and syscall overhead,
1122
    /// so if tasks generally complete their polling quickly, a higher event interval
1123
    /// will minimize that overhead while still keeping the scheduler responsive to
1124
    /// events.
1125
    ///
1126
    /// # Examples
1127
    ///
1128
    /// ```
1129
    /// # #[cfg(not(target_family = "wasm"))]
1130
    /// # {
1131
    /// # use tokio::runtime;
1132
    /// # pub fn main() {
1133
    /// let rt = runtime::Builder::new_multi_thread()
1134
    ///     .event_interval(31)
1135
    ///     .build();
1136
    /// # }
1137
    /// # }
1138
    /// ```
1139
0
    pub fn event_interval(&mut self, val: u32) -> &mut Self {
1140
0
        self.event_interval = val;
1141
0
        self
1142
0
    }
1143
1144
    cfg_unstable! {
1145
        /// Configure how the runtime responds to an unhandled panic on a
1146
        /// spawned task.
1147
        ///
1148
        /// By default, an unhandled panic (i.e. a panic not caught by
1149
        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1150
        /// execution. The panic's error value is forwarded to the task's
1151
        /// [`JoinHandle`] and all other spawned tasks continue running.
1152
        ///
1153
        /// The `unhandled_panic` option enables configuring this behavior.
1154
        ///
1155
        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1156
        ///   spawned tasks have no impact on the runtime's execution.
1157
        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1158
        ///   shutdown immediately when a spawned task panics even if that
1159
        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
1160
        ///   will immediately terminate and further calls to
1161
        ///   [`Runtime::block_on`] will panic.
1162
        ///
1163
        /// # Panics
1164
        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1165
        /// on a runtime other than the current thread runtime.
1166
        ///
1167
        /// # Unstable
1168
        ///
1169
        /// This option is currently unstable and its implementation is
1170
        /// incomplete. The API may change or be removed in the future. See
1171
        /// issue [tokio-rs/tokio#4516] for more details.
1172
        ///
1173
        /// # Examples
1174
        ///
1175
        /// The following demonstrates a runtime configured to shutdown on
1176
        /// panic. The first spawned task panics and results in the runtime
1177
        /// shutting down. The second spawned task never has a chance to
1178
        /// execute. The call to `block_on` will panic due to the runtime being
1179
        /// forcibly shutdown.
1180
        ///
1181
        /// ```should_panic
1182
        /// use tokio::runtime::{self, UnhandledPanic};
1183
        ///
1184
        /// # pub fn main() {
1185
        /// let rt = runtime::Builder::new_current_thread()
1186
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1187
        ///     .build()
1188
        ///     .unwrap();
1189
        ///
1190
        /// rt.spawn(async { panic!("boom"); });
1191
        /// rt.spawn(async {
1192
        ///     // This task never completes.
1193
        /// });
1194
        ///
1195
        /// rt.block_on(async {
1196
        ///     // Do some work
1197
        /// # loop { tokio::task::yield_now().await; }
1198
        /// })
1199
        /// # }
1200
        /// ```
1201
        ///
1202
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
1203
        /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1204
        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1205
            if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1206
                panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1207
            }
1208
1209
            self.unhandled_panic = behavior;
1210
            self
1211
        }
1212
1213
        /// Disables the LIFO task scheduler heuristic.
1214
        ///
1215
        /// The multi-threaded scheduler includes a heuristic for optimizing
1216
        /// message-passing patterns. This heuristic results in the **last**
1217
        /// scheduled task being polled first.
1218
        ///
1219
        /// To implement this heuristic, each worker thread has a slot which
1220
        /// holds the task that should be polled next. However, this slot cannot
1221
        /// be stolen by other worker threads, which can result in lower total
1222
        /// throughput when tasks tend to have longer poll times.
1223
        ///
1224
        /// This configuration option will disable this heuristic resulting in
1225
        /// all scheduled tasks being pushed into the worker-local queue, which
1226
        /// is stealable.
1227
        ///
1228
        /// Consider trying this option when the task "scheduled" time is high
1229
        /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1230
        /// collect this data.
1231
        ///
1232
        /// # Unstable
1233
        ///
1234
        /// This configuration option is considered a workaround for the LIFO
1235
        /// slot not being stealable. When the slot becomes stealable, we will
1236
        /// revisit whether or not this option is necessary. See
1237
        /// issue [tokio-rs/tokio#4941].
1238
        ///
1239
        /// # Examples
1240
        ///
1241
        /// ```
1242
        /// # #[cfg(not(target_family = "wasm"))]
1243
        /// # {
1244
        /// use tokio::runtime;
1245
        ///
1246
        /// let rt = runtime::Builder::new_multi_thread()
1247
        ///     .disable_lifo_slot()
1248
        ///     .build()
1249
        ///     .unwrap();
1250
        /// # }
1251
        /// ```
1252
        ///
1253
        /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1254
        /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1255
        pub fn disable_lifo_slot(&mut self) -> &mut Self {
1256
            self.disable_lifo_slot = true;
1257
            self
1258
        }
1259
1260
        /// Specifies the random number generation seed to use within all
1261
        /// threads associated with the runtime being built.
1262
        ///
1263
        /// This option is intended to make certain parts of the runtime
1264
        /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1265
        /// [`tokio::select!`] it will ensure that the order that branches are
1266
        /// polled is deterministic.
1267
        ///
1268
        /// In addition to the code specifying `rng_seed` and interacting with
1269
        /// the runtime, the internals of Tokio and the Rust compiler may affect
1270
        /// the sequences of random numbers. In order to ensure repeatable
1271
        /// results, the version of Tokio, the versions of all other
1272
        /// dependencies that interact with Tokio, and the Rust compiler version
1273
        /// should also all remain constant.
1274
        ///
1275
        /// # Examples
1276
        ///
1277
        /// ```
1278
        /// # use tokio::runtime::{self, RngSeed};
1279
        /// # pub fn main() {
1280
        /// let seed = RngSeed::from_bytes(b"place your seed here");
1281
        /// let rt = runtime::Builder::new_current_thread()
1282
        ///     .rng_seed(seed)
1283
        ///     .build();
1284
        /// # }
1285
        /// ```
1286
        ///
1287
        /// [`tokio::select!`]: crate::select
1288
        pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1289
            self.seed_generator = RngSeedGenerator::new(seed);
1290
            self
1291
        }
1292
    }
1293
1294
    cfg_unstable_metrics! {
1295
        /// Enables tracking the distribution of task poll times.
1296
        ///
1297
        /// Task poll times are not instrumented by default as doing so requires
1298
        /// calling [`Instant::now()`] twice per task poll, which could add
1299
        /// measurable overhead. Use the [`Handle::metrics()`] to access the
1300
        /// metrics data.
1301
        ///
1302
        /// The histogram uses fixed bucket sizes. In other words, the histogram
1303
        /// buckets are not dynamic based on input values. Use the
1304
        /// `metrics_poll_time_histogram` builder methods to configure the
1305
        /// histogram details.
1306
        ///
1307
        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1308
        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1309
        /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1310
        /// to select [`LogHistogram`] instead.
1311
        ///
1312
        /// # Examples
1313
        ///
1314
        /// ```
1315
        /// # #[cfg(not(target_family = "wasm"))]
1316
        /// # {
1317
        /// use tokio::runtime;
1318
        ///
1319
        /// let rt = runtime::Builder::new_multi_thread()
1320
        ///     .enable_metrics_poll_time_histogram()
1321
        ///     .build()
1322
        ///     .unwrap();
1323
        /// # // Test default values here
1324
        /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1325
        /// # let m = rt.handle().metrics();
1326
        /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1327
        /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1328
        /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1329
        /// # }
1330
        /// ```
1331
        ///
1332
        /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1333
        /// [`Instant::now()`]: std::time::Instant::now
1334
        /// [`LogHistogram`]: crate::runtime::LogHistogram
1335
        /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1336
        pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1337
            self.metrics_poll_count_histogram_enable = true;
1338
            self
1339
        }
1340
1341
        /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1342
        ///
1343
        /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1344
        #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1345
        #[doc(hidden)]
1346
        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1347
            self.enable_metrics_poll_time_histogram()
1348
        }
1349
1350
        /// Sets the histogram scale for tracking the distribution of task poll
1351
        /// times.
1352
        ///
1353
        /// Tracking the distribution of task poll times can be done using a
1354
        /// linear or log scale. When using linear scale, each histogram bucket
1355
        /// will represent the same range of poll times. When using log scale,
1356
        /// each histogram bucket will cover a range twice as big as the
1357
        /// previous bucket.
1358
        ///
1359
        /// **Default:** linear scale.
1360
        ///
1361
        /// # Examples
1362
        ///
1363
        /// ```
1364
        /// # #[cfg(not(target_family = "wasm"))]
1365
        /// # {
1366
        /// use tokio::runtime::{self, HistogramScale};
1367
        ///
1368
        /// # #[allow(deprecated)]
1369
        /// let rt = runtime::Builder::new_multi_thread()
1370
        ///     .enable_metrics_poll_time_histogram()
1371
        ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
1372
        ///     .build()
1373
        ///     .unwrap();
1374
        /// # }
1375
        /// ```
1376
        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1377
        pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1378
            self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1379
            self
1380
        }
1381
1382
        /// Configure the histogram for tracking poll times
1383
        ///
1384
        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1385
        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1386
        /// better granularity with low memory usage, use [`LogHistogram`] instead.
1387
        ///
1388
        /// # Examples
1389
        /// Configure a [`LogHistogram`] with [default configuration]:
1390
        /// ```
1391
        /// # #[cfg(not(target_family = "wasm"))]
1392
        /// # {
1393
        /// use tokio::runtime;
1394
        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1395
        ///
1396
        /// let rt = runtime::Builder::new_multi_thread()
1397
        ///     .enable_metrics_poll_time_histogram()
1398
        ///     .metrics_poll_time_histogram_configuration(
1399
        ///         HistogramConfiguration::log(LogHistogram::default())
1400
        ///     )
1401
        ///     .build()
1402
        ///     .unwrap();
1403
        /// # }
1404
        /// ```
1405
        ///
1406
        /// Configure a linear histogram with 100 buckets, each 10μs wide
1407
        /// ```
1408
        /// # #[cfg(not(target_family = "wasm"))]
1409
        /// # {
1410
        /// use tokio::runtime;
1411
        /// use std::time::Duration;
1412
        /// use tokio::runtime::HistogramConfiguration;
1413
        ///
1414
        /// let rt = runtime::Builder::new_multi_thread()
1415
        ///     .enable_metrics_poll_time_histogram()
1416
        ///     .metrics_poll_time_histogram_configuration(
1417
        ///         HistogramConfiguration::linear(Duration::from_micros(10), 100)
1418
        ///     )
1419
        ///     .build()
1420
        ///     .unwrap();
1421
        /// # }
1422
        /// ```
1423
        ///
1424
        /// Configure a [`LogHistogram`] with the following settings:
1425
        /// - Measure times from 100ns to 120s
1426
        /// - Max error of 0.1
1427
        /// - No more than 1024 buckets
1428
        /// ```
1429
        /// # #[cfg(not(target_family = "wasm"))]
1430
        /// # {
1431
        /// use std::time::Duration;
1432
        /// use tokio::runtime;
1433
        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1434
        ///
1435
        /// let rt = runtime::Builder::new_multi_thread()
1436
        ///     .enable_metrics_poll_time_histogram()
1437
        ///     .metrics_poll_time_histogram_configuration(
1438
        ///         HistogramConfiguration::log(LogHistogram::builder()
1439
        ///             .max_value(Duration::from_secs(120))
1440
        ///             .min_value(Duration::from_nanos(100))
1441
        ///             .max_error(0.1)
1442
        ///             .max_buckets(1024)
1443
        ///             .expect("configuration uses 488 buckets")
1444
        ///         )
1445
        ///     )
1446
        ///     .build()
1447
        ///     .unwrap();
1448
        /// # }
1449
        /// ```
1450
        ///
1451
        /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1452
        /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1453
        /// where each bucket is twice the size of the previous bucket.
1454
        /// ```rust
1455
        /// use std::time::Duration;
1456
        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1457
        /// let rt = tokio::runtime::Builder::new_current_thread()
1458
        ///     .enable_all()
1459
        ///     .enable_metrics_poll_time_histogram()
1460
        ///     .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1461
        ///         LogHistogram::builder()
1462
        ///             .min_value(Duration::from_micros(20))
1463
        ///             .max_value(Duration::from_millis(4))
1464
        ///             // Set `precision_exact` to `0` to match `HistogramScale::Log`
1465
        ///             .precision_exact(0)
1466
        ///             .max_buckets(10)
1467
        ///             .unwrap(),
1468
        ///     ))
1469
        ///     .build()
1470
        ///     .unwrap();
1471
        /// ```
1472
        ///
1473
        /// [`LogHistogram`]: crate::runtime::LogHistogram
1474
        /// [default configuration]: crate::runtime::LogHistogramBuilder
1475
        /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1476
        pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1477
            self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1478
            self
1479
        }
1480
1481
        /// Sets the histogram resolution for tracking the distribution of task
1482
        /// poll times.
1483
        ///
1484
        /// The resolution is the histogram's first bucket's range. When using a
1485
        /// linear histogram scale, each bucket will cover the same range. When
1486
        /// using a log scale, each bucket will cover a range twice as big as
1487
        /// the previous bucket. In the log case, the resolution represents the
1488
        /// smallest bucket range.
1489
        ///
1490
        /// Note that, when using log scale, the resolution is rounded up to the
1491
        /// nearest power of 2 in nanoseconds.
1492
        ///
1493
        /// **Default:** 100 microseconds.
1494
        ///
1495
        /// # Examples
1496
        ///
1497
        /// ```
1498
        /// # #[cfg(not(target_family = "wasm"))]
1499
        /// # {
1500
        /// use tokio::runtime;
1501
        /// use std::time::Duration;
1502
        ///
1503
        /// # #[allow(deprecated)]
1504
        /// let rt = runtime::Builder::new_multi_thread()
1505
        ///     .enable_metrics_poll_time_histogram()
1506
        ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1507
        ///     .build()
1508
        ///     .unwrap();
1509
        /// # }
1510
        /// ```
1511
        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1512
        pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1513
            assert!(resolution > Duration::from_secs(0));
1514
            // Sanity check the argument and also make the cast below safe.
1515
            assert!(resolution <= Duration::from_secs(1));
1516
1517
            let resolution = resolution.as_nanos() as u64;
1518
1519
            self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1520
            self
1521
        }
1522
1523
        /// Sets the number of buckets for the histogram tracking the
1524
        /// distribution of task poll times.
1525
        ///
1526
        /// The last bucket tracks all greater values that fall out of other
1527
        /// ranges. So, configuring the histogram using a linear scale,
1528
        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1529
        /// polls that take more than 450ms to complete.
1530
        ///
1531
        /// **Default:** 10
1532
        ///
1533
        /// # Examples
1534
        ///
1535
        /// ```
1536
        /// # #[cfg(not(target_family = "wasm"))]
1537
        /// # {
1538
        /// use tokio::runtime;
1539
        ///
1540
        /// # #[allow(deprecated)]
1541
        /// let rt = runtime::Builder::new_multi_thread()
1542
        ///     .enable_metrics_poll_time_histogram()
1543
        ///     .metrics_poll_count_histogram_buckets(15)
1544
        ///     .build()
1545
        ///     .unwrap();
1546
        /// # }
1547
        /// ```
1548
        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1549
        pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1550
            self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1551
            self
1552
        }
1553
    }
1554
1555
0
    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1556
        use crate::runtime::runtime::Scheduler;
1557
1558
0
        let (scheduler, handle, blocking_pool) =
1559
0
            self.build_current_thread_runtime_components(None)?;
1560
1561
0
        Ok(Runtime::from_parts(
1562
0
            Scheduler::CurrentThread(scheduler),
1563
0
            handle,
1564
0
            blocking_pool,
1565
0
        ))
1566
0
    }
1567
1568
    #[cfg(tokio_unstable)]
1569
    fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1570
        use crate::runtime::local_runtime::LocalRuntimeScheduler;
1571
1572
        let tid = std::thread::current().id();
1573
1574
        let (scheduler, handle, blocking_pool) =
1575
            self.build_current_thread_runtime_components(Some(tid))?;
1576
1577
        Ok(LocalRuntime::from_parts(
1578
            LocalRuntimeScheduler::CurrentThread(scheduler),
1579
            handle,
1580
            blocking_pool,
1581
        ))
1582
    }
1583
1584
0
    fn build_current_thread_runtime_components(
1585
0
        &mut self,
1586
0
        local_tid: Option<ThreadId>,
1587
0
    ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1588
        use crate::runtime::scheduler;
1589
        use crate::runtime::Config;
1590
1591
0
        let mut cfg = self.get_cfg();
1592
0
        cfg.timer_flavor = TimerFlavor::Traditional;
1593
0
        let (driver, driver_handle) = driver::Driver::new(cfg)?;
1594
1595
        // Blocking pool
1596
0
        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1597
0
        let blocking_spawner = blocking_pool.spawner().clone();
1598
1599
        // Generate a rng seed for this runtime.
1600
0
        let seed_generator_1 = self.seed_generator.next_generator();
1601
0
        let seed_generator_2 = self.seed_generator.next_generator();
1602
1603
        // And now put a single-threaded scheduler on top of the timer. When
1604
        // there are no futures ready to do something, it'll let the timer or
1605
        // the reactor to generate some new stimuli for the futures to continue
1606
        // in their life.
1607
0
        let (scheduler, handle) = CurrentThread::new(
1608
0
            driver,
1609
0
            driver_handle,
1610
0
            blocking_spawner,
1611
0
            seed_generator_2,
1612
0
            Config {
1613
0
                before_park: self.before_park.clone(),
1614
0
                after_unpark: self.after_unpark.clone(),
1615
0
                before_spawn: self.before_spawn.clone(),
1616
0
                #[cfg(tokio_unstable)]
1617
0
                before_poll: self.before_poll.clone(),
1618
0
                #[cfg(tokio_unstable)]
1619
0
                after_poll: self.after_poll.clone(),
1620
0
                after_termination: self.after_termination.clone(),
1621
0
                global_queue_interval: self.global_queue_interval,
1622
0
                event_interval: self.event_interval,
1623
0
                #[cfg(tokio_unstable)]
1624
0
                unhandled_panic: self.unhandled_panic.clone(),
1625
0
                disable_lifo_slot: self.disable_lifo_slot,
1626
0
                seed_generator: seed_generator_1,
1627
0
                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1628
0
            },
1629
0
            local_tid,
1630
0
        );
1631
1632
0
        let handle = Handle {
1633
0
            inner: scheduler::Handle::CurrentThread(handle),
1634
0
        };
1635
1636
0
        Ok((scheduler, handle, blocking_pool))
1637
0
    }
1638
1639
0
    fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1640
0
        if self.metrics_poll_count_histogram_enable {
1641
0
            Some(self.metrics_poll_count_histogram.clone())
1642
        } else {
1643
0
            None
1644
        }
1645
0
    }
1646
}
1647
1648
cfg_io_driver! {
1649
    impl Builder {
1650
        /// Enables the I/O driver.
1651
        ///
1652
        /// Doing this enables using net, process, signal, and some I/O types on
1653
        /// the runtime.
1654
        ///
1655
        /// # Examples
1656
        ///
1657
        /// ```
1658
        /// use tokio::runtime;
1659
        ///
1660
        /// let rt = runtime::Builder::new_multi_thread()
1661
        ///     .enable_io()
1662
        ///     .build()
1663
        ///     .unwrap();
1664
        /// ```
1665
        pub fn enable_io(&mut self) -> &mut Self {
1666
            self.enable_io = true;
1667
            self
1668
        }
1669
1670
        /// Enables the I/O driver and configures the max number of events to be
1671
        /// processed per tick.
1672
        ///
1673
        /// # Examples
1674
        ///
1675
        /// ```
1676
        /// use tokio::runtime;
1677
        ///
1678
        /// let rt = runtime::Builder::new_current_thread()
1679
        ///     .enable_io()
1680
        ///     .max_io_events_per_tick(1024)
1681
        ///     .build()
1682
        ///     .unwrap();
1683
        /// ```
1684
        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1685
            self.nevents = capacity;
1686
            self
1687
        }
1688
    }
1689
}
1690
1691
cfg_time! {
1692
    impl Builder {
1693
        /// Enables the time driver.
1694
        ///
1695
        /// Doing this enables using `tokio::time` on the runtime.
1696
        ///
1697
        /// # Examples
1698
        ///
1699
        /// ```
1700
        /// # #[cfg(not(target_family = "wasm"))]
1701
        /// # {
1702
        /// use tokio::runtime;
1703
        ///
1704
        /// let rt = runtime::Builder::new_multi_thread()
1705
        ///     .enable_time()
1706
        ///     .build()
1707
        ///     .unwrap();
1708
        /// # }
1709
        /// ```
1710
0
        pub fn enable_time(&mut self) -> &mut Self {
1711
0
            self.enable_time = true;
1712
0
            self
1713
0
        }
1714
    }
1715
}
1716
1717
cfg_io_uring! {
1718
    impl Builder {
1719
        /// Enables the tokio's io_uring driver.
1720
        ///
1721
        /// Doing this enables using io_uring operations on the runtime.
1722
        ///
1723
        /// # Examples
1724
        ///
1725
        /// ```
1726
        /// use tokio::runtime;
1727
        ///
1728
        /// let rt = runtime::Builder::new_multi_thread()
1729
        ///     .enable_io_uring()
1730
        ///     .build()
1731
        ///     .unwrap();
1732
        /// ```
1733
        #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
1734
        pub fn enable_io_uring(&mut self) -> &mut Self {
1735
            // Currently, the uring flag is equivalent to `enable_io`.
1736
            self.enable_io = true;
1737
            self
1738
        }
1739
    }
1740
}
1741
1742
cfg_test_util! {
1743
    impl Builder {
1744
        /// Controls if the runtime's clock starts paused or advancing.
1745
        ///
1746
        /// Pausing time requires the current-thread runtime; construction of
1747
        /// the runtime will panic otherwise.
1748
        ///
1749
        /// # Examples
1750
        ///
1751
        /// ```
1752
        /// use tokio::runtime;
1753
        ///
1754
        /// let rt = runtime::Builder::new_current_thread()
1755
        ///     .enable_time()
1756
        ///     .start_paused(true)
1757
        ///     .build()
1758
        ///     .unwrap();
1759
        /// ```
1760
0
        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1761
0
            self.start_paused = start_paused;
1762
0
            self
1763
0
        }
1764
    }
1765
}
1766
1767
cfg_rt_multi_thread! {
1768
    impl Builder {
1769
        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1770
            use crate::loom::sys::num_cpus;
1771
            use crate::runtime::{Config, runtime::Scheduler};
1772
            use crate::runtime::scheduler::{self, MultiThread};
1773
1774
            let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1775
1776
            let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1777
1778
            // Create the blocking pool
1779
            let blocking_pool =
1780
                blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1781
            let blocking_spawner = blocking_pool.spawner().clone();
1782
1783
            // Generate a rng seed for this runtime.
1784
            let seed_generator_1 = self.seed_generator.next_generator();
1785
            let seed_generator_2 = self.seed_generator.next_generator();
1786
1787
            let (scheduler, handle, launch) = MultiThread::new(
1788
                worker_threads,
1789
                driver,
1790
                driver_handle,
1791
                blocking_spawner,
1792
                seed_generator_2,
1793
                Config {
1794
                    before_park: self.before_park.clone(),
1795
                    after_unpark: self.after_unpark.clone(),
1796
                    before_spawn: self.before_spawn.clone(),
1797
                    #[cfg(tokio_unstable)]
1798
                    before_poll: self.before_poll.clone(),
1799
                    #[cfg(tokio_unstable)]
1800
                    after_poll: self.after_poll.clone(),
1801
                    after_termination: self.after_termination.clone(),
1802
                    global_queue_interval: self.global_queue_interval,
1803
                    event_interval: self.event_interval,
1804
                    #[cfg(tokio_unstable)]
1805
                    unhandled_panic: self.unhandled_panic.clone(),
1806
                    disable_lifo_slot: self.disable_lifo_slot,
1807
                    seed_generator: seed_generator_1,
1808
                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1809
                },
1810
                self.timer_flavor,
1811
            );
1812
1813
            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1814
1815
            // Spawn the thread pool workers
1816
            let _enter = handle.enter();
1817
            launch.launch();
1818
1819
            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1820
        }
1821
    }
1822
}
1823
1824
impl fmt::Debug for Builder {
1825
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1826
0
        fmt.debug_struct("Builder")
1827
0
            .field("worker_threads", &self.worker_threads)
1828
0
            .field("max_blocking_threads", &self.max_blocking_threads)
1829
0
            .field(
1830
0
                "thread_name",
1831
0
                &"<dyn Fn() -> String + Send + Sync + 'static>",
1832
0
            )
1833
0
            .field("thread_stack_size", &self.thread_stack_size)
1834
0
            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1835
0
            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1836
0
            .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1837
0
            .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1838
0
            .finish()
1839
0
    }
1840
}