Coverage Report

Created: 2025-08-28 07:05

/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.47.1/src/runtime/builder.rs
Line
Count
Source (jump to first uncovered line)
1
#![cfg_attr(loom, allow(unused_imports))]
2
3
use crate::runtime::handle::Handle;
4
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
5
#[cfg(tokio_unstable)]
6
use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
7
use crate::util::rand::{RngSeed, RngSeedGenerator};
8
9
use crate::runtime::blocking::BlockingPool;
10
use crate::runtime::scheduler::CurrentThread;
11
use std::fmt;
12
use std::io;
13
use std::thread::ThreadId;
14
use std::time::Duration;
15
16
/// Builds Tokio Runtime with custom configuration values.
17
///
18
/// Methods can be chained in order to set the configuration values. The
19
/// Runtime is constructed by calling [`build`].
20
///
21
/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
22
/// or [`Builder::new_current_thread`].
23
///
24
/// See function level documentation for details on the various configuration
25
/// settings.
26
///
27
/// [`build`]: method@Self::build
28
/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
29
/// [`Builder::new_current_thread`]: method@Self::new_current_thread
30
///
31
/// # Examples
32
///
33
/// ```
34
/// use tokio::runtime::Builder;
35
///
36
/// fn main() {
37
///     // build runtime
38
///     let runtime = Builder::new_multi_thread()
39
///         .worker_threads(4)
40
///         .thread_name("my-custom-name")
41
///         .thread_stack_size(3 * 1024 * 1024)
42
///         .build()
43
///         .unwrap();
44
///
45
///     // use runtime ...
46
/// }
47
/// ```
48
pub struct Builder {
49
    /// Runtime type
50
    kind: Kind,
51
52
    /// Whether or not to enable the I/O driver
53
    enable_io: bool,
54
    nevents: usize,
55
56
    /// Whether or not to enable the time driver
57
    enable_time: bool,
58
59
    /// Whether or not the clock should start paused.
60
    start_paused: bool,
61
62
    /// The number of worker threads, used by Runtime.
63
    ///
64
    /// Only used when not using the current-thread executor.
65
    worker_threads: Option<usize>,
66
67
    /// Cap on thread usage.
68
    max_blocking_threads: usize,
69
70
    /// Name fn used for threads spawned by the runtime.
71
    pub(super) thread_name: ThreadNameFn,
72
73
    /// Stack size used for threads spawned by the runtime.
74
    pub(super) thread_stack_size: Option<usize>,
75
76
    /// Callback to run after each thread starts.
77
    pub(super) after_start: Option<Callback>,
78
79
    /// To run before each worker thread stops
80
    pub(super) before_stop: Option<Callback>,
81
82
    /// To run before each worker thread is parked.
83
    pub(super) before_park: Option<Callback>,
84
85
    /// To run after each thread is unparked.
86
    pub(super) after_unpark: Option<Callback>,
87
88
    /// To run before each task is spawned.
89
    pub(super) before_spawn: Option<TaskCallback>,
90
91
    /// To run before each poll
92
    #[cfg(tokio_unstable)]
93
    pub(super) before_poll: Option<TaskCallback>,
94
95
    /// To run after each poll
96
    #[cfg(tokio_unstable)]
97
    pub(super) after_poll: Option<TaskCallback>,
98
99
    /// To run after each task is terminated.
100
    pub(super) after_termination: Option<TaskCallback>,
101
102
    /// Customizable keep alive timeout for `BlockingPool`
103
    pub(super) keep_alive: Option<Duration>,
104
105
    /// How many ticks before pulling a task from the global/remote queue?
106
    ///
107
    /// When `None`, the value is unspecified and behavior details are left to
108
    /// the scheduler. Each scheduler flavor could choose to either pick its own
109
    /// default value or use some other strategy to decide when to poll from the
110
    /// global queue. For example, the multi-threaded scheduler uses a
111
    /// self-tuning strategy based on mean task poll times.
112
    pub(super) global_queue_interval: Option<u32>,
113
114
    /// How many ticks before yielding to the driver for timer and I/O events?
115
    pub(super) event_interval: u32,
116
117
    /// When true, the multi-threaded scheduler LIFO slot should not be used.
118
    ///
119
    /// This option should only be exposed as unstable.
120
    pub(super) disable_lifo_slot: bool,
121
122
    /// Specify a random number generator seed to provide deterministic results
123
    pub(super) seed_generator: RngSeedGenerator,
124
125
    /// When true, enables task poll count histogram instrumentation.
126
    pub(super) metrics_poll_count_histogram_enable: bool,
127
128
    /// Configures the task poll count histogram
129
    pub(super) metrics_poll_count_histogram: HistogramBuilder,
130
131
    #[cfg(tokio_unstable)]
132
    pub(super) unhandled_panic: UnhandledPanic,
133
}
134
135
cfg_unstable! {
136
    /// How the runtime should respond to unhandled panics.
137
    ///
138
    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
139
    /// to configure the runtime behavior when a spawned task panics.
140
    ///
141
    /// See [`Builder::unhandled_panic`] for more details.
142
    #[derive(Debug, Clone)]
143
    #[non_exhaustive]
144
    pub enum UnhandledPanic {
145
        /// The runtime should ignore panics on spawned tasks.
146
        ///
147
        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
148
        /// tasks continue running normally.
149
        ///
150
        /// This is the default behavior.
151
        ///
152
        /// # Examples
153
        ///
154
        /// ```
155
        /// use tokio::runtime::{self, UnhandledPanic};
156
        ///
157
        /// # pub fn main() {
158
        /// let rt = runtime::Builder::new_current_thread()
159
        ///     .unhandled_panic(UnhandledPanic::Ignore)
160
        ///     .build()
161
        ///     .unwrap();
162
        ///
163
        /// let task1 = rt.spawn(async { panic!("boom"); });
164
        /// let task2 = rt.spawn(async {
165
        ///     // This task completes normally
166
        ///     "done"
167
        /// });
168
        ///
169
        /// rt.block_on(async {
170
        ///     // The panic on the first task is forwarded to the `JoinHandle`
171
        ///     assert!(task1.await.is_err());
172
        ///
173
        ///     // The second task completes normally
174
        ///     assert!(task2.await.is_ok());
175
        /// })
176
        /// # }
177
        /// ```
178
        ///
179
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
180
        Ignore,
181
182
        /// The runtime should immediately shutdown if a spawned task panics.
183
        ///
184
        /// The runtime will immediately shutdown even if the panicked task's
185
        /// [`JoinHandle`] is still available. All further spawned tasks will be
186
        /// immediately dropped and any call to [`Runtime::block_on`] will panic.
187
        ///
188
        /// # Examples
189
        ///
190
        /// ```should_panic
191
        /// use tokio::runtime::{self, UnhandledPanic};
192
        ///
193
        /// # pub fn main() {
194
        /// let rt = runtime::Builder::new_current_thread()
195
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
196
        ///     .build()
197
        ///     .unwrap();
198
        ///
199
        /// rt.spawn(async { panic!("boom"); });
200
        /// rt.spawn(async {
201
        ///     // This task never completes.
202
        /// });
203
        ///
204
        /// rt.block_on(async {
205
        ///     // Do some work
206
        /// # loop { tokio::task::yield_now().await; }
207
        /// })
208
        /// # }
209
        /// ```
210
        ///
211
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
212
        ShutdownRuntime,
213
    }
214
}
215
216
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
217
218
#[derive(Clone, Copy)]
219
pub(crate) enum Kind {
220
    CurrentThread,
221
    #[cfg(feature = "rt-multi-thread")]
222
    MultiThread,
223
}
224
225
impl Builder {
226
    /// Returns a new builder with the current thread scheduler selected.
227
    ///
228
    /// Configuration methods can be chained on the return value.
229
    ///
230
    /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
231
    /// [`LocalSet`].
232
    ///
233
    /// [`LocalSet`]: crate::task::LocalSet
234
0
    pub fn new_current_thread() -> Builder {
235
        #[cfg(loom)]
236
        const EVENT_INTERVAL: u32 = 4;
237
        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
238
        #[cfg(not(loom))]
239
        const EVENT_INTERVAL: u32 = 61;
240
241
0
        Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
242
0
    }
243
244
    /// Returns a new builder with the multi thread scheduler selected.
245
    ///
246
    /// Configuration methods can be chained on the return value.
247
    #[cfg(feature = "rt-multi-thread")]
248
    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
249
17.3k
    pub fn new_multi_thread() -> Builder {
250
17.3k
        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
251
17.3k
        Builder::new(Kind::MultiThread, 61)
252
17.3k
    }
253
254
    /// Returns a new runtime builder initialized with default configuration
255
    /// values.
256
    ///
257
    /// Configuration methods can be chained on the return value.
258
17.3k
    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
259
17.3k
        Builder {
260
17.3k
            kind,
261
17.3k
262
17.3k
            // I/O defaults to "off"
263
17.3k
            enable_io: false,
264
17.3k
            nevents: 1024,
265
17.3k
266
17.3k
            // Time defaults to "off"
267
17.3k
            enable_time: false,
268
17.3k
269
17.3k
            // The clock starts not-paused
270
17.3k
            start_paused: false,
271
17.3k
272
17.3k
            // Read from environment variable first in multi-threaded mode.
273
17.3k
            // Default to lazy auto-detection (one thread per CPU core)
274
17.3k
            worker_threads: None,
275
17.3k
276
17.3k
            max_blocking_threads: 512,
277
17.3k
278
17.3k
            // Default thread name
279
556k
            thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
280
17.3k
281
17.3k
            // Do not set a stack size by default
282
17.3k
            thread_stack_size: None,
283
17.3k
284
17.3k
            // No worker thread callbacks
285
17.3k
            after_start: None,
286
17.3k
            before_stop: None,
287
17.3k
            before_park: None,
288
17.3k
            after_unpark: None,
289
17.3k
290
17.3k
            before_spawn: None,
291
17.3k
            after_termination: None,
292
17.3k
293
17.3k
            #[cfg(tokio_unstable)]
294
17.3k
            before_poll: None,
295
17.3k
            #[cfg(tokio_unstable)]
296
17.3k
            after_poll: None,
297
17.3k
298
17.3k
            keep_alive: None,
299
17.3k
300
17.3k
            // Defaults for these values depend on the scheduler kind, so we get them
301
17.3k
            // as parameters.
302
17.3k
            global_queue_interval: None,
303
17.3k
            event_interval,
304
17.3k
305
17.3k
            seed_generator: RngSeedGenerator::new(RngSeed::new()),
306
17.3k
307
17.3k
            #[cfg(tokio_unstable)]
308
17.3k
            unhandled_panic: UnhandledPanic::Ignore,
309
17.3k
310
17.3k
            metrics_poll_count_histogram_enable: false,
311
17.3k
312
17.3k
            metrics_poll_count_histogram: HistogramBuilder::default(),
313
17.3k
314
17.3k
            disable_lifo_slot: false,
315
17.3k
        }
316
17.3k
    }
