Coverage Report

Created: 2024-05-21 06:19

/rust/registry/src/index.crates.io-6f17d22bba15001f/io-uring-0.6.3/src/squeue.rs
Line
Count
Source (jump to first uncovered line)
1
//! Submission Queue
2
3
use std::error::Error;
4
use std::fmt::{self, Debug, Display, Formatter};
5
use std::mem;
6
use std::sync::atomic;
7
8
use crate::sys;
9
use crate::util::{private, unsync_load, Mmap};
10
11
use bitflags::bitflags;
12
13
pub(crate) struct Inner<E: EntryMarker> {
14
    pub(crate) head: *const atomic::AtomicU32,
15
    pub(crate) tail: *const atomic::AtomicU32,
16
    pub(crate) ring_mask: u32,
17
    pub(crate) ring_entries: u32,
18
    pub(crate) flags: *const atomic::AtomicU32,
19
    dropped: *const atomic::AtomicU32,
20
21
    pub(crate) sqes: *mut E,
22
}
23
24
/// An io_uring instance's submission queue. This is used to send I/O requests to the kernel.
25
pub struct SubmissionQueue<'a, E: EntryMarker = Entry> {
26
    head: u32,
27
    tail: u32,
28
    queue: &'a Inner<E>,
29
}
30
31
/// A submission queue entry (SQE), representing a request for an I/O operation.
32
///
33
/// This is implemented for [`Entry`] and [`Entry128`].
34
pub trait EntryMarker: Clone + Debug + From<Entry> + private::Sealed {
35
    const BUILD_FLAGS: u32;
36
}
37
38
/// A 64-byte submission queue entry (SQE), representing a request for an I/O operation.
39
///
40
/// These can be created via opcodes in [`opcode`](crate::opcode).
41
#[repr(C)]
42
pub struct Entry(pub(crate) sys::io_uring_sqe);
43
44
/// A 128-byte submission queue entry (SQE), representing a request for an I/O operation.
45
///
46
/// These can be created via opcodes in [`opcode`](crate::opcode).
47
#[repr(C)]
48
0
#[derive(Clone)]
49
pub struct Entry128(pub(crate) Entry, pub(crate) [u8; 64]);
50
51
#[test]
52
fn test_entry_sizes() {
53
    assert_eq!(mem::size_of::<Entry>(), 64);
54
    assert_eq!(mem::size_of::<Entry128>(), 128);
55
}
56
57
bitflags! {
58
    /// Submission flags
59
    pub struct Flags: u8 {
60
        /// When this flag is specified,
61
        /// `fd` is an index into the files array registered with the io_uring instance.
62
        #[doc(hidden)]
63
        const FIXED_FILE = 1 << sys::IOSQE_FIXED_FILE_BIT;
64
65
        /// When this flag is specified,
66
        /// the SQE will not be started before previously submitted SQEs have completed,
67
        /// and new SQEs will not be started before this one completes.
68
        const IO_DRAIN = 1 << sys::IOSQE_IO_DRAIN_BIT;
69
70
        /// When this flag is specified,
71
        /// it forms a link with the next SQE in the submission ring.
72
        /// That next SQE will not be started before this one completes.
73
        const IO_LINK = 1 << sys::IOSQE_IO_LINK_BIT;
74
75
        /// Like [`IO_LINK`](Self::IO_LINK), but it doesn’t sever regardless of the completion
76
        /// result.
77
        const IO_HARDLINK = 1 << sys::IOSQE_IO_HARDLINK_BIT;
78
79
        /// Normal operation for io_uring is to try and issue an sqe as non-blocking first,
80
        /// and if that fails, execute it in an async manner.
81
        ///
82
        /// To support more efficient overlapped operation of requests
83
        /// that the application knows/assumes will always (or most of the time) block,
84
        /// the application can ask for an sqe to be issued async from the start.
85
        const ASYNC = 1 << sys::IOSQE_ASYNC_BIT;
86
87
        /// Conceptually the kernel holds a set of buffers organized into groups. When you issue a
88
        /// request with this flag and set `buf_group` to a valid buffer group ID (e.g.
89
        /// [`buf_group` on `Read`](crate::opcode::Read::buf_group)) then once the file descriptor
90
        /// becomes ready the kernel will try to take a buffer from the group.
91
        ///
92
        /// If there are no buffers in the group, your request will fail with `-ENOBUFS`. Otherwise,
93
        /// the corresponding [`cqueue::Entry::flags`](crate::cqueue::Entry::flags) will contain the
94
        /// chosen buffer ID, encoded with:
95
        ///
96
        /// ```text
97
        /// (buffer_id << IORING_CQE_BUFFER_SHIFT) | IORING_CQE_F_BUFFER
98
        /// ```
99
        ///
100
        /// You can use [`buffer_select`](crate::cqueue::buffer_select) to take the buffer ID.
101
        ///
102
        /// The buffer will then be removed from the group and won't be usable by other requests
103
        /// anymore.
104
        ///
105
        /// You can provide new buffers in a group with
106
        /// [`ProvideBuffers`](crate::opcode::ProvideBuffers).
107
        ///
108
        /// See also [the LWN thread on automatic buffer
109
        /// selection](https://lwn.net/Articles/815491/).
110
        const BUFFER_SELECT = 1 << sys::IOSQE_BUFFER_SELECT_BIT;
111
112
        /// Don't post CQE if request succeeded.
113
        const SKIP_SUCCESS = 1 << sys::IOSQE_CQE_SKIP_SUCCESS_BIT;
114
    }
115
}
116
117
impl<E: EntryMarker> Inner<E> {
118
    #[rustfmt::skip]
119
0
    pub(crate) unsafe fn new(
120
0
        sq_mmap: &Mmap,
121
0
        sqe_mmap: &Mmap,
122
0
        p: &sys::io_uring_params,
123
0
    ) -> Self {
124
0
        let head         = sq_mmap.offset(p.sq_off.head        ) as *const atomic::AtomicU32;
125
0
        let tail         = sq_mmap.offset(p.sq_off.tail        ) as *const atomic::AtomicU32;
126
0
        let ring_mask    = sq_mmap.offset(p.sq_off.ring_mask   ).cast::<u32>().read();
127
0
        let ring_entries = sq_mmap.offset(p.sq_off.ring_entries).cast::<u32>().read();
128
0
        let flags        = sq_mmap.offset(p.sq_off.flags       ) as *const atomic::AtomicU32;
129
0
        let dropped      = sq_mmap.offset(p.sq_off.dropped     ) as *const atomic::AtomicU32;
130
0
        let array        = sq_mmap.offset(p.sq_off.array       ) as *mut u32;
131
0
132
0
        let sqes         = sqe_mmap.as_mut_ptr() as *mut E;
133
134
        // To keep it simple, map it directly to `sqes`.
135
0
        for i in 0..ring_entries {
136
0
            array.add(i as usize).write_volatile(i);
137
0
        }
138
139
0
        Self {
140
0
            head,
141
0
            tail,
142
0
            ring_mask,
143
0
            ring_entries,
144
0
            flags,
145
0
            dropped,
146
0
            sqes,
147
0
        }
148
0
    }
149
150
    #[inline]
151
0
    pub(crate) unsafe fn borrow_shared(&self) -> SubmissionQueue<'_, E> {
152
0
        SubmissionQueue {
153
0
            head: (*self.head).load(atomic::Ordering::Acquire),
154
0
            tail: unsync_load(self.tail),
155
0
            queue: self,
156
0
        }
157
0
    }
