Coverage Report

Created: 2025-07-23 06:46

/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.31/src/io/mod.rs
Line
Count
Source (jump to first uncovered line)
1
//! Asynchronous I/O.
2
//!
3
//! This module is the asynchronous version of `std::io`. It defines four
4
//! traits, [`AsyncRead`], [`AsyncWrite`], [`AsyncSeek`], and [`AsyncBufRead`],
5
//! which mirror the `Read`, `Write`, `Seek`, and `BufRead` traits of the
6
//! standard library. However, these traits integrate with the asynchronous
7
//! task system, so that if an I/O object isn't ready for reading (or writing),
8
//! the thread is not blocked, and instead the current task is queued to be
9
//! woken when I/O is ready.
10
//!
11
//! In addition, the [`AsyncReadExt`], [`AsyncWriteExt`], [`AsyncSeekExt`], and
12
//! [`AsyncBufReadExt`] extension traits offer a variety of useful combinators
13
//! for operating with asynchronous I/O objects, including ways to work with
14
//! them using futures, streams and sinks.
15
//!
16
//! This module is only available when the `std` feature of this
17
//! library is activated, and it is activated by default.
18
19
#[cfg(feature = "io-compat")]
20
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
21
use crate::compat::Compat;
22
use crate::future::assert_future;
23
use crate::stream::assert_stream;
24
use std::{pin::Pin, ptr, string::String, vec::Vec};
25
26
// Re-export some types from `std::io` so that users don't have to deal
27
// with conflicts when `use`ing `futures::io` and `std::io`.
28
#[doc(no_inline)]
29
pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
30
31
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
32
33
// used by `BufReader` and `BufWriter`
34
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
35
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
36
37
/// Initializes a buffer if necessary.
38
///
39
/// A buffer is currently always initialized: `_reader` is ignored and `buf` is zero-filled.
40
#[inline]
41
0
unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) {
42
0
    unsafe { ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) } // SAFETY: pointer and length both come from `buf`, a live `&mut [u8]`.
