Coverage Report

Created: 2026-02-26 06:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.32/src/io/mod.rs
Line
Count
Source
1
//! Asynchronous I/O.
2
//!
3
//! This module is the asynchronous version of `std::io`. It defines four
4
//! traits, [`AsyncRead`], [`AsyncWrite`], [`AsyncSeek`], and [`AsyncBufRead`],
5
//! which mirror the `Read`, `Write`, `Seek`, and `BufRead` traits of the
6
//! standard library. However, these traits integrate with the asynchronous
7
//! task system, so that if an I/O object isn't ready for reading (or writing),
8
//! the thread is not blocked, and instead the current task is queued to be
9
//! woken when I/O is ready.
10
//!
11
//! In addition, the [`AsyncReadExt`], [`AsyncWriteExt`], [`AsyncSeekExt`], and
12
//! [`AsyncBufReadExt`] extension traits offer a variety of useful combinators
13
//! for operating with asynchronous I/O objects, including ways to work with
14
//! them using futures, streams and sinks.
15
//!
16
//! This module is only available when the `std` feature of this
17
//! library is activated, and it is activated by default.
18
19
#[cfg(feature = "io-compat")]
20
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
21
use crate::compat::Compat;
22
use crate::future::assert_future;
23
use crate::stream::assert_stream;
24
use std::{pin::Pin, string::String, vec::Vec};
25
26
// Re-export some types from `std::io` so that users don't have to deal
27
// with conflicts when `use`ing `futures::io` and `std::io`.
28
#[doc(no_inline)]
29
pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
30
31
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
32
33
// used by `BufReader` and `BufWriter`
34
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
35
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
36
37
mod allow_std;
38
pub use self::allow_std::AllowStdIo;
39
40
mod buf_reader;
41
pub use self::buf_reader::{BufReader, SeeKRelative};
42
43
mod buf_writer;
44
pub use self::buf_writer::BufWriter;
45
46
mod line_writer;
47
pub use self::line_writer::LineWriter;
48
49
mod chain;
50
pub use self::chain::Chain;
51
52
mod close;
53
pub use self::close::Close;
54
55
mod copy;
56
pub use self::copy::{copy, Copy};
57
58
mod copy_buf;
59
pub use self::copy_buf::{copy_buf, CopyBuf};
60
61
mod copy_buf_abortable;
62
pub use self::copy_buf_abortable::{copy_buf_abortable, CopyBufAbortable};
63
64
mod cursor;
65
pub use self::cursor::Cursor;
66
67
mod empty;
68
pub use self::empty::{empty, Empty};
69
70
mod fill_buf;
71
pub use self::fill_buf::FillBuf;
72
73
mod flush;
74
pub use self::flush::Flush;
75
76
#[cfg(feature = "sink")]
77
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
78
mod into_sink;
79
#[cfg(feature = "sink")]
80
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
81
pub use self::into_sink::IntoSink;
82
83
mod lines;
84
pub use self::lines::Lines;
85
86
mod read;
87
pub use self::read::Read;
88
89
mod read_vectored;
90
pub use self::read_vectored::ReadVectored;
91
92
mod read_exact;
93
pub use self::read_exact::ReadExact;
94
95
mod read_line;
96
pub use self::read_line::ReadLine;
97
98
mod read_to_end;
99
pub use self::read_to_end::ReadToEnd;
100
101
mod read_to_string;
102
pub use self::read_to_string::ReadToString;
103
104
mod read_until;
105
pub use self::read_until::ReadUntil;
106
107
mod repeat;
108
pub use self::repeat::{repeat, Repeat};
109
110
mod seek;
111
pub use self::seek::Seek;
112
113
mod sink;
114
pub use self::sink::{sink, Sink};
115
116
mod split;
117
pub use self::split::{ReadHalf, ReuniteError, WriteHalf};
118
119
mod take;
120
pub use self::take::Take;
121
122
mod window;
123
pub use self::window::Window;
124
125
mod write;
126
pub use self::write::Write;
127
128
mod write_vectored;
129
pub use self::write_vectored::WriteVectored;
130
131
mod write_all;
132
pub use self::write_all::WriteAll;
133
134
#[cfg(feature = "write-all-vectored")]
135
mod write_all_vectored;
136
#[cfg(feature = "write-all-vectored")]
137
pub use self::write_all_vectored::WriteAllVectored;
138
139
/// An extension trait which adds utility methods to `AsyncRead` types.
140
pub trait AsyncReadExt: AsyncRead {
141
    /// Creates an adaptor which will chain this stream with another.
142
    ///
143
    /// The returned `AsyncRead` instance will first read all bytes from this object
144
    /// until EOF is encountered. Afterwards the output is equivalent to the
145
    /// output of `next`.
146
    ///
147
    /// # Examples
148
    ///
149
    /// ```
150
    /// # futures::executor::block_on(async {
151
    /// use futures::io::{AsyncReadExt, Cursor};
152
    ///
153
    /// let reader1 = Cursor::new([1, 2, 3, 4]);
154
    /// let reader2 = Cursor::new([5, 6, 7, 8]);
155
    ///
156
    /// let mut reader = reader1.chain(reader2);
157
    /// let mut buffer = Vec::new();
158
    ///
159
    /// // read the value into a Vec.
160
    /// reader.read_to_end(&mut buffer).await?;
161
    /// assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]);
162
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
163
    /// ```
164
0
    fn chain<R>(self, next: R) -> Chain<Self, R>
165
0
    where
166
0
        Self: Sized,
167
0
        R: AsyncRead,
168
    {
169
0
        assert_read(Chain::new(self, next))
170
0
    }
171
172
    /// Tries to read some bytes directly into the given `buf` in an asynchronous
173
    /// manner, returning a future type.
174
    ///
175
    /// The returned future will resolve to the number of bytes read once the read
176
    /// operation is completed.
177
    ///
178
    /// # Examples
179
    ///
180
    /// ```
181
    /// # futures::executor::block_on(async {
182
    /// use futures::io::{AsyncReadExt, Cursor};
183
    ///
184
    /// let mut reader = Cursor::new([1, 2, 3, 4]);
185
    /// let mut output = [0u8; 5];
186
    ///
187
    /// let bytes = reader.read(&mut output[..]).await?;
188
    ///
189
    /// // This is only guaranteed to be 4 because `&[u8]` is a synchronous
190
    /// // reader. In a real system you could get anywhere from 1 to
191
    /// // `output.len()` bytes in a single read.
192
    /// assert_eq!(bytes, 4);
193
    /// assert_eq!(output, [1, 2, 3, 4, 0]);
194
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
195
    /// ```
196
0
    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
197
0
    where
198
0
        Self: Unpin,
199
    {
200
0
        assert_future::<Result<usize>, _>(Read::new(self, buf))
201
0
    }
202
203
    /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored
204
    /// IO operations.
205
    ///
206
    /// The returned future will resolve to the number of bytes read once the read
207
    /// operation is completed.
208
0
    fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self>
209
0
    where
210
0
        Self: Unpin,
211
    {
212
0
        assert_future::<Result<usize>, _>(ReadVectored::new(self, bufs))
213
0
    }
214
215
    /// Creates a future which will read exactly enough bytes to fill `buf`,
216
    /// returning an error if end of file (EOF) is hit sooner.
217
    ///
218
    /// The returned future will resolve once the read operation is completed.
219
    ///
220
    /// In the case of an error the buffer and the object will be discarded, with
221
    /// the error yielded.
222
    ///
223
    /// # Examples
224
    ///
225
    /// ```
226
    /// # futures::executor::block_on(async {
227
    /// use futures::io::{AsyncReadExt, Cursor};
228
    ///
229
    /// let mut reader = Cursor::new([1, 2, 3, 4]);
230
    /// let mut output = [0u8; 4];
231
    ///
232
    /// reader.read_exact(&mut output).await?;
233
    ///
234
    /// assert_eq!(output, [1, 2, 3, 4]);
235
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
236
    /// ```
237
    ///
238
    /// ## EOF is hit before `buf` is filled
239
    ///
240
    /// ```
241
    /// # futures::executor::block_on(async {
242
    /// use futures::io::{self, AsyncReadExt, Cursor};
243
    ///
244
    /// let mut reader = Cursor::new([1, 2, 3, 4]);
245
    /// let mut output = [0u8; 5];
246
    ///
247
    /// let result = reader.read_exact(&mut output).await;
248
    ///
249
    /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
250
    /// # });
251
    /// ```
252
5.06M
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
253
5.06M
    where
254
5.06M
        Self: Unpin,
255
    {
256
5.06M
        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
257
5.06M
    }
Unexecuted instantiation: <_ as futures_util::io::AsyncReadExt>::read_exact
<core::pin::Pin<&mut futures_util::io::buf_reader::BufReader<mediasan_common::sync::AsyncInputAdapter<std::io::cursor::Cursor<&[u8]>>>> as futures_util::io::AsyncReadExt>::read_exact
Line
Count
Source
252
10.8k
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
253
10.8k
    where
254
10.8k
        Self: Unpin,
255
    {
256
10.8k
        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
257
10.8k
    }
<core::pin::Pin<&mut &mut core::pin::Pin<&mut futures_util::io::buf_reader::BufReader<mediasan_common::sync::AsyncInputAdapter<std::io::cursor::Cursor<&[u8]>>>>> as futures_util::io::AsyncReadExt>::read_exact
Line
Count
Source
252
254k
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
253
254k
    where
254
254k
        Self: Unpin,
255
    {
256
254k
        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
257
254k
    }
<core::pin::Pin<&mut &mut core::pin::Pin<&mut &mut core::pin::Pin<&mut futures_util::io::buf_reader::BufReader<mediasan_common::sync::AsyncInputAdapter<std::io::cursor::Cursor<&[u8]>>>>>> as futures_util::io::AsyncReadExt>::read_exact
Line
Count
Source
252
253k
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
253
253k
    where
254
253k
        Self: Unpin,
255
    {
256
253k
        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
257
253k
    }
<core::pin::Pin<&mut mediasan_common::sync::AsyncInputAdapter<bytes::buf::reader::Reader<&mut &mut bytes::bytes_mut::BytesMut>>> as futures_util::io::AsyncReadExt>::read_exact
Line
Count
Source
252
2.31M
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
253
2.31M
    where
254
2.31M
        Self: Unpin,
255
    {
256
2.31M
        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
257
2.31M
    }
<core::pin::Pin<&mut &mut core::pin::Pin<&mut mediasan_common::sync::AsyncInputAdapter<bytes::buf::reader::Reader<&mut &mut bytes::bytes_mut::BytesMut>>>> as futures_util::io::AsyncReadExt>::read_exact
Line
Count
Source
252
2.23M
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
253
2.23M
    where
254
2.23M
        Self: Unpin,
255
    {
256
2.23M
        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
257
2.23M
    }
258
259
    /// Creates a future which will read all the bytes from this `AsyncRead`.
260
    ///
261
    /// On success the total number of bytes read is returned.
262
    ///
263
    /// # Examples
264
    ///
265
    /// ```
266
    /// # futures::executor::block_on(async {
267
    /// use futures::io::{AsyncReadExt, Cursor};
268
    ///
269
    /// let mut reader = Cursor::new([1, 2, 3, 4]);
270
    /// let mut output = Vec::with_capacity(4);
271
    ///
272
    /// let bytes = reader.read_to_end(&mut output).await?;
273
    ///
274
    /// assert_eq!(bytes, 4);
275
    /// assert_eq!(output, vec![1, 2, 3, 4]);
276
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
277
    /// ```
278
0
    fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>
279
0
    where
280
0
        Self: Unpin,
281
    {
282
0
        assert_future::<Result<usize>, _>(ReadToEnd::new(self, buf))
283
0
    }
284
285
    /// Creates a future which will read all the bytes from this `AsyncRead`.
286
    ///
287
    /// On success the total number of bytes read is returned.
288
    ///
289
    /// # Examples
290
    ///
291
    /// ```
292
    /// # futures::executor::block_on(async {
293
    /// use futures::io::{AsyncReadExt, Cursor};
294
    ///
295
    /// let mut reader = Cursor::new(&b"1234"[..]);
296
    /// let mut buffer = String::with_capacity(4);
297
    ///
298
    /// let bytes = reader.read_to_string(&mut buffer).await?;
299
    ///
300
    /// assert_eq!(bytes, 4);
301
    /// assert_eq!(buffer, String::from("1234"));
302
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
303
    /// ```
304
0
    fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToString<'a, Self>
305
0
    where
306
0
        Self: Unpin,
307
    {
308
0
        assert_future::<Result<usize>, _>(ReadToString::new(self, buf))
309
0
    }
310
311
    /// Helper method for splitting this read/write object into two halves.
312
    ///
313
    /// The two halves returned implement the `AsyncRead` and `AsyncWrite`
314
    /// traits, respectively.
315
    ///
316
    /// # Examples
317
    ///
318
    /// ```
319
    /// # futures::executor::block_on(async {
320
    /// use futures::io::{self, AsyncReadExt, Cursor};
321
    ///
322
    /// // Note that for `Cursor` the read and write halves share a single
323
    /// // seek position. This may or may not be true for other types that
324
    /// // implement both `AsyncRead` and `AsyncWrite`.
325
    ///
326
    /// let reader = Cursor::new([1, 2, 3, 4]);
327
    /// let mut buffer = Cursor::new(vec![0, 0, 0, 0, 5, 6, 7, 8]);
328
    /// let mut writer = Cursor::new(vec![0u8; 5]);
329
    ///
330
    /// {
331
    ///     let (buffer_reader, mut buffer_writer) = (&mut buffer).split();
332
    ///     io::copy(reader, &mut buffer_writer).await?;
333
    ///     io::copy(buffer_reader, &mut writer).await?;
334
    /// }
335
    ///
336
    /// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]);
337
    /// assert_eq!(writer.into_inner(), [5, 6, 7, 8, 0]);
338
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
339
    /// ```
340
0
    fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
341
0
    where
342
0
        Self: AsyncWrite + Sized,
343
    {
344
0
        let (r, w) = split::split(self);
345
0
        (assert_read(r), assert_write(w))
346
0
    }
347
348
    /// Creates an AsyncRead adapter which will read at most `limit` bytes
349
    /// from the underlying reader.
350
    ///
351
    /// # Examples
352
    ///
353
    /// ```
354
    /// # futures::executor::block_on(async {
355
    /// use futures::io::{AsyncReadExt, Cursor};
356
    ///
357
    /// let reader = Cursor::new(&b"12345678"[..]);
358
    /// let mut buffer = [0; 5];
359
    ///
360
    /// let mut take = reader.take(4);
361
    /// let n = take.read(&mut buffer).await?;
362
    ///
363
    /// assert_eq!(n, 4);
364
    /// assert_eq!(&buffer, b"1234\0");
365
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
366
    /// ```
367
0
    fn take(self, limit: u64) -> Take<Self>
368
0
    where
369
0
        Self: Sized,
370
    {
371
0
        assert_read(Take::new(self, limit))
372
0
    }
373
374
    /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be
375
    /// used as a futures 0.1 / tokio-io 0.1 `AsyncRead`. If the wrapped type
376
    /// implements [`AsyncWrite`] as well, the result will also implement the
377
    /// futures 0.1 / tokio 0.1 `AsyncWrite` trait.
378
    ///
379
    /// Requires the `io-compat` feature to enable.
380
    #[cfg(feature = "io-compat")]
381
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
382
    fn compat(self) -> Compat<Self>
383
    where
384
        Self: Sized + Unpin,
385
    {
386
        Compat::new(self)
387
    }
388
}
389
390
impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
391
392
/// An extension trait which adds utility methods to `AsyncWrite` types.
393
pub trait AsyncWriteExt: AsyncWrite {
394
    /// Creates a future which will entirely flush this `AsyncWrite`.
395
    ///
396
    /// # Examples
397
    ///
398
    /// ```
399
    /// # futures::executor::block_on(async {
400
    /// use futures::io::{AllowStdIo, AsyncWriteExt};
401
    /// use std::io::{BufWriter, Cursor};
402
    ///
403
    /// let mut output = vec![0u8; 5];
404
    ///
405
    /// {
406
    ///     let writer = Cursor::new(&mut output);
407
    ///     let mut buffered = AllowStdIo::new(BufWriter::new(writer));
408
    ///     buffered.write_all(&[1, 2]).await?;
409
    ///     buffered.write_all(&[3, 4]).await?;
410
    ///     buffered.flush().await?;
411
    /// }
412
    ///
413
    /// assert_eq!(output, [1, 2, 3, 4, 0]);
414
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
415
    /// ```
416
0
    fn flush(&mut self) -> Flush<'_, Self>
417
0
    where
418
0
        Self: Unpin,
419
    {
420
0
        assert_future::<Result<()>, _>(Flush::new(self))
421
0
    }
422
423
    /// Creates a future which will entirely close this `AsyncWrite`.
424
0
    fn close(&mut self) -> Close<'_, Self>
425
0
    where
426
0
        Self: Unpin,
427
    {
428
0
        assert_future::<Result<()>, _>(Close::new(self))
429
0
    }
430
431
    /// Creates a future which will write bytes from `buf` into the object.
432
    ///
433
    /// The returned future will resolve to the number of bytes written once the write
434
    /// operation is completed.
435
0
    fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self>
436
0
    where
437
0
        Self: Unpin,
438
    {
439
0
        assert_future::<Result<usize>, _>(Write::new(self, buf))
440
0
    }
441
442
    /// Creates a future which will write bytes from `bufs` into the object using vectored
443
    /// IO operations.
444
    ///
445
    /// The returned future will resolve to the number of bytes written once the write
446
    /// operation is completed.
447
0
    fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self>
448
0
    where
449
0
        Self: Unpin,
450
    {
451
0
        assert_future::<Result<usize>, _>(WriteVectored::new(self, bufs))
452
0
    }
453
454
    /// Write data into this object.
455
    ///
456
    /// Creates a future that will write the entire contents of the buffer `buf` into
457
    /// this `AsyncWrite`.
458
    ///
459
    /// The returned future will not complete until all the data has been written.
460
    ///
461
    /// # Examples
462
    ///
463
    /// ```
464
    /// # futures::executor::block_on(async {
465
    /// use futures::io::{AsyncWriteExt, Cursor};
466
    ///
467
    /// let mut writer = Cursor::new(vec![0u8; 5]);
468
    ///
469
    /// writer.write_all(&[1, 2, 3, 4]).await?;
470
    ///
471
    /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
472
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
473
    /// ```
474
0
    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
475
0
    where
476
0
        Self: Unpin,
477
    {
478
0
        assert_future::<Result<()>, _>(WriteAll::new(self, buf))
479
0
    }
480
481
    /// Attempts to write multiple buffers into this writer.
482
    ///
483
    /// Creates a future that will write the entire contents of `bufs` into this
484
    /// `AsyncWrite` using [vectored writes].
485
    ///
486
    /// The returned future will not complete until all the data has been
487
    /// written.
488
    ///
489
    /// [vectored writes]: std::io::Write::write_vectored
490
    ///
491
    /// # Notes
492
    ///
493
    /// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to
494
    /// a slice of `IoSlice`s, not an immutable one. That's because we need to
495
    /// modify the slice to keep track of the bytes already written.
496
    ///
497
    /// Once this future returns, the contents of `bufs` are unspecified, as
498
    /// this depends on how many calls to `write_vectored` were necessary. It is
499
    /// best to understand this function as taking ownership of `bufs` and to
500
    /// not use `bufs` afterwards. The underlying buffers, to which the
501
    /// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and
502
    /// can be reused.
503
    ///
504
    /// # Examples
505
    ///
506
    /// ```
507
    /// # futures::executor::block_on(async {
508
    /// use futures::io::AsyncWriteExt;
509
    /// use futures_util::io::Cursor;
510
    /// use std::io::IoSlice;
511
    ///
512
    /// let mut writer = Cursor::new(Vec::new());
513
    /// let bufs = &mut [
514
    ///     IoSlice::new(&[1]),
515
    ///     IoSlice::new(&[2, 3]),
516
    ///     IoSlice::new(&[4, 5, 6]),
517
    /// ];
518
    ///
519
    /// writer.write_all_vectored(bufs).await?;
520
    /// // Note: the contents of `bufs` are now unspecified, see the Notes section.
521
    ///
522
    /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4, 5, 6]);
