Coverage Report

Created: 2025-12-12 06:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.21/src/io/mod.rs
Line
Count
Source
1
//! Asynchronous I/O.
2
//!
3
//! This module is the asynchronous version of `std::io`. It defines four
4
//! traits, [`AsyncRead`], [`AsyncWrite`], [`AsyncSeek`], and [`AsyncBufRead`],
5
//! which mirror the `Read`, `Write`, `Seek`, and `BufRead` traits of the
6
//! standard library. However, these traits integrate with the asynchronous
7
//! task system, so that if an I/O object isn't ready for reading (or writing),
8
//! the thread is not blocked, and instead the current task is queued to be
9
//! woken when I/O is ready.
10
//!
11
//! In addition, the [`AsyncReadExt`], [`AsyncWriteExt`], [`AsyncSeekExt`], and
12
//! [`AsyncBufReadExt`] extension traits offer a variety of useful combinators
13
//! for operating with asynchronous I/O objects, including ways to work with
14
//! them using futures, streams and sinks.
15
//!
16
//! This module is only available when the `std` feature of this
17
//! library is activated, and it is activated by default.
18
19
#[cfg(feature = "io-compat")]
20
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
21
use crate::compat::Compat;
22
use crate::future::assert_future;
23
use crate::stream::assert_stream;
24
use std::{pin::Pin, ptr};
25
26
// Re-export some types from `std::io` so that users don't have to deal
27
// with conflicts when `use`ing `futures::io` and `std::io`.
28
#[doc(no_inline)]
29
pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
30
31
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
32
33
// used by `BufReader` and `BufWriter`
34
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
35
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
36
37
/// Initializes a buffer if necessary by overwriting every byte of `buf` with zero.
38
///
39
/// A buffer is currently always initialized; `_reader` is accepted but not consulted.
40
#[inline]
41
0
unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) {
42
0
    ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len())