317
318
    /// Enables both I/O and time drivers.
319
    ///
320
    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
321
    /// individually. If additional components are added to Tokio in the future,
322
    /// `enable_all` will include these future components.
323
    ///
324
    /// # Examples
325
    ///
326
    /// ```
327
    /// use tokio::runtime;
328
    ///
329
    /// let rt = runtime::Builder::new_multi_thread()
330
    ///     .enable_all()
331
    ///     .build()
332
    ///     .unwrap();
333
    /// ```
334
17.3k
    pub fn enable_all(&mut self) -> &mut Self {
335
17.3k
        #[cfg(any(
336
17.3k
            feature = "net",
337
17.3k
            all(unix, feature = "process"),
338
17.3k
            all(unix, feature = "signal")
339
17.3k
        ))]
340
17.3k
        self.enable_io();
341
17.3k
        #[cfg(feature = "time")]
342
17.3k
        self.enable_time();
343
17.3k
344
17.3k
        self
345
17.3k
    }
346
347
    /// Sets the number of worker threads the `Runtime` will use.
348
    ///
349
    /// This can be any number above 0 though it is advised to keep this value
350
    /// on the smaller side.
351
    ///
352
    /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
353
    ///
354
    /// # Default
355
    ///
356
    /// The default value is the number of cores available to the system.
357
    ///
358
    /// When using the `current_thread` runtime this method has no effect.
359
    ///
360
    /// # Examples
361
    ///
362
    /// ## Multi threaded runtime with 4 threads
363
    ///
364
    /// ```
365
    /// use tokio::runtime;
366
    ///
367
    /// // This will spawn a work-stealing runtime with 4 worker threads.
368
    /// let rt = runtime::Builder::new_multi_thread()
369
    ///     .worker_threads(4)
370
    ///     .build()
371
    ///     .unwrap();
372
    ///
373
    /// rt.spawn(async move {});
374
    /// ```
375
    ///
376
    /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
377
    ///
378
    /// ```
379
    /// use tokio::runtime;
380
    ///
381
    /// // Create a runtime that _must_ be driven from a call
382
    /// // to `Runtime::block_on`.
383
    /// let rt = runtime::Builder::new_current_thread()
384
    ///     .build()
385
    ///     .unwrap();
386
    ///
387
    /// // This will run the runtime and future on the current thread
388
    /// rt.block_on(async move {});
389
    /// ```
390
    ///
391
    /// # Panics
392
    ///
393
    /// This will panic if `val` is not larger than `0`.
394
    #[track_caller]
395
0
    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
396
0
        assert!(val > 0, "Worker threads cannot be set to 0");
397
0
        self.worker_threads = Some(val);
398
0
        self
399
0
    }
400
401
    /// Specifies the limit for additional threads spawned by the Runtime.
402
    ///
403
    /// These threads are used for blocking operations like tasks spawned
404
    /// through [`spawn_blocking`], this includes but is not limited to:
405
    /// - [`fs`] operations
406
    /// - DNS resolution through [`ToSocketAddrs`]
407
    /// - writing to [`Stdout`] or [`Stderr`]
408
    /// - reading from [`Stdin`]
409
    ///
410
    /// Unlike the [`worker_threads`], they are not always active and will exit
411
    /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
412
    ///
413
    /// It's recommended to not set this limit too low in order to avoid hanging on operations
414
    /// requiring [`spawn_blocking`].
415
    ///