523
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
524
    /// ```
525
    #[cfg(feature = "write-all-vectored")]
526
    fn write_all_vectored<'a>(
527
        &'a mut self,
528
        bufs: &'a mut [IoSlice<'a>],
529
    ) -> WriteAllVectored<'a, Self>
530
    where
531
        Self: Unpin,
532
    {
533
        assert_future::<Result<()>, _>(WriteAllVectored::new(self, bufs))
534
    }
535
536
    /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
537
    /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
538
    /// Requires the `io-compat` feature to enable.
539
    #[cfg(feature = "io-compat")]
540
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
541
    fn compat_write(self) -> Compat<Self>
542
    where
543
        Self: Sized + Unpin,
544
    {
545
        Compat::new(self)
546
    }
547
548
    /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
549
    ///
550
    /// This adapter produces a sink that will write each value passed to it
551
    /// into the underlying writer.
552
    ///
553
    /// Note that this function consumes the given writer, returning a wrapped
554
    /// version.
555
    ///
556
    /// # Examples
557
    ///
558
    /// ```
559
    /// # futures::executor::block_on(async {
560
    /// use futures::io::AsyncWriteExt;
561
    /// use futures::stream::{self, StreamExt};
562
    ///
563
    /// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]);
564
    ///
565
    /// let mut writer = vec![];