43
0
}
44
45
mod allow_std;
46
pub use self::allow_std::AllowStdIo;
47
48
mod buf_reader;
49
pub use self::buf_reader::{BufReader, SeeKRelative};
50
51
mod buf_writer;
52
pub use self::buf_writer::BufWriter;
53
54
mod line_writer;
55
pub use self::line_writer::LineWriter;
56
57
mod chain;
58
pub use self::chain::Chain;
59
60
mod close;
61
pub use self::close::Close;
62
63
mod copy;
64
pub use self::copy::{copy, Copy};
65
66
mod copy_buf;
67
pub use self::copy_buf::{copy_buf, CopyBuf};
68
69
mod cursor;
70
pub use self::cursor::Cursor;
71
72
mod empty;
73
pub use self::empty::{empty, Empty};
74
75
mod fill_buf;
76
pub use self::fill_buf::FillBuf;
77
78
mod flush;
79
pub use self::flush::Flush;
80
81
#[cfg(feature = "sink")]
82
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
83
mod into_sink;
84
#[cfg(feature = "sink")]
85
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
86
pub use self::into_sink::IntoSink;
87
88
mod lines;
89
pub use self::lines::Lines;
90
91
mod read;
92
pub use self::read::Read;
93
94
mod read_vectored;
95
pub use self::read_vectored::ReadVectored;
96
97
mod read_exact;
98
pub use self::read_exact::ReadExact;
99
100
mod read_line;
101
pub use self::read_line::ReadLine;
102
103
mod read_to_end;
104
pub use self::read_to_end::ReadToEnd;
105
106
mod read_to_string;
107
pub use self::read_to_string::ReadToString;
108
109
mod read_until;
110
pub use self::read_until::ReadUntil;
111
112
mod repeat;
113
pub use self::repeat::{repeat, Repeat};
114
115
mod seek;
116
pub use self::seek::Seek;
117
118
mod sink;
119
pub use self::sink::{sink, Sink};
120
121
mod split;
122
pub use self::split::{ReadHalf, ReuniteError, WriteHalf};
123
124
mod take;
125
pub use self::take::Take;
126
127
mod window;
128
pub use self::window::Window;
129
130
mod write;
131
pub use self::write::Write;
132
133
mod write_vectored;
134
pub use self::write_vectored::WriteVectored;
135
136
mod write_all;
137
pub use self::write_all::WriteAll;
138
139
#[cfg(feature = "write-all-vectored")]
140
mod write_all_vectored;
141
#[cfg(feature = "write-all-vectored")]
142
pub use self::write_all_vectored::WriteAllVectored;
143
144
/// An extension trait which adds utility methods to `AsyncRead` types.
145
pub trait AsyncReadExt: AsyncRead {
146
    /// Creates an adaptor which will chain this reader with another.
147
    ///
148
    /// The returned `AsyncRead` instance will first read all bytes from this object
149
    /// until EOF is encountered. Afterwards the output is equivalent to the
150
    /// output of `next`.
151
    ///
152
    /// # Examples
153
    ///
154
    /// ```
155
    /// # futures::executor::block_on(async {
156
    /// use futures::io::{AsyncReadExt, Cursor};
157
    ///
158
    /// let reader1 = Cursor::new([1, 2, 3, 4]);
159
    /// let reader2 = Cursor::new([5, 6, 7, 8]);
160
    ///
161
    /// let mut reader = reader1.chain(reader2);
162
    /// let mut buffer = Vec::new();
163
    ///
164
    /// // read the value into a Vec.
165
    /// reader.read_to_end(&mut buffer).await?;
166
    /// assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]);
167
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
168
    /// ```
169
0
    fn chain<R>(self, next: R) -> Chain<Self, R>
170
0
    where
171
0
        Self: Sized,
172
0
        R: AsyncRead,
173
    {
174
0
        assert_read(Chain::new(self, next))
175
0
    }
176
177
    /// Tries to read some bytes directly into the given `buf` in asynchronous
178
    /// manner, returning a future type.
179
    ///
180
    /// The returned future will resolve to the number of bytes read once the read
181
    /// operation is completed.
182
    ///
183
    /// # Examples
184
    ///
185
    /// ```
186
    /// # futures::executor::block_on(async {
187
    /// use futures::io::{AsyncReadExt, Cursor};
188
    ///
189
    /// let mut reader = Cursor::new([1, 2, 3, 4]);
190
    /// let mut output = [0u8; 5];
191
    ///
192
    /// let bytes = reader.read(&mut output[..]).await?;
193
    ///
194
    /// // This is only guaranteed to be 4 because `Cursor` is a synchronous
195
    /// // reader. In a real system you could get anywhere from 1 to
196
    /// // `output.len()` bytes in a single read.
197
    /// assert_eq!(bytes, 4);
198
    /// assert_eq!(output, [1, 2, 3, 4, 0]);
199
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
200
    /// ```
201
0
    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
202
0
    where
203
0
        Self: Unpin,
204
    {
205
0
        assert_future::<Result<usize>, _>(Read::new(self, buf))
206
0
    }
207
208
    /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored
209
    /// IO operations.
210
    ///
211
    /// The returned future will resolve to the number of bytes read once the read
212
    /// operation is completed.
213
0
    fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self>
214
0
    where
215
0
        Self: Unpin,
216
    {
217
0
        assert_future::<Result<usize>, _>(ReadVectored::new(self, bufs))
218
0
    }
219
220
    /// Creates a future which will read exactly enough bytes to fill `buf`,
221
    /// returning an error if end of file (EOF) is hit sooner.
222
    ///
223
    /// The returned future will resolve once the read operation is completed.
224
    ///
225
    /// In the case of an error the future resolves to that error; `buf` and the
226
    /// object are only borrowed, so the caller keeps both.
227
    ///
228
    /// # Examples
229
    ///
230
    /// ```
231
    /// # futures::executor::block_on(async {
232
    /// use futures::io::{AsyncReadExt, Cursor};
233
    ///
234
    /// let mut reader = Cursor::new([1, 2, 3, 4]);
235
    /// let mut output = [0u8; 4];
236
    ///
237
    /// reader.read_exact(&mut output).await?;
238
    ///
239
    /// assert_eq!(output, [1, 2, 3, 4]);
240
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
241
    /// ```
242
    ///
243
    /// ## EOF is hit before `buf` is filled
244
    ///
245
    /// ```
246
    /// # futures::executor::block_on(async {
247
    /// use futures::io::{self, AsyncReadExt, Cursor};
248
    ///
249
    /// let mut reader = Cursor::new([1, 2, 3, 4]);
250
    /// let mut output = [0u8; 5];
251
    ///
252
    /// let result = reader.read_exact(&mut output).await;
253
    ///
254
    /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
255
    /// # });
256
    /// ```
257
0
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
258
0
    where
259
0
        Self: Unpin,
260
    {
261
0
        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
262
0
    }
263
264
    /// Creates a future which will read all the bytes from this `AsyncRead`.
265
    ///
266
    /// On success the total number of bytes read is returned.
267
    ///
268
    /// # Examples
269
    ///
270
    /// ```
271
    /// # futures::executor::block_on(async {
272
    /// use futures::io::{AsyncReadExt, Cursor};
273
    ///
274
    /// let mut reader = Cursor::new([1, 2, 3, 4]);
275
    /// let mut output = Vec::with_capacity(4);
276
    ///
277
    /// let bytes = reader.read_to_end(&mut output).await?;
278
    ///
279
    /// assert_eq!(bytes, 4);
280
    /// assert_eq!(output, vec![1, 2, 3, 4]);
281
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
282
    /// ```
283
0
    fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>
284
0
    where
285
0
        Self: Unpin,
286
    {
287
0
        assert_future::<Result<usize>, _>(ReadToEnd::new(self, buf))
288
0
    }
289
290
    /// Creates a future which will read all the bytes from this `AsyncRead` into the `String` `buf`.
291
    ///
292
    /// On success the total number of bytes read is returned.
293
    ///
294
    /// # Examples
295
    ///
296
    /// ```
297
    /// # futures::executor::block_on(async {
298
    /// use futures::io::{AsyncReadExt, Cursor};
299
    ///
300
    /// let mut reader = Cursor::new(&b"1234"[..]);
301
    /// let mut buffer = String::with_capacity(4);
302
    ///
303
    /// let bytes = reader.read_to_string(&mut buffer).await?;
304
    ///
305
    /// assert_eq!(bytes, 4);
306
    /// assert_eq!(buffer, String::from("1234"));
307
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
308
    /// ```
309
0
    fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToString<'a, Self>
310
0
    where
311
0
        Self: Unpin,
312
    {
313
0
        assert_future::<Result<usize>, _>(ReadToString::new(self, buf))
314
0
    }
315
316
    /// Helper method for splitting this read/write object into two halves.
317
    ///
318
    /// The two halves returned implement the `AsyncRead` and `AsyncWrite`
319
    /// traits, respectively.
320
    ///
321
    /// # Examples
322
    ///
323
    /// ```
324
    /// # futures::executor::block_on(async {
325
    /// use futures::io::{self, AsyncReadExt, Cursor};
326
    ///
327
    /// // Note that for `Cursor` the read and write halves share a single
328
    /// // seek position. This may or may not be true for other types that
329
    /// // implement both `AsyncRead` and `AsyncWrite`.
330
    ///
331
    /// let reader = Cursor::new([1, 2, 3, 4]);
332
    /// let mut buffer = Cursor::new(vec![0, 0, 0, 0, 5, 6, 7, 8]);
333
    /// let mut writer = Cursor::new(vec![0u8; 5]);
334
    ///
335
    /// {
336
    ///     let (buffer_reader, mut buffer_writer) = (&mut buffer).split();
337
    ///     io::copy(reader, &mut buffer_writer).await?;
338
    ///     io::copy(buffer_reader, &mut writer).await?;
339
    /// }
340
    ///
341
    /// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]);
342
    /// assert_eq!(writer.into_inner(), [5, 6, 7, 8, 0]);
343
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
344
    /// ```
345
0
    fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
346
0
    where
347
0
        Self: AsyncWrite + Sized,
348
    {
349
0
        let (r, w) = split::split(self);
350
0
        (assert_read(r), assert_write(w))
351
0
    }
352
353
    /// Creates an `AsyncRead` adapter which will read at most `limit` bytes
354
    /// from the underlying reader.
355
    ///
356
    /// # Examples
357
    ///
358
    /// ```
359
    /// # futures::executor::block_on(async {
360
    /// use futures::io::{AsyncReadExt, Cursor};
361
    ///
362
    /// let reader = Cursor::new(&b"12345678"[..]);
363
    /// let mut buffer = [0; 5];
364
    ///
365
    /// let mut take = reader.take(4);
366
    /// let n = take.read(&mut buffer).await?;
367
    ///
368
    /// assert_eq!(n, 4);
369
    /// assert_eq!(&buffer, b"1234\0");
370
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
371
    /// ```
372
0
    fn take(self, limit: u64) -> Take<Self>
373
0
    where
374
0
        Self: Sized,
375
    {
376
0
        assert_read(Take::new(self, limit))
377
0
    }
378
379
    /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be
380
    /// used as a futures 0.1 / tokio-io 0.1 `AsyncRead`. If the wrapped type
381
    /// implements [`AsyncWrite`] as well, the result will also implement the
382
    /// futures 0.1 / tokio 0.1 `AsyncWrite` trait.
383
    ///
384
    /// Requires the `io-compat` feature to enable.
385
    #[cfg(feature = "io-compat")]
386
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
387
    fn compat(self) -> Compat<Self>
388
    where
389
        Self: Sized + Unpin,
390
    {
391
        Compat::new(self)
392
    }
393
}
394
395
impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
396
397
/// An extension trait which adds utility methods to `AsyncWrite` types.
398
pub trait AsyncWriteExt: AsyncWrite {
399
    /// Creates a future which will entirely flush this `AsyncWrite`.
400
    ///
401
    /// # Examples
402
    ///
403
    /// ```
404
    /// # futures::executor::block_on(async {
405
    /// use futures::io::{AllowStdIo, AsyncWriteExt};
406
    /// use std::io::{BufWriter, Cursor};
407
    ///
408
    /// let mut output = vec![0u8; 5];
409
    ///
410
    /// {
411
    ///     let writer = Cursor::new(&mut output);
412
    ///     let mut buffered = AllowStdIo::new(BufWriter::new(writer));
413
    ///     buffered.write_all(&[1, 2]).await?;
414
    ///     buffered.write_all(&[3, 4]).await?;
415
    ///     buffered.flush().await?;
416
    /// }
417
    ///
418
    /// assert_eq!(output, [1, 2, 3, 4, 0]);
419
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
420
    /// ```
421
0
    fn flush(&mut self) -> Flush<'_, Self>
422
0
    where
423
0
        Self: Unpin,
424
    {
425
0
        assert_future::<Result<()>, _>(Flush::new(self))
426
0
    }
427
428
    /// Creates a future which will entirely close this `AsyncWrite`.
429
0
    fn close(&mut self) -> Close<'_, Self>
430
0
    where
431
0
        Self: Unpin,
432
    {
433
0
        assert_future::<Result<()>, _>(Close::new(self))
434
0
    }
435
436
    /// Creates a future which will write bytes from `buf` into the object.
437
    ///
438
    /// The returned future will resolve to the number of bytes written once the write
439
    /// operation is completed.
440
0
    fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self>
441
0
    where
442
0
        Self: Unpin,
443
    {
444
0
        assert_future::<Result<usize>, _>(Write::new(self, buf))
445
0
    }
446
447
    /// Creates a future which will write bytes from `bufs` into the object using vectored
448
    /// IO operations.
449
    ///
450
    /// The returned future will resolve to the number of bytes written once the write
451
    /// operation is completed.
452
0
    fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self>
453
0
    where
454
0
        Self: Unpin,
455
    {
456
0
        assert_future::<Result<usize>, _>(WriteVectored::new(self, bufs))
457
0
    }
458
459
    /// Write data into this object.
460
    ///
461
    /// Creates a future that will write the entire contents of the buffer `buf` into
462
    /// this `AsyncWrite`.
463
    ///
464
    /// The returned future will not complete until all the data has been written.
465
    ///
466
    /// # Examples
467
    ///
468
    /// ```
469
    /// # futures::executor::block_on(async {
470
    /// use futures::io::{AsyncWriteExt, Cursor};
471
    ///
472
    /// let mut writer = Cursor::new(vec![0u8; 5]);
473
    ///
474
    /// writer.write_all(&[1, 2, 3, 4]).await?;
475
    ///
476
    /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
477
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
478
    /// ```
479
0
    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
480
0
    where
481
0
        Self: Unpin,
482
    {
483
0
        assert_future::<Result<()>, _>(WriteAll::new(self, buf))
484
0
    }
485
486
    /// Attempts to write multiple buffers into this writer.
487
    ///
488
    /// Creates a future that will write the entire contents of `bufs` into this
489
    /// `AsyncWrite` using [vectored writes].
490
    ///
491
    /// The returned future will not complete until all the data has been
492
    /// written.
493
    ///
494
    /// [vectored writes]: std::io::Write::write_vectored
495
    ///
496
    /// # Notes
497
    ///
498
    /// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to
499
    /// a slice of `IoSlice`s, not an immutable one. That's because we need to
500
    /// modify the slice to keep track of the bytes already written.
501
    ///
502
    /// Once this future returns, the contents of `bufs` are unspecified, as
503
    /// this depends on how many calls to `write_vectored` were necessary. It is
504
    /// best to understand this function as taking ownership of `bufs` and to
505
    /// not use `bufs` afterwards. The underlying buffers, to which the
506
    /// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and
507
    /// can be reused.
508
    ///
509
    /// # Examples
510
    ///
511
    /// ```
512
    /// # futures::executor::block_on(async {
513
    /// use futures::io::AsyncWriteExt;
514
    /// use futures_util::io::Cursor;
515
    /// use std::io::IoSlice;
516
    ///
517
    /// let mut writer = Cursor::new(Vec::new());
518
    /// let bufs = &mut [
519
    ///     IoSlice::new(&[1]),
520
    ///     IoSlice::new(&[2, 3]),
521
    ///     IoSlice::new(&[4, 5, 6]),
522
    /// ];
523
    ///
524
    /// writer.write_all_vectored(bufs).await?;
525
    /// // Note: the contents of `bufs` are now unspecified, see the Notes section.
526
    ///
527
    /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4, 5, 6]);