43
0
}
44
45
mod allow_std;
46
pub use self::allow_std::AllowStdIo;
47
48
mod buf_reader;
49
pub use self::buf_reader::{BufReader, SeeKRelative};
50
51
mod buf_writer;
52
pub use self::buf_writer::BufWriter;
53
54
mod line_writer;
55
pub use self::line_writer::LineWriter;
56
57
mod chain;
58
pub use self::chain::Chain;
59
60
mod close;
61
pub use self::close::Close;
62
63
mod copy;
64
pub use self::copy::{copy, Copy};
65
66
mod copy_buf;
67
pub use self::copy_buf::{copy_buf, CopyBuf};
68
69
mod copy_buf_abortable;
70
pub use self::copy_buf_abortable::{copy_buf_abortable, CopyBufAbortable};
71
72
mod cursor;
73
pub use self::cursor::Cursor;
74
75
mod empty;
76
pub use self::empty::{empty, Empty};
77
78
mod fill_buf;
79
pub use self::fill_buf::FillBuf;
80
81
mod flush;
82
pub use self::flush::Flush;
83
84
#[cfg(feature = "sink")]
85
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
86
mod into_sink;
87
#[cfg(feature = "sink")]
88
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
89
pub use self::into_sink::IntoSink;
90
91
mod lines;
92
pub use self::lines::Lines;
93
94
mod read;
95
pub use self::read::Read;
96
97
mod read_vectored;
98
pub use self::read_vectored::ReadVectored;
99
100
mod read_exact;
101
pub use self::read_exact::ReadExact;
102
103
mod read_line;
104
pub use self::read_line::ReadLine;
105
106
mod read_to_end;
107
pub use self::read_to_end::ReadToEnd;
108
109
mod read_to_string;
110
pub use self::read_to_string::ReadToString;
111
112
mod read_until;
113
pub use self::read_until::ReadUntil;
114
115
mod repeat;
116
pub use self::repeat::{repeat, Repeat};
117
118
mod seek;
119
pub use self::seek::Seek;
120
121
mod sink;
122
pub use self::sink::{sink, Sink};
123
124
mod split;
125
pub use self::split::{ReadHalf, ReuniteError, WriteHalf};
126
127
mod take;
128
pub use self::take::Take;
129
130
mod window;
131
pub use self::window::Window;
132
133
mod write;
134
pub use self::write::Write;
135
136
mod write_vectored;
137
pub use self::write_vectored::WriteVectored;
138
139
mod write_all;
140
pub use self::write_all::WriteAll;
141
142
#[cfg(feature = "write-all-vectored")]
143
mod write_all_vectored;
144
#[cfg(feature = "write-all-vectored")]
145
pub use self::write_all_vectored::WriteAllVectored;
146
147
/// An extension trait which adds utility methods to `AsyncRead` types.
148
pub trait AsyncReadExt: AsyncRead {
149
    /// Creates an adaptor which will chain this reader with another.
150
    ///
151
    /// The returned `AsyncRead` instance will first read all bytes from this object
152
    /// until EOF is encountered. Afterwards the output is equivalent to the
153
    /// output of `next`.
154
    ///
155
    /// # Examples
156
    ///
157
    /// ```
158
    /// # futures::executor::block_on(async {
159
    /// use futures::io::{AsyncReadExt, Cursor};
160
    ///
161
    /// let reader1 = Cursor::new([1, 2, 3, 4]);
162
    /// let reader2 = Cursor::new([5, 6, 7, 8]);
163
    ///
164
    /// let mut reader = reader1.chain(reader2);
165
    /// let mut buffer = Vec::new();
166
    ///
167
    /// // read the value into a Vec.
168
    /// reader.read_to_end(&mut buffer).await?;
169
    /// assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]);
170
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
171
    /// ```
172
0
    fn chain<R>(self, next: R) -> Chain<Self, R>
173
0
    where
174
0
        Self: Sized,
175
0
        R: AsyncRead,
176
0
    {
177
0
        assert_read(Chain::new(self, next))
178
0
    }
179
180
    /// Tries to read some bytes directly into the given `buf` in asynchronous
181
    /// manner, returning a future type.
182
    ///
183
    /// The returned future will resolve to the number of bytes read once the read
184
    /// operation is completed.
185
    ///
186
    /// # Examples
187
    ///
188
    /// ```
189
    /// # futures::executor::block_on(async {
190
    /// use futures::io::{AsyncReadExt, Cursor};
191
    ///
192
    /// let mut reader = Cursor::new([1, 2, 3, 4]);
193
    /// let mut output = [0u8; 5];
194
    ///
195
    /// let bytes = reader.read(&mut output[..]).await?;
196
    ///
197
    /// // This is only guaranteed to be 4 because `Cursor` is a synchronous
198
    /// // reader. In a real system you could get anywhere from 1 to
199
    /// // `output.len()` bytes in a single read.
200
    /// assert_eq!(bytes, 4);
201
    /// assert_eq!(output, [1, 2, 3, 4, 0]);
202
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
203
    /// ```
204
0
    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
205
0
    where
206
0
        Self: Unpin,
207
0
    {
208
0
        assert_future::<Result<usize>, _>(Read::new(self, buf))
209
0
    }
210
211
    /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored
212
    /// IO operations.
213
    ///
214
    /// The returned future will resolve to the number of bytes read once the read
215
    /// operation is completed.
216
0
    fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self>
217
0
    where
218
0
        Self: Unpin,
219
0
    {
220
0
        assert_future::<Result<usize>, _>(ReadVectored::new(self, bufs))
221
0
    }
222
223
    /// Creates a future which will read exactly enough bytes to fill `buf`,
224
    /// returning an error if end of file (EOF) is hit sooner.
225
    ///
226
    /// The returned future will resolve once the read operation is completed.
227
    ///
228
    /// In the case of an error the buffer and the object will be discarded, with
229
    /// the error yielded.
230
    ///
231
    /// # Examples
232
    ///
233
    /// ```
234
    /// # futures::executor::block_on(async {
235
    /// use futures::io::{AsyncReadExt, Cursor};
236
    ///
237
    /// let mut reader = Cursor::new([1, 2, 3, 4]);
238
    /// let mut output = [0u8; 4];
239
    ///
240
    /// reader.read_exact(&mut output).await?;
241
    ///
242
    /// assert_eq!(output, [1, 2, 3, 4]);
243
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
244
    /// ```
245
    ///
246
    /// ## EOF is hit before `buf` is filled
247
    ///
248
    /// ```
249
    /// # futures::executor::block_on(async {
250
    /// use futures::io::{self, AsyncReadExt, Cursor};
251
    ///
252
    /// let mut reader = Cursor::new([1, 2, 3, 4]);
253
    /// let mut output = [0u8; 5];
254
    ///
255
    /// let result = reader.read_exact(&mut output).await;
256
    ///
257
    /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
258
    /// # });
259
    /// ```
260
3.35M
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
261
3.35M
    where
262
3.35M
        Self: Unpin,
263
3.35M
    {
264
3.35M
        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
265
3.35M
    }
Unexecuted instantiation: <_ as futures_util::io::AsyncReadExt>::read_exact
<core::pin::Pin<&mut futures_util::io::buf_reader::BufReader<mediasan_common::sync::AsyncInputAdapter<std::io::cursor::Cursor<&[u8]>>>> as futures_util::io::AsyncReadExt>::read_exact
Line
Count
Source
260
9.42k
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
261
9.42k
    where
262
9.42k
        Self: Unpin,
263
9.42k
    {
264
9.42k
        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
265
9.42k
    }
<core::pin::Pin<&mut &mut core::pin::Pin<&mut futures_util::io::buf_reader::BufReader<mediasan_common::sync::AsyncInputAdapter<std::io::cursor::Cursor<&[u8]>>>>> as futures_util::io::AsyncReadExt>::read_exact
Line
Count
Source
260
218k
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
261
218k
    where
262
218k
        Self: Unpin,
263
218k
    {
264
218k
        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
265
218k
    }
<core::pin::Pin<&mut &mut core::pin::Pin<&mut &mut core::pin::Pin<&mut futures_util::io::buf_reader::BufReader<mediasan_common::sync::AsyncInputAdapter<std::io::cursor::Cursor<&[u8]>>>>>> as futures_util::io::AsyncReadExt>::read_exact
Line
Count
Source
260
217k
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
261
217k
    where
262
217k
        Self: Unpin,
263
217k
    {
264
217k
        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
265
217k
    }
<core::pin::Pin<&mut mediasan_common::sync::AsyncInputAdapter<bytes::buf::reader::Reader<&mut &mut bytes::bytes_mut::BytesMut>>> as futures_util::io::AsyncReadExt>::read_exact
Line
Count
Source
260
1.48M
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
261
1.48M
    where
262
1.48M
        Self: Unpin,
263
1.48M
    {
264
1.48M
        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
265
1.48M
    }
<core::pin::Pin<&mut &mut core::pin::Pin<&mut mediasan_common::sync::AsyncInputAdapter<bytes::buf::reader::Reader<&mut &mut bytes::bytes_mut::BytesMut>>>> as futures_util::io::AsyncReadExt>::read_exact
Line
Count
Source
260
1.43M
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
261
1.43M
    where
262
1.43M
        Self: Unpin,
263
1.43M
    {
264
1.43M
        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
265
1.43M
    }
266
267
    /// Creates a future which will read all the bytes from this `AsyncRead`.
268
    ///
269
    /// On success the total number of bytes read is returned.
270
    ///
271
    /// # Examples
272
    ///
273
    /// ```
274
    /// # futures::executor::block_on(async {
275
    /// use futures::io::{AsyncReadExt, Cursor};
276
    ///
277
    /// let mut reader = Cursor::new([1, 2, 3, 4]);
278
    /// let mut output = Vec::with_capacity(4);
279
    ///
280
    /// let bytes = reader.read_to_end(&mut output).await?;
281
    ///
282
    /// assert_eq!(bytes, 4);
283
    /// assert_eq!(output, vec![1, 2, 3, 4]);
284
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
285
    /// ```
286
0
    fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>
287
0
    where
288
0
        Self: Unpin,
289
0
    {
290
0
        assert_future::<Result<usize>, _>(ReadToEnd::new(self, buf))
291
0
    }
292
293
    /// Creates a future which will read all the bytes from this `AsyncRead` into the string `buf`.
294
    ///
295
    /// On success the total number of bytes read is returned.
296
    ///
297
    /// # Examples
298
    ///
299
    /// ```
300
    /// # futures::executor::block_on(async {
301
    /// use futures::io::{AsyncReadExt, Cursor};
302
    ///
303
    /// let mut reader = Cursor::new(&b"1234"[..]);
304
    /// let mut buffer = String::with_capacity(4);
305
    ///
306
    /// let bytes = reader.read_to_string(&mut buffer).await?;
307
    ///
308
    /// assert_eq!(bytes, 4);
309
    /// assert_eq!(buffer, String::from("1234"));
310
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
311
    /// ```
312
0
    fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToString<'a, Self>
313
0
    where
314
0
        Self: Unpin,
315
0
    {
316
0
        assert_future::<Result<usize>, _>(ReadToString::new(self, buf))
317
0
    }
318
319
    /// Helper method for splitting this read/write object into two halves.
320
    ///
321
    /// The two halves returned implement the `AsyncRead` and `AsyncWrite`
322
    /// traits, respectively.
323
    ///
324
    /// # Examples
325
    ///
326
    /// ```
327
    /// # futures::executor::block_on(async {
328
    /// use futures::io::{self, AsyncReadExt, Cursor};
329
    ///
330
    /// // Note that for `Cursor` the read and write halves share a single
331
    /// // seek position. This may or may not be true for other types that
332
    /// // implement both `AsyncRead` and `AsyncWrite`.
333
    ///
334
    /// let reader = Cursor::new([1, 2, 3, 4]);
335
    /// let mut buffer = Cursor::new(vec![0, 0, 0, 0, 5, 6, 7, 8]);
336
    /// let mut writer = Cursor::new(vec![0u8; 5]);
337
    ///
338
    /// {
339
    ///     let (buffer_reader, mut buffer_writer) = (&mut buffer).split();
340
    ///     io::copy(reader, &mut buffer_writer).await?;
341
    ///     io::copy(buffer_reader, &mut writer).await?;
342
    /// }
343
    ///
344
    /// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]);
345
    /// assert_eq!(writer.into_inner(), [5, 6, 7, 8, 0]);
346
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
347
    /// ```
348
0
    fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
349
0
    where
350
0
        Self: AsyncWrite + Sized,
351
0
    {
352
0
        let (r, w) = split::split(self);
353
0
        (assert_read(r), assert_write(w))
354
0
    }
355
356
    /// Creates an AsyncRead adapter which will read at most `limit` bytes
357
    /// from the underlying reader.
358
    ///
359
    /// # Examples
360
    ///
361
    /// ```
362
    /// # futures::executor::block_on(async {
363
    /// use futures::io::{AsyncReadExt, Cursor};
364
    ///
365
    /// let reader = Cursor::new(&b"12345678"[..]);
366
    /// let mut buffer = [0; 5];
367
    ///
368
    /// let mut take = reader.take(4);
369
    /// let n = take.read(&mut buffer).await?;
370
    ///
371
    /// assert_eq!(n, 4);
372
    /// assert_eq!(&buffer, b"1234\0");
373
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
374
    /// ```
375
0
    fn take(self, limit: u64) -> Take<Self>
376
0
    where
377
0
        Self: Sized,
378
0
    {
379
0
        assert_read(Take::new(self, limit))
380
0
    }
381
382
    /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be
383
    /// used as a futures 0.1 / tokio-io 0.1 `AsyncRead`. If the wrapped type
384
    /// implements [`AsyncWrite`] as well, the result will also implement the
385
    /// futures 0.1 / tokio 0.1 `AsyncWrite` trait.
386
    ///
387
    /// Requires the `io-compat` feature to enable.
388
    #[cfg(feature = "io-compat")]
389
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
390
    fn compat(self) -> Compat<Self>
391
    where
392
        Self: Sized + Unpin,
393
    {
394
        Compat::new(self)
395
    }
396
}
397
398
impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
399
400
/// An extension trait which adds utility methods to `AsyncWrite` types.
401
pub trait AsyncWriteExt: AsyncWrite {
402
    /// Creates a future which will entirely flush this `AsyncWrite`.
403
    ///
404
    /// # Examples
405
    ///
406
    /// ```
407
    /// # futures::executor::block_on(async {
408
    /// use futures::io::{AllowStdIo, AsyncWriteExt};
409
    /// use std::io::{BufWriter, Cursor};
410
    ///
411
    /// let mut output = vec![0u8; 5];
412
    ///
413
    /// {
414
    ///     let writer = Cursor::new(&mut output);
415
    ///     let mut buffered = AllowStdIo::new(BufWriter::new(writer));
416
    ///     buffered.write_all(&[1, 2]).await?;
417
    ///     buffered.write_all(&[3, 4]).await?;
418
    ///     buffered.flush().await?;
419
    /// }
420
    ///
421
    /// assert_eq!(output, [1, 2, 3, 4, 0]);
422
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
423
    /// ```
424
0
    fn flush(&mut self) -> Flush<'_, Self>
425
0
    where
426
0
        Self: Unpin,
427
0
    {
428
0
        assert_future::<Result<()>, _>(Flush::new(self))
429
0
    }
430
431
    /// Creates a future which will entirely close this `AsyncWrite`.
432
0
    fn close(&mut self) -> Close<'_, Self>
433
0
    where
434
0
        Self: Unpin,
435
0
    {
436
0
        assert_future::<Result<()>, _>(Close::new(self))
437
0
    }
438
439
    /// Creates a future which will write bytes from `buf` into the object.
440
    ///
441
    /// The returned future will resolve to the number of bytes written once the write
442
    /// operation is completed.
443
0
    fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self>
444
0
    where
445
0
        Self: Unpin,
446
0
    {
447
0
        assert_future::<Result<usize>, _>(Write::new(self, buf))
448
0
    }
449
450
    /// Creates a future which will write bytes from `bufs` into the object using vectored
451
    /// IO operations.
452
    ///
453
    /// The returned future will resolve to the number of bytes written once the write
454
    /// operation is completed.
455
0
    fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self>
456
0
    where
457
0
        Self: Unpin,
458
0
    {
459
0
        assert_future::<Result<usize>, _>(WriteVectored::new(self, bufs))
460
0
    }
461
462
    /// Write data into this object.
463
    ///
464
    /// Creates a future that will write the entire contents of the buffer `buf` into
465
    /// this `AsyncWrite`.
466
    ///
467
    /// The returned future will not complete until all the data has been written.
468
    ///
469
    /// # Examples
470
    ///
471
    /// ```
472
    /// # futures::executor::block_on(async {
473
    /// use futures::io::{AsyncWriteExt, Cursor};
474
    ///
475
    /// let mut writer = Cursor::new(vec![0u8; 5]);
476
    ///
477
    /// writer.write_all(&[1, 2, 3, 4]).await?;
478
    ///
479
    /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
480
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
481
    /// ```
482
0
    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
483
0
    where
484
0
        Self: Unpin,
485
0
    {
486
0
        assert_future::<Result<()>, _>(WriteAll::new(self, buf))
487
0
    }
488
489
    /// Attempts to write multiple buffers into this writer.
490
    ///
491
    /// Creates a future that will write the entire contents of `bufs` into this
492
    /// `AsyncWrite` using [vectored writes].
493
    ///
494
    /// The returned future will not complete until all the data has been
495
    /// written.
496
    ///
497
    /// [vectored writes]: std::io::Write::write_vectored
498
    ///
499
    /// # Notes
500
    ///
501
    /// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to
502
    /// a slice of `IoSlice`s, not an immutable one. That's because we need to
503
    /// modify the slice to keep track of the bytes already written.
504
    ///
505
    /// Once this future returns, the contents of `bufs` are unspecified, as
506
    /// this depends on how many calls to `write_vectored` were necessary. It is
507
    /// best to understand this function as taking ownership of `bufs` and to
508
    /// not use `bufs` afterwards. The underlying buffers, to which the
509
    /// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and
510
    /// can be reused.
511
    ///
512
    /// # Examples
513
    ///
514
    /// ```
515
    /// # futures::executor::block_on(async {
516
    /// use futures::io::AsyncWriteExt;
517
    /// use futures_util::io::Cursor;
518
    /// use std::io::IoSlice;
519
    ///
520
    /// let mut writer = Cursor::new(Vec::new());
521
    /// let bufs = &mut [
522
    ///     IoSlice::new(&[1]),
523
    ///     IoSlice::new(&[2, 3]),
524
    ///     IoSlice::new(&[4, 5, 6]),
525
    /// ];
526
    ///
527
    /// writer.write_all_vectored(bufs).await?;
528
    /// // Note: the contents of `bufs` is now unspecified, see the Notes section.
529
    ///
530
    /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4, 5, 6]);