566
    ///
567
    /// stream.forward((&mut writer).into_sink()).await?;
568
    ///
569
    /// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]);
570
    /// # Ok::<(), Box<dyn std::error::Error>>(())
571
    /// # })?;
572
    /// # Ok::<(), Box<dyn std::error::Error>>(())
573
    /// ```
574
    #[cfg(feature = "sink")]
575
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
576
    fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
577
    where
578
        Self: Sized,
579
    {
580
        crate::sink::assert_sink::<Item, Error, _>(IntoSink::new(self))
581
    }
582
}
583
584
impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
585
586
/// An extension trait which adds utility methods to `AsyncSeek` types.
587
pub trait AsyncSeekExt: AsyncSeek {
588
    /// Creates a future which will seek an IO object, and then yield the
589
    /// new position in the object and the object itself.
590
    ///
591
    /// In the case of an error the buffer and the object will be discarded, with
592
    /// the error yielded.
593
0
    fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
594
0
    where
595
0
        Self: Unpin,
596
    {
597
0
        assert_future::<Result<u64>, _>(Seek::new(self, pos))
598
0
    }
599
600
    /// Creates a future which will return the current seek position from the
601
    /// start of the stream.
602
    ///
603
    /// This is equivalent to `self.seek(SeekFrom::Current(0))`.
604
0
    fn stream_position(&mut self) -> Seek<'_, Self>
605
0
    where
606
0
        Self: Unpin,
607
    {
608
0
        self.seek(SeekFrom::Current(0))
609
0
    }
610
}
611
612
impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
613
614
/// An extension trait which adds utility methods to `AsyncBufRead` types.
615
pub trait AsyncBufReadExt: AsyncBufRead {
616
    /// Creates a future which will wait for a non-empty buffer to be available from this I/O
617
    /// object or EOF to be reached.
618
    ///
619
    /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf).
