diff --git a/src/exec.rs b/src/exec.rs index dc1bd092a..f0de731f6 100644 --- a/src/exec.rs +++ b/src/exec.rs @@ -14,7 +14,7 @@ use crate::env::{EnvMode, EnvStack, Environment, Statuses, READ_BYTE_LIMIT}; use crate::env_dispatch::use_posix_spawn; use crate::fds::make_fd_blocking; -use crate::fds::{make_autoclose_pipes, open_cloexec, AutoCloseFd, AutoClosePipes, PIPE_ERROR}; +use crate::fds::{make_autoclose_pipes, open_cloexec, PIPE_ERROR}; use crate::flog::FLOGF; use crate::fork_exec::blocked_signals_for_job; use crate::fork_exec::postfork::{ @@ -57,7 +57,7 @@ use nix::sys::stat; use std::ffi::CStr; use std::io::{Read, Write}; -use std::os::fd::RawFd; +use std::os::fd::{AsRawFd, OwnedFd, RawFd}; use std::slice; use std::sync::atomic::Ordering; use std::sync::{atomic::AtomicUsize, Arc}; @@ -106,7 +106,7 @@ pub fn exec_job(parser: &Parser, job: &Job, block_io: IoChain) -> bool { let _timer = push_timer(job.wants_timing() && !no_exec()); // Get the deferred process, if any. We will have to remember its pipes. - let mut deferred_pipes = AutoClosePipes::default(); + let mut deferred_pipes = PartialPipes::default(); let deferred_process = get_deferred_process(job); // We may want to transfer tty ownership to the pgroup leader. @@ -125,7 +125,7 @@ pub fn exec_job(parser: &Parser, job: &Job, block_io: IoChain) -> bool { // // Lastly, a process may experience a pipeline-aborting error, which prevents launching // further processes in the pipeline. - let mut pipe_next_read = AutoCloseFd::empty(); + let mut pipe_next_read: Option = None; let mut aborted_pipeline = false; let mut procs_launched = 0; for i in 0..job.processes().len() { @@ -133,7 +133,7 @@ pub fn exec_job(parser: &Parser, job: &Job, block_io: IoChain) -> bool { // proc_pipes is the pipes applied to this process. That is, it is the read end // containing the output of the previous process (if any), plus the write end that will // output to the next process (if any). - let mut proc_pipes = AutoClosePipes::default(); + let mut proc_pipes = PartialPipes::default(); std::mem::swap(&mut proc_pipes.read, &mut pipe_next_read); if !p.is_last_in_job { let Some(pipes) = make_autoclose_pipes() else { @@ -143,8 +143,8 @@ pub fn exec_job(parser: &Parser, job: &Job, block_io: IoChain) -> bool { abort_pipeline_from(job, i); break; }; - pipe_next_read = pipes.read; - proc_pipes.write = pipes.write; + pipe_next_read = Some(pipes.read); + proc_pipes.write = Some(pipes.write); // Save any deferred process for last. By definition, the deferred process can // never be the last process in the job, so it's safe to nest this in the outer @@ -179,7 +179,7 @@ pub fn exec_job(parser: &Parser, job: &Job, block_io: IoChain) -> bool { transfer.to_job_group(job.group.as_ref().unwrap()); } } - pipe_next_read.close(); + drop(pipe_next_read); // If our pipeline was aborted before any process was successfully launched, then there is // nothing to reap, and we can perform an early return. @@ -202,7 +202,7 @@ pub fn exec_job(parser: &Parser, job: &Job, block_io: IoChain) -> bool { job, block_io, deferred_pipes, - &AutoClosePipes::default(), + &PartialPipes::default(), true, ) .is_err() @@ -1181,6 +1181,14 @@ fn exec_builtin_process( Ok(()) } +#[derive(Default)] +struct PartialPipes { + /// Read end of the pipe. + read: Option, + /// Write end of the pipe. + write: Option, +} + /// Executes a process \p \p in \p job, using the pipes \p pipes (which may have invalid fds if this /// is the first or last process). /// \p deferred_pipes represents the pipes from our deferred process; if set ensure they get closed @@ -1193,8 +1201,8 @@ fn exec_process_in_job( p: &Process, j: &Job, block_io: IoChain, - pipes: AutoClosePipes, - deferred_pipes: &AutoClosePipes, + pipes: PartialPipes, + deferred_pipes: &PartialPipes, is_deferred_run: bool, ) -> LaunchResult { // The write pipe (destined for stdout) needs to occur before redirections. For example, @@ -1232,11 +1240,11 @@ fn exec_process_in_job( // The IO chain for this process. let mut process_net_io_chain = block_io; - if pipes.write.is_valid() { + if let Some(fd) = pipes.write { process_net_io_chain.push(Arc::new(IoPipe::new( p.pipe_write_fd, false, /* not input */ - pipes.write, + fd, ))); } @@ -1249,16 +1257,17 @@ fn exec_process_in_job( } // Read pipe goes last. - if pipes.read.is_valid() { - let pipe_read = Arc::new(IoPipe::new(STDIN_FILENO, true /* input */, pipes.read)); + if let Some(fd) = pipes.read { + let pipe_read = Arc::new(IoPipe::new(STDIN_FILENO, true /* input */, fd)); process_net_io_chain.push(pipe_read); } // If we have stashed pipes, make sure those get closed in the child. - for afd in [&deferred_pipes.read, &deferred_pipes.write] { - if afd.is_valid() { - process_net_io_chain.push(Arc::new(IoClose::new(afd.fd()))); - } + for afd in [&deferred_pipes.read, &deferred_pipes.write] + .into_iter() + .flatten() + { + process_net_io_chain.push(Arc::new(IoClose::new(afd.as_raw_fd()))); } if p.typ != ProcessType::block_node { diff --git a/src/fd_monitor.rs b/src/fd_monitor.rs index fe5a35861..eba24e5c1 100644 --- a/src/fd_monitor.rs +++ b/src/fd_monitor.rs @@ -38,9 +38,9 @@ pub enum ItemWakeReason { /// Importantly this is async signal safe. Of course it is `CLO_EXEC` as well. pub struct FdEventSignaller { // Always the read end of the fd; maybe the write end as well. - fd: AutoCloseFd, + fd: OwnedFd, #[cfg(not(HAVE_EVENTFD))] - write: AutoCloseFd, + write: OwnedFd, } impl FdEventSignaller { @@ -56,7 +56,7 @@ pub fn new() -> Self { exit_without_destructors(1); } Self { - fd: AutoCloseFd::new(fd), + fd: unsafe { OwnedFd::from_raw_fd(fd) }, } } #[cfg(not(HAVE_EVENTFD))] @@ -66,8 +66,8 @@ pub fn new() -> Self { perror("pipe"); exit_without_destructors(1); }; - make_fd_nonblocking(pipes.read.fd()).unwrap(); - make_fd_nonblocking(pipes.write.fd()).unwrap(); + make_fd_nonblocking(pipes.read.as_raw_fd()).unwrap(); + make_fd_nonblocking(pipes.write.as_raw_fd()).unwrap(); Self { fd: pipes.read, write: pipes.write, @@ -77,7 +77,7 @@ pub fn new() -> Self { /// \return the fd to read from, for notification. pub fn read_fd(&self) -> RawFd { - self.fd.fd() + self.fd.as_raw_fd() } /// If an event is signalled, consume it; otherwise return. @@ -169,9 +169,9 @@ pub fn poll(&self, wait: bool /* = false */) -> bool { /// \return the fd to write to. fn write_fd(&self) -> RawFd { #[cfg(HAVE_EVENTFD)] - return self.fd.fd(); + return self.fd.as_raw_fd(); #[cfg(not(HAVE_EVENTFD))] - return self.write.fd(); + return self.write.as_raw_fd(); } } diff --git a/src/fds.rs b/src/fds.rs index 8e26e4556..a8e976ec1 100644 --- a/src/fds.rs +++ b/src/fds.rs @@ -120,13 +120,12 @@ fn drop(&mut self) { } /// Helper type returned from make_autoclose_pipes. -#[derive(Default)] pub struct AutoClosePipes { /// Read end of the pipe. - pub read: AutoCloseFd, + pub read: OwnedFd, /// Write end of the pipe. - pub write: AutoCloseFd, + pub write: OwnedFd, } /// Construct a pair of connected pipes, set to close-on-exec. @@ -152,19 +151,12 @@ pub fn make_autoclose_pipes() -> Option { return None; } - let readp = AutoCloseFd::new(pipes[0]); - let writep = AutoCloseFd::new(pipes[1]); + let readp = unsafe { OwnedFd::from_raw_fd(pipes[0]) }; + let writep = unsafe { OwnedFd::from_raw_fd(pipes[1]) }; // Ensure our fds are out of the user range. - let readp = heightenize_fd(readp, already_cloexec); - if !readp.is_valid() { - return None; - } - - let writep = heightenize_fd(writep, already_cloexec); - if !writep.is_valid() { - return None; - } + let readp = heightenize_fd(readp, already_cloexec)?; + let writep = heightenize_fd(writep, already_cloexec)?; Some(AutoClosePipes { read: readp, @@ -178,24 +170,23 @@ pub fn make_autoclose_pipes() -> Option { /// setting it again. /// \return the fd, which always has CLOEXEC set; or an invalid fd on failure, in /// which case an error will have been printed, and the input fd closed. -fn heightenize_fd(fd: AutoCloseFd, input_has_cloexec: bool) -> AutoCloseFd { - // Check if the fd is invalid or already in our high range. - if !fd.is_valid() { - return fd; - } - if fd.fd() >= FIRST_HIGH_FD { +fn heightenize_fd(fd: OwnedFd, input_has_cloexec: bool) -> Option { + let raw_fd = fd.as_raw_fd(); + + if raw_fd >= FIRST_HIGH_FD { if !input_has_cloexec { - set_cloexec(fd.fd(), true); + set_cloexec(raw_fd, true); } - return fd; + return Some(fd); } // Here we are asking the kernel to give us a cloexec fd. - let newfd = unsafe { libc::fcntl(fd.fd(), F_DUPFD_CLOEXEC, FIRST_HIGH_FD) }; + let newfd = unsafe { libc::fcntl(raw_fd, F_DUPFD_CLOEXEC, FIRST_HIGH_FD) }; if newfd < 0 { perror("fcntl"); - return AutoCloseFd::default(); + return None; } - return AutoCloseFd::new(newfd); + + Some(unsafe { OwnedFd::from_raw_fd(newfd) }) } /// Sets CLO_EXEC on a given fd according to the value of \p should_set. @@ -306,7 +297,8 @@ fn test_pipes() { } } for pipe in pipes { - for fd in [pipe.read.fd(), pipe.write.fd()] { + for fd in [&pipe.read, &pipe.write] { + let fd = fd.as_raw_fd(); assert!(fd >= FIRST_HIGH_FD); let flags = unsafe { libc::fcntl(fd, F_GETFD, 0) }; assert!(flags >= 0); diff --git a/src/io.rs b/src/io.rs index b8f4d0dc6..85a62caa7 100644 --- a/src/io.rs +++ b/src/io.rs @@ -21,7 +21,7 @@ use nix::fcntl::OFlag; use nix::sys::stat::Mode; use std::cell::{RefCell, UnsafeCell}; -use std::os::fd::{AsRawFd, OwnedFd, RawFd}; +use std::os::fd::{AsRawFd, IntoRawFd, OwnedFd, RawFd}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Condvar, Mutex, MutexGuard}; @@ -296,13 +296,12 @@ fn as_ptr(&self) -> *const () { pub struct IoPipe { fd: RawFd, // The pipe's fd. Conceptually this is dup2'd to io_data_t::fd. - pipe_fd: AutoCloseFd, + pipe_fd: OwnedFd, /// Whether this is an input pipe. This is used only for informational purposes. is_input: bool, } impl IoPipe { - pub fn new(fd: RawFd, is_input: bool, pipe_fd: AutoCloseFd) -> Self { - assert!(pipe_fd.is_valid(), "Pipe is not valid"); + pub fn new(fd: RawFd, is_input: bool, pipe_fd: OwnedFd) -> Self { IoPipe { fd, pipe_fd, @@ -318,7 +317,7 @@ fn fd(&self) -> RawFd { self.fd } fn source_fd(&self) -> RawFd { - self.pipe_fd.fd() + self.pipe_fd.as_raw_fd() } fn print(&self) { eprintf!( @@ -338,7 +337,7 @@ pub struct IoBufferfill { target: RawFd, /// Write end. The other end is connected to an io_buffer_t. - write_fd: AutoCloseFd, + write_fd: OwnedFd, /// The receiving buffer. buffer: Arc, @@ -361,7 +360,7 @@ pub fn create_opts(buffer_limit: usize, target: RawFd) -> Option (), Err(e) => { FLOG!(warning, PIPE_ERROR); @@ -372,7 +371,6 @@ pub fn create_opts(buffer_limit: usize, target: RawFd) -> Option RawFd { self.target } fn source_fd(&self) -> RawFd { - self.write_fd.fd() + self.write_fd.as_raw_fd() } fn print(&self) { - eprintf!("bufferfill %d -> %d\n", self.write_fd.fd(), self.fd()) + eprintf!( + "bufferfill %d -> %d\n", + self.write_fd.as_raw_fd(), + self.fd() + ) } fn as_ptr(&self) -> *const () { (self as *const Self).cast() @@ -533,7 +535,7 @@ pub fn fillthread_running(&self) -> bool { } /// Begin the fill operation, reading from the given fd in the background. -fn begin_filling(iobuffer: &Arc, fd: AutoCloseFd) { +fn begin_filling(iobuffer: &Arc, fd: OwnedFd) { assert!(!iobuffer.fillthread_running(), "Already have a fillthread"); // We want to fill buffer_ by reading from fd. fd is the read end of a pipe; the write end is @@ -570,14 +572,14 @@ fn begin_filling(iobuffer: &Arc, fd: AutoCloseFd) { if reason == ItemWakeReason::Readable { // select() reported us as readable; read a bit. let mut buf = iobuffer.buffer.lock().unwrap(); - let ret = IoBuffer::read_once(fd.fd(), &mut buf); + let ret = IoBuffer::read_once(fd.as_raw_fd(), &mut buf); done = ret == 0 || (ret < 0 && ![EAGAIN, EWOULDBLOCK].contains(&errno::errno().0)); } else if iobuffer.shutdown_fillthread.load() { // Here our caller asked us to shut down; read while we keep getting data. // This will stop when the fd is closed or if we get EAGAIN. let mut buf = iobuffer.buffer.lock().unwrap(); loop { - let ret = IoBuffer::read_once(fd.fd(), &mut buf); + let ret = IoBuffer::read_once(fd.as_raw_fd(), &mut buf); if ret <= 0 { break; } @@ -599,6 +601,7 @@ fn begin_filling(iobuffer: &Arc, fd: AutoCloseFd) { }) }; + let fd = AutoCloseFd::new(fd.into_raw_fd()); let item_id = fd_monitor().add(FdMonitorItem::new(fd, None, item_callback)); iobuffer.item_id.store(u64::from(item_id), Ordering::SeqCst); } diff --git a/src/tests/fd_monitor.rs b/src/tests/fd_monitor.rs index ad065b49a..9231658d9 100644 --- a/src/tests/fd_monitor.rs +++ b/src/tests/fd_monitor.rs @@ -1,5 +1,6 @@ +use std::fs::File; use std::io::Write; -use std::os::fd::AsRawFd; +use std::os::fd::{AsRawFd, IntoRawFd}; use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -21,7 +22,7 @@ struct ItemMaker { pub total_calls: AtomicUsize, item_id: AtomicU64, pub always_exit: bool, - pub writer: Mutex, + pub writer: Mutex>, } impl ItemMaker { @@ -43,7 +44,7 @@ pub fn insert_new_into2( total_calls: 0.into(), item_id: 0.into(), always_exit: false, - writer: Mutex::new(pipes.write), + writer: Mutex::new(Some(File::from(pipes.write))), }; config(&mut result); @@ -53,7 +54,8 @@ pub fn insert_new_into2( let result = Arc::clone(&result); move |fd: &mut AutoCloseFd, reason: ItemWakeReason| result.callback(fd, reason) }; - let item = FdMonitorItem::new(pipes.read, timeout, Box::new(callback)); + let fd = AutoCloseFd::new(pipes.read.into_raw_fd()); + let item = FdMonitorItem::new(fd, timeout, Box::new(callback)); let item_id = monitor.add(item); result.item_id.store(u64::from(item_id), Ordering::Relaxed); @@ -97,6 +99,8 @@ fn write42(&self) { let buf = [0u8; 42]; let mut writer = self.writer.lock().expect("Mutex poisoned!"); writer + .as_mut() + .unwrap() .write_all(&buf) .expect("Error writing 42 bytes to pipe!"); } @@ -137,11 +141,7 @@ fn fd_monitor_items() { item42_timeout.write42(); item42_no_timeout.write42(); item42_then_close.write42(); - item42_then_close - .writer - .lock() - .expect("Mutex poisoned!") - .close(); + *item42_then_close.writer.lock().expect("Mutex poisoned!") = None; item_oneshot.write42(); monitor.poke_item(item_pokee.item_id()); diff --git a/src/topic_monitor.rs b/src/topic_monitor.rs index 95bb17708..c06234688 100644 --- a/src/topic_monitor.rs +++ b/src/topic_monitor.rs @@ -28,6 +28,7 @@ use nix::errno::Errno; use nix::unistd; use std::cell::{Cell, UnsafeCell}; +use std::os::fd::AsRawFd; use std::pin::Pin; use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::{Condvar, Mutex, MutexGuard}; @@ -187,7 +188,7 @@ pub fn new() -> binary_semaphore_t { // we'll never receive SIGCHLD and so deadlock. So if tsan is enabled, we mark our fd as // non-blocking (so reads will never block) and use select() to poll it. if cfg!(feature = "FISH_TSAN_WORKAROUNDS") { - let _ = make_fd_nonblocking(pipes.read.fd()); + let _ = make_fd_nonblocking(pipes.read.as_raw_fd()); } Self::Pipes(pipes) @@ -207,7 +208,7 @@ pub fn post(&self) { Self::Pipes(pipes) => { // Write exactly one byte. loop { - match unistd::write(pipes.write.fd(), &[0]) { + match unistd::write(pipes.write.as_raw_fd(), &[0]) { Err(Errno::EINTR) => continue, Err(_) => self.die("write"), Ok(_) => break, @@ -232,7 +233,7 @@ pub fn wait(&self) { } } Self::Pipes(pipes) => { - let fd = pipes.read.fd(); + let fd = pipes.read.as_raw_fd(); // We must read exactly one byte. loop { // Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read()