Coverage Report

Created: 2026-02-14 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.44.2/src/runtime/builder.rs
Line
Count
Source
1
#![cfg_attr(loom, allow(unused_imports))]
2
3
use crate::runtime::handle::Handle;
4
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
5
#[cfg(tokio_unstable)]
6
use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
7
use crate::util::rand::{RngSeed, RngSeedGenerator};
8
9
use crate::runtime::blocking::BlockingPool;
10
use crate::runtime::scheduler::CurrentThread;
11
use std::fmt;
12
use std::io;
13
use std::thread::ThreadId;
14
use std::time::Duration;
15
16
/// Builds Tokio Runtime with custom configuration values.
17
///
18
/// Methods can be chained in order to set the configuration values. The
19
/// Runtime is constructed by calling [`build`].
20
///
21
/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
22
/// or [`Builder::new_current_thread`].
23
///
24
/// See function level documentation for details on the various configuration
25
/// settings.
26
///
27
/// [`build`]: method@Self::build
28
/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
29
/// [`Builder::new_current_thread`]: method@Self::new_current_thread
30
///
31
/// # Examples
32
///
33
/// ```
34
/// use tokio::runtime::Builder;
35
///
36
/// fn main() {
37
///     // build runtime
38
///     let runtime = Builder::new_multi_thread()
39
///         .worker_threads(4)
40
///         .thread_name("my-custom-name")
41
///         .thread_stack_size(3 * 1024 * 1024)
42
///         .build()
43
///         .unwrap();
44
///
45
///     // use runtime ...
46
/// }
47
/// ```
48
pub struct Builder {
49
    /// Runtime type
50
    kind: Kind,
51
52
    /// Whether or not to enable the I/O driver
53
    enable_io: bool,
54
    nevents: usize,
55
56
    /// Whether or not to enable the time driver
57
    enable_time: bool,
58
59
    /// Whether or not the clock should start paused.
60
    start_paused: bool,
61
62
    /// The number of worker threads, used by Runtime.
63
    ///
64
    /// Only used when not using the current-thread executor.
65
    worker_threads: Option<usize>,
66
67
    /// Cap on thread usage.
68
    max_blocking_threads: usize,
69
70
    /// Name fn used for threads spawned by the runtime.
71
    pub(super) thread_name: ThreadNameFn,
72
73
    /// Stack size used for threads spawned by the runtime.
74
    pub(super) thread_stack_size: Option<usize>,
75
76
    /// Callback to run after each thread starts.
77
    pub(super) after_start: Option<Callback>,
78
79
    /// To run before each worker thread stops
80
    pub(super) before_stop: Option<Callback>,
81
82
    /// To run before each worker thread is parked.
83
    pub(super) before_park: Option<Callback>,
84
85
    /// To run after each thread is unparked.
86
    pub(super) after_unpark: Option<Callback>,
87
88
    /// To run before each task is spawned.
89
    pub(super) before_spawn: Option<TaskCallback>,
90
91
    /// To run before each poll
92
    #[cfg(tokio_unstable)]
93
    pub(super) before_poll: Option<TaskCallback>,
94
95
    /// To run after each poll
96
    #[cfg(tokio_unstable)]
97
    pub(super) after_poll: Option<TaskCallback>,
98
99
    /// To run after each task is terminated.
100
    pub(super) after_termination: Option<TaskCallback>,
101
102
    /// Customizable keep alive timeout for `BlockingPool`
103
    pub(super) keep_alive: Option<Duration>,
104
105
    /// How many ticks before pulling a task from the global/remote queue?
106
    ///
107
    /// When `None`, the value is unspecified and behavior details are left to
108
    /// the scheduler. Each scheduler flavor could choose to either pick its own
109
    /// default value or use some other strategy to decide when to poll from the
110
    /// global queue. For example, the multi-threaded scheduler uses a
111
    /// self-tuning strategy based on mean task poll times.
112
    pub(super) global_queue_interval: Option<u32>,
113
114
    /// How many ticks before yielding to the driver for timer and I/O events?
115
    pub(super) event_interval: u32,
116
117
    pub(super) local_queue_capacity: usize,
118
119
    /// When true, the multi-threade scheduler LIFO slot should not be used.
120
    ///
121
    /// This option should only be exposed as unstable.
122
    pub(super) disable_lifo_slot: bool,
123
124
    /// Specify a random number generator seed to provide deterministic results
125
    pub(super) seed_generator: RngSeedGenerator,
126
127
    /// When true, enables task poll count histogram instrumentation.
128
    pub(super) metrics_poll_count_histogram_enable: bool,
129
130
    /// Configures the task poll count histogram
131
    pub(super) metrics_poll_count_histogram: HistogramBuilder,
132
133
    #[cfg(tokio_unstable)]
134
    pub(super) unhandled_panic: UnhandledPanic,
135
}
136
137
cfg_unstable! {
138
    /// How the runtime should respond to unhandled panics.
139
    ///
140
    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
141
    /// to configure the runtime behavior when a spawned task panics.
142
    ///
143
    /// See [`Builder::unhandled_panic`] for more details.
144
    #[derive(Debug, Clone)]
145
    #[non_exhaustive]
146
    pub enum UnhandledPanic {
147
        /// The runtime should ignore panics on spawned tasks.
148
        ///
149
        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
150
        /// tasks continue running normally.
151
        ///
152
        /// This is the default behavior.
153
        ///
154
        /// # Examples
155
        ///
156
        /// ```
157
        /// use tokio::runtime::{self, UnhandledPanic};
158
        ///
159
        /// # pub fn main() {
160
        /// let rt = runtime::Builder::new_current_thread()
161
        ///     .unhandled_panic(UnhandledPanic::Ignore)
162
        ///     .build()
163
        ///     .unwrap();
164
        ///
165
        /// let task1 = rt.spawn(async { panic!("boom"); });
166
        /// let task2 = rt.spawn(async {
167
        ///     // This task completes normally
168
        ///     "done"
169
        /// });
170
        ///
171
        /// rt.block_on(async {
172
        ///     // The panic on the first task is forwarded to the `JoinHandle`
173
        ///     assert!(task1.await.is_err());
174
        ///
175
        ///     // The second task completes normally
176
        ///     assert!(task2.await.is_ok());
177
        /// })
178
        /// # }
179
        /// ```
180
        ///
181
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
182
        Ignore,
183
184
        /// The runtime should immediately shutdown if a spawned task panics.
185
        ///
186
        /// The runtime will immediately shutdown even if the panicked task's
187
        /// [`JoinHandle`] is still available. All further spawned tasks will be
188
        /// immediately dropped and call to [`Runtime::block_on`] will panic.
189
        ///
190
        /// # Examples
191
        ///
192
        /// ```should_panic
193
        /// use tokio::runtime::{self, UnhandledPanic};
194
        ///
195
        /// # pub fn main() {
196
        /// let rt = runtime::Builder::new_current_thread()
197
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
198
        ///     .build()
199
        ///     .unwrap();
200
        ///
201
        /// rt.spawn(async { panic!("boom"); });
202
        /// rt.spawn(async {
203
        ///     // This task never completes.
204
        /// });
205
        ///
206
        /// rt.block_on(async {
207
        ///     // Do some work
208
        /// # loop { tokio::task::yield_now().await; }
209
        /// })
210
        /// # }
211
        /// ```
212
        ///
213
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
214
        ShutdownRuntime,
215
    }
216
}
217
218
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
219
220
#[derive(Clone, Copy)]
221
pub(crate) enum Kind {
222
    CurrentThread,
223
    #[cfg(feature = "rt-multi-thread")]
224
    MultiThread,
225
    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
226
    MultiThreadAlt,
227
}
228
229
impl Builder {
230
    /// Returns a new builder with the current thread scheduler selected.
231
    ///
232
    /// Configuration methods can be chained on the return value.
233
    ///
234
    /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
235
    /// [`LocalSet`].
236
    ///
237
    /// [`LocalSet`]: crate::task::LocalSet
238
0
    pub fn new_current_thread() -> Builder {
239
        #[cfg(loom)]
240
        const EVENT_INTERVAL: u32 = 4;
241
        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
242
        #[cfg(not(loom))]
243
        const EVENT_INTERVAL: u32 = 61;
244
245
0
        Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
246
0
    }
247
248
    /// Returns a new builder with the multi thread scheduler selected.
249
    ///
250
    /// Configuration methods can be chained on the return value.
251
    #[cfg(feature = "rt-multi-thread")]
252
    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
253
0
    pub fn new_multi_thread() -> Builder {
254
        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
255
0
        Builder::new(Kind::MultiThread, 61)
256
0
    }
257
258
    cfg_unstable! {
259
        /// Returns a new builder with the alternate multi thread scheduler
260
        /// selected.
261
        ///
262
        /// The alternate multi threaded scheduler is an in-progress
263
        /// candidate to replace the existing multi threaded scheduler. It
264
        /// currently does not scale as well to 16+ processors.
265
        ///
266
        /// This runtime flavor is currently **not considered production
267
        /// ready**.
268
        ///
269
        /// Configuration methods can be chained on the return value.
270
        #[cfg(feature = "rt-multi-thread")]
271
        #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
272
        pub fn new_multi_thread_alt() -> Builder {
273
            // The number `61` is fairly arbitrary. I believe this value was copied from golang.
274
            Builder::new(Kind::MultiThreadAlt, 61)
275
        }
276
    }
277
278
    /// Returns a new runtime builder initialized with default configuration
279
    /// values.
280
    ///
281
    /// Configuration methods can be chained on the return value.
282
0
    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
283
        Builder {
284
0
            kind,
285
286
            // I/O defaults to "off"
287
            enable_io: false,
288
            nevents: 1024,
289
290
            // Time defaults to "off"
291
            enable_time: false,
292
293
            // The clock starts not-paused
294
            start_paused: false,
295
296
            // Read from environment variable first in multi-threaded mode.
297
            // Default to lazy auto-detection (one thread per CPU core)
298
0
            worker_threads: None,
299
300
            max_blocking_threads: 512,
301
302
            // Default thread name
303
0
            thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
304
305
            // Do not set a stack size by default
306
0
            thread_stack_size: None,
307
308
            // No worker thread callbacks
309
0
            after_start: None,
310
0
            before_stop: None,
311
0
            before_park: None,
312
0
            after_unpark: None,
313
314
0
            before_spawn: None,
315
0
            after_termination: None,
316
317
            #[cfg(tokio_unstable)]
318
            before_poll: None,
319
            #[cfg(tokio_unstable)]
320
            after_poll: None,
321
322
0
            keep_alive: None,
323
324
            // Defaults for these values depend on the scheduler kind, so we get them
325
            // as parameters.
326
0
            global_queue_interval: None,
327
0
            event_interval,
328
329
            #[cfg(not(loom))]
330
            local_queue_capacity: 256,
331
332
            #[cfg(loom)]
333
            local_queue_capacity: 4,
334
335
0
            seed_generator: RngSeedGenerator::new(RngSeed::new()),
336
337
            #[cfg(tokio_unstable)]
338
            unhandled_panic: UnhandledPanic::Ignore,
339
340
            metrics_poll_count_histogram_enable: false,
341
342
0
            metrics_poll_count_histogram: HistogramBuilder::default(),
343
344
            disable_lifo_slot: false,
345
        }
346
0
    }