Unexecuted instantiation: <io_uring::squeue::Inner<io_uring::squeue::Entry>>::borrow_shared
Unexecuted instantiation: <io_uring::squeue::Inner<_>>::borrow_shared
158
159
    #[inline]
160
0
    pub(crate) fn borrow(&mut self) -> SubmissionQueue<'_, E> {
161
0
        unsafe { self.borrow_shared() }
162
0
    }
Unexecuted instantiation: <io_uring::squeue::Inner<io_uring::squeue::Entry>>::borrow
Unexecuted instantiation: <io_uring::squeue::Inner<_>>::borrow
163
}
164
165
impl<E: EntryMarker> SubmissionQueue<'_, E> {
166
    /// Synchronize this type with the real submission queue.
167
    ///
168
    /// This will flush any entries added by [`push`](Self::push) or
169
    /// [`push_multiple`](Self::push_multiple) and will update the queue's length if the kernel has
170
    /// consumed some entries in the meantime.
171
    #[inline]
172
0
    pub fn sync(&mut self) {
173
0
        unsafe {
174
0
            (*self.queue.tail).store(self.tail, atomic::Ordering::Release);
175
0
            self.head = (*self.queue.head).load(atomic::Ordering::Acquire);
176
0
        }
177
0
    }
Unexecuted instantiation: <io_uring::squeue::SubmissionQueue>::sync
Unexecuted instantiation: <io_uring::squeue::SubmissionQueue<_>>::sync
178
179
    /// When [`is_setup_sqpoll`](crate::Parameters::is_setup_sqpoll) is set, whether the kernel
180
    /// threads has gone to sleep and requires a system call to wake it up.
181
    #[inline]
182
0
    pub fn need_wakeup(&self) -> bool {
183
0
        unsafe {
184
0
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_NEED_WAKEUP != 0
185
0
        }
186
0
    }
