Skip to content

Commit

Permalink
Add callback support to FileDescription
Browse files Browse the repository at this point in the history
   - Implementing atomic reads for contiguous buffers
   - Supports read operations with callback-based completion.

Signed-off-by: shamb0 <[email protected]>
  • Loading branch information
shamb0 committed Jan 9, 2025
1 parent 3736728 commit d98f7a2
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 15 deletions.
19 changes: 19 additions & 0 deletions src/shims/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ impl<T: FileDescription + 'static> FileDescriptionExt for T {

pub type DynFileDescriptionRef = FileDescriptionRef<dyn FileDescription>;

/// Represents a dynamic callback for file I/O operations.
pub type DynFileIOCallback<'tcx> = DynMachineCallback<'tcx, Result<usize, std::io::Error>>;

impl FileDescriptionRef<dyn FileDescription> {
pub fn downcast<T: FileDescription + 'static>(self) -> Option<FileDescriptionRef<T>> {
let inner = self.into_rc_any().downcast::<FdIdWith<T>>().ok()?;
Expand All @@ -135,6 +138,7 @@ pub trait FileDescription: std::fmt::Debug + FileDescriptionExt {
/// Reads as much as possible into the given buffer `ptr`.
/// `len` indicates how many bytes we should try to read.
/// `dest` is where the return value should be stored: number of bytes read, or `-1` in case of error.
#[allow(dead_code)]
fn read<'tcx>(
self: FileDescriptionRef<Self>,
_communicate_allowed: bool,
Expand All @@ -146,6 +150,21 @@ pub trait FileDescription: std::fmt::Debug + FileDescriptionExt {
throw_unsup_format!("cannot read from {}", self.name());
}

/// Performs an asynchronous read operation on the file description.
/// The caller must ensure that:
/// * The buffer pointer points to valid memory of sufficient size
/// * The file description remains valid for the duration of the operation
fn read_with_callback<'tcx>(
self: FileDescriptionRef<Self>,
_communicate_allowed: bool,
_ptr: Pointer,
_len: usize,
_completion_callback: DynFileIOCallback<'tcx>,
_ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
throw_unsup_format!("cannot read from {}", self.name());
}

/// Writes as much as possible from the given buffer `ptr`.
/// `len` indicates how many bytes we should try to write.
/// `dest` is where the return value should be stored: number of bytes written, or `-1` in case of error.
Expand Down
25 changes: 22 additions & 3 deletions src/shims/unix/fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::io::ErrorKind;
use rustc_abi::Size;

use crate::helpers::check_min_arg_count;
use crate::shims::files::FileDescription;
use crate::shims::files::{DynFileIOCallback, FileDescription};
use crate::shims::unix::linux_like::epoll::EpollReadyEvents;
use crate::shims::unix::*;
use crate::*;
Expand Down Expand Up @@ -203,7 +203,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
interp_ok(Scalar::from_i32(this.try_unwrap_io_result(result)?))
}

/// Read data from `fd` into buffer specified by `buf` and `count`.
/// Reads data from a file descriptor using callback-based completion.
///
/// If `offset` is `None`, reads data from current cursor position associated with `fd`
/// and updates cursor position on completion. Otherwise, reads from the specified offset
Expand Down Expand Up @@ -244,8 +244,27 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
// because it was a target's `usize`. Also we are sure that its smaller than
// `usize::MAX` because it is bounded by the host's `isize`.

// Clone the result destination for use in the completion callback
let result_destination = dest.clone();

let completion_callback: DynFileIOCallback<'tcx> = callback!(
@capture<'tcx> {
result_destination: MPlaceTy<'tcx>,
}
|this, read_result: Result<usize, std::io::Error>| {
match read_result {
Ok(read_size) => {
this.write_int(u64::try_from(read_size).unwrap(), &result_destination)
}
Err(_err_code) => {
this.set_last_error_and_return(LibcError("EIO"), &result_destination)
}
}
}
);

