/rust/registry/src/index.crates.io-6f17d22bba15001f/io-uring-0.6.3/src/squeue.rs
//! Submission Queue

use std::error::Error;
use std::fmt::{self, Debug, Display, Formatter};
use std::mem;
use std::sync::atomic;

use crate::sys;
use crate::util::{private, unsync_load, Mmap};

use bitflags::bitflags;

pub(crate) struct Inner<E: EntryMarker> {
    pub(crate) head: *const atomic::AtomicU32,
    pub(crate) tail: *const atomic::AtomicU32,
    pub(crate) ring_mask: u32,
    pub(crate) ring_entries: u32,
    pub(crate) flags: *const atomic::AtomicU32,
    dropped: *const atomic::AtomicU32,

    pub(crate) sqes: *mut E,
}

/// An io_uring instance's submission queue. This is used to send I/O requests to the kernel.
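///
/// # Examples
///
/// A minimal sketch; assumes a kernel with io_uring support:
///
/// ```no_run
/// # fn main() -> std::io::Result<()> {
/// let mut ring = io_uring::IoUring::new(8)?;
/// let sq = ring.submission(); // borrow this instance's submission queue
/// assert!(sq.is_empty());
/// # Ok(())
/// # }
/// ```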
pub struct SubmissionQueue<'a, E: EntryMarker = Entry> {
    head: u32,
    tail: u32,
    queue: &'a Inner<E>,
}

/// A submission queue entry (SQE), representing a request for an I/O operation.
///
/// This is implemented for [`Entry`] and [`Entry128`].
pub trait EntryMarker: Clone + Debug + From<Entry> + private::Sealed {
    const BUILD_FLAGS: u32;
}

/// A 64-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode).
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_sqe);

/// A 128-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode).
#[repr(C)]
#[derive(Clone)]
pub struct Entry128(pub(crate) Entry, pub(crate) [u8; 64]);

#[test]
fn test_entry_sizes() {
    assert_eq!(mem::size_of::<Entry>(), 64);
    assert_eq!(mem::size_of::<Entry128>(), 128);
}

bitflags! {
    /// Submission flags
    pub struct Flags: u8 {
        /// When this flag is specified,
        /// `fd` is an index into the files array registered with the io_uring instance.
        #[doc(hidden)]
        const FIXED_FILE = 1 << sys::IOSQE_FIXED_FILE_BIT;

        /// When this flag is specified,
        /// the SQE will not be started before previously submitted SQEs have completed,
        /// and new SQEs will not be started before this one completes.
        const IO_DRAIN = 1 << sys::IOSQE_IO_DRAIN_BIT;

        /// When this flag is specified,
        /// it forms a link with the next SQE in the submission ring.
        /// That next SQE will not be started before this one completes.
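        ///
        /// # Examples
        ///
        /// A minimal sketch that links a write to a following fsync, so the
        /// fsync only starts once the write has completed; the target file
        /// descriptor is a placeholder:
        ///
        /// ```no_run
        /// use io_uring::{opcode, squeue::Flags, types, IoUring};
        ///
        /// # fn main() -> std::io::Result<()> {
        /// let mut ring = IoUring::new(8)?;
        /// let fd = types::Fd(1);
        /// let buf = b"hello";
        ///
        /// let write = opcode::Write::new(fd, buf.as_ptr(), buf.len() as _)
        ///     .build()
        ///     .flags(Flags::IO_LINK) // chain to the next SQE
        ///     .user_data(1);
        /// let fsync = opcode::Fsync::new(fd).build().user_data(2);
        ///
        /// unsafe {
        ///     let mut sq = ring.submission();
        ///     sq.push(&write).expect("queue is full");
        ///     sq.push(&fsync).expect("queue is full");
        /// }
        /// ring.submit_and_wait(2)?;
        /// # Ok(())
        /// # }
        /// ```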
        const IO_LINK = 1 << sys::IOSQE_IO_LINK_BIT;

        /// Like [`IO_LINK`](Self::IO_LINK), but the link will not be severed
        /// regardless of the completion result.
        const IO_HARDLINK = 1 << sys::IOSQE_IO_HARDLINK_BIT;

        /// Normal operation for io_uring is to try to issue an sqe as non-blocking first,
        /// and if that fails, execute it in an async manner.
        ///
        /// To support more efficient overlapped operation of requests
        /// that the application knows/assumes will always (or most of the time) block,
        /// the application can ask for an sqe to be issued async from the start.
        const ASYNC = 1 << sys::IOSQE_ASYNC_BIT;

        /// Conceptually the kernel holds a set of buffers organized into groups. When you issue a
        /// request with this flag and set `buf_group` to a valid buffer group ID (e.g.
        /// [`buf_group` on `Read`](crate::opcode::Read::buf_group)) then once the file descriptor
        /// becomes ready the kernel will try to take a buffer from the group.
        ///
        /// If there are no buffers in the group, your request will fail with `-ENOBUFS`. Otherwise,
        /// the corresponding [`cqueue::Entry::flags`](crate::cqueue::Entry::flags) will contain the
        /// chosen buffer ID, encoded with:
        ///
        /// ```text
        /// (buffer_id << IORING_CQE_BUFFER_SHIFT) | IORING_CQE_F_BUFFER
        /// ```
        ///
        /// You can use [`buffer_select`](crate::cqueue::buffer_select) to take the buffer ID.
        ///
        /// The buffer will then be removed from the group and won't be usable by other requests
        /// anymore.
        ///
        /// You can provide new buffers in a group with
        /// [`ProvideBuffers`](crate::opcode::ProvideBuffers).
        ///
        /// See also [the LWN thread on automatic buffer
        /// selection](https://lwn.net/Articles/815491/).
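        ///
        /// # Examples
        ///
        /// A minimal sketch of decoding the chosen buffer ID from a completion
        /// entry's flags; `cqe_flags` stands in for a real completion:
        ///
        /// ```no_run
        /// use io_uring::cqueue;
        ///
        /// # let cqe_flags: u32 = 0;
        /// if let Some(buf_id) = cqueue::buffer_select(cqe_flags) {
        ///     // `buf_id` identifies the buffer the kernel picked from the group.
        ///     println!("kernel chose buffer {}", buf_id);
        /// }
        /// ```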
        const BUFFER_SELECT = 1 << sys::IOSQE_BUFFER_SELECT_BIT;

        /// Don't post a CQE if the request succeeded.
        const SKIP_SUCCESS = 1 << sys::IOSQE_CQE_SKIP_SUCCESS_BIT;
    }
}

impl<E: EntryMarker> Inner<E> {
    #[rustfmt::skip]
    pub(crate) unsafe fn new(
        sq_mmap: &Mmap,
        sqe_mmap: &Mmap,
        p: &sys::io_uring_params,
    ) -> Self {
        let head         = sq_mmap.offset(p.sq_off.head        ) as *const atomic::AtomicU32;
        let tail         = sq_mmap.offset(p.sq_off.tail        ) as *const atomic::AtomicU32;
        let ring_mask    = sq_mmap.offset(p.sq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = sq_mmap.offset(p.sq_off.ring_entries).cast::<u32>().read();
        let flags        = sq_mmap.offset(p.sq_off.flags       ) as *const atomic::AtomicU32;
        let dropped      = sq_mmap.offset(p.sq_off.dropped     ) as *const atomic::AtomicU32;
        let array        = sq_mmap.offset(p.sq_off.array       ) as *mut u32;

        let sqes = sqe_mmap.as_mut_ptr() as *mut E;

        // To keep it simple, map the `array` indirection table directly to `sqes`:
        // slot i always refers to sqe i.
        for i in 0..ring_entries {
            array.add(i as usize).write_volatile(i);
        }

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            flags,
            dropped,
            sqes,
        }
    }

    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> SubmissionQueue<'_, E> {
        SubmissionQueue {
            head: (*self.head).load(atomic::Ordering::Acquire),
            tail: unsync_load(self.tail),
            queue: self,
        }
    }

    #[inline]
    pub(crate) fn borrow(&mut self) -> SubmissionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}

impl<E: EntryMarker> SubmissionQueue<'_, E> {
    /// Synchronize this type with the real submission queue.
    ///
    /// This will flush any entries added by [`push`](Self::push) or
    /// [`push_multiple`](Self::push_multiple) and will update the queue's length if the kernel has
    /// consumed some entries in the meantime.
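    ///
    /// # Examples
    ///
    /// A minimal sketch; note that dropping the `SubmissionQueue` also
    /// publishes pushed entries:
    ///
    /// ```no_run
    /// use io_uring::{opcode, IoUring};
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut ring = IoUring::new(8)?;
    /// let mut sq = ring.submission();
    /// unsafe { sq.push(&opcode::Nop::new().build()).expect("queue is full"); }
    /// sq.sync(); // make the pushed entry visible to the kernel side of the ring
    /// # Ok(())
    /// # }
    /// ```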
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            (*self.queue.tail).store(self.tail, atomic::Ordering::Release);
            self.head = (*self.queue.head).load(atomic::Ordering::Acquire);
        }
    }

    /// When [`is_setup_sqpoll`](crate::Parameters::is_setup_sqpoll) is set, returns whether the
    /// kernel thread has gone to sleep and requires a system call to wake it up.
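    ///
    /// # Examples
    ///
    /// A sketch of the usual SQPOLL pattern; assumes the ring was built with
    /// SQPOLL enabled:
    ///
    /// ```no_run
    /// # fn main() -> std::io::Result<()> {
    /// # let mut ring = io_uring::IoUring::new(8)?;
    /// // Only enter the kernel when the polling thread has gone to sleep.
    /// if ring.submission().need_wakeup() {
    ///     ring.submit()?;
    /// }
    /// # Ok(())
    /// # }
    /// ```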
    #[inline]
    pub fn need_wakeup(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_NEED_WAKEUP != 0
        }
    }

    /// The number of invalid submission queue entries that have been encountered in the ring
    /// buffer.
    pub fn dropped(&self) -> u32 {
        unsafe { (*self.queue.dropped).load(atomic::Ordering::Acquire) }
    }

    /// Returns `true` if the completion queue ring has overflowed.
    pub fn cq_overflow(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_CQ_OVERFLOW != 0
        }
    }

    /// Returns `true` if there are completions pending that should be processed. Only relevant
    /// when used in conjunction with the `setup_taskrun_flag` function. Available since kernel
    /// 5.19.
    pub fn taskrun(&self) -> bool {
        unsafe { (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_TASKRUN != 0 }
    }

    /// Get the total number of entries in the submission queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Get the number of submission queue events in the ring buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head) as usize
    }

    /// Returns `true` if the submission queue ring buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the submission queue ring buffer has reached capacity, and no more events
    /// can be added before the kernel consumes some.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Attempts to push an entry into the queue.
    /// If the queue is full, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of the entry (such as buffers) are valid and will
    /// be valid for the entire duration of the operation, otherwise it may cause memory problems.
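    ///
    /// # Examples
    ///
    /// A minimal sketch using `Nop`, which carries no buffers, so the safety
    /// contract is trivially satisfied:
    ///
    /// ```no_run
    /// use io_uring::{opcode, IoUring};
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut ring = IoUring::new(8)?;
    /// let nop = opcode::Nop::new().build();
    /// unsafe { ring.submission().push(&nop).expect("queue is full"); }
    /// ring.submit_and_wait(1)?;
    /// # Ok(())
    /// # }
    /// ```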
    #[inline]
    pub unsafe fn push(&mut self, entry: &E) -> Result<(), PushError> {
        if !self.is_full() {
            self.push_unchecked(entry);
            Ok(())
        } else {
            Err(PushError)
        }
    }

    /// Attempts to push several entries into the queue.
    /// If the queue does not have space for all of the entries, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of all the entries (such as buffers) are valid and
    /// will be valid for the entire duration of the operation, otherwise it may cause memory
    /// problems.
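    ///
    /// # Examples
    ///
    /// A minimal sketch; either all four entries are queued or none are:
    ///
    /// ```no_run
    /// use io_uring::{opcode, IoUring};
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut ring = IoUring::new(8)?;
    /// let nops: Vec<_> = (0..4u64).map(|i| opcode::Nop::new().build().user_data(i)).collect();
    /// unsafe { ring.submission().push_multiple(&nops).expect("queue is full"); }
    /// ring.submit_and_wait(4)?;
    /// # Ok(())
    /// # }
    /// ```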
    #[inline]
    pub unsafe fn push_multiple(&mut self, entries: &[E]) -> Result<(), PushError> {
        if self.capacity() - self.len() < entries.len() {
            return Err(PushError);
        }

        for entry in entries {
            self.push_unchecked(entry);
        }

        Ok(())
    }

    #[inline]
    unsafe fn push_unchecked(&mut self, entry: &E) {
        *self
            .queue
            .sqes
            .add((self.tail & self.queue.ring_mask) as usize) = entry.clone();
        self.tail = self.tail.wrapping_add(1);
    }
}

impl<E: EntryMarker> Drop for SubmissionQueue<'_, E> {
    #[inline]
    fn drop(&mut self) {
        unsafe { &*self.queue.tail }.store(self.tail, atomic::Ordering::Release);
    }
}

impl Entry {
    /// Set the submission event's [flags](Flags).
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry {
        self.0.flags |= flags.bits();
        self
    }

    /// Set the user data. This is an application-supplied value that will be passed straight
    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
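    ///
    /// # Examples
    ///
    /// A minimal sketch of the round trip from SQE to CQE:
    ///
    /// ```no_run
    /// use io_uring::{opcode, IoUring};
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut ring = IoUring::new(8)?;
    /// let nop = opcode::Nop::new().build().user_data(0x42);
    /// unsafe { ring.submission().push(&nop).expect("queue is full"); }
    /// ring.submit_and_wait(1)?;
    /// let cqe = ring.completion().next().expect("completion entry");
    /// assert_eq!(cqe.user_data(), 0x42);
    /// # Ok(())
    /// # }
    /// ```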
    #[inline]
    pub fn user_data(mut self, user_data: u64) -> Entry {
        self.0.user_data = user_data;
        self
    }

    /// Get the previously set application-supplied user data.
    #[inline]
    pub fn get_user_data(&self) -> u64 {
        self.0.user_data
    }

    /// Set the personality of this event. You can obtain a personality using
    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
    pub fn personality(mut self, personality: u16) -> Entry {
        self.0.personality = personality;
        self
    }
}

impl private::Sealed for Entry {}

impl EntryMarker for Entry {
    const BUILD_FLAGS: u32 = 0;
}

impl Clone for Entry {
    fn clone(&self) -> Entry {
        // io_uring_sqe doesn't implement Clone due to the 'cmd' incomplete array field.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}

impl Debug for Entry {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry")
            .field("op_code", &self.0.opcode)
            .field("flags", &self.0.flags)
            .field("user_data", &self.0.user_data)
            .finish()
    }
}

impl Entry128 {
    /// Set the submission event's [flags](Flags).
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry128 {
        self.0 .0.flags |= flags.bits();
        self
    }

    /// Set the user data. This is an application-supplied value that will be passed straight
    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
    #[inline]
    pub fn user_data(mut self, user_data: u64) -> Entry128 {
        self.0 .0.user_data = user_data;
        self
    }

    /// Set the personality of this event. You can obtain a personality using
    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
    #[inline]
    pub fn personality(mut self, personality: u16) -> Entry128 {
        self.0 .0.personality = personality;
        self
    }
}

impl private::Sealed for Entry128 {}

impl EntryMarker for Entry128 {
    const BUILD_FLAGS: u32 = sys::IORING_SETUP_SQE128;
}

impl From<Entry> for Entry128 {
    fn from(entry: Entry) -> Entry128 {
        Entry128(entry, [0u8; 64])
    }
}

impl Debug for Entry128 {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry128")
            .field("op_code", &self.0 .0.opcode)
            .field("flags", &self.0 .0.flags)
            .field("user_data", &self.0 .0.user_data)
            .finish()
    }
}

/// An error pushing to the submission queue due to it being full.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct PushError;

impl Display for PushError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str("submission queue is full")
    }
}

impl Error for PushError {}

impl<E: EntryMarker> Debug for SubmissionQueue<'_, E> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_list();
        let mut pos = self.head;
        while pos != self.tail {
            let entry: &E = unsafe { &*self.queue.sqes.add((pos & self.queue.ring_mask) as usize) };
            d.entry(&entry);
            pos = pos.wrapping_add(1);
        }
        d.finish()
    }
}