416
    /// The default value is 512.
417
    ///
418
    /// # Queue Behavior
419
    ///
420
    /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
421
    /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
422
    /// method has not been reached, a new thread will be spawned. If no idle thread is available
423
    /// and no more threads are allowed to be spawned, the task will remain in the queue until one
424
    /// of the busy threads picks it up. Note that since the queue does not apply any backpressure,
425
    /// it could potentially grow unbounded.
426
    ///
427
    /// # Panics
428
    ///
429
    /// This will panic if `val` is not larger than `0`.
430
    ///
431
    /// # Upgrading from 0.x
432
    ///
433
    /// In old versions `max_threads` limited both blocking and worker threads, but the
434
    /// current `max_blocking_threads` does not include async worker threads in the count.
435
    ///
436
    /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
437
    /// [`fs`]: mod@crate::fs
438
    /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
439
    /// [`Stdout`]: struct@crate::io::Stdout
440
    /// [`Stdin`]: struct@crate::io::Stdin
441
    /// [`Stderr`]: struct@crate::io::Stderr
442
    /// [`worker_threads`]: Self::worker_threads
443
    /// [`thread_keep_alive`]: Self::thread_keep_alive
444
    #[track_caller]
445
    #[cfg_attr(docsrs, doc(alias = "max_threads"))]
446
0
    pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
447
0
        assert!(val > 0, "Max blocking threads cannot be set to 0");
448
0
        self.max_blocking_threads = val;
449
0
        self
450
0
    }
451
452
    /// Sets name of threads spawned by the `Runtime`'s thread pool.
453
    ///
454
    /// The default name is "tokio-runtime-worker".
455
    ///
456
    /// # Examples
457
    ///
458
    /// ```
459
    /// # use tokio::runtime;
460
    ///
461
    /// # pub fn main() {
462
    /// let rt = runtime::Builder::new_multi_thread()
463
    ///     .thread_name("my-pool")
464
    ///     .build();
465
    /// # }
466
    /// ```
467
0
    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
468
0
        let val = val.into();
469
0
        self.thread_name = std::sync::Arc::new(move || val.clone());
470
0
        self
471
0
    }
472
473
    /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
474
    ///
475
    /// The default name fn is `|| "tokio-runtime-worker".into()`.
476
    ///
477
    /// # Examples
478
    ///
479
    /// ```
480
    /// # use tokio::runtime;
481
    /// # use std::sync::atomic::{AtomicUsize, Ordering};
482
    /// # pub fn main() {
483
    /// let rt = runtime::Builder::new_multi_thread()
484
    ///     .thread_name_fn(|| {
485
    ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
486
    ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
487
    ///        format!("my-pool-{}", id)
488
    ///     })
489
    ///     .build();
490
    /// # }
491
    /// ```
492
0
    pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
493
0
    where
494
0
        F: Fn() -> String + Send + Sync + 'static,
495
0
    {
496
0
        self.thread_name = std::sync::Arc::new(f);
497
0
        self
498
0
    }
499
500
    /// Sets the stack size (in bytes) for worker threads.
501
    ///
502
    /// The actual stack size may be greater than this value if the platform
503
    /// specifies a minimum stack size.
504
    ///
505
    /// The default stack size for spawned threads is 2 MiB, though this
506
    /// particular stack size is subject to change in the future.
507
    ///
508
    /// # Examples
509
    ///
510
    /// ```
511
    /// # use tokio::runtime;
512
    ///
513
    /// # pub fn main() {
514
    /// let rt = runtime::Builder::new_multi_thread()
515
    ///     .thread_stack_size(32 * 1024)
516
    ///     .build();
517
    /// # }
518
    /// ```
519
0
    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
520
0
        self.thread_stack_size = Some(val);
521
0
        self
522
0
    }
523
524
    /// Executes function `f` after each thread is started but before it starts
525
    /// doing work.
526
    ///
527
    /// This is intended for bookkeeping and monitoring use cases.
528
    ///
529
    /// # Examples
530
    ///
531
    /// ```
532
    /// # use tokio::runtime;
533
    /// # pub fn main() {
534
    /// let runtime = runtime::Builder::new_multi_thread()
535
    ///     .on_thread_start(|| {
536
    ///         println!("thread started");
537
    ///     })
538
    ///     .build();
539
    /// # }
540
    /// ```
541
    #[cfg(not(loom))]
542
0
    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
543
0
    where
544
0
        F: Fn() + Send + Sync + 'static,
545
0
    {
546
0
        self.after_start = Some(std::sync::Arc::new(f));
547
0
        self
548
0
    }
549
550
    /// Executes function `f` before each thread stops.
551
    ///
552
    /// This is intended for bookkeeping and monitoring use cases.
553
    ///
554
    /// # Examples
555
    ///
556
    /// ```
557
    /// # use tokio::runtime;
558
    /// # pub fn main() {
559
    /// let runtime = runtime::Builder::new_multi_thread()
560
    ///     .on_thread_stop(|| {
561
    ///         println!("thread stopping");
562
    ///     })
563
    ///     .build();
564
    /// # }
565
    /// ```
566
    #[cfg(not(loom))]
567
0
    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
568
0
    where
569
0
        F: Fn() + Send + Sync + 'static,
570
0
    {
571
0
        self.before_stop = Some(std::sync::Arc::new(f));
572
0
        self
573
0
    }
574
575
    /// Executes function `f` just before a thread is parked (goes idle).
576
    /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
577
    /// can be called, and may result in this thread being unparked immediately.
578
    ///
579
    /// This can be used to start work only when the executor is idle, or for bookkeeping
580
    /// and monitoring purposes.
581
    ///
582
    /// Note: There can only be one park callback for a runtime; calling this function
583
    /// more than once replaces the last callback defined, rather than adding to it.
584
    ///
585
    /// # Examples
586
    ///
587
    /// ## Multithreaded executor
588
    /// ```
589
    /// # use std::sync::Arc;
590
    /// # use std::sync::atomic::{AtomicBool, Ordering};
591
    /// # use tokio::runtime;
592
    /// # use tokio::sync::Barrier;
593
    /// # pub fn main() {
594
    /// let once = AtomicBool::new(true);
595
    /// let barrier = Arc::new(Barrier::new(2));
596
    ///
597
    /// let runtime = runtime::Builder::new_multi_thread()
598
    ///     .worker_threads(1)
599
    ///     .on_thread_park({
600
    ///         let barrier = barrier.clone();
601
    ///         move || {
602
    ///             let barrier = barrier.clone();
603
    ///             if once.swap(false, Ordering::Relaxed) {
604
    ///                 tokio::spawn(async move { barrier.wait().await; });
605
    ///            }
606
    ///         }
607
    ///     })
608
    ///     .build()
609
    ///     .unwrap();
610
    ///
611
    /// runtime.block_on(async {
612
    ///    barrier.wait().await;
613
    /// })
614
    /// # }
615
    /// ```
616
    /// ## Current thread executor
617
    /// ```
618
    /// # use std::sync::Arc;
619
    /// # use std::sync::atomic::{AtomicBool, Ordering};
620
    /// # use tokio::runtime;
621
    /// # use tokio::sync::Barrier;
622
    /// # pub fn main() {
623
    /// let once = AtomicBool::new(true);
