tokio/fs/file.rs
1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use crate::fs::{asyncify, OpenOptions};
6use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
7use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
8use crate::sync::Mutex;
9
10use std::cmp;
11use std::fmt;
12use std::fs::{Metadata, Permissions};
13use std::future::Future;
14use std::io::{self, Seek, SeekFrom};
15use std::path::Path;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{ready, Context, Poll};
19
20#[cfg(test)]
21use super::mocks::JoinHandle;
22#[cfg(test)]
23use super::mocks::MockFile as StdFile;
24#[cfg(test)]
25use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
26#[cfg(not(test))]
27use crate::blocking::JoinHandle;
28#[cfg(not(test))]
29use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
30#[cfg(not(test))]
31use std::fs::File as StdFile;
32
33/// A reference to an open file on the filesystem.
34///
35/// This is a specialized version of [`std::fs::File`] for usage from the
36/// Tokio runtime.
37///
38/// An instance of a `File` can be read and/or written depending on what options
39/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
40/// cursor that the file contains internally.
41///
42/// A file will not be closed immediately when it goes out of scope if there
43/// are any IO operations that have not yet completed. To ensure that a file is
44/// closed immediately when it is dropped, you should call [`flush`] before
45/// dropping it. Note that this does not ensure that the file has been fully
46/// written to disk; the operating system might keep the changes around in an
47/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
48/// the data to disk.
49///
50/// Reading and writing to a `File` is usually done using the convenience
51/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
52///
53/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
54/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
55/// [`sync_all`]: fn@crate::fs::File::sync_all
56/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
57/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
58///
59/// # Examples
60///
61/// Create a new file and asynchronously write bytes to it:
62///
63/// ```no_run
64/// use tokio::fs::File;
65/// use tokio::io::AsyncWriteExt; // for write_all()
66///
67/// # async fn dox() -> std::io::Result<()> {
68/// let mut file = File::create("foo.txt").await?;
69/// file.write_all(b"hello, world!").await?;
70/// # Ok(())
71/// # }
72/// ```
73///
74/// Read the contents of a file into a buffer:
75///
76/// ```no_run
77/// use tokio::fs::File;
78/// use tokio::io::AsyncReadExt; // for read_to_end()
79///
80/// # async fn dox() -> std::io::Result<()> {
81/// let mut file = File::open("foo.txt").await?;
82///
83/// let mut contents = vec![];
84/// file.read_to_end(&mut contents).await?;
85///
86/// println!("len = {}", contents.len());
87/// # Ok(())
88/// # }
89/// ```
pub struct File {
    /// Underlying file handle. It lives in an `Arc` so that a clone can be
    /// moved into each blocking task that performs the actual I/O.
    std: Arc<StdFile>,
    /// Buffer, in-flight operation and bookkeeping. `&self` methods lock the
    /// mutex; the `poll_*` methods reach it through `get_mut`.
    inner: Mutex<Inner>,
    /// Upper bound on the number of bytes handed to a single blocking read or
    /// write.
    max_buf_size: usize,
}
95
struct Inner {
    /// Either the idle buffer or the handle of the blocking task that
    /// currently owns it.
    state: State,

    /// Errors from writes/flushes are returned in write/flush calls. If a write
    /// error is observed while performing a read, it is saved until the next
    /// write / flush call.
    last_write_err: Option<io::ErrorKind>,