528
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
529
    /// ```
530
    #[cfg(feature = "write-all-vectored")]
531
    fn write_all_vectored<'a>(
532
        &'a mut self,
533
        bufs: &'a mut [IoSlice<'a>],
534
    ) -> WriteAllVectored<'a, Self>
535
    where
536
        Self: Unpin,
537
    {
538
        assert_future::<Result<()>, _>(WriteAllVectored::new(self, bufs))
539
    }
540
541
    /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
542
    /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
543
    /// Requires the `io-compat` feature to enable.
544
    #[cfg(feature = "io-compat")]
545
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
546
    fn compat_write(self) -> Compat<Self>
547
    where
548
        Self: Sized + Unpin,
549
    {
550
        Compat::new(self)
551
    }
552
553
    /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
554
    ///
555
    /// This adapter produces a sink that will write each value passed to it
556
    /// into the underlying writer.
557
    ///
558
    /// Note that this function consumes the given writer, returning a wrapped
559
    /// version.
560
    ///
561
    /// # Examples
562
    ///
563
    /// ```
564
    /// # futures::executor::block_on(async {
565
    /// use futures::io::AsyncWriteExt;
566
    /// use futures::stream::{self, StreamExt};
567
    ///
568
    /// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]);
569
    ///
570
    /// let mut writer = vec![];