620
    ///
621
    /// ```rust
622
    /// # futures::executor::block_on(async {
623
    /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
624
    ///
625
    /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read();
626
    ///
627
    /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
628
    /// stream.consume_unpin(2);
629
    ///
630
    /// assert_eq!(stream.fill_buf().await?, vec![3]);
631
    /// stream.consume_unpin(1);
632
    ///
633
    /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]);
634
    /// stream.consume_unpin(3);
635
    ///
636
    /// assert_eq!(stream.fill_buf().await?, vec![]);
637
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
638
    /// ```
639
255k
    fn fill_buf(&mut self) -> FillBuf<'_, Self>
640
255k
    where
641
255k
        Self: Unpin,
642
    {
643
255k
        assert_future::<Result<&[u8]>, _>(FillBuf::new(self))
644
255k
    }
Unexecuted instantiation: <_ as futures_util::io::AsyncBufReadExt>::fill_buf
<core::pin::Pin<&mut futures_util::io::buf_reader::BufReader<mediasan_common::sync::AsyncInputAdapter<std::io::cursor::Cursor<&[u8]>>>> as futures_util::io::AsyncBufReadExt>::fill_buf
Line
Count
Source
639
255k
    fn fill_buf(&mut self) -> FillBuf<'_, Self>
640
255k
    where