    /// Position cached from completed seek operations; handed back by
    /// `poll_complete` when the file is idle. Reads and writes do not update
    /// it.
    pos: u64,
}
106
/// Where the I/O buffer currently lives.
#[derive(Debug)]
enum State {
    /// No operation is running. The buffer is `take`n out of the `Option`
    /// when an operation starts and put back once it has completed.
    Idle(Option<Buf>),
    /// A blocking task is running; it yields the operation's result together
    /// with the buffer it was given.
    Busy(JoinHandle<(Operation, Buf)>),
}
112
/// Result of a finished blocking task, tagged with the kind of work it did.
#[derive(Debug)]
enum Operation {
    /// Outcome of filling the buffer from the file.
    Read(io::Result<usize>),
    /// Outcome of writing the buffer to the file.
    Write(io::Result<()>),
    /// Outcome of a seek. `set_len` also reports its result through this
    /// variant, using a placeholder position.
    Seek(io::Result<u64>),
}
119
120impl File {
121 /// Attempts to open a file in read-only mode.
122 ///
123 /// See [`OpenOptions`] for more details.
124 ///
125 /// # Errors
126 ///
127 /// This function will return an error if called from outside of the Tokio
128 /// runtime or if path does not already exist. Other errors may also be
129 /// returned according to `OpenOptions::open`.
130 ///
131 /// # Examples
132 ///
133 /// ```no_run
134 /// use tokio::fs::File;
135 /// use tokio::io::AsyncReadExt;
136 ///
137 /// # async fn dox() -> std::io::Result<()> {
138 /// let mut file = File::open("foo.txt").await?;
139 ///
140 /// let mut contents = vec![];
141 /// file.read_to_end(&mut contents).await?;
142 ///
143 /// println!("len = {}", contents.len());
144 /// # Ok(())
145 /// # }
146 /// ```
147 ///
148 /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
149 ///
150 /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
151 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
152 pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
153 Self::options().read(true).open(path).await
154 }
155
156 /// Opens a file in write-only mode.
157 ///
158 /// This function will create a file if it does not exist, and will truncate
159 /// it if it does.
160 ///
161 /// See [`OpenOptions`] for more details.
162 ///
163 /// # Errors
164 ///
165 /// Results in an error if called from outside of the Tokio runtime or if
166 /// the underlying [`create`] call results in an error.
167 ///
168 /// [`create`]: std::fs::File::create
169 ///
170 /// # Examples
171 ///
172 /// ```no_run
173 /// use tokio::fs::File;
174 /// use tokio::io::AsyncWriteExt;
175 ///
176 /// # async fn dox() -> std::io::Result<()> {
177 /// let mut file = File::create("foo.txt").await?;
178 /// file.write_all(b"hello, world!").await?;
179 /// # Ok(())
180 /// # }
181 /// ```
182 ///
183 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
184 ///
185 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
186 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
187 pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
188 Self::options()
189 .write(true)
190 .create(true)
191 .truncate(true)
192 .open(path)
193 .await
194 }
195
196 /// Opens a file in read-write mode.
197 ///
198 /// This function will create a file if it does not exist, or return an error
199 /// if it does. This way, if the call succeeds, the file returned is guaranteed
200 /// to be new.
201 ///
202 /// This option is useful because it is atomic. Otherwise between checking
203 /// whether a file exists and creating a new one, the file may have been
204 /// created by another process (a TOCTOU race condition / attack).
205 ///
206 /// This can also be written using `File::options().read(true).write(true).create_new(true).open(...)`.
207 ///
208 /// See [`OpenOptions`] for more details.
209 ///
210 /// # Examples
211 ///
212 /// ```no_run
213 /// use tokio::fs::File;
214 /// use tokio::io::AsyncWriteExt;
215 ///
216 /// # async fn dox() -> std::io::Result<()> {
217 /// let mut file = File::create_new("foo.txt").await?;
218 /// file.write_all(b"hello, world!").await?;
219 /// # Ok(())
220 /// # }
221 /// ```
222 ///
223 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
224 ///
225 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
226 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
227 pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
228 Self::options()
229 .read(true)
230 .write(true)
231 .create_new(true)
232 .open(path)
233 .await
234 }
235
236 /// Returns a new [`OpenOptions`] object.
237 ///
238 /// This function returns a new `OpenOptions` object that you can use to
239 /// open or create a file with specific options if `open()` or `create()`
240 /// are not appropriate.
241 ///
242 /// It is equivalent to `OpenOptions::new()`, but allows you to write more
243 /// readable code. Instead of
244 /// `OpenOptions::new().append(true).open("example.log")`,
245 /// you can write `File::options().append(true).open("example.log")`. This
246 /// also avoids the need to import `OpenOptions`.
247 ///
248 /// See the [`OpenOptions::new`] function for more details.
249 ///
250 /// # Examples
251 ///
252 /// ```no_run
253 /// use tokio::fs::File;
254 /// use tokio::io::AsyncWriteExt;
255 ///
256 /// # async fn dox() -> std::io::Result<()> {
257 /// let mut f = File::options().append(true).open("example.log").await?;
258 /// f.write_all(b"new line\n").await?;
259 /// # Ok(())
260 /// # }
261 /// ```
262 #[must_use]
    pub fn options() -> OpenOptions {
        // Pure convenience alias: nothing about `File` itself is involved.
        OpenOptions::new()
    }
