/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/sink/mod.rs
Line | Count | Source (jump to first uncovered line) |
1 | | //! Asynchronous sinks. |
2 | | //! |
3 | | //! This module contains: |
4 | | //! |
5 | | //! - The [`Sink`] trait, which allows you to asynchronously write data. |
6 | | //! - The [`SinkExt`] trait, which provides adapters for chaining and composing |
7 | | //! sinks. |
8 | | |
9 | | use crate::future::{assert_future, Either}; |
10 | | use core::pin::Pin; |
11 | | use futures_core::future::Future; |
12 | | use futures_core::stream::{Stream, TryStream}; |
13 | | use futures_core::task::{Context, Poll}; |
14 | | |
15 | | #[cfg(feature = "compat")] |
16 | | use crate::compat::CompatSink; |
17 | | |
18 | | pub use futures_sink::Sink; |
19 | | |
20 | | mod close; |
21 | | pub use self::close::Close; |
22 | | |
23 | | mod drain; |
24 | | pub use self::drain::{drain, Drain}; |
25 | | |
26 | | mod fanout; |
27 | | pub use self::fanout::Fanout; |
28 | | |
29 | | mod feed; |
30 | | pub use self::feed::Feed; |
31 | | |
32 | | mod flush; |
33 | | pub use self::flush::Flush; |
34 | | |
35 | | mod err_into; |
36 | | pub use self::err_into::SinkErrInto; |
37 | | |
38 | | mod map_err; |
39 | | pub use self::map_err::SinkMapErr; |
40 | | |
41 | | mod send; |
42 | | pub use self::send::Send; |
43 | | |
44 | | mod send_all; |
45 | | pub use self::send_all::SendAll; |
46 | | |
47 | | mod unfold; |
48 | | pub use self::unfold::{unfold, Unfold}; |
49 | | |
50 | | mod with; |
51 | | pub use self::with::With; |
52 | | |
53 | | mod with_flat_map; |
54 | | pub use self::with_flat_map::WithFlatMap; |
55 | | |
56 | | #[cfg(feature = "alloc")] |
57 | | mod buffer; |
58 | | #[cfg(feature = "alloc")] |
59 | | pub use self::buffer::Buffer; |
60 | | |
61 | | impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {} |
62 | | |
63 | | /// An extension trait for `Sink`s that provides a variety of convenient |
64 | | /// combinator functions. |
65 | | pub trait SinkExt<Item>: Sink<Item> { |
66 | | /// Composes a function *in front of* the sink. |
67 | | /// |
68 | | /// This adapter produces a new sink that passes each value through the |
69 | | /// given function `f` before sending it to `self`. |
70 | | /// |
71 | | /// To process each value, `f` produces a *future*, which is then polled to |
72 | | /// completion before passing its result down to the underlying sink. If the |
73 | | /// future produces an error, that error is returned by the new sink, whose error type `E` must satisfy `E: From<Self::Error>`. |
74 | | /// |
75 | | /// Note that this function consumes the given sink, returning a wrapped |
76 | | /// version, much like `Iterator::map`. |
77 | 0 | fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F> |
78 | 0 | where |
79 | 0 | F: FnMut(U) -> Fut, |
80 | 0 | Fut: Future<Output = Result<Item, E>>, |
81 | 0 | E: From<Self::Error>, |
82 | 0 | Self: Sized, |
83 | 0 | { |
84 | 0 | assert_sink::<U, E, _>(With::new(self, f)) |
85 | 0 | } |
86 | | |
87 | | /// Composes a function *in front of* the sink. |
88 | | /// |
89 | | /// This adapter produces a new sink that passes each value through the |
90 | | /// given function `f` before sending it to `self`. |
91 | | /// |
92 | | /// To process each value, `f` produces a *stream*, of which each value |
93 | | /// is passed to the underlying sink. A new value will not be accepted until |
94 | | /// the stream has been drained. |
95 | | /// |
96 | | /// Note that this function consumes the given sink, returning a wrapped |
97 | | /// version, much like `Iterator::flat_map`. |
98 | | /// |
99 | | /// # Examples |
100 | | /// |
101 | | /// ``` |
102 | | /// # futures::executor::block_on(async { |
103 | | /// use futures::channel::mpsc; |
104 | | /// use futures::sink::SinkExt; |
105 | | /// use futures::stream::{self, StreamExt}; |
106 | | /// |
107 | | /// let (tx, rx) = mpsc::channel(5); |
108 | | /// |
109 | | /// let mut tx = tx.with_flat_map(|x| { |
110 | | /// stream::iter(vec![Ok(42); x]) |
111 | | /// }); |
112 | | /// |
113 | | /// tx.send(5).await.unwrap(); |
114 | | /// drop(tx); |
115 | | /// let received: Vec<i32> = rx.collect().await; |
116 | | /// assert_eq!(received, vec![42, 42, 42, 42, 42]); |
117 | | /// # }); |
118 | | /// ``` |
119 | 0 | fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F> |
120 | 0 | where |
121 | 0 | F: FnMut(U) -> St, |
122 | 0 | St: Stream<Item = Result<Item, Self::Error>>, |
123 | 0 | Self: Sized, |
124 | 0 | { |
125 | 0 | assert_sink::<U, Self::Error, _>(WithFlatMap::new(self, f)) |
126 | 0 | } |
127 | | |
128 | | /* |
129 | | fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F> |
130 | | where F: FnMut(U) -> Self::SinkItem, |
131 | | Self: Sized; |
132 | | |
133 | | fn with_filter<F>(self, f: F) -> WithFilter<Self, F> |
134 | | where F: FnMut(Self::SinkItem) -> bool, |
135 | | Self: Sized; |
136 | | |
137 | | fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F> |
138 | | where F: FnMut(U) -> Option<Self::SinkItem>, |
139 | | Self: Sized; |
140 | | */ |
141 | | |
142 | | /// Transforms the error returned by the sink using `f`, yielding a sink for the same `Item` whose error type is `E`. |
143 | 0 | fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F> |
144 | 0 | where |
145 | 0 | F: FnOnce(Self::Error) -> E, |
146 | 0 | Self: Sized, |
147 | 0 | { |
148 | 0 | assert_sink::<Item, E, _>(SinkMapErr::new(self, f)) |
149 | 0 | } |
150 | | |
151 | | /// Map this sink's error to a different error type `E` using the `Into` trait. |
152 | | /// |
153 | | /// To map the errors of a type that is both a `Sink` and a `Stream`, use `.sink_err_into().err_into()`. |
154 | 0 | fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E> |
155 | 0 | where |
156 | 0 | Self: Sized, |
157 | 0 | Self::Error: Into<E>, |
158 | 0 | { |
159 | 0 | assert_sink::<Item, E, _>(SinkErrInto::new(self)) |
160 | 0 | } |
161 | | |
162 | | /// Adds a fixed-size buffer to the current sink. |
163 | | /// |
164 | | /// The resulting sink will buffer up to `capacity` items when the |
165 | | /// underlying sink is unwilling to accept additional items. Calling `flush` |
166 | | /// on the buffered sink will attempt to both empty the buffer and complete |
167 | | /// processing on the underlying sink. |
168 | | /// |
169 | | /// Note that this function consumes the given sink, returning a wrapped |
170 | | /// version, much like `Iterator::map`. The wrapper accepts the same `Item` and reports the same `Self::Error`. |
171 | | /// |
172 | | /// This method is only available when the `std` or `alloc` feature of this |
173 | | /// library is activated, and it is activated by default. |
174 | | #[cfg(feature = "alloc")] |
175 | 0 | fn buffer(self, capacity: usize) -> Buffer<Self, Item> |
176 | 0 | where |
177 | 0 | Self: Sized, |
178 | 0 | { |
179 | 0 | assert_sink::<Item, Self::Error, _>(Buffer::new(self, capacity)) |
180 | 0 | } |
181 | | |
182 | | /// Close the sink. The returned future resolves to `Result<(), Self::Error>`; the sink must be `Unpin`. |
183 | 0 | fn close(&mut self) -> Close<'_, Self, Item> |
184 | 0 | where |
185 | 0 | Self: Unpin, |
186 | 0 | { |
187 | 0 | assert_future::<Result<(), Self::Error>, _>(Close::new(self)) |
188 | 0 | } |
189 | | |
190 | | /// Fanout items to multiple sinks. |
191 | | /// |
192 | | /// This adapter clones each incoming item and forwards it to both this sink and |
193 | | /// the other sink at the same time. Both sinks must share the same error type. |
194 | 0 | fn fanout<Si>(self, other: Si) -> Fanout<Self, Si> |
195 | 0 | where |
196 | 0 | Self: Sized, |
197 | 0 | Item: Clone, |
198 | 0 | Si: Sink<Item, Error = Self::Error>, |
199 | 0 | { |
200 | 0 | assert_sink::<Item, Self::Error, _>(Fanout::new(self, other)) |
201 | 0 | } |
202 | | |
203 | | /// Flush the sink, processing all pending items. |
204 | | /// |
205 | | /// This adapter is intended to be used when you want to stop sending to the sink |
206 | | /// until all current requests are processed. The returned future resolves to `Result<(), Self::Error>`. |
207 | 0 | fn flush(&mut self) -> Flush<'_, Self, Item> |
208 | 0 | where |
209 | 0 | Self: Unpin, |
210 | 0 | { |
211 | 0 | assert_future::<Result<(), Self::Error>, _>(Flush::new(self)) |
212 | 0 | } |
213 | | |
214 | | /// A future that completes after the given item has been fully processed |
215 | | /// into the sink, including flushing. It resolves to `Result<(), Self::Error>`. |
216 | | /// |
217 | | /// Note that, **because of the flushing requirement, it is usually better |
218 | | /// to batch together items to send via `feed` or `send_all`, |
219 | | /// rather than flushing between each item.** |
220 | 0 | fn send(&mut self, item: Item) -> Send<'_, Self, Item> |
221 | 0 | where |
222 | 0 | Self: Unpin, |
223 | 0 | { |
224 | 0 | assert_future::<Result<(), Self::Error>, _>(Send::new(self, item)) |
225 | 0 | } |
226 | | |
227 | | /// A future that completes after the given item has been received |
228 | | /// by the sink. It resolves to `Result<(), Self::Error>`. |
229 | | /// |
230 | | /// Unlike `send`, the returned future does not flush the sink. |
231 | | /// It is the caller's responsibility to ensure all pending items |
232 | | /// are processed, which can be done via `flush` or `close`. |
233 | 0 | fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> |
234 | 0 | where |
235 | 0 | Self: Unpin, |
236 | 0 | { |
237 | 0 | assert_future::<Result<(), Self::Error>, _>(Feed::new(self, item)) |
238 | 0 | } |
239 | | |
240 | | /// A future that completes after the given stream has been fully processed |
241 | | /// into the sink, including flushing. |
242 | | /// |
243 | | /// This future will drive the stream to keep producing items until it is |
244 | | /// exhausted, sending each item to the sink. It will complete once the |
245 | | /// stream is exhausted, the sink has received all items, and the sink has |
246 | | /// been flushed. Note that the sink is **not** closed. If the stream produces |
247 | | /// an error, that error will be returned by this future without flushing the sink. |
248 | | /// |
249 | | /// Doing `sink.send_all(stream)` is roughly equivalent to |
250 | | /// `stream.forward(sink)`. The returned future will exhaust all items from |
251 | | /// `stream` and send them to `self`. |
252 | 0 | fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St> |
253 | 0 | where |
254 | 0 | St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized, |
255 | 0 | // St: Stream<Item = Result<Item, Self::Error>> + Unpin + ?Sized, |
256 | 0 | Self: Unpin, |
257 | 0 | { |
258 | 0 | // TODO: type mismatch resolving `<St as Stream>::Item == std::result::Result<Item, <Self as futures_sink::Sink<Item>>::Error>` |
259 | 0 | // assert_future::<Result<(), Self::Error>, _>(SendAll::new(self, stream)) |
260 | 0 | SendAll::new(self, stream) |
261 | 0 | } |
262 | | |
263 | | /// Wrap this sink in an `Either` sink, making it the left-hand variant |
264 | | /// of that `Either`. |
265 | | /// |
266 | | /// This can be used in combination with the `right_sink` method to write `if` |
267 | | /// statements that evaluate to different sinks in different branches. |
268 | 0 | fn left_sink<Si2>(self) -> Either<Self, Si2> |
269 | 0 | where |
270 | 0 | Si2: Sink<Item, Error = Self::Error>, |
271 | 0 | Self: Sized, |
272 | 0 | { |
273 | 0 | assert_sink::<Item, Self::Error, _>(Either::Left(self)) |
274 | 0 | } |
275 | | |
276 | | /// Wrap this sink in an `Either` sink, making it the right-hand variant |
277 | | /// of that `Either`. |
278 | | /// |
279 | | /// This can be used in combination with the `left_sink` method to write `if` |
280 | | /// statements that evaluate to different sinks in different branches. |
281 | 0 | fn right_sink<Si1>(self) -> Either<Si1, Self> |
282 | 0 | where |
283 | 0 | Si1: Sink<Item, Error = Self::Error>, |
284 | 0 | Self: Sized, |
285 | 0 | { |
286 | 0 | assert_sink::<Item, Self::Error, _>(Either::Right(self)) |
287 | 0 | } |
288 | | |
289 | | /// Wraps this [`Sink`] into a sink compatible with libraries using |
290 | | /// futures 0.1 `Sink`. Requires the `compat` feature to be enabled and the sink to be `Unpin`. |
291 | | #[cfg(feature = "compat")] |
292 | | #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] |
293 | | fn compat(self) -> CompatSink<Self, Item> |
294 | | where |
295 | | Self: Sized + Unpin, |
296 | | { |
297 | | CompatSink::new(self) |
298 | | } |
299 | | |
300 | | /// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`] |
301 | | /// sink types; equivalent to `Pin::new(self).poll_ready(cx)`. |
302 | 0 | fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> |
303 | 0 | where |
304 | 0 | Self: Unpin, |
305 | 0 | { |
306 | 0 | Pin::new(self).poll_ready(cx) |
307 | 0 | } |
308 | | |
309 | | /// A convenience method for calling [`Sink::start_send`] on [`Unpin`] |
310 | | /// sink types; equivalent to `Pin::new(self).start_send(item)`. |
311 | 0 | fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error> |
312 | 0 | where |
313 | 0 | Self: Unpin, |
314 | 0 | { |
315 | 0 | Pin::new(self).start_send(item) |
316 | 0 | } |
317 | | |
318 | | /// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`] |
319 | | /// sink types; equivalent to `Pin::new(self).poll_flush(cx)`. |
320 | 0 | fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> |
321 | 0 | where |
322 | 0 | Self: Unpin, |
323 | 0 | { |
324 | 0 | Pin::new(self).poll_flush(cx) |
325 | 0 | } |
326 | | |
327 | | /// A convenience method for calling [`Sink::poll_close`] on [`Unpin`] |
328 | | /// sink types; equivalent to `Pin::new(self).poll_close(cx)`. |
329 | 0 | fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> |
330 | 0 | where |
331 | 0 | Self: Unpin, |
332 | 0 | { |
333 | 0 | Pin::new(self).poll_close(cx) |
334 | 0 | } |
335 | | } |
336 | | |
337 | | // Compile-time helper: returns `sink` unchanged while requiring `S: Sink<T, Error = E>`, |
338 | | // so that the sinks we're returning are all checked to have the right implementations. |
339 | 0 | pub(crate) fn assert_sink<T, E, S>(sink: S) -> S |
340 | 0 | where |
341 | 0 | S: Sink<T, Error = E>, |
342 | 0 | { |
343 | 0 | sink |
344 | 0 | } |