/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-sink-0.3.28/src/lib.rs
Line | Count | Source (jump to first uncovered line) |
1 | | //! Asynchronous sinks |
2 | | //! |
3 | | //! This crate contains the `Sink` trait which allows values to be sent |
4 | | //! asynchronously. |
5 | | |
6 | | #![cfg_attr(not(feature = "std"), no_std)] |
7 | | #![warn(missing_debug_implementations, missing_docs, rust_2018_idioms, unreachable_pub)] |
8 | | // It cannot be included in the published code because this lints have false positives in the minimum required version. |
9 | | #![cfg_attr(test, warn(single_use_lifetimes))] |
10 | | #![doc(test( |
11 | | no_crate_inject, |
12 | | attr( |
13 | | deny(warnings, rust_2018_idioms, single_use_lifetimes), |
14 | | allow(dead_code, unused_assignments, unused_variables) |
15 | | ) |
16 | | ))] |
17 | | |
18 | | #[cfg(feature = "alloc")] |
19 | | extern crate alloc; |
20 | | |
21 | | use core::ops::DerefMut; |
22 | | use core::pin::Pin; |
23 | | use core::task::{Context, Poll}; |
24 | | |
25 | | /// A `Sink` is a value into which other values can be sent, asynchronously. |
26 | | /// |
27 | | /// Basic examples of sinks include the sending side of: |
28 | | /// |
29 | | /// - Channels |
30 | | /// - Sockets |
31 | | /// - Pipes |
32 | | /// |
33 | | /// In addition to such "primitive" sinks, it's typical to layer additional |
34 | | /// functionality, such as buffering, on top of an existing sink. |
35 | | /// |
36 | | /// Sending to a sink is "asynchronous" in the sense that the value may not be |
37 | | /// sent in its entirety immediately. Instead, values are sent in a two-phase |
38 | | /// way: first by initiating a send, and then by polling for completion. This |
39 | | /// two-phase setup is analogous to buffered writing in synchronous code, where |
40 | | /// writes often succeed immediately, but internally are buffered and are |
41 | | /// *actually* written only upon flushing. |
42 | | /// |
43 | | /// In addition, the `Sink` may be *full*, in which case it is not even possible |
44 | | /// to start the sending process. |
45 | | /// |
46 | | /// As with `Future` and `Stream`, the `Sink` trait is built from a few core |
47 | | /// required methods, and a host of default methods for working in a |
48 | | /// higher-level way. The `Sink::send_all` combinator is of particular |
49 | | /// importance: you can use it to send an entire stream to a sink, which is |
50 | | /// the simplest way to ultimately consume a stream. |
51 | | #[must_use = "sinks do nothing unless polled"] |
52 | | pub trait Sink<Item> { |
53 | | /// The type of value produced by the sink when an error occurs. |
54 | | type Error; |
55 | | |
56 | | /// Attempts to prepare the `Sink` to receive a value. |
57 | | /// |
58 | | /// This method must be called and return `Poll::Ready(Ok(()))` prior to |
59 | | /// each call to `start_send`. |
60 | | /// |
61 | | /// This method returns `Poll::Ready` once the underlying sink is ready to |
62 | | /// receive data. If this method returns `Poll::Pending`, the current task |
63 | | /// is registered to be notified (via `cx.waker().wake_by_ref()`) when `poll_ready` |
64 | | /// should be called again. |
65 | | /// |
66 | | /// In most cases, if the sink encounters an error, the sink will |
67 | | /// permanently be unable to receive items. |
68 | | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; |
69 | | |
70 | | /// Begin the process of sending a value to the sink. |
71 | | /// Each call to this function must be preceded by a successful call to |
72 | | /// `poll_ready` which returned `Poll::Ready(Ok(()))`. |
73 | | /// |
74 | | /// As the name suggests, this method only *begins* the process of sending |
75 | | /// the item. If the sink employs buffering, the item isn't fully processed |
76 | | /// until the buffer is fully flushed. Since sinks are designed to work with |
77 | | /// asynchronous I/O, the process of actually writing out the data to an |
78 | | /// underlying object takes place asynchronously. **You *must* use |
79 | | /// `poll_flush` or `poll_close` in order to guarantee completion of a |
80 | | /// send**. |
81 | | /// |
82 | | /// Implementations of `poll_ready` and `start_send` will usually involve |
83 | | /// flushing behind the scenes in order to make room for new messages. |
84 | | /// It is only necessary to call `poll_flush` if you need to guarantee that |
85 | | /// *all* of the items placed into the `Sink` have been sent. |
86 | | /// |
87 | | /// In most cases, if the sink encounters an error, the sink will |
88 | | /// permanently be unable to receive items. |
89 | | fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>; |
90 | | |
91 | | /// Flush any remaining output from this sink. |
92 | | /// |
93 | | /// Returns `Poll::Ready(Ok(()))` when no buffered items remain. If this |
94 | | /// value is returned then it is guaranteed that all previous values sent |
95 | | /// via `start_send` have been flushed. |
96 | | /// |
97 | | /// Returns `Poll::Pending` if there is more work left to do, in which |
98 | | /// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when |
99 | | /// `poll_flush` should be called again. |
100 | | /// |
101 | | /// In most cases, if the sink encounters an error, the sink will |
102 | | /// permanently be unable to receive items. |
103 | | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; |
104 | | |
105 | | /// Flush any remaining output and close this sink, if necessary. |
106 | | /// |
107 | | /// Returns `Poll::Ready(Ok(()))` when no buffered items remain and the sink |
108 | | /// has been successfully closed. |
109 | | /// |
110 | | /// Returns `Poll::Pending` if there is more work left to do, in which |
111 | | /// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when |
112 | | /// `poll_close` should be called again. |
113 | | /// |
114 | | /// If this function encounters an error, the sink should be considered to |
115 | | /// have failed permanently, and no more `Sink` methods should be called. |
116 | | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; |
117 | | } |
118 | | |
119 | | impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for &mut S { |
120 | | type Error = S::Error; |
121 | | |
122 | 0 | fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
123 | 0 | Pin::new(&mut **self).poll_ready(cx) |
124 | 0 | } |
125 | | |
126 | 0 | fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
127 | 0 | Pin::new(&mut **self).start_send(item) |
128 | 0 | } |
129 | | |
130 | 0 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
131 | 0 | Pin::new(&mut **self).poll_flush(cx) |
132 | 0 | } |
133 | | |
134 | 0 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
135 | 0 | Pin::new(&mut **self).poll_close(cx) |
136 | 0 | } |
137 | | } |
138 | | |
139 | | impl<P, Item> Sink<Item> for Pin<P> |
140 | | where |
141 | | P: DerefMut + Unpin, |
142 | | P::Target: Sink<Item>, |
143 | | { |
144 | | type Error = <P::Target as Sink<Item>>::Error; |
145 | | |
146 | 0 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
147 | 0 | self.get_mut().as_mut().poll_ready(cx) |
148 | 0 | } |
149 | | |
150 | 0 | fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
151 | 0 | self.get_mut().as_mut().start_send(item) |
152 | 0 | } |
153 | | |
154 | 0 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
155 | 0 | self.get_mut().as_mut().poll_flush(cx) |
156 | 0 | } |
157 | | |
158 | 0 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
159 | 0 | self.get_mut().as_mut().poll_close(cx) |
160 | 0 | } |
161 | | } |
162 | | |
163 | | #[cfg(feature = "alloc")] |
164 | | mod if_alloc { |
165 | | use super::*; |
166 | | use core::convert::Infallible as Never; |
167 | | |
168 | | impl<T> Sink<T> for alloc::vec::Vec<T> { |
169 | | type Error = Never; |
170 | | |
171 | 0 | fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
172 | 0 | Poll::Ready(Ok(())) |
173 | 0 | } |
174 | | |
175 | 0 | fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { |
176 | 0 | // TODO: impl<T> Unpin for Vec<T> {} |
177 | 0 | unsafe { self.get_unchecked_mut() }.push(item); |
178 | 0 | Ok(()) |
179 | 0 | } |
180 | | |
181 | 0 | fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
182 | 0 | Poll::Ready(Ok(())) |
183 | 0 | } |
184 | | |
185 | 0 | fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
186 | 0 | Poll::Ready(Ok(())) |
187 | 0 | } |
188 | | } |
189 | | |
190 | | impl<T> Sink<T> for alloc::collections::VecDeque<T> { |
191 | | type Error = Never; |
192 | | |
193 | 0 | fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
194 | 0 | Poll::Ready(Ok(())) |
195 | 0 | } |
196 | | |
197 | 0 | fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { |
198 | 0 | // TODO: impl<T> Unpin for Vec<T> {} |
199 | 0 | unsafe { self.get_unchecked_mut() }.push_back(item); |
200 | 0 | Ok(()) |
201 | 0 | } |
202 | | |
203 | 0 | fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
204 | 0 | Poll::Ready(Ok(())) |
205 | 0 | } |
206 | | |
207 | 0 | fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
208 | 0 | Poll::Ready(Ok(())) |
209 | 0 | } |
210 | | } |
211 | | |
212 | | impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for alloc::boxed::Box<S> { |
213 | | type Error = S::Error; |
214 | | |
215 | 0 | fn poll_ready( |
216 | 0 | mut self: Pin<&mut Self>, |
217 | 0 | cx: &mut Context<'_>, |
218 | 0 | ) -> Poll<Result<(), Self::Error>> { |
219 | 0 | Pin::new(&mut **self).poll_ready(cx) |
220 | 0 | } |
221 | | |
222 | 0 | fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
223 | 0 | Pin::new(&mut **self).start_send(item) |
224 | 0 | } |
225 | | |
226 | 0 | fn poll_flush( |
227 | 0 | mut self: Pin<&mut Self>, |
228 | 0 | cx: &mut Context<'_>, |
229 | 0 | ) -> Poll<Result<(), Self::Error>> { |
230 | 0 | Pin::new(&mut **self).poll_flush(cx) |
231 | 0 | } |
232 | | |
233 | 0 | fn poll_close( |
234 | 0 | mut self: Pin<&mut Self>, |
235 | 0 | cx: &mut Context<'_>, |
236 | 0 | ) -> Poll<Result<(), Self::Error>> { |
237 | 0 | Pin::new(&mut **self).poll_close(cx) |
238 | 0 | } |
239 | | } |
240 | | } |