641
255k
        Self: Unpin,
642
    {
643
255k
        assert_future::<Result<&[u8]>, _>(FillBuf::new(self))
644
255k
    }
645
646
    /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
647
    ///
648
    /// ```rust
649
    /// # futures::executor::block_on(async {
650
    /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
651
    ///
652
    /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read();
653
    ///
654
    /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
655
    /// stream.consume_unpin(2);
656
    ///
657
    /// assert_eq!(stream.fill_buf().await?, vec![3]);
658
    /// stream.consume_unpin(1);
659
    ///
660
    /// assert_eq!(stream.fill_buf().await?, vec![]);
661
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
662
    /// ```
663
0
    fn consume_unpin(&mut self, amt: usize)
664
0
    where
665
0
        Self: Unpin,
666
    {
667
0
        Pin::new(self).consume(amt)
668
0
    }
669
670
    /// Creates a future which will read all the bytes associated with this I/O
671
    /// object into `buf` until the delimiter `byte` or EOF is reached.
672
    /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
673
    ///
674
    /// This function will read bytes from the underlying stream until the
675
    /// delimiter or EOF is found. Once found, all bytes up to, and including,
676
    /// the delimiter (if found) will be appended to `buf`.
677
    ///
678
    /// The returned future will resolve to the number of bytes read once the read