266
267 /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File).
268 ///
269 /// # Examples
270 ///
271 /// ```no_run
272 /// // This line could block. It is not recommended to do this on the Tokio
273 /// // runtime.
274 /// let std_file = std::fs::File::open("foo.txt").unwrap();
275 /// let file = tokio::fs::File::from_std(std_file);
276 /// ```
277 pub fn from_std(std: StdFile) -> File {
278 File {
279 std: Arc::new(std),
280 inner: Mutex::new(Inner {
281 state: State::Idle(Some(Buf::with_capacity(0))),
282 last_write_err: None,
283 pos: 0,
284 }),
285 max_buf_size: DEFAULT_MAX_BUF_SIZE,
286 }
287 }
288
289 /// Attempts to sync all OS-internal metadata to disk.
290 ///
291 /// This function will attempt to ensure that all in-core data reaches the
292 /// filesystem before returning.
293 ///
294 /// # Examples
295 ///
296 /// ```no_run
297 /// use tokio::fs::File;
298 /// use tokio::io::AsyncWriteExt;
299 ///
300 /// # async fn dox() -> std::io::Result<()> {
301 /// let mut file = File::create("foo.txt").await?;
302 /// file.write_all(b"hello, world!").await?;
303 /// file.sync_all().await?;
304 /// # Ok(())
305 /// # }
306 /// ```
307 ///
308 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
309 ///
310 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
311 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
312 pub async fn sync_all(&self) -> io::Result<()> {
313 let mut inner = self.inner.lock().await;
314 inner.complete_inflight().await;
315
316 let std = self.std.clone();
317 asyncify(move || std.sync_all()).await
318 }
319
320 /// This function is similar to `sync_all`, except that it may not
321 /// synchronize file metadata to the filesystem.
322 ///
323 /// This is intended for use cases that must synchronize content, but don't
324 /// need the metadata on disk. The goal of this method is to reduce disk
325 /// operations.
326 ///
327 /// Note that some platforms may simply implement this in terms of `sync_all`.
328 ///
329 /// # Examples
330 ///
331 /// ```no_run
332 /// use tokio::fs::File;
333 /// use tokio::io::AsyncWriteExt;
334 ///
335 /// # async fn dox() -> std::io::Result<()> {
336 /// let mut file = File::create("foo.txt").await?;
337 /// file.write_all(b"hello, world!").await?;
338 /// file.sync_data().await?;
339 /// # Ok(())
340 /// # }
341 /// ```
342 ///
343 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
344 ///
345 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
346 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
347 pub async fn sync_data(&self) -> io::Result<()> {
348 let mut inner = self.inner.lock().await;
349 inner.complete_inflight().await;
350
351 let std = self.std.clone();
352 asyncify(move || std.sync_data()).await
353 }
354
    /// Truncates or extends the underlying file, updating the size of this file to become `size`.
356 ///
357 /// If the size is less than the current file's size, then the file will be
358 /// shrunk. If it is greater than the current file's size, then the file
359 /// will be extended to size and have all of the intermediate data filled in
360 /// with 0s.
361 ///
362 /// # Errors
363 ///
364 /// This function will return an error if the file is not opened for
365 /// writing.
366 ///
367 /// # Examples
368 ///
369 /// ```no_run
370 /// use tokio::fs::File;
371 /// use tokio::io::AsyncWriteExt;
372 ///
373 /// # async fn dox() -> std::io::Result<()> {
374 /// let mut file = File::create("foo.txt").await?;
375 /// file.write_all(b"hello, world!").await?;
376 /// file.set_len(10).await?;
377 /// # Ok(())
378 /// # }
379 /// ```
380 ///
381 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
382 ///
383 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
384 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
385 pub async fn set_len(&self, size: u64) -> io::Result<()> {
386 let mut inner = self.inner.lock().await;
387 inner.complete_inflight().await;
388
389 let mut buf = match inner.state {
390 State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
391 _ => unreachable!(),
392 };
393
394 let seek = if !buf.is_empty() {
395 Some(SeekFrom::Current(buf.discard_read()))
396 } else {
397 None
398 };
399
400 let std = self.std.clone();
401
402 inner.state = State::Busy(spawn_blocking(move || {
403 let res = if let Some(seek) = seek {
404 (&*std).seek(seek).and_then(|_| std.set_len(size))
405 } else {
406 std.set_len(size)
407 }
408 .map(|()| 0); // the value is discarded later
409
410 // Return the result as a seek
411 (Operation::Seek(res), buf)
412 }));
413
414 let (op, buf) = match inner.state {
415 State::Idle(_) => unreachable!(),
416 State::Busy(ref mut rx) => rx.await?,
417 };
418
419 inner.state = State::Idle(Some(buf));
420
421 match op {
422 Operation::Seek(res) => res.map(|pos| {
423 inner.pos = pos;
424 }),
425 _ => unreachable!(),
426 }
427 }
428
429 /// Queries metadata about the underlying file.
430 ///
431 /// # Examples
432 ///
433 /// ```no_run
434 /// use tokio::fs::File;
435 ///
436 /// # async fn dox() -> std::io::Result<()> {
437 /// let file = File::open("foo.txt").await?;
438 /// let metadata = file.metadata().await?;
439 ///
440 /// println!("{:?}", metadata);
441 /// # Ok(())
442 /// # }
443 /// ```
444 pub async fn metadata(&self) -> io::Result<Metadata> {
445 let std = self.std.clone();
446 asyncify(move || std.metadata()).await
447 }
448
449 /// Executes an `IORING_OP_URING_CMD` operation on this file descriptor.
450 ///
451 /// This submits a 16-byte device/file-specific command payload that is
452 /// handled by the kernel subsystem backing this file descriptor.
453 /// Since the commands and their payloads are device specific, there is no
454 /// central list of possible commands. See the relevant kernel subsystem
455 /// documentation for the device you are interacting with.
456 ///
457 /// # io_uring support
458 ///
459 /// To use this API, enable `--cfg tokio_unstable`, the `io-uring` feature,
460 /// `Builder::enable_io`, and `Builder::enable_io_uring` on Linux.
461 ///
462 /// # Safety
463 ///
    /// This function is unsafe because a caller could send privileged
    /// commands, or commands whose payload instructs the kernel to read or
    /// write arbitrary memory addresses. Such commands can lead to
    /// use-after-free or data corruption.
