Additional cleanup of io buffering

Eliminate some ugly bits. No functional change expected.
This commit is contained in:
Peter Ammon
2024-12-27 14:07:17 -08:00
parent 56da15d11f
commit 659c926dbd
2 changed files with 40 additions and 81 deletions

View File

@@ -766,7 +766,7 @@ fn create_output_stream_for_builtin(
IoMode::bufferfill => {
// Our IO redirection is to an internal buffer, e.g. a command substitution.
// We will write directly to it.
let buffer = io.as_bufferfill().unwrap().buffer_ref();
let buffer = io.as_bufferfill().unwrap().buffer();
OutputStream::Buffered(BufferedOutputStream::new(buffer.clone()))
}
IoMode::close => {

119
src/io.rs
View File

@@ -1,11 +1,10 @@
use crate::builtins::shared::{STATUS_CMD_ERROR, STATUS_CMD_OK, STATUS_READ_TOO_MUCH};
use crate::common::{str2wcstring, wcs2string, EMPTY_STRING};
use crate::fd_monitor::{Callback, FdMonitor};
use crate::fd_monitor::{Callback, FdMonitor, FdMonitorItemId};
use crate::fds::{
make_autoclose_pipes, make_fd_nonblocking, wopen_cloexec, AutoCloseFd, PIPE_ERROR,
};
use crate::flog::{should_flog, FLOG, FLOGF};
use crate::global_safety::RelaxedAtomicBool;
use crate::nix::isatty;
use crate::path::path_apply_working_directory;
use crate::proc::JobGroupRef;
@@ -19,14 +18,9 @@
use nix::fcntl::OFlag;
use nix::sys::stat::Mode;
use once_cell::sync::Lazy;
#[cfg(not(target_has_atomic = "64"))]
use portable_atomic::AtomicU64;
use std::fs::File;
use std::io;
use std::os::fd::{AsRawFd, IntoRawFd, OwnedFd, RawFd};
#[cfg(target_has_atomic = "64")]
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex, MutexGuard};
/// separated_buffer_t represents a buffer of output from commands, prepared to be turned into a
@@ -344,7 +338,10 @@ pub struct IoBufferfill {
write_fd: OwnedFd,
/// The receiving buffer.
buffer: Arc<IoBuffer>,
buffer: IoBuffer,
/// The id of the item in the fd monitor, used to stop the background fillthread operation.
item_id: FdMonitorItemId,
}
impl IoBufferfill {
/// Create an IoBufferfill which, when written from, fills a buffer with the contents.
@@ -372,20 +369,17 @@ pub fn create_opts(buffer_limit: usize, target: RawFd) -> io::Result<Arc<IoBuffe
return Err(e);
}
}
// Our fillthread gets the read end of the pipe; out_pipe gets the write end.
let buffer = Arc::new(IoBuffer::new(buffer_limit));
begin_filling(&buffer, pipes.read);
// Our fillthread gets the read end of the pipe. Our returned Bufferfill gets the write end.
let buffer = IoBuffer::new(buffer_limit);
let item_id = begin_filling(buffer.clone(), pipes.read);
Ok(Arc::new(IoBufferfill {
target,
write_fd: pipes.write,
buffer,
item_id,
}))
}
pub fn buffer_ref(&self) -> &Arc<IoBuffer> {
&self.buffer
}
pub fn buffer(&self) -> &IoBuffer {
&self.buffer
}
@@ -394,12 +388,10 @@ pub fn buffer(&self) -> &IoBuffer {
/// of the buffer. Return the buffer.
pub fn finish(filler: Arc<IoBufferfill>) -> SeparatedBuffer {
// The io filler is passed in. This typically holds the only instance of the write side of the
// pipe used by the buffer's fillthread (except for that side held by other processes). Get the
// buffer out of the bufferfill and clear the shared_ptr; this will typically widow the pipe.
// pipe used by the buffer's fillthread (except for that side held by other processes).
// Then allow the buffer to finish.
filler
.buffer
.complete_background_fillthread_and_take_buffer()
let fd = fd_monitor().remove_item(filler.item_id);
filler.buffer.complete_and_take_buffer(fd)
}
}
impl IoData for IoBufferfill {
@@ -427,40 +419,24 @@ fn as_bufferfill(&self) -> Option<&IoBufferfill> {
}
}
/// An IoBuffer is a buffer which can populate itself by reading from an fd.
/// It is not an IoData.
pub struct IoBuffer {
/// Buffer storing what we have read.
buffer: Mutex<SeparatedBuffer>,
/// Atomic flag indicating our fillthread is running.
fillthread_running: RelaxedAtomicBool,
/// The item id of our background fillthread fd monitor item.
item_id: AtomicU64,
}
// safety: todo!("rationalize why fill_waiter is safe")
unsafe impl Send for IoBuffer {}
unsafe impl Sync for IoBuffer {}
/// Type wrapping a lock-protected separated buffer.
#[derive(Clone)]
pub struct IoBuffer(Arc<Mutex<SeparatedBuffer>>);
impl IoBuffer {
pub fn new(limit: usize) -> Self {
IoBuffer {
buffer: Mutex::new(SeparatedBuffer::new(limit)),
fillthread_running: RelaxedAtomicBool::new(false),
item_id: AtomicU64::new(0),
}
/// Create a new IoBuffer.
fn new(buffer_limit: usize) -> Self {
IoBuffer(Arc::new(Mutex::new(SeparatedBuffer::new(buffer_limit))))
}
/// Append a string to the buffer.
pub fn append(&self, data: &[u8], typ: SeparationType) -> bool {
self.buffer.lock().unwrap().append(data, typ)
self.0.lock().unwrap().append(data, typ)
}
/// Return true if output was discarded due to exceeding the read limit.
pub fn discarded(&self) -> bool {
self.buffer.lock().unwrap().discarded()
self.0.lock().unwrap().discarded()
}
/// Read some, filling the buffer. The buffer is passed in to enforce that the append lock is
@@ -497,18 +473,10 @@ pub fn read_once(fd: RawFd, buffer: &mut MutexGuard<'_, SeparatedBuffer>) -> isi
}
/// End the background fillthread operation, and return the buffer, transferring ownership.
pub fn complete_background_fillthread_and_take_buffer(&self) -> SeparatedBuffer {
// Mark that our fillthread is done, then wake it up.
assert!(self.fillthread_running.load(), "Should have a fillthread");
assert!(
self.item_id.load(Ordering::SeqCst) != 0,
"Should have a valid item ID"
);
let item_id = self.item_id.load(Ordering::SeqCst).into();
let fd = fd_monitor().remove_item(item_id);
/// The read end of the pipe is provided.
pub fn complete_and_take_buffer(&self, fd: AutoCloseFd) -> SeparatedBuffer {
// Read any remaining data from the pipe.
let mut locked_buff = self.buffer.lock().unwrap();
let mut locked_buff = self.0.lock().unwrap();
while fd.is_valid() && IoBuffer::read_once(fd.as_raw_fd(), &mut locked_buff) > 0 {
// pass
}
@@ -522,16 +490,11 @@ pub fn complete_background_fillthread_and_take_buffer(&self) -> SeparatedBuffer
}
/// Begin the fill operation, reading from the given fd in the background.
fn begin_filling(iobuffer: &Arc<IoBuffer>, fd: OwnedFd) {
assert!(
!iobuffer.fillthread_running.load(),
"Already have a fillthread"
);
iobuffer.fillthread_running.store(true);
// We want to fill buffer_ by reading from fd. fd is the read end of a pipe; the write end is
/// Return item ID of the newly created item in the fd monitor.
fn begin_filling(iobuffer: IoBuffer, fd: OwnedFd) -> FdMonitorItemId {
// We want to fill iobuffer by reading from fd. fd is the read end of a pipe; the write end is
// owned by another process, or something else writing in fish.
// Pass fd to an fd_monitor. It will add fd to its select() loop, and give us a callback when
// Pass fd to the FdMonitor. It will add fd to its select() loop, and give us a callback when
// the fd is readable. The usual path is that we will get called back, read a bit from the fd,
// and append it to the buffer. Eventually the write end of the pipe will be closed - probably
// the other process exited - and fd will be widowed; read() will then return 0 and we will stop
@@ -543,22 +506,18 @@ fn begin_filling(iobuffer: &Arc<IoBuffer>, fd: OwnedFd) {
// In this case, when complete_background_fillthread() is called, we grab the file descriptor
// and read until we get EAGAIN and then give up.
// Run our function to read until the receiver is closed.
let item_callback: Callback = {
let iobuffer = iobuffer.clone();
Box::new(move |fd: &mut AutoCloseFd| {
assert!(fd.as_raw_fd() >= 0, "Invalid fd");
let mut buf = iobuffer.buffer.lock().unwrap();
let ret = IoBuffer::read_once(fd.as_raw_fd(), &mut buf);
if ret == 0 || (ret < 0 && ![EAGAIN, EWOULDBLOCK].contains(&errno::errno().0)) {
// Either it's finished or some other error - we're done.
fd.close();
}
})
};
let item_callback: Callback = Box::new(move |fd: &mut AutoCloseFd| {
assert!(fd.as_raw_fd() >= 0, "Invalid fd");
let mut buf = iobuffer.0.lock().unwrap();
let ret = IoBuffer::read_once(fd.as_raw_fd(), &mut buf);
if ret == 0 || (ret < 0 && ![EAGAIN, EWOULDBLOCK].contains(&errno::errno().0)) {
// Either it's finished or some other error - we're done.
fd.close();
}
});
let fd = AutoCloseFd::new(fd.into_raw_fd());
let item_id = fd_monitor().add(fd, item_callback);
iobuffer.item_id.store(u64::from(item_id), Ordering::SeqCst);
fd_monitor().add(fd, item_callback)
}
pub type IoDataRef = Arc<dyn IoDataSync>;
@@ -892,10 +851,10 @@ fn contents(&self) -> &wstr {
/// An output stream for builtins which writes into a separated buffer.
pub struct BufferedOutputStream {
/// The buffer we are filling.
buffer: Arc<IoBuffer>,
buffer: IoBuffer,
}
impl BufferedOutputStream {
pub fn new(buffer: Arc<IoBuffer>) -> Self {
pub fn new(buffer: IoBuffer) -> Self {
Self { buffer }
}
fn append(&mut self, s: &wstr) -> bool {