624
    /// let barrier = Arc::new(Barrier::new(2));
625
    ///
626
    /// let runtime = runtime::Builder::new_current_thread()
627
    ///     .on_thread_park({
628
    ///         let barrier = barrier.clone();
629
    ///         move || {
630
    ///             let barrier = barrier.clone();
631
    ///             if once.swap(false, Ordering::Relaxed) {
632
    ///                 tokio::spawn(async move { barrier.wait().await; });
633
    ///            }
634
    ///         }
635
    ///     })
636
    ///     .build()
637
    ///     .unwrap();
638
    ///
639
    /// runtime.block_on(async {
640
    ///    barrier.wait().await;
641
    /// })
642
    /// # }
643
    /// ```
644
    #[cfg(not(loom))]
645
0
    pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
646
0
    where
647
0
        F: Fn() + Send + Sync + 'static,
648
0
    {
649
0
        self.before_park = Some(std::sync::Arc::new(f));
650
0
        self
651
0
    }
652
653
    /// Executes function `f` just after a thread unparks (starts executing tasks).
654
    ///
655
    /// This is intended for bookkeeping and monitoring use cases; note that work
656
    /// in this callback will increase latencies when the application has allowed one or
657
    /// more runtime threads to go idle.
658
    ///
659
    /// Note: There can only be one unpark callback for a runtime; calling this function
660
    /// more than once replaces the last callback defined, rather than adding to it.
661
    ///
662
    /// # Examples
663
    ///
664
    /// ```
665
    /// # use tokio::runtime;
666
    /// # pub fn main() {
667
    /// let runtime = runtime::Builder::new_multi_thread()
668
    ///     .on_thread_unpark(|| {
669
    ///         println!("thread unparking");
670
    ///     })
671
    ///     .build();
672
    ///
673
    /// runtime.unwrap().block_on(async {
674
    ///    tokio::task::yield_now().await;
675
    ///    println!("Hello from Tokio!");
676
    /// })
677
    /// # }
678
    /// ```
679
    #[cfg(not(loom))]
680
0
    pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
681
0
    where
682
0
        F: Fn() + Send + Sync + 'static,
683
0
    {
684
0
        self.after_unpark = Some(std::sync::Arc::new(f));
685
0
        self
686
0
    }
687
688
    /// Executes function `f` just before a task is spawned.
689
    ///
690
    /// `f` is called within the Tokio context, so functions like
691
    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
692
    /// invoked immediately.
693
    ///
694
    /// This can be used for bookkeeping or monitoring purposes.
695
    ///
696
    /// Note: There can only be one spawn callback for a runtime; calling this function more
697
    /// than once replaces the last callback defined, rather than adding to it.
698
    ///
699
    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
700
    ///
701
    /// **Note**: This is an [unstable API][unstable]. The public API of this type
702
    /// may break in 1.x releases. See [the documentation on unstable
703
    /// features][unstable] for details.
704
    ///
705
    /// [unstable]: crate#unstable-features
706
    ///
707
    /// # Examples
708
    ///
709
    /// ```
710
    /// # use tokio::runtime;
711
    /// # pub fn main() {
712
    /// let runtime = runtime::Builder::new_current_thread()
713
    ///     .on_task_spawn(|_| {
714
    ///         println!("spawning task");
715
    ///     })
716
    ///     .build()
717
    ///     .unwrap();
718
    ///
719
    /// runtime.block_on(async {
720
    ///     tokio::task::spawn(std::future::ready(()));
721
    ///
722
    ///     for _ in 0..64 {
723
    ///         tokio::task::yield_now().await;
724
    ///     }
725
    /// })
726
    /// # }
727
    /// ```
728
    #[cfg(all(not(loom), tokio_unstable))]
729
    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
730
    pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
731
    where
732
        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
733
    {
734
        self.before_spawn = Some(std::sync::Arc::new(f));
735
        self
736
    }
737
738
    /// Executes function `f` just before a task is polled
739
    ///
740
    /// `f` is called within the Tokio context, so functions like
741
    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
742
    /// invoked immediately.
743
    ///
744
    /// **Note**: This is an [unstable API][unstable]. The public API of this type
745
    /// may break in 1.x releases. See [the documentation on unstable
746
    /// features][unstable] for details.
747
    ///
748
    /// [unstable]: crate#unstable-features
749
    ///
750
    /// # Examples
751
    ///
752
    /// ```
753
    /// # use std::sync::{atomic::AtomicUsize, Arc};
754
    /// # use tokio::task::yield_now;
755
    /// # pub fn main() {
756
    /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
757
    /// let poll_start = poll_start_counter.clone();
758
    /// let rt = tokio::runtime::Builder::new_multi_thread()
759
    ///     .enable_all()
760
    ///     .on_before_task_poll(move |meta| {
761
    ///         println!("task {} is about to be polled", meta.id())
762
    ///     })
763
    ///     .build()
764
    ///     .unwrap();
765
    /// let task = rt.spawn(async {
766
    ///     yield_now().await;
767
    /// });
768
    /// let _ = rt.block_on(task);
769
    ///
770
    /// # }
771
    /// ```
772
    #[cfg(tokio_unstable)]
773
    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
774
    pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
775
    where
776
        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
777
    {
778
        self.before_poll = Some(std::sync::Arc::new(f));
779
        self
780
    }
781
782
    /// Executes function `f` just after a task is polled
783
    ///
784
    /// `f` is called within the Tokio context, so functions like
785
    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
786
    /// invoked immediately.
787
    ///
788
    /// **Note**: This is an [unstable API][unstable]. The public API of this type
789
    /// may break in 1.x releases. See [the documentation on unstable
790
    /// features][unstable] for details.
791
    ///
792
    /// [unstable]: crate#unstable-features
793
    ///
794
    /// # Examples
795
    ///
796
    /// ```
797
    /// # use std::sync::{atomic::AtomicUsize, Arc};
798
    /// # use tokio::task::yield_now;
799
    /// # pub fn main() {
800
    /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
801
    /// let poll_stop = poll_stop_counter.clone();
802
    /// let rt = tokio::runtime::Builder::new_multi_thread()
803
    ///     .enable_all()
804
    ///     .on_after_task_poll(move |meta| {
805
    ///         println!("task {} completed polling", meta.id());
806
    ///     })
807
    ///     .build()
808
    ///     .unwrap();
809
    /// let task = rt.spawn(async {
810
    ///     yield_now().await;
811
    /// });
812
    /// let _ = rt.block_on(task);
813
    ///
814
    /// # }
815
    /// ```
816
    #[cfg(tokio_unstable)]
817
    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
818
    pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
819
    where
820
        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
821
    {
822
        self.after_poll = Some(std::sync::Arc::new(f));
823
        self
824
    }
