diff --git a/CMakeLists.txt b/CMakeLists.txt index 27a9a5229..21e2074d6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -117,7 +117,7 @@ set(FISH_BUILTIN_SRCS set(FISH_SRCS src/ast.cpp src/abbrs.cpp src/autoload.cpp src/color.cpp src/common.cpp src/complete.cpp src/env.cpp src/env_dispatch.cpp src/env_universal_common.cpp src/event.cpp - src/exec.cpp src/expand.cpp src/fallback.cpp src/fd_monitor.cpp src/fish_version.cpp + src/exec.cpp src/expand.cpp src/fallback.cpp src/fish_version.cpp src/flog.cpp src/function.cpp src/highlight.cpp src/history.cpp src/history_file.cpp src/input.cpp src/input_common.cpp src/io.cpp src/iothread.cpp src/job_group.cpp src/kill.cpp diff --git a/fish-rust/build.rs b/fish-rust/build.rs index cef14f542..907b3e0cc 100644 --- a/fish-rust/build.rs +++ b/fish-rust/build.rs @@ -19,7 +19,9 @@ fn main() -> miette::Result<()> { // This allows "Rust to be used from C++" // This must come before autocxx so that cxx can emit its cxx.h header. let source_files = vec![ + "src/fd_monitor.rs", "src/fd_readable_set.rs", + "src/fds.rs", "src/ffi_init.rs", "src/ffi_tests.rs", "src/future_feature_flags.rs", diff --git a/fish-rust/src/builtins/mod.rs b/fish-rust/src/builtins/mod.rs index dd530c8ae..16c4ca8cb 100644 --- a/fish-rust/src/builtins/mod.rs +++ b/fish-rust/src/builtins/mod.rs @@ -2,7 +2,7 @@ pub mod echo; pub mod emit; -mod exit; +pub mod exit; pub mod random; pub mod r#return; pub mod wait; diff --git a/fish-rust/src/fd_monitor.rs b/fish-rust/src/fd_monitor.rs new file mode 100644 index 000000000..9e74c64e5 --- /dev/null +++ b/fish-rust/src/fd_monitor.rs @@ -0,0 +1,567 @@ +use std::os::fd::{AsRawFd, RawFd}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use self::fd_monitor::{c_void, new_fd_event_signaller, FdEventSignaller, ItemWakeReason}; +use crate::fd_readable_set::FdReadableSet; +use crate::fds::AutoCloseFd; +use crate::ffi::void_ptr; +use crate::flog::FLOG; +use 
crate::wutil::perror; +use cxx::SharedPtr; + +#[cxx::bridge] +mod fd_monitor { + /// Reason for waking an item + #[repr(u8)] + #[cxx_name = "item_wake_reason_t"] + enum ItemWakeReason { + /// The fd became readable (or was HUP'd) + Readable, + /// The requested timeout was hit + Timeout, + /// The item was "poked" (woken up explicitly) + Poke, + } + + // Defines and exports a type shared between C++ and rust + struct c_void { + _unused: u8, + } + + unsafe extern "C++" { + include!("fds.h"); + + /// An event signaller implemented using a file descriptor, so it can plug into + /// [`select()`](libc::select). + /// + /// This is like a binary semaphore. A call to [`post()`](FdEventSignaller::post) will + /// signal an event, making the fd readable. Multiple calls to `post()` may be coalesced. + /// On Linux this uses [`eventfd()`](libc::eventfd), on other systems this uses a pipe. + /// [`try_consume()`](FdEventSignaller::try_consume) may be used to consume the event. + /// Importantly this is async signal safe. Of course it is `CLOEXEC` as well. 
+ #[rust_name = "FdEventSignaller"] + type fd_event_signaller_t = crate::ffi::fd_event_signaller_t; + #[rust_name = "new_fd_event_signaller"] + fn ffi_new_fd_event_signaller_t() -> SharedPtr; + } + extern "Rust" { + #[cxx_name = "fd_monitor_item_id_t"] + type FdMonitorItemId; + } + + extern "Rust" { + #[cxx_name = "fd_monitor_item_t"] + type FdMonitorItem; + + #[cxx_name = "make_fd_monitor_item_t"] + fn new_fd_monitor_item_ffi( + fd: i32, + timeout_usecs: u64, + callback: *const c_void, + param: *const c_void, + ) -> Box; + } + + extern "Rust" { + #[cxx_name = "fd_monitor_t"] + type FdMonitor; + + #[cxx_name = "make_fd_monitor_t"] + fn new_fd_monitor_ffi() -> Box; + + #[cxx_name = "add_item"] + fn add_item_ffi( + &mut self, + fd: i32, + timeout_usecs: u64, + callback: *const c_void, + param: *const c_void, + ) -> u64; + + #[cxx_name = "poke_item"] + fn poke_item_ffi(&self, item_id: u64); + + #[cxx_name = "add"] + pub fn add_ffi(&mut self, item: Box) -> u64; + } +} + +// TODO: Remove once we're no longer using the FFI variant of FdEventSignaller +unsafe impl Sync for FdEventSignaller {} +unsafe impl Send for FdEventSignaller {} + +/// Each item added to fd_monitor_t is assigned a unique ID, which is not recycled. Items may have +/// their callback triggered immediately by passing the ID. Zero is a sentinel. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct FdMonitorItemId(u64); + +type FfiCallback = extern "C" fn(*mut AutoCloseFd, u8, void_ptr); + +/// The callback type used by [`FdMonitorItem`]. It is passed a mutable reference to the +/// `FdMonitorItem`'s [`FdMonitorItem::fd`] and [the reason](ItemWakeReason) for the wakeup. The +/// callback may close the fd, in which case the `FdMonitorItem` is removed from [`FdMonitor`]'s +/// set. 
+/// +/// As capturing C++ closures can't be safely used via ffi interop and cxx bridge doesn't support +/// passing typed `fn(...)` pointers from C++ to rust, we have a separate variant of the type that +/// uses the C abi to invoke a callback. This will be removed when the dependent C++ code (currently +/// only `src/io.cpp`) is ported to rust +enum FdMonitorCallback { + None, + Native(Box), + Ffi(FfiCallback /* fn ptr */, void_ptr /* param */), +} + +/// An item containing an fd and callback, which can be monitored to watch when it becomes readable +/// and invoke the callback. +pub struct FdMonitorItem { + /// The fd to monitor + fd: AutoCloseFd, + /// A callback to be invoked when the fd is readable, or when we are timed out. If we time out, + /// then timed_out will be true. If the fd is invalid on return from the function, then the item + /// is removed from the [`FdMonitor`] set. + callback: FdMonitorCallback, + /// The timeout associated with waiting on this item or `None` to wait indefinitely. A timeout + /// of `0` is not supported. + timeout: Option, + /// The last time we were called or the time of initialization. + last_time: Option, + /// The id for this item, assigned by [`FdMonitor`]. + item_id: FdMonitorItemId, +} + +/// Unlike C++, rust's `Vec` has `Vec::retain()` instead of `std::remove_if(...)` with the inverse +/// logic. It's hard to keep track of which bool means what across the different layers, so be more +/// explicit. +#[derive(PartialEq, Eq)] +enum ItemAction { + Remove, + Retain, +} + +impl FdMonitorItem { + /// Return the duration until the timeout should trigger or `None`. A return of `0` means we are + /// at or past the timeout. 
+ fn remaining_time(&self, now: &Instant) -> Option { + let last_time = self.last_time.expect("Should always have a last_time!"); + let timeout = self.timeout?; + assert!(now >= &last_time, "Steady clock went backwards or bug!"); + let since = *now - last_time; + Some(if since >= timeout { + Duration::ZERO + } else { + timeout - since + }) + } + + /// Invoke this item's callback if the fd is readable (its bit is set in `fds`) or the item has + /// timed out. Returns [`ItemAction::Retain`] if the item should be retained or + /// [`ItemAction::Remove`] if it should be removed from the set. + fn service_item(&mut self, fds: &FdReadableSet, now: &Instant) -> ItemAction { + let mut result = ItemAction::Retain; + let readable = fds.test(self.fd.as_raw_fd()); + let timed_out = !readable && self.remaining_time(now) == Some(Duration::ZERO); + if readable || timed_out { + self.last_time = Some(*now); + let reason = if readable { + ItemWakeReason::Readable + } else { + ItemWakeReason::Timeout + }; + match &self.callback { + FdMonitorCallback::None => panic!("Callback not assigned!"), + FdMonitorCallback::Native(callback) => (callback)(&mut self.fd, reason), + FdMonitorCallback::Ffi(callback, param) => { + // Safety: identical objects are generated on both sides by cxx bridge as + // integers of the same size (minimum size to fit the enum). + let reason = unsafe { std::mem::transmute(reason) }; + (callback)(&mut self.fd as *mut _, reason, *param) + } + } + if !self.fd.is_valid() { + result = ItemAction::Remove; + } + } + return result; + } + + /// Invoke this item's callback with a poke, if its id is present in the sorted poke list. + // TODO: Rename to `maybe_poke_item()` to reflect its actual behavior. + fn poke_item(&mut self, pokelist: &[FdMonitorItemId]) -> ItemAction { + if self.item_id.0 == 0 || pokelist.binary_search(&self.item_id).is_err() { + // Not pokeable or not in the poke list. 
+ return ItemAction::Retain; + } + + match &self.callback { + FdMonitorCallback::None => panic!("Callback not assigned!"), + FdMonitorCallback::Native(callback) => (callback)(&mut self.fd, ItemWakeReason::Poke), + FdMonitorCallback::Ffi(callback, param) => { + // Safety: identical objects are generated on both sides by cxx bridge as + // integers of the same size (minimum size to fit the enum). + let reason = unsafe { std::mem::transmute(ItemWakeReason::Poke) }; + (callback)(&mut self.fd as *mut _, reason, *param) + } + } + // Return `ItemAction::Remove` if the callback closed the fd + match self.fd.is_valid() { + true => ItemAction::Retain, + false => ItemAction::Remove, + } + } + + fn new() -> Self { + Self { + callback: FdMonitorCallback::None, + fd: AutoCloseFd::empty(), + timeout: None, + last_time: None, + item_id: FdMonitorItemId(0), + } + } + + fn set_callback_ffi(&mut self, callback: *const c_void, param: *const c_void) { + // Safety: we are just marshalling our function pointers with identical definitions on both + // sides of the ffi bridge as void pointers to keep cxx bridge happy. Whether we invoke the + // raw function as a void pointer or as a typed fn that helps us keep track of what we're + // doing is unsafe in all cases, so might as well make the best of it. + let callback = unsafe { std::mem::transmute(callback) }; + self.callback = FdMonitorCallback::Ffi(callback, void_ptr(param as _)); + } +} + +// cxx bridge does not support "static member functions" in C++ or rust, so we need a top-level fn. +fn new_fd_monitor_ffi() -> Box { + Box::new(FdMonitor::new()) +} + +// cxx bridge does not support "static member functions" in C++ or rust, so we need a top-level fn. 
+fn new_fd_monitor_item_ffi( + fd: RawFd, + timeout_usecs: u64, + callback: *const c_void, + param: *const c_void, +) -> Box { + // Safety: we are just marshalling our function pointers with identical definitions on both + // sides of the ffi bridge as void pointers to keep cxx bridge happy. Whether we invoke the + // raw function as a void pointer or as a typed fn that helps us keep track of what we're + // doing is unsafe in all cases, so might as well make the best of it. + let callback = unsafe { std::mem::transmute(callback) }; + let mut item = FdMonitorItem::new(); + item.fd.reset(fd); + item.callback = FdMonitorCallback::Ffi(callback, void_ptr(param as _)); + if timeout_usecs != FdReadableSet::kNoTimeout { + item.timeout = Some(Duration::from_micros(timeout_usecs)); + } + return Box::new(item); +} + +/// A thread-safe class which can monitor a set of fds, invoking a callback when any becomes +/// readable (or has been HUP'd) or when per-item-configurable timeouts are reached. +pub struct FdMonitor { + /// Our self-signaller. When this is written to, it means there are new items pending, new items + /// in the poke list, or terminate has been set. + change_signaller: SharedPtr, + /// The data shared between the background thread and the `FdMonitor` instance. + data: Arc>, + /// The last ID assigned or `0` if none. + last_id: AtomicU64, +} + +// We don't want to manually implement `Sync` for `FdMonitor` but we do want to make sure that it's +// always using interior mutability correctly and therefore automatically `Sync`. +const _: () = { + // It is sufficient to declare the generic function pointers; calling them too would require + // using `const fn` with Send/Sync constraints which wasn't stabilized until rustc 1.61.0 + fn assert_sync() {} + let _ = assert_sync::; +}; + +/// Data shared between the `FdMonitor` instance and its associated `BackgroundFdMonitor`. +struct SharedData { + /// Pending items. 
This is set by the main thread with the mutex locked, then the background + /// thread grabs them. + pending: Vec, + /// List of IDs for items that need to be poked (explicitly woken up). + pokelist: Vec, + /// Whether the background thread is running. + running: bool, + /// Used to signal that the background thread should terminate. + terminate: bool, +} + +/// The background half of the fd monitor, running on its own thread. +struct BackgroundFdMonitor { + /// The list of items to monitor. This is only accessed from the background thread. + /// This doesn't need to be in any particular order. + items: Vec, + /// Our self-signaller. When this is written to, it means there are new items pending, new items + /// in the poke list, or terminate has been set. + change_signaller: SharedPtr, + /// The data shared between the background thread and the `FdMonitor` instance. + data: Arc>, +} + +impl FdMonitor { + pub fn add_ffi(&self, item: Box) -> u64 { + self.add(*item).0 + } + + /// Add an item to the monitor. Returns the [`FdMonitorItemId`] assigned to the item. + pub fn add(&self, mut item: FdMonitorItem) -> FdMonitorItemId { + assert!(item.fd.is_valid()); + assert!(item.timeout != Some(Duration::ZERO), "Invalid timeout!"); + assert!( + item.item_id == FdMonitorItemId(0), + "Item should not already have an id!" 
+ ); + + let item_id = self.last_id.fetch_add(1, Ordering::Relaxed) + 1; + let item_id = FdMonitorItemId(item_id); + let start_thread = { + // Lock around a local region + let mut data = self.data.lock().expect("Mutex poisoned!"); + + // Assign an id and add the item to pending + item.item_id = item_id; + data.pending.push(item); + + // Start the thread if it hasn't already been started + let already_started = data.running; + data.running = true; + !already_started + }; + + if start_thread { + FLOG!(fd_monitor, "Thread starting"); + let background_monitor = BackgroundFdMonitor { + data: Arc::clone(&self.data), + change_signaller: SharedPtr::clone(&self.change_signaller), + items: Vec::new(), + }; + crate::threads::spawn(move || { + background_monitor.run(); + }); + } + + item_id + } + + /// Avoid requiring a separate UniquePtr for each item C++ wants to add to the set by giving an + /// all-in-one entry point that can initialize the item on our end and insert it to the set. + fn add_item_ffi( + &mut self, + fd: RawFd, + timeout_usecs: u64, + callback: *const c_void, + param: *const c_void, + ) -> u64 { + // Safety: we are just marshalling our function pointers with identical definitions on both + // sides of the ffi bridge as void pointers to keep cxx bridge happy. Whether we invoke the + // raw function as a void pointer or as a typed fn that helps us keep track of what we're + // doing is unsafe in all cases, so might as well make the best of it. + let callback = unsafe { std::mem::transmute(callback) }; + let mut item = FdMonitorItem::new(); + item.fd.reset(fd); + item.callback = FdMonitorCallback::Ffi(callback, void_ptr(param as _)); + if timeout_usecs != FdReadableSet::kNoTimeout { + item.timeout = Some(Duration::from_micros(timeout_usecs)); + } + let item_id = self.add(item).0; + item_id + } + + /// Mark that the item with the given ID needs to be woken up explicitly. 
+ pub fn poke_item(&self, item_id: FdMonitorItemId) { + assert!(item_id.0 > 0, "Invalid item id!"); + let needs_notification = { + let mut data = self.data.lock().expect("Mutex poisoned!"); + let needs_notification = data.pokelist.is_empty(); + // Insert it, sorted. + // TODO: The C++ code inserts it even if it's already in the poke list. That seems + // unnecessary? + let pos = match data.pokelist.binary_search(&item_id) { + Ok(pos) => pos, + Err(pos) => pos, + }; + data.pokelist.insert(pos, item_id); + needs_notification + }; + + if needs_notification { + self.change_signaller.post(); + } + } + + fn poke_item_ffi(&self, item_id: u64) { + self.poke_item(FdMonitorItemId(item_id)) + } + + pub fn new() -> Self { + Self { + data: Arc::new(Mutex::new(SharedData { + pending: Vec::new(), + pokelist: Vec::new(), + running: false, + terminate: false, + })), + change_signaller: new_fd_event_signaller(), + last_id: AtomicU64::new(0), + } + } +} + +impl BackgroundFdMonitor { + /// Starts monitoring the fd set and listening for new fds to add to the set. Takes ownership + /// over its instance so that this method cannot be called again. + fn run(mut self) { + let mut pokelist: Vec = Vec::new(); + let mut fds = FdReadableSet::new(); + + loop { + // Poke any items that need it + if !pokelist.is_empty() { + self.poke(&mut pokelist); + pokelist.clear(); + } + fds.clear(); + + // Our change_signaller is special-cased + let change_signal_fd = self.change_signaller.read_fd().into(); + fds.add(change_signal_fd); + + let mut now = Instant::now(); + // Use Duration::MAX to represent no timeout for comparison purposes. 
+ let mut timeout = Duration::MAX; + + for item in &mut self.items { + fds.add(item.fd.as_raw_fd()); + if !item.last_time.is_some() { + item.last_time = Some(now); + } + timeout = timeout.min(item.timeout.unwrap_or(Duration::MAX)); + } + + // If we have no items, then we wish to allow the thread to exit, but after a time, so + // we aren't spinning up and tearing down the thread repeatedly. Set a timeout of 256 + // msec; if nothing becomes readable by then we will exit. We refer to this as the + // wait-lap. + let is_wait_lap = self.items.is_empty(); + if is_wait_lap { + assert!( + timeout == Duration::MAX, + "Should not have a timeout on wait lap!" + ); + timeout = Duration::from_millis(256); + } + + // Don't leave Duration::MAX as an actual timeout value + let timeout = match timeout { + Duration::MAX => None, + timeout => Some(timeout), + }; + + // Call select() + let ret = fds.check_readable( + timeout + .map(|duration| duration.as_micros() as u64) + .unwrap_or(FdReadableSet::kNoTimeout), + ); + if ret < 0 && errno::errno().0 != libc::EINTR { + // Surprising error + perror("select"); + } + + // Update the value of `now` after waiting on `fds.check_readable()`; it's used in the + // servicer closure. + now = Instant::now(); + + // A predicate which services each item in turn, returning true if it should be removed + let servicer = |item: &mut FdMonitorItem| { + let fd = item.fd.as_raw_fd(); + if item.service_item(&fds, &now) == ItemAction::Remove { + FLOG!(fd_monitor, "Removing fd", fd); + return ItemAction::Remove; + } + return ItemAction::Retain; + }; + + // Service all items that are either readable or have timed out, and remove any which + // say to do so. + + // This line is from the C++ codebase (fd_monitor.cpp:170) but this write is never read. + // now = Instant::now(); + + self.items + .retain_mut(|item| servicer(item) == ItemAction::Retain); + + // Handle any changes if the change signaller was set. 
Alternatively, this may be the + // wait lap, in which case we might want to commit to exiting. + let change_signalled = fds.test(change_signal_fd); + if change_signalled || is_wait_lap { + // Clear the change signaller before processing incoming changes + self.change_signaller.try_consume(); + let mut data = self.data.lock().expect("Mutex poisoned!"); + + // Move from `pending` to the end of `items` + self.items.extend(&mut data.pending.drain(..)); + + // Grab any poke list + assert!( + pokelist.is_empty(), + "poke list should be empty or else we're dropping pokes!" + ); + std::mem::swap(&mut pokelist, &mut data.pokelist); + + if data.terminate + || (is_wait_lap + && self.items.is_empty() + && pokelist.is_empty() + && !change_signalled) + { + // Maybe terminate is set. Alternatively, maybe we had no items, waited a bit, + // and still have no items. It's important to do this while holding the lock, + // otherwise we race with new items being added. + assert!( + data.running, + "Thread should be running because we're that thread" + ); + FLOG!(fd_monitor, "Thread exiting"); + data.running = false; + break; + } + } + } + } + + /// Poke items in the poke list, removing any items that close their fd in their callback. The + /// poke list is consumed after this. This is only called from the background thread. + fn poke(&mut self, pokelist: &[FdMonitorItemId]) { + self.items.retain_mut(|item| { + let action = item.poke_item(&*pokelist); + if action == ItemAction::Remove { + FLOG!(fd_monitor, "Removing fd", item.fd.as_raw_fd()); + } + return action == ItemAction::Retain; + }); + } +} + +/// In ordinary usage, we never invoke the destructor. This is used in the tests to not leave stale +/// fds around; this is why it's very hacky! +impl Drop for FdMonitor { + fn drop(&mut self) { + // Safety: this is a port of the C++ code and we are running in the destructor. 
The C++ code + // had no way to bubble back any errors encountered here, and the pthread mutex the C++ code + // uses does not have a concept of mutex poisoning. + self.data.lock().expect("Mutex poisoned!").terminate = true; + self.change_signaller.post(); + + // Safety: see note above. + while self.data.lock().expect("Mutex poisoned!").running { + std::thread::sleep(Duration::from_millis(5)); + } + } +} diff --git a/fish-rust/src/fd_readable_set.rs b/fish-rust/src/fd_readable_set.rs index 4bea1248d..eeefaf21b 100644 --- a/fish-rust/src/fd_readable_set.rs +++ b/fish-rust/src/fd_readable_set.rs @@ -1,6 +1,8 @@ use libc::c_int; use std::os::unix::io::RawFd; +pub use fd_readable_set_t as FdReadableSet; + #[cxx::bridge] mod fd_readable_set_ffi { extern "Rust" { @@ -20,13 +22,13 @@ pub fn new_fd_readable_set() -> Box { Box::new(fd_readable_set_t::new()) } -/// \return true if the fd is or becomes readable within the given timeout. -/// This returns false if the waiting is interrupted by a signal. +/// Returns `true` if the fd is or becomes readable within the given timeout. +/// This returns `false` if the waiting is interrupted by a signal. pub fn is_fd_readable(fd: i32, timeout_usec: u64) -> bool { fd_readable_set_t::is_fd_readable(fd, timeout_usec) } -/// \return whether an fd is readable. +/// Returns whether an fd is readable. pub fn poll_fd_readable(fd: i32) -> bool { fd_readable_set_t::poll_fd_readable(fd) } @@ -75,13 +77,14 @@ pub fn add(&mut self, fd: RawFd) { } } - /// \return true if the given fd is marked as set, in our set. \returns false if negative. + /// Returns `true` if the given `fd` is marked as set, in our set. Returns `false` if `fd` is + /// negative. pub fn test(&self, fd: RawFd) -> bool { fd >= 0 && unsafe { libc::FD_ISSET(fd, &self.fdset_) } } - /// Call select() or poll(), according to FISH_READABLE_SET_USE_POLL. Note this destructively - /// modifies the set. \return the result of select() or poll(). 
+ /// Call `select()` or `poll()`, according to FISH_READABLE_SET_USE_POLL. Note this + /// destructively modifies the set. Returns the result of `select()` or `poll()`. pub fn check_readable(&mut self, timeout_usec: u64) -> c_int { let null = std::ptr::null_mut(); if timeout_usec == Self::kNoTimeout { @@ -106,7 +109,7 @@ pub fn check_readable(&mut self, timeout_usec: u64) -> c_int { } /// Check if a single fd is readable, with a given timeout. - /// \return true if readable, false if not. + /// Returns `true` if readable, `false` otherwise. pub fn is_fd_readable(fd: RawFd, timeout_usec: u64) -> bool { if fd < 0 { return false; @@ -118,7 +121,7 @@ pub fn is_fd_readable(fd: RawFd, timeout_usec: u64) -> bool { } /// Check if a single fd is readable, without blocking. - /// \return true if readable, false if not. + /// Returns `true` if readable, `false` if not. pub fn poll_fd_readable(fd: RawFd) -> bool { return Self::is_fd_readable(fd, 0); } @@ -151,23 +154,29 @@ fn pollfd_get_fd(pollfd: &libc::pollfd) -> RawFd { pollfd.fd } - /// Add an fd to the set. The fd is ignored if negative (for convenience). + /// Add an fd to the set. The fd is ignored if negative (for convenience). The fd is also + /// ignored if it's already in the set. pub fn add(&mut self, fd: RawFd) { - if fd >= 0 { - if let Err(pos) = self.pollfds_.binary_search_by_key(&fd, Self::pollfd_get_fd) { - self.pollfds_.insert( - pos, - libc::pollfd { - fd, - events: libc::POLLIN, - revents: 0, - }, - ); - } + if fd < 0 { + return; } + let pos = match self.pollfds_.binary_search_by_key(&fd, Self::pollfd_get_fd) { + Ok(_) => return, + Err(pos) => pos, + }; + + self.pollfds_.insert( + pos, + libc::pollfd { + fd, + events: libc::POLLIN, + revents: 0, + }, + ); } - /// \return true if the given fd is marked as set, in our set. \returns false if negative. + /// Returns `true` if the given `fd` has input available to read or has been HUP'd. + /// Returns `false` if `fd` is negative or was not found in the set. 
pub fn test(&self, fd: RawFd) -> bool { // If a pipe is widowed with no data, Linux sets POLLHUP but not POLLIN, so test for both. if let Ok(pos) = self.pollfds_.binary_search_by_key(&fd, Self::pollfd_get_fd) { @@ -178,7 +187,7 @@ pub fn test(&self, fd: RawFd) -> bool { return false; } - // Convert from a usec to a poll-friendly msec. + /// Convert from usecs to poll-friendly msecs. fn usec_to_poll_msec(timeout_usec: u64) -> c_int { let mut timeout_msec: u64 = timeout_usec / kUsecPerMsec; // Round to nearest, down for halfway. @@ -206,6 +215,8 @@ fn do_poll(fds: &mut [libc::pollfd], timeout_usec: u64) -> c_int { /// Call select() or poll(), according to FISH_READABLE_SET_USE_POLL. Note this destructively /// modifies the set. \return the result of select() or poll(). + /// + /// TODO: Change to [`Duration`](std::time::Duration) once FFI usage is done. pub fn check_readable(&mut self, timeout_usec: u64) -> c_int { if self.pollfds_.is_empty() { return 0; @@ -214,7 +225,7 @@ pub fn check_readable(&mut self, timeout_usec: u64) -> c_int { } /// Check if a single fd is readable, with a given timeout. - /// \return true if readable, false if not. + /// \return true if `fd` is our set and is readable, `false` otherwise. pub fn is_fd_readable(fd: RawFd, timeout_usec: u64) -> bool { if fd < 0 { return false; diff --git a/fish-rust/src/fds.rs b/fish-rust/src/fds.rs index a7092c644..ab1c7bdd6 100644 --- a/fish-rust/src/fds.rs +++ b/fish-rust/src/fds.rs @@ -1,13 +1,30 @@ use crate::ffi; use nix::unistd; -use std::os::unix::io::RawFd; +use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; /// A helper type for managing and automatically closing a file descriptor -pub struct autoclose_fd_t { +/// +/// This was implemented in rust as a port of the existing C++ code but it didn't take its place +/// (yet) and there's still the original cpp implementation in `src/fds.h`, so its name is +/// disambiguated because some code uses a mix of both for interop purposes. 
+pub struct AutoCloseFd { fd_: RawFd, } -impl autoclose_fd_t { +#[cxx::bridge] +mod autoclose_fd_t { + extern "Rust" { + #[cxx_name = "autoclose_fd_t2"] + type AutoCloseFd; + + #[cxx_name = "valid"] + fn is_valid(&self) -> bool; + fn close(&mut self); + fn fd(&self) -> i32; + } +} + +impl AutoCloseFd { // Closes the fd if not already closed. pub fn close(&mut self) { if self.fd_ != -1 { @@ -37,24 +54,41 @@ pub fn reset(&mut self, fd: RawFd) { self.fd_ = fd; } - // \return if this has a valid fd. - pub fn valid(&self) -> bool { + // Returns if this has a valid fd. + pub fn is_valid(&self) -> bool { self.fd_ >= 0 } - // Construct, taking ownership of an fd. - pub fn new(fd: RawFd) -> autoclose_fd_t { - autoclose_fd_t { fd_: fd } + // Create a new AutoCloseFd instance taking ownership of the passed fd + pub fn new(fd: RawFd) -> Self { + AutoCloseFd { fd_: fd } + } + + // Create a new AutoCloseFd without an open fd + pub fn empty() -> Self { + AutoCloseFd { fd_: -1 } } } -impl Default for autoclose_fd_t { - fn default() -> autoclose_fd_t { - autoclose_fd_t { fd_: -1 } +impl FromRawFd for AutoCloseFd { + unsafe fn from_raw_fd(fd: RawFd) -> Self { + AutoCloseFd { fd_: fd } } } -impl Drop for autoclose_fd_t { +impl AsRawFd for AutoCloseFd { + fn as_raw_fd(&self) -> RawFd { + self.fd() + } +} + +impl Default for AutoCloseFd { + fn default() -> AutoCloseFd { + AutoCloseFd { fd_: -1 } + } +} + +impl Drop for AutoCloseFd { fn drop(&mut self) { self.close() } @@ -64,10 +98,10 @@ fn drop(&mut self) { #[derive(Default)] pub struct autoclose_pipes_t { /// Read end of the pipe. - pub read: autoclose_fd_t, + pub read: AutoCloseFd, /// Write end of the pipe. - pub write: autoclose_fd_t, + pub write: AutoCloseFd, } /// Construct a pair of connected pipes, set to close-on-exec. 
@@ -75,9 +109,9 @@ pub struct autoclose_pipes_t { pub fn make_autoclose_pipes() -> Option { let pipes = ffi::make_pipes_ffi(); - let readp = autoclose_fd_t::new(pipes.read); - let writep = autoclose_fd_t::new(pipes.write); - if !readp.valid() || !writep.valid() { + let readp = AutoCloseFd::new(pipes.read); + let writep = AutoCloseFd::new(pipes.write); + if !readp.is_valid() || !writep.is_valid() { None } else { Some(autoclose_pipes_t { diff --git a/fish-rust/src/ffi.rs b/fish-rust/src/ffi.rs index b39db82fc..be576f120 100644 --- a/fish-rust/src/ffi.rs +++ b/fish-rust/src/ffi.rs @@ -1,9 +1,7 @@ use crate::wchar; -#[rustfmt::skip] -use ::std::pin::Pin; -#[rustfmt::skip] -use ::std::slice; use autocxx::prelude::*; +use core::pin::Pin; +use core::slice; use cxx::SharedPtr; // autocxx has been hacked up to know about this. @@ -73,6 +71,8 @@ generate!("sig2wcs") generate!("wcs2sig") generate!("signal_get_desc") + + generate!("fd_event_signaller_t") } impl parser_t { @@ -135,3 +135,19 @@ impl Repin for output_stream_t {} pub use autocxx::c_int; pub use ffi::*; pub use libc::c_char; + +/// A version of [`* const core::ffi::c_void`] (or [`* const libc::c_void`], if you prefer) that +/// implements `Copy` and `Clone`, because those two don't. Used to represent a `void *` ptr for ffi +/// purposes. 
+#[repr(transparent)] +#[derive(Copy, Clone)] +pub struct void_ptr(pub *const core::ffi::c_void); + +impl core::fmt::Debug for void_ptr { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "{:p}", &self.0) + } +} + +unsafe impl Send for void_ptr {} +unsafe impl Sync for void_ptr {} diff --git a/fish-rust/src/lib.rs b/fish-rust/src/lib.rs index 9a12c3e69..55f55768f 100644 --- a/fish-rust/src/lib.rs +++ b/fish-rust/src/lib.rs @@ -5,6 +5,7 @@ #![allow(clippy::manual_is_ascii_check)] mod common; +mod fd_monitor; mod fd_readable_set; mod fds; #[allow(rustdoc::broken_intra_doc_links)] @@ -22,6 +23,7 @@ mod redirection; mod signal; mod smoke; +mod threads; mod timer; mod tokenizer; mod topic_monitor; diff --git a/fish-rust/src/threads.rs b/fish-rust/src/threads.rs new file mode 100644 index 000000000..aa7c35198 --- /dev/null +++ b/fish-rust/src/threads.rs @@ -0,0 +1,67 @@ +//! The rusty version of iothreads from the cpp code, to be consumed by native rust code. This isn't +//! ported directly from the cpp code so we can use rust threads instead of using pthreads. + +use crate::flog::FLOG; + +/// The rusty version of `iothreads::make_detached_pthread()`. We will probably need a +/// `spawn_scoped` version of the same to handle some more advanced borrow cases safely, and maybe +/// an unsafe version that doesn't do any lifetime checking akin to +/// `spawn_unchecked()`[std::thread::Builder::spawn_unchecked], which is a nightly-only feature. +/// +/// Returns a boolean indicating whether or not the thread was successfully launched. Failure here +/// is not dependent on the passed callback and implies a system error (likely insufficient +/// resources). +pub fn spawn(callback: F) -> bool { + // The spawned thread inherits our signal mask. Temporarily block signals, spawn the thread, and + // then restore it. But we must not block SIGBUS, SIGFPE, SIGILL, or SIGSEGV; that's undefined + // (#7837). 
Conservatively don't try to mask SIGKILL or SIGSTOP either; that's ignored on Linux + // but maybe has an effect elsewhere. + let saved_set = unsafe { + let mut new_set: libc::sigset_t = std::mem::zeroed(); + let new_set = &mut new_set as *mut _; + libc::sigfillset(new_set); + libc::sigdelset(new_set, libc::SIGILL); // bad jump + libc::sigdelset(new_set, libc::SIGFPE); // divide-by-zero + libc::sigdelset(new_set, libc::SIGBUS); // unaligned memory access + libc::sigdelset(new_set, libc::SIGSEGV); // bad memory access + libc::sigdelset(new_set, libc::SIGSTOP); // unblockable + libc::sigdelset(new_set, libc::SIGKILL); // unblockable + + let mut saved_set: libc::sigset_t = std::mem::zeroed(); + let result = libc::pthread_sigmask(libc::SIG_BLOCK, new_set, &mut saved_set as *mut _); + assert_eq!(result, 0, "Failed to override thread signal mask!"); + saved_set + }; + + // Spawn a thread. If this fails, it means there's already a bunch of threads; it is very + // unlikely that they are all on the verge of exiting, so one is likely to be ready to handle + // extant requests. So we can ignore failure with some confidence. + // We don't have to port the PTHREAD_CREATE_DETACHED logic. Rust threads are detached + // automatically if the returned join handle is dropped. 
+ + let result = match std::thread::Builder::new().spawn(|| callback()) { + Ok(handle) => { + let id = handle.thread().id(); + FLOG!(iothread, "rust thread", id, "spawned"); + // Drop the handle to detach the thread + drop(handle); + true + } + Err(e) => { + eprintln!("rust thread spawn failure: {e}"); + false + } + }; + + // Restore our sigmask + unsafe { + let result = libc::pthread_sigmask( + libc::SIG_SETMASK, + &saved_set as *const _, + std::ptr::null_mut(), + ); + assert_eq!(result, 0, "Failed to restore thread signal mask!"); + }; + + result +} diff --git a/fish-rust/src/wutil/mod.rs b/fish-rust/src/wutil/mod.rs index ae29f5cca..f4bec1c99 100644 --- a/fish-rust/src/wutil/mod.rs +++ b/fish-rust/src/wutil/mod.rs @@ -2,6 +2,24 @@ pub mod gettext; mod wcstoi; +use std::io::Write; + pub(crate) use format::printf::sprintf; pub(crate) use gettext::{wgettext, wgettext_fmt}; pub use wcstoi::*; + +/// Port of the wide-string wperror from `src/wutil.cpp` but for rust `&str`. +pub fn perror(s: &str) { + let e = errno::errno().0; + let mut stderr = std::io::stderr().lock(); + if !s.is_empty() { + let _ = write!(stderr, "{s}: "); + } + let slice = unsafe { + let msg = libc::strerror(e) as *const u8; + let len = libc::strlen(msg as *const _); + std::slice::from_raw_parts(msg, len) + }; + let _ = stderr.write_all(slice); + let _ = stderr.write_all(b"\n"); +} diff --git a/src/fd_monitor.cpp b/src/fd_monitor.cpp deleted file mode 100644 index 7f932c53f..000000000 --- a/src/fd_monitor.cpp +++ /dev/null @@ -1,215 +0,0 @@ -// Support for monitoring a set of fds. -#include "config.h" // IWYU pragma: keep - -#include "fd_monitor.h" - -#include - -#include -#include -#include //this_thread::sleep_for -#include - -#include "flog.h" -#include "iothread.h" -#include "wutil.h" - -static constexpr uint64_t kUsecPerMsec = 1000; - -fd_monitor_t::fd_monitor_t() = default; - -fd_monitor_t::~fd_monitor_t() { - // In ordinary usage, we never invoke the dtor. 
- // This is used in the tests to not leave stale fds around. - // That is why this is very hacky! - data_.acquire()->terminate = true; - change_signaller_.post(); - while (data_.acquire()->running) { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } -} - -fd_monitor_item_id_t fd_monitor_t::add(fd_monitor_item_t &&item) { - assert(item.fd.valid() && "Invalid fd"); - assert(item.timeout_usec != 0 && "Invalid timeout"); - assert(item.item_id == 0 && "Item should not already have an ID"); - bool start_thread = false; - fd_monitor_item_id_t item_id{}; - { - // Lock around a local region. - auto data = data_.acquire(); - - // Assign an id and add the item to pending. - item_id = ++data->last_id; - item.item_id = item_id; - data->pending.push_back(std::move(item)); - - // Maybe plan to start the thread. - if (!data->running) { - FLOG(fd_monitor, "Thread starting"); - data->running = true; - start_thread = true; - } - } - if (start_thread) { - void *(*trampoline)(void *) = [](void *self) -> void * { - static_cast(self)->run_in_background(); - return nullptr; - }; - bool made_thread = make_detached_pthread(trampoline, this); - if (!made_thread) { - DIE("Unable to create a new pthread"); - } - } - // Tickle our signaller. - change_signaller_.post(); - return item_id; -} - -void fd_monitor_t::poke_item(fd_monitor_item_id_t item_id) { - assert(item_id > 0 && "Invalid item ID"); - bool needs_notification = false; - { - auto data = data_.acquire(); - needs_notification = data->pokelist.empty(); - // Insert it, sorted. 
- auto where = std::lower_bound(data->pokelist.begin(), data->pokelist.end(), item_id); - data->pokelist.insert(where, item_id); - } - if (needs_notification) { - change_signaller_.post(); - } -} - -uint64_t fd_monitor_item_t::usec_remaining(const time_point_t &now) const { - assert(last_time.has_value() && "Should always have a last_time"); - if (timeout_usec == kNoTimeout) return kNoTimeout; - assert(now >= *last_time && "steady clock went backwards!"); - uint64_t since = static_cast( - std::chrono::duration_cast(now - *last_time).count()); - return since >= timeout_usec ? 0 : timeout_usec - since; -} - -bool fd_monitor_item_t::service_item(const fd_readable_set_t &fds, const time_point_t &now) { - bool should_retain = true; - bool readable = fds.test(fd.fd()); - bool timed_out = !readable && usec_remaining(now) == 0; - if (readable || timed_out) { - last_time = now; - item_wake_reason_t reason = - readable ? item_wake_reason_t::readable : item_wake_reason_t::timeout; - callback(fd, reason); - should_retain = fd.valid(); - } - return should_retain; -} - -bool fd_monitor_item_t::poke_item(const poke_list_t &pokelist) { - if (item_id == 0 || !std::binary_search(pokelist.begin(), pokelist.end(), item_id)) { - // Not pokeable or not in the pokelist. - return true; - } - callback(fd, item_wake_reason_t::poke); - return fd.valid(); -} - -void fd_monitor_t::run_in_background() { - ASSERT_IS_BACKGROUND_THREAD(); - poke_list_t pokelist; - auto fds_box = new_fd_readable_set(); - auto &fds = *fds_box; - for (;;) { - // Poke any items that need it. - if (!pokelist.empty()) { - this->poke_in_background(pokelist); - pokelist.clear(); - } - - fds.clear(); - - // Our change_signaller is special cased. 
- int change_signal_fd = change_signaller_.read_fd(); - fds.add(change_signal_fd); - - auto now = std::chrono::steady_clock::now(); - uint64_t timeout_usec = kNoTimeout; - - for (auto &item : items_) { - fds.add(item.fd.fd()); - if (!item.last_time.has_value()) item.last_time = now; - timeout_usec = std::min(timeout_usec, item.usec_remaining(now)); - } - - // If we have no items, then we wish to allow the thread to exit, but after a time, so we - // aren't spinning up and tearing down the thread repeatedly. - // Set a timeout of 256 msec; if nothing becomes readable by then we will exit. - // We refer to this as the wait-lap. - bool is_wait_lap = (items_.size() == 0); - if (is_wait_lap) { - assert(timeout_usec == kNoTimeout && "Should not have a timeout on wait-lap"); - timeout_usec = 256 * kUsecPerMsec; - } - - // Call select(). - int ret = fds.check_readable(timeout_usec); - if (ret < 0 && errno != EINTR) { - // Surprising error. - wperror(L"select"); - } - - // A predicate which services each item in turn, returning true if it should be removed. - auto servicer = [&fds, &now](fd_monitor_item_t &item) { - int fd = item.fd.fd(); - bool remove = !item.service_item(fds, now); - if (remove) FLOG(fd_monitor, "Removing fd", fd); - return remove; - }; - - // Service all items that are either readable or timed out, and remove any which say to do - // so. - now = std::chrono::steady_clock::now(); - items_.erase(std::remove_if(items_.begin(), items_.end(), servicer), items_.end()); - - // Handle any changes if the change signaller was set. Alternatively this may be the wait - // lap, in which case we might want to commit to exiting. - bool change_signalled = fds.test(change_signal_fd); - if (change_signalled || is_wait_lap) { - // Clear the change signaller before processing incoming changes. - change_signaller_.try_consume(); - auto data = data_.acquire(); - - // Move from 'pending' to 'items'. 
- items_.insert(items_.end(), std::make_move_iterator(data->pending.begin()), - std::make_move_iterator(data->pending.end())); - data->pending.clear(); - - // Grab any pokelist. - assert(pokelist.empty() && "pokelist should be empty or else we're dropping pokes"); - pokelist = std::move(data->pokelist); - data->pokelist.clear(); - - if (data->terminate || - (is_wait_lap && items_.empty() && pokelist.empty() && !change_signalled)) { - // Maybe terminate is set. - // Alternatively, maybe we had no items, waited a bit, and still have no items. - // It's important to do this while holding the lock, otherwise we race with new - // items being added. - assert(data->running && "Thread should be running because we're that thread"); - FLOG(fd_monitor, "Thread exiting"); - data->running = false; - return; - } - } - } -} - -void fd_monitor_t::poke_in_background(const poke_list_t &pokelist) { - ASSERT_IS_BACKGROUND_THREAD(); - auto poker = [&pokelist](fd_monitor_item_t &item) { - int fd = item.fd.fd(); - bool remove = !item.poke_item(pokelist); - if (remove) FLOG(fd_monitor, "Removing fd", fd); - return remove; - }; - items_.erase(std::remove_if(items_.begin(), items_.end(), poker), items_.end()); -} diff --git a/src/fd_monitor.h b/src/fd_monitor.h deleted file mode 100644 index 311606940..000000000 --- a/src/fd_monitor.h +++ /dev/null @@ -1,140 +0,0 @@ -#ifndef FISH_FD_MONITOR_H -#define FISH_FD_MONITOR_H - -#include -#include -#include -#include -#include - -// Needed for musl -#include // IWYU pragma: keep - -#include "common.h" -#include "fd_readable_set.rs.h" -#include "fds.h" -#include "maybe.h" - -/// Each item added to fd_monitor_t is assigned a unique ID, which is not recycled. -/// Items may have their callback triggered immediately by passing the ID. -/// Zero is a sentinel. -using fd_monitor_item_id_t = uint64_t; - -/// Reasons for waking an item. 
-enum class item_wake_reason_t { - readable, // the fd became readable - timeout, // the requested timeout was hit - poke, // the item was "poked" (woken up explicitly) -}; - -/// An item containing an fd and callback, which can be monitored to watch when it becomes readable, -/// and invoke the callback. -struct fd_monitor_item_t { - /// The callback type for the item. It is passed \p fd, and the reason for waking \p reason. - /// The callback may close \p fd, in which case the item is removed. - using callback_t = std::function; - - /// The fd to monitor. - autoclose_fd_t fd{}; - - /// A callback to be invoked when the fd is readable, or when we are timed out. - /// If we time out, then timed_out will be true. - /// If the fd is invalid on return from the function, then the item is removed. - callback_t callback{}; - - /// The timeout in microseconds, or kNoTimeout for none. - /// 0 timeouts are unsupported. - uint64_t timeout_usec{kNoTimeout}; - - /// Construct from a file, callback, and optional timeout. - fd_monitor_item_t(autoclose_fd_t fd, callback_t callback, uint64_t timeout_usec = kNoTimeout) - : fd(std::move(fd)), callback(std::move(callback)), timeout_usec(timeout_usec) { - assert(timeout_usec > 0 && "Invalid timeout"); - } - - fd_monitor_item_t() = default; - - private: - // Fields and methods for the private use of fd_monitor_t. - using time_point_t = std::chrono::time_point; - - // The last time we were called, or the initialization point. - maybe_t last_time{}; - - // The ID for this item. This is assigned by the fd monitor. - fd_monitor_item_id_t item_id{0}; - - // \return the number of microseconds until the timeout should trigger, or kNoTimeout for none. - // A 0 return means we are at or past the timeout. - uint64_t usec_remaining(const time_point_t &now) const; - - // Invoke this item's callback if its value is set in fd or has timed out. - // \return true to retain the item, false to remove it. 
- bool service_item(const fd_readable_set_t &fds, const time_point_t &now); - - // Invoke this item's callback with a poke, if its ID is present in the (sorted) pokelist. - // \return true to retain the item, false to remove it. - using poke_list_t = std::vector; - bool poke_item(const poke_list_t &pokelist); - - friend class fd_monitor_t; -}; - -/// A class which can monitor a set of fds, invoking a callback when any becomes readable, or when -/// per-item-configurable timeouts are hit. -class fd_monitor_t { - public: - using item_list_t = std::vector; - - // A "pokelist" is a sorted list of item IDs which need explicit wakeups. - using poke_list_t = std::vector; - - fd_monitor_t(); - ~fd_monitor_t(); - - /// Add an item to monitor. \return the ID assigned to the item. - fd_monitor_item_id_t add(fd_monitor_item_t &&item); - - /// Mark that an item with a given ID needs to be explicitly woken up. - void poke_item(fd_monitor_item_id_t item_id); - - private: - // The background thread runner. - void run_in_background(); - - // If our self-signaller is reported as ready, this reads from it and handles any changes. - // Called in the background thread. - void handle_self_signal_in_background(); - - // Poke items in the pokelist, removing any items that close their FD. - // The pokelist is consumed after this. - // This is only called in the background thread. - void poke_in_background(const poke_list_t &pokelist); - - // The list of items to monitor. This is only accessed on the background thread. - item_list_t items_{}; - - struct data_t { - /// Pending items. This is set under the lock, then the background thread grabs them. - item_list_t pending{}; - - /// List of IDs for items that need to be poked (explicitly woken up). - poke_list_t pokelist{}; - - /// The last ID assigned, or if none. - fd_monitor_item_id_t last_id{0}; - - /// Whether the thread is running. - bool running{false}; - - // Set if we should terminate. 
- bool terminate{false}; - }; - owning_lock data_; - - /// Our self-signaller. When this is written to, it means there are new items pending, or new - /// items in the pokelist, or terminate is set. - fd_event_signaller_t change_signaller_; -}; - -#endif diff --git a/src/fds.cpp b/src/fds.cpp index 225b6b7b4..408d8af46 100644 --- a/src/fds.cpp +++ b/src/fds.cpp @@ -29,6 +29,10 @@ void autoclose_fd_t::close() { fd_ = -1; } +std::shared_ptr ffi_new_fd_event_signaller_t() { + return std::make_shared(); +} + #ifdef HAVE_EVENTFD // Note we do not want to use EFD_SEMAPHORE because we are binary (not counting) semaphore. fd_event_signaller_t::fd_event_signaller_t() { @@ -78,7 +82,7 @@ bool fd_event_signaller_t::try_consume() const { return ret > 0; } -void fd_event_signaller_t::post() { +void fd_event_signaller_t::post() const { // eventfd writes uint64; pipes write 1 byte. #ifdef HAVE_EVENTFD const uint64_t c = 1; diff --git a/src/fds.h b/src/fds.h index 0f5b508ce..0aed9b840 100644 --- a/src/fds.h +++ b/src/fds.h @@ -109,7 +109,7 @@ class fd_event_signaller_t { /// Mark that an event has been received. This may be coalesced. /// This retries on EINTR. - void post(); + void post() const; /// Perform a poll to see if an event is received. /// If \p wait is set, wait until it is readable; this does not consume the event @@ -135,6 +135,8 @@ class fd_event_signaller_t { #endif }; +std::shared_ptr ffi_new_fd_event_signaller_t(); + /// Sets CLO_EXEC on a given fd according to the value of \p should_set. 
int set_cloexec(int fd, bool should_set = true); diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 2642541bb..f04dc7748 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -41,6 +41,8 @@ #include #include #include +#include "fds.rs.h" +#include "parse_constants.rs.h" #ifdef FISH_CI_SAN #include @@ -59,7 +61,7 @@ #include "env_universal_common.h" #include "expand.h" #include "fallback.h" // IWYU pragma: keep -#include "fd_monitor.h" +#include "fd_monitor.rs.h" #include "fd_readable_set.rs.h" #include "fds.h" #include "ffi_init.rs.h" @@ -806,36 +808,42 @@ static void test_fd_monitor() { std::atomic length_read{0}; std::atomic pokes{0}; std::atomic total_calls{0}; - fd_monitor_item_id_t item_id{0}; + uint64_t item_id{0}; bool always_exit{false}; - fd_monitor_item_t item; + std::unique_ptr> item; autoclose_fd_t writer; + void callback(autoclose_fd_t2 &fd, item_wake_reason_t reason) { + bool was_closed = false; + switch (reason) { + case item_wake_reason_t::Timeout: + this->did_timeout = true; + break; + case item_wake_reason_t::Poke: + this->pokes += 1; + break; + case item_wake_reason_t::Readable: + char buff[4096]; + ssize_t amt = read(fd.fd(), buff, sizeof buff); + this->length_read += amt; + was_closed = (amt == 0); + break; + } + total_calls += 1; + if (always_exit || was_closed) { + fd.close(); + } + } + + static void trampoline(autoclose_fd_t2 &fd, item_wake_reason_t reason, c_void *param) { + auto &instance = *(item_maker_t*)(param); + instance.callback(fd, reason); + } + explicit item_maker_t(uint64_t timeout_usec) { auto pipes = make_autoclose_pipes().acquire(); writer = std::move(pipes.write); - auto callback = [this](autoclose_fd_t &fd, item_wake_reason_t reason) { - bool was_closed = false; - switch (reason) { - case item_wake_reason_t::timeout: - this->did_timeout = true; - break; - case item_wake_reason_t::poke: - this->pokes += 1; - break; - case item_wake_reason_t::readable: - char buff[4096]; - ssize_t amt = read(fd.fd(), buff, sizeof 
buff); - this->length_read += amt; - was_closed = (amt == 0); - break; - } - total_calls += 1; - if (always_exit || was_closed) { - fd.close(); - } - }; - item = fd_monitor_item_t(std::move(pipes.read), std::move(callback), timeout_usec); + item = std::make_unique>(make_fd_monitor_item_t(pipes.read.acquire(), timeout_usec, (c_void *)item_maker_t::trampoline, (c_void*)this)); } // Write 42 bytes to our write end. @@ -871,18 +879,18 @@ static void test_fd_monitor() { item_oneshot.always_exit = true; { - fd_monitor_t monitor; + auto monitor = make_fd_monitor_t(); for (item_maker_t *item : {&item_never, &item_hugetimeout, &item0_timeout, &item42_timeout, &item42_nottimeout, &item42_thenclose, &item_pokee, &item_oneshot}) { - item->item_id = monitor.add(std::move(item->item)); + item->item_id = monitor->add(std::move(*(std::move(item->item)))); } item42_timeout.write42(); item42_nottimeout.write42(); item42_thenclose.write42(); item42_thenclose.writer.close(); item_oneshot.write42(); - monitor.poke_item(item_pokee.item_id); + monitor->poke_item(item_pokee.item_id); // May need to loop here to ensure our fd_monitor gets scheduled - see #7699. for (int i = 0; i < 100; i++) { diff --git a/src/io.cpp b/src/io.cpp index f8cb64b17..dbea5faa3 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -14,7 +14,9 @@ #include "common.h" #include "fallback.h" // IWYU pragma: keep -#include "fd_monitor.h" +#include "fd_monitor.rs.h" +#include "fds.h" +#include "fds.rs.h" #include "flog.h" #include "maybe.h" #include "path.h" @@ -31,7 +33,7 @@ /// Provide the fd monitor used for background fillthread operations. static fd_monitor_t &fd_monitor() { // Deliberately leaked to avoid shutdown dtors. 
- static auto fdm = new fd_monitor_t(); + static auto fdm = make_fd_monitor_t(); return *fdm; } @@ -75,6 +77,18 @@ ssize_t io_buffer_t::read_once(int fd, acquired_lock &buffer return amt; } +struct callback_args_t { + io_buffer_t *instance; + std::shared_ptr> promise; +}; + +extern "C" { +static void item_callback_trampoline(autoclose_fd_t2 &fd, item_wake_reason_t reason, + callback_args_t *args) { + (args->instance)->item_callback(fd, (uint8_t)reason, args); +} +} + void io_buffer_t::begin_filling(autoclose_fd_t fd) { assert(!fillthread_running() && "Already have a fillthread"); @@ -102,38 +116,51 @@ void io_buffer_t::begin_filling(autoclose_fd_t fd) { // Run our function to read until the receiver is closed. // It's OK to capture 'this' by value because 'this' waits for the promise in its dtor. - fd_monitor_item_t item; - item.fd = std::move(fd); - item.callback = [this, promise](autoclose_fd_t &fd, item_wake_reason_t reason) { - ASSERT_IS_BACKGROUND_THREAD(); - // Only check the shutdown flag if we timed out or were poked. - // It's important that if select() indicated we were readable, that we call select() again - // allowing it to time out. Note the typical case is that the fd will be closed, in which - // case select will return immediately. - bool done = false; - if (reason == item_wake_reason_t::readable) { - // select() reported us as readable; read a bit. - auto buffer = buffer_.acquire(); - ssize_t ret = read_once(fd.fd(), buffer); - done = (ret == 0 || (ret < 0 && errno != EAGAIN && errno != EWOULDBLOCK)); - } else if (shutdown_fillthread_) { - // Here our caller asked us to shut down; read while we keep getting data. - // This will stop when the fd is closed or if we get EAGAIN. 
- auto buffer = buffer_.acquire(); - ssize_t ret; - do { - ret = read_once(fd.fd(), buffer); - } while (ret > 0); - done = true; - } - if (done) { - fd.close(); - promise->set_value(); - } - }; - this->item_id_ = fd_monitor().add(std::move(item)); + auto args = new callback_args_t; + args->instance = this; + args->promise = std::move(promise); + + item_id_ = + fd_monitor().add_item(fd.acquire(), kNoTimeout, (::c_void *)item_callback_trampoline, (::c_void *)args); } +/// This is a hack to work around the difficulties in passing a capturing lambda across FFI +/// boundaries. A static function that takes a generic/untyped callback parameter is easy to +/// marshall with the basic C ABI. +void io_buffer_t::item_callback(autoclose_fd_t2 &fd, uint8_t r, callback_args_t *args) { + item_wake_reason_t reason = (item_wake_reason_t)r; + auto &promise = *args->promise; + + // Only check the shutdown flag if we timed out or were poked. + // It's important that if select() indicated we were readable, that we call select() again + // allowing it to time out. Note the typical case is that the fd will be closed, in which + // case select will return immediately. + bool done = false; + if (reason == item_wake_reason_t::Readable) { + // select() reported us as readable; read a bit. + auto buffer = buffer_.acquire(); + ssize_t ret = read_once(fd.fd(), buffer); + done = (ret == 0 || (ret < 0 && errno != EAGAIN && errno != EWOULDBLOCK)); + } else if (shutdown_fillthread_) { + // Here our caller asked us to shut down; read while we keep getting data. + // This will stop when the fd is closed or if we get EAGAIN. + auto buffer = buffer_.acquire(); + ssize_t ret; + do { + ret = read_once(fd.fd(), buffer); + } while (ret > 0); + done = true; + } + if (done) { + fd.close(); + promise.set_value(); + // When we close the fd, we signal to the caller that the fd should be removed from its set + // and that this callback should never be called again. 
+ // Manual memory management is not nice but this is just during the cpp-to-rust transition. + delete args; + } +}; + separated_buffer_t io_buffer_t::complete_background_fillthread_and_take_buffer() { // Mark that our fillthread is done, then wake it up. assert(fillthread_running() && "Should have a fillthread"); diff --git a/src/io.h b/src/io.h index 6908e598f..ace06e958 100644 --- a/src/io.h +++ b/src/io.h @@ -275,6 +275,9 @@ class io_bufferfill_t final : public io_data_t { static separated_buffer_t finish(std::shared_ptr &&filler); }; +struct callback_args_t; +struct autoclose_fd_t2; + /// An io_buffer_t is a buffer which can populate itself by reading from an fd. /// It is not an io_data_t. class io_buffer_t { @@ -291,6 +294,9 @@ class io_buffer_t { /// \return true if output was discarded due to exceeding the read limit. bool discarded() { return buffer_.acquire()->discarded(); } + /// FFI callback workaround. + void item_callback(autoclose_fd_t2 &fd, uint8_t reason, callback_args_t *args); + private: /// Read some, filling the buffer. The buffer is passed in to enforce that the append lock is /// held. \return positive on success, 0 if closed, -1 on error (in which case errno will be