Skip to main content

tokio/fs/
file.rs

1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use crate::fs::{asyncify, OpenOptions};
6use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
7use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
8use crate::sync::Mutex;
9
10use std::cmp;
11use std::fmt;
12use std::fs::{Metadata, Permissions};
13use std::future::Future;
14use std::io::{self, Seek, SeekFrom};
15use std::path::Path;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{ready, Context, Poll};
19
20#[cfg(test)]
21use super::mocks::JoinHandle;
22#[cfg(test)]
23use super::mocks::MockFile as StdFile;
24#[cfg(test)]
25use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
26#[cfg(not(test))]
27use crate::blocking::JoinHandle;
28#[cfg(not(test))]
29use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
30#[cfg(not(test))]
31use std::fs::File as StdFile;
32
33/// A reference to an open file on the filesystem.
34///
35/// This is a specialized version of [`std::fs::File`] for usage from the
36/// Tokio runtime.
37///
38/// An instance of a `File` can be read and/or written depending on what options
39/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
40/// cursor that the file contains internally.
41///
42/// A file will not be closed immediately when it goes out of scope if there
43/// are any IO operations that have not yet completed. To ensure that a file is
44/// closed immediately when it is dropped, you should call [`flush`] before
45/// dropping it. Note that this does not ensure that the file has been fully
46/// written to disk; the operating system might keep the changes around in an
47/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
48/// the data to disk.
49///
50/// Reading and writing to a `File` is usually done using the convenience
51/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
52///
53/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
54/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
55/// [`sync_all`]: fn@crate::fs::File::sync_all
56/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
57/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
58///
59/// # Examples
60///
61/// Create a new file and asynchronously write bytes to it:
62///
63/// ```no_run
64/// use tokio::fs::File;
65/// use tokio::io::AsyncWriteExt; // for write_all()
66///
67/// # async fn dox() -> std::io::Result<()> {
68/// let mut file = File::create("foo.txt").await?;
69/// file.write_all(b"hello, world!").await?;
70/// # Ok(())
71/// # }
72/// ```
73///
74/// Read the contents of a file into a buffer:
75///
76/// ```no_run
77/// use tokio::fs::File;
78/// use tokio::io::AsyncReadExt; // for read_to_end()
79///
80/// # async fn dox() -> std::io::Result<()> {
81/// let mut file = File::open("foo.txt").await?;
82///
83/// let mut contents = vec![];
84/// file.read_to_end(&mut contents).await?;
85///
86/// println!("len = {}", contents.len());
87/// # Ok(())
88/// # }
89/// ```
90pub struct File {
91    std: Arc<StdFile>,
92    inner: Mutex<Inner>,
93    max_buf_size: usize,
94}
95
96struct Inner {
97    state: State,
98
99    /// Errors from writes/flushes are returned in write/flush calls. If a write
100    /// error is observed while performing a read, it is saved until the next
101    /// write / flush call.
102    last_write_err: Option<io::ErrorKind>,
103
104    pos: u64,
105}
106
107#[derive(Debug)]
108enum State {
109    Idle(Option<Buf>),
110    Busy(JoinHandle<(Operation, Buf)>),
111}
112
/// Outcome reported by a finished blocking task.
#[derive(Debug)]
enum Operation {
    /// Result of filling the buffer from the file.
    Read(io::Result<usize>),
    /// Result of writing the buffer out to the file.
    Write(io::Result<()>),
    /// Result of moving the cursor (`set_len` also reports through this
    /// variant).
    Seek(io::Result<u64>),
}
119
120impl File {
121    /// Attempts to open a file in read-only mode.
122    ///
123    /// See [`OpenOptions`] for more details.
124    ///
125    /// # Errors
126    ///
127    /// This function will return an error if called from outside of the Tokio
128    /// runtime or if path does not already exist. Other errors may also be
129    /// returned according to `OpenOptions::open`.
130    ///
131    /// # Examples
132    ///
133    /// ```no_run
134    /// use tokio::fs::File;
135    /// use tokio::io::AsyncReadExt;
136    ///
137    /// # async fn dox() -> std::io::Result<()> {
138    /// let mut file = File::open("foo.txt").await?;
139    ///
140    /// let mut contents = vec![];
141    /// file.read_to_end(&mut contents).await?;
142    ///
143    /// println!("len = {}", contents.len());
144    /// # Ok(())
145    /// # }
146    /// ```
147    ///
148    /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
149    ///
150    /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
151    /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
152    pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
153        Self::options().read(true).open(path).await
154    }
155
156    /// Opens a file in write-only mode.
157    ///
158    /// This function will create a file if it does not exist, and will truncate
159    /// it if it does.
160    ///
161    /// See [`OpenOptions`] for more details.
162    ///
163    /// # Errors
164    ///
165    /// Results in an error if called from outside of the Tokio runtime or if
166    /// the underlying [`create`] call results in an error.
167    ///
168    /// [`create`]: std::fs::File::create
169    ///
170    /// # Examples
171    ///
172    /// ```no_run
173    /// use tokio::fs::File;
174    /// use tokio::io::AsyncWriteExt;
175    ///
176    /// # async fn dox() -> std::io::Result<()> {
177    /// let mut file = File::create("foo.txt").await?;
178    /// file.write_all(b"hello, world!").await?;
179    /// # Ok(())
180    /// # }
181    /// ```
182    ///
183    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
184    ///
185    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
186    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
187    pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
188        Self::options()
189            .write(true)
190            .create(true)
191            .truncate(true)
192            .open(path)
193            .await
194    }
195
196    /// Opens a file in read-write mode.
197    ///
198    /// This function will create a file if it does not exist, or return an error
199    /// if it does. This way, if the call succeeds, the file returned is guaranteed
200    /// to be new.
201    ///
202    /// This option is useful because it is atomic. Otherwise between checking
203    /// whether a file exists and creating a new one, the file may have been
204    /// created by another process (a TOCTOU race condition / attack).
205    ///
206    /// This can also be written using `File::options().read(true).write(true).create_new(true).open(...)`.
207    ///
208    /// See [`OpenOptions`] for more details.
209    ///
210    /// # Examples
211    ///
212    /// ```no_run
213    /// use tokio::fs::File;
214    /// use tokio::io::AsyncWriteExt;
215    ///
216    /// # async fn dox() -> std::io::Result<()> {
217    /// let mut file = File::create_new("foo.txt").await?;
218    /// file.write_all(b"hello, world!").await?;
219    /// # Ok(())
220    /// # }
221    /// ```
222    ///
223    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
224    ///
225    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
226    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
227    pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
228        Self::options()
229            .read(true)
230            .write(true)
231            .create_new(true)
232            .open(path)
233            .await
234    }
235
236    /// Returns a new [`OpenOptions`] object.
237    ///
238    /// This function returns a new `OpenOptions` object that you can use to
239    /// open or create a file with specific options if `open()` or `create()`
240    /// are not appropriate.
241    ///
242    /// It is equivalent to `OpenOptions::new()`, but allows you to write more
243    /// readable code. Instead of
244    /// `OpenOptions::new().append(true).open("example.log")`,
245    /// you can write `File::options().append(true).open("example.log")`. This
246    /// also avoids the need to import `OpenOptions`.
247    ///
248    /// See the [`OpenOptions::new`] function for more details.
249    ///
250    /// # Examples
251    ///
252    /// ```no_run
253    /// use tokio::fs::File;
254    /// use tokio::io::AsyncWriteExt;
255    ///
256    /// # async fn dox() -> std::io::Result<()> {
257    /// let mut f = File::options().append(true).open("example.log").await?;
258    /// f.write_all(b"new line\n").await?;
259    /// # Ok(())
260    /// # }
261    /// ```
262    #[must_use]
263    pub fn options() -> OpenOptions {
264        OpenOptions::new()
265    }
266
267    /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File).
268    ///
269    /// # Examples
270    ///
271    /// ```no_run
272    /// // This line could block. It is not recommended to do this on the Tokio
273    /// // runtime.
274    /// let std_file = std::fs::File::open("foo.txt").unwrap();
275    /// let file = tokio::fs::File::from_std(std_file);
276    /// ```
277    pub fn from_std(std: StdFile) -> File {
278        File {
279            std: Arc::new(std),
280            inner: Mutex::new(Inner {
281                state: State::Idle(Some(Buf::with_capacity(0))),
282                last_write_err: None,
283                pos: 0,
284            }),
285            max_buf_size: DEFAULT_MAX_BUF_SIZE,
286        }
287    }
288
289    /// Attempts to sync all OS-internal metadata to disk.
290    ///
291    /// This function will attempt to ensure that all in-core data reaches the
292    /// filesystem before returning.
293    ///
294    /// # Examples
295    ///
296    /// ```no_run
297    /// use tokio::fs::File;
298    /// use tokio::io::AsyncWriteExt;
299    ///
300    /// # async fn dox() -> std::io::Result<()> {
301    /// let mut file = File::create("foo.txt").await?;
302    /// file.write_all(b"hello, world!").await?;
303    /// file.sync_all().await?;
304    /// # Ok(())
305    /// # }
306    /// ```
307    ///
308    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
309    ///
310    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
311    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
312    pub async fn sync_all(&self) -> io::Result<()> {
313        let mut inner = self.inner.lock().await;
314        inner.complete_inflight().await;
315
316        let std = self.std.clone();
317        asyncify(move || std.sync_all()).await
318    }
319
320    /// This function is similar to `sync_all`, except that it may not
321    /// synchronize file metadata to the filesystem.
322    ///
323    /// This is intended for use cases that must synchronize content, but don't
324    /// need the metadata on disk. The goal of this method is to reduce disk
325    /// operations.
326    ///
327    /// Note that some platforms may simply implement this in terms of `sync_all`.
328    ///
329    /// # Examples
330    ///
331    /// ```no_run
332    /// use tokio::fs::File;
333    /// use tokio::io::AsyncWriteExt;
334    ///
335    /// # async fn dox() -> std::io::Result<()> {
336    /// let mut file = File::create("foo.txt").await?;
337    /// file.write_all(b"hello, world!").await?;
338    /// file.sync_data().await?;
339    /// # Ok(())
340    /// # }
341    /// ```
342    ///
343    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
344    ///
345    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
346    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
347    pub async fn sync_data(&self) -> io::Result<()> {
348        let mut inner = self.inner.lock().await;
349        inner.complete_inflight().await;
350
351        let std = self.std.clone();
352        asyncify(move || std.sync_data()).await
353    }
354
355    /// Truncates or extends the underlying file, updating the size of this file to become size.
356    ///
357    /// If the size is less than the current file's size, then the file will be
358    /// shrunk. If it is greater than the current file's size, then the file
359    /// will be extended to size and have all of the intermediate data filled in
360    /// with 0s.
361    ///
362    /// # Errors
363    ///
364    /// This function will return an error if the file is not opened for
365    /// writing.
366    ///
367    /// # Examples
368    ///
369    /// ```no_run
370    /// use tokio::fs::File;
371    /// use tokio::io::AsyncWriteExt;
372    ///
373    /// # async fn dox() -> std::io::Result<()> {
374    /// let mut file = File::create("foo.txt").await?;
375    /// file.write_all(b"hello, world!").await?;
376    /// file.set_len(10).await?;
377    /// # Ok(())
378    /// # }
379    /// ```
380    ///
381    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
382    ///
383    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
384    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
385    pub async fn set_len(&self, size: u64) -> io::Result<()> {
386        let mut inner = self.inner.lock().await;
387        inner.complete_inflight().await;
388
389        let mut buf = match inner.state {
390            State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
391            _ => unreachable!(),
392        };
393
394        let seek = if !buf.is_empty() {
395            Some(SeekFrom::Current(buf.discard_read()))
396        } else {
397            None
398        };
399
400        let std = self.std.clone();
401
402        inner.state = State::Busy(spawn_blocking(move || {
403            let res = if let Some(seek) = seek {
404                (&*std).seek(seek).and_then(|_| std.set_len(size))
405            } else {
406                std.set_len(size)
407            }
408            .map(|()| 0); // the value is discarded later
409
410            // Return the result as a seek
411            (Operation::Seek(res), buf)
412        }));
413
414        let (op, buf) = match inner.state {
415            State::Idle(_) => unreachable!(),
416            State::Busy(ref mut rx) => rx.await?,
417        };
418
419        inner.state = State::Idle(Some(buf));
420
421        match op {
422            Operation::Seek(res) => res.map(|pos| {
423                inner.pos = pos;
424            }),
425            _ => unreachable!(),
426        }
427    }
428
429    /// Queries metadata about the underlying file.
430    ///
431    /// # Examples
432    ///
433    /// ```no_run
434    /// use tokio::fs::File;
435    ///
436    /// # async fn dox() -> std::io::Result<()> {
437    /// let file = File::open("foo.txt").await?;
438    /// let metadata = file.metadata().await?;
439    ///
440    /// println!("{:?}", metadata);
441    /// # Ok(())
442    /// # }
443    /// ```
444    pub async fn metadata(&self) -> io::Result<Metadata> {
445        let std = self.std.clone();
446        asyncify(move || std.metadata()).await
447    }
448
449    /// Executes an `IORING_OP_URING_CMD` operation on this file descriptor.
450    ///
451    /// This submits a 16-byte device/file-specific command payload that is
452    /// handled by the kernel subsystem backing this file descriptor.
453    /// Since the commands and their payloads are device specific, there is no
454    /// central list of possible commands. See the relevant kernel subsystem
455    /// documentation for the device you are interacting with.
456    ///
457    /// # io_uring support
458    ///
459    /// To use this API, enable `--cfg tokio_unstable`, the `io-uring` feature,
460    /// `Builder::enable_io`, and `Builder::enable_io_uring` on Linux.
461    ///
462    /// # Safety
463    ///
464    /// This function is unsafe because one could send privileged commands
465    /// or commands with payloads "telling" the kernel to read/write random
466    /// memory addresses specific to the command, which could fail due to
467    /// use-after-free or cause data corruption.
468    ///
469    /// # Examples
470    ///
471    /// ```no_run
472    /// # #[tokio::main]
473    /// # async fn main() -> std::io::Result<()> {
474    /// use tokio::fs::File;
475    ///
476    /// // Suppose you have a character device that supports a specific `uring_cmd`.
477    /// let file = File::open("/dev/my_custom_device").await?;
478    ///
479    /// // The `cmd_op` is defined by the device driver (similar to an ioctl number).
480    /// const MY_DRIVER_CMD_OP: u32 = 0x1234;
481    ///
482    /// // The 16-byte payload is also defined by the device driver.
483    /// // Often, this is used to pass small serialized C structs or pointers.
484    /// #[repr(C)]
485    /// struct MyCmdData {
486    ///     device_id: u32,
487    ///     size_param: u32,
488    ///     _pad: [u8; 8], // Pad to exactly 16 bytes
489    /// }
490    ///
491    /// let data = MyCmdData {
492    ///     device_id: 42,
493    ///     size_param: 100,
494    ///     _pad: [0; 8],
495    /// };
496    ///
497    /// // Safely transmute the 16-byte struct into a byte array
498    /// let cmd_payload: [u8; 16] = unsafe { std::mem::transmute(data) };
499    ///
500    /// // SAFETY: The command and payload must be exactly what the specific
501    /// // device driver expects to prevent undefined behavior in the kernel.
502    /// let _ = unsafe { file.uring_cmd(MY_DRIVER_CMD_OP, cmd_payload, None).await };
503    /// # Ok(())
504    /// # }
505    /// ```
506    ///
507    /// # Errors
508    ///
509    /// Returns [`std::io::ErrorKind::Unsupported`] if `io_uring` or
510    /// `IORING_OP_URING_CMD` is not available at runtime.
511    #[cfg(all(
512        tokio_unstable,
513        feature = "io-uring",
514        feature = "rt",
515        feature = "fs",
516        target_os = "linux"
517    ))]
518    pub async unsafe fn uring_cmd(
519        &self,
520        cmd_op: u32,
521        cmd: [u8; 16],
522        buf_index: Option<u16>,
523    ) -> io::Result<u32> {
524        use crate::runtime::driver::op::Op;
525        use std::os::fd::OwnedFd;
526
527        self.inner.lock().await.complete_inflight().await;
528
529        let handle = crate::runtime::Handle::current();
530        let driver_handle = handle.inner.driver().io();
531
532        if !driver_handle
533            .check_and_init(io_uring::opcode::UringCmd16::CODE)
534            .await?
535        {
536            return Err(io::Error::new(
537                io::ErrorKind::Unsupported,
538                "io_uring uring_cmd is not supported",
539            ));
540        }
541
542        let fd: OwnedFd = self.std.try_clone()?.into();
543        let (res, _fd) = Op::uring_cmd16(fd, cmd_op, cmd, buf_index).await;
544        res
545    }
546
547    /// Creates a new `File` instance that shares the same underlying file handle
548    /// as the existing `File` instance. Reads, writes, and seeks will affect both
549    /// File instances simultaneously.
550    ///
551    /// # Examples
552    ///
553    /// ```no_run
554    /// use tokio::fs::File;
555    ///
556    /// # async fn dox() -> std::io::Result<()> {
557    /// let file = File::open("foo.txt").await?;
558    /// let file_clone = file.try_clone().await?;
559    /// # Ok(())
560    /// # }
561    /// ```
562    pub async fn try_clone(&self) -> io::Result<File> {
563        self.inner.lock().await.complete_inflight().await;
564        let std = self.std.clone();
565        let std_file = asyncify(move || std.try_clone()).await?;
566        let mut file = File::from_std(std_file);
567        file.set_max_buf_size(self.max_buf_size);
568        Ok(file)
569    }
570
571    /// Destructures `File` into a [`std::fs::File`]. This function is
572    /// async to allow any in-flight operations to complete.
573    ///
574    /// Use `File::try_into_std` to attempt conversion immediately.
575    ///
576    /// # Examples
577    ///
578    /// ```no_run
579    /// use tokio::fs::File;
580    ///
581    /// # async fn dox() -> std::io::Result<()> {
582    /// let tokio_file = File::open("foo.txt").await?;
583    /// let std_file = tokio_file.into_std().await;
584    /// # Ok(())
585    /// # }
586    /// ```
587    pub async fn into_std(mut self) -> StdFile {
588        self.inner.get_mut().complete_inflight().await;
589        Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
590    }
591
592    /// Tries to immediately destructure `File` into a [`std::fs::File`].
593    ///
594    /// # Errors
595    ///
596    /// This function will return an error containing the file if some
597    /// operation is in-flight.
598    ///
599    /// # Examples
600    ///
601    /// ```no_run
602    /// use tokio::fs::File;
603    ///
604    /// # async fn dox() -> std::io::Result<()> {
605    /// let tokio_file = File::open("foo.txt").await?;
606    /// let std_file = tokio_file.try_into_std().unwrap();
607    /// # Ok(())
608    /// # }
609    /// ```
610    #[allow(clippy::result_large_err)]
611    pub fn try_into_std(mut self) -> Result<StdFile, Self> {
612        match Arc::try_unwrap(self.std) {
613            Ok(file) => Ok(file),
614            Err(std_file_arc) => {
615                self.std = std_file_arc;
616                Err(self)
617            }
618        }
619    }
620
621    /// Changes the permissions on the underlying file.
622    ///
623    /// # Platform-specific behavior
624    ///
625    /// This function currently corresponds to the `fchmod` function on Unix and
626    /// the `SetFileInformationByHandle` function on Windows. Note that, this
627    /// [may change in the future][changes].
628    ///
629    /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
630    ///
631    /// # Errors
632    ///
633    /// This function will return an error if the user lacks permission change
634    /// attributes on the underlying file. It may also return an error in other
635    /// os-specific unspecified cases.
636    ///
637    /// # Examples
638    ///
639    /// ```no_run
640    /// use tokio::fs::File;
641    ///
642    /// # async fn dox() -> std::io::Result<()> {
643    /// let file = File::open("foo.txt").await?;
644    /// let mut perms = file.metadata().await?.permissions();
645    /// perms.set_readonly(true);
646    /// file.set_permissions(perms).await?;
647    /// # Ok(())
648    /// # }
649    /// ```
650    pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
651        let std = self.std.clone();
652        asyncify(move || std.set_permissions(perm)).await
653    }
654
655    /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
656    ///
657    /// Although Tokio uses a sensible default value for this buffer size, this function would be
658    /// useful for changing that default depending on the situation.
659    ///
660    /// # Examples
661    ///
662    /// ```no_run
663    /// use tokio::fs::File;
664    /// use tokio::io::AsyncWriteExt;
665    ///
666    /// # async fn dox() -> std::io::Result<()> {
667    /// let mut file = File::open("foo.txt").await?;
668    ///
669    /// // Set maximum buffer size to 8 MiB
670    /// file.set_max_buf_size(8 * 1024 * 1024);
671    ///
672    /// let mut buf = vec![1; 1024 * 1024 * 1024];
673    ///
674    /// // Write the 1 GiB buffer in chunks up to 8 MiB each.
675    /// file.write_all(&mut buf).await?;
676    /// # Ok(())
677    /// # }
678    /// ```
679    pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
680        self.max_buf_size = max_buf_size;
681    }
682
683    /// Get the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
684    pub fn max_buf_size(&self) -> usize {
685        self.max_buf_size
686    }
687}
688
689impl AsyncRead for File {
690    fn poll_read(
691        self: Pin<&mut Self>,
692        cx: &mut Context<'_>,
693        dst: &mut ReadBuf<'_>,
694    ) -> Poll<io::Result<()>> {
695        ready!(crate::trace::trace_leaf(cx));
696
697        let me = self.get_mut();
698        let inner = me.inner.get_mut();
699
700        loop {
701            match inner.state {
702                State::Idle(ref mut buf_cell) => {
703                    let mut buf = buf_cell.take().unwrap();
704
705                    if !buf.is_empty() || dst.remaining() == 0 {
706                        buf.copy_to(dst);
707                        *buf_cell = Some(buf);
708                        return Poll::Ready(Ok(()));
709                    }
710
711                    let std = me.std.clone();
712
713                    let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size);
714                    inner.state = State::Busy(spawn_blocking(move || {
715                        // SAFETY: the `Read` implementation of `std` does not
716                        // read from the buffer it is borrowing and correctly
717                        // reports the length of the data written into the buffer.
718                        let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
719                        (Operation::Read(res), buf)
720                    }));
721                }
722                State::Busy(ref mut rx) => {
723                    let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
724
725                    match op {
726                        Operation::Read(Ok(_)) => {
727                            buf.copy_to(dst);
728                            inner.state = State::Idle(Some(buf));
729                            return Poll::Ready(Ok(()));
730                        }
731                        Operation::Read(Err(e)) => {
732                            assert!(buf.is_empty());
733
734                            inner.state = State::Idle(Some(buf));
735                            return Poll::Ready(Err(e));
736                        }
737                        Operation::Write(Ok(())) => {
738                            assert!(buf.is_empty());
739                            inner.state = State::Idle(Some(buf));
740                            continue;
741                        }
742                        Operation::Write(Err(e)) => {
743                            assert!(inner.last_write_err.is_none());
744                            inner.last_write_err = Some(e.kind());
745                            inner.state = State::Idle(Some(buf));
746                        }
747                        Operation::Seek(result) => {
748                            assert!(buf.is_empty());
749                            inner.state = State::Idle(Some(buf));
750                            if let Ok(pos) = result {
751                                inner.pos = pos;
752                            }
753                            continue;
754                        }
755                    }
756                }
757            }
758        }
759    }
760}
761
762impl AsyncSeek for File {
763    fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
764        let me = self.get_mut();
765        let inner = me.inner.get_mut();
766
767        match inner.state {
768            State::Busy(_) => Err(io::Error::new(
769                io::ErrorKind::Other,
770                "other file operation is pending, call poll_complete before start_seek",
771            )),
772            State::Idle(ref mut buf_cell) => {
773                let mut buf = buf_cell.take().unwrap();
774
775                // Factor in any unread data from the buf
776                if !buf.is_empty() {
777                    let n = buf.discard_read();
778
779                    if let SeekFrom::Current(ref mut offset) = pos {
780                        *offset += n;
781                    }
782                }
783
784                let std = me.std.clone();
785
786                inner.state = State::Busy(spawn_blocking(move || {
787                    let res = (&*std).seek(pos);
788                    (Operation::Seek(res), buf)
789                }));
790                Ok(())
791            }
792        }
793    }
794
795    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
796        ready!(crate::trace::trace_leaf(cx));
797        let inner = self.inner.get_mut();
798
799        loop {
800            match inner.state {
801                State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
802                State::Busy(ref mut rx) => {
803                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
804                    inner.state = State::Idle(Some(buf));
805
806                    match op {
807                        Operation::Read(_) => {}
808                        Operation::Write(Err(e)) => {
809                            assert!(inner.last_write_err.is_none());
810                            inner.last_write_err = Some(e.kind());
811                        }
812                        Operation::Write(_) => {}
813                        Operation::Seek(res) => {
814                            if let Ok(pos) = res {
815                                inner.pos = pos;
816                            }
817                            return Poll::Ready(res);
818                        }
819                    }
820                }
821            }
822        }
823    }
824}
825
826impl AsyncWrite for File {
827    fn poll_write(
828        self: Pin<&mut Self>,
829        cx: &mut Context<'_>,
830        src: &[u8],
831    ) -> Poll<io::Result<usize>> {
832        ready!(crate::trace::trace_leaf(cx));
833        let me = self.get_mut();
834        let inner = me.inner.get_mut();
835
836        if let Some(e) = inner.last_write_err.take() {
837            return Poll::Ready(Err(e.into()));
838        }
839
840        loop {
841            match inner.state {
842                State::Idle(ref mut buf_cell) => {
843                    let mut buf = buf_cell.take().unwrap();
844
845                    let seek = if !buf.is_empty() {
846                        Some(SeekFrom::Current(buf.discard_read()))
847                    } else {
848                        None
849                    };
850
851                    let n = buf.copy_from(src, me.max_buf_size);
852                    let std = me.std.clone();
853
854                    let blocking_task_join_handle = spawn_mandatory_blocking(move || {
855                        let res = if let Some(seek) = seek {
856                            (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
857                        } else {
858                            buf.write_to(&mut &*std)
859                        };
860
861                        (Operation::Write(res), buf)
862                    })
863                    .ok_or_else(|| {
864                        io::Error::new(io::ErrorKind::Other, "background task failed")
865                    })?;
866
867                    inner.state = State::Busy(blocking_task_join_handle);
868
869                    return Poll::Ready(Ok(n));
870                }
871                State::Busy(ref mut rx) => {
872                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
873                    inner.state = State::Idle(Some(buf));
874
875                    match op {
876                        Operation::Read(_) => {
877                            // We don't care about the result here. The fact
878                            // that the cursor has advanced will be reflected in
879                            // the next iteration of the loop
880                            continue;
881                        }
882                        Operation::Write(res) => {
883                            // If the previous write was successful, continue.
884                            // Otherwise, error.
885                            res?;
886                            continue;
887                        }
888                        Operation::Seek(_) => {
889                            // Ignore the seek
890                            continue;
891                        }
892                    }
893                }
894            }
895        }
896    }
897
898    fn poll_write_vectored(
899        self: Pin<&mut Self>,
900        cx: &mut Context<'_>,
901        bufs: &[io::IoSlice<'_>],
902    ) -> Poll<Result<usize, io::Error>> {
903        ready!(crate::trace::trace_leaf(cx));
904        let me = self.get_mut();
905        let inner = me.inner.get_mut();
906
907        if let Some(e) = inner.last_write_err.take() {
908            return Poll::Ready(Err(e.into()));
909        }
910
911        loop {
912            match inner.state {
913                State::Idle(ref mut buf_cell) => {
914                    let mut buf = buf_cell.take().unwrap();
915
916                    let seek = if !buf.is_empty() {
917                        Some(SeekFrom::Current(buf.discard_read()))
918                    } else {
919                        None
920                    };
921
922                    let n = buf.copy_from_bufs(bufs, me.max_buf_size);
923                    let std = me.std.clone();
924
925                    let blocking_task_join_handle = spawn_mandatory_blocking(move || {
926                        let res = if let Some(seek) = seek {
927                            (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
928                        } else {
929                            buf.write_to(&mut &*std)
930                        };
931
932                        (Operation::Write(res), buf)
933                    })
934                    .ok_or_else(|| {
935                        io::Error::new(io::ErrorKind::Other, "background task failed")
936                    })?;
937
938                    inner.state = State::Busy(blocking_task_join_handle);
939
940                    return Poll::Ready(Ok(n));
941                }
942                State::Busy(ref mut rx) => {
943                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
944                    inner.state = State::Idle(Some(buf));
945
946                    match op {
947                        Operation::Read(_) => {
948                            // We don't care about the result here. The fact
949                            // that the cursor has advanced will be reflected in
950                            // the next iteration of the loop
951                            continue;
952                        }
953                        Operation::Write(res) => {
954                            // If the previous write was successful, continue.
955                            // Otherwise, error.
956                            res?;
957                            continue;
958                        }
959                        Operation::Seek(_) => {
960                            // Ignore the seek
961                            continue;
962                        }
963                    }
964                }
965            }
966        }
967    }
968
969    fn is_write_vectored(&self) -> bool {
970        true
971    }
972
973    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
974        ready!(crate::trace::trace_leaf(cx));
975        let inner = self.inner.get_mut();
976        inner.poll_flush(cx)
977    }
978
979    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
980        ready!(crate::trace::trace_leaf(cx));
981        self.poll_flush(cx)
982    }
983}
984
985impl From<StdFile> for File {
986    fn from(std: StdFile) -> Self {
987        Self::from_std(std)
988    }
989}
990
991impl fmt::Debug for File {
992    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
993        fmt.debug_struct("tokio::fs::File")
994            .field("std", &self.std)
995            .finish()
996    }
997}
998
999#[cfg(unix)]
1000impl std::os::unix::io::AsRawFd for File {
1001    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
1002        self.std.as_raw_fd()
1003    }
1004}
1005
1006#[cfg(unix)]
1007impl std::os::unix::io::AsFd for File {
1008    fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
1009        unsafe {
1010            std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self))
1011        }
1012    }
1013}
1014
1015#[cfg(unix)]
1016impl std::os::unix::io::FromRawFd for File {
1017    unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
1018        // Safety: exactly the same safety contract as
1019        // `std::os::unix::io::FromRawFd::from_raw_fd`.
1020        unsafe { StdFile::from_raw_fd(fd).into() }
1021    }
1022}
1023
1024cfg_windows! {
1025    use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};
1026
1027    impl AsRawHandle for File {
1028        fn as_raw_handle(&self) -> RawHandle {
1029            self.std.as_raw_handle()
1030        }
1031    }
1032
1033    impl AsHandle for File {
1034        fn as_handle(&self) -> BorrowedHandle<'_> {
1035            unsafe {
1036                BorrowedHandle::borrow_raw(
1037                    AsRawHandle::as_raw_handle(self),
1038                )
1039            }
1040        }
1041    }
1042
1043    impl FromRawHandle for File {
1044        unsafe fn from_raw_handle(handle: RawHandle) -> Self {
1045            // Safety: exactly the same safety contract as
1046            // `FromRawHandle::from_raw_handle`.
1047            unsafe { StdFile::from_raw_handle(handle).into() }
1048        }
1049    }
1050}
1051
1052impl Inner {
1053    async fn complete_inflight(&mut self) {
1054        use std::future::poll_fn;
1055
1056        poll_fn(|cx| self.poll_complete_inflight(cx)).await;
1057    }
1058
1059    fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
1060        ready!(crate::trace::trace_leaf(cx));
1061        match self.poll_flush(cx) {
1062            Poll::Ready(Err(e)) => {
1063                self.last_write_err = Some(e.kind());
1064                Poll::Ready(())
1065            }
1066            Poll::Ready(Ok(())) => Poll::Ready(()),
1067            Poll::Pending => Poll::Pending,
1068        }
1069    }
1070
1071    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
1072        if let Some(e) = self.last_write_err.take() {
1073            return Poll::Ready(Err(e.into()));
1074        }
1075
1076        let (op, buf) = match self.state {
1077            State::Idle(_) => return Poll::Ready(Ok(())),
1078            State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
1079        };
1080
1081        // The buffer is not used here
1082        self.state = State::Idle(Some(buf));
1083
1084        match op {
1085            Operation::Read(_) => Poll::Ready(Ok(())),
1086            Operation::Write(res) => Poll::Ready(res),
1087            Operation::Seek(_) => Poll::Ready(Ok(())),
1088        }
1089    }
1090}
1091
1092#[cfg(test)]
1093mod tests;