diff --git a/glommio/src/io/dma_file.rs b/glommio/src/io/dma_file.rs index 0074e4443..e2561b9bc 100644 --- a/glommio/src/io/dma_file.rs +++ b/glommio/src/io/dma_file.rs @@ -482,6 +482,29 @@ impl DmaFile { } } + /// Copies a file range from one file to another in kernel space. This is going to have the same performance + /// characteristic as splice except if both files are on the same filesystem and the filesystem supports reflinks. + /// In that case, the underlying disk blocks will be CoW linked instead of actually performing a copy. + /// Since `copy_file_range` is not yet implemented on io_uring (https://github.com/axboe/liburing/issues/831), + /// this is just a dispatch to the blocking thread pool to do the syscall. + pub async fn copy_file_range_aligned( + &self, + fd_in: &DmaFile, + off_in: u64, + len: usize, + off_out: u64, + ) -> Result { + let source = self + .file + .reactor + .upgrade() + .unwrap() + .copy_file_range(fd_in.as_raw_fd(), off_in, self.as_raw_fd(), off_out, len) + .await; + let copy_size = enhanced_try!(source.collect_rw().await, "Copying file range", self.file)?; + Ok(copy_size) + } + /// Issues `fdatasync` for the underlying file, instructing the OS to flush /// all writes to the device, providing durability even if the system /// crashes or is rebooted. @@ -1583,4 +1606,46 @@ pub(crate) mod test { let stat = file.stat().await.unwrap(); assert_eq!(stat.file_size, alignment as u64, "{:?}", stat); }); + + dma_file_test!(copy_file_range, path, _k, { + let file1 = OpenOptions::new() + .create_new(true) + .read(true) + .write(true) + .tmpfile(true) + .dma_open(&path) + .await + .unwrap(); + + let file2 = OpenOptions::new() + .create_new(true) + .read(true) + .write(true) + .tmpfile(true) + .dma_open(&path) + .await + .unwrap(); + + let buffer_len = file1.alignment().max(4096) as usize; + let mut buffer = file1.alloc_dma_buffer(buffer_len); + buffer.as_bytes_mut().fill(0); + for i in 0..10u8 { + buffer.as_bytes_mut()[i as usize] = i; + } + let original_write_buffer = buffer.as_bytes_mut().to_vec(); + + file1.write_at(buffer, 0).await.unwrap(); + + assert_eq!( + buffer_len, + file2 + .copy_file_range_aligned(&file1, 0, buffer_len, 0) + .await + .unwrap() + ); + + let read = file2.read_at_aligned(0, buffer_len).await.unwrap(); + assert_eq!(read.len(), buffer_len); + assert_eq!(original_write_buffer.as_slice(), &read[..]); + }); } diff --git a/glommio/src/reactor.rs b/glommio/src/reactor.rs index e14883f65..7f5f9895e 100644 --- a/glommio/src/reactor.rs +++ b/glommio/src/reactor.rs @@ -347,6 +347,41 @@ impl Reactor { source } + pub(crate) fn copy_file_range( + &self, + fd_in: RawFd, + off_in: u64, + fd_out: RawFd, + off_out: u64, + len: usize, + ) -> impl Future { + let stats = StatsCollection { + fulfilled: Some(|result, stats, op_count| { + if let Ok(result) = result { + let len = *result as u64 * op_count; + + stats.file_reads += op_count; + stats.file_bytes_read += len; + stats.file_writes += op_count; + stats.file_bytes_written += len; + } + }), + reused: None, + latency: None, + }; + + let source = self.new_source( + fd_out, + SourceType::CopyFileRange(fd_in, off_in, len), + Some(stats), + ); + let waiter = self.sys.copy_file_range(&source, off_out); + async move { + waiter.await; + source + } + } + pub(crate) fn write_buffered(&self, raw: RawFd, buf: Vec, pos: u64) -> Source { let stats = StatsCollection { fulfilled: Some(|result, stats, op_count| { diff --git a/glommio/src/sys/blocking.rs b/glommio/src/sys/blocking.rs index c29c0a5cb..72e26bf38 100644 --- a/glommio/src/sys/blocking.rs +++ b/glommio/src/sys/blocking.rs @@ -61,6 +61,7 @@ pub(super) enum BlockingThreadOp { Remove(PathBuf), CreateDir(PathBuf, libc::c_int), Truncate(RawFd, i64), + CopyFileRange(RawFd, i64, RawFd, i64, usize), Fn(Box), } @@ -73,6 +74,10 @@ impl Debug for BlockingThreadOp { write!(f, "create dir `{path:?}` (`{flags:b}`)") } BlockingThreadOp::Truncate(fd, to) => write!(f, "truncate `{fd}` -> `{to}`"), + BlockingThreadOp::CopyFileRange(fd_in, off_in, fd_out, off_out, len) => write!( + f, + "copy_file_range `{fd_in}` @ `{off_in}` -> {fd_out} @ `{off_out}` for {len} bytes" + ), BlockingThreadOp::Fn(_) => write!(f, "user function"), } } @@ -97,6 +102,16 @@ impl BlockingThreadOp { BlockingThreadOp::Truncate(fd, sz) => { raw_syscall!(ftruncate(fd, sz)) } + BlockingThreadOp::CopyFileRange(fd_in, mut off_in, fd_out, mut off_out, len) => { + raw_syscall!(copy_file_range( + fd_in, + &mut off_in, + fd_out, + &mut off_out, + len, + 0 + )) + } BlockingThreadOp::Fn(f) => { f(); BlockingThreadResult::Fn diff --git a/glommio/src/sys/source.rs b/glommio/src/sys/source.rs index 98950444d..c91b3e1a8 100644 --- a/glommio/src/sys/source.rs +++ b/glommio/src/sys/source.rs @@ -61,6 +61,7 @@ pub(crate) enum SourceType { Remove(PathBuf), BlockingFn, Invalid, + CopyFileRange(RawFd, u64, usize), #[cfg(feature = "bench")] Noop, } diff --git a/glommio/src/sys/uring.rs b/glommio/src/sys/uring.rs index 7020877d3..f7acb28d4 100644 --- a/glommio/src/sys/uring.rs +++ b/glommio/src/sys/uring.rs @@ -1591,6 +1591,22 @@ impl Reactor { self.enqueue_blocking_request(source.inner.clone(), op) } + pub(crate) fn copy_file_range(&self, source: &Source, pos: u64) -> impl Future { + let (fd_in, off_in, len) = match &*source.source_type() { + SourceType::CopyFileRange(fd_in, off_in, len) => (*fd_in, *off_in, *len), + _ => panic!("Unexpected source for copy_file_range operation"), + }; + + let op = BlockingThreadOp::CopyFileRange( + fd_in, + (off_in).try_into().unwrap(), + source.raw(), + pos.try_into().unwrap(), + len, + ); + self.enqueue_blocking_request(source.inner.clone(), op) + } + pub(crate) fn create_dir( &self, source: &Source,