468 ///
469 /// # Examples
470 ///
471 /// ```no_run
472 /// # #[tokio::main]
473 /// # async fn main() -> std::io::Result<()> {
474 /// use tokio::fs::File;
475 ///
476 /// // Suppose you have a character device that supports a specific `uring_cmd`.
477 /// let file = File::open("/dev/my_custom_device").await?;
478 ///
479 /// // The `cmd_op` is defined by the device driver (similar to an ioctl number).
480 /// const MY_DRIVER_CMD_OP: u32 = 0x1234;
481 ///
482 /// // The 16-byte payload is also defined by the device driver.
483 /// // Often, this is used to pass small serialized C structs or pointers.
484 /// #[repr(C)]
485 /// struct MyCmdData {
486 /// device_id: u32,
487 /// size_param: u32,
488 /// _pad: [u8; 8], // Pad to exactly 16 bytes
489 /// }
490 ///
491 /// let data = MyCmdData {
492 /// device_id: 42,
493 /// size_param: 100,
494 /// _pad: [0; 8],
495 /// };
496 ///
497 /// // Safely transmute the 16-byte struct into a byte array
498 /// let cmd_payload: [u8; 16] = unsafe { std::mem::transmute(data) };
499 ///
500 /// // SAFETY: The command and payload must be exactly what the specific
501 /// // device driver expects to prevent undefined behavior in the kernel.
502 /// let _ = unsafe { file.uring_cmd(MY_DRIVER_CMD_OP, cmd_payload, None).await };
503 /// # Ok(())
504 /// # }
505 /// ```
506 ///
507 /// # Errors
508 ///
509 /// Returns [`std::io::ErrorKind::Unsupported`] if `io_uring` or
510 /// `IORING_OP_URING_CMD` is not available at runtime.
511 #[cfg(all(
512 tokio_unstable,
513 feature = "io-uring",
514 feature = "rt",
515 feature = "fs",
516 target_os = "linux"
517 ))]
    pub async unsafe fn uring_cmd(
        &self,
        cmd_op: u32,
        cmd: [u8; 16],
        buf_index: Option<u16>,
    ) -> io::Result<u32> {
        use crate::runtime::driver::op::Op;
        use std::os::fd::OwnedFd;

        // Wait for any in-flight read/write/seek first. The lock guard is a
        // temporary and is released at the end of this statement.
        self.inner.lock().await.complete_inflight().await;

        let handle = crate::runtime::Handle::current();
        let driver_handle = handle.inner.driver().io();

        // A `false` result is reported to the caller as `Unsupported`.
        if !driver_handle
            .check_and_init(io_uring::opcode::UringCmd16::CODE)
            .await?
        {
            return Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "io_uring uring_cmd is not supported",
            ));
        }

        // The operation takes an owned descriptor, so a clone of the handle is
        // converted into an `OwnedFd`.
        // NOTE(review): `try_clone` runs inline here rather than through
        // `asyncify` — presumably cheap enough not to matter; confirm.
        let fd: OwnedFd = self.std.try_clone()?.into();
        let (res, _fd) = Op::uring_cmd16(fd, cmd_op, cmd, buf_index).await;
        res
    }
