Coverage Report

Created: 2025-02-21 07:11

/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.43.0/src/runtime/driver.rs
Line
Count
Source (jump to first uncovered line)
1
//! Abstracts out the entire chain of runtime sub-drivers into common types.
2
3
// Eventually, this file will see significant refactoring / cleanup. For now, we
4
// don't need to worry much about dead code with certain feature permutations.
5
#![cfg_attr(
6
    any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
7
    allow(dead_code)
8
)]
9
10
use crate::runtime::park::{ParkThread, UnparkThread};
11
12
use std::io;
13
use std::time::Duration;
14
15
#[derive(Debug)]
16
pub(crate) struct Driver {
17
    inner: TimeDriver,
18
}
19
20
#[derive(Debug)]
21
pub(crate) struct Handle {
22
    /// IO driver handle
23
    pub(crate) io: IoHandle,
24
25
    /// Signal driver handle
26
    #[cfg_attr(any(not(unix), loom), allow(dead_code))]
27
    pub(crate) signal: SignalHandle,
28
29
    /// Time driver handle
30
    pub(crate) time: TimeHandle,
31
32
    /// Source of `Instant::now()`
33
    #[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))]
34
    pub(crate) clock: Clock,
35
}
36
37
pub(crate) struct Cfg {
38
    pub(crate) enable_io: bool,
39
    pub(crate) enable_time: bool,
40
    pub(crate) enable_pause_time: bool,
41
    pub(crate) start_paused: bool,
42
    pub(crate) nevents: usize,
43
    pub(crate) workers: usize,
44
}
45
46
impl Driver {
47
21.4k
    pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
48
21.4k
        let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
49
50
21.4k
        let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
51
21.4k
52
21.4k
        let (time_driver, time_handle) =
53
21.4k
            create_time_driver(cfg.enable_time, io_stack, &clock, cfg.workers);
54
21.4k
55
21.4k
        Ok((
56
21.4k
            Self { inner: time_driver },
57
21.4k
            Handle {
58
21.4k
                io: io_handle,
59
21.4k
                signal: signal_handle,
60
21.4k
                time: time_handle,
61
21.4k
                clock,
62
21.4k
            },
63
21.4k
        ))
64
21.4k
    }
65
66
0
    pub(crate) fn is_enabled(&self) -> bool {
67
0
        self.inner.is_enabled()
68
0
    }
69
70
22.2k
    pub(crate) fn park(&mut self, handle: &Handle) {
71
22.2k
        self.inner.park(handle);
72
22.2k
    }
73
74
126
    pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
75
126
        self.inner.park_timeout(handle, duration);
76
126
    }
77
78
21.4k
    pub(crate) fn shutdown(&mut self, handle: &Handle) {
79
21.4k
        self.inner.shutdown(handle);
80
21.4k
    }
81
}
82
83
impl Handle {
84
25.0k
    pub(crate) fn unpark(&self) {
85
        #[cfg(feature = "time")]
86
25.0k
        if let Some(handle) = &self.time {
87
25.0k
            handle.unpark();
88
25.0k
        }
89
90
25.0k
        self.io.unpark();
91
25.0k
    }
92
93
    cfg_io_driver! {
94
        #[track_caller]
95
        pub(crate) fn io(&self) -> &crate::runtime::io::Handle {
96
            self.io
97
                .as_ref()
98
                .expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.")
99
        }
100
    }
101
102
    cfg_signal_internal_and_unix! {
103
        #[track_caller]
104
        pub(crate) fn signal(&self) -> &crate::runtime::signal::Handle {
105
            self.signal
106
                .as_ref()
107
                .expect("there is no signal driver running, must be called from the context of Tokio runtime")
108
        }
109
    }
110
111
    cfg_time! {
112
        /// Returns a reference to the time driver handle.
113
        ///
114
        /// Panics if no time driver is present.
115
        #[track_caller]
116
88.4k
        pub(crate) fn time(&self) -> &crate::runtime::time::Handle {
117
88.4k
            self.time
118
88.4k
                .as_ref()
119
88.4k
                .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
120
88.4k
        }
121
122
22.3k
        pub(crate) fn clock(&self) -> &Clock {
123
22.3k
            &self.clock
124
22.3k
        }
125
    }
126
}
127
128
// ===== io driver =====
129
130
cfg_io_driver! {
131
    pub(crate) type IoDriver = crate::runtime::io::Driver;
132
133
    #[derive(Debug)]
134
    pub(crate) enum IoStack {
135
        Enabled(ProcessDriver),
136
        Disabled(ParkThread),
137
    }
138
139
    #[derive(Debug)]
140
    pub(crate) enum IoHandle {
141
        Enabled(crate::runtime::io::Handle),
142
        Disabled(UnparkThread),
143
    }
144
145
    fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
146
        #[cfg(loom)]
147
        assert!(!enabled);
148
149
        let ret = if enabled {
150
            let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;
151
152
            let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
153
            let process_driver = create_process_driver(signal_driver);
154
155
            (IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), signal_handle)
156
        } else {
157
            let park_thread = ParkThread::new();
158
            let unpark_thread = park_thread.unpark();
159
            (IoStack::Disabled(park_thread), IoHandle::Disabled(unpark_thread), Default::default())
160
        };