531
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
532
    /// ```
533
    #[cfg(feature = "write-all-vectored")]
534
    fn write_all_vectored<'a>(
535
        &'a mut self,
536
        bufs: &'a mut [IoSlice<'a>],
537
    ) -> WriteAllVectored<'a, Self>
538
    where
539
        Self: Unpin,
540
    {
541
        assert_future::<Result<()>, _>(WriteAllVectored::new(self, bufs))
542
    }
543
544
    /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
545
    /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
546
    /// Requires the `io-compat` feature to enable.
547
    #[cfg(feature = "io-compat")]
548
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
549
    fn compat_write(self) -> Compat<Self>
550
    where
551
        Self: Sized + Unpin,
552
    {
553
        Compat::new(self)
554
    }
555
556
    /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
557
    ///
558
    /// This adapter produces a sink that will write each value passed to it
559
    /// into the underlying writer.
560
    ///
561
    /// Note that this function consumes the given writer, returning a wrapped
562
    /// version.
563
    ///
564
    /// # Examples
565
    ///
566
    /// ```
567
    /// # futures::executor::block_on(async {
568
    /// use futures::io::AsyncWriteExt;
569
    /// use futures::stream::{self, StreamExt};
570
    ///
571
    /// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]);
572
    ///
573
    /// let mut writer = vec![];