825
826
    /// Registers `f` to run immediately after a task has terminated.
    ///
    /// The callback is invoked from within the Tokio context, which means it
    /// may itself call functions such as [`tokio::spawn`](crate::spawn).
    ///
    /// Typical uses are bookkeeping and monitoring.
    ///
    /// Note: A runtime holds at most one task termination callback. Every call to this
    /// function overwrites the callback registered before it instead of adding another one.
    ///
    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
    ///
    /// **Note**: This is an [unstable API][unstable]. The public API of this type
    /// may break in 1.x releases. See [the documentation on unstable
    /// features][unstable] for details.
    ///
    /// [unstable]: crate#unstable-features
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::runtime;
    /// # pub fn main() {
    /// let runtime = runtime::Builder::new_current_thread()
    ///     .on_task_terminate(|_| {
    ///         println!("killing task");
    ///     })
    ///     .build()
    ///     .unwrap();
    ///
    /// runtime.block_on(async {
    ///     tokio::task::spawn(std::future::ready(()));
    ///
    ///     for _ in 0..64 {
    ///         tokio::task::yield_now().await;
    ///     }
    /// })
    /// # }
    /// ```
    #[cfg(all(not(loom), tokio_unstable))]
    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
    pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
    {
        // Storing a new callback drops whichever one was set before.
        self.after_termination = Some(std::sync::Arc::new(f));

        self
    }
874
875
    /// Creates the configured `Runtime`.
876
    ///
877
    /// The returned `Runtime` instance is ready to spawn tasks.
878
    ///
879
    /// # Examples
880
    ///
881
    /// ```
882
    /// use tokio::runtime::Builder;
883
    ///
884
    /// let rt  = Builder::new_multi_thread().build().unwrap();
885
    ///
886
    /// rt.block_on(async {
887
    ///     println!("Hello from the Tokio runtime");
888
    /// });
889
    /// ```
890
17.3k
    pub fn build(&mut self) -> io::Result<Runtime> {
891
17.3k
        match &self.kind {
892
0
            Kind::CurrentThread => self.build_current_thread_runtime(),
893
            #[cfg(feature = "rt-multi-thread")]
894
17.3k
            Kind::MultiThread => self.build_threaded_runtime(),
895
        }
896
17.3k
    }
897
898
    /// Builds the [`LocalRuntime`] described by this configuration.
    ///
    /// The [`LocalRuntime`] that is returned can be used to spawn tasks right away.
    ///
    /// # Panics
    ///
    /// Panics when the builder was configured with [`new_multi_thread()`].
    ///
    /// [`new_multi_thread()`]: Builder::new_multi_thread
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::runtime::{Builder, LocalOptions};
    ///
    /// let rt = Builder::new_current_thread()
    ///     .build_local(LocalOptions::default())
    ///     .unwrap();
    ///
    /// rt.spawn_local(async {
    ///     println!("Hello from the Tokio runtime");
    /// });
    /// ```
    #[allow(unused_variables, unreachable_patterns)]
    #[cfg(tokio_unstable)]
    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
    pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
        // `options` is accepted but not read by this function, hence the
        // `unused_variables` allowance above.
        match self.kind {
            #[cfg(feature = "rt-multi-thread")]
            Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
            Kind::CurrentThread => self.build_current_thread_local_runtime(),
        }
    }
931
932
17.3k
    fn get_cfg(&self) -> driver::Cfg {
933
17.3k
        driver::Cfg {
934
17.3k
            enable_pause_time: match self.kind {
935
0
                Kind::CurrentThread => true,
936
                #[cfg(feature = "rt-multi-thread")]
937
17.3k
                Kind::MultiThread => false,
938
            },
939
17.3k
            enable_io: self.enable_io,
940
17.3k
            enable_time: self.enable_time,
941
17.3k
            start_paused: self.start_paused,
942
17.3k
            nevents: self.nevents,
943
17.3k
        }
944
17.3k
    }
945
946
    /// Sets a custom timeout for a thread in the blocking pool.
947
    ///
948
    /// By default, the timeout for a thread is set to 10 seconds. This can
949
    /// be overridden using `.thread_keep_alive()`.
950
    ///
951
    /// # Example
952
    ///
953
    /// ```
954
    /// # use tokio::runtime;
955
    /// # use std::time::Duration;
956
    /// # pub fn main() {
957
    /// let rt = runtime::Builder::new_multi_thread()
958
    ///     .thread_keep_alive(Duration::from_millis(100))
959
    ///     .build();
960
    /// # }
961
    /// ```
962
0
    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
963
0
        self.keep_alive = Some(duration);
964
0
        self
965
0
    }
966
967
    /// Sets the number of scheduler ticks after which the scheduler will poll the global
968
    /// task queue.
969
    ///
970
    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
971
    ///
972
    /// By default the global queue interval is 31 for the current-thread scheduler. Please see
973
    /// [the module documentation] for the default behavior of the multi-thread scheduler.
974
    ///
975
    /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
976
    /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
977
    /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
978
    /// getting started on new work, especially if tasks frequently yield rather than complete
979
    /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
980
    /// is a good choice when most tasks quickly complete polling.
981
    ///
982
    /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
983
    ///
984
    /// # Panics
985
    ///
986
    /// This function will panic if 0 is passed as an argument.
987
    ///
988
    /// # Examples
989
    ///
990
    /// ```
991
    /// # use tokio::runtime;
992
    /// # pub fn main() {
993
    /// let rt = runtime::Builder::new_multi_thread()
994
    ///     .global_queue_interval(31)
995
    ///     .build();
996
    /// # }
997
    /// ```
998
    #[track_caller]
999
0
    pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1000
0
        assert!(val > 0, "global_queue_interval must be greater than 0");
1001
0
        self.global_queue_interval = Some(val);
1002
0
        self
1003
0
    }
1004
1005
    /// Sets the number of scheduler ticks after which the scheduler will poll for
1006
    /// external events (timers, I/O, and so on).
1007
    ///
1008
    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1009
    ///
1010
    /// By default, the event interval is `61` for all scheduler types.
1011
    ///
1012
    /// Setting the event interval determines the effective "priority" of delivering
1013
    /// these external events (which may wake up additional tasks), compared to
1014
    /// executing tasks that are currently ready to run. A smaller value is useful
1015
    /// when tasks frequently spend a long time in polling, or frequently yield,
1016
    /// which can result in overly long delays picking up I/O events. Conversely,
1017
    /// picking up new events requires extra synchronization and syscall overhead,
1018
    /// so if tasks generally complete their polling quickly, a higher event interval
1019
    /// will minimize that overhead while still keeping the scheduler responsive to
1020
    /// events.
1021
    ///
1022
    /// # Examples
1023
    ///
1024
    /// ```
1025
    /// # use tokio::runtime;
1026
    /// # pub fn main() {
1027
    /// let rt = runtime::Builder::new_multi_thread()
1028
    ///     .event_interval(31)
1029
    ///     .build();
1030
    /// # }
1031
    /// ```
1032
0
    pub fn event_interval(&mut self, val: u32) -> &mut Self {
1033
0
        self.event_interval = val;
1034
0
        self
1035
0
    }