161
162
        Ok(ret)
163
    }
164
165
    impl IoStack {
166
        pub(crate) fn is_enabled(&self) -> bool {
167
            match self {
168
                IoStack::Enabled(..) => true,
169
                IoStack::Disabled(..) => false,
170
            }
171
        }
172
173
        pub(crate) fn park(&mut self, handle: &Handle) {
174
            match self {
175
                IoStack::Enabled(v) => v.park(handle),
176
                IoStack::Disabled(v) => v.park(),
177
            }
178
        }
179
180
        pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
181
            match self {
182
                IoStack::Enabled(v) => v.park_timeout(handle, duration),
183
                IoStack::Disabled(v) => v.park_timeout(duration),
184
            }
185
        }
186
187
        pub(crate) fn shutdown(&mut self, handle: &Handle) {
188
            match self {
189
                IoStack::Enabled(v) => v.shutdown(handle),
190
                IoStack::Disabled(v) => v.shutdown(),
191
            }
192
        }
193
    }
194
195
    impl IoHandle {
196
        pub(crate) fn unpark(&self) {
197
            match self {
198
                IoHandle::Enabled(handle) => handle.unpark(),
199
                IoHandle::Disabled(handle) => handle.unpark(),
200
            }
201
        }
202
203
        pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> {
204
            match self {
205
                IoHandle::Enabled(v) => Some(v),
206
                IoHandle::Disabled(..) => None,
207
            }
208
        }
209
    }
210
}
211
212
cfg_not_io_driver! {
213
    pub(crate) type IoHandle = UnparkThread;
214
215
    #[derive(Debug)]
216
    pub(crate) struct IoStack(ParkThread);
217
218
21.4k
    fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
219
21.4k
        let park_thread = ParkThread::new();
220
21.4k
        let unpark_thread = park_thread.unpark();
221
21.4k
        Ok((IoStack(park_thread), unpark_thread, Default::default()))
222
21.4k
    }
223
224
    impl IoStack {
225
22.2k
        pub(crate) fn park(&mut self, _handle: &Handle) {
226
22.2k
            self.0.park();
227
22.2k
        }
228
229
126
        pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) {
230
126
            self.0.park_timeout(duration);
231
126
        }
232
233
21.4k
        pub(crate) fn shutdown(&mut self, _handle: &Handle) {
234
21.4k
            self.0.shutdown();
235
21.4k
        }
236
237
        /// This is not a "real" driver, so it is not considered enabled.
238
0
        pub(crate) fn is_enabled(&self) -> bool {
239
0
            false
240
0
        }
241
    }