574
    ///
575
    /// stream.forward((&mut writer).into_sink()).await?;
576
    ///
577
    /// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]);
578
    /// # Ok::<(), Box<dyn std::error::Error>>(())
579
    /// # })?;
580
    /// # Ok::<(), Box<dyn std::error::Error>>(())
581
    /// ```
582
    #[cfg(feature = "sink")]
583
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
584
    fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
585
    where
586
        Self: Sized,
587
    {
588
        crate::sink::assert_sink::<Item, Error, _>(IntoSink::new(self))
589
    }
590
}
591
592
impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
593
594
/// An extension trait which adds utility methods to `AsyncSeek` types.
595
pub trait AsyncSeekExt: AsyncSeek {
596
    /// Creates a future which will seek an IO object, and then yield the
597
    /// new position in the object.
598
    ///
599
    /// In the case of an error the buffer and the object will be discarded, with
600
    /// the error yielded.
601
0
    fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
602
0
    where
603
0
        Self: Unpin,
604
0
    {
605
0
        assert_future::<Result<u64>, _>(Seek::new(self, pos))
606
0
    }
607
608
    /// Creates a future which will return the current seek position from the
609
    /// start of the stream.
610
    ///
611
    /// This is equivalent to `self.seek(SeekFrom::Current(0))`.
612
0
    fn stream_position(&mut self) -> Seek<'_, Self>
613
0
    where
614
0
        Self: Unpin,
615
0
    {
616
0
        self.seek(SeekFrom::Current(0))
617
0
    }
618
}
619
620
impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
621
622
/// An extension trait which adds utility methods to `AsyncBufRead` types.
623
pub trait AsyncBufReadExt: AsyncBufRead {
624
    /// Creates a future which will wait for a non-empty buffer to be available from this I/O
625
    /// object or EOF to be reached.
626
    ///
627
    /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf).
