Skip to content

Commit

Permalink
Implement copy_file_range operation
Browse files Browse the repository at this point in the history
copy_file_range is a convenient way to COW blocks within the same
filesystem.
  • Loading branch information
vlovich committed Oct 17, 2023
1 parent a3d9d22 commit b17641a
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 0 deletions.
60 changes: 60 additions & 0 deletions glommio/src/io/dma_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,29 @@ impl DmaFile {
}
}

/// Copies a file range from one file to another in kernel space. This is going to have the same performance
/// characteristic as splice except if both files are on the same filesystem and the filesystem supports reflinks.
/// In that case, the underlying disk blocks will be CoW linked instead of actually performing a copy.
/// Since `copy_file_range` is not yet implemented on io_uring (https://github.com/axboe/liburing/issues/831),
/// this is just a dispatch to the blocking thread pool to do the syscall.
pub async fn copy_file_range(
&self,
fd_in: &DmaFile,
off_in: u64,
len: usize,
off_out: u64,
) -> Result<usize> {
let source = self
.file
.reactor
.upgrade()
.unwrap()
.copy_file_range(fd_in.as_raw_fd(), off_in, self.as_raw_fd(), off_out, len)
.await;
let copy_size = enhanced_try!(source.collect_rw().await, "Copying file range", self.file)?;
Ok(copy_size)
}

/// Issues `fdatasync` for the underlying file, instructing the OS to flush
/// all writes to the device, providing durability even if the system
/// crashes or is rebooted.
Expand Down Expand Up @@ -1371,4 +1394,41 @@ pub(crate) mod test {
.await
.expect_err("O_TMPFILE requires opening with write permissions");
});

dma_file_test!(copy_file_range, path, _k, {
let file1 = OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.tmpfile(true)
.dma_open(&path)
.await
.unwrap();

let file2 = OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.tmpfile(true)
.dma_open(&path)
.await
.unwrap();

let mut buffer = file1.alloc_dma_buffer(file1.alignment().max(4096) as usize);
buffer.as_bytes_mut().fill(0);
for i in 0..10u8 {
buffer.as_bytes_mut()[i as usize] = i;
}

file1.write_at(buffer, 0).await.unwrap();

assert_eq!(10, file2.copy_file_range(&file1, 1, 0, 10).await.unwrap());

let read = file2
.read_at_aligned(0, file2.alignment() as usize)
.await
.unwrap();
assert_eq!(read.len(), 10);
assert!(read.iter().enumerate().all(|(i, b)| i + 1 == *b as usize));
});
}
35 changes: 35 additions & 0 deletions glommio/src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,41 @@ impl Reactor {
source
}

pub(crate) fn copy_file_range(
&self,
fd_in: RawFd,
off_in: u64,
fd_out: RawFd,
off_out: u64,
len: usize,
) -> impl Future<Output = Source> {
let stats = StatsCollection {
fulfilled: Some(|result, stats, op_count| {
if let Ok(result) = result {
let len = *result as u64 * op_count;

stats.file_reads += op_count;
stats.file_bytes_read += len;
stats.file_writes += op_count;
stats.file_bytes_written += len;
}
}),
reused: None,
latency: None,
};

let source = self.new_source(
fd_out,
SourceType::CopyFileRange(fd_in, off_in, len),
Some(stats),
);
let waiter = self.sys.copy_file_range(&source, off_out);
async move {
waiter.await;
source
}
}

pub(crate) fn write_buffered(&self, raw: RawFd, buf: Vec<u8>, pos: u64) -> Source {
let stats = StatsCollection {
fulfilled: Some(|result, stats, op_count| {
Expand Down
15 changes: 15 additions & 0 deletions glommio/src/sys/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub(super) enum BlockingThreadOp {
Remove(PathBuf),
CreateDir(PathBuf, libc::c_int),
Truncate(RawFd, i64),
CopyFileRange(RawFd, i64, RawFd, i64, usize),
Fn(Box<dyn FnOnce() + Send + 'static>),
}

Expand All @@ -73,6 +74,10 @@ impl Debug for BlockingThreadOp {
write!(f, "create dir `{path:?}` (`{flags:b}`)")
}
BlockingThreadOp::Truncate(fd, to) => write!(f, "truncate `{fd}` -> `{to}`"),
BlockingThreadOp::CopyFileRange(fd_in, off_in, fd_out, off_out, len) => write!(
f,
"copy_file_range `{fd_in}` @ `{off_in}` -> {fd_out} @ `{off_out}` for {len} bytes"
),
BlockingThreadOp::Fn(_) => write!(f, "user function"),
}
}
Expand All @@ -97,6 +102,16 @@ impl BlockingThreadOp {
BlockingThreadOp::Truncate(fd, sz) => {
raw_syscall!(ftruncate(fd, sz))
}
BlockingThreadOp::CopyFileRange(fd_in, mut off_in, fd_out, mut off_out, len) => {
raw_syscall!(copy_file_range(
fd_in,
&mut off_in,
fd_out,
&mut off_out,
len,
0
))
}
BlockingThreadOp::Fn(f) => {
f();
BlockingThreadResult::Fn
Expand Down
1 change: 1 addition & 0 deletions glommio/src/sys/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub(crate) enum SourceType {
Remove(PathBuf),
BlockingFn,
Invalid,
CopyFileRange(RawFd, u64, usize),
#[cfg(feature = "bench")]
Noop,
}
Expand Down
16 changes: 16 additions & 0 deletions glommio/src/sys/uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,22 @@ impl Reactor {
self.enqueue_blocking_request(source.inner.clone(), op)
}

pub(crate) fn copy_file_range(&self, source: &Source, pos: u64) -> impl Future<Output = ()> {
let (fd_in, off_in, len) = match &*source.source_type() {
SourceType::CopyFileRange(fd_in, off_in, len) => (*fd_in, *off_in, *len),
_ => panic!("Unexpected source for copy_file_range operation"),
};

let op = BlockingThreadOp::CopyFileRange(
fd_in,
(off_in).try_into().unwrap(),
source.raw(),
pos.try_into().unwrap(),
len,
);
self.enqueue_blocking_request(source.inner.clone(), op)
}

pub(crate) fn create_dir(
&self,
source: &Source,
Expand Down

0 comments on commit b17641a

Please sign in to comment.