571
    ///
572
    /// stream.forward((&mut writer).into_sink()).await?;
573
    ///
574
    /// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]);
575
    /// # Ok::<(), Box<dyn std::error::Error>>(())
576
    /// # })?;
577
    /// # Ok::<(), Box<dyn std::error::Error>>(())
578
    /// ```
579
    #[cfg(feature = "sink")]
580
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
581
0
    fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
582
0
    where
583
0
        Self: Sized,
584
    {
585
0
        crate::sink::assert_sink::<Item, Error, _>(IntoSink::new(self))
586
0
    }
587
}
588
589
impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
590
591
/// An extension trait which adds utility methods to `AsyncSeek` types.
592
pub trait AsyncSeekExt: AsyncSeek {
593
    /// Creates a future which will seek an IO object to `pos`, and then yield the
594
    /// new position in the object as a `u64`.
595
    ///
596
    /// The object is only borrowed for the lifetime of the returned future; in the
597
    /// case of an error the future resolves to that error.
598
0
    fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
599
0
    where
600
0
        Self: Unpin,
601
    {
602
0
        assert_future::<Result<u64>, _>(Seek::new(self, pos))
603
0
    }
604
605
    /// Creates a future which will return the current seek position from the
606
    /// start of the stream.
607
    ///
608
    /// This is equivalent to `self.seek(SeekFrom::Current(0))`.
609
0
    fn stream_position(&mut self) -> Seek<'_, Self>
610
0
    where
611
0
        Self: Unpin,
612
    {
613
0
        self.seek(SeekFrom::Current(0))
614
0
    }
615
}
616
617
impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
618
619
/// An extension trait which adds utility methods to `AsyncBufRead` types.
620
pub trait AsyncBufReadExt: AsyncBufRead {
621
    /// Creates a future which will wait for a non-empty buffer to be available from this I/O
622
    /// object or EOF to be reached.
623
    ///
624
    /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf).