628
    ///
629
    /// ```rust
630
    /// # futures::executor::block_on(async {
631
    /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
632
    ///
633
    /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read();
634
    ///
635
    /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
636
    /// stream.consume_unpin(2);
637
    ///
638
    /// assert_eq!(stream.fill_buf().await?, vec![3]);
639
    /// stream.consume_unpin(1);
640
    ///
641
    /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]);
642
    /// stream.consume_unpin(3);
643
    ///
644
    /// assert_eq!(stream.fill_buf().await?, vec![]);
645
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
646
    /// ```
647
219k
    fn fill_buf(&mut self) -> FillBuf<'_, Self>
648
219k
    where
649
219k
        Self: Unpin,
650
219k
    {
651
219k
        assert_future::<Result<&[u8]>, _>(FillBuf::new(self))
652
219k
    }
Unexecuted instantiation: <_ as futures_util::io::AsyncBufReadExt>::fill_buf
<core::pin::Pin<&mut futures_util::io::buf_reader::BufReader<mediasan_common::sync::AsyncInputAdapter<std::io::cursor::Cursor<&[u8]>>>> as futures_util::io::AsyncBufReadExt>::fill_buf
Line
Count
Source
647
219k
    fn fill_buf(&mut self) -> FillBuf<'_, Self>