347
348
    /// Enables both I/O and time drivers.
349
    ///
350
    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
351
    /// individually. If additional components are added to Tokio in the future,
352
    /// `enable_all` will include these future components.
353
    ///
354
    /// # Examples
355
    ///
356
    /// ```
357
    /// use tokio::runtime;
358
    ///
359
    /// let rt = runtime::Builder::new_multi_thread()
360
    ///     .enable_all()
361
    ///     .build()
362
    ///     .unwrap();
363
    /// ```
364
0
    pub fn enable_all(&mut self) -> &mut Self {
365
        #[cfg(any(
366
            feature = "net",
367
            all(unix, feature = "process"),
368
            all(unix, feature = "signal")
369
        ))]
370
0
        self.enable_io();
371
        #[cfg(feature = "time")]
372
0
        self.enable_time();
373
374
0
        self
375
0
    }
376
377
    /// Sets the number of worker threads the `Runtime` will use.
378
    ///
379
    /// This can be any number above 0 though it is advised to keep this value
380
    /// on the smaller side.
381
    ///
382
    /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
383
    ///
384
    /// # Default
385
    ///
386
    /// The default value is the number of cores available to the system.
387
    ///
388
    /// When using the `current_thread` runtime this method has no effect.
389
    ///
390
    /// # Examples
391
    ///
392
    /// ## Multi threaded runtime with 4 threads
393
    ///
394
    /// ```
395
    /// use tokio::runtime;
396
    ///
397
    /// // This will spawn a work-stealing runtime with 4 worker threads.
398
    /// let rt = runtime::Builder::new_multi_thread()
399
    ///     .worker_threads(4)
400
    ///     .build()
401
    ///     .unwrap();
402
    ///
403
    /// rt.spawn(async move {});
404
    /// ```
405
    ///
406
    /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
407
    ///
408
    /// ```
409
    /// use tokio::runtime;
410
    ///
411
    /// // Create a runtime that _must_ be driven from a call
412
    /// // to `Runtime::block_on`.
413
    /// let rt = runtime::Builder::new_current_thread()
414
    ///     .build()
415
    ///     .unwrap();
416
    ///
417
    /// // This will run the runtime and future on the current thread
418
    /// rt.block_on(async move {});
419
    /// ```
420
    ///
421
    /// # Panics
422
    ///
423
    /// This will panic if `val` is not larger than `0`.
424
    #[track_caller]
425
0
    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
426
0
        assert!(val > 0, "Worker threads cannot be set to 0");
427
0
        self.worker_threads = Some(val);
428
0
        self
429
0
    }
430
431
    /// Specifies the limit for additional threads spawned by the Runtime.
432
    ///
433
    /// These threads are used for blocking operations like tasks spawned
434
    /// through [`spawn_blocking`], this includes but is not limited to:
435
    /// - [`fs`] operations
436
    /// - dns resolution through [`ToSocketAddrs`]
437
    /// - writing to [`Stdout`] or [`Stderr`]
438
    /// - reading from [`Stdin`]
439
    ///
440
    /// Unlike the [`worker_threads`], they are not always active and will exit
441
    /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
442
    ///
443
    /// It's recommended to not set this limit too low in order to avoid hanging on operations
444
    /// requiring [`spawn_blocking`].
445
    ///
446
    /// The default value is 512.
447
    ///
448
    /// # Panics
449
    ///
450
    /// This will panic if `val` is not larger than `0`.
451
    ///
452
    /// # Upgrading from 0.x
453
    ///
454
    /// In old versions `max_threads` limited both blocking and worker threads, but the
455
    /// current `max_blocking_threads` does not include async worker threads in the count.
456
    ///
457
    /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
458
    /// [`fs`]: mod@crate::fs
459
    /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
460
    /// [`Stdout`]: struct@crate::io::Stdout
461
    /// [`Stdin`]: struct@crate::io::Stdin
462
    /// [`Stderr`]: struct@crate::io::Stderr
463
    /// [`worker_threads`]: Self::worker_threads
464
    /// [`thread_keep_alive`]: Self::thread_keep_alive
465
    #[track_caller]
466
    #[cfg_attr(docsrs, doc(alias = "max_threads"))]
467
0
    pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
468
0
        assert!(val > 0, "Max blocking threads cannot be set to 0");
469
0
        self.max_blocking_threads = val;
470
0
        self
471
0
    }
