/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.49.0/src/runtime/builder.rs
Line | Count | Source |
1 | | #![cfg_attr(loom, allow(unused_imports))] |
2 | | |
3 | | use crate::runtime::handle::Handle; |
4 | | use crate::runtime::{ |
5 | | blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback, TimerFlavor, |
6 | | }; |
7 | | #[cfg(tokio_unstable)] |
8 | | use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta}; |
9 | | use crate::util::rand::{RngSeed, RngSeedGenerator}; |
10 | | |
11 | | use crate::runtime::blocking::BlockingPool; |
12 | | use crate::runtime::scheduler::CurrentThread; |
13 | | use std::fmt; |
14 | | use std::io; |
15 | | use std::thread::ThreadId; |
16 | | use std::time::Duration; |
17 | | |
18 | | /// Builds Tokio Runtime with custom configuration values. |
19 | | /// |
20 | | /// Methods can be chained in order to set the configuration values. The |
21 | | /// Runtime is constructed by calling [`build`]. |
22 | | /// |
23 | | /// New instances of `Builder` are obtained via [`Builder::new_multi_thread`] |
24 | | /// or [`Builder::new_current_thread`]. |
25 | | /// |
26 | | /// See function level documentation for details on the various configuration |
27 | | /// settings. |
28 | | /// |
29 | | /// [`build`]: method@Self::build |
30 | | /// [`Builder::new_multi_thread`]: method@Self::new_multi_thread |
31 | | /// [`Builder::new_current_thread`]: method@Self::new_current_thread |
32 | | /// |
33 | | /// # Examples |
34 | | /// |
35 | | /// ``` |
36 | | /// # #[cfg(not(target_family = "wasm"))] |
37 | | /// # { |
38 | | /// use tokio::runtime::Builder; |
39 | | /// |
40 | | /// fn main() { |
41 | | /// // build runtime |
42 | | /// let runtime = Builder::new_multi_thread() |
43 | | /// .worker_threads(4) |
44 | | /// .thread_name("my-custom-name") |
45 | | /// .thread_stack_size(3 * 1024 * 1024) |
46 | | /// .build() |
47 | | /// .unwrap(); |
48 | | /// |
49 | | /// // use runtime ... |
50 | | /// } |
51 | | /// # } |
52 | | /// ``` |
53 | | pub struct Builder { |
54 | | /// Runtime type |
55 | | kind: Kind, |
56 | | |
57 | | /// Whether or not to enable the I/O driver |
58 | | enable_io: bool, |
59 | | nevents: usize, |
60 | | |
61 | | /// Whether or not to enable the time driver |
62 | | enable_time: bool, |
63 | | |
64 | | /// Whether or not the clock should start paused. |
65 | | start_paused: bool, |
66 | | |
67 | | /// The number of worker threads, used by Runtime. |
68 | | /// |
69 | | /// Only used when not using the current-thread executor. |
70 | | worker_threads: Option<usize>, |
71 | | |
72 | | /// Cap on thread usage. |
73 | | max_blocking_threads: usize, |
74 | | |
75 | | /// Name fn used for threads spawned by the runtime. |
76 | | pub(super) thread_name: ThreadNameFn, |
77 | | |
78 | | /// Stack size used for threads spawned by the runtime. |
79 | | pub(super) thread_stack_size: Option<usize>, |
80 | | |
81 | | /// Callback to run after each thread starts. |
82 | | pub(super) after_start: Option<Callback>, |
83 | | |
84 | | /// To run before each worker thread stops |
85 | | pub(super) before_stop: Option<Callback>, |
86 | | |
87 | | /// To run before each worker thread is parked. |
88 | | pub(super) before_park: Option<Callback>, |
89 | | |
90 | | /// To run after each thread is unparked. |
91 | | pub(super) after_unpark: Option<Callback>, |
92 | | |
93 | | /// To run before each task is spawned. |
94 | | pub(super) before_spawn: Option<TaskCallback>, |
95 | | |
96 | | /// To run before each poll |
97 | | #[cfg(tokio_unstable)] |
98 | | pub(super) before_poll: Option<TaskCallback>, |
99 | | |
100 | | /// To run after each poll |
101 | | #[cfg(tokio_unstable)] |
102 | | pub(super) after_poll: Option<TaskCallback>, |
103 | | |
104 | | /// To run after each task is terminated. |
105 | | pub(super) after_termination: Option<TaskCallback>, |
106 | | |
107 | | /// Customizable keep alive timeout for `BlockingPool` |
108 | | pub(super) keep_alive: Option<Duration>, |
109 | | |
110 | | /// How many ticks before pulling a task from the global/remote queue? |
111 | | /// |
112 | | /// When `None`, the value is unspecified and behavior details are left to |
113 | | /// the scheduler. Each scheduler flavor could choose to either pick its own |
114 | | /// default value or use some other strategy to decide when to poll from the |
115 | | /// global queue. For example, the multi-threaded scheduler uses a |
116 | | /// self-tuning strategy based on mean task poll times. |
117 | | pub(super) global_queue_interval: Option<u32>, |
118 | | |
119 | | /// How many ticks before yielding to the driver for timer and I/O events? |
120 | | pub(super) event_interval: u32, |
121 | | |
122 | | /// When true, the multi-threade scheduler LIFO slot should not be used. |
123 | | /// |
124 | | /// This option should only be exposed as unstable. |
125 | | pub(super) disable_lifo_slot: bool, |
126 | | |
127 | | /// Specify a random number generator seed to provide deterministic results |
128 | | pub(super) seed_generator: RngSeedGenerator, |
129 | | |
130 | | /// When true, enables task poll count histogram instrumentation. |
131 | | pub(super) metrics_poll_count_histogram_enable: bool, |
132 | | |
133 | | /// Configures the task poll count histogram |
134 | | pub(super) metrics_poll_count_histogram: HistogramBuilder, |
135 | | |
136 | | #[cfg(tokio_unstable)] |
137 | | pub(super) unhandled_panic: UnhandledPanic, |
138 | | |
139 | | timer_flavor: TimerFlavor, |
140 | | } |
141 | | |
142 | | cfg_unstable! { |
143 | | /// How the runtime should respond to unhandled panics. |
144 | | /// |
145 | | /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic` |
146 | | /// to configure the runtime behavior when a spawned task panics. |
147 | | /// |
148 | | /// See [`Builder::unhandled_panic`] for more details. |
149 | | #[derive(Debug, Clone)] |
150 | | #[non_exhaustive] |
151 | | pub enum UnhandledPanic { |
152 | | /// The runtime should ignore panics on spawned tasks. |
153 | | /// |
154 | | /// The panic is forwarded to the task's [`JoinHandle`] and all spawned |
155 | | /// tasks continue running normally. |
156 | | /// |
157 | | /// This is the default behavior. |
158 | | /// |
159 | | /// # Examples |
160 | | /// |
161 | | /// ``` |
162 | | /// # #[cfg(not(target_family = "wasm"))] |
163 | | /// # { |
164 | | /// use tokio::runtime::{self, UnhandledPanic}; |
165 | | /// |
166 | | /// # pub fn main() { |
167 | | /// let rt = runtime::Builder::new_current_thread() |
168 | | /// .unhandled_panic(UnhandledPanic::Ignore) |
169 | | /// .build() |
170 | | /// .unwrap(); |
171 | | /// |
172 | | /// let task1 = rt.spawn(async { panic!("boom"); }); |
173 | | /// let task2 = rt.spawn(async { |
174 | | /// // This task completes normally |
175 | | /// "done" |
176 | | /// }); |
177 | | /// |
178 | | /// rt.block_on(async { |
179 | | /// // The panic on the first task is forwarded to the `JoinHandle` |
180 | | /// assert!(task1.await.is_err()); |
181 | | /// |
182 | | /// // The second task completes normally |
183 | | /// assert!(task2.await.is_ok()); |
184 | | /// }) |
185 | | /// # } |
186 | | /// # } |
187 | | /// ``` |
188 | | /// |
189 | | /// [`JoinHandle`]: struct@crate::task::JoinHandle |
190 | | Ignore, |
191 | | |
192 | | /// The runtime should immediately shutdown if a spawned task panics. |
193 | | /// |
194 | | /// The runtime will immediately shutdown even if the panicked task's |
195 | | /// [`JoinHandle`] is still available. All further spawned tasks will be |
196 | | /// immediately dropped and call to [`Runtime::block_on`] will panic. |
197 | | /// |
198 | | /// # Examples |
199 | | /// |
200 | | /// ```should_panic |
201 | | /// use tokio::runtime::{self, UnhandledPanic}; |
202 | | /// |
203 | | /// # pub fn main() { |
204 | | /// let rt = runtime::Builder::new_current_thread() |
205 | | /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) |
206 | | /// .build() |
207 | | /// .unwrap(); |
208 | | /// |
209 | | /// rt.spawn(async { panic!("boom"); }); |
210 | | /// rt.spawn(async { |
211 | | /// // This task never completes. |
212 | | /// }); |
213 | | /// |
214 | | /// rt.block_on(async { |
215 | | /// // Do some work |
216 | | /// # loop { tokio::task::yield_now().await; } |
217 | | /// }) |
218 | | /// # } |
219 | | /// ``` |
220 | | /// |
221 | | /// [`JoinHandle`]: struct@crate::task::JoinHandle |
222 | | ShutdownRuntime, |
223 | | } |
224 | | } |
225 | | |
226 | | pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>; |
227 | | |
228 | | #[derive(Clone, Copy)] |
229 | | pub(crate) enum Kind { |
230 | | CurrentThread, |
231 | | #[cfg(feature = "rt-multi-thread")] |
232 | | MultiThread, |
233 | | } |
234 | | |
235 | | impl Builder { |
236 | | /// Returns a new builder with the current thread scheduler selected. |
237 | | /// |
238 | | /// Configuration methods can be chained on the return value. |
239 | | /// |
240 | | /// To spawn non-`Send` tasks on the resulting runtime, combine it with a |
241 | | /// [`LocalSet`], or call [`build_local`] to create a [`LocalRuntime`] (unstable). |
242 | | /// |
243 | | /// [`LocalSet`]: crate::task::LocalSet |
244 | | /// [`LocalRuntime`]: crate::runtime::LocalRuntime |
245 | | /// [`build_local`]: crate::runtime::Builder::build_local |
246 | 0 | pub fn new_current_thread() -> Builder { |
247 | | #[cfg(loom)] |
248 | | const EVENT_INTERVAL: u32 = 4; |
249 | | // The number `61` is fairly arbitrary. I believe this value was copied from golang. |
250 | | #[cfg(not(loom))] |
251 | | const EVENT_INTERVAL: u32 = 61; |
252 | | |
253 | 0 | Builder::new(Kind::CurrentThread, EVENT_INTERVAL) |
254 | 0 | } |
255 | | |
256 | | /// Returns a new builder with the multi thread scheduler selected. |
257 | | /// |
258 | | /// Configuration methods can be chained on the return value. |
259 | | #[cfg(feature = "rt-multi-thread")] |
260 | | #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] |
261 | | pub fn new_multi_thread() -> Builder { |
262 | | // The number `61` is fairly arbitrary. I believe this value was copied from golang. |
263 | | Builder::new(Kind::MultiThread, 61) |
264 | | } |
265 | | |
266 | | /// Returns a new runtime builder initialized with default configuration |
267 | | /// values. |
268 | | /// |
269 | | /// Configuration methods can be chained on the return value. |
270 | 0 | pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder { |
271 | | Builder { |
272 | 0 | kind, |
273 | | |
274 | | // I/O defaults to "off" |
275 | | enable_io: false, |
276 | | nevents: 1024, |
277 | | |
278 | | // Time defaults to "off" |
279 | | enable_time: false, |
280 | | |
281 | | // The clock starts not-paused |
282 | | start_paused: false, |
283 | | |
284 | | // Read from environment variable first in multi-threaded mode. |
285 | | // Default to lazy auto-detection (one thread per CPU core) |
286 | 0 | worker_threads: None, |
287 | | |
288 | | max_blocking_threads: 512, |
289 | | |
290 | | // Default thread name |
291 | 0 | thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()), |
292 | | |
293 | | // Do not set a stack size by default |
294 | 0 | thread_stack_size: None, |
295 | | |
296 | | // No worker thread callbacks |
297 | 0 | after_start: None, |
298 | 0 | before_stop: None, |
299 | 0 | before_park: None, |
300 | 0 | after_unpark: None, |
301 | | |
302 | 0 | before_spawn: None, |
303 | 0 | after_termination: None, |
304 | | |
305 | | #[cfg(tokio_unstable)] |
306 | | before_poll: None, |
307 | | #[cfg(tokio_unstable)] |
308 | | after_poll: None, |
309 | | |
310 | 0 | keep_alive: None, |
311 | | |
312 | | // Defaults for these values depend on the scheduler kind, so we get them |
313 | | // as parameters. |
314 | 0 | global_queue_interval: None, |
315 | 0 | event_interval, |
316 | | |
317 | 0 | seed_generator: RngSeedGenerator::new(RngSeed::new()), |
318 | | |
319 | | #[cfg(tokio_unstable)] |
320 | | unhandled_panic: UnhandledPanic::Ignore, |
321 | | |
322 | | metrics_poll_count_histogram_enable: false, |
323 | | |
324 | 0 | metrics_poll_count_histogram: HistogramBuilder::default(), |
325 | | |
326 | | disable_lifo_slot: false, |
327 | | |
328 | 0 | timer_flavor: TimerFlavor::Traditional, |
329 | | } |
330 | 0 | } |
331 | | |
332 | | /// Enables both I/O and time drivers. |
333 | | /// |
334 | | /// Doing this is a shorthand for calling `enable_io` and `enable_time` |
335 | | /// individually. If additional components are added to Tokio in the future, |
336 | | /// `enable_all` will include these future components. |
337 | | /// |
338 | | /// # Examples |
339 | | /// |
340 | | /// ``` |
341 | | /// # #[cfg(not(target_family = "wasm"))] |
342 | | /// # { |
343 | | /// use tokio::runtime; |
344 | | /// |
345 | | /// let rt = runtime::Builder::new_multi_thread() |
346 | | /// .enable_all() |
347 | | /// .build() |
348 | | /// .unwrap(); |
349 | | /// # } |
350 | | /// ``` |
351 | 0 | pub fn enable_all(&mut self) -> &mut Self { |
352 | | #[cfg(any( |
353 | | feature = "net", |
354 | | all(unix, feature = "process"), |
355 | | all(unix, feature = "signal") |
356 | | ))] |
357 | | self.enable_io(); |
358 | | |
359 | | #[cfg(all( |
360 | | tokio_unstable, |
361 | | feature = "io-uring", |
362 | | feature = "rt", |
363 | | feature = "fs", |
364 | | target_os = "linux", |
365 | | ))] |
366 | | self.enable_io_uring(); |
367 | | |
368 | | #[cfg(feature = "time")] |
369 | 0 | self.enable_time(); |
370 | | |
371 | 0 | self |
372 | 0 | } |
373 | | |
374 | | /// Enables the alternative timer implementation, which is disabled by default. |
375 | | /// |
376 | | /// The alternative timer implementation is an unstable feature that may |
377 | | /// provide better performance on multi-threaded runtimes with a large number |
378 | | /// of worker threads. |
379 | | /// |
380 | | /// This option only applies to multi-threaded runtimes. Attempting to use |
381 | | /// this option with any other runtime type will have no effect. |
382 | | /// |
383 | | /// [Click here to share your experience with the alternative timer](https://github.com/tokio-rs/tokio/issues/7745) |
384 | | /// |
385 | | /// # Examples |
386 | | /// |
387 | | /// ``` |
388 | | /// # #[cfg(not(target_family = "wasm"))] |
389 | | /// # { |
390 | | /// use tokio::runtime; |
391 | | /// |
392 | | /// let rt = runtime::Builder::new_multi_thread() |
393 | | /// .enable_alt_timer() |
394 | | /// .build() |
395 | | /// .unwrap(); |
396 | | /// # } |
397 | | /// ``` |
398 | | #[cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))] |
399 | | #[cfg_attr( |
400 | | docsrs, |
401 | | doc(cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))) |
402 | | )] |
403 | | pub fn enable_alt_timer(&mut self) -> &mut Self { |
404 | | self.enable_time(); |
405 | | self.timer_flavor = TimerFlavor::Alternative; |
406 | | self |
407 | | } |
408 | | |
409 | | /// Sets the number of worker threads the `Runtime` will use. |
410 | | /// |
411 | | /// This can be any number above 0 though it is advised to keep this value |
412 | | /// on the smaller side. |
413 | | /// |
414 | | /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`. |
415 | | /// |
416 | | /// # Default |
417 | | /// |
418 | | /// The default value is the number of cores available to the system. |
419 | | /// |
420 | | /// When using the `current_thread` runtime this method has no effect. |
421 | | /// |
422 | | /// # Examples |
423 | | /// |
424 | | /// ## Multi threaded runtime with 4 threads |
425 | | /// |
426 | | /// ``` |
427 | | /// # #[cfg(not(target_family = "wasm"))] |
428 | | /// # { |
429 | | /// use tokio::runtime; |
430 | | /// |
431 | | /// // This will spawn a work-stealing runtime with 4 worker threads. |
432 | | /// let rt = runtime::Builder::new_multi_thread() |
433 | | /// .worker_threads(4) |
434 | | /// .build() |
435 | | /// .unwrap(); |
436 | | /// |
437 | | /// rt.spawn(async move {}); |
438 | | /// # } |
439 | | /// ``` |
440 | | /// |
441 | | /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`) |
442 | | /// |
443 | | /// ``` |
444 | | /// use tokio::runtime; |
445 | | /// |
446 | | /// // Create a runtime that _must_ be driven from a call |
447 | | /// // to `Runtime::block_on`. |
448 | | /// let rt = runtime::Builder::new_current_thread() |
449 | | /// .build() |
450 | | /// .unwrap(); |
451 | | /// |
452 | | /// // This will run the runtime and future on the current thread |
453 | | /// rt.block_on(async move {}); |
454 | | /// ``` |
455 | | /// |
456 | | /// # Panics |
457 | | /// |
458 | | /// This will panic if `val` is not larger than `0`. |
459 | | #[track_caller] |
460 | 0 | pub fn worker_threads(&mut self, val: usize) -> &mut Self { |
461 | 0 | assert!(val > 0, "Worker threads cannot be set to 0"); |
462 | 0 | self.worker_threads = Some(val); |
463 | 0 | self |
464 | 0 | } |
465 | | |
466 | | /// Specifies the limit for additional threads spawned by the Runtime. |
467 | | /// |
468 | | /// These threads are used for blocking operations like tasks spawned |
469 | | /// through [`spawn_blocking`], this includes but is not limited to: |
470 | | /// - [`fs`] operations |
471 | | /// - dns resolution through [`ToSocketAddrs`] |
472 | | /// - writing to [`Stdout`] or [`Stderr`] |
473 | | /// - reading from [`Stdin`] |
474 | | /// |
475 | | /// Unlike the [`worker_threads`], they are not always active and will exit |
476 | | /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`]. |
477 | | /// |
478 | | /// It's recommended to not set this limit too low in order to avoid hanging on operations |
479 | | /// requiring [`spawn_blocking`]. |
480 | | /// |
481 | | /// The default value is 512. |
482 | | /// |
483 | | /// # Queue Behavior |
484 | | /// |
485 | | /// When a blocking task is submitted, it will be inserted into a queue. If available, one of |
486 | | /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this |
487 | | /// method has not been reached, a new thread will be spawned. If no idle thread is available |
488 | | /// and no more threads are allowed to be spawned, the task will remain in the queue until one |
489 | | /// of the busy threads pick it up. Note that since the queue does not apply any backpressure, |
490 | | /// it could potentially grow unbounded. |
491 | | /// |
492 | | /// # Panics |
493 | | /// |
494 | | /// This will panic if `val` is not larger than `0`. |
495 | | /// |
496 | | /// # Upgrading from 0.x |
497 | | /// |
498 | | /// In old versions `max_threads` limited both blocking and worker threads, but the |
499 | | /// current `max_blocking_threads` does not include async worker threads in the count. |
500 | | /// |
501 | | /// [`spawn_blocking`]: fn@crate::task::spawn_blocking |
502 | | /// [`fs`]: mod@crate::fs |
503 | | /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs |
504 | | /// [`Stdout`]: struct@crate::io::Stdout |
505 | | /// [`Stdin`]: struct@crate::io::Stdin |
506 | | /// [`Stderr`]: struct@crate::io::Stderr |
507 | | /// [`worker_threads`]: Self::worker_threads |
508 | | /// [`thread_keep_alive`]: Self::thread_keep_alive |
509 | | #[track_caller] |
510 | | #[cfg_attr(docsrs, doc(alias = "max_threads"))] |
511 | 0 | pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self { |
512 | 0 | assert!(val > 0, "Max blocking threads cannot be set to 0"); |
513 | 0 | self.max_blocking_threads = val; |
514 | 0 | self |
515 | 0 | } |
516 | | |
517 | | /// Sets name of threads spawned by the `Runtime`'s thread pool. |
518 | | /// |
519 | | /// The default name is "tokio-runtime-worker". |
520 | | /// |
521 | | /// # Examples |
522 | | /// |
523 | | /// ``` |
524 | | /// # #[cfg(not(target_family = "wasm"))] |
525 | | /// # { |
526 | | /// # use tokio::runtime; |
527 | | /// |
528 | | /// # pub fn main() { |
529 | | /// let rt = runtime::Builder::new_multi_thread() |
530 | | /// .thread_name("my-pool") |
531 | | /// .build(); |
532 | | /// # } |
533 | | /// # } |
534 | | /// ``` |
535 | 0 | pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self { |
536 | 0 | let val = val.into(); |
537 | 0 | self.thread_name = std::sync::Arc::new(move || val.clone()); |
538 | 0 | self |
539 | 0 | } |
540 | | |
541 | | /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool. |
542 | | /// |
543 | | /// The default name fn is `|| "tokio-runtime-worker".into()`. |
544 | | /// |
545 | | /// # Examples |
546 | | /// |
547 | | /// ``` |
548 | | /// # #[cfg(not(target_family = "wasm"))] |
549 | | /// # { |
550 | | /// # use tokio::runtime; |
551 | | /// # use std::sync::atomic::{AtomicUsize, Ordering}; |
552 | | /// # pub fn main() { |
553 | | /// let rt = runtime::Builder::new_multi_thread() |
554 | | /// .thread_name_fn(|| { |
555 | | /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); |
556 | | /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); |
557 | | /// format!("my-pool-{}", id) |
558 | | /// }) |
559 | | /// .build(); |
560 | | /// # } |
561 | | /// # } |
562 | | /// ``` |
563 | 0 | pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self |
564 | 0 | where |
565 | 0 | F: Fn() -> String + Send + Sync + 'static, |
566 | | { |
567 | 0 | self.thread_name = std::sync::Arc::new(f); |
568 | 0 | self |
569 | 0 | } |
570 | | |
571 | | /// Sets the stack size (in bytes) for worker threads. |
572 | | /// |
573 | | /// The actual stack size may be greater than this value if the platform |
574 | | /// specifies minimal stack size. |
575 | | /// |
576 | | /// The default stack size for spawned threads is 2 MiB, though this |
577 | | /// particular stack size is subject to change in the future. |
578 | | /// |
579 | | /// # Examples |
580 | | /// |
581 | | /// ``` |
582 | | /// # #[cfg(not(target_family = "wasm"))] |
583 | | /// # { |
584 | | /// # use tokio::runtime; |
585 | | /// |
586 | | /// # pub fn main() { |
587 | | /// let rt = runtime::Builder::new_multi_thread() |
588 | | /// .thread_stack_size(32 * 1024) |
589 | | /// .build(); |
590 | | /// # } |
591 | | /// # } |
592 | | /// ``` |
593 | 0 | pub fn thread_stack_size(&mut self, val: usize) -> &mut Self { |
594 | 0 | self.thread_stack_size = Some(val); |
595 | 0 | self |
596 | 0 | } |
597 | | |
598 | | /// Executes function `f` after each thread is started but before it starts |
599 | | /// doing work. |
600 | | /// |
601 | | /// This is intended for bookkeeping and monitoring use cases. |
602 | | /// |
603 | | /// # Examples |
604 | | /// |
605 | | /// ``` |
606 | | /// # #[cfg(not(target_family = "wasm"))] |
607 | | /// # { |
608 | | /// # use tokio::runtime; |
609 | | /// # pub fn main() { |
610 | | /// let runtime = runtime::Builder::new_multi_thread() |
611 | | /// .on_thread_start(|| { |
612 | | /// println!("thread started"); |
613 | | /// }) |
614 | | /// .build(); |
615 | | /// # } |
616 | | /// # } |
617 | | /// ``` |
618 | | #[cfg(not(loom))] |
619 | 0 | pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self |
620 | 0 | where |
621 | 0 | F: Fn() + Send + Sync + 'static, |
622 | | { |
623 | 0 | self.after_start = Some(std::sync::Arc::new(f)); |
624 | 0 | self |
625 | 0 | } |
626 | | |
627 | | /// Executes function `f` before each thread stops. |
628 | | /// |
629 | | /// This is intended for bookkeeping and monitoring use cases. |
630 | | /// |
631 | | /// # Examples |
632 | | /// |
633 | | /// ``` |
634 | | /// # #[cfg(not(target_family = "wasm"))] |
635 | | /// { |
636 | | /// # use tokio::runtime; |
637 | | /// # pub fn main() { |
638 | | /// let runtime = runtime::Builder::new_multi_thread() |
639 | | /// .on_thread_stop(|| { |
640 | | /// println!("thread stopping"); |
641 | | /// }) |
642 | | /// .build(); |
643 | | /// # } |
644 | | /// # } |
645 | | /// ``` |
646 | | #[cfg(not(loom))] |
647 | 0 | pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self |
648 | 0 | where |
649 | 0 | F: Fn() + Send + Sync + 'static, |
650 | | { |
651 | 0 | self.before_stop = Some(std::sync::Arc::new(f)); |
652 | 0 | self |
653 | 0 | } |
654 | | |
655 | | /// Executes function `f` just before a thread is parked (goes idle). |
656 | | /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn) |
657 | | /// can be called, and may result in this thread being unparked immediately. |
658 | | /// |
659 | | /// This can be used to start work only when the executor is idle, or for bookkeeping |
660 | | /// and monitoring purposes. |
661 | | /// |
662 | | /// Note: There can only be one park callback for a runtime; calling this function |
663 | | /// more than once replaces the last callback defined, rather than adding to it. |
664 | | /// |
665 | | /// # Examples |
666 | | /// |
667 | | /// ## Multithreaded executor |
668 | | /// ``` |
669 | | /// # #[cfg(not(target_family = "wasm"))] |
670 | | /// # { |
671 | | /// # use std::sync::Arc; |
672 | | /// # use std::sync::atomic::{AtomicBool, Ordering}; |
673 | | /// # use tokio::runtime; |
674 | | /// # use tokio::sync::Barrier; |
675 | | /// # pub fn main() { |
676 | | /// let once = AtomicBool::new(true); |
677 | | /// let barrier = Arc::new(Barrier::new(2)); |
678 | | /// |
679 | | /// let runtime = runtime::Builder::new_multi_thread() |
680 | | /// .worker_threads(1) |
681 | | /// .on_thread_park({ |
682 | | /// let barrier = barrier.clone(); |
683 | | /// move || { |
684 | | /// let barrier = barrier.clone(); |
685 | | /// if once.swap(false, Ordering::Relaxed) { |
686 | | /// tokio::spawn(async move { barrier.wait().await; }); |
687 | | /// } |
688 | | /// } |
689 | | /// }) |
690 | | /// .build() |
691 | | /// .unwrap(); |
692 | | /// |
693 | | /// runtime.block_on(async { |
694 | | /// barrier.wait().await; |
695 | | /// }) |
696 | | /// # } |
697 | | /// # } |
698 | | /// ``` |
699 | | /// ## Current thread executor |
700 | | /// ``` |
701 | | /// # use std::sync::Arc; |
702 | | /// # use std::sync::atomic::{AtomicBool, Ordering}; |
703 | | /// # use tokio::runtime; |
704 | | /// # use tokio::sync::Barrier; |
705 | | /// # pub fn main() { |
706 | | /// let once = AtomicBool::new(true); |
707 | | /// let barrier = Arc::new(Barrier::new(2)); |
708 | | /// |
709 | | /// let runtime = runtime::Builder::new_current_thread() |
710 | | /// .on_thread_park({ |
711 | | /// let barrier = barrier.clone(); |
712 | | /// move || { |
713 | | /// let barrier = barrier.clone(); |
714 | | /// if once.swap(false, Ordering::Relaxed) { |
715 | | /// tokio::spawn(async move { barrier.wait().await; }); |
716 | | /// } |
717 | | /// } |
718 | | /// }) |
719 | | /// .build() |
720 | | /// .unwrap(); |
721 | | /// |
722 | | /// runtime.block_on(async { |
723 | | /// barrier.wait().await; |
724 | | /// }) |
725 | | /// # } |
726 | | /// ``` |
727 | | #[cfg(not(loom))] |
728 | 0 | pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self |
729 | 0 | where |
730 | 0 | F: Fn() + Send + Sync + 'static, |
731 | | { |
732 | 0 | self.before_park = Some(std::sync::Arc::new(f)); |
733 | 0 | self |
734 | 0 | } |
735 | | |
736 | | /// Executes function `f` just after a thread unparks (starts executing tasks). |
737 | | /// |
738 | | /// This is intended for bookkeeping and monitoring use cases; note that work |
739 | | /// in this callback will increase latencies when the application has allowed one or |
740 | | /// more runtime threads to go idle. |
741 | | /// |
742 | | /// Note: There can only be one unpark callback for a runtime; calling this function |
743 | | /// more than once replaces the last callback defined, rather than adding to it. |
744 | | /// |
745 | | /// # Examples |
746 | | /// |
747 | | /// ``` |
748 | | /// # #[cfg(not(target_family = "wasm"))] |
749 | | /// # { |
750 | | /// # use tokio::runtime; |
751 | | /// # pub fn main() { |
752 | | /// let runtime = runtime::Builder::new_multi_thread() |
753 | | /// .on_thread_unpark(|| { |
754 | | /// println!("thread unparking"); |
755 | | /// }) |
756 | | /// .build(); |
757 | | /// |
758 | | /// runtime.unwrap().block_on(async { |
759 | | /// tokio::task::yield_now().await; |
760 | | /// println!("Hello from Tokio!"); |
761 | | /// }) |
762 | | /// # } |
763 | | /// # } |
764 | | /// ``` |
765 | | #[cfg(not(loom))] |
766 | 0 | pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self |
767 | 0 | where |
768 | 0 | F: Fn() + Send + Sync + 'static, |
769 | | { |
770 | 0 | self.after_unpark = Some(std::sync::Arc::new(f)); |
771 | 0 | self |
772 | 0 | } |
773 | | |
774 | | /// Executes function `f` just before a task is spawned. |
775 | | /// |
776 | | /// `f` is called within the Tokio context, so functions like |
777 | | /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being |
778 | | /// invoked immediately. |
779 | | /// |
780 | | /// This can be used for bookkeeping or monitoring purposes. |
781 | | /// |
782 | | /// Note: There can only be one spawn callback for a runtime; calling this function more |
783 | | /// than once replaces the last callback defined, rather than adding to it. |
784 | | /// |
785 | | /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time. |
786 | | /// |
787 | | /// **Note**: This is an [unstable API][unstable]. The public API of this type |
788 | | /// may break in 1.x releases. See [the documentation on unstable |
789 | | /// features][unstable] for details. |
790 | | /// |
791 | | /// [unstable]: crate#unstable-features |
792 | | /// |
793 | | /// # Examples |
794 | | /// |
795 | | /// ``` |
796 | | /// # use tokio::runtime; |
797 | | /// # pub fn main() { |
798 | | /// let runtime = runtime::Builder::new_current_thread() |
799 | | /// .on_task_spawn(|_| { |
800 | | /// println!("spawning task"); |
801 | | /// }) |
802 | | /// .build() |
803 | | /// .unwrap(); |
804 | | /// |
805 | | /// runtime.block_on(async { |
806 | | /// tokio::task::spawn(std::future::ready(())); |
807 | | /// |
808 | | /// for _ in 0..64 { |
809 | | /// tokio::task::yield_now().await; |
810 | | /// } |
811 | | /// }) |
812 | | /// # } |
813 | | /// ``` |
814 | | #[cfg(all(not(loom), tokio_unstable))] |
815 | | #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] |
816 | | pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self |
817 | | where |
818 | | F: Fn(&TaskMeta<'_>) + Send + Sync + 'static, |
819 | | { |
820 | | self.before_spawn = Some(std::sync::Arc::new(f)); |
821 | | self |
822 | | } |
823 | | |
824 | | /// Executes function `f` just before a task is polled |
825 | | /// |
826 | | /// `f` is called within the Tokio context, so functions like |
827 | | /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being |
828 | | /// invoked immediately. |
829 | | /// |
830 | | /// **Note**: This is an [unstable API][unstable]. The public API of this type |
831 | | /// may break in 1.x releases. See [the documentation on unstable |
832 | | /// features][unstable] for details. |
833 | | /// |
834 | | /// [unstable]: crate#unstable-features |
835 | | /// |
836 | | /// # Examples |
837 | | /// |
838 | | /// ``` |
839 | | /// # #[cfg(not(target_family = "wasm"))] |
840 | | /// # { |
841 | | /// # use std::sync::{atomic::AtomicUsize, Arc}; |
842 | | /// # use tokio::task::yield_now; |
843 | | /// # pub fn main() { |
844 | | /// let poll_start_counter = Arc::new(AtomicUsize::new(0)); |
845 | | /// let poll_start = poll_start_counter.clone(); |
846 | | /// let rt = tokio::runtime::Builder::new_multi_thread() |
847 | | /// .enable_all() |
848 | | /// .on_before_task_poll(move |meta| { |
849 | | /// println!("task {} is about to be polled", meta.id()) |
850 | | /// }) |
851 | | /// .build() |
852 | | /// .unwrap(); |
853 | | /// let task = rt.spawn(async { |
854 | | /// yield_now().await; |
855 | | /// }); |
856 | | /// let _ = rt.block_on(task); |
857 | | /// |
858 | | /// # } |
859 | | /// # } |
860 | | /// ``` |
861 | | #[cfg(tokio_unstable)] |
862 | | #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] |
863 | | pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self |
864 | | where |
865 | | F: Fn(&TaskMeta<'_>) + Send + Sync + 'static, |
866 | | { |
867 | | self.before_poll = Some(std::sync::Arc::new(f)); |
868 | | self |
869 | | } |
870 | | |
871 | | /// Executes function `f` just after a task is polled |
872 | | /// |
873 | | /// `f` is called within the Tokio context, so functions like |
874 | | /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being |
875 | | /// invoked immediately. |
876 | | /// |
877 | | /// **Note**: This is an [unstable API][unstable]. The public API of this type |
878 | | /// may break in 1.x releases. See [the documentation on unstable |
879 | | /// features][unstable] for details. |
880 | | /// |
881 | | /// [unstable]: crate#unstable-features |
882 | | /// |
883 | | /// # Examples |
884 | | /// |
885 | | /// ``` |
886 | | /// # #[cfg(not(target_family = "wasm"))] |
887 | | /// # { |
888 | | /// # use std::sync::{atomic::AtomicUsize, Arc}; |
889 | | /// # use tokio::task::yield_now; |
890 | | /// # pub fn main() { |
891 | | /// let poll_stop_counter = Arc::new(AtomicUsize::new(0)); |
892 | | /// let poll_stop = poll_stop_counter.clone(); |
893 | | /// let rt = tokio::runtime::Builder::new_multi_thread() |
894 | | /// .enable_all() |
895 | | /// .on_after_task_poll(move |meta| { |
896 | | /// println!("task {} completed polling", meta.id()); |
897 | | /// }) |
898 | | /// .build() |
899 | | /// .unwrap(); |
900 | | /// let task = rt.spawn(async { |
901 | | /// yield_now().await; |
902 | | /// }); |
903 | | /// let _ = rt.block_on(task); |
904 | | /// |
905 | | /// # } |
906 | | /// # } |
907 | | /// ``` |
908 | | #[cfg(tokio_unstable)] |
909 | | #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] |
910 | | pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self |
911 | | where |
912 | | F: Fn(&TaskMeta<'_>) + Send + Sync + 'static, |
913 | | { |
914 | | self.after_poll = Some(std::sync::Arc::new(f)); |
915 | | self |
916 | | } |
917 | | |
918 | | /// Executes function `f` just after a task is terminated. |
919 | | /// |
920 | | /// `f` is called within the Tokio context, so functions like |
921 | | /// [`tokio::spawn`](crate::spawn) can be called. |
922 | | /// |
923 | | /// This can be used for bookkeeping or monitoring purposes. |
924 | | /// |
925 | | /// Note: There can only be one task termination callback for a runtime; calling this |
926 | | /// function more than once replaces the last callback defined, rather than adding to it. |
927 | | /// |
928 | | /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time. |
929 | | /// |
930 | | /// **Note**: This is an [unstable API][unstable]. The public API of this type |
931 | | /// may break in 1.x releases. See [the documentation on unstable |
932 | | /// features][unstable] for details. |
933 | | /// |
934 | | /// [unstable]: crate#unstable-features |
935 | | /// |
936 | | /// # Examples |
937 | | /// |
938 | | /// ``` |
939 | | /// # use tokio::runtime; |
940 | | /// # pub fn main() { |
941 | | /// let runtime = runtime::Builder::new_current_thread() |
942 | | /// .on_task_terminate(|_| { |
943 | | /// println!("killing task"); |
944 | | /// }) |
945 | | /// .build() |
946 | | /// .unwrap(); |
947 | | /// |
948 | | /// runtime.block_on(async { |
949 | | /// tokio::task::spawn(std::future::ready(())); |
950 | | /// |
951 | | /// for _ in 0..64 { |
952 | | /// tokio::task::yield_now().await; |
953 | | /// } |
954 | | /// }) |
955 | | /// # } |
956 | | /// ``` |
957 | | #[cfg(all(not(loom), tokio_unstable))] |
958 | | #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] |
959 | | pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self |
960 | | where |
961 | | F: Fn(&TaskMeta<'_>) + Send + Sync + 'static, |
962 | | { |
963 | | self.after_termination = Some(std::sync::Arc::new(f)); |
964 | | self |
965 | | } |
966 | | |
967 | | /// Creates the configured `Runtime`. |
968 | | /// |
969 | | /// The returned `Runtime` instance is ready to spawn tasks. |
970 | | /// |
971 | | /// # Examples |
972 | | /// |
973 | | /// ``` |
974 | | /// # #[cfg(not(target_family = "wasm"))] |
975 | | /// # { |
976 | | /// use tokio::runtime::Builder; |
977 | | /// |
978 | | /// let rt = Builder::new_multi_thread().build().unwrap(); |
979 | | /// |
980 | | /// rt.block_on(async { |
981 | | /// println!("Hello from the Tokio runtime"); |
982 | | /// }); |
983 | | /// # } |
984 | | /// ``` |
985 | 0 | pub fn build(&mut self) -> io::Result<Runtime> { |
986 | 0 | match &self.kind { |
987 | 0 | Kind::CurrentThread => self.build_current_thread_runtime(), |
988 | | #[cfg(feature = "rt-multi-thread")] |
989 | | Kind::MultiThread => self.build_threaded_runtime(), |
990 | | } |
991 | 0 | } |
992 | | |
993 | | /// Creates the configured [`LocalRuntime`]. |
994 | | /// |
995 | | /// The returned [`LocalRuntime`] instance is ready to spawn tasks. |
996 | | /// |
997 | | /// # Panics |
998 | | /// |
999 | | /// This will panic if the runtime is configured with [`new_multi_thread()`]. |
1000 | | /// |
1001 | | /// [`new_multi_thread()`]: Builder::new_multi_thread |
1002 | | /// |
1003 | | /// # Examples |
1004 | | /// |
1005 | | /// ``` |
1006 | | /// use tokio::runtime::{Builder, LocalOptions}; |
1007 | | /// |
1008 | | /// let rt = Builder::new_current_thread() |
1009 | | /// .build_local(LocalOptions::default()) |
1010 | | /// .unwrap(); |
1011 | | /// |
1012 | | /// rt.spawn_local(async { |
1013 | | /// println!("Hello from the Tokio runtime"); |
1014 | | /// }); |
1015 | | /// ``` |
1016 | | #[allow(unused_variables, unreachable_patterns)] |
1017 | | #[cfg(tokio_unstable)] |
1018 | | #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] |
1019 | | pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> { |
1020 | | match &self.kind { |
1021 | | Kind::CurrentThread => self.build_current_thread_local_runtime(), |
1022 | | #[cfg(feature = "rt-multi-thread")] |
1023 | | Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"), |
1024 | | } |
1025 | | } |
1026 | | |
1027 | 0 | fn get_cfg(&self) -> driver::Cfg { |
1028 | 0 | driver::Cfg { |
1029 | 0 | enable_pause_time: match self.kind { |
1030 | 0 | Kind::CurrentThread => true, |
1031 | 0 | #[cfg(feature = "rt-multi-thread")] |
1032 | 0 | Kind::MultiThread => false, |
1033 | 0 | }, |
1034 | 0 | enable_io: self.enable_io, |
1035 | 0 | enable_time: self.enable_time, |
1036 | 0 | start_paused: self.start_paused, |
1037 | 0 | nevents: self.nevents, |
1038 | 0 | timer_flavor: self.timer_flavor, |
1039 | 0 | } |
1040 | 0 | } |
1041 | | |
1042 | | /// Sets a custom timeout for a thread in the blocking pool. |
1043 | | /// |
1044 | | /// By default, the timeout for a thread is set to 10 seconds. This can |
1045 | | /// be overridden using `.thread_keep_alive()`. |
1046 | | /// |
1047 | | /// # Example |
1048 | | /// |
1049 | | /// ``` |
1050 | | /// # #[cfg(not(target_family = "wasm"))] |
1051 | | /// # { |
1052 | | /// # use tokio::runtime; |
1053 | | /// # use std::time::Duration; |
1054 | | /// # pub fn main() { |
1055 | | /// let rt = runtime::Builder::new_multi_thread() |
1056 | | /// .thread_keep_alive(Duration::from_millis(100)) |
1057 | | /// .build(); |
1058 | | /// # } |
1059 | | /// # } |
1060 | | /// ``` |
1061 | 0 | pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self { |
1062 | 0 | self.keep_alive = Some(duration); |
1063 | 0 | self |
1064 | 0 | } |
1065 | | |
1066 | | /// Sets the number of scheduler ticks after which the scheduler will poll the global |
1067 | | /// task queue. |
1068 | | /// |
1069 | | /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task. |
1070 | | /// |
1071 | | /// By default the global queue interval is 31 for the current-thread scheduler. Please see |
1072 | | /// [the module documentation] for the default behavior of the multi-thread scheduler. |
1073 | | /// |
1074 | | /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming |
1075 | | /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler, |
1076 | | /// at the cost of more synchronization overhead. That can be beneficial for prioritizing |
1077 | | /// getting started on new work, especially if tasks frequently yield rather than complete |
1078 | | /// or await on further I/O. Setting the interval to `1` will prioritize the global queue and |
1079 | | /// tasks from the local queue will be executed only if the global queue is empty. |
1080 | | /// Conversely, a higher value prioritizes existing work, and is a good choice when most |
1081 | | /// tasks quickly complete polling. |
1082 | | /// |
1083 | | /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing |
1084 | | /// |
1085 | | /// # Panics |
1086 | | /// |
1087 | | /// This function will panic if 0 is passed as an argument. |
1088 | | /// |
1089 | | /// # Examples |
1090 | | /// |
1091 | | /// ``` |
1092 | | /// # #[cfg(not(target_family = "wasm"))] |
1093 | | /// # { |
1094 | | /// # use tokio::runtime; |
1095 | | /// # pub fn main() { |
1096 | | /// let rt = runtime::Builder::new_multi_thread() |
1097 | | /// .global_queue_interval(31) |
1098 | | /// .build(); |
1099 | | /// # } |
1100 | | /// # } |
1101 | | /// ``` |
1102 | | #[track_caller] |
1103 | 0 | pub fn global_queue_interval(&mut self, val: u32) -> &mut Self { |
1104 | 0 | assert!(val > 0, "global_queue_interval must be greater than 0"); |
1105 | 0 | self.global_queue_interval = Some(val); |
1106 | 0 | self |
1107 | 0 | } |
1108 | | |
1109 | | /// Sets the number of scheduler ticks after which the scheduler will poll for |
1110 | | /// external events (timers, I/O, and so on). |
1111 | | /// |
1112 | | /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task. |
1113 | | /// |
1114 | | /// By default, the event interval is `61` for all scheduler types. |
1115 | | /// |
1116 | | /// Setting the event interval determines the effective "priority" of delivering |
1117 | | /// these external events (which may wake up additional tasks), compared to |
1118 | | /// executing tasks that are currently ready to run. A smaller value is useful |
1119 | | /// when tasks frequently spend a long time in polling, or frequently yield, |
1120 | | /// which can result in overly long delays picking up I/O events. Conversely, |
1121 | | /// picking up new events requires extra synchronization and syscall overhead, |
1122 | | /// so if tasks generally complete their polling quickly, a higher event interval |
1123 | | /// will minimize that overhead while still keeping the scheduler responsive to |
1124 | | /// events. |
1125 | | /// |
1126 | | /// # Examples |
1127 | | /// |
1128 | | /// ``` |
1129 | | /// # #[cfg(not(target_family = "wasm"))] |
1130 | | /// # { |
1131 | | /// # use tokio::runtime; |
1132 | | /// # pub fn main() { |
1133 | | /// let rt = runtime::Builder::new_multi_thread() |
1134 | | /// .event_interval(31) |
1135 | | /// .build(); |
1136 | | /// # } |
1137 | | /// # } |
1138 | | /// ``` |
1139 | 0 | pub fn event_interval(&mut self, val: u32) -> &mut Self { |
1140 | 0 | self.event_interval = val; |
1141 | 0 | self |
1142 | 0 | } |
1143 | | |
1144 | | cfg_unstable! { |
1145 | | /// Configure how the runtime responds to an unhandled panic on a |
1146 | | /// spawned task. |
1147 | | /// |
1148 | | /// By default, an unhandled panic (i.e. a panic not caught by |
1149 | | /// [`std::panic::catch_unwind`]) has no impact on the runtime's |
1150 | | /// execution. The panic's error value is forwarded to the task's |
1151 | | /// [`JoinHandle`] and all other spawned tasks continue running. |
1152 | | /// |
1153 | | /// The `unhandled_panic` option enables configuring this behavior. |
1154 | | /// |
1155 | | /// * `UnhandledPanic::Ignore` is the default behavior. Panics on |
1156 | | /// spawned tasks have no impact on the runtime's execution. |
1157 | | /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to |
1158 | | /// shutdown immediately when a spawned task panics even if that |
1159 | | /// task's `JoinHandle` has not been dropped. All other spawned tasks |
1160 | | /// will immediately terminate and further calls to |
1161 | | /// [`Runtime::block_on`] will panic. |
1162 | | /// |
1163 | | /// # Panics |
1164 | | /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`] |
1165 | | /// on a runtime other than the current thread runtime. |
1166 | | /// |
1167 | | /// # Unstable |
1168 | | /// |
1169 | | /// This option is currently unstable and its implementation is |
1170 | | /// incomplete. The API may change or be removed in the future. See |
1171 | | /// issue [tokio-rs/tokio#4516] for more details. |
1172 | | /// |
1173 | | /// # Examples |
1174 | | /// |
1175 | | /// The following demonstrates a runtime configured to shutdown on |
1176 | | /// panic. The first spawned task panics and results in the runtime |
1177 | | /// shutting down. The second spawned task never has a chance to |
1178 | | /// execute. The call to `block_on` will panic due to the runtime being |
1179 | | /// forcibly shutdown. |
1180 | | /// |
1181 | | /// ```should_panic |
1182 | | /// use tokio::runtime::{self, UnhandledPanic}; |
1183 | | /// |
1184 | | /// # pub fn main() { |
1185 | | /// let rt = runtime::Builder::new_current_thread() |
1186 | | /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) |
1187 | | /// .build() |
1188 | | /// .unwrap(); |
1189 | | /// |
1190 | | /// rt.spawn(async { panic!("boom"); }); |
1191 | | /// rt.spawn(async { |
1192 | | /// // This task never completes. |
1193 | | /// }); |
1194 | | /// |
1195 | | /// rt.block_on(async { |
1196 | | /// // Do some work |
1197 | | /// # loop { tokio::task::yield_now().await; } |
1198 | | /// }) |
1199 | | /// # } |
1200 | | /// ``` |
1201 | | /// |
1202 | | /// [`JoinHandle`]: struct@crate::task::JoinHandle |
1203 | | /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516 |
1204 | | pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self { |
1205 | | if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) { |
1206 | | panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime"); |
1207 | | } |
1208 | | |
1209 | | self.unhandled_panic = behavior; |
1210 | | self |
1211 | | } |
1212 | | |
1213 | | /// Disables the LIFO task scheduler heuristic. |
1214 | | /// |
1215 | | /// The multi-threaded scheduler includes a heuristic for optimizing |
1216 | | /// message-passing patterns. This heuristic results in the **last** |
1217 | | /// scheduled task being polled first. |
1218 | | /// |
1219 | | /// To implement this heuristic, each worker thread has a slot which |
1220 | | /// holds the task that should be polled next. However, this slot cannot |
1221 | | /// be stolen by other worker threads, which can result in lower total |
1222 | | /// throughput when tasks tend to have longer poll times. |
1223 | | /// |
1224 | | /// This configuration option will disable this heuristic resulting in |
1225 | | /// all scheduled tasks being pushed into the worker-local queue, which |
1226 | | /// is stealable. |
1227 | | /// |
1228 | | /// Consider trying this option when the task "scheduled" time is high |
1229 | | /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to |
1230 | | /// collect this data. |
1231 | | /// |
1232 | | /// # Unstable |
1233 | | /// |
1234 | | /// This configuration option is considered a workaround for the LIFO |
1235 | | /// slot not being stealable. When the slot becomes stealable, we will |
1236 | | /// revisit whether or not this option is necessary. See |
1237 | | /// issue [tokio-rs/tokio#4941]. |
1238 | | /// |
1239 | | /// # Examples |
1240 | | /// |
1241 | | /// ``` |
1242 | | /// # #[cfg(not(target_family = "wasm"))] |
1243 | | /// # { |
1244 | | /// use tokio::runtime; |
1245 | | /// |
1246 | | /// let rt = runtime::Builder::new_multi_thread() |
1247 | | /// .disable_lifo_slot() |
1248 | | /// .build() |
1249 | | /// .unwrap(); |
1250 | | /// # } |
1251 | | /// ``` |
1252 | | /// |
1253 | | /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics |
1254 | | /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941 |
1255 | | pub fn disable_lifo_slot(&mut self) -> &mut Self { |
1256 | | self.disable_lifo_slot = true; |
1257 | | self |
1258 | | } |
1259 | | |
1260 | | /// Specifies the random number generation seed to use within all |
1261 | | /// threads associated with the runtime being built. |
1262 | | /// |
1263 | | /// This option is intended to make certain parts of the runtime |
1264 | | /// deterministic (e.g. the [`tokio::select!`] macro). In the case of |
1265 | | /// [`tokio::select!`] it will ensure that the order that branches are |
1266 | | /// polled is deterministic. |
1267 | | /// |
1268 | | /// In addition to the code specifying `rng_seed` and interacting with |
1269 | | /// the runtime, the internals of Tokio and the Rust compiler may affect |
1270 | | /// the sequences of random numbers. In order to ensure repeatable |
1271 | | /// results, the version of Tokio, the versions of all other |
1272 | | /// dependencies that interact with Tokio, and the Rust compiler version |
1273 | | /// should also all remain constant. |
1274 | | /// |
1275 | | /// # Examples |
1276 | | /// |
1277 | | /// ``` |
1278 | | /// # use tokio::runtime::{self, RngSeed}; |
1279 | | /// # pub fn main() { |
1280 | | /// let seed = RngSeed::from_bytes(b"place your seed here"); |
1281 | | /// let rt = runtime::Builder::new_current_thread() |
1282 | | /// .rng_seed(seed) |
1283 | | /// .build(); |
1284 | | /// # } |
1285 | | /// ``` |
1286 | | /// |
1287 | | /// [`tokio::select!`]: crate::select |
1288 | | pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self { |
1289 | | self.seed_generator = RngSeedGenerator::new(seed); |
1290 | | self |
1291 | | } |
1292 | | } |
1293 | | |
1294 | | cfg_unstable_metrics! { |
1295 | | /// Enables tracking the distribution of task poll times. |
1296 | | /// |
1297 | | /// Task poll times are not instrumented by default as doing so requires |
1298 | | /// calling [`Instant::now()`] twice per task poll, which could add |
1299 | | /// measurable overhead. Use the [`Handle::metrics()`] to access the |
1300 | | /// metrics data. |
1301 | | /// |
1302 | | /// The histogram uses fixed bucket sizes. In other words, the histogram |
1303 | | /// buckets are not dynamic based on input values. Use the |
1304 | | /// `metrics_poll_time_histogram` builder methods to configure the |
1305 | | /// histogram details. |
1306 | | /// |
1307 | | /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. |
1308 | | /// This has an extremely low memory footprint, but may not provide enough granularity. For |
1309 | | /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`] |
1310 | | /// to select [`LogHistogram`] instead. |
1311 | | /// |
1312 | | /// # Examples |
1313 | | /// |
1314 | | /// ``` |
1315 | | /// # #[cfg(not(target_family = "wasm"))] |
1316 | | /// # { |
1317 | | /// use tokio::runtime; |
1318 | | /// |
1319 | | /// let rt = runtime::Builder::new_multi_thread() |
1320 | | /// .enable_metrics_poll_time_histogram() |
1321 | | /// .build() |
1322 | | /// .unwrap(); |
1323 | | /// # // Test default values here |
1324 | | /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) } |
1325 | | /// # let m = rt.handle().metrics(); |
1326 | | /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10); |
1327 | | /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100)); |
1328 | | /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200)); |
1329 | | /// # } |
1330 | | /// ``` |
1331 | | /// |
1332 | | /// [`Handle::metrics()`]: crate::runtime::Handle::metrics |
1333 | | /// [`Instant::now()`]: std::time::Instant::now |
1334 | | /// [`LogHistogram`]: crate::runtime::LogHistogram |
1335 | | /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration |
1336 | | pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self { |
1337 | | self.metrics_poll_count_histogram_enable = true; |
1338 | | self |
1339 | | } |
1340 | | |
1341 | | /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead. |
1342 | | /// |
1343 | | /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram |
1344 | | #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")] |
1345 | | #[doc(hidden)] |
1346 | | pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self { |
1347 | | self.enable_metrics_poll_time_histogram() |
1348 | | } |
1349 | | |
1350 | | /// Sets the histogram scale for tracking the distribution of task poll |
1351 | | /// times. |
1352 | | /// |
1353 | | /// Tracking the distribution of task poll times can be done using a |
1354 | | /// linear or log scale. When using linear scale, each histogram bucket |
1355 | | /// will represent the same range of poll times. When using log scale, |
1356 | | /// each histogram bucket will cover a range twice as big as the |
1357 | | /// previous bucket. |
1358 | | /// |
1359 | | /// **Default:** linear scale. |
1360 | | /// |
1361 | | /// # Examples |
1362 | | /// |
1363 | | /// ``` |
1364 | | /// # #[cfg(not(target_family = "wasm"))] |
1365 | | /// # { |
1366 | | /// use tokio::runtime::{self, HistogramScale}; |
1367 | | /// |
1368 | | /// # #[allow(deprecated)] |
1369 | | /// let rt = runtime::Builder::new_multi_thread() |
1370 | | /// .enable_metrics_poll_time_histogram() |
1371 | | /// .metrics_poll_count_histogram_scale(HistogramScale::Log) |
1372 | | /// .build() |
1373 | | /// .unwrap(); |
1374 | | /// # } |
1375 | | /// ``` |
1376 | | #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")] |
1377 | | pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self { |
1378 | | self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale); |
1379 | | self |
1380 | | } |
1381 | | |
1382 | | /// Configure the histogram for tracking poll times |
1383 | | /// |
1384 | | /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. |
1385 | | /// This has an extremely low memory footprint, but may not provide enough granularity. For |
1386 | | /// better granularity with low memory usage, use [`LogHistogram`] instead. |
1387 | | /// |
1388 | | /// # Examples |
1389 | | /// Configure a [`LogHistogram`] with [default configuration]: |
1390 | | /// ``` |
1391 | | /// # #[cfg(not(target_family = "wasm"))] |
1392 | | /// # { |
1393 | | /// use tokio::runtime; |
1394 | | /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; |
1395 | | /// |
1396 | | /// let rt = runtime::Builder::new_multi_thread() |
1397 | | /// .enable_metrics_poll_time_histogram() |
1398 | | /// .metrics_poll_time_histogram_configuration( |
1399 | | /// HistogramConfiguration::log(LogHistogram::default()) |
1400 | | /// ) |
1401 | | /// .build() |
1402 | | /// .unwrap(); |
1403 | | /// # } |
1404 | | /// ``` |
1405 | | /// |
1406 | | /// Configure a linear histogram with 100 buckets, each 10μs wide |
1407 | | /// ``` |
1408 | | /// # #[cfg(not(target_family = "wasm"))] |
1409 | | /// # { |
1410 | | /// use tokio::runtime; |
1411 | | /// use std::time::Duration; |
1412 | | /// use tokio::runtime::HistogramConfiguration; |
1413 | | /// |
1414 | | /// let rt = runtime::Builder::new_multi_thread() |
1415 | | /// .enable_metrics_poll_time_histogram() |
1416 | | /// .metrics_poll_time_histogram_configuration( |
1417 | | /// HistogramConfiguration::linear(Duration::from_micros(10), 100) |
1418 | | /// ) |
1419 | | /// .build() |
1420 | | /// .unwrap(); |
1421 | | /// # } |
1422 | | /// ``` |
1423 | | /// |
1424 | | /// Configure a [`LogHistogram`] with the following settings: |
1425 | | /// - Measure times from 100ns to 120s |
1426 | | /// - Max error of 0.1 |
1427 | | /// - No more than 1024 buckets |
1428 | | /// ``` |
1429 | | /// # #[cfg(not(target_family = "wasm"))] |
1430 | | /// # { |
1431 | | /// use std::time::Duration; |
1432 | | /// use tokio::runtime; |
1433 | | /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; |
1434 | | /// |
1435 | | /// let rt = runtime::Builder::new_multi_thread() |
1436 | | /// .enable_metrics_poll_time_histogram() |
1437 | | /// .metrics_poll_time_histogram_configuration( |
1438 | | /// HistogramConfiguration::log(LogHistogram::builder() |
1439 | | /// .max_value(Duration::from_secs(120)) |
1440 | | /// .min_value(Duration::from_nanos(100)) |
1441 | | /// .max_error(0.1) |
1442 | | /// .max_buckets(1024) |
1443 | | /// .expect("configuration uses 488 buckets") |
1444 | | /// ) |
1445 | | /// ) |
1446 | | /// .build() |
1447 | | /// .unwrap(); |
1448 | | /// # } |
1449 | | /// ``` |
1450 | | /// |
1451 | | /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting |
1452 | | /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram |
1453 | | /// where each bucket is twice the size of the previous bucket. |
1454 | | /// ```rust |
1455 | | /// use std::time::Duration; |
1456 | | /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; |
1457 | | /// let rt = tokio::runtime::Builder::new_current_thread() |
1458 | | /// .enable_all() |
1459 | | /// .enable_metrics_poll_time_histogram() |
1460 | | /// .metrics_poll_time_histogram_configuration(HistogramConfiguration::log( |
1461 | | /// LogHistogram::builder() |
1462 | | /// .min_value(Duration::from_micros(20)) |
1463 | | /// .max_value(Duration::from_millis(4)) |
1464 | | /// // Set `precision_exact` to `0` to match `HistogramScale::Log` |
1465 | | /// .precision_exact(0) |
1466 | | /// .max_buckets(10) |
1467 | | /// .unwrap(), |
1468 | | /// )) |
1469 | | /// .build() |
1470 | | /// .unwrap(); |
1471 | | /// ``` |
1472 | | /// |
1473 | | /// [`LogHistogram`]: crate::runtime::LogHistogram |
1474 | | /// [default configuration]: crate::runtime::LogHistogramBuilder |
1475 | | /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log |
1476 | | pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self { |
1477 | | self.metrics_poll_count_histogram.histogram_type = configuration.inner; |
1478 | | self |
1479 | | } |
1480 | | |
1481 | | /// Sets the histogram resolution for tracking the distribution of task |
1482 | | /// poll times. |
1483 | | /// |
1484 | | /// The resolution is the histogram's first bucket's range. When using a |
1485 | | /// linear histogram scale, each bucket will cover the same range. When |
1486 | | /// using a log scale, each bucket will cover a range twice as big as |
1487 | | /// the previous bucket. In the log case, the resolution represents the |
1488 | | /// smallest bucket range. |
1489 | | /// |
1490 | | /// Note that, when using log scale, the resolution is rounded up to the |
1491 | | /// nearest power of 2 in nanoseconds. |
1492 | | /// |
1493 | | /// **Default:** 100 microseconds. |
1494 | | /// |
1495 | | /// # Examples |
1496 | | /// |
1497 | | /// ``` |
1498 | | /// # #[cfg(not(target_family = "wasm"))] |
1499 | | /// # { |
1500 | | /// use tokio::runtime; |
1501 | | /// use std::time::Duration; |
1502 | | /// |
1503 | | /// # #[allow(deprecated)] |
1504 | | /// let rt = runtime::Builder::new_multi_thread() |
1505 | | /// .enable_metrics_poll_time_histogram() |
1506 | | /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100)) |
1507 | | /// .build() |
1508 | | /// .unwrap(); |
1509 | | /// # } |
1510 | | /// ``` |
1511 | | #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")] |
1512 | | pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self { |
1513 | | assert!(resolution > Duration::from_secs(0)); |
1514 | | // Sanity check the argument and also make the cast below safe. |
1515 | | assert!(resolution <= Duration::from_secs(1)); |
1516 | | |
1517 | | let resolution = resolution.as_nanos() as u64; |
1518 | | |
1519 | | self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution); |
1520 | | self |
1521 | | } |
1522 | | |
1523 | | /// Sets the number of buckets for the histogram tracking the |
1524 | | /// distribution of task poll times. |
1525 | | /// |
1526 | | /// The last bucket tracks all greater values that fall out of other |
1527 | | /// ranges. So, configuring the histogram using a linear scale, |
1528 | | /// resolution of 50ms, and 10 buckets, the 10th bucket will track task |
1529 | | /// polls that take more than 450ms to complete. |
1530 | | /// |
1531 | | /// **Default:** 10 |
1532 | | /// |
1533 | | /// # Examples |
1534 | | /// |
1535 | | /// ``` |
1536 | | /// # #[cfg(not(target_family = "wasm"))] |
1537 | | /// # { |
1538 | | /// use tokio::runtime; |
1539 | | /// |
1540 | | /// # #[allow(deprecated)] |
1541 | | /// let rt = runtime::Builder::new_multi_thread() |
1542 | | /// .enable_metrics_poll_time_histogram() |
1543 | | /// .metrics_poll_count_histogram_buckets(15) |
1544 | | /// .build() |
1545 | | /// .unwrap(); |
1546 | | /// # } |
1547 | | /// ``` |
1548 | | #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")] |
1549 | | pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self { |
1550 | | self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets); |
1551 | | self |
1552 | | } |
1553 | | } |
1554 | | |
1555 | 0 | fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> { |
1556 | | use crate::runtime::runtime::Scheduler; |
1557 | | |
1558 | 0 | let (scheduler, handle, blocking_pool) = |
1559 | 0 | self.build_current_thread_runtime_components(None)?; |
1560 | | |
1561 | 0 | Ok(Runtime::from_parts( |
1562 | 0 | Scheduler::CurrentThread(scheduler), |
1563 | 0 | handle, |
1564 | 0 | blocking_pool, |
1565 | 0 | )) |
1566 | 0 | } |
1567 | | |
1568 | | #[cfg(tokio_unstable)] |
1569 | | fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> { |
1570 | | use crate::runtime::local_runtime::LocalRuntimeScheduler; |
1571 | | |
1572 | | let tid = std::thread::current().id(); |
1573 | | |
1574 | | let (scheduler, handle, blocking_pool) = |
1575 | | self.build_current_thread_runtime_components(Some(tid))?; |
1576 | | |
1577 | | Ok(LocalRuntime::from_parts( |
1578 | | LocalRuntimeScheduler::CurrentThread(scheduler), |
1579 | | handle, |
1580 | | blocking_pool, |
1581 | | )) |
1582 | | } |
1583 | | |
1584 | 0 | fn build_current_thread_runtime_components( |
1585 | 0 | &mut self, |
1586 | 0 | local_tid: Option<ThreadId>, |
1587 | 0 | ) -> io::Result<(CurrentThread, Handle, BlockingPool)> { |
1588 | | use crate::runtime::scheduler; |
1589 | | use crate::runtime::Config; |
1590 | | |
1591 | 0 | let mut cfg = self.get_cfg(); |
1592 | 0 | cfg.timer_flavor = TimerFlavor::Traditional; |
1593 | 0 | let (driver, driver_handle) = driver::Driver::new(cfg)?; |
1594 | | |
1595 | | // Blocking pool |
1596 | 0 | let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); |
1597 | 0 | let blocking_spawner = blocking_pool.spawner().clone(); |
1598 | | |
1599 | | // Generate a rng seed for this runtime. |
1600 | 0 | let seed_generator_1 = self.seed_generator.next_generator(); |
1601 | 0 | let seed_generator_2 = self.seed_generator.next_generator(); |
1602 | | |
1603 | | // And now put a single-threaded scheduler on top of the timer. When |
1604 | | // there are no futures ready to do something, it'll let the timer or |
1605 | | // the reactor to generate some new stimuli for the futures to continue |
1606 | | // in their life. |
1607 | 0 | let (scheduler, handle) = CurrentThread::new( |
1608 | 0 | driver, |
1609 | 0 | driver_handle, |
1610 | 0 | blocking_spawner, |
1611 | 0 | seed_generator_2, |
1612 | 0 | Config { |
1613 | 0 | before_park: self.before_park.clone(), |
1614 | 0 | after_unpark: self.after_unpark.clone(), |
1615 | 0 | before_spawn: self.before_spawn.clone(), |
1616 | 0 | #[cfg(tokio_unstable)] |
1617 | 0 | before_poll: self.before_poll.clone(), |
1618 | 0 | #[cfg(tokio_unstable)] |
1619 | 0 | after_poll: self.after_poll.clone(), |
1620 | 0 | after_termination: self.after_termination.clone(), |
1621 | 0 | global_queue_interval: self.global_queue_interval, |
1622 | 0 | event_interval: self.event_interval, |
1623 | 0 | #[cfg(tokio_unstable)] |
1624 | 0 | unhandled_panic: self.unhandled_panic.clone(), |
1625 | 0 | disable_lifo_slot: self.disable_lifo_slot, |
1626 | 0 | seed_generator: seed_generator_1, |
1627 | 0 | metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), |
1628 | 0 | }, |
1629 | 0 | local_tid, |
1630 | 0 | ); |
1631 | | |
1632 | 0 | let handle = Handle { |
1633 | 0 | inner: scheduler::Handle::CurrentThread(handle), |
1634 | 0 | }; |
1635 | | |
1636 | 0 | Ok((scheduler, handle, blocking_pool)) |
1637 | 0 | } |
1638 | | |
1639 | 0 | fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> { |
1640 | 0 | if self.metrics_poll_count_histogram_enable { |
1641 | 0 | Some(self.metrics_poll_count_histogram.clone()) |
1642 | | } else { |
1643 | 0 | None |
1644 | | } |
1645 | 0 | } |
1646 | | } |
1647 | | |
1648 | | cfg_io_driver! { |
1649 | | impl Builder { |
1650 | | /// Enables the I/O driver. |
1651 | | /// |
1652 | | /// Doing this enables using net, process, signal, and some I/O types on |
1653 | | /// the runtime. |
1654 | | /// |
1655 | | /// # Examples |
1656 | | /// |
1657 | | /// ``` |
1658 | | /// use tokio::runtime; |
1659 | | /// |
1660 | | /// let rt = runtime::Builder::new_multi_thread() |
1661 | | /// .enable_io() |
1662 | | /// .build() |
1663 | | /// .unwrap(); |
1664 | | /// ``` |
1665 | | pub fn enable_io(&mut self) -> &mut Self { |
1666 | | self.enable_io = true; |
1667 | | self |
1668 | | } |
1669 | | |
1670 | | /// Enables the I/O driver and configures the max number of events to be |
1671 | | /// processed per tick. |
1672 | | /// |
1673 | | /// # Examples |
1674 | | /// |
1675 | | /// ``` |
1676 | | /// use tokio::runtime; |
1677 | | /// |
1678 | | /// let rt = runtime::Builder::new_current_thread() |
1679 | | /// .enable_io() |
1680 | | /// .max_io_events_per_tick(1024) |
1681 | | /// .build() |
1682 | | /// .unwrap(); |
1683 | | /// ``` |
1684 | | pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self { |
1685 | | self.nevents = capacity; |
1686 | | self |
1687 | | } |
1688 | | } |
1689 | | } |
1690 | | |
1691 | | cfg_time! { |
1692 | | impl Builder { |
1693 | | /// Enables the time driver. |
1694 | | /// |
1695 | | /// Doing this enables using `tokio::time` on the runtime. |
1696 | | /// |
1697 | | /// # Examples |
1698 | | /// |
1699 | | /// ``` |
1700 | | /// # #[cfg(not(target_family = "wasm"))] |
1701 | | /// # { |
1702 | | /// use tokio::runtime; |
1703 | | /// |
1704 | | /// let rt = runtime::Builder::new_multi_thread() |
1705 | | /// .enable_time() |
1706 | | /// .build() |
1707 | | /// .unwrap(); |
1708 | | /// # } |
1709 | | /// ``` |
1710 | 0 | pub fn enable_time(&mut self) -> &mut Self { |
1711 | 0 | self.enable_time = true; |
1712 | 0 | self |
1713 | 0 | } |
1714 | | } |
1715 | | } |
1716 | | |
1717 | | cfg_io_uring! { |
1718 | | impl Builder { |
1719 | | /// Enables the tokio's io_uring driver. |
1720 | | /// |
1721 | | /// Doing this enables using io_uring operations on the runtime. |
1722 | | /// |
1723 | | /// # Examples |
1724 | | /// |
1725 | | /// ``` |
1726 | | /// use tokio::runtime; |
1727 | | /// |
1728 | | /// let rt = runtime::Builder::new_multi_thread() |
1729 | | /// .enable_io_uring() |
1730 | | /// .build() |
1731 | | /// .unwrap(); |
1732 | | /// ``` |
1733 | | #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))] |
1734 | | pub fn enable_io_uring(&mut self) -> &mut Self { |
1735 | | // Currently, the uring flag is equivalent to `enable_io`. |
1736 | | self.enable_io = true; |
1737 | | self |
1738 | | } |
1739 | | } |
1740 | | } |
1741 | | |
1742 | | cfg_test_util! { |
1743 | | impl Builder { |
1744 | | /// Controls if the runtime's clock starts paused or advancing. |
1745 | | /// |
1746 | | /// Pausing time requires the current-thread runtime; construction of |
1747 | | /// the runtime will panic otherwise. |
1748 | | /// |
1749 | | /// # Examples |
1750 | | /// |
1751 | | /// ``` |
1752 | | /// use tokio::runtime; |
1753 | | /// |
1754 | | /// let rt = runtime::Builder::new_current_thread() |
1755 | | /// .enable_time() |
1756 | | /// .start_paused(true) |
1757 | | /// .build() |
1758 | | /// .unwrap(); |
1759 | | /// ``` |
1760 | 0 | pub fn start_paused(&mut self, start_paused: bool) -> &mut Self { |
1761 | 0 | self.start_paused = start_paused; |
1762 | 0 | self |
1763 | 0 | } |
1764 | | } |
1765 | | } |
1766 | | |
1767 | | cfg_rt_multi_thread! { |
1768 | | impl Builder { |
1769 | | fn build_threaded_runtime(&mut self) -> io::Result<Runtime> { |
1770 | | use crate::loom::sys::num_cpus; |
1771 | | use crate::runtime::{Config, runtime::Scheduler}; |
1772 | | use crate::runtime::scheduler::{self, MultiThread}; |
1773 | | |
1774 | | let worker_threads = self.worker_threads.unwrap_or_else(num_cpus); |
1775 | | |
1776 | | let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?; |
1777 | | |
1778 | | // Create the blocking pool |
1779 | | let blocking_pool = |
1780 | | blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads); |
1781 | | let blocking_spawner = blocking_pool.spawner().clone(); |
1782 | | |
1783 | | // Generate a rng seed for this runtime. |
1784 | | let seed_generator_1 = self.seed_generator.next_generator(); |
1785 | | let seed_generator_2 = self.seed_generator.next_generator(); |
1786 | | |
1787 | | let (scheduler, handle, launch) = MultiThread::new( |
1788 | | worker_threads, |
1789 | | driver, |
1790 | | driver_handle, |
1791 | | blocking_spawner, |
1792 | | seed_generator_2, |
1793 | | Config { |
1794 | | before_park: self.before_park.clone(), |
1795 | | after_unpark: self.after_unpark.clone(), |
1796 | | before_spawn: self.before_spawn.clone(), |
1797 | | #[cfg(tokio_unstable)] |
1798 | | before_poll: self.before_poll.clone(), |
1799 | | #[cfg(tokio_unstable)] |
1800 | | after_poll: self.after_poll.clone(), |
1801 | | after_termination: self.after_termination.clone(), |
1802 | | global_queue_interval: self.global_queue_interval, |
1803 | | event_interval: self.event_interval, |
1804 | | #[cfg(tokio_unstable)] |
1805 | | unhandled_panic: self.unhandled_panic.clone(), |
1806 | | disable_lifo_slot: self.disable_lifo_slot, |
1807 | | seed_generator: seed_generator_1, |
1808 | | metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), |
1809 | | }, |
1810 | | self.timer_flavor, |
1811 | | ); |
1812 | | |
1813 | | let handle = Handle { inner: scheduler::Handle::MultiThread(handle) }; |
1814 | | |
1815 | | // Spawn the thread pool workers |
1816 | | let _enter = handle.enter(); |
1817 | | launch.launch(); |
1818 | | |
1819 | | Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool)) |
1820 | | } |
1821 | | } |
1822 | | } |
1823 | | |
1824 | | impl fmt::Debug for Builder { |
1825 | 0 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
1826 | 0 | fmt.debug_struct("Builder") |
1827 | 0 | .field("worker_threads", &self.worker_threads) |
1828 | 0 | .field("max_blocking_threads", &self.max_blocking_threads) |
1829 | 0 | .field( |
1830 | 0 | "thread_name", |
1831 | 0 | &"<dyn Fn() -> String + Send + Sync + 'static>", |
1832 | 0 | ) |
1833 | 0 | .field("thread_stack_size", &self.thread_stack_size) |
1834 | 0 | .field("after_start", &self.after_start.as_ref().map(|_| "...")) |
1835 | 0 | .field("before_stop", &self.before_stop.as_ref().map(|_| "...")) |
1836 | 0 | .field("before_park", &self.before_park.as_ref().map(|_| "...")) |
1837 | 0 | .field("after_unpark", &self.after_unpark.as_ref().map(|_| "...")) |
1838 | 0 | .finish() |
1839 | 0 | } |
1840 | | } |