diff --git a/glommio/src/io/dma_file.rs b/glommio/src/io/dma_file.rs index b974060d9..3c4f3310f 100644 --- a/glommio/src/io/dma_file.rs +++ b/glommio/src/io/dma_file.rs @@ -482,6 +482,29 @@ impl DmaFile { } } + /// Copies a file range from one file to another in kernel space. This is going to have the same performance + /// characteristic as splice except if both files are on the same filesystem and the filesystem supports reflinks. + /// In that case, the underlying disk blocks will be CoW linked instead of actually performing a copy. + /// Since `copy_file_range` is not yet implemented on io_uring (https://github.com/axboe/liburing/issues/831), + /// this is just a dispatch to the blocking thread pool to do the syscall. + pub async fn copy_file_range( + &self, + fd_in: &DmaFile, + off_in: u64, + len: usize, + off_out: u64, + ) -> Result { + let source = self + .file + .reactor + .upgrade() + .unwrap() + .copy_file_range(fd_in.as_raw_fd(), off_in, self.as_raw_fd(), off_out, len) + .await; + let copy_size = enhanced_try!(source.collect_rw().await, "Copying file range", self.file)?; + Ok(copy_size) + } + /// Issues `fdatasync` for the underlying file, instructing the OS to flush /// all writes to the device, providing durability even if the system /// crashes or is rebooted. @@ -1371,4 +1394,41 @@ pub(crate) mod test { .await .expect_err("O_TMPFILE requires opening with write permissions"); }); + + dma_file_test!(copy_file_range, path, _k, { + let file1 = OpenOptions::new() + .create_new(true) + .read(true) + .write(true) + .tmpfile(true) + .dma_open(&path) + .await + .unwrap(); + + let file2 = OpenOptions::new() + .create_new(true) + .read(true) + .write(true) + .tmpfile(true) + .dma_open(&path) + .await + .unwrap(); + + let mut buffer = file1.alloc_dma_buffer(file1.alignment().max(4096) as usize); + buffer.as_bytes_mut().fill(0); + for i in 0..10u8 { + buffer.as_bytes_mut()[i as usize] = i; + } + + file1.write_at(buffer, 0).await.unwrap(); + + assert_eq!(10, file2.copy_file_range(&file1, 1, 0, 10).await.unwrap()); + + let read = file2 + .read_at_aligned(0, file2.alignment() as usize) + .await + .unwrap(); + assert_eq!(read.len(), 10); + assert!(read.iter().enumerate().all(|(i, b)| i + 1 == *b as usize)); + }); } diff --git a/glommio/src/reactor.rs b/glommio/src/reactor.rs index 70dd85640..67262b978 100644 --- a/glommio/src/reactor.rs +++ b/glommio/src/reactor.rs @@ -347,6 +347,41 @@ impl Reactor { source } + pub(crate) fn copy_file_range( + &self, + fd_in: RawFd, + off_in: u64, + fd_out: RawFd, + off_out: u64, + len: usize, + ) -> impl Future { + let stats = StatsCollection { + fulfilled: Some(|result, stats, op_count| { + if let Ok(result) = result { + let len = *result as u64 * op_count; + + stats.file_reads += op_count; + stats.file_bytes_read += len; + stats.file_writes += op_count; + stats.file_bytes_written += len; + } + }), + reused: None, + latency: None, + }; + + let source = self.new_source( + fd_out, + SourceType::CopyFileRange(fd_in, off_in, len), + Some(stats), + ); + let waiter = self.sys.copy_file_range(&source, off_out); + async move { + waiter.await; + source + } + } + pub(crate) fn write_buffered(&self, raw: RawFd, buf: Vec, pos: u64) -> Source { let stats = StatsCollection { fulfilled: Some(|result, stats, op_count| { diff --git a/glommio/src/sys/blocking.rs b/glommio/src/sys/blocking.rs index c29c0a5cb..72e26bf38 100644 --- a/glommio/src/sys/blocking.rs +++ b/glommio/src/sys/blocking.rs @@ -61,6 +61,7 @@ pub(super) enum BlockingThreadOp { Remove(PathBuf), CreateDir(PathBuf, libc::c_int), Truncate(RawFd, i64), + CopyFileRange(RawFd, i64, RawFd, i64, usize), Fn(Box), } @@ -73,6 +74,10 @@ impl Debug for BlockingThreadOp { write!(f, "create dir `{path:?}` (`{flags:b}`)") } BlockingThreadOp::Truncate(fd, to) => write!(f, "truncate `{fd}` -> `{to}`"), + BlockingThreadOp::CopyFileRange(fd_in, off_in, fd_out, off_out, len) => write!( + f, + "copy_file_range `{fd_in}` @ `{off_in}` -> {fd_out} @ `{off_out}` for {len} bytes" + ), BlockingThreadOp::Fn(_) => write!(f, "user function"), } } @@ -97,6 +102,16 @@ impl BlockingThreadOp { BlockingThreadOp::Truncate(fd, sz) => { raw_syscall!(ftruncate(fd, sz)) } + BlockingThreadOp::CopyFileRange(fd_in, mut off_in, fd_out, mut off_out, len) => { + raw_syscall!(copy_file_range( + fd_in, + &mut off_in, + fd_out, + &mut off_out, + len, + 0 + )) + } BlockingThreadOp::Fn(f) => { f(); BlockingThreadResult::Fn diff --git a/glommio/src/sys/source.rs b/glommio/src/sys/source.rs index 98950444d..c91b3e1a8 100644 --- a/glommio/src/sys/source.rs +++ b/glommio/src/sys/source.rs @@ -61,6 +61,7 @@ pub(crate) enum SourceType { Remove(PathBuf), BlockingFn, Invalid, + CopyFileRange(RawFd, u64, usize), #[cfg(feature = "bench")] Noop, } diff --git a/glommio/src/sys/uring.rs b/glommio/src/sys/uring.rs index 7020877d3..f7acb28d4 100644 --- a/glommio/src/sys/uring.rs +++ b/glommio/src/sys/uring.rs @@ -1591,6 +1591,22 @@ impl Reactor { self.enqueue_blocking_request(source.inner.clone(), op) } + pub(crate) fn copy_file_range(&self, source: &Source, pos: u64) -> impl Future { + let (fd_in, off_in, len) = match &*source.source_type() { + SourceType::CopyFileRange(fd_in, off_in, len) => (*fd_in, *off_in, *len), + _ => panic!("Unexpected source for copy_file_range operation"), + }; + + let op = BlockingThreadOp::CopyFileRange( + fd_in, + (off_in).try_into().unwrap(), + source.raw(), + pos.try_into().unwrap(), + len, + ); + self.enqueue_blocking_request(source.inner.clone(), op) + } + pub(crate) fn create_dir( &self, source: &Source,