625
    ///
626
    /// ```rust
627
    /// # futures::executor::block_on(async {
628
    /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
629
    ///
630
    /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read();
631
    ///
632
    /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
633
    /// stream.consume_unpin(2);
634
    ///
635
    /// assert_eq!(stream.fill_buf().await?, vec![3]);
636
    /// stream.consume_unpin(1);
637
    ///
638
    /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]);
639
    /// stream.consume_unpin(3);
640
    ///
641
    /// assert_eq!(stream.fill_buf().await?, vec![]);
642
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
643
    /// ```
644
0
    fn fill_buf(&mut self) -> FillBuf<'_, Self>
645
0
    where
646
0
        Self: Unpin,
647
    {
648
0
        assert_future::<Result<&[u8]>, _>(FillBuf::new(self))
649
0
    }
650
651
    /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
652
    ///
653
    /// ```rust
654
    /// # futures::executor::block_on(async {
655
    /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
656
    ///
657
    /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read();
658
    ///
659
    /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
660
    /// stream.consume_unpin(2);
661
    ///
662
    /// assert_eq!(stream.fill_buf().await?, vec![3]);
663
    /// stream.consume_unpin(1);
664
    ///
665
    /// assert_eq!(stream.fill_buf().await?, vec![]);
666
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
667
    /// ```
