Coverage Report

Created: 2025-12-31 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/crosvm/devices/src/virtio/console/worker.rs
Line
Count
Source
1
// Copyright 2024 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
//! Virtio console device worker thread.
6
7
use std::collections::BTreeMap;
8
use std::collections::VecDeque;
9
use std::sync::mpsc;
10
use std::sync::Arc;
11
12
use anyhow::anyhow;
13
use anyhow::Context;
14
use base::error;
15
use base::Event;
16
use base::EventToken;
17
use base::WaitContext;
18
use base::WorkerThread;
19
use sync::Mutex;
20
21
use crate::virtio::console::control::process_control_receive_queue;
22
use crate::virtio::console::control::process_control_transmit_queue;
23
use crate::virtio::console::control::ControlMsgBytes;
24
use crate::virtio::console::input::process_receive_queue;
25
use crate::virtio::console::output::process_transmit_queue;
26
use crate::virtio::console::port::ConsolePort;
27
use crate::virtio::console::port::ConsolePortInfo;
28
use crate::virtio::Queue;
29
30
const PORT0_RECEIVEQ_IDX: usize = 0;
31
const PORT0_TRANSMITQ_IDX: usize = 1;
32
const CONTROL_RECEIVEQ_IDX: usize = 2;
33
const CONTROL_TRANSMITQ_IDX: usize = 3;
34
const PORT1_RECEIVEQ_IDX: usize = 4;
35
const PORT1_TRANSMITQ_IDX: usize = 5;
36
37
pub struct WorkerPort {
38
    info: Option<ConsolePortInfo>,
39
40
    in_avail_evt: Event,
41
    input_buffer: Arc<Mutex<VecDeque<u8>>>,
42
    output: Box<dyn std::io::Write + Send>,
43
}
44
45
impl WorkerPort {
46
0
    pub fn from_console_port(port: &mut ConsolePort) -> WorkerPort {
47
0
        let in_avail_evt = port.clone_in_avail_evt().unwrap();
48
0
        let input_buffer = port.clone_input_buffer();
49
0
        let output = port
50
0
            .take_output()
51
0
            .unwrap_or_else(|| Box::new(std::io::sink()));
52
0
        let info = port.port_info().cloned();
53
0
        WorkerPort {
54
0
            info,
55
0
            in_avail_evt,
56
0
            input_buffer,
57
0
            output,
58
0
        }
59
0
    }
60
61
    /// Restore the state retrieved from `ConsolePort` by `WorkerPort::from_console_port()`.
62
0
    pub fn into_console_port(self, console_port: &mut ConsolePort) {
63
0
        console_port.restore_output(self.output);
64
0
    }
65
66
0
    pub fn is_console(&self) -> bool {
67
0
        self.info
68
0
            .as_ref()
69
0
            .map(|info| info.console)
70
0
            .unwrap_or_default()
71
0
    }
72
73
0
    pub fn name(&self) -> Option<&str> {
74
0
        self.info.as_ref().and_then(ConsolePortInfo::name)
75
0
    }
76
}
77
78
#[derive(EventToken)]
79
enum Token {
80
    ReceiveQueueAvailable(u32),
81
    TransmitQueueAvailable(u32),
82
    InputAvailable(u32),
83
    ControlReceiveQueueAvailable,
84
    ControlTransmitQueueAvailable,
85
    WorkerRequest,
86
    Kill,
87
}
88
89
pub enum WorkerRequest {
90
    StartQueue {
91
        idx: usize,
92
        queue: Queue,
93
        response_sender: mpsc::SyncSender<anyhow::Result<()>>,
94
    },
95
    StopQueue {
96
        idx: usize,
97
        response_sender: mpsc::SyncSender<Option<Queue>>,
98
    },
99
}
100
101
pub struct Worker {
102
    wait_ctx: WaitContext<Token>,
103
104
    // Currently running queues.
105
    queues: BTreeMap<usize, Queue>,
106
107
    // Console ports indexed by port ID. At least port 0 will exist, and other ports may be
108
    // available if `VIRTIO_CONSOLE_F_MULTIPORT` is enabled.
109
    ports: Vec<WorkerPort>,
110
111
    // Device-to-driver messages to be received by the driver via the control receiveq.
112
    pending_receive_control_msgs: VecDeque<ControlMsgBytes>,
113
114
    worker_receiver: mpsc::Receiver<WorkerRequest>,
115
    worker_event: Event,
116
}
117
118
impl Worker {
119
0
    pub fn new(
120
0
        ports: Vec<WorkerPort>,
121
0
        worker_receiver: mpsc::Receiver<WorkerRequest>,
122
0
        worker_event: Event,
123
0
    ) -> anyhow::Result<Self> {
124
0
        let wait_ctx = WaitContext::new().context("WaitContext::new() failed")?;
125
126
0
        wait_ctx.add(&worker_event, Token::WorkerRequest)?;
127
128
0
        for (index, port) in ports.iter().enumerate() {
129
0
            let port_id = index as u32;
130
0
            wait_ctx.add(&port.in_avail_evt, Token::InputAvailable(port_id))?;
131
        }
132
133
0
        Ok(Worker {
134
0
            wait_ctx,
135
0
            queues: BTreeMap::new(),
136
0
            ports,
137
0
            pending_receive_control_msgs: VecDeque::new(),
138
0
            worker_receiver,
139
0
            worker_event,
140
0
        })
141
0
    }
142
143
0
    pub fn run(&mut self, kill_evt: &Event) -> anyhow::Result<()> {
144
0
        self.wait_ctx.add(kill_evt, Token::Kill)?;
145
0
        let res = self.run_loop();
146
0
        self.wait_ctx.delete(kill_evt)?;
147
0
        res
148
0
    }
149
150
0
    fn run_loop(&mut self) -> anyhow::Result<()> {
151
0
        let mut running = true;
152
0
        while running {
153
0
            let events = self.wait_ctx.wait()?;
154
155
0
            for event in events.iter().filter(|e| e.is_readable) {
156
0
                match event.token {
157
0
                    Token::TransmitQueueAvailable(port_id) => {
158
0
                        if let (Some(port), Some(transmitq)) = (
159
0
                            self.ports.get_mut(port_id as usize),
160
0
                            transmitq_idx(port_id).and_then(|idx| self.queues.get_mut(&idx)),
161
                        ) {
162
0
                            transmitq
163
0
                                .event()
164
0
                                .wait()
165
0
                                .context("failed reading transmit queue Event")?;
166
0
                            process_transmit_queue(transmitq, &mut port.output);
167
0
                        }
168
                    }
169
0
                    Token::ReceiveQueueAvailable(port_id) | Token::InputAvailable(port_id) => {
170
0
                        let port = self.ports.get_mut(port_id as usize);
171
0
                        let receiveq =
172
0
                            receiveq_idx(port_id).and_then(|idx| self.queues.get_mut(&idx));
173
174
0
                        let event = if matches!(event.token, Token::ReceiveQueueAvailable(..)) {
175
0
                            receiveq.as_ref().map(|q| q.event())
176
                        } else {
177
0
                            port.as_ref().map(|p| &p.in_avail_evt)
178
                        };
179
0
                        if let Some(event) = event {
180
0
                            event.wait().context("failed to clear receive event")?;
181
0
                        }
182
183
0
                        if let (Some(port), Some(receiveq)) = (port, receiveq) {
184
0
                            let mut input_buffer = port.input_buffer.lock();
185
0
                            process_receive_queue(&mut input_buffer, receiveq);
186
0
                        }
187
                    }
188
                    Token::ControlReceiveQueueAvailable => {
189
0
                        if let Some(ctrl_receiveq) = self.queues.get_mut(&CONTROL_RECEIVEQ_IDX) {
190
0
                            ctrl_receiveq
191
0
                                .event()
192
0
                                .wait()
193
0
                                .context("failed waiting on control event")?;
194
0
                            process_control_receive_queue(
195
0
                                ctrl_receiveq,
196
0
                                &mut self.pending_receive_control_msgs,
197
                            );
198
0
                        }
199
                    }
200
                    Token::ControlTransmitQueueAvailable => {
201
0
                        if let Some(ctrl_transmitq) = self.queues.get_mut(&CONTROL_TRANSMITQ_IDX) {
202
0
                            ctrl_transmitq
203
0
                                .event()
204
0
                                .wait()
205
0
                                .context("failed waiting on control event")?;
206
0
                            process_control_transmit_queue(
207
0
                                ctrl_transmitq,
208
0
                                &self.ports,
209
0
                                &mut self.pending_receive_control_msgs,
210
                            );
211
0
                        }
212
213
                        // Attempt to send any new replies if there is space in the receiveq.
214
0
                        if let Some(ctrl_receiveq) = self.queues.get_mut(&CONTROL_RECEIVEQ_IDX) {
215
0
                            process_control_receive_queue(
216
0
                                ctrl_receiveq,
217
0
                                &mut self.pending_receive_control_msgs,
218
                            )
219
0
                        }
220
                    }
221
                    Token::WorkerRequest => {
222
0
                        self.worker_event.wait()?;
223
0
                        self.process_worker_requests();
224
                    }
225
0
                    Token::Kill => running = false,
226
                }
227
            }
228
        }
229
0
        Ok(())
230
0
    }
231
232
0
    fn process_worker_requests(&mut self) {
233
0
        while let Ok(request) = self.worker_receiver.try_recv() {
234
0
            match request {
235
                WorkerRequest::StartQueue {
236
0
                    idx,
237
0
                    queue,
238
0
                    response_sender,
239
0
                } => {
240
0
                    let res = self.start_queue(idx, queue);
241
0
                    let _ = response_sender.send(res);
242
0
                }
243
                WorkerRequest::StopQueue {
244
0
                    idx,
245
0
                    response_sender,
246
0
                } => {
247
0
                    let res = self.stop_queue(idx);
248
0
                    let _ = response_sender.send(res);
249
0
                }
250
            }
251
        }
252
0
    }
253
254
0
    fn start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()> {
255
0
        if let Some(port_id) = receiveq_port_id(idx) {
256
0
            self.wait_ctx
257
0
                .add(queue.event(), Token::ReceiveQueueAvailable(port_id))?;
258
0
        } else if let Some(port_id) = transmitq_port_id(idx) {
259
0
            self.wait_ctx
260
0
                .add(queue.event(), Token::TransmitQueueAvailable(port_id))?;
261
0
        } else if idx == CONTROL_RECEIVEQ_IDX {
262
0
            self.wait_ctx
263
0
                .add(queue.event(), Token::ControlReceiveQueueAvailable)?;
264
0
        } else if idx == CONTROL_TRANSMITQ_IDX {
265
0
            self.wait_ctx
266
0
                .add(queue.event(), Token::ControlTransmitQueueAvailable)?;
267
        } else {
268
0
            return Err(anyhow!("unhandled queue idx {idx}"));
269
        }
270
271
0
        let prev = self.queues.insert(idx, queue);
272
0
        assert!(prev.is_none());
273
0
        Ok(())
274
0
    }
275
276
0
    fn stop_queue(&mut self, idx: usize) -> Option<Queue> {
277
0
        if let Some(queue) = self.queues.remove(&idx) {
278
0
            let _ = self.wait_ctx.delete(queue.event());
279
0
            Some(queue)
280
        } else {
281
0
            None
282
        }
283
0
    }
284
}
285
286
pub struct WorkerHandle {
287
    worker_thread: WorkerThread<Vec<WorkerPort>>,
288
    worker_sender: mpsc::Sender<WorkerRequest>,
289
    worker_event: Event,
290
}
291
292
impl WorkerHandle {
293
0
    pub fn new(ports: Vec<WorkerPort>) -> anyhow::Result<Self> {
294
0
        let worker_event = Event::new().context("Event::new")?;
295
0
        let worker_event_clone = worker_event.try_clone().context("Event::try_clone")?;
296
0
        let (worker_sender, worker_receiver) = mpsc::channel();
297
0
        let worker_thread = WorkerThread::start("v_console", move |kill_evt| {
298
0
            let mut worker = Worker::new(ports, worker_receiver, worker_event_clone)
299
0
                .expect("console Worker::new() failed");
300
0
            if let Err(e) = worker.run(&kill_evt) {
301
0
                error!("console worker failed: {:#}", e);
302
0
            }
303
0
            worker.ports
304
0
        });
305
0
        Ok(WorkerHandle {
306
0
            worker_thread,
307
0
            worker_sender,
308
0
            worker_event,
309
0
        })
310
0
    }
311
312
0
    pub fn start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()> {
313
0
        let (response_sender, response_receiver) = mpsc::sync_channel(0);
314
0
        self.worker_sender
315
0
            .send(WorkerRequest::StartQueue {
316
0
                idx,
317
0
                queue,
318
0
                response_sender,
319
0
            })
320
0
            .context("mpsc::Sender::send")?;
321
0
        self.worker_event.signal().context("Event::signal")?;
322
0
        response_receiver.recv().context("mpsc::Receiver::recv")?
323
0
    }
324
325
0
    pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Option<Queue>> {
326
0
        let (response_sender, response_receiver) = mpsc::sync_channel(0);
327
0
        self.worker_sender
328
0
            .send(WorkerRequest::StopQueue {
329
0
                idx,
330
0
                response_sender,
331
0
            })
332
0
            .context("mpsc::Sender::send")?;
333
0
        self.worker_event.signal().context("Event::signal")?;
334
0
        response_receiver.recv().context("mpsc::Receiver::recv")
335
0
    }
336
337
0
    pub fn stop(self) -> Vec<WorkerPort> {
338
0
        self.worker_thread.stop()
339
0
    }
340
}
341
342
0
fn receiveq_idx(port_id: u32) -> Option<usize> {
343
0
    if port_id == 0 {
344
0
        Some(PORT0_RECEIVEQ_IDX)
345
    } else {
346
0
        PORT1_RECEIVEQ_IDX.checked_add((port_id - 1).checked_mul(2)?.try_into().ok()?)
347
    }
348
0
}
349
350
0
fn transmitq_idx(port_id: u32) -> Option<usize> {
351
0
    if port_id == 0 {
352
0
        Some(PORT0_TRANSMITQ_IDX)
353
    } else {
354
0
        PORT1_TRANSMITQ_IDX.checked_add((port_id - 1).checked_mul(2)?.try_into().ok()?)
355
    }
356
0
}
357
358
0
fn receiveq_port_id(queue_idx: usize) -> Option<u32> {
359
0
    if queue_idx == PORT0_RECEIVEQ_IDX {
360
0
        Some(0)
361
0
    } else if queue_idx >= PORT1_RECEIVEQ_IDX && (queue_idx & 1) == 0 {
362
0
        ((queue_idx - PORT1_RECEIVEQ_IDX) / 2)
363
0
            .checked_add(1)?
364
0
            .try_into()
365
0
            .ok()
366
    } else {
367
0
        None
368
    }
369
0
}
370
371
0
fn transmitq_port_id(queue_idx: usize) -> Option<u32> {
372
0
    if queue_idx == PORT0_TRANSMITQ_IDX {
373
0
        Some(0)
374
0
    } else if queue_idx >= PORT1_TRANSMITQ_IDX && (queue_idx & 1) == 1 {
375
0
        ((queue_idx - PORT1_TRANSMITQ_IDX) / 2)
376
0
            .checked_add(1)?
377
0
            .try_into()
378
0
            .ok()
379
    } else {
380
0
        None
381
    }
382
0
}
383
384
#[cfg(test)]
385
mod tests {
386
    use super::*;
387
388
    #[test]
389
    fn test_receiveq_idx() {
390
        assert_eq!(receiveq_idx(0), Some(0));
391
        assert_eq!(receiveq_idx(1), Some(4));
392
        assert_eq!(receiveq_idx(2), Some(6));
393
        assert_eq!(receiveq_idx(3), Some(8));
394
    }
395
396
    #[test]
397
    fn test_transmitq_idx() {
398
        assert_eq!(transmitq_idx(0), Some(1));
399
        assert_eq!(transmitq_idx(1), Some(5));
400
        assert_eq!(transmitq_idx(2), Some(7));
401
        assert_eq!(transmitq_idx(3), Some(9));
402
    }
403
404
    #[test]
405
    fn test_receiveq_port_id() {
406
        assert_eq!(receiveq_port_id(0), Some(0));
407
        assert_eq!(receiveq_port_id(1), None); // port0 transmitq
408
        assert_eq!(receiveq_port_id(2), None); // ctrl receiveq
409
        assert_eq!(receiveq_port_id(3), None); // ctrl transmitq
410
        assert_eq!(receiveq_port_id(4), Some(1));
411
        assert_eq!(receiveq_port_id(5), None);
412
        assert_eq!(receiveq_port_id(6), Some(2));
413
        assert_eq!(receiveq_port_id(7), None);
414
        assert_eq!(receiveq_port_id(8), Some(3));
415
        assert_eq!(receiveq_port_id(9), None);
416
    }
417
418
    #[test]
419
    fn test_transmitq_port_id() {
420
        assert_eq!(transmitq_port_id(0), None); // port0 receiveq
421
        assert_eq!(transmitq_port_id(1), Some(0));
422
        assert_eq!(transmitq_port_id(2), None); // ctrl receiveq
423
        assert_eq!(transmitq_port_id(3), None); // ctrl transmitq
424
        assert_eq!(transmitq_port_id(4), None); // port1 receiveq
425
        assert_eq!(transmitq_port_id(5), Some(1));
426
        assert_eq!(transmitq_port_id(6), None);
427
        assert_eq!(transmitq_port_id(7), Some(2));
428
        assert_eq!(transmitq_port_id(8), None);
429
        assert_eq!(transmitq_port_id(9), Some(3));
430
    }
431
}