472
473
    /// Sets name of threads spawned by the `Runtime`'s thread pool.
474
    ///
475
    /// The default name is "tokio-runtime-worker".
476
    ///
477
    /// # Examples
478
    ///
479
    /// ```
480
    /// # use tokio::runtime;
481
    ///
482
    /// # pub fn main() {
483
    /// let rt = runtime::Builder::new_multi_thread()
484
    ///     .thread_name("my-pool")
485
    ///     .build();
486
    /// # }
487
    /// ```
488
0
    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
489
0
        let val = val.into();
490
0
        self.thread_name = std::sync::Arc::new(move || val.clone());
491
0
        self
492
0
    }
493
494
    /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
495
    ///
496
    /// The default name fn is `|| "tokio-runtime-worker".into()`.
497
    ///
498
    /// # Examples
499
    ///
500
    /// ```
501
    /// # use tokio::runtime;
502
    /// # use std::sync::atomic::{AtomicUsize, Ordering};
503
    /// # pub fn main() {
504
    /// let rt = runtime::Builder::new_multi_thread()
505
    ///     .thread_name_fn(|| {
506
    ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
507
    ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
508
    ///        format!("my-pool-{}", id)
509
    ///     })
510
    ///     .build();
511
    /// # }
512
    /// ```
513
0
    pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
514
0
    where
515
0
        F: Fn() -> String + Send + Sync + 'static,
516
    {
517
0
        self.thread_name = std::sync::Arc::new(f);
518
0
        self
519
0
    }
Unexecuted instantiation: <tokio::runtime::builder::Builder>::thread_name_fn::<ztunnel::app::new_data_plane_pool::{closure#0}::{closure#0}>
Unexecuted instantiation: <tokio::runtime::builder::Builder>::thread_name_fn::<_>
520
521
    /// Sets the stack size (in bytes) for worker threads.
522
    ///
523
    /// The actual stack size may be greater than this value if the platform
524
    /// specifies minimal stack size.
525
    ///
526
    /// The default stack size for spawned threads is 2 MiB, though this
527
    /// particular stack size is subject to change in the future.
528
    ///
529
    /// # Examples
530
    ///
531
    /// ```
532
    /// # use tokio::runtime;
533
    ///
534
    /// # pub fn main() {
535
    /// let rt = runtime::Builder::new_multi_thread()
536
    ///     .thread_stack_size(32 * 1024)
537
    ///     .build();
538
    /// # }
539
    /// ```
540
0
    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
541
0
        self.thread_stack_size = Some(val);
542
0
        self
543
0
    }
544
545
    /// Executes function `f` after each thread is started but before it starts
546
    /// doing work.
547
    ///
548
    /// This is intended for bookkeeping and monitoring use cases.
549
    ///
550
    /// # Examples
551
    ///
552
    /// ```
553
    /// # use tokio::runtime;
554
    /// # pub fn main() {
555
    /// let runtime = runtime::Builder::new_multi_thread()
556
    ///     .on_thread_start(|| {
557
    ///         println!("thread started");
558
    ///     })
559
    ///     .build();
560
    /// # }
561
    /// ```
562
    #[cfg(not(loom))]
563
0
    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
564
0
    where
565
0
        F: Fn() + Send + Sync + 'static,
566
    {
567
0
        self.after_start = Some(std::sync::Arc::new(f));
568
0
        self
569
0
    }
570
571
    /// Executes function `f` before each thread stops.
572
    ///
573
    /// This is intended for bookkeeping and monitoring use cases.
574
    ///
575
    /// # Examples
576
    ///
577
    /// ```
578
    /// # use tokio::runtime;
579
    /// # pub fn main() {
580
    /// let runtime = runtime::Builder::new_multi_thread()
581
    ///     .on_thread_stop(|| {
582
    ///         println!("thread stopping");
583
    ///     })
584
    ///     .build();
585
    /// # }
586
    /// ```
587
    #[cfg(not(loom))]
588
0
    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
589
0
    where
590
0
        F: Fn() + Send + Sync + 'static,
591
    {
592
0
        self.before_stop = Some(std::sync::Arc::new(f));
593
0
        self
594
0
    }
595
596
    /// Executes function `f` just before a thread is parked (goes idle).
597
    /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
598
    /// can be called, and may result in this thread being unparked immediately.
599
    ///
600
    /// This can be used to start work only when the executor is idle, or for bookkeeping
601
    /// and monitoring purposes.
602
    ///
603
    /// Note: There can only be one park callback for a runtime; calling this function
604
    /// more than once replaces the last callback defined, rather than adding to it.
605
    ///
606
    /// # Examples
607
    ///
608
    /// ## Multithreaded executor
609
    /// ```
610
    /// # use std::sync::Arc;
611
    /// # use std::sync::atomic::{AtomicBool, Ordering};
612
    /// # use tokio::runtime;
613
    /// # use tokio::sync::Barrier;
614
    /// # pub fn main() {
615
    /// let once = AtomicBool::new(true);
616
    /// let barrier = Arc::new(Barrier::new(2));
617
    ///
618
    /// let runtime = runtime::Builder::new_multi_thread()
619
    ///     .worker_threads(1)
620
    ///     .on_thread_park({
621
    ///         let barrier = barrier.clone();
622
    ///         move || {
623
    ///             let barrier = barrier.clone();
624
    ///             if once.swap(false, Ordering::Relaxed) {
625
    ///                 tokio::spawn(async move { barrier.wait().await; });
626
    ///            }
627
    ///         }
628
    ///     })
629
    ///     .build()
630
    ///     .unwrap();
631
    ///
632
    /// runtime.block_on(async {
633
    ///    barrier.wait().await;
634
    /// })
635
    /// # }
636
    /// ```
637
    /// ## Current thread executor
638
    /// ```
639
    /// # use std::sync::Arc;
640
    /// # use std::sync::atomic::{AtomicBool, Ordering};
641
    /// # use tokio::runtime;
642
    /// # use tokio::sync::Barrier;
643
    /// # pub fn main() {
644
    /// let once = AtomicBool::new(true);
645
    /// let barrier = Arc::new(Barrier::new(2));
646
    ///
647
    /// let runtime = runtime::Builder::new_current_thread()
648
    ///     .on_thread_park({
649
    ///         let barrier = barrier.clone();
650
    ///         move || {
651
    ///             let barrier = barrier.clone();
652
    ///             if once.swap(false, Ordering::Relaxed) {
653
    ///                 tokio::spawn(async move { barrier.wait().await; });
654
    ///            }
655
    ///         }
656
    ///     })
657
    ///     .build()
658
    ///     .unwrap();
659
    ///
660
    /// runtime.block_on(async {
661
    ///    barrier.wait().await;
662
    /// })
663
    /// # }
664
    /// ```
665
    #[cfg(not(loom))]
666
0
    pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
667
0
    where
668
0
        F: Fn() + Send + Sync + 'static,
669
    {
670
0
        self.before_park = Some(std::sync::Arc::new(f));
671
0
        self
672
0
    }
673
674
    /// Executes function `f` just after a thread unparks (starts executing tasks).
675
    ///
676
    /// This is intended for bookkeeping and monitoring use cases; note that work
677
    /// in this callback will increase latencies when the application has allowed one or
678
    /// more runtime threads to go idle.
679
    ///
680
    /// Note: There can only be one unpark callback for a runtime; calling this function
681
    /// more than once replaces the last callback defined, rather than adding to it.
682
    ///
683
    /// # Examples
684
    ///
685
    /// ```
686
    /// # use tokio::runtime;
687
    /// # pub fn main() {
688
    /// let runtime = runtime::Builder::new_multi_thread()
689
    ///     .on_thread_unpark(|| {
690
    ///         println!("thread unparking");
691
    ///     })
692
    ///     .build();
693
    ///
694
    /// runtime.unwrap().block_on(async {
695
    ///    tokio::task::yield_now().await;