668
0
    fn consume_unpin(&mut self, amt: usize)
669
0
    where
670
0
        Self: Unpin,
671
    {
672
0
        Pin::new(self).consume(amt)
673
0
    }
674
675
    /// Creates a future which will read all the bytes associated with this I/O
676
    /// object into `buf` until the delimiter `byte` or EOF is reached.
677
    /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
678
    ///
679
    /// This function will read bytes from the underlying stream until the
680
    /// delimiter or EOF is found. Once found, all bytes up to, and including,
681
    /// the delimiter (if found) will be appended to `buf`.
682
    ///
683
    /// The returned future will resolve to the number of bytes read once the read
684
    /// operation is completed.
685
    ///
686
    /// In the case of an error the future resolves to that error; `buf` and the
687
    /// object are only borrowed, so the caller keeps both.
688
    ///
689
    /// # Examples
690
    ///
691
    /// ```
692
    /// # futures::executor::block_on(async {
693
    /// use futures::io::{AsyncBufReadExt, Cursor};
694
    ///
695
    /// let mut cursor = Cursor::new(b"lorem-ipsum");
696
    /// let mut buf = vec![];
697
    ///
698
    /// // cursor is at 'l'
699
    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
700
    /// assert_eq!(num_bytes, 6);
701
    /// assert_eq!(buf, b"lorem-");
702
    /// buf.clear();
703
    ///
704
    /// // cursor is at 'i'
705
    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
706
    /// assert_eq!(num_bytes, 5);
707
    /// assert_eq!(buf, b"ipsum");
708
    /// buf.clear();
709
    ///
710
    /// // cursor is at EOF
711
    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
712
    /// assert_eq!(num_bytes, 0);
713
    /// assert_eq!(buf, b"");
714
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
715
    /// ```
716
0
    fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
717
0
    where
718
0
        Self: Unpin,
719
    {
720
0
        assert_future::<Result<usize>, _>(ReadUntil::new(self, byte, buf))
721
0
    }
722
723
    /// Creates a future which will read all the bytes associated with this I/O