546
547 /// Creates a new `File` instance that shares the same underlying file handle
548 /// as the existing `File` instance. Reads, writes, and seeks will affect both
549 /// File instances simultaneously.
550 ///
551 /// # Examples
552 ///
553 /// ```no_run
554 /// use tokio::fs::File;
555 ///
556 /// # async fn dox() -> std::io::Result<()> {
557 /// let file = File::open("foo.txt").await?;
558 /// let file_clone = file.try_clone().await?;
559 /// # Ok(())
560 /// # }
561 /// ```
562 pub async fn try_clone(&self) -> io::Result<File> {
563 self.inner.lock().await.complete_inflight().await;
564 let std = self.std.clone();
565 let std_file = asyncify(move || std.try_clone()).await?;
566 let mut file = File::from_std(std_file);
567 file.set_max_buf_size(self.max_buf_size);
568 Ok(file)
569 }
570
571 /// Destructures `File` into a [`std::fs::File`]. This function is
572 /// async to allow any in-flight operations to complete.
573 ///
574 /// Use `File::try_into_std` to attempt conversion immediately.
575 ///
576 /// # Examples
577 ///
578 /// ```no_run
579 /// use tokio::fs::File;
580 ///
581 /// # async fn dox() -> std::io::Result<()> {
582 /// let tokio_file = File::open("foo.txt").await?;
583 /// let std_file = tokio_file.into_std().await;
584 /// # Ok(())
585 /// # }
586 /// ```
587 pub async fn into_std(mut self) -> StdFile {
588 self.inner.get_mut().complete_inflight().await;
589 Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
590 }
591
592 /// Tries to immediately destructure `File` into a [`std::fs::File`].
593 ///
594 /// # Errors
595 ///
596 /// This function will return an error containing the file if some
597 /// operation is in-flight.
598 ///
599 /// # Examples
600 ///
601 /// ```no_run
602 /// use tokio::fs::File;
603 ///
604 /// # async fn dox() -> std::io::Result<()> {
605 /// let tokio_file = File::open("foo.txt").await?;
606 /// let std_file = tokio_file.try_into_std().unwrap();
607 /// # Ok(())
608 /// # }
609 /// ```
610 #[allow(clippy::result_large_err)]
    pub fn try_into_std(mut self) -> Result<StdFile, Self> {
        // Unwrapping only succeeds when this is the sole reference to the
        // handle; blocking tasks hold a clone of the `Arc` while they run.
        match Arc::try_unwrap(self.std) {
            Ok(file) => Ok(file),
            Err(std_file_arc) => {
                // `self.std` was moved out above; put the `Arc` back so that
                // `self` is whole again before it is returned to the caller.
                self.std = std_file_arc;
                Err(self)
            }
        }
    }
620
621 /// Changes the permissions on the underlying file.
622 ///
623 /// # Platform-specific behavior
624 ///
625 /// This function currently corresponds to the `fchmod` function on Unix and
626 /// the `SetFileInformationByHandle` function on Windows. Note that, this
627 /// [may change in the future][changes].
628 ///
629 /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
630 ///
631 /// # Errors
632 ///
    /// This function will return an error if the user lacks permission to
    /// change attributes on the underlying file. It may also return an error
    /// in other OS-specific, unspecified cases.