696
    ///    println!("Hello from Tokio!");
697
    /// })
698
    /// # }
699
    /// ```
700
    #[cfg(not(loom))]
701
0
    pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
702
0
    where
703
0
        F: Fn() + Send + Sync + 'static,
704
    {
705
0
        self.after_unpark = Some(std::sync::Arc::new(f));
706
0
        self
707
0
    }
708
709
    /// Executes function `f` just before a task is spawned.
710
    ///
711
    /// `f` is called within the Tokio context, so functions like
712
    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
713
    /// invoked immediately.
714
    ///
715
    /// This can be used for bookkeeping or monitoring purposes.
716
    ///
717
    /// Note: There can only be one spawn callback for a runtime; calling this function more
718
    /// than once replaces the last callback defined, rather than adding to it.
719
    ///
720
    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
721
    ///
722
    /// **Note**: This is an [unstable API][unstable]. The public API of this type
723
    /// may break in 1.x releases. See [the documentation on unstable
724
    /// features][unstable] for details.
725
    ///
726
    /// [unstable]: crate#unstable-features
727
    ///
728
    /// # Examples
729
    ///
730
    /// ```
731
    /// # use tokio::runtime;
732
    /// # pub fn main() {
733
    /// let runtime = runtime::Builder::new_current_thread()
734
    ///     .on_task_spawn(|_| {
735
    ///         println!("spawning task");
736
    ///     })
737
    ///     .build()
738
    ///     .unwrap();
739
    ///
740
    /// runtime.block_on(async {
741
    ///     tokio::task::spawn(std::future::ready(()));
742
    ///
743
    ///     for _ in 0..64 {
744
    ///         tokio::task::yield_now().await;
745
    ///     }
746
    /// })
747
    /// # }
748
    /// ```
749
    #[cfg(all(not(loom), tokio_unstable))]
750
    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
751
    pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
752
    where
753
        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
754
    {
755
        self.before_spawn = Some(std::sync::Arc::new(f));
756
        self
757
    }
758
759
    /// Executes function `f` just before a task is polled
760
    ///
761
    /// `f` is called within the Tokio context, so functions like
762
    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
763
    /// invoked immediately.
764
    ///
765
    /// **Note**: This is an [unstable API][unstable]. The public API of this type
766
    /// may break in 1.x releases. See [the documentation on unstable
767
    /// features][unstable] for details.
768
    ///
769
    /// [unstable]: crate#unstable-features
770
    ///
771
    /// # Examples
772
    ///
773
    /// ```
774
    /// # use std::sync::{atomic::AtomicUsize, Arc};
775
    /// # use tokio::task::yield_now;
776
    /// # pub fn main() {
777
    /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
778
    /// let poll_start = poll_start_counter.clone();
779
    /// let rt = tokio::runtime::Builder::new_multi_thread()
780
    ///     .enable_all()
781
    ///     .on_before_task_poll(move |meta| {
782
    ///         println!("task {} is about to be polled", meta.id())
783
    ///     })
784
    ///     .build()
785
    ///     .unwrap();
786
    /// let task = rt.spawn(async {
787
    ///     yield_now().await;
788
    /// });
789
    /// let _ = rt.block_on(task);
790
    ///
791
    /// # }
792
    /// ```
793
    #[cfg(tokio_unstable)]
794
    pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
795
    where
796
        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
797
    {
798
        self.before_poll = Some(std::sync::Arc::new(f));
799
        self
800
    }
801
802
    /// Executes function `f` just after a task is polled
803
    ///
804
    /// `f` is called within the Tokio context, so functions like
805
    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
806
    /// invoked immediately.
807
    ///
808
    /// **Note**: This is an [unstable API][unstable]. The public API of this type
809
    /// may break in 1.x releases. See [the documentation on unstable
810
    /// features][unstable] for details.
811
    ///
812
    /// [unstable]: crate#unstable-features
813
    ///
814
    /// # Examples
815
    ///
816
    /// ```
817
    /// # use std::sync::{atomic::AtomicUsize, Arc};
818
    /// # use tokio::task::yield_now;
819
    /// # pub fn main() {
820
    /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
821
    /// let poll_stop = poll_stop_counter.clone();
822
    /// let rt = tokio::runtime::Builder::new_multi_thread()
823
    ///     .enable_all()
824
    ///     .on_after_task_poll(move |meta| {
825
    ///         println!("task {} completed polling", meta.id());
826
    ///     })
827
    ///     .build()
828
    ///     .unwrap();
829
    /// let task = rt.spawn(async {
830
    ///     yield_now().await;
831
    /// });
832
    /// let _ = rt.block_on(task);
833
    ///
834
    /// # }
835
    /// ```
836
    #[cfg(tokio_unstable)]
837
    pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
838
    where
839
        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
840
    {
841
        self.after_poll = Some(std::sync::Arc::new(f));
842
        self
843
    }
844
845
    /// Executes function `f` just after a task is terminated.
846
    ///
847
    /// `f` is called within the Tokio context, so functions like
848
    /// [`tokio::spawn`](crate::spawn) can be called.
849
    ///
850
    /// This can be used for bookkeeping or monitoring purposes.
851
    ///
852
    /// Note: There can only be one task termination callback for a runtime; calling this
853
    /// function more than once replaces the last callback defined, rather than adding to it.
854
    ///
855
    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
856
    ///
857
    /// **Note**: This is an [unstable API][unstable]. The public API of this type
858
    /// may break in 1.x releases. See [the documentation on unstable
859
    /// features][unstable] for details.
860
    ///
861
    /// [unstable]: crate#unstable-features
862
    ///
863
    /// # Examples
864
    ///
865
    /// ```
866
    /// # use tokio::runtime;
867
    /// # pub fn main() {
868
    /// let runtime = runtime::Builder::new_current_thread()
869
    ///     .on_task_terminate(|_| {
870
    ///         println!("killing task");
871
    ///     })
872
    ///     .build()
873
    ///     .unwrap();
874
    ///
875
    /// runtime.block_on(async {
876
    ///     tokio::task::spawn(std::future::ready(()));
877
    ///
878
    ///     for _ in 0..64 {
879
    ///         tokio::task::yield_now().await;
880
    ///     }
881
    /// })
882
    /// # }
883
    /// ```
884
    #[cfg(all(not(loom), tokio_unstable))]
885
    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
886
    pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
887
    where
888
        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
889
    {
890
        self.after_termination = Some(std::sync::Arc::new(f));
891
        self
892
    }
893
894
    /// Creates the configured `Runtime`.
895
    ///
896
    /// The returned `Runtime` instance is ready to spawn tasks.
897
    ///
898
    /// # Examples
899
    ///
900
    /// ```
901
    /// use tokio::runtime::Builder;
902
    ///
903
    /// let rt  = Builder::new_multi_thread().build().unwrap();
904
    ///
905
    /// rt.block_on(async {
906
    ///     println!("Hello from the Tokio runtime");
907
    /// });
908
    /// ```
909
0
    pub fn build(&mut self) -> io::Result<Runtime> {
910
0
        match &self.kind {
911
0
            Kind::CurrentThread => self.build_current_thread_runtime(),
912
            #[cfg(feature = "rt-multi-thread")]
913
0
            Kind::MultiThread => self.build_threaded_runtime(),
914
            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
915
            Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
916
        }
917
0
    }
918
919
    /// Creates the configured `LocalRuntime`.
920
    ///
921
    /// The returned `LocalRuntime` instance is ready to spawn tasks.
922
    ///
923
    /// # Panics
924
    /// This will panic if `current_thread` is not the selected runtime flavor.
925
    /// All other runtime flavors are unsupported by [`LocalRuntime`].
926
    ///
927
    /// [`LocalRuntime`]: crate::runtime::LocalRuntime
928
    ///
929
    /// # Examples
930
    ///
931
    /// ```
932
    /// use tokio::runtime::Builder;