242
}
243
244
// ===== signal driver =====
245
246
cfg_signal_internal_and_unix! {
247
    type SignalDriver = crate::runtime::signal::Driver;
248
    pub(crate) type SignalHandle = Option<crate::runtime::signal::Handle>;
249
250
    fn create_signal_driver(io_driver: IoDriver, io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> {
251
        let driver = crate::runtime::signal::Driver::new(io_driver, io_handle)?;
252
        let handle = driver.handle();
253
        Ok((driver, Some(handle)))
254
    }
255
}
256
257
cfg_not_signal_internal! {
258
    pub(crate) type SignalHandle = ();
259
260
    cfg_io_driver! {
261
        type SignalDriver = IoDriver;
262
263
        fn create_signal_driver(io_driver: IoDriver, _io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> {
264
            Ok((io_driver, ()))
265
        }
266
    }
267
}
268
269
// ===== process driver =====
270
271
cfg_process_driver! {
272
    type ProcessDriver = crate::runtime::process::Driver;
273
274
    fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
275
        ProcessDriver::new(signal_driver)
276
    }
277
}
278
279
cfg_not_process_driver! {
280
    cfg_io_driver! {
281
        type ProcessDriver = SignalDriver;
282
283
        fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
284
            signal_driver
285
        }
286
    }
287
}
288
289
// ===== time driver =====
290
291
cfg_time! {
292
    #[derive(Debug)]
293
    pub(crate) enum TimeDriver {
294
        Enabled {
295
            driver: crate::runtime::time::Driver,
296
        },
297
        Disabled(IoStack),
298
    }
299
300
    pub(crate) type Clock = crate::time::Clock;
301
    pub(crate) type TimeHandle = Option<crate::runtime::time::Handle>;
302
303
21.4k
    fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock {
304
21.4k
        crate::time::Clock::new(enable_pausing, start_paused)
305
21.4k
    }
306
307
21.4k
    fn create_time_driver(
308
21.4k
        enable: bool,
309
21.4k
        io_stack: IoStack,
310
21.4k
        clock: &Clock,
311
21.4k
        workers: usize,
312
21.4k
    ) -> (TimeDriver, TimeHandle) {
313
21.4k
        if enable {
314
21.4k
            let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock, workers as u32);
315
21.4k
316
21.4k
            (TimeDriver::Enabled { driver }, Some(handle))
317
        } else {
318
0
            (TimeDriver::Disabled(io_stack), None)
319
        }
320
21.4k
    }
321
322
    impl TimeDriver {
323
0
        pub(crate) fn is_enabled(&self) -> bool {
324
0
            match self {
325
0
                TimeDriver::Enabled { .. } => true,
326
0
                TimeDriver::Disabled(inner) => inner.is_enabled(),
327
            }
328
0
        }
329
330
22.2k
        pub(crate) fn park(&mut self, handle: &Handle) {
331
22.2k
            match self {
332
22.2k
                TimeDriver::Enabled { driver, .. } => driver.park(handle),
333
0
                TimeDriver::Disabled(v) => v.park(handle),
334
            }
335
22.2k
        }
336
337
126
        pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
338
126
            match self {
339
126
                TimeDriver::Enabled { driver } => driver.park_timeout(handle, duration),
340
0
                TimeDriver::Disabled(v) => v.park_timeout(handle, duration),
341
            }
342
126
        }
343
344
21.4k
        pub(crate) fn shutdown(&mut self, handle: &Handle) {
345
21.4k
            match self {
346
21.4k
                TimeDriver::Enabled { driver } => driver.shutdown(handle),
347
0
                TimeDriver::Disabled(v) => v.shutdown(handle),
348
            }
349
21.4k
        }
350
    }
351
}
352
353
cfg_not_time! {
354
    type TimeDriver = IoStack;
355
356
    pub(crate) type Clock = ();
357
    pub(crate) type TimeHandle = ();
358
359
    fn create_clock(_enable_pausing: bool, _start_paused: bool) -> Clock {
360
        ()
361
    }
362
363
    fn create_time_driver(
364
        _enable: bool,
365
        io_stack: IoStack,
366
        _clock: &Clock,
367
        _workers: usize,
368
    ) -> (TimeDriver, TimeHandle) {
369
        (io_stack, ())
370
    }
371
}