636 ///
637 /// # Examples
638 ///
639 /// ```no_run
640 /// use tokio::fs::File;
641 ///
642 /// # async fn dox() -> std::io::Result<()> {
643 /// let file = File::open("foo.txt").await?;
644 /// let mut perms = file.metadata().await?.permissions();
645 /// perms.set_readonly(true);
646 /// file.set_permissions(perms).await?;
647 /// # Ok(())
648 /// # }
649 /// ```
650 pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
651 let std = self.std.clone();
652 asyncify(move || std.set_permissions(perm)).await
653 }
654
655 /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
656 ///
657 /// Although Tokio uses a sensible default value for this buffer size, this function would be
658 /// useful for changing that default depending on the situation.
659 ///
660 /// # Examples
661 ///
662 /// ```no_run
663 /// use tokio::fs::File;
664 /// use tokio::io::AsyncWriteExt;
665 ///
666 /// # async fn dox() -> std::io::Result<()> {
667 /// let mut file = File::open("foo.txt").await?;
668 ///
669 /// // Set maximum buffer size to 8 MiB
670 /// file.set_max_buf_size(8 * 1024 * 1024);
671 ///
672 /// let mut buf = vec![1; 1024 * 1024 * 1024];
673 ///
674 /// // Write the 1 GiB buffer in chunks up to 8 MiB each.
675 /// file.write_all(&mut buf).await?;
676 /// # Ok(())
677 /// # }
678 /// ```
    pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
        // Only stored here; the read and write paths consult the value each
        // time they hand the buffer to a blocking task.
        self.max_buf_size = max_buf_size;
    }
682
    /// Returns the maximum buffer size used for the underlying [`AsyncRead`] /
    /// [`AsyncWrite`] operations.
    ///
    /// This is the value last passed to
    /// [`set_max_buf_size`](File::set_max_buf_size), or the default chosen
    /// when the file was created.
    pub fn max_buf_size(&self) -> usize {
        self.max_buf_size
    }
687}
688
// Reads are served from the internal buffer. When the buffer is empty a
// blocking task refills it, and the data is copied out once that task has
// completed.
impl AsyncRead for File {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        dst: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        ready!(crate::trace::trace_leaf(cx));

        let me = self.get_mut();
        let inner = me.inner.get_mut();