933
    ///
934
    /// let rt  = Builder::new_current_thread().build_local(&mut Default::default()).unwrap();
935
    ///
936
    /// rt.block_on(async {
937
    ///     println!("Hello from the Tokio runtime");
938
    /// });
939
    /// ```
940
    #[allow(unused_variables, unreachable_patterns)]
941
    #[cfg(tokio_unstable)]
942
    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
943
    pub fn build_local(&mut self, options: &LocalOptions) -> io::Result<LocalRuntime> {
944
        match &self.kind {
945
            Kind::CurrentThread => self.build_current_thread_local_runtime(),
946
            _ => panic!("Only current_thread is supported when building a local runtime"),
947
        }
948
    }
949
950
0
    fn get_cfg(&self, workers: usize) -> driver::Cfg {
951
        driver::Cfg {
952
0
            enable_pause_time: match self.kind {
953
0
                Kind::CurrentThread => true,
954
                #[cfg(feature = "rt-multi-thread")]
955
0
                Kind::MultiThread => false,
956
                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
957
                Kind::MultiThreadAlt => false,
958
            },
959
0
            enable_io: self.enable_io,
960
0
            enable_time: self.enable_time,
961
0
            start_paused: self.start_paused,
962
0
            nevents: self.nevents,
963
0
            workers,
964
        }
965
0
    }
966
967
    /// Sets a custom timeout for a thread in the blocking pool.
968
    ///
969
    /// By default, the timeout for a thread is set to 10 seconds. This can
970
    /// be overridden using `.thread_keep_alive()`.
971
    ///
972
    /// # Example
973
    ///
974
    /// ```
975
    /// # use tokio::runtime;
976
    /// # use std::time::Duration;
977
    /// # pub fn main() {
978
    /// let rt = runtime::Builder::new_multi_thread()
979
    ///     .thread_keep_alive(Duration::from_millis(100))
980
    ///     .build();
981
    /// # }
982
    /// ```
983
0
    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
        // Record an explicit override; the field stays `None` when this
        // method is never called.
        let keep_alive = Some(duration);
        self.keep_alive = keep_alive;
        self
    }
987
988
    /// Sets the number of scheduler ticks after which the scheduler will poll the global
989
    /// task queue.
990
    ///
991
    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
992
    ///
993
    /// By default the global queue interval is 31 for the current-thread scheduler. Please see
994
    /// [the module documentation] for the default behavior of the multi-thread scheduler.
995
    ///
996
    /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
997
    /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
998
    /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
999
    /// getting started on new work, especially if tasks frequently yield rather than complete
1000
    /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
1001
    /// is a good choice when most tasks quickly complete polling.
1002
    ///
1003
    /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1004
    ///
1005
    /// # Panics
1006
    ///
1007
    /// This function will panic if 0 is passed as an argument.
1008
    ///
1009
    /// # Examples
1010
    ///
1011
    /// ```
1012
    /// # use tokio::runtime;
1013
    /// # pub fn main() {
1014
    /// let rt = runtime::Builder::new_multi_thread()
1015
    ///     .global_queue_interval(31)
1016
    ///     .build();
1017
    /// # }
1018
    /// ```
1019
    #[track_caller]
1020
0
    pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
        // Zero is rejected eagerly, at configuration time.
        assert!(val != 0, "global_queue_interval must be greater than 0");

        let interval = Some(val);
        self.global_queue_interval = interval;
        self
    }
1025
1026
    /// Sets the number of scheduler ticks after which the scheduler will poll for
1027
    /// external events (timers, I/O, and so on).
1028
    ///
1029
    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1030
    ///
1031
    /// By default, the event interval is `61` for all scheduler types.
1032
    ///
1033
    /// Setting the event interval determines the effective "priority" of delivering
1034
    /// these external events (which may wake up additional tasks), compared to
1035
    /// executing tasks that are currently ready to run. A smaller value is useful
1036
    /// when tasks frequently spend a long time in polling, or frequently yield,
1037
    /// which can result in overly long delays picking up I/O events. Conversely,
1038
    /// picking up new events requires extra synchronization and syscall overhead,
1039
    /// so if tasks generally complete their polling quickly, a higher event interval
1040
    /// will minimize that overhead while still keeping the scheduler responsive to
1041
    /// events.
1042
    ///
1043
    /// # Examples
1044
    ///
1045
    /// ```
1046
    /// # use tokio::runtime;
1047
    /// # pub fn main() {
1048
    /// let rt = runtime::Builder::new_multi_thread()
1049
    ///     .event_interval(31)
1050
    ///     .build();
1051
    /// # }
1052
    /// ```
1053
0
    pub fn event_interval(&mut self, val: u32) -> &mut Self {
1054
0
        self.event_interval = val;
1055
0
        self
1056
0
    }
1057
1058
    cfg_unstable! {
1059
        /// Configure how the runtime responds to an unhandled panic on a
1060
        /// spawned task.
1061
        ///
1062
        /// By default, an unhandled panic (i.e. a panic not caught by
1063
        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1064
        /// execution. The panic's error value is forwarded to the task's
1065
        /// [`JoinHandle`] and all other spawned tasks continue running.
1066
        ///
1067
        /// The `unhandled_panic` option enables configuring this behavior.
1068
        ///
1069
        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1070
        ///   spawned tasks have no impact on the runtime's execution.
1071
        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1072
        ///   shutdown immediately when a spawned task panics even if that
1073
        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
1074
        ///   will immediately terminate and further calls to
1075
        ///   [`Runtime::block_on`] will panic.
1076
        ///
1077
        /// # Panics
1078
        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1079
        /// on a runtime other than the current thread runtime.
1080
        ///
1081
        /// # Unstable
1082
        ///
1083
        /// This option is currently unstable and its implementation is
1084
        /// incomplete. The API may change or be removed in the future. See
1085
        /// issue [tokio-rs/tokio#4516] for more details.
1086
        ///
1087
        /// # Examples
1088
        ///
1089
        /// The following demonstrates a runtime configured to shutdown on
1090
        /// panic. The first spawned task panics and results in the runtime
1091
        /// shutting down. The second spawned task never has a chance to
1092
        /// execute. The call to `block_on` will panic due to the runtime being
1093
        /// forcibly shutdown.
1094
        ///
1095
        /// ```should_panic
1096
        /// use tokio::runtime::{self, UnhandledPanic};
1097
        ///
1098
        /// # pub fn main() {
1099
        /// let rt = runtime::Builder::new_current_thread()
1100
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1101
        ///     .build()
1102
        ///     .unwrap();
1103
        ///
1104
        /// rt.spawn(async { panic!("boom"); });
1105
        /// rt.spawn(async {
1106
        ///     // This task never completes.
1107
        /// });
1108
        ///
1109
        /// rt.block_on(async {
1110
        ///     // Do some work
1111
        /// # loop { tokio::task::yield_now().await; }
1112
        /// })
1113
        /// # }
1114
        /// ```
1115
        ///
1116
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
1117
        /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1118
        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1119
            if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1120
                panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1121
            }
1122
1123
            self.unhandled_panic = behavior;
1124
            self
1125
        }
1126
1127
        /// Disables the LIFO task scheduler heuristic.
1128
        ///
1129
        /// The multi-threaded scheduler includes a heuristic for optimizing
1130
        /// message-passing patterns. This heuristic results in the **last**
1131
        /// scheduled task being polled first.
1132
        ///
1133
        /// To implement this heuristic, each worker thread has a slot which
1134
        /// holds the task that should be polled next. However, this slot cannot
1135
        /// be stolen by other worker threads, which can result in lower total
1136
        /// throughput when tasks tend to have longer poll times.
1137
        ///
1138
        /// This configuration option will disable this heuristic resulting in
1139
        /// all scheduled tasks being pushed into the worker-local queue, which
1140
        /// is stealable.
1141
        ///