679
    /// operation is completed.
680
    ///
681
    /// In the case of an error the buffer and the object will be discarded, with
682
    /// the error yielded.
683
    ///
684
    /// # Examples
685
    ///
686
    /// ```
687
    /// # futures::executor::block_on(async {
688
    /// use futures::io::{AsyncBufReadExt, Cursor};
689
    ///
690
    /// let mut cursor = Cursor::new(b"lorem-ipsum");
691
    /// let mut buf = vec![];
692
    ///
693
    /// // cursor is at 'l'
694
    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
695
    /// assert_eq!(num_bytes, 6);
696
    /// assert_eq!(buf, b"lorem-");
697
    /// buf.clear();
698
    ///
699
    /// // cursor is at 'i'
700
    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
701
    /// assert_eq!(num_bytes, 5);
702
    /// assert_eq!(buf, b"ipsum");
703
    /// buf.clear();
704
    ///
705
    /// // cursor is at EOF
706
    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
707
    /// assert_eq!(num_bytes, 0);
708
    /// assert_eq!(buf, b"");
709
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
710
    /// ```
711
0
    fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
712
0
    where
713
0
        Self: Unpin,
714
    {
715
0
        assert_future::<Result<usize>, _>(ReadUntil::new(self, byte, buf))
716
0
    }