        loop {
            match inner.state {
                State::Idle(ref mut buf_cell) => {
                    let mut buf = buf_cell.take().unwrap();

                    // Serve from already-buffered data, or finish right away
                    // when the caller has no room to receive anything.
                    if !buf.is_empty() || dst.remaining() == 0 {
                        buf.copy_to(dst);
                        *buf_cell = Some(buf);
                        return Poll::Ready(Ok(()));
                    }

                    let std = me.std.clone();

                    // Never read more than the caller can take or than the
                    // configured cap allows.
                    let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size);
                    inner.state = State::Busy(spawn_blocking(move || {
                        // SAFETY: the `Read` implementation of `std` does not
                        // read from the buffer it is borrowing and correctly
                        // reports the length of the data written into the buffer.
                        let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
                        (Operation::Read(res), buf)
                    }));
                    // Loop around to poll the task that was just spawned.
                }
                State::Busy(ref mut rx) => {
                    let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;

                    match op {
                        Operation::Read(Ok(_)) => {
                            buf.copy_to(dst);
                            inner.state = State::Idle(Some(buf));
                            return Poll::Ready(Ok(()));
                        }
                        Operation::Read(Err(e)) => {
                            assert!(buf.is_empty());

                            inner.state = State::Idle(Some(buf));
                            return Poll::Ready(Err(e));
                        }
                        Operation::Write(Ok(())) => {
                            // An earlier write finished; start the read on the
                            // next iteration.
                            assert!(buf.is_empty());
                            inner.state = State::Idle(Some(buf));
                            continue;
                        }
                        Operation::Write(Err(e)) => {
                            // A write error is not reported from a read; keep it
                            // for the next write / flush call. Control falls
                            // through to the next loop iteration.
                            assert!(inner.last_write_err.is_none());
                            inner.last_write_err = Some(e.kind());
                            inner.state = State::Idle(Some(buf));
                        }
                        Operation::Seek(result) => {
                            // Record a successful seek; a failed one is ignored
                            // here.
                            assert!(buf.is_empty());
                            inner.state = State::Idle(Some(buf));
                            if let Ok(pos) = result {
                                inner.pos = pos;
                            }
                            continue;
                        }
                    }
                }
            }
        }
    }
}
761
762impl AsyncSeek for File {
763 fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
764 let me = self.get_mut();
765 let inner = me.inner.get_mut();
766
767 match inner.state {
768 State::Busy(_) => Err(io::Error::new(
769 io::ErrorKind::Other,
770 "other file operation is pending, call poll_complete before start_seek",
771 )),
772 State::Idle(ref mut buf_cell) => {
773 let mut buf = buf_cell.take().unwrap();
774
775 // Factor in any unread data from the buf
776 if !buf.is_empty() {
777 let n = buf.discard_read();
778
779 if let SeekFrom::Current(ref mut offset) = pos {
780 *offset += n;
781 }
782 }
783
784 let std = me.std.clone();
785
786 inner.state = State::Busy(spawn_blocking(move || {
787 let res = (&*std).seek(pos);
788 (Operation::Seek(res), buf)
789 }));
790 Ok(())
791 }
792 }
793 }
794
795 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
796 ready!(crate::trace::trace_leaf(cx));
797 let inner = self.inner.get_mut();
798
799 loop {
800 match inner.state {
801 State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
802 State::Busy(ref mut rx) => {
803 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
804 inner.state = State::Idle(Some(buf));
805
806 match op {
807 Operation::Read(_) => {}
808 Operation::Write(Err(e)) => {
809 assert!(inner.last_write_err.is_none());
810 inner.last_write_err = Some(e.kind());
811 }
812 Operation::Write(_) => {}
813 Operation::Seek(res) => {
814 if let Ok(pos) = res {
815 inner.pos = pos;
816 }
817 return Poll::Ready(res);
818 }
819 }
820 }
821 }
822 }
823 }
824}
825
826impl AsyncWrite for File {
827 fn poll_write(
828 self: Pin<&mut Self>,
829 cx: &mut Context<'_>,
830 src: &[u8],
831 ) -> Poll<io::Result<usize>> {
832 ready!(crate::trace::trace_leaf(cx));
833 let me = self.get_mut();
834 let inner = me.inner.get_mut();
835
836 if let Some(e) = inner.last_write_err.take() {
837 return Poll::Ready(Err(e.into()));
838 }
839
840 loop {
841 match inner.state {
842 State::Idle(ref mut buf_cell) => {
843 let mut buf = buf_cell.take().unwrap();
844
845 let seek = if !buf.is_empty() {
846 Some(SeekFrom::Current(buf.discard_read()))
847 } else {
848 None
849 };
850
851 let n = buf.copy_from(src, me.max_buf_size);
852 let std = me.std.clone();
853
854 let blocking_task_join_handle = spawn_mandatory_blocking(move || {
855 let res = if let Some(seek) = seek {
856 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
857 } else {
858 buf.write_to(&mut &*std)
859 };
860
861 (Operation::Write(res), buf)
862 })
863 .ok_or_else(|| {
864 io::Error::new(io::ErrorKind::Other, "background task failed")
865 })?;
866
867 inner.state = State::Busy(blocking_task_join_handle);
868
869 return Poll::Ready(Ok(n));
870 }
871 State::Busy(ref mut rx) => {
872 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
873 inner.state = State::Idle(Some(buf));
874
875 match op {
876 Operation::Read(_) => {
877 // We don't care about the result here. The fact
878 // that the cursor has advanced will be reflected in
879 // the next iteration of the loop
880 continue;
881 }
882 Operation::Write(res) => {
883 // If the previous write was successful, continue.
884 // Otherwise, error.
885 res?;
886 continue;
887 }
888 Operation::Seek(_) => {
889 // Ignore the seek
890 continue;
891 }
892 }
893 }
894 }
895 }
896 }
897
898 fn poll_write_vectored(
899 self: Pin<&mut Self>,
900 cx: &mut Context<'_>,
901 bufs: &[io::IoSlice<'_>],
902 ) -> Poll<Result<usize, io::Error>> {
903 ready!(crate::trace::trace_leaf(cx));
904 let me = self.get_mut();
905 let inner = me.inner.get_mut();
906
907 if let Some(e) = inner.last_write_err.take() {
908 return Poll::Ready(Err(e.into()));
909 }
910
911 loop {
912 match inner.state {
913 State::Idle(ref mut buf_cell) => {
914 let mut buf = buf_cell.take().unwrap();
915
916 let seek = if !buf.is_empty() {
917 Some(SeekFrom::Current(buf.discard_read()))
918 } else {
919 None
920 };
921
922 let n = buf.copy_from_bufs(bufs, me.max_buf_size);
923 let std = me.std.clone();
924
925 let blocking_task_join_handle = spawn_mandatory_blocking(move || {
926 let res = if let Some(seek) = seek {
927 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
928 } else {
929 buf.write_to(&mut &*std)
930 };
931
932 (Operation::Write(res), buf)
933 })
934 .ok_or_else(|| {
935 io::Error::new(io::ErrorKind::Other, "background task failed")
936 })?;
937
938 inner.state = State::Busy(blocking_task_join_handle);
939
940 return Poll::Ready(Ok(n));
941 }
942 State::Busy(ref mut rx) => {
943 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
944 inner.state = State::Idle(Some(buf));
945
946 match op {
947 Operation::Read(_) => {
948 // We don't care about the result here. The fact
949 // that the cursor has advanced will be reflected in
950 // the next iteration of the loop
951 continue;
952 }
953 Operation::Write(res) => {
954 // If the previous write was successful, continue.
955 // Otherwise, error.
956 res?;
957 continue;
958 }
959 Operation::Seek(_) => {
960 // Ignore the seek
961 continue;
962 }
963 }
964 }
965 }
966 }
967 }
968
969 fn is_write_vectored(&self) -> bool {
970 true
971 }
972
973 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
974 ready!(crate::trace::trace_leaf(cx));
975 let inner = self.inner.get_mut();
976 inner.poll_flush(cx)
977 }
978
979 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
980 ready!(crate::trace::trace_leaf(cx));
981 self.poll_flush(cx)
982 }
983}
984
985impl From<StdFile> for File {
986 fn from(std: StdFile) -> Self {
987 Self::from_std(std)
988 }
989}
990
991impl fmt::Debug for File {
992 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
993 fmt.debug_struct("tokio::fs::File")
994 .field("std", &self.std)
995 .finish()
996 }
997}
998
// Exposes the raw descriptor of the underlying handle. Ownership stays with
// this `File`.
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for File {
    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
        self.std.as_raw_fd()
    }
}
1005
#[cfg(unix)]
impl std::os::unix::io::AsFd for File {
    fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
        // SAFETY: the descriptor comes from `self.std`, which `self` keeps
        // alive through its `Arc`, and the returned `BorrowedFd` borrows
        // `self`, so it cannot outlive that handle.
        unsafe {
            std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self))
        }
    }
}
1014
// Builds a `File` from a raw descriptor by going through the standard-library
// handle type and the `From<StdFile>` conversion.
#[cfg(unix)]
impl std::os::unix::io::FromRawFd for File {
    unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
        // Safety: exactly the same safety contract as
        // `std::os::unix::io::FromRawFd::from_raw_fd`.
        unsafe { StdFile::from_raw_fd(fd).into() }
    }
}
1023
// Windows counterparts of the Unix raw-descriptor conversions.
cfg_windows! {
    use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};

    impl AsRawHandle for File {
        fn as_raw_handle(&self) -> RawHandle {
            self.std.as_raw_handle()
        }
    }

    impl AsHandle for File {
        fn as_handle(&self) -> BorrowedHandle<'_> {
            // SAFETY: the handle comes from `self.std`, which `self` keeps
            // alive through its `Arc`, and the returned `BorrowedHandle`
            // borrows `self`, so it cannot outlive that handle.
            unsafe {
                BorrowedHandle::borrow_raw(
                    AsRawHandle::as_raw_handle(self),
                )
            }
        }
    }

    impl FromRawHandle for File {
        unsafe fn from_raw_handle(handle: RawHandle) -> Self {
            // Safety: exactly the same safety contract as
            // `FromRawHandle::from_raw_handle`.
            unsafe { StdFile::from_raw_handle(handle).into() }
        }
    }
}
1051
1052impl Inner {
1053 async fn complete_inflight(&mut self) {
1054 use std::future::poll_fn;
1055
1056 poll_fn(|cx| self.poll_complete_inflight(cx)).await;
1057 }
1058
1059 fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
1060 ready!(crate::trace::trace_leaf(cx));
1061 match self.poll_flush(cx) {
1062 Poll::Ready(Err(e)) => {
1063 self.last_write_err = Some(e.kind());
1064 Poll::Ready(())
1065 }
1066 Poll::Ready(Ok(())) => Poll::Ready(()),
1067 Poll::Pending => Poll::Pending,
1068 }
1069 }
1070
1071 fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
1072 if let Some(e) = self.last_write_err.take() {
1073 return Poll::Ready(Err(e.into()));
1074 }
1075
1076 let (op, buf) = match self.state {
1077 State::Idle(_) => return Poll::Ready(Ok(())),
1078 State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
1079 };
1080
1081 // The buffer is not used here
1082 self.state = State::Idle(Some(buf));
1083
1084 match op {
1085 Operation::Read(_) => Poll::Ready(Ok(())),
1086 Operation::Write(res) => Poll::Ready(res),
1087 Operation::Seek(_) => Poll::Ready(Ok(())),
1088 }
1089 }
1090}
1091
1092#[cfg(test)]
1093mod tests;