1036
1037
    cfg_unstable! {
1038
        /// Configure how the runtime responds to an unhandled panic on a
1039
        /// spawned task.
1040
        ///
1041
        /// By default, an unhandled panic (i.e. a panic not caught by
1042
        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1043
        /// execution. The panic's error value is forwarded to the task's
1044
        /// [`JoinHandle`] and all other spawned tasks continue running.
1045
        ///
1046
        /// The `unhandled_panic` option enables configuring this behavior.
1047
        ///
1048
        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1049
        ///   spawned tasks have no impact on the runtime's execution.
1050
        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1051
        ///   shutdown immediately when a spawned task panics even if that
1052
        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
1053
        ///   will immediately terminate and further calls to
1054
        ///   [`Runtime::block_on`] will panic.
1055
        ///
1056
        /// # Panics
1057
        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1058
        /// on a runtime other than the current thread runtime.
1059
        ///
1060
        /// # Unstable
1061
        ///
1062
        /// This option is currently unstable and its implementation is
1063
        /// incomplete. The API may change or be removed in the future. See
1064
        /// issue [tokio-rs/tokio#4516] for more details.
1065
        ///
1066
        /// # Examples
1067
        ///
1068
        /// The following demonstrates a runtime configured to shutdown on
1069
        /// panic. The first spawned task panics and results in the runtime
1070
        /// shutting down. The second spawned task never has a chance to
1071
        /// execute. The call to `block_on` will panic due to the runtime being
1072
        /// forcibly shutdown.
1073
        ///
1074
        /// ```should_panic
1075
        /// use tokio::runtime::{self, UnhandledPanic};
1076
        ///
1077
        /// # pub fn main() {
1078
        /// let rt = runtime::Builder::new_current_thread()
1079
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1080
        ///     .build()
1081
        ///     .unwrap();
1082
        ///
1083
        /// rt.spawn(async { panic!("boom"); });
1084
        /// rt.spawn(async {
1085
        ///     // This task never completes.
1086
        /// });
1087
        ///
1088
        /// rt.block_on(async {
1089
        ///     // Do some work
1090
        /// # loop { tokio::task::yield_now().await; }
1091
        /// })
1092
        /// # }
1093
        /// ```
1094
        ///
1095
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
1096
        /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1097
        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1098
            if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1099
                panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1100
            }
1101
1102
            self.unhandled_panic = behavior;
1103
            self
1104
        }
1105
1106
        /// Disables the LIFO task scheduler heuristic.
1107
        ///
1108
        /// The multi-threaded scheduler includes a heuristic for optimizing
1109
        /// message-passing patterns. This heuristic results in the **last**
1110
        /// scheduled task being polled first.
1111
        ///
1112
        /// To implement this heuristic, each worker thread has a slot which
1113
        /// holds the task that should be polled next. However, this slot cannot
1114
        /// be stolen by other worker threads, which can result in lower total
1115
        /// throughput when tasks tend to have longer poll times.
1116
        ///
1117
        /// This configuration option will disable this heuristic resulting in
1118
        /// all scheduled tasks being pushed into the worker-local queue, which
1119
        /// is stealable.
1120
        ///
1121
        /// Consider trying this option when the task "scheduled" time is high
1122
        /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1123
        /// collect this data.
1124
        ///
1125
        /// # Unstable
1126
        ///
1127
        /// This configuration option is considered a workaround for the LIFO
1128
        /// slot not being stealable. When the slot becomes stealable, we will
1129
        /// revisit whether or not this option is necessary. See
1130
        /// issue [tokio-rs/tokio#4941].
1131
        ///
1132
        /// # Examples
1133
        ///
1134
        /// ```
1135
        /// use tokio::runtime;
1136
        ///
1137
        /// let rt = runtime::Builder::new_multi_thread()
1138
        ///     .disable_lifo_slot()
1139
        ///     .build()
1140
        ///     .unwrap();
1141
        /// ```
1142
        ///
1143
        /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1144
        /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1145
        pub fn disable_lifo_slot(&mut self) -> &mut Self {
1146
            self.disable_lifo_slot = true;
1147
            self
1148
        }
1149
1150
        /// Specifies the random number generation seed to use within all
1151
        /// threads associated with the runtime being built.
1152
        ///
1153
        /// This option is intended to make certain parts of the runtime
1154
        /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1155
        /// [`tokio::select!`] it will ensure that the order that branches are
1156
        /// polled is deterministic.
1157
        ///
1158
        /// In addition to the code specifying `rng_seed` and interacting with
1159
        /// the runtime, the internals of Tokio and the Rust compiler may affect
1160
        /// the sequences of random numbers. In order to ensure repeatable
1161
        /// results, the version of Tokio, the versions of all other
1162
        /// dependencies that interact with Tokio, and the Rust compiler version
1163
        /// should also all remain constant.
1164
        ///
1165
        /// # Examples
1166
        ///
1167
        /// ```
1168
        /// # use tokio::runtime::{self, RngSeed};
1169
        /// # pub fn main() {
1170
        /// let seed = RngSeed::from_bytes(b"place your seed here");
1171
        /// let rt = runtime::Builder::new_current_thread()
1172
        ///     .rng_seed(seed)
1173
        ///     .build();
1174
        /// # }
1175
        /// ```
1176
        ///
1177
        /// [`tokio::select!`]: crate::select
1178
        pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1179
            self.seed_generator = RngSeedGenerator::new(seed);
1180
            self
1181
        }
1182
    }
1183
1184
    cfg_unstable_metrics! {
1185
        /// Enables tracking the distribution of task poll times.
1186
        ///
1187
        /// Task poll times are not instrumented by default as doing so requires
1188
        /// calling [`Instant::now()`] twice per task poll, which could add
1189
        /// measurable overhead. Use the [`Handle::metrics()`] to access the
1190
        /// metrics data.
1191
        ///
1192
        /// The histogram uses fixed bucket sizes. In other words, the histogram
1193
        /// buckets are not dynamic based on input values. Use the
1194
        /// `metrics_poll_time_histogram` builder methods to configure the
1195
        /// histogram details.
1196
        ///
1197
        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1198
        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1199
        /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1200
        /// to select [`LogHistogram`] instead.
1201
        ///
1202
        /// # Examples
1203
        ///
1204
        /// ```
1205
        /// use tokio::runtime;
1206
        ///
1207
        /// let rt = runtime::Builder::new_multi_thread()
1208
        ///     .enable_metrics_poll_time_histogram()
1209
        ///     .build()
1210
        ///     .unwrap();
1211
        /// # // Test default values here
1212
        /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1213
        /// # let m = rt.handle().metrics();
1214
        /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1215
        /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1216
        /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1217
        /// ```
1218
        ///
1219
        /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1220
        /// [`Instant::now()`]: std::time::Instant::now
1221
        /// [`LogHistogram`]: crate::runtime::LogHistogram
1222
        /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1223
        pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1224
            self.metrics_poll_count_histogram_enable = true;
1225
            self
1226
        }
1227
1228
        /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1229
        ///
1230
        /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1231
        #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1232
        #[doc(hidden)]
1233
        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1234
            self.enable_metrics_poll_time_histogram()
1235
        }
1236
1237
        /// Sets the histogram scale for tracking the distribution of task poll
1238
        /// times.
1239
        ///
1240
        /// Tracking the distribution of task poll times can be done using a
1241
        /// linear or log scale. When using linear scale, each histogram bucket
1242
        /// will represent the same range of poll times. When using log scale,
1243
        /// each histogram bucket will cover a range twice as big as the
1244
        /// previous bucket.
1245
        ///
1246
        /// **Default:** linear scale.
1247
        ///
1248
        /// # Examples
1249
        ///
1250
        /// ```
1251
        /// use tokio::runtime::{self, HistogramScale};
1252
        ///
