Use OwnedFd in AutoClosePipes

This commit is contained in:
PolyMeilex
2024-01-28 19:07:06 +01:00
committed by ridiculousfish
parent 59fe52851e
commit 971d774e67
6 changed files with 83 additions and 78 deletions

View File

@@ -14,7 +14,7 @@
use crate::env::{EnvMode, EnvStack, Environment, Statuses, READ_BYTE_LIMIT};
use crate::env_dispatch::use_posix_spawn;
use crate::fds::make_fd_blocking;
use crate::fds::{make_autoclose_pipes, open_cloexec, AutoCloseFd, AutoClosePipes, PIPE_ERROR};
use crate::fds::{make_autoclose_pipes, open_cloexec, PIPE_ERROR};
use crate::flog::FLOGF;
use crate::fork_exec::blocked_signals_for_job;
use crate::fork_exec::postfork::{
@@ -57,7 +57,7 @@
use nix::sys::stat;
use std::ffi::CStr;
use std::io::{Read, Write};
use std::os::fd::RawFd;
use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use std::slice;
use std::sync::atomic::Ordering;
use std::sync::{atomic::AtomicUsize, Arc};
@@ -106,7 +106,7 @@ pub fn exec_job(parser: &Parser, job: &Job, block_io: IoChain) -> bool {
let _timer = push_timer(job.wants_timing() && !no_exec());
// Get the deferred process, if any. We will have to remember its pipes.
let mut deferred_pipes = AutoClosePipes::default();
let mut deferred_pipes = PartialPipes::default();
let deferred_process = get_deferred_process(job);
// We may want to transfer tty ownership to the pgroup leader.
@@ -125,7 +125,7 @@ pub fn exec_job(parser: &Parser, job: &Job, block_io: IoChain) -> bool {
//
// Lastly, a process may experience a pipeline-aborting error, which prevents launching
// further processes in the pipeline.
let mut pipe_next_read = AutoCloseFd::empty();
let mut pipe_next_read: Option<OwnedFd> = None;
let mut aborted_pipeline = false;
let mut procs_launched = 0;
for i in 0..job.processes().len() {
@@ -133,7 +133,7 @@ pub fn exec_job(parser: &Parser, job: &Job, block_io: IoChain) -> bool {
// proc_pipes is the pipes applied to this process. That is, it is the read end
// containing the output of the previous process (if any), plus the write end that will
// output to the next process (if any).
let mut proc_pipes = AutoClosePipes::default();
let mut proc_pipes = PartialPipes::default();
std::mem::swap(&mut proc_pipes.read, &mut pipe_next_read);
if !p.is_last_in_job {
let Some(pipes) = make_autoclose_pipes() else {
@@ -143,8 +143,8 @@ pub fn exec_job(parser: &Parser, job: &Job, block_io: IoChain) -> bool {
abort_pipeline_from(job, i);
break;
};
pipe_next_read = pipes.read;
proc_pipes.write = pipes.write;
pipe_next_read = Some(pipes.read);
proc_pipes.write = Some(pipes.write);
// Save any deferred process for last. By definition, the deferred process can
// never be the last process in the job, so it's safe to nest this in the outer
@@ -179,7 +179,7 @@ pub fn exec_job(parser: &Parser, job: &Job, block_io: IoChain) -> bool {
transfer.to_job_group(job.group.as_ref().unwrap());
}
}
pipe_next_read.close();
drop(pipe_next_read);
// If our pipeline was aborted before any process was successfully launched, then there is
// nothing to reap, and we can perform an early return.
@@ -202,7 +202,7 @@ pub fn exec_job(parser: &Parser, job: &Job, block_io: IoChain) -> bool {
job,
block_io,
deferred_pipes,
&AutoClosePipes::default(),
&PartialPipes::default(),
true,
)
.is_err()
@@ -1181,6 +1181,14 @@ fn exec_builtin_process(
Ok(())
}
#[derive(Default)]
struct PartialPipes {
/// Read end of the pipe.
read: Option<OwnedFd>,
/// Write end of the pipe.
write: Option<OwnedFd>,
}
/// Executes a process \p \p in \p job, using the pipes \p pipes (which may have invalid fds if this
/// is the first or last process).
/// \p deferred_pipes represents the pipes from our deferred process; if set ensure they get closed
@@ -1193,8 +1201,8 @@ fn exec_process_in_job(
p: &Process,
j: &Job,
block_io: IoChain,
pipes: AutoClosePipes,
deferred_pipes: &AutoClosePipes,
pipes: PartialPipes,
deferred_pipes: &PartialPipes,
is_deferred_run: bool,
) -> LaunchResult {
// The write pipe (destined for stdout) needs to occur before redirections. For example,
@@ -1232,11 +1240,11 @@ fn exec_process_in_job(
// The IO chain for this process.
let mut process_net_io_chain = block_io;
if pipes.write.is_valid() {
if let Some(fd) = pipes.write {
process_net_io_chain.push(Arc::new(IoPipe::new(
p.pipe_write_fd,
false, /* not input */
pipes.write,
fd,
)));
}
@@ -1249,16 +1257,17 @@ fn exec_process_in_job(
}
// Read pipe goes last.
if pipes.read.is_valid() {
let pipe_read = Arc::new(IoPipe::new(STDIN_FILENO, true /* input */, pipes.read));
if let Some(fd) = pipes.read {
let pipe_read = Arc::new(IoPipe::new(STDIN_FILENO, true /* input */, fd));
process_net_io_chain.push(pipe_read);
}
// If we have stashed pipes, make sure those get closed in the child.
for afd in [&deferred_pipes.read, &deferred_pipes.write] {
if afd.is_valid() {
process_net_io_chain.push(Arc::new(IoClose::new(afd.fd())));
}
for afd in [&deferred_pipes.read, &deferred_pipes.write]
.into_iter()
.flatten()
{
process_net_io_chain.push(Arc::new(IoClose::new(afd.as_raw_fd())));
}
if p.typ != ProcessType::block_node {

View File

@@ -38,9 +38,9 @@ pub enum ItemWakeReason {
/// Importantly this is async signal safe. Of course it is `CLO_EXEC` as well.
pub struct FdEventSignaller {
// Always the read end of the fd; maybe the write end as well.
fd: AutoCloseFd,
fd: OwnedFd,
#[cfg(not(HAVE_EVENTFD))]
write: AutoCloseFd,
write: OwnedFd,
}
impl FdEventSignaller {
@@ -56,7 +56,7 @@ pub fn new() -> Self {
exit_without_destructors(1);
}
Self {
fd: AutoCloseFd::new(fd),
fd: unsafe { OwnedFd::from_raw_fd(fd) },
}
}
#[cfg(not(HAVE_EVENTFD))]
@@ -66,8 +66,8 @@ pub fn new() -> Self {
perror("pipe");
exit_without_destructors(1);
};
make_fd_nonblocking(pipes.read.fd()).unwrap();
make_fd_nonblocking(pipes.write.fd()).unwrap();
make_fd_nonblocking(pipes.read.as_raw_fd()).unwrap();
make_fd_nonblocking(pipes.write.as_raw_fd()).unwrap();
Self {
fd: pipes.read,
write: pipes.write,
@@ -77,7 +77,7 @@ pub fn new() -> Self {
/// \return the fd to read from, for notification.
pub fn read_fd(&self) -> RawFd {
self.fd.fd()
self.fd.as_raw_fd()
}
/// If an event is signalled, consume it; otherwise return.
@@ -169,9 +169,9 @@ pub fn poll(&self, wait: bool /* = false */) -> bool {
/// \return the fd to write to.
fn write_fd(&self) -> RawFd {
#[cfg(HAVE_EVENTFD)]
return self.fd.fd();
return self.fd.as_raw_fd();
#[cfg(not(HAVE_EVENTFD))]
return self.write.fd();
return self.write.as_raw_fd();
}
}

View File

@@ -120,13 +120,12 @@ fn drop(&mut self) {
}
/// Helper type returned from make_autoclose_pipes.
#[derive(Default)]
pub struct AutoClosePipes {
/// Read end of the pipe.
pub read: AutoCloseFd,
pub read: OwnedFd,
/// Write end of the pipe.
pub write: AutoCloseFd,
pub write: OwnedFd,
}
/// Construct a pair of connected pipes, set to close-on-exec.
@@ -152,19 +151,12 @@ pub fn make_autoclose_pipes() -> Option<AutoClosePipes> {
return None;
}
let readp = AutoCloseFd::new(pipes[0]);
let writep = AutoCloseFd::new(pipes[1]);
let readp = unsafe { OwnedFd::from_raw_fd(pipes[0]) };
let writep = unsafe { OwnedFd::from_raw_fd(pipes[1]) };
// Ensure our fds are out of the user range.
let readp = heightenize_fd(readp, already_cloexec);
if !readp.is_valid() {
return None;
}
let writep = heightenize_fd(writep, already_cloexec);
if !writep.is_valid() {
return None;
}
let readp = heightenize_fd(readp, already_cloexec)?;
let writep = heightenize_fd(writep, already_cloexec)?;
Some(AutoClosePipes {
read: readp,
@@ -178,24 +170,23 @@ pub fn make_autoclose_pipes() -> Option<AutoClosePipes> {
/// setting it again.
/// \return the fd, which always has CLOEXEC set; or an invalid fd on failure, in
/// which case an error will have been printed, and the input fd closed.
fn heightenize_fd(fd: AutoCloseFd, input_has_cloexec: bool) -> AutoCloseFd {
// Check if the fd is invalid or already in our high range.
if !fd.is_valid() {
return fd;
}
if fd.fd() >= FIRST_HIGH_FD {
fn heightenize_fd(fd: OwnedFd, input_has_cloexec: bool) -> Option<OwnedFd> {
let raw_fd = fd.as_raw_fd();
if raw_fd >= FIRST_HIGH_FD {
if !input_has_cloexec {
set_cloexec(fd.fd(), true);
set_cloexec(raw_fd, true);
}
return fd;
return Some(fd);
}
// Here we are asking the kernel to give us a cloexec fd.
let newfd = unsafe { libc::fcntl(fd.fd(), F_DUPFD_CLOEXEC, FIRST_HIGH_FD) };
let newfd = unsafe { libc::fcntl(raw_fd, F_DUPFD_CLOEXEC, FIRST_HIGH_FD) };
if newfd < 0 {
perror("fcntl");
return AutoCloseFd::default();
return None;
}
return AutoCloseFd::new(newfd);
Some(unsafe { OwnedFd::from_raw_fd(newfd) })
}
/// Sets CLO_EXEC on a given fd according to the value of \p should_set.
@@ -306,7 +297,8 @@ fn test_pipes() {
}
}
for pipe in pipes {
for fd in [pipe.read.fd(), pipe.write.fd()] {
for fd in [&pipe.read, &pipe.write] {
let fd = fd.as_raw_fd();
assert!(fd >= FIRST_HIGH_FD);
let flags = unsafe { libc::fcntl(fd, F_GETFD, 0) };
assert!(flags >= 0);

View File

@@ -21,7 +21,7 @@
use nix::fcntl::OFlag;
use nix::sys::stat::Mode;
use std::cell::{RefCell, UnsafeCell};
use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use std::os::fd::{AsRawFd, IntoRawFd, OwnedFd, RawFd};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
@@ -296,13 +296,12 @@ fn as_ptr(&self) -> *const () {
pub struct IoPipe {
fd: RawFd,
// The pipe's fd. Conceptually this is dup2'd to io_data_t::fd.
pipe_fd: AutoCloseFd,
pipe_fd: OwnedFd,
/// Whether this is an input pipe. This is used only for informational purposes.
is_input: bool,
}
impl IoPipe {
pub fn new(fd: RawFd, is_input: bool, pipe_fd: AutoCloseFd) -> Self {
assert!(pipe_fd.is_valid(), "Pipe is not valid");
pub fn new(fd: RawFd, is_input: bool, pipe_fd: OwnedFd) -> Self {
IoPipe {
fd,
pipe_fd,
@@ -318,7 +317,7 @@ fn fd(&self) -> RawFd {
self.fd
}
fn source_fd(&self) -> RawFd {
self.pipe_fd.fd()
self.pipe_fd.as_raw_fd()
}
fn print(&self) {
eprintf!(
@@ -338,7 +337,7 @@ pub struct IoBufferfill {
target: RawFd,
/// Write end. The other end is connected to an io_buffer_t.
write_fd: AutoCloseFd,
write_fd: OwnedFd,
/// The receiving buffer.
buffer: Arc<IoBuffer>,
@@ -361,7 +360,7 @@ pub fn create_opts(buffer_limit: usize, target: RawFd) -> Option<Arc<IoBufferfil
// Our buffer will read from the read end of the pipe. This end must be non-blocking. This is
// because our fillthread needs to poll to decide if it should shut down, and also accept input
// from direct buffer transfers.
match make_fd_nonblocking(pipes.read.fd()) {
match make_fd_nonblocking(pipes.read.as_raw_fd()) {
Ok(_) => (),
Err(e) => {
FLOG!(warning, PIPE_ERROR);
@@ -372,7 +371,6 @@ pub fn create_opts(buffer_limit: usize, target: RawFd) -> Option<Arc<IoBufferfil
// Our fillthread gets the read end of the pipe; out_pipe gets the write end.
let buffer = Arc::new(IoBuffer::new(buffer_limit));
begin_filling(&buffer, pipes.read);
assert!(pipes.write.is_valid(), "fd is not valid");
Some(Arc::new(IoBufferfill {
target,
write_fd: pipes.write,
@@ -408,10 +406,14 @@ fn fd(&self) -> RawFd {
self.target
}
fn source_fd(&self) -> RawFd {
self.write_fd.fd()
self.write_fd.as_raw_fd()
}
fn print(&self) {
eprintf!("bufferfill %d -> %d\n", self.write_fd.fd(), self.fd())
eprintf!(
"bufferfill %d -> %d\n",
self.write_fd.as_raw_fd(),
self.fd()
)
}
fn as_ptr(&self) -> *const () {
(self as *const Self).cast()
@@ -533,7 +535,7 @@ pub fn fillthread_running(&self) -> bool {
}
/// Begin the fill operation, reading from the given fd in the background.
fn begin_filling(iobuffer: &Arc<IoBuffer>, fd: AutoCloseFd) {
fn begin_filling(iobuffer: &Arc<IoBuffer>, fd: OwnedFd) {
assert!(!iobuffer.fillthread_running(), "Already have a fillthread");
// We want to fill buffer_ by reading from fd. fd is the read end of a pipe; the write end is
@@ -570,14 +572,14 @@ fn begin_filling(iobuffer: &Arc<IoBuffer>, fd: AutoCloseFd) {
if reason == ItemWakeReason::Readable {
// select() reported us as readable; read a bit.
let mut buf = iobuffer.buffer.lock().unwrap();
let ret = IoBuffer::read_once(fd.fd(), &mut buf);
let ret = IoBuffer::read_once(fd.as_raw_fd(), &mut buf);
done = ret == 0 || (ret < 0 && ![EAGAIN, EWOULDBLOCK].contains(&errno::errno().0));
} else if iobuffer.shutdown_fillthread.load() {
// Here our caller asked us to shut down; read while we keep getting data.
// This will stop when the fd is closed or if we get EAGAIN.
let mut buf = iobuffer.buffer.lock().unwrap();
loop {
let ret = IoBuffer::read_once(fd.fd(), &mut buf);
let ret = IoBuffer::read_once(fd.as_raw_fd(), &mut buf);
if ret <= 0 {
break;
}
@@ -599,6 +601,7 @@ fn begin_filling(iobuffer: &Arc<IoBuffer>, fd: AutoCloseFd) {
})
};
let fd = AutoCloseFd::new(fd.into_raw_fd());
let item_id = fd_monitor().add(FdMonitorItem::new(fd, None, item_callback));
iobuffer.item_id.store(u64::from(item_id), Ordering::SeqCst);
}

View File

@@ -1,5 +1,6 @@
use std::fs::File;
use std::io::Write;
use std::os::fd::AsRawFd;
use std::os::fd::{AsRawFd, IntoRawFd};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -21,7 +22,7 @@ struct ItemMaker {
pub total_calls: AtomicUsize,
item_id: AtomicU64,
pub always_exit: bool,
pub writer: Mutex<AutoCloseFd>,
pub writer: Mutex<Option<File>>,
}
impl ItemMaker {
@@ -43,7 +44,7 @@ pub fn insert_new_into2<F: Fn(&mut Self)>(
total_calls: 0.into(),
item_id: 0.into(),
always_exit: false,
writer: Mutex::new(pipes.write),
writer: Mutex::new(Some(File::from(pipes.write))),
};
config(&mut result);
@@ -53,7 +54,8 @@ pub fn insert_new_into2<F: Fn(&mut Self)>(
let result = Arc::clone(&result);
move |fd: &mut AutoCloseFd, reason: ItemWakeReason| result.callback(fd, reason)
};
let item = FdMonitorItem::new(pipes.read, timeout, Box::new(callback));
let fd = AutoCloseFd::new(pipes.read.into_raw_fd());
let item = FdMonitorItem::new(fd, timeout, Box::new(callback));
let item_id = monitor.add(item);
result.item_id.store(u64::from(item_id), Ordering::Relaxed);
@@ -97,6 +99,8 @@ fn write42(&self) {
let buf = [0u8; 42];
let mut writer = self.writer.lock().expect("Mutex poisoned!");
writer
.as_mut()
.unwrap()
.write_all(&buf)
.expect("Error writing 42 bytes to pipe!");
}
@@ -137,11 +141,7 @@ fn fd_monitor_items() {
item42_timeout.write42();
item42_no_timeout.write42();
item42_then_close.write42();
item42_then_close
.writer
.lock()
.expect("Mutex poisoned!")
.close();
*item42_then_close.writer.lock().expect("Mutex poisoned!") = None;
item_oneshot.write42();
monitor.poke_item(item_pokee.item_id());

View File

@@ -28,6 +28,7 @@
use nix::errno::Errno;
use nix::unistd;
use std::cell::{Cell, UnsafeCell};
use std::os::fd::AsRawFd;
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Condvar, Mutex, MutexGuard};
@@ -187,7 +188,7 @@ pub fn new() -> binary_semaphore_t {
// we'll never receive SIGCHLD and so deadlock. So if tsan is enabled, we mark our fd as
// non-blocking (so reads will never block) and use select() to poll it.
if cfg!(feature = "FISH_TSAN_WORKAROUNDS") {
let _ = make_fd_nonblocking(pipes.read.fd());
let _ = make_fd_nonblocking(pipes.read.as_raw_fd());
}
Self::Pipes(pipes)
@@ -207,7 +208,7 @@ pub fn post(&self) {
Self::Pipes(pipes) => {
// Write exactly one byte.
loop {
match unistd::write(pipes.write.fd(), &[0]) {
match unistd::write(pipes.write.as_raw_fd(), &[0]) {
Err(Errno::EINTR) => continue,
Err(_) => self.die("write"),
Ok(_) => break,
@@ -232,7 +233,7 @@ pub fn wait(&self) {
}
}
Self::Pipes(pipes) => {
let fd = pipes.read.fd();
let fd = pipes.read.as_raw_fd();
// We must read exactly one byte.
loop {
// Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read()