648
219k
    where
649
219k
        Self: Unpin,
650
219k
    {
651
219k
        assert_future::<Result<&[u8]>, _>(FillBuf::new(self))
652
219k
    }
653
654
    /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
655
    ///
656
    /// ```rust
657
    /// # futures::executor::block_on(async {
658
    /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
659
    ///
660
    /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read();
661
    ///
662
    /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
663
    /// stream.consume_unpin(2);
664
    ///
665
    /// assert_eq!(stream.fill_buf().await?, vec![3]);
666
    /// stream.consume_unpin(1);
667
    ///
668
    /// assert_eq!(stream.fill_buf().await?, vec![]);
669
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
670
    /// ```
671
0
    fn consume_unpin(&mut self, amt: usize)
672
0
    where
673
0
        Self: Unpin,
674
0
    {
675
0
        Pin::new(self).consume(amt)
676
0
    }
677
678
    /// Creates a future which will read all the bytes associated with this I/O
679
    /// object into `buf` until the delimiter `byte` or EOF is reached.
680
    /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
681
    ///
682
    /// This function will read bytes from the underlying stream until the
683
    /// delimiter or EOF is found. Once found, all bytes up to, and including,
684
    /// the delimiter (if found) will be appended to `buf`.
685
    ///
686
    /// The returned future will resolve to the number of bytes read once the read