1253
        /// # #[allow(deprecated)]
1254
        /// let rt = runtime::Builder::new_multi_thread()
1255
        ///     .enable_metrics_poll_time_histogram()
1256
        ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
1257
        ///     .build()
1258
        ///     .unwrap();
1259
        /// ```
1260
        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1261
        pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1262
            self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1263
            self
1264
        }
1265
1266
        /// Configure the histogram for tracking poll times
1267
        ///
1268
        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1269
        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1270
        /// better granularity with low memory usage, use [`LogHistogram`] instead.
1271
        ///
1272
        /// # Examples
1273
        /// Configure a [`LogHistogram`] with [default configuration]:
1274
        /// ```
1275
        /// use tokio::runtime;
1276
        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1277
        ///
1278
        /// let rt = runtime::Builder::new_multi_thread()
1279
        ///     .enable_metrics_poll_time_histogram()
1280
        ///     .metrics_poll_time_histogram_configuration(
1281
        ///         HistogramConfiguration::log(LogHistogram::default())
1282
        ///     )
1283
        ///     .build()
1284
        ///     .unwrap();
1285
        /// ```
1286
        ///
1287
        /// Configure a linear histogram with 100 buckets, each 10μs wide
1288
        /// ```
1289
        /// use tokio::runtime;
1290
        /// use std::time::Duration;
1291
        /// use tokio::runtime::HistogramConfiguration;
1292
        ///
1293
        /// let rt = runtime::Builder::new_multi_thread()
1294
        ///     .enable_metrics_poll_time_histogram()
1295
        ///     .metrics_poll_time_histogram_configuration(
1296
        ///         HistogramConfiguration::linear(Duration::from_micros(10), 100)
1297
        ///     )
1298
        ///     .build()
1299
        ///     .unwrap();
1300
        /// ```
1301
        ///
1302
        /// Configure a [`LogHistogram`] with the following settings:
1303
        /// - Measure times from 100ns to 120s
1304
        /// - Max error of 0.1
1305
        /// - No more than 1024 buckets
1306
        /// ```
1307
        /// use std::time::Duration;
1308
        /// use tokio::runtime;
1309
        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1310
        ///
1311
        /// let rt = runtime::Builder::new_multi_thread()
1312
        ///     .enable_metrics_poll_time_histogram()
1313
        ///     .metrics_poll_time_histogram_configuration(
1314
        ///         HistogramConfiguration::log(LogHistogram::builder()
1315
        ///             .max_value(Duration::from_secs(120))
1316
        ///             .min_value(Duration::from_nanos(100))
1317
        ///             .max_error(0.1)
1318
        ///             .max_buckets(1024)
1319
        ///             .expect("configuration uses 488 buckets")
1320
        ///         )
1321
        ///     )
1322
        ///     .build()
1323
        ///     .unwrap();
1324
        /// ```
1325
        ///
1326
        /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1327
        /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1328
        /// where each bucket is twice the size of the previous bucket.
1329
        /// ```rust
1330
        /// use std::time::Duration;
1331
        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1332
        /// let rt = tokio::runtime::Builder::new_current_thread()
1333
        ///     .enable_all()
1334
        ///     .enable_metrics_poll_time_histogram()
1335
        ///     .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1336
        ///         LogHistogram::builder()
1337
        ///             .min_value(Duration::from_micros(20))
1338
        ///             .max_value(Duration::from_millis(4))
1339
        ///             // Set `precision_exact` to `0` to match `HistogramScale::Log`
1340
        ///             .precision_exact(0)
1341
        ///             .max_buckets(10)
1342
        ///             .unwrap(),
1343
        ///     ))
1344
        ///     .build()
1345
        ///     .unwrap();
1346
        /// ```
1347
        ///
1348
        /// [`LogHistogram`]: crate::runtime::LogHistogram
1349
        /// [default configuration]: crate::runtime::LogHistogramBuilder
1350
        /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1351
        pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1352
            self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1353
            self
1354
        }
1355
1356
        /// Sets the histogram resolution for tracking the distribution of task
1357
        /// poll times.
1358
        ///
1359
        /// The resolution is the histogram's first bucket's range. When using a
1360
        /// linear histogram scale, each bucket will cover the same range. When
1361
        /// using a log scale, each bucket will cover a range twice as big as
1362
        /// the previous bucket. In the log case, the resolution represents the
1363
        /// smallest bucket range.
1364
        ///
1365
        /// Note that, when using log scale, the resolution is rounded up to the
1366
        /// nearest power of 2 in nanoseconds.
1367
        ///
1368
        /// **Default:** 100 microseconds.
1369
        ///
1370
        /// # Examples
1371
        ///
1372
        /// ```
1373
        /// use tokio::runtime;
1374
        /// use std::time::Duration;
1375
        ///
1376
        /// # #[allow(deprecated)]
1377
        /// let rt = runtime::Builder::new_multi_thread()
1378
        ///     .enable_metrics_poll_time_histogram()
1379
        ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1380
        ///     .build()
1381
        ///     .unwrap();
1382
        /// ```
1383
        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1384
        pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1385
            assert!(resolution > Duration::from_secs(0));
1386
            // Sanity check the argument and also make the cast below safe.
1387
            assert!(resolution <= Duration::from_secs(1));
1388
1389
            let resolution = resolution.as_nanos() as u64;
1390
1391
            self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1392
            self
1393
        }
1394
1395
        /// Sets the number of buckets for the histogram tracking the
1396
        /// distribution of task poll times.
1397
        ///
1398
        /// The last bucket tracks all greater values that fall out of other
1399
        /// ranges. So, configuring the histogram using a linear scale,
1400
        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1401
        /// polls that take more than 450ms to complete.
1402
        ///
1403
        /// **Default:** 10
1404
        ///
1405
        /// # Examples
1406
        ///
1407
        /// ```
1408
        /// use tokio::runtime;
1409
        ///
1410
        /// # #[allow(deprecated)]
1411
        /// let rt = runtime::Builder::new_multi_thread()
1412
        ///     .enable_metrics_poll_time_histogram()
1413
        ///     .metrics_poll_count_histogram_buckets(15)
1414
        ///     .build()
1415
        ///     .unwrap();
1416
        /// ```
1417
        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1418
        pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1419
            self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1420
            self
1421
        }
1422
    }
1423
1424
0
    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1425
        use crate::runtime::runtime::Scheduler;
1426
1427
0
        let (scheduler, handle, blocking_pool) =
1428
0
            self.build_current_thread_runtime_components(None)?;
1429
1430
0
        Ok(Runtime::from_parts(
1431
0
            Scheduler::CurrentThread(scheduler),
1432
0
            handle,
1433
0
            blocking_pool,
1434
0
        ))
1435
0
    }
1436
1437
    /// Builds a `LocalRuntime` backed by the current-thread scheduler.
    ///
    /// The id of the calling thread is captured and passed on to the
    /// component builder; the result is assembled via
    /// `LocalRuntime::from_parts`.
    ///
    /// # Errors
    ///
    /// Returns any error produced while building the runtime components.
    #[cfg(tokio_unstable)]
    fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
        use crate::runtime::local_runtime::LocalRuntimeScheduler;

        let owner_thread = std::thread::current().id();
        let (scheduler, handle, blocking_pool) =
            self.build_current_thread_runtime_components(Some(owner_thread))?;

        let scheduler = LocalRuntimeScheduler::CurrentThread(scheduler);
        Ok(LocalRuntime::from_parts(scheduler, handle, blocking_pool))
    }
