Port make_autoclose_pipes, fd_event_signaller_t

This allows to get rid of the C++ autoclose_fd_t.
This commit is contained in:
Johannes Altmanninger
2023-12-09 22:47:24 +01:00
parent f2cd916f65
commit f3dd8d306f
19 changed files with 278 additions and 496 deletions

View File

@@ -23,7 +23,12 @@ fn main() {
if compiles("fish-rust/src/cfg/w_exitcode.cpp") {
println!("cargo:rustc-cfg=HAVE_WAITSTATUS_SIGNAL_RET");
}
if compiles("fish-rust/src/cfg/eventfd.c") {
println!("cargo:rustc-cfg=HAVE_EVENTFD");
}
if compiles("fish-rust/src/cfg/pipe2.c") {
println!("cargo:rustc-cfg=HAVE_PIPE2");
}
if compiles("fish-rust/src/cfg/spawn.c") {
println!("cargo:rustc-cfg=FISH_USE_POSIX_SPAWN");
}

View File

@@ -0,0 +1 @@
#include <sys/eventfd.h>

View File

@@ -0,0 +1,2 @@
#include <unistd.h>
int ok = pipe2(0, 0);

View File

@@ -1,17 +1,23 @@
use std::os::unix::prelude::*;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};
pub use self::fd_monitor_ffi::ItemWakeReason;
pub use self::fd_monitor_ffi::{new_fd_event_signaller, FdEventSignaller};
use crate::common::exit_without_destructors;
use crate::fd_readable_set::FdReadableSet;
use crate::fds::AutoCloseFd;
use crate::ffi::void_ptr;
use crate::flog::FLOG;
use crate::threads::assert_is_background_thread;
use crate::wutil::perror;
use cxx::SharedPtr;
use errno::errno;
use libc::{self, c_void, EAGAIN, EINTR, EWOULDBLOCK};
#[cfg(not(HAVE_EVENTFD))]
use crate::fds::{make_autoclose_pipes, make_fd_nonblocking};
#[cfg(HAVE_EVENTFD)]
use libc::{EFD_CLOEXEC, EFD_NONBLOCK};
#[cxx::bridge]
mod fd_monitor_ffi {
@@ -28,22 +34,6 @@ enum ItemWakeReason {
Poke,
}
unsafe extern "C++" {
include!("fds.h");
/// An event signaller implemented using a file descriptor, so it can plug into
/// [`select()`](libc::select).
///
/// This is like a binary semaphore. A call to [`post()`](FdEventSignaller::post) will
/// signal an event, making the fd readable. Multiple calls to `post()` may be coalesced.
/// On Linux this uses [`eventfd()`](libc::eventfd), on other systems this uses a pipe.
/// [`try_consume()`](FdEventSignaller::try_consume) may be used to consume the event.
/// Importantly this is async signal safe. Of course it is `CLO_EXEC` as well.
#[rust_name = "FdEventSignaller"]
type fd_event_signaller_t = crate::ffi::fd_event_signaller_t;
#[rust_name = "new_fd_event_signaller"]
fn ffi_new_fd_event_signaller_t() -> SharedPtr<FdEventSignaller>;
}
extern "Rust" {
#[cxx_name = "fd_monitor_item_id_t"]
type FdMonitorItemId;
@@ -86,9 +76,151 @@ fn add_item_ffi(
}
}
// TODO: Remove once we're no longer using the FFI variant of FdEventSignaller
unsafe impl Sync for FdEventSignaller {}
unsafe impl Send for FdEventSignaller {}
/// An event signaller implemented using a file descriptor, so it can plug into
/// [`select()`](libc::select).
///
/// This is like a binary semaphore. A call to [`post()`](FdEventSignaller::post) will
/// signal an event, making the fd readable. Multiple calls to `post()` may be coalesced.
/// On Linux this uses [`eventfd()`](libc::eventfd), on other systems this uses a pipe.
/// [`try_consume()`](FdEventSignaller::try_consume) may be used to consume the event.
/// Importantly this is async signal safe. Of course it is `CLO_EXEC` as well.
pub struct FdEventSignaller {
// Always the read end of the fd; maybe the write end as well.
fd: AutoCloseFd,
#[cfg(not(HAVE_EVENTFD))]
write: AutoCloseFd,
}
impl FdEventSignaller {
/// The default constructor will abort on failure (fd exhaustion).
/// This should only be used during startup.
pub fn new() -> Self {
#[cfg(HAVE_EVENTFD)]
{
// Note we do not want to use EFD_SEMAPHORE because we are binary (not counting) semaphore.
let fd = unsafe { libc::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK) };
if fd < 0 {
perror("eventfd");
exit_without_destructors(1);
}
Self {
fd: AutoCloseFd::new(fd),
}
}
#[cfg(not(HAVE_EVENTFD))]
{
// Implementation using pipes.
let Some(pipes) = make_autoclose_pipes() else {
perror("pipe");
exit_without_destructors(1);
};
make_fd_nonblocking(pipes.read.fd()).unwrap();
make_fd_nonblocking(pipes.write.fd()).unwrap();
Self {
fd: pipes.read,
write: pipes.write,
}
}
}
/// \return the fd to read from, for notification.
pub fn read_fd(&self) -> RawFd {
self.fd.fd()
}
/// If an event is signalled, consume it; otherwise return.
/// This does not block.
/// This retries on EINTR.
pub fn try_consume(&self) -> bool {
// If we are using eventfd, we want to read a single uint64.
// If we are using pipes, read a lot; note this may leave data on the pipe if post has been
// called many more times. In no case do we care about the data which is read.
#[cfg(HAVE_EVENTFD)]
let mut buff = [0_u64; 1];
#[cfg(not(HAVE_EVENTFD))]
let mut buff = [0_u8; 1024];
let mut ret;
loop {
ret = unsafe {
libc::read(
self.read_fd(),
&mut buff as *mut _ as *mut c_void,
std::mem::size_of_val(&buff),
)
};
if ret >= 0 || errno().0 != EINTR {
break;
}
}
if ret < 0 && ![EAGAIN, EWOULDBLOCK].contains(&errno().0) {
perror("read");
}
ret > 0
}
/// Mark that an event has been received. This may be coalesced.
/// This retries on EINTR.
pub fn post(&self) {
// eventfd writes uint64; pipes write 1 byte.
#[cfg(HAVE_EVENTFD)]
let c = 1_u64;
#[cfg(not(HAVE_EVENTFD))]
let c = 1_u8;
let mut ret;
loop {
ret = unsafe {
libc::write(
self.write_fd(),
&c as *const _ as *const c_void,
std::mem::size_of_val(&c),
)
};
if ret >= 0 || errno().0 != EINTR {
break;
}
}
// EAGAIN occurs if either the pipe buffer is full or the eventfd overflows (very unlikely).
if ret < 0 && ![EAGAIN, EWOULDBLOCK].contains(&errno().0) {
perror("write");
}
}
/// Perform a poll to see if an event is received.
/// If \p wait is set, wait until it is readable; this does not consume the event
/// but guarantees that the next call to wait() will not block.
/// \return true if readable, false if not readable, or not interrupted by a signal.
pub fn poll(&self, wait: bool /* = false */) -> bool {
let mut timeout = libc::timeval {
tv_sec: 0,
tv_usec: 0,
};
let mut fds: libc::fd_set = unsafe { std::mem::zeroed() };
unsafe { libc::FD_ZERO(&mut fds) };
unsafe { libc::FD_SET(self.read_fd(), &mut fds) };
let res = unsafe {
libc::select(
self.read_fd() + 1,
&mut fds,
std::ptr::null_mut(),
std::ptr::null_mut(),
if wait {
std::ptr::null_mut()
} else {
&mut timeout
},
)
};
res > 0
}
/// \return the fd to write to.
fn write_fd(&self) -> RawFd {
#[cfg(HAVE_EVENTFD)]
return self.fd.fd();
#[cfg(not(HAVE_EVENTFD))]
return self.write.fd();
}
}
/// Each item added to fd_monitor_t is assigned a unique ID, which is not recycled. Items may have
/// their callback triggered immediately by passing the ID. Zero is a sentinel.
@@ -301,7 +433,7 @@ fn new_fd_monitor_item_ffi(
pub struct FdMonitor {
/// Our self-signaller. When this is written to, it means there are new items pending, new items
/// in the poke list, or terminate has been set.
change_signaller: SharedPtr<FdEventSignaller>,
change_signaller: Arc<FdEventSignaller>,
/// The data shared between the background thread and the `FdMonitor` instance.
data: Arc<Mutex<SharedData>>,
/// The last ID assigned or `0` if none.
@@ -337,7 +469,7 @@ struct BackgroundFdMonitor {
items: Vec<FdMonitorItem>,
/// Our self-signaller. When this is written to, it means there are new items pending, new items
/// in the poke list, or terminate has been set.
change_signaller: SharedPtr<FdEventSignaller>,
change_signaller: Weak<FdEventSignaller>,
/// The data shared between the background thread and the `FdMonitor` instance.
data: Arc<Mutex<SharedData>>,
}
@@ -377,7 +509,7 @@ pub fn add(&self, mut item: FdMonitorItem) -> FdMonitorItemId {
FLOG!(fd_monitor, "Thread starting");
let background_monitor = BackgroundFdMonitor {
data: Arc::clone(&self.data),
change_signaller: SharedPtr::clone(&self.change_signaller),
change_signaller: Arc::downgrade(&self.change_signaller),
items: Vec::new(),
};
crate::threads::spawn(move || {
@@ -441,7 +573,7 @@ pub fn new() -> Self {
running: false,
terminate: false,
})),
change_signaller: new_fd_event_signaller(),
change_signaller: Arc::new(FdEventSignaller::new()),
last_id: AtomicU64::new(0),
}
}
@@ -465,7 +597,7 @@ fn run(mut self) {
fds.clear();
// Our change_signaller is special-cased
let change_signal_fd = self.change_signaller.read_fd().into();
let change_signal_fd = self.change_signaller.upgrade().unwrap().read_fd();
fds.add(change_signal_fd);
let mut now = Instant::now();
@@ -535,7 +667,7 @@ fn run(mut self) {
let change_signalled = fds.test(change_signal_fd);
if change_signalled || is_wait_lap {
// Clear the change signaller before processing incoming changes
self.change_signaller.try_consume();
self.change_signaller.upgrade().unwrap().try_consume();
let mut data = self.data.lock().expect("Mutex poisoned!");
// Move from `pending` to the end of `items`

View File

@@ -1,8 +1,11 @@
use crate::common::wcs2zstring;
use crate::ffi;
use crate::flog::FLOG;
use crate::wchar::prelude::*;
use crate::wutil::perror;
use libc::{c_int, EINTR, FD_CLOEXEC, F_GETFD, F_GETFL, F_SETFD, F_SETFL, O_CLOEXEC, O_NONBLOCK};
use libc::{
c_int, EINTR, FD_CLOEXEC, F_DUPFD_CLOEXEC, F_GETFD, F_GETFL, F_SETFD, F_SETFL, O_CLOEXEC,
O_NONBLOCK,
};
use nix::unistd;
use std::ffi::CStr;
use std::io::{self, Read, Write};
@@ -59,12 +62,16 @@ mod autoclose_fd_t {
#[cxx_name = "autoclose_fd_t2"]
type AutoCloseFd;
fn new_autoclose_fd(fd: i32) -> Box<AutoCloseFd>;
#[cxx_name = "valid"]
fn is_valid(&self) -> bool;
fn close(&mut self);
fn fd(&self) -> i32;
}
}
fn new_autoclose_fd(fd: i32) -> Box<AutoCloseFd> {
Box::new(AutoCloseFd::new(fd))
}
impl AutoCloseFd {
// Closes the fd if not already closed.
@@ -149,18 +156,69 @@ pub struct AutoClosePipes {
/// Construct a pair of connected pipes, set to close-on-exec.
/// \return None on fd exhaustion.
pub fn make_autoclose_pipes() -> Option<AutoClosePipes> {
let pipes = ffi::make_pipes_ffi();
let mut pipes: [c_int; 2] = [-1, -1];
let readp = AutoCloseFd::new(pipes.read);
let writep = AutoCloseFd::new(pipes.write);
if !readp.is_valid() || !writep.is_valid() {
None
} else {
Some(AutoClosePipes {
read: readp,
write: writep,
})
let already_cloexec = false;
#[cfg(HAVE_PIPE2)]
{
if unsafe { libc::pipe2(&mut pipes[0], O_CLOEXEC) } < 0 {
FLOG!(warning, PIPE_ERROR);
perror("pipe2");
return None;
}
already_cloexec = true;
}
#[cfg(not(HAVE_PIPE2))]
if unsafe { libc::pipe(&mut pipes[0]) } < 0 {
FLOG!(warning, PIPE_ERROR);
perror("pipe2");
return None;
}
let readp = AutoCloseFd::new(pipes[0]);
let writep = AutoCloseFd::new(pipes[1]);
// Ensure our fds are out of the user range.
let readp = heightenize_fd(readp, already_cloexec);
if !readp.is_valid() {
return None;
}
let writep = heightenize_fd(writep, already_cloexec);
if !writep.is_valid() {
return None;
}
Some(AutoClosePipes {
read: readp,
write: writep,
})
}
/// If the given fd is in the "user range", move it to a new fd in the "high range".
/// zsh calls this movefd().
/// \p input_has_cloexec describes whether the input has CLOEXEC already set, so we can avoid
/// setting it again.
/// \return the fd, which always has CLOEXEC set; or an invalid fd on failure, in
/// which case an error will have been printed, and the input fd closed.
fn heightenize_fd(fd: AutoCloseFd, input_has_cloexec: bool) -> AutoCloseFd {
// Check if the fd is invalid or already in our high range.
if !fd.is_valid() {
return fd;
}
if fd.fd() >= FIRST_HIGH_FD {
if !input_has_cloexec {
set_cloexec(fd.fd(), true);
}
return fd;
}
// Here we are asking the kernel to give us a cloexec fd.
let newfd = unsafe { libc::fcntl(fd.fd(), F_DUPFD_CLOEXEC, FIRST_HIGH_FD) };
if newfd < 0 {
perror("fcntl");
return AutoCloseFd::default();
}
return AutoCloseFd::new(newfd);
}
/// Sets CLO_EXEC on a given fd according to the value of \p should_set.

View File

@@ -66,24 +66,18 @@
generate!("activate_flog_categories_by_pattern")
generate!("save_term_foreground_process_group")
generate!("restore_term_foreground_process_group_for_exit")
generate!("set_cloexec")
generate!("builtin_bind")
generate!("builtin_commandline")
generate!("init_input")
generate_pod!("pipes_ffi_t")
generate!("shell_modes_ffi")
generate!("make_pipes_ffi")
generate!("log_extra_to_flog_file")
generate!("wgettext_ptr")
generate!("fd_event_signaller_t")
generate!("highlight_role_t")
generate!("highlight_spec_t")

View File

@@ -34,6 +34,7 @@
ConfigPaths, EnvMode,
},
event::{self, Event},
fds::set_cloexec,
ffi::{self},
flog::{self, activate_flog_categories_by_pattern, set_flog_file_fd, FLOG, FLOGF},
function, future_feature_flags as features, history,
@@ -550,7 +551,7 @@ fn main() -> i32 {
std::process::exit(-1);
}
unsafe { ffi::set_cloexec(c_int(libc::fileno(debug_file)), true) };
set_cloexec(unsafe { libc::fileno(debug_file) }, true);
ffi::flog_setlinebuf_ffi(debug_file as *mut _);
ffi::set_flog_output_file_ffi(debug_file as *mut _);
set_flog_file_fd(unsafe { libc::fileno(debug_file) });

View File

@@ -4,7 +4,9 @@
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::fd_monitor::{FdMonitor, FdMonitorItem, FdMonitorItemId, ItemWakeReason};
use crate::fd_monitor::{
FdEventSignaller, FdMonitor, FdMonitorItem, FdMonitorItemId, ItemWakeReason,
};
use crate::fds::{make_autoclose_pipes, AutoCloseFd};
use crate::ffi_tests::add_test;
@@ -188,3 +190,28 @@ fn write42(&self) {
assert_eq!(item_pokee.total_calls.load(Ordering::Relaxed), 1);
assert_eq!(item_pokee.pokes.load(Ordering::Relaxed), 1);
});
#[test]
fn test_fd_event_signaller() {
let sema = FdEventSignaller::new();
assert!(!sema.try_consume());
assert!(!sema.poll(false));
// Post once.
sema.post();
assert!(sema.poll(false));
assert!(sema.poll(false));
assert!(sema.try_consume());
assert!(!sema.poll(false));
assert!(!sema.try_consume());
// Posts are coalesced.
sema.post();
sema.post();
sema.post();
assert!(sema.poll(false));
assert!(sema.poll(false));
assert!(sema.try_consume());
assert!(!sema.poll(false));
assert!(!sema.try_consume());
}

View File

@@ -38,16 +38,8 @@ impl FloggableDebug for ThreadId {}
static IO_THREAD_POOL: OnceBox<Mutex<ThreadPool>> = OnceBox::new();
/// The event signaller singleton used for completions and queued main thread requests.
static NOTIFY_SIGNALLER: once_cell::sync::Lazy<&'static crate::fd_monitor::FdEventSignaller> =
once_cell::sync::Lazy::new(|| unsafe {
// This is leaked to avoid C++-side destructors. When ported fully to rust, we won't need to
// leak anything.
let signaller = crate::fd_monitor::new_fd_event_signaller();
let signaller_ref: &crate::fd_monitor::FdEventSignaller = signaller.as_ref().unwrap();
let result = std::mem::transmute(signaller_ref);
std::mem::forget(signaller);
result
});
static NOTIFY_SIGNALLER: once_cell::sync::Lazy<crate::fd_monitor::FdEventSignaller> =
once_cell::sync::Lazy::new(crate::fd_monitor::FdEventSignaller::new);
#[cxx::bridge]
mod ffi {
@@ -557,7 +549,7 @@ pub fn iothread_perform_cant_wait(f: impl FnOnce() + 'static + Send) {
}
pub fn iothread_port() -> i32 {
i32::from(NOTIFY_SIGNALLER.read_fd())
NOTIFY_SIGNALLER.read_fd()
}
pub fn iothread_service_main_with_timeout(timeout: Duration) {