Coverage Report

Created: 2025-06-24 06:17

/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/sink/mod.rs
Line
Count
Source (jump to first uncovered line)
1
//! Asynchronous sinks.
2
//!
3
//! This module contains:
4
//!
5
//! - The [`Sink`] trait, which allows you to asynchronously write data.
6
//! - The [`SinkExt`] trait, which provides adapters for chaining and composing
7
//!   sinks.
8
9
use crate::future::{assert_future, Either};
10
use core::pin::Pin;
11
use futures_core::future::Future;
12
use futures_core::stream::{Stream, TryStream};
13
use futures_core::task::{Context, Poll};
14
15
#[cfg(feature = "compat")]
16
use crate::compat::CompatSink;
17
18
pub use futures_sink::Sink;
19
20
mod close;
21
pub use self::close::Close;
22
23
mod drain;
24
pub use self::drain::{drain, Drain};
25
26
mod fanout;
27
pub use self::fanout::Fanout;
28
29
mod feed;
30
pub use self::feed::Feed;
31
32
mod flush;
33
pub use self::flush::Flush;
34
35
mod err_into;
36
pub use self::err_into::SinkErrInto;
37
38
mod map_err;
39
pub use self::map_err::SinkMapErr;
40
41
mod send;
42
pub use self::send::Send;
43
44
mod send_all;
45
pub use self::send_all::SendAll;
46
47
mod unfold;
48
pub use self::unfold::{unfold, Unfold};
49
50
mod with;
51
pub use self::with::With;
52
53
mod with_flat_map;
54
pub use self::with_flat_map::WithFlatMap;
55
56
#[cfg(feature = "alloc")]
57
mod buffer;
58
#[cfg(feature = "alloc")]
59
pub use self::buffer::Buffer;
60
61
impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {}
62
63
/// An extension trait for `Sink`s that provides a variety of convenient
64
/// combinator functions.
65
pub trait SinkExt<Item>: Sink<Item> {
66
    /// Composes a function *in front of* the sink.
67
    ///
68
    /// This adapter produces a new sink that passes each value through the
69
    /// given function `f` before sending it to `self`.
70
    ///
71
    /// To process each value, `f` produces a *future*, which is then polled to
72
    /// completion before passing its result down to the underlying sink. If the
73
    /// future produces an error, that error is returned by the new sink.
74
    ///
75
    /// Note that this function consumes the given sink, returning a wrapped
76
    /// version, much like `Iterator::map`.
77
0
    fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
78
0
    where
79
0
        F: FnMut(U) -> Fut,
80
0
        Fut: Future<Output = Result<Item, E>>,
81
0
        E: From<Self::Error>,
82
0
        Self: Sized,
83
0
    {
84
0
        assert_sink::<U, E, _>(With::new(self, f))
85
0
    }
86
87
    /// Composes a function *in front of* the sink.
88
    ///
89
    /// This adapter produces a new sink that passes each value through the
90
    /// given function `f` before sending it to `self`.
91
    ///
92
    /// To process each value, `f` produces a *stream*, of which each value
93
    /// is passed to the underlying sink. A new value will not be accepted until
94
    /// the stream has been drained.
95
    ///
96
    /// Note that this function consumes the given sink, returning a wrapped
97
    /// version, much like `Iterator::flat_map`.
98
    ///
99
    /// # Examples
100
    ///
101
    /// ```
102
    /// # futures::executor::block_on(async {
103
    /// use futures::channel::mpsc;
104
    /// use futures::sink::SinkExt;
105
    /// use futures::stream::{self, StreamExt};
106
    ///
107
    /// let (tx, rx) = mpsc::channel(5);
108
    ///
109
    /// let mut tx = tx.with_flat_map(|x| {
110
    ///     stream::iter(vec![Ok(42); x])
111
    /// });
112
    ///
113
    /// tx.send(5).await.unwrap();
114
    /// drop(tx);
115
    /// let received: Vec<i32> = rx.collect().await;
116
    /// assert_eq!(received, vec![42, 42, 42, 42, 42]);
117
    /// # });
118
    /// ```
119
0
    fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
120
0
    where
121
0
        F: FnMut(U) -> St,
122
0
        St: Stream<Item = Result<Item, Self::Error>>,
123
0
        Self: Sized,
124
0
    {
125
0
        assert_sink::<U, Self::Error, _>(WithFlatMap::new(self, f))
126
0
    }
127
128
    /*
129
    fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F>
130
        where F: FnMut(U) -> Self::SinkItem,
131
              Self: Sized;
132
133
    fn with_filter<F>(self, f: F) -> WithFilter<Self, F>
134
        where F: FnMut(Self::SinkItem) -> bool,
135
              Self: Sized;
136
137
    fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F>
138
        where F: FnMut(U) -> Option<Self::SinkItem>,
139
              Self: Sized;
140
     */
141
142
    /// Transforms the error returned by the sink.
143
0
    fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
144
0
    where
145
0
        F: FnOnce(Self::Error) -> E,
146
0
        Self: Sized,
147
0
    {
148
0
        assert_sink::<Item, E, _>(SinkMapErr::new(self, f))
149
0
    }
150
151
    /// Map this sink's error to a different error type using the `Into` trait.
152
    ///
153
    /// To map the errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
154
0
    fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
155
0
    where
156
0
        Self: Sized,
157
0
        Self::Error: Into<E>,
158
0
    {
159
0
        assert_sink::<Item, E, _>(SinkErrInto::new(self))
160
0
    }
161
162
    /// Adds a fixed-size buffer to the current sink.
163
    ///
164
    /// The resulting sink will buffer up to `capacity` items when the
165
    /// underlying sink is unwilling to accept additional items. Calling `flush`
166
    /// on the buffered sink will attempt to both empty the buffer and complete
167
    /// processing on the underlying sink.
168
    ///
169
    /// Note that this function consumes the given sink, returning a wrapped
170
    /// version, much like `Iterator::map`.
171
    ///
172
    /// This method is only available when the `std` or `alloc` feature of this
173
    /// library is activated, and it is activated by default.
174
    #[cfg(feature = "alloc")]
175
0
    fn buffer(self, capacity: usize) -> Buffer<Self, Item>
176
0
    where
177
0
        Self: Sized,
178
0
    {
179
0
        assert_sink::<Item, Self::Error, _>(Buffer::new(self, capacity))
180
0
    }
181
182
    /// Close the sink.
183
0
    fn close(&mut self) -> Close<'_, Self, Item>
184
0
    where
185
0
        Self: Unpin,
186
0
    {
187
0
        assert_future::<Result<(), Self::Error>, _>(Close::new(self))
188
0
    }
189
190
    /// Fanout items to multiple sinks.
191
    ///
192
    /// This adapter clones each incoming item and forwards it to both this sink and
193
    /// the other sink at the same time.
194
0
    fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
195
0
    where
196
0
        Self: Sized,
197
0
        Item: Clone,
198
0
        Si: Sink<Item, Error = Self::Error>,
199
0
    {
200
0
        assert_sink::<Item, Self::Error, _>(Fanout::new(self, other))
201
0
    }
202
203
    /// Flush the sink, processing all pending items.
204
    ///
205
    /// This adapter is intended to be used when you want to stop sending to the sink
206
    /// until all current requests are processed.
207
0
    fn flush(&mut self) -> Flush<'_, Self, Item>
208
0
    where
209
0
        Self: Unpin,
210
0
    {
211
0
        assert_future::<Result<(), Self::Error>, _>(Flush::new(self))
212
0
    }
213
214
    /// A future that completes after the given item has been fully processed
215
    /// into the sink, including flushing.
216
    ///
217
    /// Note that, **because of the flushing requirement, it is usually better
218
    /// to batch together items to send via `feed` or `send_all`,
219
    /// rather than flushing between each item.**
220
0
    fn send(&mut self, item: Item) -> Send<'_, Self, Item>
221
0
    where
222
0
        Self: Unpin,
223
0
    {
224
0
        assert_future::<Result<(), Self::Error>, _>(Send::new(self, item))
225
0
    }
226
227
    /// A future that completes after the given item has been received
228
    /// by the sink.
229
    ///
230
    /// Unlike `send`, the returned future does not flush the sink.
231
    /// It is the caller's responsibility to ensure all pending items
232
    /// are processed, which can be done via `flush` or `close`.
233
0
    fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>
234
0
    where
235
0
        Self: Unpin,
236
0
    {
237
0
        assert_future::<Result<(), Self::Error>, _>(Feed::new(self, item))
238
0
    }
239
240
    /// A future that completes after the given stream has been fully processed
241
    /// into the sink, including flushing.
242
    ///
243
    /// This future will drive the stream to keep producing items until it is
244
    /// exhausted, sending each item to the sink. It will complete once the
245
    /// stream is exhausted, the sink has received all items, and the sink has
246
    /// been flushed. Note that the sink is **not** closed. If the stream produces
247
    /// an error, that error will be returned by this future without flushing the sink.
248
    ///
249
    /// Doing `sink.send_all(stream)` is roughly equivalent to
250
    /// `stream.forward(sink)`. The returned future will exhaust all items from
251
    /// `stream` and send them to `self`.
252
0
    fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
253
0
    where
254
0
        St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
255
0
        // St: Stream<Item = Result<Item, Self::Error>> + Unpin + ?Sized,
256
0
        Self: Unpin,
257
0
    {
258
0
        // TODO: type mismatch resolving `<St as Stream>::Item == std::result::Result<Item, <Self as futures_sink::Sink<Item>>::Error>`
259
0
        // assert_future::<Result<(), Self::Error>, _>(SendAll::new(self, stream))
260
0
        SendAll::new(self, stream)
261
0
    }
262
263
    /// Wrap this sink in an `Either` sink, making it the left-hand variant
264
    /// of that `Either`.
265
    ///
266
    /// This can be used in combination with the `right_sink` method to write `if`
267
    /// statements that evaluate to different sinks in different branches.
268
0
    fn left_sink<Si2>(self) -> Either<Self, Si2>
269
0
    where
270
0
        Si2: Sink<Item, Error = Self::Error>,
271
0
        Self: Sized,
272
0
    {
273
0
        assert_sink::<Item, Self::Error, _>(Either::Left(self))
274
0
    }
275
276
    /// Wrap this sink in an `Either` sink, making it the right-hand variant
277
    /// of that `Either`.
278
    ///
279
    /// This can be used in combination with the `left_sink` method to write `if`
280
    /// statements that evaluate to different sinks in different branches.
281
0
    fn right_sink<Si1>(self) -> Either<Si1, Self>
282
0
    where
283
0
        Si1: Sink<Item, Error = Self::Error>,
284
0
        Self: Sized,
285
0
    {
286
0
        assert_sink::<Item, Self::Error, _>(Either::Right(self))
287
0
    }
288
289
    /// Wraps a [`Sink`] into a sink compatible with libraries using
290
    /// futures 0.1 `Sink`. Requires the `compat` feature to be enabled.
291
    #[cfg(feature = "compat")]
292
    #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
293
    fn compat(self) -> CompatSink<Self, Item>
294
    where
295
        Self: Sized + Unpin,
296
    {
297
        CompatSink::new(self)
298
    }
299
300
    /// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
301
    /// sink types.
302
0
    fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
303
0
    where
304
0
        Self: Unpin,
305
0
    {
306
0
        Pin::new(self).poll_ready(cx)
307
0
    }
308
309
    /// A convenience method for calling [`Sink::start_send`] on [`Unpin`]
310
    /// sink types.
311
0
    fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
312
0
    where
313
0
        Self: Unpin,
314
0
    {
315
0
        Pin::new(self).start_send(item)
316
0
    }
317
318
    /// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`]
319
    /// sink types.
320
0
    fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
321
0
    where
322
0
        Self: Unpin,
323
0
    {
324
0
        Pin::new(self).poll_flush(cx)
325
0
    }
326
327
    /// A convenience method for calling [`Sink::poll_close`] on [`Unpin`]
328
    /// sink types.
329
0
    fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
330
0
    where
331
0
        Self: Unpin,
332
0
    {
333
0
        Pin::new(self).poll_close(cx)
334
0
    }
335
}
336
337
// Just a helper function to ensure the sinks we're returning all have the
338
// right implementations.
339
0
pub(crate) fn assert_sink<T, E, S>(sink: S) -> S
340
0
where
341
0
    S: Sink<T, Error = E>,
342
0
{
343
0
    sink
344
0
}