/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.46.1/src/fs/file.rs
Line | Count | Source (jump to first uncovered line) |
1 | | //! Types for working with [`File`]. |
2 | | //! |
3 | | //! [`File`]: File |
4 | | |
5 | | use crate::fs::{asyncify, OpenOptions}; |
6 | | use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE}; |
7 | | use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; |
8 | | use crate::sync::Mutex; |
9 | | |
10 | | use std::cmp; |
11 | | use std::fmt; |
12 | | use std::fs::{Metadata, Permissions}; |
13 | | use std::future::Future; |
14 | | use std::io::{self, Seek, SeekFrom}; |
15 | | use std::path::Path; |
16 | | use std::pin::Pin; |
17 | | use std::sync::Arc; |
18 | | use std::task::{ready, Context, Poll}; |
19 | | |
20 | | #[cfg(test)] |
21 | | use super::mocks::JoinHandle; |
22 | | #[cfg(test)] |
23 | | use super::mocks::MockFile as StdFile; |
24 | | #[cfg(test)] |
25 | | use super::mocks::{spawn_blocking, spawn_mandatory_blocking}; |
26 | | #[cfg(not(test))] |
27 | | use crate::blocking::JoinHandle; |
28 | | #[cfg(not(test))] |
29 | | use crate::blocking::{spawn_blocking, spawn_mandatory_blocking}; |
30 | | #[cfg(not(test))] |
31 | | use std::fs::File as StdFile; |
32 | | |
33 | | /// A reference to an open file on the filesystem. |
34 | | /// |
35 | | /// This is a specialized version of [`std::fs::File`] for usage from the |
36 | | /// Tokio runtime. |
37 | | /// |
38 | | /// An instance of a `File` can be read and/or written depending on what options |
39 | | /// it was opened with. Files also implement [`AsyncSeek`] to alter the logical |
40 | | /// cursor that the file contains internally. |
41 | | /// |
42 | | /// A file will not be closed immediately when it goes out of scope if there |
43 | | /// are any I/O operations that have not yet completed. To ensure that a file is |
44 | | /// closed immediately when it is dropped, you should call [`flush`] before |
45 | | /// dropping it. Note that this does not ensure that the file has been fully |
46 | | /// written to disk; the operating system might keep the changes around in an |
47 | | /// in-memory buffer. See the [`sync_all`] method for telling the OS to write |
48 | | /// the data to disk. |
49 | | /// |
50 | | /// Reading and writing to a `File` is usually done using the convenience |
51 | | /// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits. Data is staged through an internal buffer whose maximum size can be changed with [`File::set_max_buf_size`]. |
52 | | /// |
53 | | /// [`AsyncSeek`]: trait@crate::io::AsyncSeek |
54 | | /// [`flush`]: fn@crate::io::AsyncWriteExt::flush |
55 | | /// [`sync_all`]: fn@crate::fs::File::sync_all |
56 | | /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt |
57 | | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
58 | | /// |
59 | | /// # Examples |
60 | | /// |
61 | | /// Create a new file and asynchronously write bytes to it: |
62 | | /// |
63 | | /// ```no_run |
64 | | /// use tokio::fs::File; |
65 | | /// use tokio::io::AsyncWriteExt; // for write_all() |
66 | | /// |
67 | | /// # async fn dox() -> std::io::Result<()> { |
68 | | /// let mut file = File::create("foo.txt").await?; |
69 | | /// file.write_all(b"hello, world!").await?; |
70 | | /// # Ok(()) |
71 | | /// # } |
72 | | /// ``` |
73 | | /// |
74 | | /// Read the contents of a file into a buffer: |
75 | | /// |
76 | | /// ```no_run |
77 | | /// use tokio::fs::File; |
78 | | /// use tokio::io::AsyncReadExt; // for read_to_end() |
79 | | /// |
80 | | /// # async fn dox() -> std::io::Result<()> { |
81 | | /// let mut file = File::open("foo.txt").await?; |
82 | | /// |
83 | | /// let mut contents = vec![]; |
84 | | /// file.read_to_end(&mut contents).await?; |
85 | | /// |
86 | | /// println!("len = {}", contents.len()); |
87 | | /// # Ok(()) |
88 | | /// # } |
89 | | /// ``` |
90 | | pub struct File { |
91 | | std: Arc<StdFile>, |
92 | | inner: Mutex<Inner>, |
93 | | max_buf_size: usize, |
94 | | } |
95 | | |
96 | | struct Inner { |
97 | | state: State, |
98 | | // `state` is `Idle` (holding the reusable buffer) or `Busy` (holding the handle of the in-flight blocking task). |
99 | | /// Errors from writes/flushes are returned in write/flush calls. If a write |
100 | | /// error is observed while performing a read, it is saved until the next |
101 | | /// write / flush call. Only the `io::ErrorKind` is retained. |
102 | | last_write_err: Option<io::ErrorKind>, |
103 | | /// Value of the most recent successful `Operation::Seek` result; returned by `poll_complete` when nothing is in flight. |
104 | | pos: u64, |
105 | | } |
106 | | /// Whether a blocking operation is in flight. `Idle` holds the buffer (taken out while in use); `Busy` holds the task handle that returns it. |
107 | | #[derive(Debug)] |
108 | | enum State { |
109 | | Idle(Option<Buf>), |
110 | | Busy(JoinHandle<(Operation, Buf)>), |
111 | | } |
112 | | /// Outcome of a finished blocking task, tagged with the kind of operation that ran. |
113 | | #[derive(Debug)] |
114 | | enum Operation { |
115 | | Read(io::Result<usize>), |
116 | | Write(io::Result<()>), |
117 | | Seek(io::Result<u64>), |
118 | | } |
119 | | |
120 | | impl File { |
121 | | /// Attempts to open a file in read-only mode. |
122 | | /// |
123 | | /// See [`OpenOptions`] for more details. |
124 | | /// |
125 | | /// # Errors |
126 | | /// |
127 | | /// This function will return an error if called from outside of the Tokio |
128 | | /// runtime or if `path` does not already exist. Other errors may also be |
129 | | /// returned according to [`OpenOptions::open`]. |
130 | | /// |
131 | | /// # Examples |
132 | | /// |
133 | | /// ```no_run |
134 | | /// use tokio::fs::File; |
135 | | /// use tokio::io::AsyncReadExt; |
136 | | /// |
137 | | /// # async fn dox() -> std::io::Result<()> { |
138 | | /// let mut file = File::open("foo.txt").await?; |
139 | | /// |
140 | | /// let mut contents = vec![]; |
141 | | /// file.read_to_end(&mut contents).await?; |
142 | | /// |
143 | | /// println!("len = {}", contents.len()); |
144 | | /// # Ok(()) |
145 | | /// # } |
146 | | /// ``` |
147 | | /// |
148 | | /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait. |
149 | | /// |
150 | | /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end |
151 | | /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt |
152 | 0 | pub async fn open(path: impl AsRef<Path>) -> io::Result<File> { |
153 | 0 | let path = path.as_ref().to_owned(); |
154 | 0 | let std = asyncify(|| StdFile::open(path)).await?; |
155 | | // `from_std` starts the wrapper idle, with an empty buffer and position 0. |
156 | 0 | Ok(File::from_std(std)) |
157 | 0 | } |
158 | | |
159 | | /// Opens a file in write-only mode. |
160 | | /// |
161 | | /// This function will create a file if it does not exist, and will truncate |
162 | | /// it if it does. |
163 | | /// |
164 | | /// See [`OpenOptions`] for more details; use [`File::create_new`] to fail instead of truncating an existing file. |
165 | | /// |
166 | | /// # Errors |
167 | | /// |
168 | | /// Results in an error if called from outside of the Tokio runtime or if |
169 | | /// the underlying [`create`] call results in an error. |
170 | | /// |
171 | | /// [`create`]: std::fs::File::create |
172 | | /// |
173 | | /// # Examples |
174 | | /// |
175 | | /// ```no_run |
176 | | /// use tokio::fs::File; |
177 | | /// use tokio::io::AsyncWriteExt; |
178 | | /// |
179 | | /// # async fn dox() -> std::io::Result<()> { |
180 | | /// let mut file = File::create("foo.txt").await?; |
181 | | /// file.write_all(b"hello, world!").await?; |
182 | | /// # Ok(()) |
183 | | /// # } |
184 | | /// ``` |
185 | | /// |
186 | | /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. |
187 | | /// |
188 | | /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all |
189 | | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
190 | 0 | pub async fn create(path: impl AsRef<Path>) -> io::Result<File> { |
191 | 0 | let path = path.as_ref().to_owned(); |
192 | 0 | let std_file = asyncify(move || StdFile::create(path)).await?; |
193 | 0 | Ok(File::from_std(std_file)) |
194 | 0 | } |
195 | | |
196 | | /// Opens a file in read-write mode. |
197 | | /// |
198 | | /// This function will create a file if it does not exist, or return an error |
199 | | /// if it does. This way, if the call succeeds, the file returned is guaranteed |
200 | | /// to be new. |
201 | | /// |
202 | | /// This option is useful because it is atomic. Otherwise between checking |
203 | | /// whether a file exists and creating a new one, the file may have been |
204 | | /// created by another process (a TOCTOU race condition / attack). |
205 | | /// |
206 | | /// This is equivalent to `File::options().read(true).write(true).create_new(true).open(...)`. |
207 | | /// |
208 | | /// See [`OpenOptions`] for more details. |
209 | | /// |
210 | | /// # Examples |
211 | | /// |
212 | | /// ```no_run |
213 | | /// use tokio::fs::File; |
214 | | /// use tokio::io::AsyncWriteExt; |
215 | | /// |
216 | | /// # async fn dox() -> std::io::Result<()> { |
217 | | /// let mut file = File::create_new("foo.txt").await?; |
218 | | /// file.write_all(b"hello, world!").await?; |
219 | | /// # Ok(()) |
220 | | /// # } |
221 | | /// ``` |
222 | | /// |
223 | | /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. |
224 | | /// |
225 | | /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all |
226 | | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
227 | 0 | pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> { |
228 | 0 | Self::options() |
229 | 0 | .read(true) |
230 | 0 | .write(true) |
231 | 0 | .create_new(true) |
232 | 0 | .open(path) |
233 | 0 | .await |
234 | 0 | } |
235 | | |
236 | | /// Returns a new [`OpenOptions`] object. |
237 | | /// |
238 | | /// This function returns a new `OpenOptions` object that you can use to |
239 | | /// open or create a file with specific options if [`File::open`] or [`File::create`] |
240 | | /// are not appropriate. |
241 | | /// |
242 | | /// It is equivalent to `OpenOptions::new()`, but allows you to write more |
243 | | /// readable code. Instead of |
244 | | /// `OpenOptions::new().append(true).open("example.log")`, |
245 | | /// you can write `File::options().append(true).open("example.log")`. This |
246 | | /// also avoids the need to import `OpenOptions`. |
247 | | /// |
248 | | /// See the [`OpenOptions::new`] function for more details. |
249 | | /// |
250 | | /// # Examples |
251 | | /// |
252 | | /// ```no_run |
253 | | /// use tokio::fs::File; |
254 | | /// use tokio::io::AsyncWriteExt; |
255 | | /// |
256 | | /// # async fn dox() -> std::io::Result<()> { |
257 | | /// let mut f = File::options().append(true).open("example.log").await?; |
258 | | /// f.write_all(b"new line\n").await?; |
259 | | /// # Ok(()) |
260 | | /// # } |
261 | | /// ``` |
262 | | #[must_use] |
263 | 0 | pub fn options() -> OpenOptions { |
264 | 0 | OpenOptions::new() |
265 | 0 | } |
266 | | |
267 | | /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File), starting with an empty buffer and the default maximum buffer size. |
268 | | /// |
269 | | /// # Examples |
270 | | /// |
271 | | /// ```no_run |
272 | | /// // This line could block. It is not recommended to do this on the Tokio |
273 | | /// // runtime. |
274 | | /// let std_file = std::fs::File::open("foo.txt").unwrap(); |
275 | | /// let file = tokio::fs::File::from_std(std_file); |
276 | | /// ``` |
277 | 0 | pub fn from_std(std: StdFile) -> File { |
278 | 0 | File { |
279 | 0 | std: Arc::new(std), |
280 | 0 | inner: Mutex::new(Inner { |
281 | 0 | state: State::Idle(Some(Buf::with_capacity(0))), |
282 | 0 | last_write_err: None, |
283 | 0 | pos: 0, |
284 | 0 | }), |
285 | 0 | max_buf_size: DEFAULT_MAX_BUF_SIZE, |
286 | 0 | } |
287 | 0 | } |
288 | | |
289 | | /// Attempts to sync all OS-internal file content and metadata to disk. |
290 | | /// |
291 | | /// This function will attempt to ensure that all in-core data reaches the |
292 | | /// filesystem before returning. |
293 | | /// |
294 | | /// # Examples |
295 | | /// |
296 | | /// ```no_run |
297 | | /// use tokio::fs::File; |
298 | | /// use tokio::io::AsyncWriteExt; |
299 | | /// |
300 | | /// # async fn dox() -> std::io::Result<()> { |
301 | | /// let mut file = File::create("foo.txt").await?; |
302 | | /// file.write_all(b"hello, world!").await?; |
303 | | /// file.sync_all().await?; |
304 | | /// # Ok(()) |
305 | | /// # } |
306 | | /// ``` |
307 | | /// |
308 | | /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. |
309 | | /// |
310 | | /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all |
311 | | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
312 | 0 | pub async fn sync_all(&self) -> io::Result<()> { |
313 | 0 | let mut inner = self.inner.lock().await; |
314 | 0 | inner.complete_inflight().await; |
315 | | // In-flight work has finished; a write error seen while waiting is kept in `last_write_err`, not returned from here. |
316 | 0 | let std = self.std.clone(); |
317 | 0 | asyncify(move || std.sync_all()).await |
318 | 0 | } |
319 | | |
320 | | /// This function is similar to [`File::sync_all`], except that it may not |
321 | | /// synchronize file metadata to the filesystem. |
322 | | /// |
323 | | /// This is intended for use cases that must synchronize content, but don't |
324 | | /// need the metadata on disk. The goal of this method is to reduce disk |
325 | | /// operations. |
326 | | /// |
327 | | /// Note that some platforms may simply implement this in terms of [`File::sync_all`]. |
328 | | /// |
329 | | /// # Examples |
330 | | /// |
331 | | /// ```no_run |
332 | | /// use tokio::fs::File; |
333 | | /// use tokio::io::AsyncWriteExt; |
334 | | /// |
335 | | /// # async fn dox() -> std::io::Result<()> { |
336 | | /// let mut file = File::create("foo.txt").await?; |
337 | | /// file.write_all(b"hello, world!").await?; |
338 | | /// file.sync_data().await?; |
339 | | /// # Ok(()) |
340 | | /// # } |
341 | | /// ``` |
342 | | /// |
343 | | /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. |
344 | | /// |
345 | | /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all |
346 | | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
347 | 0 | pub async fn sync_data(&self) -> io::Result<()> { |
348 | 0 | let mut inner = self.inner.lock().await; |
349 | 0 | inner.complete_inflight().await; |
350 | | // In-flight work has finished; a write error seen while waiting is kept in `last_write_err`, not returned from here. |
351 | 0 | let std = self.std.clone(); |
352 | 0 | asyncify(move || std.sync_data()).await |
353 | 0 | } |
354 | | |
355 | | /// Truncates or extends the underlying file, updating the size of this file to become `size`. |
356 | | /// |
357 | | /// If `size` is less than the current file's size, then the file will be |
358 | | /// shrunk. If it is greater than the current file's size, then the file |
359 | | /// will be extended to `size` and have all of the intermediate data filled in |
360 | | /// with 0s. |
361 | | /// |
362 | | /// # Errors |
363 | | /// |
364 | | /// This function will return an error if the file is not opened for |
365 | | /// writing. |
366 | | /// |
367 | | /// # Examples |
368 | | /// |
369 | | /// ```no_run |
370 | | /// use tokio::fs::File; |
371 | | /// use tokio::io::AsyncWriteExt; |
372 | | /// |
373 | | /// # async fn dox() -> std::io::Result<()> { |
374 | | /// let mut file = File::create("foo.txt").await?; |
375 | | /// file.write_all(b"hello, world!").await?; |
376 | | /// file.set_len(10).await?; |
377 | | /// # Ok(()) |
378 | | /// # } |
379 | | /// ``` |
380 | | /// |
381 | | /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. |
382 | | /// |
383 | | /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all |
384 | | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
385 | 0 | pub async fn set_len(&self, size: u64) -> io::Result<()> { |
386 | 0 | let mut inner = self.inner.lock().await; |
387 | 0 | inner.complete_inflight().await; |
388 | | // Nothing is in flight any more, so the state is `Idle`; take its buffer to hand to the blocking task. |
389 | 0 | let mut buf = match inner.state { |
390 | 0 | State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(), |
391 | 0 | _ => unreachable!(), |
392 | | }; |
393 | | // Leftover read data is dropped; the offset returned by `discard_read` is applied as a relative seek before resizing. |
394 | 0 | let seek = if !buf.is_empty() { |
395 | 0 | Some(SeekFrom::Current(buf.discard_read())) |
396 | | } else { |
397 | 0 | None |
398 | | }; |
399 | |
400 | 0 | let std = self.std.clone(); |
401 | 0 |

402 | 0 | inner.state = State::Busy(spawn_blocking(move || { |
403 | 0 | let res = if let Some(seek) = seek { |
404 | 0 | (&*std).seek(seek).and_then(|_| std.set_len(size)) |
405 | | } else { |
406 | 0 | std.set_len(size) |
407 | | } |
408 | 0 | .map(|()| 0); // placeholder position; NOTE(review): on success it is written to `inner.pos` below rather than discarded |
409 | 0 |

410 | 0 | // Return the result as a seek so that it fits the `Operation` enum |
411 | 0 | (Operation::Seek(res), buf) |
412 | 0 | })); |
413 | | // Wait for the blocking task right here, while the `inner` lock is still held. |
414 | 0 | let (op, buf) = match inner.state { |
415 | 0 | State::Idle(_) => unreachable!(), |
416 | 0 | State::Busy(ref mut rx) => rx.await?, |
417 | | }; |
418 | |
419 | 0 | inner.state = State::Idle(Some(buf)); |
420 | 0 |

421 | 0 | match op { |
422 | 0 | Operation::Seek(res) => res.map(|pos| { |
423 | 0 | inner.pos = pos; |
424 | 0 | }), |
425 | 0 | _ => unreachable!(), |
426 | | } |
427 | 0 | } |
428 | | |
429 | | /// Queries metadata about the underlying file, without waiting for any in-flight operation on this `File`. |
430 | | /// |
431 | | /// # Examples |
432 | | /// |
433 | | /// ```no_run |
434 | | /// use tokio::fs::File; |
435 | | /// |
436 | | /// # async fn dox() -> std::io::Result<()> { |
437 | | /// let file = File::open("foo.txt").await?; |
438 | | /// let metadata = file.metadata().await?; |
439 | | /// |
440 | | /// println!("{:?}", metadata); |
441 | | /// # Ok(()) |
442 | | /// # } |
443 | | /// ``` |
444 | 0 | pub async fn metadata(&self) -> io::Result<Metadata> { |
445 | 0 | let std = self.std.clone(); |
446 | 0 | asyncify(move || std.metadata()).await |
447 | 0 | } |
448 | | |
449 | | /// Creates a new `File` instance that shares the same underlying file handle |
450 | | /// as the existing `File` instance. Reads, writes, and seeks will affect both |
451 | | /// `File` instances simultaneously. Any in-flight operation is completed first; the clone starts with its own empty buffer. |
452 | | /// |
453 | | /// # Examples |
454 | | /// |
455 | | /// ```no_run |
456 | | /// use tokio::fs::File; |
457 | | /// |
458 | | /// # async fn dox() -> std::io::Result<()> { |
459 | | /// let file = File::open("foo.txt").await?; |
460 | | /// let file_clone = file.try_clone().await?; |
461 | | /// # Ok(()) |
462 | | /// # } |
463 | | /// ``` |
464 | 0 | pub async fn try_clone(&self) -> io::Result<File> { |
465 | 0 | self.inner.lock().await.complete_inflight().await; |
466 | 0 | let std = self.std.clone(); |
467 | 0 | let std_file = asyncify(move || std.try_clone()).await?; |
468 | 0 | Ok(File::from_std(std_file)) |
469 | 0 | } |
470 | | |
471 | | /// Destructures `File` into a [`std::fs::File`]. This function is |
472 | | /// async to allow any in-flight operations to complete. |
473 | | /// |
474 | | /// Use [`File::try_into_std`] to attempt conversion immediately. |
475 | | /// |
476 | | /// # Examples |
477 | | /// |
478 | | /// ```no_run |
479 | | /// use tokio::fs::File; |
480 | | /// |
481 | | /// # async fn dox() -> std::io::Result<()> { |
482 | | /// let tokio_file = File::open("foo.txt").await?; |
483 | | /// let std_file = tokio_file.into_std().await; |
484 | | /// # Ok(()) |
485 | | /// # } |
486 | | /// ``` |
487 | 0 | pub async fn into_std(mut self) -> StdFile { |
488 | 0 | self.inner.get_mut().complete_inflight().await; |
489 | 0 | Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed") |
490 | 0 | } |
491 | | |
492 | | /// Tries to immediately destructure `File` into a [`std::fs::File`]. |
493 | | /// |
494 | | /// # Errors |
495 | | /// |
496 | | /// If some operation is in-flight, the conversion fails and the original |
497 | | /// `File` is handed back unchanged as the `Err` value. |
498 | | /// |
499 | | /// # Examples |
500 | | /// |
501 | | /// ```no_run |
502 | | /// use tokio::fs::File; |
503 | | /// |
504 | | /// # async fn dox() -> std::io::Result<()> { |
505 | | /// let tokio_file = File::open("foo.txt").await?; |
506 | | /// let std_file = tokio_file.try_into_std().unwrap(); |
507 | | /// # Ok(()) |
508 | | /// # } |
509 | | /// ``` |
510 | 0 | pub fn try_into_std(mut self) -> Result<StdFile, Self> { |
511 | 0 | match Arc::try_unwrap(self.std) { |
512 | 0 | Ok(file) => Ok(file), |
513 | 0 | Err(std_file_arc) => { |
514 | 0 | self.std = std_file_arc; |
515 | 0 | Err(self) |
516 | | } |
517 | | } |
518 | 0 | } |
519 | | |
520 | | /// Changes the permissions on the underlying file. |
521 | | /// |
522 | | /// # Platform-specific behavior |
523 | | /// |
524 | | /// This function currently corresponds to the `fchmod` function on Unix and |
525 | | /// the `SetFileInformationByHandle` function on Windows. Note that this |
526 | | /// [may change in the future][changes]. |
527 | | /// |
528 | | /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior |
529 | | /// |
530 | | /// # Errors |
531 | | /// |
532 | | /// This function will return an error if the user lacks permission to change |
533 | | /// attributes on the underlying file. It may also return an error in other |
534 | | /// OS-specific unspecified cases. |
535 | | /// |
536 | | /// # Examples |
537 | | /// |
538 | | /// ```no_run |
539 | | /// use tokio::fs::File; |
540 | | /// |
541 | | /// # async fn dox() -> std::io::Result<()> { |
542 | | /// let file = File::open("foo.txt").await?; |
543 | | /// let mut perms = file.metadata().await?.permissions(); |
544 | | /// perms.set_readonly(true); |
545 | | /// file.set_permissions(perms).await?; |
546 | | /// # Ok(()) |
547 | | /// # } |
548 | | /// ``` |
549 | 0 | pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> { |
550 | 0 | let std = self.std.clone(); |
551 | 0 | asyncify(move || std.set_permissions(perm)).await |
552 | 0 | } |
553 | | |
554 | | /// Sets the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation. |
555 | | /// |
556 | | /// Although Tokio uses a sensible default value for this buffer size, this function would be |
557 | | /// useful for changing that default depending on the situation. |
558 | | /// |
559 | | /// # Examples |
560 | | /// |
561 | | /// ```no_run |
562 | | /// use tokio::fs::File; |
563 | | /// use tokio::io::AsyncWriteExt; |
564 | | /// |
565 | | /// # async fn dox() -> std::io::Result<()> { |
566 | | /// let mut file = File::create("foo.txt").await?; |
567 | | /// |
568 | | /// // Set maximum buffer size to 8 MiB |
569 | | /// file.set_max_buf_size(8 * 1024 * 1024); |
570 | | /// |
571 | | /// let buf = vec![1; 1024 * 1024 * 1024]; |
572 | | /// |
573 | | /// // Write the 1 GiB buffer in chunks up to 8 MiB each. |
574 | | /// file.write_all(&buf).await?; |
575 | | /// # Ok(()) |
576 | | /// # } |
577 | | /// ``` |
578 | 0 | pub fn set_max_buf_size(&mut self, max_buf_size: usize) { |
579 | 0 | self.max_buf_size = max_buf_size; |
580 | 0 | } |
581 | | } |
582 | | |
583 | | impl AsyncRead for File { |
584 | 0 | fn poll_read( |
585 | 0 | self: Pin<&mut Self>, |
586 | 0 | cx: &mut Context<'_>, |
587 | 0 | dst: &mut ReadBuf<'_>, |
588 | 0 | ) -> Poll<io::Result<()>> { |
589 | 0 | ready!(crate::trace::trace_leaf(cx)); |
590 | | // Exclusive access to `self` lets the mutex contents be reached through `get_mut` without locking. |
591 | 0 | let me = self.get_mut(); |
592 | 0 | let inner = me.inner.get_mut(); |
593 | | // Loop: once a pending operation finishes, the idle branch runs again to serve this read. |
594 | | loop { |
595 | 0 | match inner.state { |
596 | 0 | State::Idle(ref mut buf_cell) => { |
597 | 0 | let mut buf = buf_cell.take().unwrap(); |
598 | 0 |

599 | 0 | if !buf.is_empty() || dst.remaining() == 0 { |
600 | 0 | buf.copy_to(dst); |
601 | 0 | *buf_cell = Some(buf); |
602 | 0 | return Poll::Ready(Ok(())); |
603 | 0 | } |
604 | 0 |

605 | 0 | let std = me.std.clone(); |
606 | 0 |

607 | 0 | let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size); |
608 | 0 | inner.state = State::Busy(spawn_blocking(move || { |
609 | 0 | // SAFETY: the `Read` implementation of `std` does not |
610 | 0 | // read from the buffer it is borrowing and correctly |
611 | 0 | // reports the length of the data written into the buffer. |
612 | 0 | let res = unsafe { buf.read_from(&mut &*std, max_buf_size) }; |
613 | 0 | (Operation::Read(res), buf) |
614 | 0 | })); |
615 | 0 | } |
616 | 0 | State::Busy(ref mut rx) => { |
617 | 0 | let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?; |
618 | | // The blocking task has finished; what happens next depends on which operation it ran. |
619 | 0 | match op { |
620 | | Operation::Read(Ok(_)) => { |
621 | 0 | buf.copy_to(dst); |
622 | 0 | inner.state = State::Idle(Some(buf)); |
623 | 0 | return Poll::Ready(Ok(())); |
624 | | } |
625 | 0 | Operation::Read(Err(e)) => { |
626 | 0 | assert!(buf.is_empty()); |
627 | | // Put the buffer back before surfacing the read error. |
628 | 0 | inner.state = State::Idle(Some(buf)); |
629 | 0 | return Poll::Ready(Err(e)); |
630 | | } |
631 | | Operation::Write(Ok(())) => { |
632 | 0 | assert!(buf.is_empty()); |
633 | 0 | inner.state = State::Idle(Some(buf)); |
634 | 0 | continue; |
635 | | } |
636 | 0 | Operation::Write(Err(e)) => { |
637 | 0 | assert!(inner.last_write_err.is_none()); |
638 | 0 | inner.last_write_err = Some(e.kind()); |
639 | 0 | inner.state = State::Idle(Some(buf)); |
640 | | } |
641 | 0 | Operation::Seek(result) => { |
642 | 0 | assert!(buf.is_empty()); |
643 | 0 | inner.state = State::Idle(Some(buf)); |
644 | 0 | if let Ok(pos) = result { |
645 | 0 | inner.pos = pos; |
646 | 0 | } |
647 | 0 | continue; |
648 | | } |
649 | | } |
650 | | } |
651 | | } |
652 | | } |
653 | 0 | } |
654 | | } |
655 | | |
656 | | impl AsyncSeek for File { |
657 | 0 | fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> { |
658 | 0 | let me = self.get_mut(); |
659 | 0 | let inner = me.inner.get_mut(); |
660 | 0 |

661 | 0 | match inner.state { |
662 | 0 | State::Busy(_) => Err(io::Error::new( |
663 | 0 | io::ErrorKind::Other, |
664 | 0 | "other file operation is pending, call poll_complete before start_seek", |
665 | 0 | )), |
666 | 0 | State::Idle(ref mut buf_cell) => { |
667 | 0 | let mut buf = buf_cell.take().unwrap(); |
668 | 0 |

669 | 0 | // Factor in any unread data from the buf; the buffered bytes are discarded either way |
670 | 0 | if !buf.is_empty() { |
671 | 0 | let n = buf.discard_read(); |
672 | | // Only a relative (`Current`) seek is adjusted; `Start` and `End` targets are left as given. |
673 | 0 | if let SeekFrom::Current(ref mut offset) = pos { |
674 | 0 | *offset += n; |
675 | 0 | } |
676 | 0 | } |
677 | | // The seek itself runs as a blocking task; `poll_complete` collects its result. |
678 | 0 | let std = me.std.clone(); |
679 | 0 |

680 | 0 | inner.state = State::Busy(spawn_blocking(move || { |
681 | 0 | let res = (&*std).seek(pos); |
682 | 0 | (Operation::Seek(res), buf) |
683 | 0 | })); |
684 | 0 | Ok(()) |
685 | | } |
686 | | } |
687 | 0 | } |
688 | |
689 | 0 | fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { |
690 | 0 | ready!(crate::trace::trace_leaf(cx)); |
691 | 0 | let inner = self.inner.get_mut(); |
692 | | // When nothing is in flight, the stored `inner.pos` is reported as the position. |
693 | | loop { |
694 | 0 | match inner.state { |
695 | 0 | State::Idle(_) => return Poll::Ready(Ok(inner.pos)), |
696 | 0 | State::Busy(ref mut rx) => { |
697 | 0 | let (op, buf) = ready!(Pin::new(rx).poll(cx))?; |
698 | 0 | inner.state = State::Idle(Some(buf)); |
699 | | // Only a finished seek returns from here; read/write results fall through to the `Idle` arm on the next pass. |
700 | 0 | match op { |
701 | 0 | Operation::Read(_) => {} |
702 | 0 | Operation::Write(Err(e)) => { |
703 | 0 | assert!(inner.last_write_err.is_none()); |
704 | 0 | inner.last_write_err = Some(e.kind()); |
705 | | } |
706 | 0 | Operation::Write(_) => {} |
707 | 0 | Operation::Seek(res) => { |
708 | 0 | if let Ok(pos) = res { |
709 | 0 | inner.pos = pos; |
710 | 0 | } |
711 | 0 | return Poll::Ready(res); |
712 | | } |
713 | | } |
714 | | } |
715 | | } |
716 | | } |
717 | 0 | } |
718 | | } |
719 | | |
720 | | impl AsyncWrite for File { |
721 | 0 | fn poll_write( |
722 | 0 | self: Pin<&mut Self>, |
723 | 0 | cx: &mut Context<'_>, |
724 | 0 | src: &[u8], |
725 | 0 | ) -> Poll<io::Result<usize>> { |
726 | 0 | ready!(crate::trace::trace_leaf(cx)); |
727 | 0 | let me = self.get_mut(); |
728 | 0 | let inner = me.inner.get_mut(); |
729 | | // A write error stashed by an earlier call is reported before any new data is accepted. |
730 | 0 | if let Some(e) = inner.last_write_err.take() { |
731 | 0 | return Poll::Ready(Err(e.into())); |
732 | 0 | } |
733 | |
734 | | loop { |
735 | 0 | match inner.state { |
736 | 0 | State::Idle(ref mut buf_cell) => { |
737 | 0 | let mut buf = buf_cell.take().unwrap(); |
738 | | // Unread buffered data is discarded; the offset from `discard_read` is applied as a relative seek before the write. |
739 | 0 | let seek = if !buf.is_empty() { |
740 | 0 | Some(SeekFrom::Current(buf.discard_read())) |
741 | | } else { |
742 | 0 | None |
743 | | }; |
744 | | // `n` (returned to the caller below) is what `copy_from` reports for `src` under the `max_buf_size` cap. |
745 | 0 | let n = buf.copy_from(src, me.max_buf_size); |
746 | 0 | let std = me.std.clone(); |
747 | |
748 | 0 | let blocking_task_join_handle = spawn_mandatory_blocking(move || { |
749 | 0 | let res = if let Some(seek) = seek { |
750 | 0 | (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) |
751 | | } else { |
752 | 0 | buf.write_to(&mut &*std) |
753 | | }; |
754 | |
755 | 0 | (Operation::Write(res), buf) |
756 | 0 | }) |
757 | 0 | .ok_or_else(|| { |
758 | 0 | io::Error::new(io::ErrorKind::Other, "background task failed") |
759 | 0 | })?; |
760 | | // The write is reported as accepted right away; a failure of the background task surfaces on a later write or flush. |
761 | 0 | inner.state = State::Busy(blocking_task_join_handle); |
762 | 0 |

763 | 0 | return Poll::Ready(Ok(n)); |
764 | | } |
765 | 0 | State::Busy(ref mut rx) => { |
766 | 0 | let (op, buf) = ready!(Pin::new(rx).poll(cx))?; |
767 | 0 | inner.state = State::Idle(Some(buf)); |
768 | 0 |

769 | 0 | match op { |
770 | | Operation::Read(_) => { |
771 | | // We don't care about the result here. The fact |
772 | | // that the cursor has advanced will be reflected in |
773 | | // the next iteration of the loop |
774 | 0 | continue; |
775 | | } |
776 | 0 | Operation::Write(res) => { |
777 | 0 | // If the previous write was successful, continue. |
778 | 0 | // Otherwise, error. |
779 | 0 | res?; |
780 | 0 | continue; |
781 | | } |
782 | | Operation::Seek(_) => { |
783 | | // Ignore the seek |
784 | 0 | continue; |
785 | | } |
786 | | } |
787 | | } |
788 | | } |
789 | | } |
790 | 0 | } |
791 | | // Same flow as `poll_write`, except that the data is gathered from `bufs` with `copy_from_bufs`. |
792 | 0 | fn poll_write_vectored( |
793 | 0 | self: Pin<&mut Self>, |
794 | 0 | cx: &mut Context<'_>, |
795 | 0 | bufs: &[io::IoSlice<'_>], |
796 | 0 | ) -> Poll<Result<usize, io::Error>> { |
797 | 0 | ready!(crate::trace::trace_leaf(cx)); |
798 | 0 | let me = self.get_mut(); |
799 | 0 | let inner = me.inner.get_mut(); |
800 | |
801 | 0 | if let Some(e) = inner.last_write_err.take() { |
802 | 0 | return Poll::Ready(Err(e.into())); |
803 | 0 | } |
804 | |
805 | | loop { |
806 | 0 | match inner.state { |
807 | 0 | State::Idle(ref mut buf_cell) => { |
808 | 0 | let mut buf = buf_cell.take().unwrap(); |
809 | |
810 | 0 | let seek = if !buf.is_empty() { |
811 | 0 | Some(SeekFrom::Current(buf.discard_read())) |
812 | | } else { |
813 | 0 | None |
814 | | }; |
815 | |
816 | 0 | let n = buf.copy_from_bufs(bufs, me.max_buf_size); |
817 | 0 | let std = me.std.clone(); |
818 | |
819 | 0 | let blocking_task_join_handle = spawn_mandatory_blocking(move || { |
820 | 0 | let res = if let Some(seek) = seek { |
821 | 0 | (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) |
822 | | } else { |
823 | 0 | buf.write_to(&mut &*std) |
824 | | }; |
825 | |
826 | 0 | (Operation::Write(res), buf) |
827 | 0 | }) |
828 | 0 | .ok_or_else(|| { |
829 | 0 | io::Error::new(io::ErrorKind::Other, "background task failed") |
830 | 0 | })?; |
831 | |
832 | 0 | inner.state = State::Busy(blocking_task_join_handle); |
833 | 0 |

834 | 0 | return Poll::Ready(Ok(n)); |
835 | | } |
836 | 0 | State::Busy(ref mut rx) => { |
837 | 0 | let (op, buf) = ready!(Pin::new(rx).poll(cx))?; |
838 | 0 | inner.state = State::Idle(Some(buf)); |
839 | 0 |

840 | 0 | match op { |
841 | | Operation::Read(_) => { |
842 | | // We don't care about the result here. The fact |
843 | | // that the cursor has advanced will be reflected in |
844 | | // the next iteration of the loop |
845 | 0 | continue; |
846 | | } |
847 | 0 | Operation::Write(res) => { |
848 | 0 | // If the previous write was successful, continue. |
849 | 0 | // Otherwise, error. |
850 | 0 | res?; |
851 | 0 | continue; |
852 | | } |
853 | | Operation::Seek(_) => { |
854 | | // Ignore the seek |
855 | 0 | continue; |
856 | | } |
857 | | } |
858 | | } |
859 | | } |
860 | | } |
861 | 0 | } |
862 | | // `poll_write_vectored` is implemented above, so vectored support is advertised. |
863 | 0 | fn is_write_vectored(&self) -> bool { |
864 | 0 | true |
865 | 0 | } |
866 | | // Flushing is delegated to `Inner::poll_flush`. |
867 | 0 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
868 | 0 | ready!(crate::trace::trace_leaf(cx)); |
869 | 0 | let inner = self.inner.get_mut(); |
870 | 0 | inner.poll_flush(cx) |
871 | 0 | } |
872 | | // Shutdown does nothing beyond a flush; the underlying file is not closed here. |
873 | 0 | fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
874 | 0 | ready!(crate::trace::trace_leaf(cx)); |
875 | 0 | self.poll_flush(cx) |
876 | 0 | } |
877 | | } |
878 | | |
879 | | impl From<StdFile> for File { |
880 | 0 | fn from(std: StdFile) -> Self { |
881 | 0 | Self::from_std(std) |
882 | 0 | } |
883 | | } |
884 | | // Debug output shows only the wrapped std handle; the buffer state in `inner` is omitted. |
885 | | impl fmt::Debug for File { |
886 | 0 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
887 | 0 | fmt.debug_struct("tokio::fs::File") |
888 | 0 | .field("std", &self.std) |
889 | 0 | .finish() |
890 | 0 | } |
891 | | } |
892 | | // Unix: the raw descriptor is the one of the wrapped std file. |
893 | | #[cfg(unix)] |
894 | | impl std::os::unix::io::AsRawFd for File { |
895 | 0 | fn as_raw_fd(&self) -> std::os::unix::io::RawFd { |
896 | 0 | self.std.as_raw_fd() |
897 | 0 | } |
898 | | } |
899 | | // SAFETY (for the `unsafe` block in `as_fd`): the descriptor comes from `self.std`, which `self` owns, and the returned `BorrowedFd<'_>` borrows `self`. |
900 | | #[cfg(unix)] |
901 | | impl std::os::unix::io::AsFd for File { |
902 | 0 | fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> { |
903 | 0 | unsafe { |
904 | 0 | std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self)) |
905 | 0 | } |
906 | 0 | } |
907 | | } |
908 | | // `from_raw_fd` forwards `fd` to `StdFile::from_raw_fd`, so that function's requirements apply to the caller. |
909 | | #[cfg(unix)] |
910 | | impl std::os::unix::io::FromRawFd for File { |
911 | 0 | unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self { |
912 | 0 | StdFile::from_raw_fd(fd).into() |
913 | 0 | } |
914 | | } |
915 | | |
916 | | cfg_windows! { |
917 | | use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle}; |
918 | | // Windows counterparts of the Unix fd impls: the raw handle is the one of the wrapped std file. |
919 | | impl AsRawHandle for File { |
920 | | fn as_raw_handle(&self) -> RawHandle { |
921 | | self.std.as_raw_handle() |
922 | | } |
923 | | } |
924 | | // SAFETY (for the `unsafe` block in `as_handle`): the handle comes from `self.std`, which `self` owns, and the returned `BorrowedHandle<'_>` borrows `self`. |
925 | | impl AsHandle for File { |
926 | | fn as_handle(&self) -> BorrowedHandle<'_> { |
927 | | unsafe { |
928 | | BorrowedHandle::borrow_raw( |
929 | | AsRawHandle::as_raw_handle(self), |
930 | | ) |
931 | | } |
932 | | } |
933 | | } |
934 | | // `from_raw_handle` forwards `handle` to `StdFile::from_raw_handle`, so that function's requirements apply to the caller. |
935 | | impl FromRawHandle for File { |
936 | | unsafe fn from_raw_handle(handle: RawHandle) -> Self { |
937 | | StdFile::from_raw_handle(handle).into() |
938 | | } |
939 | | } |
940 | | } |
941 | | |
942 | | impl Inner { |
943 | 0 | async fn complete_inflight(&mut self) { |
944 | | use std::future::poll_fn; |
945 | | // Async wrapper: awaits `poll_complete_inflight` until it reports ready. |
946 | 0 | poll_fn(|cx| self.poll_complete_inflight(cx)).await; |
947 | 0 | } |
948 | | // Like `poll_flush`, but infallible: an error is stashed in `last_write_err` (kind only) instead of being returned. |
949 | 0 | fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
950 | 0 | ready!(crate::trace::trace_leaf(cx)); |
951 | 0 | match self.poll_flush(cx) { |
952 | 0 | Poll::Ready(Err(e)) => { |
953 | 0 | self.last_write_err = Some(e.kind()); |
954 | 0 | Poll::Ready(()) |
955 | | } |
956 | 0 | Poll::Ready(Ok(())) => Poll::Ready(()), |
957 | 0 | Poll::Pending => Poll::Pending, |
958 | | } |
959 | 0 | } |
960 | | // Reports a stashed write error first; otherwise waits for the in-flight operation, if there is one. |
961 | 0 | fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
962 | 0 | if let Some(e) = self.last_write_err.take() { |
963 | 0 | return Poll::Ready(Err(e.into())); |
964 | 0 | } |
965 | | // With nothing in flight there is nothing to flush. |
966 | 0 | let (op, buf) = match self.state { |
967 | 0 | State::Idle(_) => return Poll::Ready(Ok(())), |
968 | 0 | State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?, |
969 | | }; |
970 | |
971 | | // The buffer is not used here; it is only put back so the state becomes `Idle` again |
972 | 0 | self.state = State::Idle(Some(buf)); |
973 | 0 |

974 | 0 | match op { |
975 | 0 | Operation::Read(_) => Poll::Ready(Ok(())), |
976 | 0 | Operation::Write(res) => Poll::Ready(res), |
977 | 0 | Operation::Seek(_) => Poll::Ready(Ok(())), |
978 | | } |
979 | 0 | } |
980 | | } |
981 | | |
982 | | #[cfg(test)] |
983 | | mod tests; |