Use Option<OwnedFd> instead of AutoCloseFd

Closes #12199
This commit is contained in:
xtqqczze
2025-12-23 19:44:14 +00:00
committed by Johannes Altmanninger
parent 1cf4e191b3
commit 771b33b3a3
5 changed files with 65 additions and 152 deletions

View File

@@ -11,7 +11,6 @@
use crate::common::exit_without_destructors;
use crate::fd_readable_set::{FdReadableSet, Timeout};
use crate::fds::AutoCloseFd;
use crate::flog::flog;
use crate::threads::assert_is_background_thread;
use crate::wutil::perror;
@@ -185,13 +184,13 @@ fn from(value: u64) -> Self {
/// The callback type used by [`FdMonitorItem`]. It is passed a mutable reference to the
/// `FdMonitorItem`'s [`FdMonitorItem::fd`]. If the fd is closed, the callback will not
/// be invoked again.
pub type Callback = Box<dyn Fn(&mut AutoCloseFd) + Send + Sync>;
pub type Callback = Box<dyn Fn(&mut Option<OwnedFd>) + Send + Sync>;
/// An item containing an fd and callback, which can be monitored to watch when it becomes readable
/// and invoke the callback.
pub struct FdMonitorItem {
/// The fd to monitor
fd: AutoCloseFd,
fd: Option<OwnedFd>,
/// A callback to be invoked when the fd is readable, or for another reason given by the wake reason.
/// If the fd is invalid on return from the function, then the item is removed from the [`FdMonitor`] set.
callback: Callback,
@@ -249,12 +248,13 @@ struct BackgroundFdMonitor {
impl FdMonitor {
/// Add an item to the monitor. Returns the [`FdMonitorItemId`] assigned to the item.
pub fn add(&self, fd: AutoCloseFd, callback: Callback) -> FdMonitorItemId {
assert!(fd.is_valid());
pub fn add(&self, fd: OwnedFd, callback: Callback) -> FdMonitorItemId {
let item_id = self.last_id.fetch_add(1, Ordering::Relaxed) + 1;
let item_id = FdMonitorItemId(item_id);
let item: FdMonitorItem = FdMonitorItem { fd, callback };
let item: FdMonitorItem = FdMonitorItem {
fd: Some(fd),
callback,
};
let start_thread = {
// Lock around a local region
let mut data = self.data.lock().expect("Mutex poisoned!");
@@ -286,16 +286,18 @@ pub fn add(&self, fd: AutoCloseFd, callback: Callback) -> FdMonitorItemId {
item_id
}
pub fn with_fd(&self, item_id: FdMonitorItemId, cb: impl FnOnce(&AutoCloseFd)) {
pub fn with_fd(&self, item_id: FdMonitorItemId, cb: impl FnOnce(BorrowedFd)) {
let data = self.data.lock().expect("Mutex poisoned!");
cb(&data.items.get(&item_id).unwrap().fd);
if let Some(fd) = &data.items.get(&item_id).unwrap().fd {
cb(fd.as_fd());
}
}
/// Remove an item from the monitor and return its file descriptor.
/// Note we may remove an item whose fd is currently being waited on in select(); this is
/// considered benign because the underlying item will no longer be present and so its
/// callback will not be invoked.
pub fn remove_item(&self, item_id: FdMonitorItemId) -> AutoCloseFd {
pub fn remove_item(&self, item_id: FdMonitorItemId) -> Option<OwnedFd> {
assert!(item_id.0 > 0, "Invalid item id!");
let mut data = self.data.lock().expect("Mutex poisoned!");
let removed = data.items.remove(&item_id).expect("Item ID not found");
@@ -374,9 +376,8 @@ fn run(self) {
item_ids.clear();
item_ids.reserve(data.items.len());
for (item_id, item) in &data.items {
let fd = item.fd.as_raw_fd();
if fd >= 0 {
fds.add(fd);
if let Some(fd) = &item.fd {
fds.add(fd.as_raw_fd());
item_ids.push(*item_id);
}
}
@@ -423,7 +424,7 @@ fn run(self) {
// Note there is no risk of an ABA problem because ItemIDs are never recycled.
continue;
};
if fds.test(item.fd.as_raw_fd()) {
if item.fd.as_ref().is_some_and(|fd| fds.test(fd.as_raw_fd())) {
item.service();
}
}
@@ -472,7 +473,7 @@ mod tests {
use portable_atomic::AtomicU64;
use std::fs::File;
use std::io::Write;
use std::os::fd::{AsRawFd, IntoRawFd, OwnedFd};
use std::os::fd::{AsRawFd, OwnedFd};
#[cfg(target_has_atomic = "64")]
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -484,7 +485,7 @@ mod tests {
use crate::fd_monitor::{FdEventSignaller, FdMonitor};
use crate::fd_readable_set::{FdReadableSet, Timeout};
use crate::fds::{AutoCloseFd, AutoClosePipes, make_autoclose_pipes};
use crate::fds::{AutoClosePipes, make_autoclose_pipes};
use crate::tests::prelude::*;
/// Helper to make an item which counts how many times its callback was invoked.
@@ -520,25 +521,25 @@ pub fn insert_new_into2<F: Fn(&mut Self)>(monitor: &FdMonitor, config: F) -> Arc
let result = Arc::new(result);
let callback = {
let result = Arc::clone(&result);
move |fd: &mut AutoCloseFd| result.callback(fd)
move |fd: &mut Option<OwnedFd>| result.callback(fd)
};
let fd = AutoCloseFd::new(pipes.read.into_raw_fd());
let fd = pipes.read;
let item_id = monitor.add(fd, Box::new(callback));
result.item_id.store(u64::from(item_id), Ordering::Relaxed);
result
}
fn callback(&self, fd: &mut AutoCloseFd) {
fn callback(&self, fd: &mut Option<OwnedFd>) {
let mut buf = [0u8; 1024];
let res = nix::unistd::read(&fd, &mut buf);
let res = nix::unistd::read(fd.as_ref().unwrap(), &mut buf);
let amt = res.expect("read error!");
self.length_read.fetch_add(amt, Ordering::Relaxed);
let was_closed = amt == 0;
self.total_calls.fetch_add(1, Ordering::Relaxed);
if was_closed || self.always_close {
fd.close();
drop(fd.take());
}
}

View File

@@ -6,10 +6,10 @@
use cfg_if::cfg_if;
use libc::{EINTR, F_GETFD, F_GETFL, F_SETFD, F_SETFL, FD_CLOEXEC, O_NONBLOCK, c_int};
use nix::fcntl::FcntlArg;
use nix::{fcntl::OFlag, unistd};
use nix::fcntl::OFlag;
use std::ffi::CStr;
use std::fs::File;
use std::io::{self, Read, Write};
use std::io;
use std::os::unix::prelude::*;
localizable_consts!(
@@ -24,105 +24,6 @@
/// A sentinel value indicating no timeout.
pub const NO_TIMEOUT: u64 = u64::MAX;
/// A helper type for managing and automatically closing a file descriptor.
/// Importantly this supports an invalid state with an fd of -1.
pub struct AutoCloseFd {
fd_: RawFd,
}
impl Read for AutoCloseFd {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
nix::unistd::read(self, buf).map_err(std::io::Error::from)
}
}
impl Write for AutoCloseFd {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
nix::unistd::write(self, buf).map_err(std::io::Error::from)
}
fn flush(&mut self) -> std::io::Result<()> {
// We don't buffer anything so this is a no-op.
Ok(())
}
}
impl AutoCloseFd {
// Closes the fd if not already closed.
pub fn close(&mut self) {
if self.fd_ != -1 {
_ = unistd::close(self.fd_);
self.fd_ = -1;
}
}
// Returns the fd.
pub fn fd(&self) -> RawFd {
self.fd_
}
// Returns the fd, transferring ownership to the caller.
pub fn acquire(&mut self) -> RawFd {
let temp = self.fd_;
self.fd_ = -1;
temp
}
// Resets to a new fd, taking ownership.
pub fn reset(&mut self, fd: RawFd) {
if fd == self.fd_ {
return;
}
self.close();
self.fd_ = fd;
}
// Returns if this has a valid fd.
pub fn is_valid(&self) -> bool {
self.fd_ >= 0
}
// Create a new AutoCloseFd instance taking ownership of the passed fd
pub fn new(fd: RawFd) -> Self {
AutoCloseFd { fd_: fd }
}
// Create a new AutoCloseFd without an open fd
pub fn empty() -> Self {
AutoCloseFd { fd_: -1 }
}
}
impl FromRawFd for AutoCloseFd {
unsafe fn from_raw_fd(fd: RawFd) -> Self {
AutoCloseFd { fd_: fd }
}
}
impl AsRawFd for AutoCloseFd {
fn as_raw_fd(&self) -> RawFd {
self.fd()
}
}
impl AsFd for AutoCloseFd {
fn as_fd(&self) -> BorrowedFd<'_> {
unsafe { BorrowedFd::borrow_raw(self.fd()) }
}
}
impl Default for AutoCloseFd {
fn default() -> AutoCloseFd {
AutoCloseFd { fd_: -1 }
}
}
impl Drop for AutoCloseFd {
fn drop(&mut self) {
self.close()
}
}
/// Helper type returned from make_autoclose_pipes.
pub struct AutoClosePipes {
/// Read end of the pipe.

View File

@@ -1,9 +1,7 @@
use crate::builtins::shared::{STATUS_CMD_ERROR, STATUS_CMD_OK, STATUS_READ_TOO_MUCH};
use crate::common::{EMPTY_STRING, bytes2wcstring, wcs2bytes};
use crate::fd_monitor::{Callback, FdMonitor, FdMonitorItemId};
use crate::fds::{
AutoCloseFd, PIPE_ERROR, make_autoclose_pipes, make_fd_nonblocking, wopen_cloexec,
};
use crate::fds::{PIPE_ERROR, make_autoclose_pipes, make_fd_nonblocking, wopen_cloexec};
use crate::flog::{flog, flogf, should_flog};
use crate::nix::isatty;
use crate::path::path_apply_working_directory;
@@ -21,7 +19,7 @@
use once_cell::sync::Lazy;
use std::fs::File;
use std::io;
use std::os::fd::{AsRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
use std::sync::{Arc, Mutex, MutexGuard};
/// separated_buffer_t represents a buffer of output from commands, prepared to be turned into a
@@ -439,28 +437,31 @@ pub fn read_once(fd: RawFd, buffer: &mut MutexGuard<'_, SeparatedBuffer>) -> isi
amt
}
pub fn read_all_available(&self, fd: &AutoCloseFd) {
pub fn read_all_available(&self, fd: BorrowedFd) {
let mut locked_buff = self.0.lock().unwrap();
self.do_read_all_available(fd, &mut locked_buff);
}
fn do_read_all_available(
&self,
fd: &AutoCloseFd,
fd: BorrowedFd,
locked_buff: &mut MutexGuard<'_, SeparatedBuffer>,
) {
// Read any remaining data from the pipe.
while fd.is_valid() && IoBuffer::read_once(fd.as_raw_fd(), &mut *locked_buff) > 0 {
while IoBuffer::read_once(fd.as_raw_fd(), &mut *locked_buff) > 0 {
// pass
}
}
/// End the background fillthread operation, and return the buffer, transferring ownership.
/// The read end of the pipe is provided.
pub fn complete_and_take_buffer(&self, fd: AutoCloseFd) -> SeparatedBuffer {
pub fn complete_and_take_buffer(&self, fd: Option<OwnedFd>) -> SeparatedBuffer {
// Read any remaining data from the pipe.
let mut locked_buff = self.0.lock().unwrap();
self.do_read_all_available(&fd, &mut locked_buff);
if let Some(fd) = fd {
self.do_read_all_available(fd.as_fd(), &mut locked_buff);
}
// Return our buffer, transferring ownership.
let mut result = SeparatedBuffer::new(locked_buff.limit());
@@ -487,17 +488,15 @@ fn begin_filling(iobuffer: IoBuffer, fd: OwnedFd) -> FdMonitorItemId {
// In this case, when complete_background_fillthread() is called, we grab the file descriptor
// and read until we get EAGAIN and then give up.
// Run our function to read until the receiver is closed.
let item_callback: Callback = Box::new(move |fd: &mut AutoCloseFd| {
assert!(fd.as_raw_fd() >= 0, "Invalid fd");
let item_callback: Callback = Box::new(move |fd: &mut Option<OwnedFd>| {
let mut buf = iobuffer.0.lock().unwrap();
let ret = IoBuffer::read_once(fd.as_raw_fd(), &mut buf);
let ret = IoBuffer::read_once(fd.as_ref().unwrap().as_raw_fd(), &mut buf);
if ret == 0 || (ret < 0 && ![EAGAIN, EWOULDBLOCK].contains(&errno::errno().0)) {
// Either it's finished or some other error - we're done.
fd.close();
drop(fd.take());
}
});
let fd = AutoCloseFd::new(fd.into_raw_fd());
fd_monitor().add(fd, item_callback)
}

View File

@@ -36,6 +36,8 @@
use std::ops::ControlFlow;
use std::ops::Range;
use std::os::fd::BorrowedFd;
use std::os::fd::FromRawFd;
use std::os::fd::OwnedFd;
use std::os::fd::{AsRawFd, RawFd};
use std::pin::Pin;
#[cfg(target_has_atomic = "64")]
@@ -72,7 +74,7 @@
use crate::expand::expand_one;
use crate::expand::{ExpandFlags, ExpandResultCode, expand_string, expand_tilde};
use crate::fd_readable_set::poll_fd_readable;
use crate::fds::{AutoCloseFd, make_fd_blocking, wopen_cloexec};
use crate::fds::{make_fd_blocking, wopen_cloexec};
use crate::flog::{flog, flogf};
use crate::future_feature_flags::{self, FeatureFlag};
use crate::global_safety::RelaxedAtomicBool;
@@ -5948,14 +5950,19 @@ fn check_for_orphaned_process(loop_count: usize, shell_pgid: libc::pid_t) -> boo
}
// Open the tty. Presumably this is stdin, but maybe not?
let tty_fd = AutoCloseFd::new(unsafe { libc::open(tty, O_RDONLY | O_NONBLOCK) });
if !tty_fd.is_valid() {
perror("open");
exit_without_destructors(1);
}
let tty_fd = {
let res = unsafe { libc::open(tty, O_RDONLY | O_NONBLOCK) };
if res < 0 {
perror("open");
exit_without_destructors(1);
}
unsafe { OwnedFd::from_raw_fd(res) }
};
let mut tmp = 0 as libc::c_char;
if unsafe { libc::read(tty_fd.fd(), (&raw mut tmp).cast(), 1) } < 0 && errno().0 == EIO {
if unsafe { libc::read(tty_fd.as_raw_fd(), (&raw mut tmp).cast(), 1) } < 0
&& errno().0 == EIO
{
we_think_we_are_orphaned = true;
}
}

View File

@@ -472,12 +472,15 @@ mod tests {
normalize_path, unescape_bytes_and_write_to_fd, wbasename, wdirname, wstr_offset_in,
};
use crate::common::bytes2wcstring;
use crate::fds::AutoCloseFd;
use crate::prelude::*;
use crate::tests::prelude::*;
use libc::{O_CREAT, O_RDWR, O_TRUNC, SEEK_SET};
use rand::Rng;
use std::{ffi::CString, ptr};
use std::{
ffi::CString,
os::fd::{AsRawFd, FromRawFd, OwnedFd},
ptr,
};
mod test_path_normalize_for_cd {
use super::super::path_normalize_for_cd;
@@ -658,24 +661,26 @@ fn test_wwrite_to_fd() {
let mut rng = rand::rng();
let sizes = [1, 2, 3, 5, 13, 23, 64, 128, 255, 4096, 4096 * 2];
for &size in &sizes {
let fd = AutoCloseFd::new(unsafe {
libc::open(filename.as_ptr(), O_RDWR | O_TRUNC | O_CREAT, 0o666)
});
assert!(fd.is_valid());
let fd = unsafe {
let res = libc::open(filename.as_ptr(), O_RDWR | O_TRUNC | O_CREAT, 0o666);
assert!(res != 0);
OwnedFd::from_raw_fd(res)
};
let mut input = Vec::new();
for _i in 0..size {
input.push(rng.random());
}
let amt = unescape_bytes_and_write_to_fd(&bytes2wcstring(&input), fd.fd()).unwrap();
let amt =
unescape_bytes_and_write_to_fd(&bytes2wcstring(&input), fd.as_raw_fd()).unwrap();
assert_eq!(amt, input.len());
assert!(unsafe { libc::lseek(fd.fd(), 0, SEEK_SET) } >= 0);
assert!(unsafe { libc::lseek(fd.as_raw_fd(), 0, SEEK_SET) } >= 0);
let mut contents = vec![0u8; input.len()];
let read_amt = unsafe {
libc::read(
fd.fd(),
fd.as_raw_fd(),
if size == 0 {
ptr::null_mut()
} else {