1142
        /// Consider trying this option when the task "scheduled" time is high
1143
        /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1144
        /// collect this data.
1145
        ///
1146
        /// # Unstable
1147
        ///
1148
        /// This configuration option is considered a workaround for the LIFO
1149
        /// slot not being stealable. When the slot becomes stealable, we will
1150
        /// revisit whether or not this option is necessary. See
1151
        /// issue [tokio-rs/tokio#4941].
1152
        ///
1153
        /// # Examples
1154
        ///
1155
        /// ```
1156
        /// use tokio::runtime;
1157
        ///
1158
        /// let rt = runtime::Builder::new_multi_thread()
1159
        ///     .disable_lifo_slot()
1160
        ///     .build()
1161
        ///     .unwrap();
1162
        /// ```
1163
        ///
1164
        /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1165
        /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1166
        pub fn disable_lifo_slot(&mut self) -> &mut Self {
            // One-way switch: this method only ever sets the flag.
            let disabled = true;
            self.disable_lifo_slot = disabled;
            self
        }
1170
1171
        /// Specifies the random number generation seed to use within all
1172
        /// threads associated with the runtime being built.
1173
        ///
1174
        /// This option is intended to make certain parts of the runtime
1175
        /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1176
        /// [`tokio::select!`] it will ensure that the order that branches are
1177
        /// polled is deterministic.
1178
        ///
1179
        /// In addition to the code specifying `rng_seed` and interacting with
1180
        /// the runtime, the internals of Tokio and the Rust compiler may affect
1181
        /// the sequences of random numbers. In order to ensure repeatable
1182
        /// results, the version of Tokio, the versions of all other
1183
        /// dependencies that interact with Tokio, and the Rust compiler version
1184
        /// should also all remain constant.
1185
        ///
1186
        /// # Examples
1187
        ///
1188
        /// ```
1189
        /// # use tokio::runtime::{self, RngSeed};
1190
        /// # pub fn main() {
1191
        /// let seed = RngSeed::from_bytes(b"place your seed here");
1192
        /// let rt = runtime::Builder::new_current_thread()
1193
        ///     .rng_seed(seed)
1194
        ///     .build();
1195
        /// # }
1196
        /// ```
1197
        ///
1198
        /// [`tokio::select!`]: crate::select
1199
        pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
            // Replace the builder's seed generator with a fresh one built from
            // the caller-supplied seed.
            let generator = RngSeedGenerator::new(seed);
            self.seed_generator = generator;
            self
        }
1203
    }
1204
1205
    cfg_unstable_metrics! {
1206
        /// Enables tracking the distribution of task poll times.
1207
        ///
1208
        /// Task poll times are not instrumented by default as doing so requires
1209
        /// calling [`Instant::now()`] twice per task poll, which could add
1210
        /// measurable overhead. Use the [`Handle::metrics()`] to access the
1211
        /// metrics data.
1212
        ///
1213
        /// The histogram uses fixed bucket sizes. In other words, the histogram
1214
        /// buckets are not dynamic based on input values. Use the
1215
        /// `metrics_poll_time_histogram` builder methods to configure the
1216
        /// histogram details.
1217
        ///
1218
        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1219
        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1220
        /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1221
        /// to select [`LogHistogram`] instead.
1222
        ///
1223
        /// # Examples
1224
        ///
1225
        /// ```
1226
        /// use tokio::runtime;
1227
        ///
1228
        /// let rt = runtime::Builder::new_multi_thread()
1229
        ///     .enable_metrics_poll_time_histogram()
1230
        ///     .build()
1231
        ///     .unwrap();
1232
        /// # // Test default values here
1233
        /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1234
        /// # let m = rt.handle().metrics();
1235
        /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1236
        /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1237
        /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1238
        /// ```
1239
        ///
1240
        /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1241
        /// [`Instant::now()`]: std::time::Instant::now
1242
        /// [`LogHistogram`]: crate::runtime::LogHistogram
1243
        /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1244
        pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
            // The backing field still carries the older `poll_count` name.
            let enabled = true;
            self.metrics_poll_count_histogram_enable = enabled;
            self
        }
1248
1249
        /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1250
        ///
1251
        /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1252
        #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1253
        #[doc(hidden)]
1254
        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
            // Pure forwarding shim for the renamed method.
            let builder = self.enable_metrics_poll_time_histogram();
            builder
        }
1257
1258
        /// Sets the histogram scale for tracking the distribution of task poll
1259
        /// times.
1260
        ///
1261
        /// Tracking the distribution of task poll times can be done using a
1262
        /// linear or log scale. When using linear scale, each histogram bucket
1263
        /// will represent the same range of poll times. When using log scale,
1264
        /// each histogram bucket will cover a range twice as big as the
1265
        /// previous bucket.
1266
        ///
1267
        /// **Default:** linear scale.
1268
        ///
1269
        /// # Examples
1270
        ///
1271
        /// ```
1272
        /// use tokio::runtime::{self, HistogramScale};
1273
        ///
1274
        /// # #[allow(deprecated)]
1275
        /// let rt = runtime::Builder::new_multi_thread()
1276
        ///     .enable_metrics_poll_time_histogram()
1277
        ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
1278
        ///     .build()
1279
        ///     .unwrap();
1280
        /// ```
1281
        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1282
        pub fn metrics_poll_count_histogram_scale(
            &mut self,
            histogram_scale: crate::runtime::HistogramScale,
        ) -> &mut Self {
            // Hand the update to `legacy_mut`, which is given a closure that
            // overwrites only the `scale` field.
            self.metrics_poll_count_histogram
                .legacy_mut(|legacy| legacy.scale = histogram_scale);
            self
        }
1286
1287
        /// Configure the histogram for tracking poll times
1288
        ///
1289
        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1290
        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1291
        /// better granularity with low memory usage, use [`LogHistogram`] instead.
1292
        ///
1293
        /// # Examples
1294
        /// Configure a [`LogHistogram`] with [default configuration]:
1295
        /// ```
1296
        /// use tokio::runtime;
1297
        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1298
        ///
1299
        /// let rt = runtime::Builder::new_multi_thread()
1300
        ///     .enable_metrics_poll_time_histogram()
1301
        ///     .metrics_poll_time_histogram_configuration(
1302
        ///         HistogramConfiguration::log(LogHistogram::default())
1303
        ///     )
1304
        ///     .build()
1305
        ///     .unwrap();
1306
        /// ```
1307
        ///
1308
        /// Configure a linear histogram with 100 buckets, each 10μs wide
1309
        /// ```
1310
        /// use tokio::runtime;
1311
        /// use std::time::Duration;
1312
        /// use tokio::runtime::HistogramConfiguration;
1313
        ///
1314
        /// let rt = runtime::Builder::new_multi_thread()
1315
        ///     .enable_metrics_poll_time_histogram()
1316
        ///     .metrics_poll_time_histogram_configuration(
1317
        ///         HistogramConfiguration::linear(Duration::from_micros(10), 100)
1318
        ///     )
1319
        ///     .build()
1320
        ///     .unwrap();
1321
        /// ```
1322
        ///
1323
        /// Configure a [`LogHistogram`] with the following settings:
1324
        /// - Measure times from 100ns to 120s
1325
        /// - Max error of 0.1
1326
        /// - No more than 1024 buckets
1327
        /// ```
1328
        /// use std::time::Duration;
1329
        /// use tokio::runtime;
1330
        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1331
        ///
1332
        /// let rt = runtime::Builder::new_multi_thread()
1333
        ///     .enable_metrics_poll_time_histogram()
1334
        ///     .metrics_poll_time_histogram_configuration(
1335
        ///         HistogramConfiguration::log(LogHistogram::builder()
1336
        ///             .max_value(Duration::from_secs(120))
1337
        ///             .min_value(Duration::from_nanos(100))
1338
        ///             .max_error(0.1)
1339
        ///             .max_buckets(1024)