match offset {
None => fd.read(communicate, buf, count, dest, this)?,
None => fd.read_with_callback(communicate, buf, count, completion_callback, this)?,
Some(offset) => {
let Ok(offset) = u64::try_from(offset) else {
return this.set_last_error_and_return(LibcError("EINVAL"), dest);
Expand Down
149 changes: 137 additions & 12 deletions src/shims/unix/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use rustc_data_structures::fx::FxHashMap;

use self::shims::time::system_time_to_duration;
use crate::helpers::check_min_arg_count;
use crate::shims::files::{EvalContextExt as _, FileDescription, FileDescriptionRef};
use crate::shims::files::{
DynFileIOCallback, EvalContextExt as _, FileDescription, FileDescriptionRef,
WeakFileDescriptionRef,
};
use crate::shims::os_str::bytes_to_os_str;
use crate::shims::unix::fd::{FlockOp, UnixFileDescription};
use crate::*;
Expand All @@ -23,27 +26,141 @@ use crate::*;
struct FileHandle {
file: File,
writable: bool,
/// Mutex for synchronizing file access across threads.
file_lock: MutexRef,
}

impl VisitProvenance for FileHandle {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
// No provenance tracking needed for FileHandle as it contains no references.
// This implementation satisfies the trait requirement but performs no operations.
}
}

impl FileHandle {
/// Creates a new FileHandle with specified permissions and synchronization primitive.
fn new(file: File, writable: bool, file_lock: MutexRef) -> Self {
Self { file, writable, file_lock }
}

/// Attempts to create a clone of the file handle while preserving all attributes.
///
/// # Errors
/// Returns an `InterpResult` error if file handle cloning fails.
fn try_clone<'tcx>(&self) -> InterpResult<'tcx, FileHandle> {
let cloned_file = self
.file
.try_clone()
.map_err(|e| err_unsup_format!("Failed to clone file handle: {}", e))?;

interp_ok(FileHandle {
file: cloned_file,
writable: self.writable,
file_lock: self.file_lock.clone(),
})
}

/// Performs a synchronized file read operation with callback completion.
/// Acquires a mutex lock, validates the file descriptor, performs the read,
/// and invokes the callback with the result.
fn perform_read<'tcx>(
this: &mut MiriInterpCx<'tcx>,
completion_callback: DynFileIOCallback<'tcx>,
mut file_handle: FileHandle,
weak_fd: WeakFileDescriptionRef<FileHandle>,
buffer_ptr: Pointer,
length: usize,
) -> InterpResult<'tcx> {
this.mutex_lock(&file_handle.file_lock);

let result = {
// Verify file descriptor is still valid
if weak_fd.upgrade().is_none() {
throw_unsup_format!("file got closed while blocking")
}

let mut bytes = vec![0; length];
let read_result = file_handle.file.read(&mut bytes);

// Handle the read result
match read_result {
Ok(read_size) => {
// Write the bytes to memory
if let Err(err_code) = this
.write_bytes_ptr(buffer_ptr, bytes[..read_size].iter().copied())
.report_err()
{
throw_unsup_format!(
"Memory write failed during file read operation: {:#?}",
err_code
)
}
completion_callback.call(this, Ok(read_size))
}
Err(err_code) => completion_callback.call(this, Err(err_code)),
}
};

// Always unlock the mutex, even if the read operation failed
this.mutex_unlock(&file_handle.file_lock)?;

result
}
}

impl FileDescription for FileHandle {
fn name(&self) -> &'static str {
"file"
}

fn read<'tcx>(
fn read_with_callback<'tcx>(
self: FileDescriptionRef<Self>,
communicate_allowed: bool,
ptr: Pointer,
len: usize,
dest: &MPlaceTy<'tcx>,
completion_callback: DynFileIOCallback<'tcx>,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let this = ecx;
assert!(communicate_allowed, "isolation should have prevented even opening a file");
let mut bytes = vec![0; len];
let result = (&mut &self.file).read(&mut bytes);
match result {
Ok(read_size) => ecx.return_read_success(ptr, &bytes, read_size, dest),
Err(e) => ecx.set_last_error_and_return(e, dest),

// Clone the underlying File
let clone_file_handle = match self.try_clone().report_err() {
Ok(handle) => handle,
Err(ec) => throw_unsup_format!("unable to clone file discp {:#?}", ec),
};

let weak_fd = FileDescriptionRef::downgrade(&self);

if this.mutex_is_locked(&self.file_lock) {
this.block_thread(
BlockReason::Mutex,
None,
callback!(
@capture<'tcx> {
completion_callback: DynFileIOCallback<'tcx>,
clone_file_handle: FileHandle,
weak_fd: WeakFileDescriptionRef<FileHandle>,
ptr: Pointer,
len: usize,
}
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
FileHandle::perform_read(this, completion_callback, clone_file_handle, weak_fd, ptr, len)
}
),
);

unreachable!()
} else {
FileHandle::perform_read(
this,
completion_callback,
clone_file_handle,
weak_fd,
ptr,
len,
)
}
}

Expand Down Expand Up @@ -584,9 +701,13 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
return this.set_last_error_and_return_i32(ErrorKind::PermissionDenied);
}

let fd = options
.open(path)
.map(|file| this.machine.fds.insert_new(FileHandle { file, writable }));
let fd = options.open(path).map(|file| {
this.machine.fds.insert_new(FileHandle::new(
file,
writable,
this.machine.sync.mutex_create(),
))
});

interp_ok(Scalar::from_i32(this.try_unwrap_io_result(fd)?))
}
Expand Down Expand Up @@ -1645,7 +1766,11 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {

match file {
Ok(f) => {
let fd = this.machine.fds.insert_new(FileHandle { file: f, writable: true });
let fd = this.machine.fds.insert_new(FileHandle::new(
f,
true,
this.machine.sync.mutex_create(),
));
return interp_ok(Scalar::from_i32(fd));
}
Err(e) =>
Expand Down

0 comments on commit d98f7a2

Please sign in to comment.