187
188
    /// The number of invalid submission queue entries that have been encountered in the ring
189
    /// buffer.
190
0
    pub fn dropped(&self) -> u32 {
191
0
        unsafe { (*self.queue.dropped).load(atomic::Ordering::Acquire) }
192
0
    }
193
194
    /// Returns `true` if the completion queue ring is overflown.
195
0
    pub fn cq_overflow(&self) -> bool {
196
0
        unsafe {
197
0
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_CQ_OVERFLOW != 0
198
0
        }
199
0
    }
200
201
    /// Returns `true` if completions are pending that should be processed. Only relevant when used
202
    /// in conjuction with the `setup_taskrun_flag` function. Available since 5.19.
203
0
    pub fn taskrun(&self) -> bool {
204
0
        unsafe { (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_TASKRUN != 0 }
205
0
    }
206
207
    /// Get the total number of entries in the submission queue ring buffer.
208
    #[inline]
209
0
    pub fn capacity(&self) -> usize {
210
0
        self.queue.ring_entries as usize
211
0
    }
Unexecuted instantiation: <io_uring::squeue::SubmissionQueue>::capacity
Unexecuted instantiation: <io_uring::squeue::SubmissionQueue<_>>::capacity
212
213
    /// Get the number of submission queue events in the ring buffer.
214
    #[inline]
215
0
    pub fn len(&self) -> usize {
216
0
        self.tail.wrapping_sub(self.head) as usize
217
0
    }
Unexecuted instantiation: <io_uring::squeue::SubmissionQueue>::len
Unexecuted instantiation: <io_uring::squeue::SubmissionQueue<_>>::len
218
219
    /// Returns `true` if the submission queue ring buffer is empty.
220
    #[inline]
221
0
    pub fn is_empty(&self) -> bool {
222
0
        self.len() == 0
223
0
    }
224
225
    /// Returns `true` if the submission queue ring buffer has reached capacity, and no more events
226
    /// can be added before the kernel consumes some.
227
    #[inline]
228
0
    pub fn is_full(&self) -> bool {
229
0
        self.len() == self.capacity()
230
0
    }
Unexecuted instantiation: <io_uring::squeue::SubmissionQueue>::is_full
Unexecuted instantiation: <io_uring::squeue::SubmissionQueue<_>>::is_full
231
232
    /// Attempts to push an entry into the queue.
233
    /// If the queue is full, an error is returned.
234
    ///
235
    /// # Safety
236
    ///
237
    /// Developers must ensure that parameters of the entry (such as buffer) are valid and will
238
    /// be valid for the entire duration of the operation, otherwise it may cause memory problems.
239
    #[inline]
240
0
    pub unsafe fn push(&mut self, entry: &E) -> Result<(), PushError> {
241
0
        if !self.is_full() {
242
0
            self.push_unchecked(entry);
243
0
            Ok(())
244
        } else {
245
0
            Err(PushError)
246
        }
247
0
    }
Unexecuted instantiation: <io_uring::squeue::SubmissionQueue>::push
Unexecuted instantiation: <io_uring::squeue::SubmissionQueue<_>>::push
248
249
    /// Attempts to push several entries into the queue.
250
    /// If the queue does not have space for all of the entries, an error is returned.
251
    ///
252
    /// # Safety
253
    ///
254
    /// Developers must ensure that parameters of all the entries (such as buffer) are valid and
255
    /// will be valid for the entire duration of the operation, otherwise it may cause memory
256
    /// problems.
257
    #[inline]
258
0
    pub unsafe fn push_multiple(&mut self, entries: &[E]) -> Result<(), PushError> {
259
0
        if self.capacity() - self.len() < entries.len() {
260
0
            return Err(PushError);
261
0
        }
262
263
0
        for entry in entries {
264
0
            self.push_unchecked(entry);
265
0
        }
266
267
0
        Ok(())
268
0
    }
269
270
    #[inline]
271
0
    unsafe fn push_unchecked(&mut self, entry: &E) {
272
0
        *self
273
0
            .queue
274
0
            .sqes
275
0
            .add((self.tail & self.queue.ring_mask) as usize) = entry.clone();
276
0
        self.tail = self.tail.wrapping_add(1);
277
0
    }
Unexecuted instantiation: <io_uring::squeue::SubmissionQueue>::push_unchecked
Unexecuted instantiation: <io_uring::squeue::SubmissionQueue<_>>::push_unchecked
278
}
279
280
impl<E: EntryMarker> Drop for SubmissionQueue<'_, E> {
281
    #[inline]
282
0
    fn drop(&mut self) {
283
0
        unsafe { &*self.queue.tail }.store(self.tail, atomic::Ordering::Release);
284
0
    }