717
718
    /// Creates a future which will read all the bytes associated with this I/O
719
    /// object into `buf` until a newline (the 0xA byte) or EOF is reached.
720
    /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
721
    ///
722
    /// This function will read bytes from the underlying stream until the
723
    /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
724
    /// up to, and including, the delimiter (if found) will be appended to
725
    /// `buf`.
726
    ///
727
    /// The returned future will resolve to the number of bytes read once the read
728
    /// operation is completed.
729
    ///
730
    /// In the case of an error the buffer and the object will be discarded, with
731
    /// the error yielded.
732
    ///
733
    /// # Errors
734
    ///
735
    /// This function has the same error semantics as [`read_until`] and will
736
    /// also return an error if the read bytes are not valid UTF-8. If an I/O
737
    /// error is encountered then `buf` may contain some bytes already read in
738
    /// the event that all data read so far was valid UTF-8.
739
    ///
740
    /// [`read_until`]: AsyncBufReadExt::read_until
741
    ///
742
    /// # Examples
743
    ///
744
    /// ```
745
    /// # futures::executor::block_on(async {
746
    /// use futures::io::{AsyncBufReadExt, Cursor};
747
    ///
748
    /// let mut cursor = Cursor::new(b"foo\nbar");
749
    /// let mut buf = String::new();