1340
        ///             .expect("configuration uses 488 buckets")
1341
        ///         )
1342
        ///     )
1343
        ///     .build()
1344
        ///     .unwrap();
1345
        /// ```
1346
        ///
1347
        /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1348
        /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1349
        /// where each bucket is twice the size of the previous bucket.
1350
        /// ```rust
1351
        /// use std::time::Duration;
1352
        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1353
        /// let rt = tokio::runtime::Builder::new_current_thread()
1354
        ///     .enable_all()
1355
        ///     .enable_metrics_poll_time_histogram()
1356
        ///     .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1357
        ///         LogHistogram::builder()
1358
        ///             .min_value(Duration::from_micros(20))
1359
        ///             .max_value(Duration::from_millis(4))
1360
        ///             // Set `precision_exact` to `0` to match `HistogramScale::Log`
1361
        ///             .precision_exact(0)
1362
        ///             .max_buckets(10)
1363
        ///             .unwrap(),
1364
        ///     ))
1365
        ///     .build()
1366
        ///     .unwrap();
1367
        /// ```
1368
        ///
1369
        /// [`LogHistogram`]: crate::runtime::LogHistogram
1370
        /// [default configuration]: crate::runtime::LogHistogramBuilder
1371
        /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1372
        pub fn metrics_poll_time_histogram_configuration(
            &mut self,
            configuration: HistogramConfiguration,
        ) -> &mut Self {
            // Unwrap the public configuration and install its inner value as
            // the histogram type.
            let histogram_type = configuration.inner;
            self.metrics_poll_count_histogram.histogram_type = histogram_type;
            self
        }
1376
1377
        /// Sets the histogram resolution for tracking the distribution of task
1378
        /// poll times.
1379
        ///
1380
        /// The resolution is the histogram's first bucket's range. When using a
1381
        /// linear histogram scale, each bucket will cover the same range. When
1382
        /// using a log scale, each bucket will cover a range twice as big as
1383
        /// the previous bucket. In the log case, the resolution represents the
1384
        /// smallest bucket range.
1385
        ///
1386
        /// Note that, when using log scale, the resolution is rounded up to the
1387
        /// nearest power of 2 in nanoseconds.
1388
        ///
1389
        /// **Default:** 100 microseconds.
1390
        ///
1391
        /// # Examples
1392
        ///
1393
        /// ```
1394
        /// use tokio::runtime;
1395
        /// use std::time::Duration;
1396
        ///
1397
        /// # #[allow(deprecated)]
1398
        /// let rt = runtime::Builder::new_multi_thread()
1399
        ///     .enable_metrics_poll_time_histogram()
1400
        ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1401
        ///     .build()
1402
        ///     .unwrap();
1403
        /// ```
1404
        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1405
        pub fn metrics_poll_count_histogram_resolution(
            &mut self,
            resolution: Duration,
        ) -> &mut Self {
            // A zero-length resolution is rejected.
            assert!(resolution > Duration::from_secs(0));
            // Sanity check the argument and also make the cast below safe.
            assert!(resolution <= Duration::from_secs(1));

            // Bounded by one second above, so the nanosecond count fits in a `u64`.
            let nanos = resolution.as_nanos() as u64;

            self.metrics_poll_count_histogram
                .legacy_mut(|legacy| legacy.resolution = nanos);
            self
        }
1415
1416
        /// Sets the number of buckets for the histogram tracking the
1417
        /// distribution of task poll times.
1418
        ///
1419
        /// The last bucket tracks all greater values that fall out of other
1420
        /// ranges. So, configuring the histogram using a linear scale,
1421
        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1422
        /// polls that take more than 450ms to complete.
1423
        ///
1424
        /// **Default:** 10
1425
        ///
1426
        /// # Examples
1427
        ///
1428
        /// ```
1429
        /// use tokio::runtime;
1430
        ///
1431
        /// # #[allow(deprecated)]
1432
        /// let rt = runtime::Builder::new_multi_thread()
1433
        ///     .enable_metrics_poll_time_histogram()
1434
        ///     .metrics_poll_count_histogram_buckets(15)
1435
        ///     .build()
1436
        ///     .unwrap();
1437
        /// ```
1438
        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1439
        pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
            // Hand the update to `legacy_mut`, which is given a closure that
            // overwrites only the `num_buckets` field.
            self.metrics_poll_count_histogram
                .legacy_mut(|legacy| legacy.num_buckets = buckets);
            self
        }
1443
    }
1444
1445
    cfg_loom! {
1446
        pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self {
            // Only power-of-two capacities are accepted.
            assert!(value.is_power_of_two());

            let capacity = value;
            self.local_queue_capacity = capacity;
            self
        }
1451
    }
1452
1453
0
    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1454
        use crate::runtime::runtime::Scheduler;
1455
1456
0
        let (scheduler, handle, blocking_pool) =
1457
0
            self.build_current_thread_runtime_components(None)?;
1458
1459
0
        Ok(Runtime::from_parts(
1460
0
            Scheduler::CurrentThread(scheduler),
1461
0
            handle,
1462
0
            blocking_pool,
1463
0
        ))
1464
0
    }
1465
1466
    #[cfg(tokio_unstable)]
1467
    fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1468
        use crate::runtime::local_runtime::LocalRuntimeScheduler;
1469
1470
        let tid = std::thread::current().id();
1471
1472
        let (scheduler, handle, blocking_pool) =
1473
            self.build_current_thread_runtime_components(Some(tid))?;
1474
1475
        Ok(LocalRuntime::from_parts(
1476
            LocalRuntimeScheduler::CurrentThread(scheduler),
1477
            handle,
1478
            blocking_pool,
1479
        ))
1480
    }
1481
1482
0
    fn build_current_thread_runtime_components(
1483
0
        &mut self,
1484
0
        local_tid: Option<ThreadId>,
1485
0
    ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1486
        use crate::runtime::scheduler;
1487
        use crate::runtime::Config;
1488
1489
0
        let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?;
1490
1491
        // Blocking pool
1492
0
        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1493
0
        let blocking_spawner = blocking_pool.spawner().clone();
1494
1495
        // Generate a rng seed for this runtime.
1496
0
        let seed_generator_1 = self.seed_generator.next_generator();
1497
0
        let seed_generator_2 = self.seed_generator.next_generator();
1498
1499
        // And now put a single-threaded scheduler on top of the timer. When
1500
        // there are no futures ready to do something, it'll let the timer or
1501
        // the reactor to generate some new stimuli for the futures to continue
1502
        // in their life.
1503
0
        let (scheduler, handle) = CurrentThread::new(
1504
0
            driver,
1505
0
            driver_handle,
1506
0
            blocking_spawner,
1507
0
            seed_generator_2,
1508
0
            Config {
1509
0
                before_park: self.before_park.clone(),
1510
0
                after_unpark: self.after_unpark.clone(),
1511
0
                before_spawn: self.before_spawn.clone(),
1512
0
                #[cfg(tokio_unstable)]
1513
0
                before_poll: self.before_poll.clone(),
1514
0
                #[cfg(tokio_unstable)]
1515
0
                after_poll: self.after_poll.clone(),
1516
0
                after_termination: self.after_termination.clone(),
1517
0
                global_queue_interval: self.global_queue_interval,
1518
0
                event_interval: self.event_interval,
1519
0
                local_queue_capacity: self.local_queue_capacity,
1520
0
                #[cfg(tokio_unstable)]
1521
0
                unhandled_panic: self.unhandled_panic.clone(),
1522
0
                disable_lifo_slot: self.disable_lifo_slot,
1523
0
                seed_generator: seed_generator_1,
1524
0
                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1525
0
            },
1526
0
            local_tid,
1527
0
        );
1528
1529
0
        let handle = Handle {
1530
0
            inner: scheduler::Handle::CurrentThread(handle),
1531
0
        };
1532
1533
0
        Ok((scheduler, handle, blocking_pool))
1534
0
    }