Unexecuted instantiation: <io_uring::squeue::SubmissionQueue as core::ops::drop::Drop>::drop
Unexecuted instantiation: <io_uring::squeue::SubmissionQueue<_> as core::ops::drop::Drop>::drop
285
}
286
287
impl Entry {
288
    /// Set the submission event's [flags](Flags).
289
    #[inline]
290
0
    pub fn flags(mut self, flags: Flags) -> Entry {
291
0
        self.0.flags |= flags.bits();
292
0
        self
293
0
    }
Unexecuted instantiation: <io_uring::squeue::Entry>::flags
Unexecuted instantiation: <io_uring::squeue::Entry>::flags
294
295
    /// Set the user data. This is an application-supplied value that will be passed straight
296
    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
297
    #[inline]
298
0
    pub fn user_data(mut self, user_data: u64) -> Entry {
299
0
        self.0.user_data = user_data;
300
0
        self
301
0
    }
Unexecuted instantiation: <io_uring::squeue::Entry>::user_data
Unexecuted instantiation: <io_uring::squeue::Entry>::user_data
302
303
    /// Get the previously application-supplied user data.
304
    #[inline]
305
0
    pub fn get_user_data(&self) -> u64 {
306
0
        self.0.user_data
307
0
    }
308
309
    /// Set the personality of this event. You can obtain a personality using
310
    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
311
0
    pub fn personality(mut self, personality: u16) -> Entry {
312
0
        self.0.personality = personality;
313
0
        self
314
0
    }
315
}
316
317
impl private::Sealed for Entry {}
318
319
impl EntryMarker for Entry {
320
    const BUILD_FLAGS: u32 = 0;
321
}
322
323
impl Clone for Entry {
324
0
    fn clone(&self) -> Entry {
325
0
        // io_uring_sqe doesn't implement Clone due to the 'cmd' incomplete array field.
326
0
        Entry(unsafe { mem::transmute_copy(&self.0) })
327
0
    }
328
}
329
330
impl Debug for Entry {
331
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
332
0
        f.debug_struct("Entry")
333
0
            .field("op_code", &self.0.opcode)
334
0
            .field("flags", &self.0.flags)
335
0
            .field("user_data", &self.0.user_data)
336
0
            .finish()
337
0
    }
338
}
339
340
impl Entry128 {
341
    /// Set the submission event's [flags](Flags).
342
    #[inline]
343
0
    pub fn flags(mut self, flags: Flags) -> Entry128 {
344
0
        self.0 .0.flags |= flags.bits();
345
0
        self
346
0
    }
347
348
    /// Set the user data. This is an application-supplied value that will be passed straight
349
    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
350
    #[inline]
351
0
    pub fn user_data(mut self, user_data: u64) -> Entry128 {
352
0
        self.0 .0.user_data = user_data;
353
0
        self
354
0
    }
355
356
    /// Set the personality of this event. You can obtain a personality using
357
    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
358
    #[inline]
359
0
    pub fn personality(mut self, personality: u16) -> Entry128 {
360
0
        self.0 .0.personality = personality;
361
0
        self
362
0
    }
363
}
364
365
impl private::Sealed for Entry128 {}
366
367
impl EntryMarker for Entry128 {
368
    const BUILD_FLAGS: u32 = sys::IORING_SETUP_SQE128;
369
}
370
371
impl From<Entry> for Entry128 {
372
0
    fn from(entry: Entry) -> Entry128 {
373
0
        Entry128(entry, [0u8; 64])
374
0
    }
375
}
376
377
impl Debug for Entry128 {
378
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
379
0
        f.debug_struct("Entry128")
380
0
            .field("op_code", &self.0 .0.opcode)
381
0
            .field("flags", &self.0 .0.flags)
382
0
            .field("user_data", &self.0 .0.user_data)
383
0
            .finish()
384
0
    }
385
}
386
387
/// An error pushing to the submission queue due to it being full.
388
0
#[derive(Debug, Clone, PartialEq, Eq)]
389
#[non_exhaustive]
390
pub struct PushError;
391
392
impl Display for PushError {
393
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
394
0
        f.write_str("submission queue is full")
395
0
    }
396
}
397
398
impl Error for PushError {}
399
400
impl<E: EntryMarker> Debug for SubmissionQueue<'_, E> {
401
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
402
0
        let mut d = f.debug_list();
403
0
        let mut pos = self.head;
404
0
        while pos != self.tail {
405
0
            let entry: &E = unsafe { &*self.queue.sqes.add((pos & self.queue.ring_mask) as usize) };
406
0
            d.entry(&entry);
407
0
            pos = pos.wrapping_add(1);
408
0
        }
409
0
        d.finish()
410
0
    }
411
}