724
    /// object into `buf` until a newline (the 0xA byte) or EOF is reached.
725
    /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
726
    ///
727
    /// This function will read bytes from the underlying stream until the
728
    /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
729
    /// up to, and including, the delimiter (if found) will be appended to
730
    /// `buf`.
731
    ///
732
    /// The returned future will resolve to the number of bytes read once the read
733
    /// operation is completed.
734
    ///
735
    /// In the case of an error the future resolves to that error; `buf` and the
736
    /// object are only borrowed, so the caller keeps both.
737
    ///
738
    /// # Errors
739
    ///
740
    /// This function has the same error semantics as [`read_until`] and will
741
    /// also return an error if the read bytes are not valid UTF-8. If an I/O
742
    /// error is encountered then `buf` may contain some bytes already read in
743
    /// the event that all data read so far was valid UTF-8.
744
    ///
745
    /// [`read_until`]: AsyncBufReadExt::read_until
746
    ///
747
    /// # Examples
748
    ///
749
    /// ```
750
    /// # futures::executor::block_on(async {
751
    /// use futures::io::{AsyncBufReadExt, Cursor};
752
    ///
753
    /// let mut cursor = Cursor::new(b"foo\nbar");
754
    /// let mut buf = String::new();
755
    ///
756
    /// // cursor is at 'f'
757
    /// let num_bytes = cursor.read_line(&mut buf).await?;
758
    /// assert_eq!(num_bytes, 4);
759
    /// assert_eq!(buf, "foo\n");
760
    /// buf.clear();
761
    ///
762
    /// // cursor is at 'b'
763
    /// let num_bytes = cursor.read_line(&mut buf).await?;
764
    /// assert_eq!(num_bytes, 3);
765
    /// assert_eq!(buf, "bar");
766
    /// buf.clear();
767
    ///
768
    /// // cursor is at EOF
769
    /// let num_bytes = cursor.read_line(&mut buf).await?;
770
    /// assert_eq!(num_bytes, 0);
771
    /// assert_eq!(buf, "");
772
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
773
    /// ```
774
0
    fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
775
0
    where
776
0
        Self: Unpin,
777
    {
778
0
        assert_future::<Result<usize>, _>(ReadLine::new(self, buf))
779
0
    }
780
781
    /// Returns a stream over the lines of this reader.
782
    /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
783
    ///
784
    /// The stream returned from this function will yield instances of
785
    /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
786
    /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
787
    ///
788
    /// [`io::Result`]: std::io::Result
789
    /// [`String`]: String
790
    ///
791
    /// # Errors
792
    ///
793
    /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
794
    ///
795
    /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
796
    ///
797
    /// # Examples
798
    ///
799
    /// ```
800
    /// # futures::executor::block_on(async {
801
    /// use futures::io::{AsyncBufReadExt, Cursor};
802
    /// use futures::stream::StreamExt;
803
    ///
804
    /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");
805
    ///
806
    /// let mut lines_stream = cursor.lines().map(|l| l.unwrap());
807
    /// assert_eq!(lines_stream.next().await, Some(String::from("lorem")));
808
    /// assert_eq!(lines_stream.next().await, Some(String::from("ipsum")));
809
    /// assert_eq!(lines_stream.next().await, Some(String::from("dolor")));
810
    /// assert_eq!(lines_stream.next().await, None);
811
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
812
    /// ```
813
0
    fn lines(self) -> Lines<Self>
814
0
    where
815
0
        Self: Sized,
816
    {
817
0
        assert_stream::<Result<String>, _>(Lines::new(self))
818
0
    }
819
}
820
821
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
822
823
// Identity helper: returns `reader` unchanged. Its `R: AsyncRead` bound makes the
824
// compiler verify that the readers we return have the right implementations.
825
0
pub(crate) fn assert_read<R>(reader: R) -> R
826
0
where
827
0
    R: AsyncRead,
828
{
829
0
    reader
830
0
}
831
// Identity helper: returns `writer` unchanged. Its `W: AsyncWrite` bound makes the
832
// compiler verify that the writers we return have the right implementations.
833
0
pub(crate) fn assert_write<W>(writer: W) -> W
834
0
where
835
0
    W: AsyncWrite,
836
{
837
0
    writer
838
0
}