1452
1453
0
    fn build_current_thread_runtime_components(
1454
0
        &mut self,
1455
0
        local_tid: Option<ThreadId>,
1456
0
    ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1457
        use crate::runtime::scheduler;
1458
        use crate::runtime::Config;
1459
1460
0
        let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1461
1462
        // Blocking pool
1463
0
        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1464
0
        let blocking_spawner = blocking_pool.spawner().clone();
1465
0
1466
0
        // Generate a rng seed for this runtime.
1467
0
        let seed_generator_1 = self.seed_generator.next_generator();
1468
0
        let seed_generator_2 = self.seed_generator.next_generator();
1469
0
1470
0
        // And now put a single-threaded scheduler on top of the timer. When
1471
0
        // there are no futures ready to do something, it'll let the timer or
1472
0
        // the reactor to generate some new stimuli for the futures to continue
1473
0
        // in their life.
1474
0
        let (scheduler, handle) = CurrentThread::new(
1475
0
            driver,
1476
0
            driver_handle,
1477
0
            blocking_spawner,
1478
0
            seed_generator_2,
1479
0
            Config {
1480
0
                before_park: self.before_park.clone(),
1481
0
                after_unpark: self.after_unpark.clone(),
1482
0
                before_spawn: self.before_spawn.clone(),
1483
0
                #[cfg(tokio_unstable)]
1484
0
                before_poll: self.before_poll.clone(),
1485
0
                #[cfg(tokio_unstable)]
1486
0
                after_poll: self.after_poll.clone(),
1487
0
                after_termination: self.after_termination.clone(),
1488
0
                global_queue_interval: self.global_queue_interval,
1489
0
                event_interval: self.event_interval,
1490
0
                #[cfg(tokio_unstable)]
1491
0
                unhandled_panic: self.unhandled_panic.clone(),
1492
0
                disable_lifo_slot: self.disable_lifo_slot,
1493
0
                seed_generator: seed_generator_1,
1494
0
                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1495
0
            },
1496
0
            local_tid,
1497
0
        );
1498
0
1499
0
        let handle = Handle {
1500
0
            inner: scheduler::Handle::CurrentThread(handle),
1501
0
        };
1502
0
1503
0
        Ok((scheduler, handle, blocking_pool))
1504
0
    }
1505
1506
17.3k
    fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1507
17.3k
        if self.metrics_poll_count_histogram_enable {
1508
0
            Some(self.metrics_poll_count_histogram.clone())
1509
        } else {
1510
17.3k
            None
1511
        }
1512
17.3k
    }
1513
}
1514
1515
cfg_io_driver! {
    impl Builder {
        /// Turns on the I/O driver.
        ///
        /// With the driver enabled, net, process, signal, and some I/O types
        /// can be used on the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_io()
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn enable_io(&mut self) -> &mut Self {
            self.enable_io = true;

            self
        }

        /// Enables the I/O driver and configures the max number of events to be
        /// processed per tick.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_current_thread()
        ///     .enable_io()
        ///     .max_io_events_per_tick(1024)
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
            // Only the event capacity is recorded here.
            self.nevents = capacity;

            self
        }
    }
}
1557
1558
cfg_time! {
    impl Builder {
        /// Turns on the time driver.
        ///
        /// With the driver enabled, `tokio::time` can be used on the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_time()
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn enable_time(&mut self) -> &mut Self {
            self.enable_time = true;

            self
        }
    }
}
1580
1581
cfg_test_util! {
    impl Builder {
        /// Chooses whether the runtime's clock starts out paused or advancing.
        ///
        /// Pausing time requires the current-thread runtime; construction of
        /// the runtime will panic otherwise.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_current_thread()
        ///     .enable_time()
        ///     .start_paused(true)
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
            // The flag is only recorded here.
            self.start_paused = start_paused;

            self
        }
    }
}
1605
1606
cfg_rt_multi_thread! {
    impl Builder {
        /// Builds a `Runtime` backed by the multi-thread scheduler.
        ///
        /// The number of workers is `worker_threads` when configured, otherwise
        /// the value returned by `num_cpus`. The blocking pool is created with
        /// a limit of `max_blocking_threads` plus the number of workers. Once
        /// the scheduler exists, the runtime context is entered and the
        /// workers are launched before the `Runtime` is returned.
        ///
        /// # Errors
        ///
        /// Returns the error from `driver::Driver::new` if the driver cannot be
        /// created.
        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
            use crate::loom::sys::num_cpus;
            use crate::runtime::{Config, runtime::Scheduler};
            use crate::runtime::scheduler::{self, MultiThread};

            let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);

            let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

            // Create the blocking pool. The limit saturates instead of
            // overflowing: with a very large `max_blocking_threads` (for
            // example `usize::MAX` used as "no limit"), a plain `+` would panic
            // in debug builds and wrap around to a tiny limit in release builds.
            let blocking_thread_limit = self.max_blocking_threads.saturating_add(worker_threads);
            let blocking_pool = blocking::create_blocking_pool(self, blocking_thread_limit);
            let blocking_spawner = blocking_pool.spawner().clone();

            // Generate a rng seed for this runtime. The first generator goes
            // into the config, the second is passed to the scheduler directly.
            let seed_generator_1 = self.seed_generator.next_generator();
            let seed_generator_2 = self.seed_generator.next_generator();

            let (scheduler, handle, launch) = MultiThread::new(
                worker_threads,
                driver,
                driver_handle,
                blocking_spawner,
                seed_generator_2,
                Config {
                    before_park: self.before_park.clone(),
                    after_unpark: self.after_unpark.clone(),
                    before_spawn: self.before_spawn.clone(),
                    #[cfg(tokio_unstable)]
                    before_poll: self.before_poll.clone(),
                    #[cfg(tokio_unstable)]
                    after_poll: self.after_poll.clone(),
                    after_termination: self.after_termination.clone(),
                    global_queue_interval: self.global_queue_interval,
                    event_interval: self.event_interval,
                    #[cfg(tokio_unstable)]
                    unhandled_panic: self.unhandled_panic.clone(),
                    disable_lifo_slot: self.disable_lifo_slot,
                    seed_generator: seed_generator_1,
                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
                },
            );

            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };

            // Spawn the thread pool workers. The guard keeps the runtime
            // context entered until this function returns.
            let _enter = handle.enter();
            launch.launch();

            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
        }
    }
}
1661
1662
impl fmt::Debug for Builder {
1663
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1664
0
        fmt.debug_struct("Builder")
1665
0
            .field("worker_threads", &self.worker_threads)
1666
0
            .field("max_blocking_threads", &self.max_blocking_threads)
1667
0
            .field(
1668
0
                "thread_name",
1669
0
                &"<dyn Fn() -> String + Send + Sync + 'static>",
1670
0
            )
1671
0
            .field("thread_stack_size", &self.thread_stack_size)
1672
0
            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1673
0
            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1674
0
            .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1675
0
            .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1676
0
            .finish()
1677
0
    }
1678
}