750
    ///
751
    /// // cursor is at 'f'
752
    /// let num_bytes = cursor.read_line(&mut buf).await?;
753
    /// assert_eq!(num_bytes, 4);
754
    /// assert_eq!(buf, "foo\n");
755
    /// buf.clear();
756
    ///
757
    /// // cursor is at 'b'
758
    /// let num_bytes = cursor.read_line(&mut buf).await?;
759
    /// assert_eq!(num_bytes, 3);
760
    /// assert_eq!(buf, "bar");
761
    /// buf.clear();
762
    ///
763
    /// // cursor is at EOF
764
    /// let num_bytes = cursor.read_line(&mut buf).await?;
765
    /// assert_eq!(num_bytes, 0);
766
    /// assert_eq!(buf, "");
767
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
768
    /// ```
769
0
    fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
770
0
    where
771
0
        Self: Unpin,
772
    {
773
0
        assert_future::<Result<usize>, _>(ReadLine::new(self, buf))
774
0
    }
775
776
    /// Returns a stream over the lines of this reader.
777
    /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
778
    ///
779
    /// The stream returned from this function will yield instances of
780
    /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
781
    /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
782
    ///
783
    /// [`io::Result`]: std::io::Result
784
    /// [`String`]: String
785
    ///
786
    /// # Errors
787
    ///
788
    /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
789
    ///
790
    /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
791
    ///
792
    /// # Examples
793
    ///
794
    /// ```
795
    /// # futures::executor::block_on(async {
796
    /// use futures::io::{AsyncBufReadExt, Cursor};
797
    /// use futures::stream::StreamExt;
798
    ///
799
    /// let cursor = Cursor::new(b"lorem\nipsum\xc2\r\ndolor");
800
    ///
801
    /// let mut lines_stream = cursor.lines().map(|l| l.unwrap_or(String::from("invalid UTF_8")));
802
    /// assert_eq!(lines_stream.next().await, Some(String::from("lorem")));
803
    /// assert_eq!(lines_stream.next().await, Some(String::from("invalid UTF_8")));
804
    /// assert_eq!(lines_stream.next().await, Some(String::from("dolor")));
805
    /// assert_eq!(lines_stream.next().await, None);
806
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
807
    /// ```
808
0
    fn lines(self) -> Lines<Self>
809
0
    where
810
0
        Self: Sized,
811
    {
812
0
        assert_stream::<Result<String>, _>(Lines::new(self))
813
0
    }
814
}
815
816
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
817
818
// Just a helper function to ensure the readers we're returning all have the
819
// right implementations.
820
0
pub(crate) fn assert_read<R>(reader: R) -> R
821
0
where
822
0
    R: AsyncRead,
823
{
824
0
    reader
825
0
}
826
// Just a helper function to ensure the writers we're returning all have the
827
// right implementations.
828
0
pub(crate) fn assert_write<W>(writer: W) -> W
829
0
where
830
0
    W: AsyncWrite,
831
{
832
0
    writer
833
0
}