687
    /// operation is completed.
688
    ///
689
    /// In the case of an error the buffer and the object will be discarded, with
690
    /// the error yielded.
691
    ///
692
    /// # Examples
693
    ///
694
    /// ```
695
    /// # futures::executor::block_on(async {
696
    /// use futures::io::{AsyncBufReadExt, Cursor};
697
    ///
698
    /// let mut cursor = Cursor::new(b"lorem-ipsum");
699
    /// let mut buf = vec![];
700
    ///
701
    /// // cursor is at 'l'
702
    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
703
    /// assert_eq!(num_bytes, 6);
704
    /// assert_eq!(buf, b"lorem-");
705
    /// buf.clear();
706
    ///
707
    /// // cursor is at 'i'
708
    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
709
    /// assert_eq!(num_bytes, 5);
710
    /// assert_eq!(buf, b"ipsum");
711
    /// buf.clear();
712
    ///
713
    /// // cursor is at EOF
714
    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
715
    /// assert_eq!(num_bytes, 0);
716
    /// assert_eq!(buf, b"");
717
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
718
    /// ```
719
0
    fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
720
0
    where
721
0
        Self: Unpin,
722
0
    {
723
0
        assert_future::<Result<usize>, _>(ReadUntil::new(self, byte, buf))
724
0
    }
725
726
    /// Creates a future which will read all the bytes associated with this I/O
727
    /// object into `buf` until a newline (the 0xA byte) or EOF is reached.
728
    /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
729
    ///
730
    /// This function will read bytes from the underlying stream until the
731
    /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
732
    /// up to, and including, the delimiter (if found) will be appended to