1535
1536
0
    fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1537
0
        if self.metrics_poll_count_histogram_enable {
1538
0
            Some(self.metrics_poll_count_histogram.clone())
1539
        } else {
1540
0
            None
1541
        }
1542
0
    }
1543
}
1544
1545
cfg_io_driver! {
1546
    impl Builder {
1547
        /// Enables the I/O driver.
1548
        ///
1549
        /// Doing this enables using net, process, signal, and some I/O types on
1550
        /// the runtime.
1551
        ///
1552
        /// # Examples
1553
        ///
1554
        /// ```
1555
        /// use tokio::runtime;
1556
        ///
1557
        /// let rt = runtime::Builder::new_multi_thread()
1558
        ///     .enable_io()
1559
        ///     .build()
1560
        ///     .unwrap();
1561
        /// ```
1562
0
        pub fn enable_io(&mut self) -> &mut Self {
            // Only flips the flag; `get_cfg` later copies it into the driver
            // configuration.
            let enabled = true;
            self.enable_io = enabled;
            self
        }
1566
1567
        /// Configures (without itself enabling the I/O driver) the max number of events to be
1568
        /// processed per tick.
1569
        ///
1570
        /// # Examples
1571
        ///
1572
        /// ```
1573
        /// use tokio::runtime;
1574
        ///
1575
        /// let rt = runtime::Builder::new_current_thread()
1576
        ///     .enable_io()
1577
        ///     .max_io_events_per_tick(1024)
1578
        ///     .build()
1579
        ///     .unwrap();
1580
        /// ```
1581
0
        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1582
0
            self.nevents = capacity;
1583
0
            self
1584
0
        }
1585
    }
1586
}
1587
1588
cfg_time! {
    impl Builder {
        /// Turns on the time driver for the runtime being configured.
        ///
        /// With the time driver enabled, `tokio::time` can be used on the
        /// runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_time()
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn enable_time(&mut self) -> &mut Self {
            self.enable_time = true;
            self
        }
    }
}
1610
1611
cfg_test_util! {
    impl Builder {
        /// Selects whether the runtime's clock begins paused (`true`) or
        /// advancing (`false`).
        ///
        /// A paused clock is only supported on the current-thread runtime;
        /// constructing any other runtime with a paused clock will panic.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_current_thread()
        ///     .enable_time()
        ///     .start_paused(true)
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
            self.start_paused = start_paused;
            self
        }
    }
}
1635
1636
cfg_rt_multi_thread! {
    impl Builder {
        /// Builds a runtime driven by the `MultiThread` scheduler.
        ///
        /// The worker count is `self.worker_threads` when configured and
        /// `num_cpus()` otherwise. The blocking pool is created with a thread
        /// cap of `max_blocking_threads + worker_threads`, and the workers are
        /// launched before the runtime is returned.
        ///
        /// # Errors
        ///
        /// Returns the `io::Error` produced by `driver::Driver::new` if the
        /// driver cannot be created.
        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
            use crate::loom::sys::num_cpus;
            use crate::runtime::{Config, runtime::Scheduler};
            use crate::runtime::scheduler::{self, MultiThread};

            let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);

            let (driver, driver_handle) = driver::Driver::new(self.get_cfg(worker_threads))?;

            // Create the blocking pool. The cap is computed with a saturating
            // add: a plain `+` would panic in debug builds, and wrap around to
            // a tiny cap in release builds, when `max_blocking_threads` is
            // close to `usize::MAX`.
            let thread_cap = self.max_blocking_threads.saturating_add(worker_threads);
            let blocking_pool = blocking::create_blocking_pool(self, thread_cap);
            let blocking_spawner = blocking_pool.spawner().clone();

            // Generate a rng seed for this runtime.
            let seed_generator_1 = self.seed_generator.next_generator();
            let seed_generator_2 = self.seed_generator.next_generator();

            let (scheduler, handle, launch) = MultiThread::new(
                worker_threads,
                driver,
                driver_handle,
                blocking_spawner,
                seed_generator_2,
                Config {
                    before_park: self.before_park.clone(),
                    after_unpark: self.after_unpark.clone(),
                    before_spawn: self.before_spawn.clone(),
                    #[cfg(tokio_unstable)]
                    before_poll: self.before_poll.clone(),
                    #[cfg(tokio_unstable)]
                    after_poll: self.after_poll.clone(),
                    after_termination: self.after_termination.clone(),
                    global_queue_interval: self.global_queue_interval,
                    event_interval: self.event_interval,
                    local_queue_capacity: self.local_queue_capacity,
                    #[cfg(tokio_unstable)]
                    unhandled_panic: self.unhandled_panic.clone(),
                    disable_lifo_slot: self.disable_lifo_slot,
                    seed_generator: seed_generator_1,
                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
                },
            );

            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };

            // Spawn the thread pool workers. The guard returned by `enter()`
            // is held in `_enter` until the end of this function, so it is
            // still alive while `launch()` runs.
            let _enter = handle.enter();
            launch.launch();

            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
        }

        cfg_unstable! {
            /// Builds a runtime driven by the `MultiThreadAlt` scheduler.
            ///
            /// Setup mirrors `build_threaded_runtime`: the worker count
            /// defaults to `num_cpus()`, and the blocking pool is capped at
            /// `max_blocking_threads + worker_threads`.
            ///
            /// # Errors
            ///
            /// Returns the `io::Error` produced by `driver::Driver::new` if
            /// the driver cannot be created.
            fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
                use crate::loom::sys::num_cpus;
                use crate::runtime::{Config, runtime::Scheduler};
                use crate::runtime::scheduler::MultiThreadAlt;

                let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
                let (driver, driver_handle) = driver::Driver::new(self.get_cfg(worker_threads))?;

                // Create the blocking pool. Saturate instead of overflowing
                // when `max_blocking_threads` is close to `usize::MAX`.
                let thread_cap = self.max_blocking_threads.saturating_add(worker_threads);
                let blocking_pool = blocking::create_blocking_pool(self, thread_cap);
                let blocking_spawner = blocking_pool.spawner().clone();

                // Generate a rng seed for this runtime.
                let seed_generator_1 = self.seed_generator.next_generator();
                let seed_generator_2 = self.seed_generator.next_generator();

                let (scheduler, handle) = MultiThreadAlt::new(
                    worker_threads,
                    driver,
                    driver_handle,
                    blocking_spawner,
                    seed_generator_2,
                    Config {
                        before_park: self.before_park.clone(),
                        after_unpark: self.after_unpark.clone(),
                        before_spawn: self.before_spawn.clone(),
                        after_termination: self.after_termination.clone(),
                        #[cfg(tokio_unstable)]
                        before_poll: self.before_poll.clone(),
                        #[cfg(tokio_unstable)]
                        after_poll: self.after_poll.clone(),
                        global_queue_interval: self.global_queue_interval,
                        event_interval: self.event_interval,
                        local_queue_capacity: self.local_queue_capacity,
                        #[cfg(tokio_unstable)]
                        unhandled_panic: self.unhandled_panic.clone(),
                        disable_lifo_slot: self.disable_lifo_slot,
                        seed_generator: seed_generator_1,
                        metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
                    },
                );

                Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
            }
        }
    }
}
1740
1741
impl fmt::Debug for Builder {
1742
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1743
0
        fmt.debug_struct("Builder")
1744
0
            .field("worker_threads", &self.worker_threads)
1745
0
            .field("max_blocking_threads", &self.max_blocking_threads)
1746
0
            .field(
1747
0
                "thread_name",
1748
0
                &"<dyn Fn() -> String + Send + Sync + 'static>",
1749
0
            )
1750
0
            .field("thread_stack_size", &self.thread_stack_size)
1751
0
            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1752
0
            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1753
0
            .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1754
0
            .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1755
0
            .finish()
1756
0
    }
1757
}