From 659c926dbda366710d4416b03b830dc54145bf57 Mon Sep 17 00:00:00 2001 From: Peter Ammon Date: Fri, 27 Dec 2024 14:07:17 -0800 Subject: [PATCH] Additional cleanup of io buffering Eliminate some ugly bits. No functional change expected. --- src/exec.rs | 2 +- src/io.rs | 119 +++++++++++++++++----------------------------------- 2 files changed, 40 insertions(+), 81 deletions(-) diff --git a/src/exec.rs b/src/exec.rs index 0d8362498..a38407249 100644 --- a/src/exec.rs +++ b/src/exec.rs @@ -766,7 +766,7 @@ fn create_output_stream_for_builtin( IoMode::bufferfill => { // Our IO redirection is to an internal buffer, e.g. a command substitution. // We will write directly to it. - let buffer = io.as_bufferfill().unwrap().buffer_ref(); + let buffer = io.as_bufferfill().unwrap().buffer(); OutputStream::Buffered(BufferedOutputStream::new(buffer.clone())) } IoMode::close => { diff --git a/src/io.rs b/src/io.rs index 67aa0ec1e..de6ea4d62 100644 --- a/src/io.rs +++ b/src/io.rs @@ -1,11 +1,10 @@ use crate::builtins::shared::{STATUS_CMD_ERROR, STATUS_CMD_OK, STATUS_READ_TOO_MUCH}; use crate::common::{str2wcstring, wcs2string, EMPTY_STRING}; -use crate::fd_monitor::{Callback, FdMonitor}; +use crate::fd_monitor::{Callback, FdMonitor, FdMonitorItemId}; use crate::fds::{ make_autoclose_pipes, make_fd_nonblocking, wopen_cloexec, AutoCloseFd, PIPE_ERROR, }; use crate::flog::{should_flog, FLOG, FLOGF}; -use crate::global_safety::RelaxedAtomicBool; use crate::nix::isatty; use crate::path::path_apply_working_directory; use crate::proc::JobGroupRef; @@ -19,14 +18,9 @@ use nix::fcntl::OFlag; use nix::sys::stat::Mode; use once_cell::sync::Lazy; -#[cfg(not(target_has_atomic = "64"))] -use portable_atomic::AtomicU64; use std::fs::File; use std::io; use std::os::fd::{AsRawFd, IntoRawFd, OwnedFd, RawFd}; -#[cfg(target_has_atomic = "64")] -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex, MutexGuard}; /// separated_buffer_t represents a buffer of output from commands, prepared to be turned into a @@ -344,7 +338,10 @@ pub struct IoBufferfill { write_fd: OwnedFd, /// The receiving buffer. - buffer: Arc, + buffer: IoBuffer, + + /// The id of the item in the fd monitor, used to stop the background fillthread operation. + item_id: FdMonitorItemId, } impl IoBufferfill { /// Create an IoBufferfill which, when written from, fills a buffer with the contents. @@ -372,20 +369,17 @@ pub fn create_opts(buffer_limit: usize, target: RawFd) -> io::Result &Arc { - &self.buffer - } - pub fn buffer(&self) -> &IoBuffer { &self.buffer } @@ -394,12 +388,10 @@ pub fn buffer(&self) -> &IoBuffer { /// of the buffer. Return the buffer. pub fn finish(filler: Arc) -> SeparatedBuffer { // The io filler is passed in. This typically holds the only instance of the write side of the - // pipe used by the buffer's fillthread (except for that side held by other processes). Get the - // buffer out of the bufferfill and clear the shared_ptr; this will typically widow the pipe. + // pipe used by the buffer's fillthread (except for that side held by other processes). // Then allow the buffer to finish. - filler - .buffer - .complete_background_fillthread_and_take_buffer() + let fd = fd_monitor().remove_item(filler.item_id); + filler.buffer.complete_and_take_buffer(fd) } } impl IoData for IoBufferfill { @@ -427,40 +419,24 @@ fn as_bufferfill(&self) -> Option<&IoBufferfill> { } } -/// An IoBuffer is a buffer which can populate itself by reading from an fd. -/// It is not an IoData. -pub struct IoBuffer { - /// Buffer storing what we have read. - buffer: Mutex, - - /// Atomic flag indicating our fillthread is running. - fillthread_running: RelaxedAtomicBool, - - /// The item id of our background fillthread fd monitor item. - item_id: AtomicU64, -} - -// safety: todo!("rationalize why fill_waiter is safe") -unsafe impl Send for IoBuffer {} -unsafe impl Sync for IoBuffer {} +/// Type wrapping a lock-protected separated buffer. +#[derive(Clone)] +pub struct IoBuffer(Arc>); impl IoBuffer { - pub fn new(limit: usize) -> Self { - IoBuffer { - buffer: Mutex::new(SeparatedBuffer::new(limit)), - fillthread_running: RelaxedAtomicBool::new(false), - item_id: AtomicU64::new(0), - } + /// Create a new IoBuffer. + fn new(buffer_limit: usize) -> Self { + IoBuffer(Arc::new(Mutex::new(SeparatedBuffer::new(buffer_limit)))) } /// Append a string to the buffer. pub fn append(&self, data: &[u8], typ: SeparationType) -> bool { - self.buffer.lock().unwrap().append(data, typ) + self.0.lock().unwrap().append(data, typ) } /// Return true if output was discarded due to exceeding the read limit. pub fn discarded(&self) -> bool { - self.buffer.lock().unwrap().discarded() + self.0.lock().unwrap().discarded() } /// Read some, filling the buffer. The buffer is passed in to enforce that the append lock is @@ -497,18 +473,10 @@ pub fn read_once(fd: RawFd, buffer: &mut MutexGuard<'_, SeparatedBuffer>) -> isi } /// End the background fillthread operation, and return the buffer, transferring ownership. - pub fn complete_background_fillthread_and_take_buffer(&self) -> SeparatedBuffer { - // Mark that our fillthread is done, then wake it up. - assert!(self.fillthread_running.load(), "Should have a fillthread"); - assert!( - self.item_id.load(Ordering::SeqCst) != 0, - "Should have a valid item ID" - ); - let item_id = self.item_id.load(Ordering::SeqCst).into(); - let fd = fd_monitor().remove_item(item_id); - + /// The read end of the pipe is provided. + pub fn complete_and_take_buffer(&self, fd: AutoCloseFd) -> SeparatedBuffer { // Read any remaining data from the pipe. - let mut locked_buff = self.buffer.lock().unwrap(); + let mut locked_buff = self.0.lock().unwrap(); while fd.is_valid() && IoBuffer::read_once(fd.as_raw_fd(), &mut locked_buff) > 0 { // pass } @@ -522,16 +490,11 @@ pub fn complete_background_fillthread_and_take_buffer(&self) -> SeparatedBuffer } /// Begin the fill operation, reading from the given fd in the background. -fn begin_filling(iobuffer: &Arc, fd: OwnedFd) { - assert!( - !iobuffer.fillthread_running.load(), - "Already have a fillthread" - ); - iobuffer.fillthread_running.store(true); - - // We want to fill buffer_ by reading from fd. fd is the read end of a pipe; the write end is +/// Return item ID of the newly created item in the fd monitor. +fn begin_filling(iobuffer: IoBuffer, fd: OwnedFd) -> FdMonitorItemId { + // We want to fill iobuffer by reading from fd. fd is the read end of a pipe; the write end is // owned by another process, or something else writing in fish. - // Pass fd to an fd_monitor. It will add fd to its select() loop, and give us a callback when + // Pass fd to the FdMonitor. It will add fd to its select() loop, and give us a callback when // the fd is readable. The usual path is that we will get called back, read a bit from the fd, // and append it to the buffer. Eventually the write end of the pipe will be closed - probably // the other process exited - and fd will be widowed; read() will then return 0 and we will stop @@ -543,22 +506,18 @@ fn begin_filling(iobuffer: &Arc, fd: OwnedFd) { // In this case, when complete_background_fillthread() is called, we grab the file descriptor // and read until we get EAGAIN and then give up. // Run our function to read until the receiver is closed. - let item_callback: Callback = { - let iobuffer = iobuffer.clone(); - Box::new(move |fd: &mut AutoCloseFd| { - assert!(fd.as_raw_fd() >= 0, "Invalid fd"); - let mut buf = iobuffer.buffer.lock().unwrap(); - let ret = IoBuffer::read_once(fd.as_raw_fd(), &mut buf); - if ret == 0 || (ret < 0 && ![EAGAIN, EWOULDBLOCK].contains(&errno::errno().0)) { - // Either it's finished or some other error - we're done. - fd.close(); - } - }) - }; + let item_callback: Callback = Box::new(move |fd: &mut AutoCloseFd| { + assert!(fd.as_raw_fd() >= 0, "Invalid fd"); + let mut buf = iobuffer.0.lock().unwrap(); + let ret = IoBuffer::read_once(fd.as_raw_fd(), &mut buf); + if ret == 0 || (ret < 0 && ![EAGAIN, EWOULDBLOCK].contains(&errno::errno().0)) { + // Either it's finished or some other error - we're done. + fd.close(); + } + }); let fd = AutoCloseFd::new(fd.into_raw_fd()); - let item_id = fd_monitor().add(fd, item_callback); - iobuffer.item_id.store(u64::from(item_id), Ordering::SeqCst); + fd_monitor().add(fd, item_callback) } pub type IoDataRef = Arc; @@ -892,10 +851,10 @@ fn contents(&self) -> &wstr { /// An output stream for builtins which writes into a separated buffer. pub struct BufferedOutputStream { /// The buffer we are filling. - buffer: Arc, + buffer: IoBuffer, } impl BufferedOutputStream { - pub fn new(buffer: Arc) -> Self { + pub fn new(buffer: IoBuffer) -> Self { Self { buffer } } fn append(&mut self, s: &wstr) -> bool {