733
    /// `buf`.
734
    ///
735
    /// The returned future will resolve to the number of bytes read once the read
736
    /// operation is completed.
737
    ///
738
    /// In the case of an error the buffer and the object will be discarded, with
739
    /// the error yielded.
740
    ///
741
    /// # Errors
742
    ///
743
    /// This function has the same error semantics as [`read_until`] and will
744
    /// also return an error if the read bytes are not valid UTF-8. If an I/O
745
    /// error is encountered then `buf` may contain some bytes already read in
746
    /// the event that all data read so far was valid UTF-8.
747
    ///
748
    /// [`read_until`]: AsyncBufReadExt::read_until
749
    ///
750
    /// # Examples
751
    ///
752
    /// ```
753
    /// # futures::executor::block_on(async {
754
    /// use futures::io::{AsyncBufReadExt, Cursor};
755
    ///
756
    /// let mut cursor = Cursor::new(b"foo\nbar");
757
    /// let mut buf = String::new();
758
    ///
759
    /// // cursor is at 'f'
760
    /// let num_bytes = cursor.read_line(&mut buf).await?;
761
    /// assert_eq!(num_bytes, 4);
762
    /// assert_eq!(buf, "foo\n");
763
    /// buf.clear();
764
    ///
765
    /// // cursor is at 'b'
766
    /// let num_bytes = cursor.read_line(&mut buf).await?;
767
    /// assert_eq!(num_bytes, 3);
768
    /// assert_eq!(buf, "bar");
769
    /// buf.clear();
770
    ///
771
    /// // cursor is at EOF
772
    /// let num_bytes = cursor.read_line(&mut buf).await?;
773
    /// assert_eq!(num_bytes, 0);
774
    /// assert_eq!(buf, "");
775
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
776
    /// ```
777
0
    fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
778
0
    where
779
0
        Self: Unpin,
780
0
    {
781
0
        assert_future::<Result<usize>, _>(ReadLine::new(self, buf))
782
0
    }
783
784
    /// Returns a stream over the lines of this reader.
785
    /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
786
    ///
787
    /// The stream returned from this function will yield instances of
788
    /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
789
    /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
790
    ///
791
    /// [`io::Result`]: std::io::Result
792
    /// [`String`]: String
793
    ///
794
    /// # Errors
795
    ///
796
    /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
797
    ///
798
    /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
799
    ///
800
    /// # Examples
801
    ///
802
    /// ```
803
    /// # futures::executor::block_on(async {
804
    /// use futures::io::{AsyncBufReadExt, Cursor};
805
    /// use futures::stream::StreamExt;
806
    ///
807
    /// let cursor = Cursor::new(b"lorem\nipsum\xc2\r\ndolor");
808
    ///
809
    /// let mut lines_stream = cursor.lines().map(|l| l.unwrap_or(String::from("invalid UTF_8")));
810
    /// assert_eq!(lines_stream.next().await, Some(String::from("lorem")));
811
    /// assert_eq!(lines_stream.next().await, Some(String::from("invalid UTF_8")));
812
    /// assert_eq!(lines_stream.next().await, Some(String::from("dolor")));
813
    /// assert_eq!(lines_stream.next().await, None);
814
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
815
    /// ```
816
0
    fn lines(self) -> Lines<Self>
817
0
    where
818
0
        Self: Sized,
819
0
    {
820
0
        assert_stream::<Result<String>, _>(Lines::new(self))
821
0
    }
822
}
823
824
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
825
826
// Identity helper: returns `reader` unchanged. Its only job is to make the
827
// compiler check that every reader we return implements `AsyncRead`.
828
0
pub(crate) fn assert_read<R>(reader: R) -> R
829
0
where
830
0
    R: AsyncRead,
831
0
{
832
0
    reader
833
0
}
834
// Identity helper: returns `writer` unchanged. Its only job is to make the
835
// compiler check that every writer we return implements `AsyncWrite`.
836
0
pub(crate) fn assert_write<W>(writer: W) -> W
837
0
where
838
0
    W: AsyncWrite,
839
0
{
840
0
    writer
841
0
}